# Final Project Code

## Dataset

In [1]:
!pip install kagglehub



In [2]:
import kagglehub

# Download the latest version of the dataset. The images are accessed through
# the "Data" subdirectory of the download location, so DATA_DIR points there.
DATA_DIR = f'{kagglehub.dataset_download("satyaprakash138/balanced-malware-image-dataset")}/Data'

print("Path to dataset files:", DATA_DIR)

Downloading from https://www.kaggle.com/api/v1/datasets/download/satyaprakash138/balanced-malware-image-dataset?dataset_version_number=1...


100%|██████████| 1.13G/1.13G [00:52<00:00, 23.0MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/satyaprakash138/balanced-malware-image-dataset/versions/1/Data


### ML Library Dependencies

In [3]:
!pip install -q skorch

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/268.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m268.5/268.5 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [4]:
import os
import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader

from skorch import NeuralNetClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression

In [5]:
# Side length (in pixels) that every image is resized to.
IMAGE_SIZE = 128


# Preprocessing shared by all splits: resize to a square, reduce to a single
# grayscale channel, then convert to a tensor.
transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor()
])

train_dir = os.path.join(DATA_DIR, "train")
test_dir = os.path.join(DATA_DIR, "test")
val_dir = os.path.join(DATA_DIR, "val")

train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)
val_dataset = datasets.ImageFolder(root=val_dir, transform=transform)

print("Train classes:", train_dataset.classes)
print("Val   classes:", val_dataset.classes)
print("Test  classes:", test_dataset.classes)
print("Num train:", len(train_dataset), "Num val:", len(val_dataset), "Num test:", len(test_dataset))

# Each split derives its integer labels from its own folder names, so all
# three must agree on the class -> index mapping; otherwise a model trained on
# one split would be scored against differently-numbered labels.
assert train_dataset.class_to_idx == val_dataset.class_to_idx == test_dataset.class_to_idx, \
    "train/val/test splits disagree on the class-to-index mapping"

num_classes = len(train_dataset.classes)



Train classes: ['MaliciousImages', 'NormalImages']
Val   classes: ['MaliciousImages', 'NormalImages']
Test  classes: ['MaliciousImages', 'NormalImages']
Num train: 17066 Num val: 2438 Num test: 4876


In [6]:
def dataset_to_numpy(ds):
  """Materialise a dataset of ``(image, label)`` pairs as two NumPy arrays.

  Args:
    ds: iterable yielding ``(image, label)`` pairs, where each image exposes
      a ``.numpy()`` method (e.g. a torch tensor).

  Returns:
    ``(X, y)`` where ``X`` stacks all images along a new leading axis as
    float32 and ``y`` holds the labels as int64.
  """
  samples = list(ds)
  X = np.stack([image.numpy() for image, _ in samples]).astype("float32")
  y = np.array([target for _, target in samples]).astype("int64")
  return X, y


In [7]:
# Convert each split into in-memory NumPy arrays (images X, labels y).
X_train, y_train = dataset_to_numpy(train_dataset)
X_val, y_val = dataset_to_numpy(val_dataset)
X_test, y_test = dataset_to_numpy(test_dataset)

# Print the array shapes of every split as a quick sanity check.
print(f"X_train: {X_train.shape} y_train: {y_train.shape}")
print(f"X_val: {X_val.shape} y_val: {y_val.shape}")
print(f"X_test: {X_test.shape} y_test: {y_test.shape}")

X_train: (17066, 1, 128, 128) y_train: (17066,)
X_val: (2438, 1, 128, 128) y_val: (2438,)
X_test: (4876, 1, 128, 128) y_test: (4876,)


### CNN Feature Extraction for KNN, SVM, SVM-LS

In [14]:
class FeatureExtractorCNN(nn.Module):
    """Three-stage convolutional network that doubles as a feature extractor.

    ``features`` is a stack of three ``Conv2d(3x3, stride 1, padding 1) -> ReLU``
    stages with 32, 64 and 128 filters on a single-channel input. The first two
    stages end in a 2x2 max-pool; the last ends in an adaptive average pool to
    1x1, so each image yields 128 values. ``classifier`` flattens that output
    and maps it to ``num_classes`` logits with one linear layer.
    """

    def __init__(self, num_classes=25):
        super().__init__()
        widths = (1, 32, 64, 128)
        final_stage = len(widths) - 2
        stages = []
        for stage, (c_in, c_out) in enumerate(zip(widths, widths[1:])):
            # padding=1 with a 3x3 kernel keeps the spatial size unchanged
            stages.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))
            stages.append(nn.ReLU())
            if stage == final_stage:
                # collapse whatever spatial extent is left to a single value per channel
                stages.append(nn.AdaptiveAvgPool2d((1, 1)))
            else:
                # halve the spatial resolution
                stages.append(nn.MaxPool2d(2, 2))
        self.features = nn.Sequential(*stages)
        self.classifier = nn.Sequential(nn.Flatten(), nn.Linear(128, num_classes))

    def forward(self, x):
        """Return the class logits for a batch of images."""
        return self.classifier(self.features(x))

    def get_features(self, x):
        """Return the output of ``features`` flattened to one row per sample."""
        pooled = self.features(x)
        return pooled.view(pooled.size(0), -1)

def train_model(model, train_loader, val_loader, epochs=25, patience=5):
    """Train ``model`` with Adam (lr=1e-3) and cross-entropy, with early stopping.

    After every epoch the mean validation loss is computed. Training stops
    early once that loss has failed to improve for ``patience`` consecutive
    epochs. The weights from the epoch with the lowest validation loss are
    loaded back into ``model`` before returning, and the model is always left
    in eval mode.

    Args:
        model: network to train (updated in place).
        train_loader: iterable of ``(images, labels)`` training batches;
            must support ``len()``.
        val_loader: iterable of ``(images, labels)`` validation batches;
            must support ``len()``.
        epochs: maximum number of epochs to run.
        patience: number of consecutive non-improving epochs tolerated
            before stopping.

    Returns:
        ``(train_losses, val_losses)``: per-epoch mean batch losses, one
        entry per epoch actually run.
    """
    import copy  # local import: only needed to snapshot the best weights

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    # Batches are moved to wherever the model's parameters live.
    device = next(model.parameters()).device

    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    best_state = None
    patience_counter = 0

    for epoch in range(epochs):
        # train
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        train_loss_epoch = running_loss / len(train_loader)
        train_losses.append(train_loss_epoch)

        # val
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                val_loss += criterion(outputs, labels).item()
        val_loss_epoch = val_loss / len(val_loader)
        val_losses.append(val_loss_epoch)

        print(f"Epoch {epoch+1}, Loss: {train_loss_epoch:.4f}, Val Loss: {val_loss_epoch:.4f}")

        if val_loss_epoch < best_val_loss:
            best_val_loss = val_loss_epoch
            # Snapshot the weights: later epochs keep modifying the live tensors.
            best_state = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    # Without this the caller would get the weights from the last (worse)
    # epochs rather than the ones that scored best on validation.
    if best_state is not None:
        model.load_state_dict(best_state)
    model.eval()

    return train_losses, val_losses


def extract_features(model, data_loader):
    """Run ``model.get_features`` over every batch and collect the results.

    The model is switched to eval mode and gradients are disabled. Each image
    batch is moved to the device of the model's parameters before the forward
    pass, so a model placed on an accelerator works with CPU batches.

    Args:
        model: module exposing ``get_features(images)``.
        data_loader: iterable of ``(images, labels)`` tensor batches.

    Returns:
        ``(features, labels)`` as NumPy arrays, concatenated over batches in
        the order the loader yielded them.

    Raises:
        ValueError: from ``np.concatenate`` if ``data_loader`` yields no batches.
    """
    model.eval()
    # Parameter-less models have no device to infer, so batches are fed as-is.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None
    features = []
    labels = []
    with torch.no_grad():
        for images, batch_labels in data_loader:
            if device is not None:
                images = images.to(device)
            batch_features = model.get_features(images)
            features.append(batch_features.cpu().numpy())
            labels.append(batch_labels.cpu().numpy())
    return np.concatenate(features), np.concatenate(labels)

### Train the CNN, then fit k-NN and SVM on the extracted features

In [13]:
# Seed torch so weight initialisation and DataLoader shuffling are repeatable.
torch.manual_seed(42)

# init
model = FeatureExtractorCNN(num_classes=num_classes)
# NOTE: train_model builds its own loss and optimizer; these two are not used
# in this cell and are kept only so the names remain defined.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# dataloader for CNN, NumPy for KNN, SVM/LS
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
# Unshuffled pass over the training set, used only for feature extraction.
train_feature_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)

train_model(model, train_loader, val_loader, epochs=25, patience=3)

# extract + normalize features
# Extraction reassigns y_train / y_val / y_test. Going through the shuffled
# train_loader would leave y_train in a random order that no longer matches
# the rows of X_train, so the unshuffled loader is used to keep dataset order.
X_train_features, y_train = extract_features(model, train_feature_loader)
X_val_features, y_val = extract_features(model, val_loader)
X_test_features, y_test = extract_features(model, test_loader)

# Fit the scaler on the training features only, then reuse it for val/test.
scaler = StandardScaler()
X_train_features = scaler.fit_transform(X_train_features)
X_val_features = scaler.transform(X_val_features)
X_test_features = scaler.transform(X_test_features)

# KNN
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(X_train_features, y_train)
y_pred_knn = knn.predict(X_test_features)
print("k-NN Test Accuracy:", accuracy_score(y_test, y_pred_knn))

# SVM
svm = SVC()
svm.fit(X_train_features, y_train)
y_pred_svm = svm.predict(X_test_features)
print("SVM Test Accuracy:", accuracy_score(y_test, y_pred_svm))

KeyboardInterrupt: 