In [2]:
import torch
import numpy as np
from math import ceil
import torch.nn as nn
import torch.nn.functional as F
from typing import Tuple
from torch.utils.data import DataLoader
from torchvision.datasets import DatasetFolder

## Loading the baseline model

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [4]:
class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=(10,), stride=(1,))
        self.fc1 = nn.Linear(65496, 1)  # Adjust the input size based on your data size

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
        x = x.view(-1, 65496)
        return F.sigmoid(self.fc1(x))

In [5]:
model = ConvNet().to(device)
model.load_state_dict(torch.load("model.pt"))
model

ConvNet(
  (conv1): Conv1d(1, 16, kernel_size=(10,), stride=(1,))
  (fc1): Linear(in_features=65496, out_features=1, bias=True)
)

### Evaluation

In [6]:
class BinaryTransform:
    def __init__(self, input_length):
        self.input_length = input_length

    def __call__(self, binary_data):
        binary_data = np.frombuffer(binary_data, dtype=np.uint8)
        
        l = len(binary_data)

        # Pad or truncate the binary data
        if l < self.input_length:
            padding = np.zeros(self.input_length - l, dtype=np.uint8)
            binary_data = np.concatenate((binary_data, padding))
        elif l > self.input_length:
            excess = ceil(l / self.input_length)
            padding = np.zeros(self.input_length * excess - l, dtype=np.uint8)
            binary_data = np.concatenate((binary_data, padding))
            binary_data = binary_data.reshape(len(binary_data)//excess, -1)
            binary_data = np.mean(binary_data, axis=1)
            
        # Scale the data to [0, 1]
        scaled_data = binary_data / 255.0
        tensor = torch.tensor(scaled_data, dtype=torch.float32)
        return tensor.unsqueeze(0)

In [7]:
test_data_path = "data/test"
transform = BinaryTransform(16384)
test_dataset = DatasetFolder(
    root=test_data_path,
    loader=lambda x: open(x, 'rb').read(),
    extensions=('',),
    transform=transform
)
test_loader = DataLoader(test_dataset, batch_size=64)

In [8]:
correct = 0
total = len(test_dataset)

with torch.no_grad():
    for X, y in test_loader:
        X, y = X.to(device), y.to(device)
        y_pred = model(X).squeeze()
        pred_label = y_pred > 0.5
        correct += pred_label.eq(y).sum().item()

correct / total

0.9923076923076923

## Baseline adversarial accuracy with random suffix

In [9]:
class BinaryTransformWithMask:
    def __init__(self, input_length: int, adversarial_ratio: float) -> None:
        self.input_length = input_length
        self.adversarial_ratio = adversarial_ratio

    def __call__(self, binary_data: bytes) -> Tuple[torch.Tensor, np.array]:
        """Returns the model input prepared as a (1,input_length) Tensor,
        and the mask which indicates positions influenced exclusively by the adv. suffix."""
        l_original = len(binary_data)
        binary_array = self.get_extended_binary_array(binary_data)
        l_with_adversarial = len(binary_array)


        if l_with_adversarial < self.input_length:
            # the bytes array is too short and zero padding should be added to match input_length
            # the mask does not include the zero-padding bytes
            padding = np.zeros(self.input_length - l_with_adversarial, dtype=np.uint8)
            binary_array = np.concatenate((binary_array, padding))
            mask = np.arange(l_original, l_with_adversarial)
        elif l_with_adversarial > self.input_length:
            # the byte array should be split into ceil(l_with_adversarial / input_length) chunks,
            # with the last chunk being padded to chunk size if needed
            # when padding is used, that last chunk is not considered part of the adversarial mask 
            window_size = ceil(l_with_adversarial / self.input_length)
            num_original_groups = ceil(l_original / window_size) # byte groups influenced by the original binary
            l_padding = self.input_length * window_size - l_with_adversarial
            num_padding_groups = ceil(l_padding / window_size)
            padding = np.zeros(l_padding, dtype=np.uint8)
            binary_array = np.concatenate((binary_array, padding))
            binary_array = binary_array.reshape(-1, window_size)
            binary_array = np.mean(binary_array, axis=1)
            mask = np.arange(num_original_groups, self.input_length - num_padding_groups)
        else:
            ...
            
        # Scale the data to [0, 1]
        scaled_data = binary_array / 255.0
        tensor = torch.tensor(scaled_data, dtype=torch.float32)
        return tensor.unsqueeze(0), mask

    def get_extended_binary_array(self, binary_data: bytes) -> np.array:
        """Build the extended binary with the adversarial suffix set to zero."""
        l = len(binary_data)
        l_with_adversarial = ceil(l * (1 + self.adversarial_ratio))
        binary_array = np.zeros(l_with_adversarial, dtype=np.uint8)
        binary_array[:l] = np.frombuffer(binary_data, dtype=np.uint8)
        return binary_array

In [10]:
# example #1 in the assignment text
binary_data = bytes(list(range(1, 11)))
transform = BinaryTransformWithMask(input_length=6, adversarial_ratio=0.4) # +4 bytes
X, M = transform(binary_data)
X, M

(tensor([[0.0078, 0.0196, 0.0314, 0.0131, 0.0000, 0.0000]]),
 array([], dtype=int64))

In [11]:
# example #2 in the assignment text
binary_data = bytes(list(range(1, 11)))
transform = BinaryTransformWithMask(input_length=6, adversarial_ratio=0.5) # +5 bytes
X, M = transform(binary_data)
X, M

(tensor([[0.0078, 0.0196, 0.0314, 0.0131, 0.0000, 0.0000]]), array([4]))

In [12]:
file_path = "data/victim/malware/0d41d1d904aecf716303f55108e020fbd9a4dbcd997efb08fba5e10e936d419c"
with open(file_path, "rb") as f:
    binary_data = f.read()

transform = BinaryTransformWithMask(input_length=2**14, adversarial_ratio=0.1)
input_tensor, M = transform(binary_data)
input_tensor = input_tensor.unsqueeze(0) # add batch dimension

In [13]:
model(input_tensor)

tensor([[0.9895]], grad_fn=<SigmoidBackward0>)

## Random adversary suffix for baseline attack

In [18]:
input_with_adversary = input_tensor.clone()
adversary_features = torch.rand(len(M), dtype=torch.float32)
input_with_adversary[...,M] += adversary_features
model(input_with_adversary), len(M)

(tensor([[0.8141]], grad_fn=<SigmoidBackward0>), 1450)

In [71]:
for adversarial_ratio in [0.05, 0.1, 0.15, 0.2]:
    transform = BinaryTransformWithMask(input_length=2**14, adversarial_ratio=adversarial_ratio)
    victim_dataset = DatasetFolder(root="data/victim", loader=lambda x: open(x, 'rb').read(), extensions=('',), transform=transform)
    
    num_successful = 0
    num_total = 0

    with torch.no_grad():
        for (X, M), y in victim_dataset:
            if victim_dataset.classes[y] == "benign":
                # we only need to attack malware samples
                continue
            
            input_with_adversary = X.clone()
            adversary_features = torch.rand(len(M), dtype=torch.float32)
            input_with_adversary[...,M] += adversary_features
            y_pred = model(input_with_adversary.unsqueeze(0)).squeeze()
            pred_label = y_pred > 0.5
            if pred_label != y:
                num_successful += 1
            num_total += 1

    print(f"{adversarial_ratio}:\t{num_successful / num_total:.4f}")

0.05:	0.1000
0.1:	0.1000
0.15:	0.2200
0.2:	0.2200


## Optimized attack with PGD

In [15]:
adversary_features = torch.rand(len(M), dtype=torch.float32, requires_grad=True)
opt = torch.optim.SGD([adversary_features], lr=0.01)

loss_fn = nn.BCELoss()

for t in range(50):
    input_with_adversary = input_tensor.clone()
    input_with_adversary[...,M] += adversary_features
    pred = model(input_with_adversary).squeeze()
    loss = -loss_fn(pred, torch.tensor(1, dtype=torch.float32)) # 1 = malware
    if t % 5 == 0:
        print(t, loss.item())
       
    opt.zero_grad()
    loss.backward()
    adversary_features.grad.sign_()
    opt.step()

    # projection with clipping
    adversary_features.data.clamp_(0, 1)

0 -0.09132120013237
5 -0.438446581363678
10 -1.2860522270202637
15 -2.439924478530884
20 -3.640349864959717
25 -4.766554355621338
30 -5.805830001831055
35 -6.708270072937012
40 -7.517575740814209
45 -8.221761703491211


In [16]:
model(input_with_adversary)

tensor([[0.0002]], grad_fn=<SigmoidBackward0>)

In [17]:
adversary_features

tensor([0.7072, 0.0000, 0.3410,  ..., 0.0000, 1.0000, 1.0000],
       requires_grad=True)

In [70]:
loss_fn = nn.BCELoss()

for adversarial_ratio in [0.05, 0.1, 0.15, 0.2]:
    transform = BinaryTransformWithMask(input_length=2**14, adversarial_ratio=adversarial_ratio)
    victim_dataset = DatasetFolder(root="data/victim", loader=lambda x: open(x, 'rb').read(), extensions=('',), transform=transform)
    
    num_successful = 0
    num_total = 0

    for (X, M), y in victim_dataset:
        if victim_dataset.classes[y] == "benign":
            # we only need to attack malware samples
            continue

        malware_idx = victim_dataset.class_to_idx["malware"]
        
        adversary_features = torch.rand(len(M), dtype=torch.float32, requires_grad=True)
        opt = torch.optim.SGD([adversary_features], lr=0.01)

        # PGD
        for t in range(100):
            input_with_adversary = X.clone().unsqueeze(0) # add batch dimension
            input_with_adversary[...,M] += adversary_features
            pred = model(input_with_adversary).squeeze()
            loss = -loss_fn(pred, torch.tensor(malware_idx, dtype=torch.float32))
            # if t % 5 == 0:
            #     print(t, loss.item())
            
            opt.zero_grad()
            loss.backward()
            adversary_features.grad.sign_()
            opt.step()

            # projection with clipping
            adversary_features.data.clamp_(0, 1)

        # Final prediction:
        with torch.no_grad():
            y_pred = model(input_with_adversary).squeeze()
            pred_label = y_pred > 0.5
            if pred_label != y:
                num_successful += 1
            num_total += 1

    print(f"{adversarial_ratio}:\t{num_successful / num_total:.4f}")

0.05:	0.5000
0.1:	0.8000
0.15:	0.8600
0.2:	0.9200
