<a href="https://colab.research.google.com/github/Michael-Aladejobi/30-days-with-JavaScript/blob/main/Malwaredetection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
@!pip install datasets

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.12.0-py3-none-any.wh

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.model_selection import train_test_split
from datasets import load_dataset
import json
from tqdm import tqdm

# Seed both torch and numpy so weight initialisation and dataset splitting
# are repeatable across runs.
torch.manual_seed(42)
np.random.seed(42)

# Prefer the GPU when one is visible to torch, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# Fetch the dataset from the Hugging Face hub (requires network access on first run).
print("Loading dataset...")
ds = load_dataset("PurCL/malware-top-100")
print("Dataset loaded successfully.")

# Summarise what was loaded: available splits, training-set size and the
# declared type of every column.
print(f"Dataset splits: {ds.keys()}")
print(f"Train set size: {len(ds['train'])}")
for key, feature_type in ds['train'].features.items():
    print(f"Feature: {key}, Type: {feature_type}")

# Pre-process the dataset
class MalwareDataset(Dataset):
    """Torch ``Dataset`` view over one split of the loaded malware dataset.

    Each item is a dict of three float tensors:

    * ``binary_data``   - the sample's ``binary_name`` string, encoded as
      ``SEQ_LEN`` per-character values in [0, 1].
    * ``function_data`` - the sample's ``functions`` string, encoded the same way.
    * ``labels``        - multi-hot vector over ``LABEL_MAP``.

    NOTE(review): despite the key name, ``binary_data`` is derived from the
    ``binary_name`` field, not from the bytes of the executable. This is a
    deliberately simple featurisation; a real feature extractor may be needed.

    Args:
        hf_dataset: Mapping from split name to an indexable collection of
            sample dicts with ``binary_name``, ``functions`` and ``labels`` keys.
        split: Name of the split to expose (default ``'train'``).
    """

    # Every text field is truncated / zero-padded to this many values.
    SEQ_LEN = 1024
    # Label string -> position in the multi-hot target vector.
    # Labels that are not listed here are ignored.
    LABEL_MAP = {"stealer": 0, "startpage_": 1, "winnt1": 2, "zusy": 3}

    def __init__(self, hf_dataset, split='train'):
        self.dataset = hf_dataset[split]

    def __len__(self):
        return len(self.dataset)

    @classmethod
    def _encode_text(cls, text):
        """Encode ``text`` as ``SEQ_LEN`` floats in [0, 1] (zeros if not a string)."""
        if not isinstance(text, str):
            return [0.0] * cls.SEQ_LEN
        # Code points above 255 are clamped so the result really stays in
        # [0, 1]; a bare ord(ch) / 255 would exceed 1 for non-Latin-1 text.
        values = [min(ord(ch), 255) / 255.0 for ch in text[:cls.SEQ_LEN]]
        values.extend([0.0] * (cls.SEQ_LEN - len(values)))
        return values

    @classmethod
    def _encode_labels(cls, labels_list):
        """Build the multi-hot target vector (all zeros if labels are missing)."""
        label_vector = [0.0] * len(cls.LABEL_MAP)
        if isinstance(labels_list, (list, tuple)):
            for label in labels_list:
                position = cls.LABEL_MAP.get(label)
                if position is not None:
                    label_vector[position] = 1.0
        return label_vector

    def __getitem__(self, idx):
        """Return the encoded tensors for the sample at ``idx``."""
        sample = self.dataset[idx]
        return {
            'binary_data': torch.FloatTensor(self._encode_text(sample['binary_name'])),
            'function_data': torch.FloatTensor(self._encode_text(sample['functions'])),
            'labels': torch.FloatTensor(self._encode_labels(sample['labels'])),
        }

# Define the CNN model for binary data
class BinaryDataCNN(nn.Module):
    """1-D CNN branch that maps a fixed-length sequence to a 128-d feature vector.

    Three Conv1d -> BatchNorm -> ReLU -> MaxPool(2) stages are followed by
    dropout and a linear projection.

    The projection layer is created here, in ``__init__``, rather than lazily
    in ``forward``: a layer created during the first forward pass is unknown
    to an optimizer built beforehand (so it would never be trained) and would
    replace weights restored by ``load_state_dict``.

    Args:
        input_length: Length of the input sequences (default 1024). Each of
            the three pooling stages halves the length (rounding down).
    """

    def __init__(self, input_length=1024):
        super().__init__()
        self.conv1 = nn.Conv1d(1, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm1d(32)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm1d(64)
        self.conv3 = nn.Conv1d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm1d(128)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(0.5)
        # The convolutions keep the length (kernel 3, padding 1); three
        # MaxPool1d(2) stages reduce it to input_length // 8, with 128 channels.
        self.fc = nn.Linear(128 * (input_length // 8), 128)

    def forward(self, x):
        """Compute features for ``x`` of shape [batch_size, sequence_length].

        Returns:
            Tensor of shape [batch_size, 128].
        """
        x = x.unsqueeze(1)  # add channel dimension -> [batch_size, 1, sequence_length]

        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))

        # Flatten channels and positions into one feature axis per sample.
        x = x.reshape(x.size(0), -1)

        x = self.dropout(x)
        return self.fc(x)

# Define the Transformer model for function data
class FunctionTransformer(nn.Module):
    """Transformer-encoder branch that maps a fixed-length sequence to 128 features.

    Each scalar position is projected to a 64-d embedding, passed through a
    two-layer Transformer encoder, max-pooled twice along the sequence axis,
    flattened, and projected to 128 features.

    The projection layer is created here, in ``__init__``, rather than lazily
    in ``forward``: a layer created during the first forward pass is unknown
    to an optimizer built beforehand (so it would never be trained), and a
    freshly constructed model would have no ``fc`` weights for
    ``load_state_dict`` to fill.

    Args:
        input_length: Length of the input sequences (default 1024). Each of
            the two pooling stages halves the length (rounding down).
    """

    def __init__(self, input_length=1024):
        super().__init__()
        self.embedding = nn.Linear(1, 64)  # simple per-position embedding
        encoder_layer = nn.TransformerEncoderLayer(d_model=64, nhead=4, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(0.5)
        # Two MaxPool1d(2) stages leave input_length // 4 positions of 64 channels.
        self.fc = nn.Linear(64 * (input_length // 4), 128)

    def forward(self, x):
        """Compute features for ``x`` of shape [batch_size, sequence_length].

        Returns:
            Tensor of shape [batch_size, 128].
        """
        x = x.unsqueeze(-1)  # add feature dimension -> [batch_size, sequence_length, 1]

        x = self.embedding(x)            # [batch_size, sequence_length, 64]
        x = self.transformer_encoder(x)  # [batch_size, sequence_length, 64]

        # MaxPool1d pools over the last axis, so move the sequence axis there.
        x = x.permute(0, 2, 1)  # [batch_size, 64, sequence_length]
        x = self.pool(x)
        x = self.pool(x)

        # reshape (not view): the permuted tensor may be non-contiguous.
        x = x.reshape(x.size(0), -1)

        x = self.dropout(x)
        return self.fc(x)

# Define the ensemble model
class BiModalEnsemble(nn.Module):
    """Two-branch classifier that fuses CNN and Transformer features.

    The binary-data branch and the function-data branch each produce a 128-d
    vector. The two are concatenated, passed through a linear fusion layer
    with batch norm, ReLU and dropout, and finally mapped to ``num_classes``
    logits (no sigmoid is applied here).

    Args:
        num_classes: Number of output logits (default 4).
    """

    def __init__(self, num_classes=4):
        super().__init__()
        # Feature extractors, one per modality.
        self.binary_cnn = BinaryDataCNN()
        self.function_transformer = FunctionTransformer()

        # Fusion head: 2 x 128 concatenated features -> 128 -> num_classes.
        self.fusion = nn.Linear(256, 128)
        self.bn = nn.BatchNorm1d(128)
        self.classifier = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, binary_data, function_data):
        """Return logits of shape [batch_size, num_classes]."""
        branch_features = [
            self.binary_cnn(binary_data),
            self.function_transformer(function_data),
        ]

        # Concatenate along the feature axis, then fuse.
        fused = self.fusion(torch.cat(branch_features, dim=1))
        fused = self.dropout(F.relu(self.bn(fused)))
        return self.classifier(fused)

# Create train and validation datasets
def create_datasets():
    """Wrap the default split of the global ``ds`` and split it 80/20 at random.

    Returns:
        ``(train_dataset, val_dataset)`` as produced by ``random_split``.
    """
    full_dataset = MalwareDataset(ds)

    # 80% of the samples (rounded down) train the model; the rest validate it.
    total = len(full_dataset)
    n_train = int(0.8 * total)
    n_val = total - n_train

    train_dataset, val_dataset = random_split(full_dataset, [n_train, n_val])

    print(f"Created training set with {n_train} samples and validation set with {n_val} samples")
    return train_dataset, val_dataset

# Training function
def train_model(model, train_loader, val_loader, num_epochs=10):
    """Train ``model`` with Adam + BCE-with-logits and validate after every epoch.

    The state dict with the lowest validation loss seen so far is written to
    ``best_malware_model.pth``.

    Args:
        model: Module called as ``model(binary_data, function_data)`` that
            returns logits.
        train_loader: Iterable of batches (dicts with ``binary_data``,
            ``function_data`` and ``labels`` tensors) used for optimisation.
        val_loader: Iterable of batches in the same format, used for validation.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        ``(history, val_preds, val_targets)``. ``history`` maps
        ``'train_loss'``, ``'val_loss'`` and ``'val_accuracy'`` to per-epoch
        lists. ``val_preds`` / ``val_targets`` come from the LAST epoch (not
        necessarily the saved best checkpoint) and are ``None`` when
        ``num_epochs`` is 0.
    """
    criterion = nn.BCEWithLogitsLoss()  # For multi-label classification
    # The optimizer captures the parameter list at construction time, so every
    # trainable layer of the model must already exist at this point.
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

    best_val_loss = float('inf')
    history = {'train_loss': [], 'val_loss': [], 'val_accuracy': []}
    # Defined up front so the return statement is valid even if the loop body
    # never runs (num_epochs == 0).
    val_preds, val_targets = None, None

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0

        # Training loop
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}')
        for batch in progress_bar:
            binary_data = batch['binary_data'].to(device)
            function_data = batch['function_data'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(binary_data, function_data)
            loss = criterion(outputs, labels)
            loss.backward()
            # Cap the global gradient norm before the update step.
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            train_loss += loss.item()
            progress_bar.set_postfix({'train_loss': loss.item()})

        avg_train_loss = train_loss / len(train_loader)
        history['train_loss'].append(avg_train_loss)

        # Validation loop
        val_loss, val_accuracy, val_preds, val_targets = evaluate_model(model, val_loader, criterion)
        history['val_loss'].append(val_loss)
        history['val_accuracy'].append(val_accuracy)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

        # Halve the learning rate when the validation loss stops improving.
        scheduler.step(val_loss)

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_malware_model.pth')
            print("Model saved!")

    return history, val_preds, val_targets

# Evaluation function
def evaluate_model(model, data_loader, criterion):
    """Evaluate ``model`` on ``data_loader`` without computing gradients.

    Batches are moved to the device the model's parameters live on, so the
    function does not depend on any notebook-global device variable.

    Args:
        model: Module called as ``model(binary_data, function_data)`` that
            returns logits. It is switched to eval mode and left there.
        data_loader: Sized iterable of dicts with ``binary_data``,
            ``function_data`` and ``labels`` tensors.
        criterion: Loss callable taking ``(logits, labels)``.

    Returns:
        ``(mean_loss, exact_match_accuracy, preds, targets)``. ``mean_loss``
        is the average of the per-batch losses; ``preds`` is a boolean array
        (sigmoid(logit) > 0.5) and ``targets`` the label array, both of shape
        [num_samples, num_classes].
    """
    model.eval()
    # Follow the model's own placement; a parameter-free model runs on the CPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    val_loss = 0.0
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for batch in data_loader:
            binary_data = batch['binary_data'].to(run_device)
            function_data = batch['function_data'].to(run_device)
            labels = batch['labels'].to(run_device)

            outputs = model(binary_data, function_data)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            # Threshold the per-class probabilities at 0.5.
            preds = torch.sigmoid(outputs) > 0.5
            all_preds.extend(preds.cpu().numpy())
            all_targets.extend(labels.cpu().numpy())

    val_loss /= len(data_loader)
    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)

    # Exact-match accuracy: a sample counts only if every label is correct.
    accuracy = np.mean(np.all(all_preds == all_targets, axis=1))

    return val_loss, accuracy, all_preds, all_targets

# Visualization functions
def plot_confusion_matrices(preds, targets, class_names, output_path='confusion_matrices.png'):
    """Save one binary confusion-matrix heatmap per class to a single image.

    Args:
        preds: Array of shape [num_samples, num_classes] with 0/1 (or bool)
            predictions.
        targets: Array of the same shape with 0/1 ground-truth labels.
        class_names: One display name per column; determines how many
            columns are plotted.
        output_path: File the figure is written to. Defaults to the name the
            function has always used.

    Returns:
        ``output_path``.
    """
    # Use one integer dtype for both arrays so scikit-learn sees the same
    # label values (0 / 1) on each side.
    preds = np.asarray(preds).astype(int)
    targets = np.asarray(targets).astype(int)

    # Two panels per row, with as many rows as needed (4 classes -> the
    # original 2x2 grid and 15x12 figure size).
    n_classes = len(class_names)
    n_rows = max(1, -(-n_classes // 2))
    fig, axes = plt.subplots(n_rows, 2, figsize=(15, 6 * n_rows), squeeze=False)
    axes = axes.flatten()

    for i, class_name in enumerate(class_names):
        # labels=[0, 1] keeps the matrix 2x2 even when a class has only one
        # value present in this split.
        cm = confusion_matrix(targets[:, i], preds[:, i], labels=[0, 1])
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False, ax=axes[i])
        axes[i].set_title(f'Confusion Matrix - {class_name}')
        axes[i].set_xlabel('Predicted')
        axes[i].set_ylabel('Actual')

    # Hide grid cells that have no class assigned to them.
    for unused_ax in axes[n_classes:]:
        unused_ax.set_visible(False)

    fig.tight_layout()
    fig.savefig(output_path)
    plt.close(fig)

    return output_path

def plot_correlation_matrix(targets, class_names):
    """Save a heatmap of the pairwise Pearson correlation between label columns.

    Args:
        targets: Array of shape [num_samples, num_classes] with label values.
        class_names: One display name per column; only the first
            ``len(class_names)`` columns are used.

    Returns:
        The path of the written image, ``'correlation_matrix.png'``.
    """
    label_columns = np.asarray(targets, dtype=float)[:, :len(class_names)]

    # One call computes the full matrix (columns are the variables). A label
    # column with zero variance has an undefined correlation; those cells
    # stay NaN and the resulting divide warnings are silenced.
    with np.errstate(divide='ignore', invalid='ignore'):
        corr_matrix = np.atleast_2d(np.corrcoef(label_columns, rowvar=False))

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm',
                xticklabels=class_names, yticklabels=class_names, ax=ax)
    ax.set_title('Label Correlation Matrix')
    fig.tight_layout()
    fig.savefig('correlation_matrix.png')
    plt.close(fig)

    return 'correlation_matrix.png'

def plot_training_history(history):
    """Save a two-panel figure of the per-epoch losses and validation accuracy.

    Args:
        history: Dict with ``'train_loss'``, ``'val_loss'`` and
            ``'val_accuracy'`` sequences (one value per epoch).

    Returns:
        The path of the written image, ``'training_history.png'``.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Left panel: training and validation loss curves.
    loss_ax.plot(history['train_loss'], label='Train Loss')
    loss_ax.plot(history['val_loss'], label='Validation Loss')
    loss_ax.set_title('Loss Over Time')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # Right panel: validation accuracy curve.
    acc_ax.plot(history['val_accuracy'], label='Validation Accuracy')
    acc_ax.set_title('Accuracy Over Time')
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    out_file = 'training_history.png'
    fig.tight_layout()
    fig.savefig(out_file)
    plt.close(fig)

    return out_file

def generate_classification_report(preds, targets, class_names):
    """Print and return per-class precision / recall / F1 plus overall metrics.

    Args:
        preds: Array of shape [num_samples, num_classes] with 0/1 (or bool)
            predictions.
        targets: Array of the same shape with 0/1 ground-truth labels.
        class_names: One display name per column.

    Returns:
        Dict mapping each class name to scikit-learn's ``classification_report``
        dict for that column; the positive class is under the key ``'1'``.
    """
    # Cast both sides to int and pin labels=[0, 1] so the report always has a
    # '1' entry: scikit-learn derives the dict keys from the label values, so
    # float targets, or a column with no positives, would otherwise leave
    # report['1'] missing.
    preds_int = np.asarray(preds).astype(int)
    targets_int = np.asarray(targets).astype(int)

    reports = {}
    for i, class_name in enumerate(class_names):
        reports[class_name] = classification_report(
            targets_int[:, i],
            preds_int[:, i],
            labels=[0, 1],
            output_dict=True,
            zero_division=0,  # report 0 instead of warning when a class is never predicted
        )

    # Overall metrics
    accuracy = np.mean(np.all(preds_int == targets_int, axis=1))      # all labels of a sample correct
    hamming_loss = np.mean(np.mean(preds_int != targets_int, axis=1))  # fraction of wrong labels

    print(f"Overall Accuracy (Exact Match): {accuracy:.4f}")
    print(f"Hamming Loss: {hamming_loss:.4f}")

    for class_name, report in reports.items():
        print(f"\nClass: {class_name}")
        print(f"Precision: {report['1']['precision']:.4f}")
        print(f"Recall: {report['1']['recall']:.4f}")
        print(f"F1-Score: {report['1']['f1-score']:.4f}")

    return reports

# Add a testing function to evaluate on the test set
def test_model(model_path, test_loader):
    """Load a saved state dict into a fresh model and evaluate it.

    Args:
        model_path: Path to a state dict written with ``torch.save``.
        test_loader: Loader of batches to evaluate on.

    Returns:
        ``(test_preds, test_targets)`` as returned by ``evaluate_model``.
    """
    model = BiModalEnsemble(num_classes=4).to(device)
    # map_location lets a checkpoint saved on a GPU runtime load on a CPU-only
    # one; weights_only restricts unpickling to tensor data, which is all a
    # state dict contains.
    state_dict = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.eval()

    criterion = nn.BCEWithLogitsLoss()
    test_loss, test_accuracy, test_preds, test_targets = evaluate_model(model, test_loader, criterion)

    print(f"\nTest Results:")
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")

    return test_preds, test_targets

def main():
    """Run the full pipeline: split, train, plot, report, then test.

    Figures are written to the working directory. The validation and test
    confusion matrices are stored under separate file names so the second
    one does not overwrite the first.
    """
    import os  # local import: only needed here to rename the saved figures

    print("Creating datasets...")
    # NOTE(review): validation data is carved out of the 'train' split; the
    # dataset's own 'valid' split (listed among the loaded splits) is unused.
    train_dataset, val_dataset = create_datasets()

    # Create data loaders with smaller batch size to avoid memory issues
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

    print("Creating model...")
    model = BiModalEnsemble(num_classes=4).to(device)

    print("Training model...")
    history, val_preds, val_targets = train_model(model, train_loader, val_loader, num_epochs=10)

    # Class names for visualization
    class_names = ["stealer", "startpage_", "winnt1", "zusy"]

    print("Generating evaluation metrics...")
    # Plot confusion matrices. The plotting helper writes to a fixed default
    # file name, so move the validation figure aside before the test-set
    # figure is generated further down.
    confusion_matrix_path = 'val_confusion_matrices.png'
    os.replace(plot_confusion_matrices(val_preds, val_targets, class_names), confusion_matrix_path)
    print(f"Confusion matrices saved to: {confusion_matrix_path}")

    # Plot correlation matrix
    correlation_matrix_path = plot_correlation_matrix(val_targets, class_names)
    print(f"Correlation matrix saved to: {correlation_matrix_path}")

    # Plot training history
    history_plot_path = plot_training_history(history)
    print(f"Training history plot saved to: {history_plot_path}")

    # Generate classification report (printed by the helper)
    print("\nClassification Report:")
    generate_classification_report(val_preds, val_targets, class_names)

    # Create test dataset and test the model
    print("\nEvaluating on test set...")
    test_dataset = MalwareDataset(ds, split='test')
    test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)
    test_preds, test_targets = test_model('best_malware_model.pth', test_loader)

    # Generate test metrics (printed by the helper)
    print("\nTest Classification Report:")
    generate_classification_report(test_preds, test_targets, class_names)

    # Plot test confusion matrices under their own file name
    test_confusion_matrix_path = 'test_confusion_matrices.png'
    os.replace(plot_confusion_matrices(test_preds, test_targets, class_names), test_confusion_matrix_path)
    print(f"Test confusion matrices saved to: {test_confusion_matrix_path}")

    print("\nTraining and evaluation complete!")

if __name__ == "__main__":
    main()

Using device: cpu
Loading dataset...
Dataset loaded successfully.
Dataset splits: dict_keys(['train', 'test', 'valid'])
Train set size: 3728
Feature: binary_name, Type: Value(dtype='string', id=None)
Feature: labels, Type: Sequence(feature=Value(dtype='string', id=None), length=-1, id=None)
Feature: functions, Type: Value(dtype='string', id=None)
Creating datasets...
Created training set with 2982 samples and validation set with 746 samples
Creating model...
Training model...


Epoch 1/10:  25%|██▌       | 47/187 [06:56<20:13,  8.66s/it, train_loss=0.128]