# Lab 4 Bonus

In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# data paths
data_X_path = "cda_lab4_data/X.npy"
data_y_path = "cda_lab4_data/y.npy"
feature_names_path = "cda_lab4_data/feature_names.csv"

In [2]:
# load dataset
X = np.load(data_X_path)
print("X dimensions: ", X.shape)
y = np.load(data_y_path)
print("y dimensions: ", y.shape)

# load feature names
feature_names = pd.read_csv(feature_names_path, header=None)
print("Feature names: ", feature_names.head())

X dimensions:  (30396, 22761)
y dimensions:  (30396,)
Feature names:       0                               1
0  NaN                    feature_name
1  0.0        kernel32.dll:SetFileTime
2  1.0    kernel32.dll:CompareFileTime
3  2.0        kernel32.dll:SearchPathW
4  3.0  kernel32.dll:GetShortPathNameW


In [21]:
# split data into train and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

print("Train size: ", len(y_train))
print("Train malwares: ", np.sum(y_train == 1))
print()
print("Test size: ", len(y_test))
print("Test malwares: ", np.sum(y_test == 1))

Train size:  24316
Train malwares:  12168

Test size:  6080
Test malwares:  3042


In [3]:
# Define a simple neural network model
class SimpleModel(nn.Module):
    def __init__(self, input_size, num_classes):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(input_size, num_classes)
    
    def forward(self, x):
        return self.fc(x)

In [4]:
# Cross entropy loss function
loss_fn = nn.CrossEntropyLoss()

In [5]:
def grams_topk_variant(b, model, loss_fn, target_labels, k_init=8, device='cpu'):
    # Convert numpy arrays to torch tensors
    b = torch.tensor(b, dtype=torch.float32, requires_grad=True).to(device)
    orig_x = b.clone().detach()
    best_x = b.clone().detach()
    target_labels = torch.tensor(target_labels, dtype=torch.long).to(device)
    k = k_init
    
    def compute_loss(x):
        model_output = model(x)
        return loss_fn(model_output, target_labels)
    
    while k > 0.5:
        print("k: ", k)
        # Compute loss
        loss = compute_loss(b)
        
        # Zero the gradients
        model.zero_grad()
        
        # Compute gradients
        loss.backward()
        grad = b.grad.data
        
        # Compute the sign of the gradient
        sign = grad.sign()
        
        # Adjust the gradient
        adjusted_grad = torch.abs(grad - orig_x * grad)
        
        # Get the top-k elements
        topk_indices = torch.topk(adjusted_grad, int(k)).indices
        
        # Update x with top-k elements and their signs
        x_new = b.clone().detach()
        x_new[topk_indices] = x_new[topk_indices] + sign[topk_indices]
        
        # Compute new loss
        new_loss = compute_loss(x_new)
        
        # Compute loss for the best observed x
        best_loss = compute_loss(best_x)
        
        # Update best_x if new loss is better
        if new_loss.item() < best_loss.item():
            best_x[topk_indices] = x_new[topk_indices]
            b = x_new
            k *= 2
        else:
            k /= 2
    
    return best_x.cpu().detach().numpy()

In [45]:
# Example usage
input_size = X.shape[1]  # Number of features
num_classes = 2  # Number of classes
model = SimpleModel(input_size, num_classes).to('cpu')

In [46]:
# train the model
def train_model(model, X, y, loss_fn, device='cpu', epochs=100, lr=0.01):
    # Convert numpy arrays to torch tensors
    X = torch.tensor(X, dtype=torch.float32).to(device)
    y = torch.tensor(y, dtype=torch.long).to(device)
    
    # Define optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
    model.train()

    # Train the model
    for epoch in range(epochs):
        # Forward pass
        model_output = model(X)
        
        # Compute loss
        loss = loss_fn(model_output, y)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Backward pass
        loss.backward()
        
        # Update the weights
        optimizer.step()
        
        if (epoch + 1) % 10 == 0:
            print("Epoch [{}/{}], Loss: {:.4f}".format(epoch + 1, epochs, loss.item()))
            test_accuracy = evaluate_model(model, X, y)
            print("Accuracy: {:.2f}%".format(test_accuracy * 100))

In [47]:
# Evaluate the model
def evaluate_model(model, X, y, device='cpu'):
    # Convert numpy arrays to torch tensors
    X = torch.tensor(X, dtype=torch.float32).to(device)
    y = torch.tensor(y, dtype=torch.long).to(device)
    
    # Forward pass
    model_output = model(X)
    
    # Get predictions
    _, predicted = torch.max(model_output, 1)
    
    # Compute accuracy
    correct = (predicted == y).sum().item()
    total = y.size(0)
    accuracy = correct / total
    return accuracy

In [48]:
train_model(model, X_train, y_train, loss_fn, epochs=40)

Epoch [10/40], Loss: 0.2897


  X = torch.tensor(X, dtype=torch.float32).to(device)
  y = torch.tensor(y, dtype=torch.long).to(device)


Accuracy: 89.28%
Epoch [20/40], Loss: 0.2324
Accuracy: 90.73%
Epoch [30/40], Loss: 0.2102
Accuracy: 90.73%
Epoch [40/40], Loss: 0.1957
Accuracy: 91.05%


In [49]:
evaluate_model(model, X_test, y_test)

0.9108552631578948

In [50]:
evaluate_model(model, X_train, y_train)

0.9105115973021879

In [51]:
b = X_train[y_train == 1][0]
print('b:', b.shape)
target_labels = y_train[y_train == 1][0]

best_x = grams_topk_variant(b, model, loss_fn, target_labels)
print('original x:', b.shape)
print("Best x:", best_x.shape)

b: (22761,)
k:  8
k:  4.0
k:  2.0
k:  1.0
original x: (22761,)
Best x: (22761,)


In [54]:
best_xs = []
for i in range(10):
    b = X_train[y_train == 1][i]
    target_labels = y_train[y_train == 1][i]
    best_x = grams_topk_variant(b, model, loss_fn, target_labels)
    best_xs.append(best_x)

k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0
k:  8
k:  4.0
k:  2.0
k:  1.0


In [56]:
target_labels = y_train[y_train == 1][:10]
evaluate_model(model, best_xs, target_labels)

1.0

In [57]:
best_xs = X_train[y_train == 1][:10]
target_labels = y_train[y_train == 1][:10]
evaluate_model(model, best_xs, target_labels)

1.0