<img style="max-width:20em; height:auto;" src="../graphics/A-Little-Book-on-Adversarial-AI-Cover.png"/>

Author: Nik Alleyne   
Author Blog: https://www.securitynik.com   
Author GitHub: github.com/securitynik   

Author Other Books: [   

            "https://www.amazon.ca/Learning-Practicing-Leveraging-Practical-Detection/dp/1731254458/",   
            
            "https://www.amazon.ca/Learning-Practicing-Mastering-Network-Forensics/dp/1775383024/"   
        ]   


This notebook ***(adversarial_training.ipynb)*** is part of the series of notebooks From ***A Little Book on Adversarial AI***  A free ebook released by Nik Alleyne

### Adversarial Training

### Lab Objectives:   
- Learn how to perform adversarial training  
- Aim to make our models more robust   
- See how adversarial examples can be used as part of the normal training process to attempt to improve a model's robustness  


### Step 1:   

In [1]:
# Import the libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
import tqdm
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import netron
import IPython
from tqdm import tqdm_notebook, tqdm

In [2]:
### Version of key libraries used  
print(f'Torch version used:  {torch.__version__}')
print(f'Numpy version used:  {np.__version__}')
print(f'Netron version used:  {netron.__version__}')


Torch version used:  2.7.1+cu128
Numpy version used:  2.1.3
Netron version used:  8.4.4


In [3]:
# Setup the device to work with
# This should ensure if there are accelerators in place, such as Apple backend or CUDA, 
# we should be able to take advantage of it.

if torch.cuda.is_available():
    print('Setting the device to cuda')
    device = 'cuda'
elif torch.backends.mps.is_available():
    print('Setting the device to Apple mps')
    device = 'mps'
else:
    print('Setting the device to CPU')
    device = torch.device('cpu')

Setting the device to cuda


In [4]:
# Load the file with the features and labels
data = np.load(file=r'../data/bodmas.npz', allow_pickle=False)
print(f'The files are: {data.files}')

# Extract only the features of the malware
malware_features = data['X'] 

# Get the labels 
malware_labels = data['y']

# Get the shape of the dataset
malware_features.shape, malware_labels.shape

The files are: ['X', 'y']


((134435, 2381), (134435,))

With a shape of 2381 columns, from a linear layer perspective, we should have no concerns if building a neural network. Then again, Gradient Boosting has also been used to solve this problem with malware classification: https://whyisyoung.github.io/BODMAS/.   

We will use a neural network instead.  

In [5]:
# Get some statistics on the raw dataset
# Below results show that this dataset should be scaled
# For scaling we will use Standardization

print(f'Max value: {malware_features.max().item()}')
print(f'Min value: {malware_features.min().item()}')
print(f'Mean value: {malware_features.mean()}')
print(f'Std value: {malware_features.std()}')

Max value: 4294967296.0
Min value: -654044672.0
Mean value: 573773.4375
Std value: 29769684.0


### Step 2:  
Shuffle the data and visualize the data   

In [6]:
# Set the random seed to ensure reproducibility
np.random.seed(1)

# Create some indexes to shuffle the data
shuffle_idx = np.random.randint(low=0, high=len(malware_labels), size=len(malware_labels))

malware_features = malware_features[shuffle_idx]
malware_labels = malware_labels[shuffle_idx]

# Get a print again to see the effect of the change
print(f'First 20 labels are: {malware_labels[:20]}')
print(f'last 20 labels are: {malware_labels[-20:]}')

First 20 labels are: [1 0 0 1 0 1 1 0 0 0 0 0 0 0 1 1 1 1 1 0]
last 20 labels are: [0 1 1 0 1 0 0 0 1 0 0 1 1 0 0 1 0 1 1 0]


In [7]:
# Split the data into 80% training and 20% testing
X_train, X_test, y_train, y_test = train_test_split(malware_features, malware_labels, train_size=.85, test_size=.15, random_state=10, shuffle=True)

# Get the shapes of the various sets
X_train.shape, X_test.shape, y_train.shape, y_test.shape

((114269, 2381), (20166, 2381), (114269,), (20166,))

In [8]:
# Preparing to Standardize and scale
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

In [9]:
# Before using PCA, we need to scale the data
std_scaler = StandardScaler(with_mean=True, with_std=True, copy=True)
X_train_scaled = std_scaler.fit_transform(X_train)

In [10]:
# Get the new stats
print(f'Max value: {X_train_scaled.max().item()}')
print(f'Min value: {X_train_scaled.min().item()}')
print(f'Mean value: {X_train_scaled.mean()}')
print(f'Std value: {X_train_scaled.std()}')

Max value: 338.0354919433594
Min value: -338.0354919433594
Mean value: 8.535852580315861e-11
Std value: 0.9888073205947876


Our dataset is not squared. Worse it, the number of features is 2381. With this being a prime number, we would not be able to get this into a shape that works best with our convolutional neural network. To address that shortcoming, let us use PCA and reduce the dimensions to 1024. THis is a number we can easily work with to reshape.  

### Step 3:  

In [11]:
# Leverage PCA to reduce the number of features
pca = PCA(n_components=1024, copy=False, random_state=10)
pca.fit(X=X_train_scaled)

0,1,2
,n_components,1024
,copy,False
,whiten,False
,svd_solver,'auto'
,tol,0.0
,iterated_power,'auto'
,n_oversamples,10
,power_iteration_normalizer,'auto'
,random_state,10


In [12]:
# Transform the data
X_train_scaled_scaled_pca = pca.transform(X=X_train_scaled)
X_train_scaled_scaled_pca.shape

(114269, 1024)

In [13]:
# Get the cummulative sum of the PCA vectors
np.cumsum(pca.explained_variance_ratio_)

array([0.05087879, 0.09228626, 0.11405408, ..., 0.896683  , 0.8968474 ,
       0.89701134], dtype=float32)

In [14]:
# Take out the last value 
# This is the amount of "meaning" of the data that we were able to retain  
np.cumsum(pca.explained_variance_ratio_)[-1]

np.float32(0.89701134)

In [15]:
# Transform the test set
X_test_pca_scaled = pca.transform(X=X_test)
X_test_pca_scaled.shape

(20166, 1024)

Let us build a dataset to make it easier to process our data

### Step 4:   

In [16]:
# Setup a torch custom dataset
class MalClfDataset(Dataset):
    def __init__(self, X=None, y=None):
        super(MalClfDataset, self).__init__()
        
        self.X = torch.tensor(X.reshape(-1, 1, 32, 32), dtype=torch.float32)
        self.X = self.X / 255.
        self.y = torch.tensor(y.reshape(-1, 1), dtype=torch.float32)
    
    def __getitem__(self, index):
        return self.X[index], self.y[index]

    def __len__(self):
        return self.X.size(0)

In [17]:
# Setup the training and testing datasets
train_dataset = MalClfDataset(X_train_scaled_scaled_pca, y_train)
test_dataset = MalClfDataset(X_test_pca_scaled, y_test)

len(train_dataset), len(test_dataset)

(114269, 20166)

With that in place, let us now define the hyperparameters for our model

### Step 5: 

In [18]:
# Define some variables
batch_size = 32
hidden_chnls = 8
num_classes = 1
num_epochs = 5
kernel_size = 4
stride = 2
pool_size = 2
linear_dims = 128

In [19]:
# Create the dataloader
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
#test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [20]:
# Get a sample from the train loader
next(iter(train_loader))[0].size()

torch.Size([32, 1, 32, 32])

Create our malware classifier. This is no different from what was done before. The architecture in itself for adversarial training is no different from any other architecture you would have built   

### Step 6:  

In [21]:
# Build the malware classifier
class MalwareClassifier(nn.Module):
    def __init__(self, ):
        super(MalwareClassifier, self).__init__()

        self.layers = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=hidden_chnls, kernel_size=kernel_size, stride=stride, padding=1),
            nn.BatchNorm2d(num_features=hidden_chnls),
            nn.MaxPool2d(kernel_size=kernel_size, stride=stride, padding=1),
            nn.Dropout(p=.5),

            nn.Conv2d(in_channels=hidden_chnls, out_channels=hidden_chnls * 2, kernel_size=kernel_size, stride=stride, padding=1),
            nn.BatchNorm2d(num_features=hidden_chnls * 2),
            nn.MaxPool2d(kernel_size=kernel_size, stride=stride, padding=1),
            nn.Dropout(p=.5),

            nn.Flatten(start_dim=1),
            nn.Linear(in_features=hidden_chnls * 8, out_features=256),
            nn.Linear(in_features=256, out_features=1),
            
            nn.Sigmoid()
        )
        
    def forward(self, x):
        return self.layers(x)

In [22]:
# Set the manual seed for reproducibility 
torch.manual_seed(seed=15)

# Instantiate the classifier 
malware_classifier = MalwareClassifier().to(device=device)

# Setup the optimizer
optimizer = torch.optim.SGD(params=malware_classifier.parameters(), lr=0.001, weight_decay=0.01)

# Setup the loss function
loss_fn = nn.BCELoss(reduction='mean')

malware_classifier(next(iter(train_loader))[0].to(device)).size()

torch.Size([32, 1])

In [23]:
#x_tmp = next(iter(train_loader))[0]
#x_tmp

In [24]:
# Ensure the model works before attempting to build the training loop
#torch.manual_seed(10)
#tmp_preds = malware_classifier(x_tmp)
#tmp_preds[:5]

While there are many ways we can create adversarial examples. For this purpose, we will go with the Fast Gradient Sign Method (FGSM) approach.  You can see the FGSM notebooks to learn more about FGSM:  
- fgsm_art.ipynb
- fgsm_foolbox.ipynb
- pgd_manual.ipynb

### Step 7:  

In [25]:
# use in this case the FGSM function to create the adversarial examples
def fgsm_attack(model=None, X=None, y=None, eps=None):

    model = model.to(device)
    model.eval()

    X, y  = X.to(device), y.to(device)
    X.requires_grad_(True)

    preds = model(X)
    loss = F.binary_cross_entropy(input=preds, target=y)
    model.zero_grad(set_to_none=True)
    loss.backward()
    gradients = X.grad.sign()
    perturbations = eps * gradients
    adv_examples = X + perturbations
    adv_examples = torch.clamp(adv_examples, 0, 1)

    return adv_examples

With the FGSM attack and our model in place, we can go ahead and with performing the actual training. It is during this training process that the adversarial examples are generated and provided to the model as part of the training process.  

We define a fit function to help us with our training. 

### Step 8:  

In [None]:
# Define a training function
def fit(model=None, epochs=10, optimizer=optimizer, loss_fn=loss_fn, eps=0.02):
    train_loss_tracker = []
    train_accuracy_tracker = []

    test_loss_tracker = []
    test_loss_accuracy = []
    
    for epoch in range(epochs):
        n_train_batches = 0
        train_loss = 0.
        train_acc = 0.

        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()

                for X_batch, y_label in tqdm(train_loader, desc=f'Epoch: {epoch + 1}/{epochs}'):

                    X_batch, y_label = X_batch.to(device), y_label.to(device)

                    # Let's use the FGSM to generate some adversarial examples
                    # This is where the adversarial examples are generated  
                    # https://github.com/peyman-paknezhad/Adversarial-Robustness-in-Neural-Networks-Implementation-and-Assessment-of-FGSM-BIM-and-PGD-Attacks/blob/main/Adversarial_Robustness_in_Neural_Networks_Implementation_and_Assessment_of_FGSM%2C_BIM%2C_and_PGD_Attacks.ipynb
                    adv_examples = fgsm_attack(model=model, X=X_batch, y=y_label, eps=eps)

                    # Combine the adversarial Xs with the true X
                    combined_X = torch.cat(tensors=[X_batch, adv_examples], dim=0)
                    combined_y = torch.cat(tensors=[y_label, y_label], dim=0)

                    optimizer.zero_grad(set_to_none=True)
                    train_batch_preds = model(combined_X)
                    train_batch_loss = loss_fn(input=train_batch_preds, target=combined_y)
                    
                    # Perform backpropagation
                    train_batch_loss.backward()

                    # Update the model parameters
                    optimizer.step()

                    train_loss += train_batch_loss
                    train_acc += (train_batch_preds.round() == combined_y).sum() / combined_y.size(dim=0)
                    n_train_batches += 1

                    #break

            else:
                # Evaluate the model during training
                model.eval()
                x_test = test_dataset[...][0]
                x_test = x_test.to(device)

                y_test = test_dataset[...][1]
                y_test = y_test.to(device)

                # Start the inference process
                with torch.inference_mode(mode=True):
                    test_preds = model(x_test)
                    
                    test_loss = loss_fn(input=test_preds, target=y_test)
                    test_loss_tracker.append(test_loss.item())

                    test_acc = (test_preds.round() == y_test).sum() / y_test.size(dim=0)
                    test_loss_accuracy.append(test_acc.item())

        train_loss_tracker.append(train_loss.item()/n_train_batches)
        train_accuracy_tracker.append(train_acc.item()/n_train_batches)

        if ( epoch + 1) % 10 == 0:
            print(f'\t train loss: {train_loss.item() / n_train_batches } \t train accuracy: {train_acc.item() / n_train_batches} \t test loss: {test_loss} \t test accuracy: {test_acc.item()}')

        #break
            
    return model

In [28]:
torch.manual_seed(10)
# Just using 10 epochs here to complete the training process
# We definitely need to train longer than this to ensure our model is robust  
model = fit(model=malware_classifier, optimizer=optimizer, loss_fn=loss_fn, epochs=10, eps=0.02)

Epoch: 1/10: 100%|██████████| 3571/3571 [00:26<00:00, 132.76it/s]
Epoch: 2/10: 100%|██████████| 3571/3571 [00:30<00:00, 116.20it/s]
Epoch: 3/10: 100%|██████████| 3571/3571 [00:27<00:00, 129.02it/s]
Epoch: 4/10: 100%|██████████| 3571/3571 [00:27<00:00, 130.49it/s]
Epoch: 5/10: 100%|██████████| 3571/3571 [00:29<00:00, 122.78it/s]
Epoch: 6/10: 100%|██████████| 3571/3571 [00:26<00:00, 134.39it/s] 
Epoch: 7/10: 100%|██████████| 3571/3571 [00:26<00:00, 132.35it/s] 
Epoch: 8/10: 100%|██████████| 3571/3571 [00:29<00:00, 120.45it/s]
Epoch: 9/10: 100%|██████████| 3571/3571 [00:25<00:00, 142.54it/s]
Epoch: 10/10: 100%|██████████| 3571/3571 [00:26<00:00, 133.19it/s]


	 train loss: 0.6806172990758891 	 train accuracy: 0.5776399320262181 	 test loss: 42.13911819458008 	 test accuracy: 0.5681840777397156


In [29]:
# With the training finish clear the GPU cache
# Setup the device to work with
if torch.cuda.is_available():
    # For CUDA GPU
    print(f'Cleaning {device} cache')
    torch.cuda.empty_cache()
elif torch.backends.mps.is_available():
    # For Apple devices
    print(f'Cleaning {device} cache')
    torch.mps.empty_cache()
else:
    # Default to cpu
    pass

Cleaning cuda cache


### Lab Takeaways:  
- We saw how to incorporate adversarial examples as part of our training process
- In our scenario, we used FGSM. We could have used another method if needed.
- This is just a method for assisting with adversarial robustness. It is not *the* solution, it is *a* solution. 