In [None]:
import glob

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision.models import resnet18, ResNet18_Weights
from tqdm import tqdm

In [None]:
# Collect the sample files: goodware is matched by its .exe extension, while
# every entry of the malware directory is taken.
# NOTE(review): these absolute paths are machine-specific; consider deriving
# them from a configurable data directory.
# glob.glob() returns entries in arbitrary, filesystem-dependent order, so both
# lists are sorted to keep the sample order (and the seeded cross-validation
# splits computed from it) reproducible across runs and machines.
goodware_files = sorted(glob.glob("/Users/giuseppe/PycharmProjects/urbanSecurityGDGV/resources/goodware_dataset/*.exe"))
malware_files = sorted(glob.glob("/Users/giuseppe/PycharmProjects/urbanSecurityGDGV/resources/malware_dataset/*"))

file_paths = malware_files + goodware_files
# Label convention: 1 = malware, 0 = goodware, aligned index-by-index with file_paths.
labels = [1] * len(malware_files) + [0] * len(goodware_files)

In [None]:
def exe_to_image(file_path, width=256):
    """Render the raw bytes of a file as a 2-D 8-bit image.

    The file content is read as a flat sequence of unsigned bytes, zero-padded
    at the end so its length is a multiple of ``width``, and reshaped into
    ``height`` rows of ``width`` bytes each.

    Args:
        file_path: Path of the file to read (opened in binary mode).
        width: Number of bytes per image row. Defaults to 256.

    Returns:
        A PIL image built from the ``(height, width)`` uint8 array.
    """
    with open(file_path, "rb") as f:
        byte_array = np.frombuffer(f.read(), dtype=np.uint8)

    length = len(byte_array)
    # Integer ceiling division; clamp to one row so an empty file yields a
    # single all-zero row instead of a zero-height array.
    height = max(1, -(-length // width))

    # np.pad returns a new, writable array (np.frombuffer's result is read-only).
    padded = np.pad(byte_array, (0, height * width - length), 'constant', constant_values=0)
    image = padded.reshape((height, width))

    return Image.fromarray(image)

In [None]:
class EXEDataset(Dataset):
    """Map-style dataset that converts files to images lazily, on item access.

    Args:
        file_paths: Sequence of file paths, one per sample.
        labels: Sequence of labels, indexed in parallel with ``file_paths``.
        transform: Optional callable applied to each image before it is returned.
    """

    def __init__(self, file_paths, labels, transform=None):
        self.file_paths, self.labels = file_paths, labels
        self.transform = transform

    def __len__(self):
        # One sample per file path.
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair for sample ``idx``."""
        path = self.file_paths[idx]
        sample = exe_to_image(path)
        sample = self.transform(sample) if self.transform else sample
        return sample, self.labels[idx]


In [None]:
# Preprocessing applied to every byte-image before it reaches the network.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),  # ResNet expects 3 input channels
    transforms.ToTensor()
])

# Pick the best available accelerator: CUDA first, then Apple MPS, then CPU.
# On machines without CUDA this selects the same device as an MPS-or-CPU check.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

In [None]:
def create_model():
    """Build a pretrained ResNet-18 whose final layer outputs two logits.

    The network is loaded with torchvision's default weights, its fully
    connected head is replaced by a fresh 2-unit linear layer, and the result
    is cast to float32 and moved to the module-level ``device``.
    """
    net = resnet18(weights=ResNet18_Weights.DEFAULT)
    in_dim = net.fc.in_features
    net.fc = nn.Linear(in_dim, 2)  # binary classification head
    net = net.to(torch.float32)
    return net.to(device)

In [None]:
def evaluate(model, dataloader):
    """Score ``model`` on every batch of ``dataloader``.

    The model is switched to eval mode and run without gradient tracking; the
    predicted class of each sample is the argmax over dimension 1 of the output.

    Args:
        model: Network to evaluate; it is left in eval mode afterwards.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        Tuple ``(accuracy, macro_f1)`` computed over all samples seen.
    """
    model.eval()
    predictions, targets = [], []

    with torch.no_grad():
        for inputs, batch_labels in dataloader:
            inputs = inputs.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(inputs)
            # Move results back to the CPU before accumulating them for sklearn.
            predictions.extend(logits.argmax(dim=1).cpu().numpy())
            targets.extend(batch_labels.cpu().numpy())

    return (
        accuracy_score(targets, predictions),
        f1_score(targets, predictions, average='macro'),
    )

In [None]:
class EarlyStopping:
    """Track a score across calls and flag when it has stopped improving.

    Attributes are documented inline in ``__init__``. Once ``early_stop`` has
    been set it is never cleared, even if a later score improves.
    """

    def __init__(self, patience=5):
        self.patience = patience  # non-improving calls tolerated before stopping
        self.best_score = None    # highest score seen so far (None until first call)
        self.counter = 0          # consecutive calls without a strict improvement
        self.early_stop = False   # becomes True once counter reaches patience

    def __call__(self, f1_score):
        """Record a new score; only a strictly higher score counts as improvement."""
        improved = self.best_score is None or f1_score > self.best_score
        if improved:
            self.best_score, self.counter = f1_score, 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

In [None]:
# 10-fold stratified cross-validation: a fresh model is trained on each fold
# for up to 50 epochs, with early stopping on the validation macro-F1.
dataset = EXEDataset(file_paths, labels, transform)
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

# Per-epoch histories, concatenated across all folds in training order.
accuracies = []
f1_scores = []
train_losses = []
# Number of epochs actually run in each fold; a later cell uses it to mark the
# fold boundaries inside the concatenated histories.
fold_lengths = []

for fold, (train_idx, val_idx) in enumerate(skf.split(file_paths, labels)):
    print(f"--- Fold {fold+1} ---")

    train_subset = Subset(dataset, train_idx)
    val_subset = Subset(dataset, val_idx)

    train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_subset, batch_size=32)

    # Model, loss and optimizer are rebuilt for every fold.
    model = create_model()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    # Single source of truth for the stopping logic (previously this object was
    # created but the loop kept its own duplicate best/wait counters).
    early_stopping = EarlyStopping(patience=5)
    epoch_count = 0

    for epoch in range(50):
        model.train()  # evaluate() leaves the model in eval mode
        epoch_loss = 0
        epoch_count += 1  # count one more epoch started for this fold

        for x, y in tqdm(train_loader, desc=f"Epoch {epoch+1}", leave=True):
            x, y = x.to(device, dtype=torch.float32), y.to(device)

            optimizer.zero_grad()
            outputs = model(x)
            loss = criterion(outputs, y)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        # Mean batch loss for this epoch.
        train_losses.append(epoch_loss / len(train_loader))

        acc, f1 = evaluate(model, val_loader)
        accuracies.append(acc)
        f1_scores.append(f1)
        tqdm.write(f"Accuracy: {acc:.4f} | F1-score: {f1:.4f}")

        # Early stopping: the counter is reset to 0 only on a strict
        # improvement (the first epoch always counts), so counter == 0 means
        # "new best" and the weights are checkpointed.
        # NOTE(review): every fold writes to the same file, so it ends up
        # holding the most recent fold's best weights only.
        early_stopping(f1)
        if early_stopping.counter == 0:
            torch.save(model.state_dict(), "best_model.pt")
        elif early_stopping.early_stop:
            tqdm.write("Early stopping")
            break

    fold_lengths.append(epoch_count)

In [None]:
# Locate each fold inside the concatenated per-epoch histories: fold k occupies
# indices [fold_start_positions[k], fold_end_positions[k]).
fold_end_positions = np.cumsum(fold_lengths).tolist()
fold_start_positions = [end - n for end, n in zip(fold_end_positions, fold_lengths)]

# Reduce every fold to a single (accuracy, F1) pair taken at its best-F1 epoch,
# i.e. the epoch whose weights were checkpointed. Averaging the raw per-epoch
# histories would mix early, under-trained epochs into the CV estimate.
# NOTE(review): picking the best epoch on the validation fold itself gives an
# optimistic estimate; a held-out test set would be needed for an unbiased one.
fold_accuracies = []
fold_f1_scores = []
for start, end in zip(fold_start_positions, fold_end_positions):
    # np.argmax returns the first maximum, matching the strict ">" used when
    # checkpointing during training.
    best_epoch = start + int(np.argmax(f1_scores[start:end]))
    fold_accuracies.append(accuracies[best_epoch])
    fold_f1_scores.append(f1_scores[best_epoch])

print("\n--- Final results (10-fold CV) ---")
print(f"Average accuracy: {np.mean(fold_accuracies):.4f} ± {np.std(fold_accuracies):.4f}")
print(f"Average f1-score: {np.mean(fold_f1_scores):.4f} ± {np.std(fold_f1_scores):.4f}")

# Per-epoch training loss and validation F1 across all folds, with a vertical
# line at each boundary between consecutive folds.
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(train_losses, label='Train Loss')
ax.plot(f1_scores, label='Val F1')
for i, pos in enumerate(fold_end_positions[:-1]):
    ax.axvline(x=pos, color='red', linestyle='-', linewidth=1)
    # Label sits just left of the line that closes fold i+1.
    ax.text(x=pos - 0.7,
            y=ax.get_ylim()[1] * 0.56,
            s=f"Fold {i+1}",
            rotation=90,
            verticalalignment='top',
            color='red',
            fontsize=8)
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss / F1")
ax.set_title("Training Loss & Validation F1")
ax.legend()
ax.grid()
plt.show()