In [None]:
# License: BSD
# Author: Sasank Chilamkurthy

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
from PIL import Image
from tempfile import TemporaryDirectory
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import random_split

In [None]:
data_dir = "C:/Users/user/OneDrive/Masaüstü/big2015_g/big2015_g/train"

# Define transformations for your images (adjust as needed)
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),  # Resize for EfficientNet-B3
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Create the ImageFolder dataset
dataset = datasets.ImageFolder(root=data_dir, transform=train_transforms)

# Define the validation set split ratio (e.g., 20% for validation)
val_split = 0.2

# Split the dataset into train and validation sets using random_split
train_size = int(len(dataset) * (1 - val_split))
val_size = len(dataset) - train_size
train_data, val_data = random_split(dataset, [train_size, val_size])

image_datasets = {
    'train': train_data,
    'val': val_data,
}

dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
                                             shuffle=True, num_workers=4)
              for x in ['train', 'val']}

dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = dataset.classes

# ... rest of your transfer learning code using train_loader and val_loader
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
test_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.ToTensor(), 
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
test_data = datasets.ImageFolder("C:/Users/user/OneDrive/Masaüstü/big2015_g/big2015_g/test", transform=test_transform)
test_loader = torch.utils.data.DataLoader(test_data, batch_size = 16, num_workers = 0)

In [None]:
def imshow(inp, title=None):
    #Display image for Tensor
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


# Get a batch of training data
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x] for x in classes])

In [None]:
# EarlyStopping class definition
class EarlyStopping:
    def __init__(self, patience=5, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

In [None]:
writer = SummaryWriter("C:/Users/user/experiment_efficientnetb7_sgd_001")

In [None]:
# Training function with early stopping
def train_model(model, criterion, optimizer, scheduler, num_epochs=25, patience=5):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    early_stopping = EarlyStopping(patience=patience, verbose=True)

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            if phase == "train":
                writer.add_scalar("Training Loss", epoch_loss, epoch)
                writer.add_scalar("Training Accuracy", epoch_acc, epoch)
            elif phase == 'val':
                writer.add_scalar("Validation Loss", epoch_loss, epoch)
                writer.add_scalar("Validation Accuracy", epoch_acc, epoch)

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

            # Check early stopping
            if phase == 'val':
                early_stopping(epoch_loss)
                if early_stopping.early_stop:
                    print("Early stopping")
                    time_elapsed = time.time() - since
                    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
                    print(f'Best val Acc: {best_acc:4f}')
                    writer.close()
                    model.load_state_dict(best_model_wts)
                    return model

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:4f}')
    writer.close()
    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Define the custom EfficientNet model
class CustomEfficientNet(nn.Module):
    def __init__(self, num_classes=9):
        super(CustomEfficientNet, self).__init__()
        self.efficientnet = models.efficientnet_b7(weights='IMAGENET1K_V1')
        
        # Replace the Global Average Pooling layer with Max Pooling
        self.efficientnet.avgpool = nn.MaxPool2d(kernel_size=7, stride=1) #7 1
        
        # Calculate the number of features after the Max Pooling layer
        # The kernel size used here is important to make sure the output size matches
        # Expected input size after MaxPool2d should match the linear layer input size
        num_ftrs = self.efficientnet.classifier[1].in_features
        
        # Adjust the classifier
        self.efficientnet.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(p=0.2, inplace=True),
            nn.Linear(num_ftrs, num_classes)
        )
    
    def forward(self, x):
        #x = self.efficientnet.features(x)
        #x = self.efficientnet.avgpool(x)
        #x = self.efficientnet.classifier(x)
        x = self.efficientnet(x)
        return x
        #return x

# Instantiate the model
model_ft = CustomEfficientNet(num_classes=9)
model_ft = model_ft.to(device)

# Define loss function
criterion = nn.CrossEntropyLoss()

# Define optimizer
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.01, momentum=0.9)

# Define learning rate scheduler
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

# Print model summary
print(model_ft)

In [None]:
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=120)

In [None]:
torch.save(model_ft.state_dict(), "C:/Users/user/Documents/experimentefficientnetb7001sgdrgb.pt")

In [None]:
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data in test_loader:
        images, labels = data
        images = images.to(device)
        labels = labels.to(device)
        # calculate outputs by running images through the network
        outputs = model_ft(images)
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the network on the big15 malware test images: {100 * correct // total} %')

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
predicted_labels = []
true_labels = []
label_names = ['Gatak', 'Kelihos_ver1', 'Kelihos_ver3', 'Lollipop', 'Obfuscator.ACY', 'Ramnit', 'Simba', 'Tracur', 'Vundo']
model_ft.eval()
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model_ft(images)
        _,predicted = torch.max(outputs, 1)
        predicted_labels.extend(predicted.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())
predicted_labels = np.array(predicted_labels)
true_labels = np.array(true_labels)
confusion = confusion_matrix(true_labels, predicted_labels)
normalized_matrix = confusion.astype('float') / confusion.sum(axis=1)[:, np.newaxis]
sns.set()
plt.figure(figsize=(10, 8))
sns.heatmap(normalized_matrix, annot=True, cmap="Blues", xticklabels=label_names, yticklabels=label_names)
plt.xlabel("Predicted labels")
plt.ylabel("True labels")

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score



# Set the model to evaluation mode
model_ft.eval()

# Initialize lists to hold true and predicted labels
true_labels = []
pred_labels = []

# Disable gradient calculation for evaluation
with torch.no_grad():
    for data in test_loader:
        images, labels = data
        images = images.to(device)
        labels = labels.to(device)
        outputs = model_ft(images)
        _, preds = torch.max(outputs, 1)
        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(preds.cpu().numpy())

# Calculate precision, recall, and F1-score
precision = precision_score(true_labels, pred_labels, average='macro')
recall = recall_score(true_labels, pred_labels, average='macro')
f1 = f1_score(true_labels, pred_labels, average='macro')

print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')