In [None]:
'''
added perturbation for features, earlier it was only for centroids
formula for epsilon

contains base model code, additional model training code, classification report, after-attack accuracy

model saved as: best_QNI_model_2
accuracy = 96%
'''

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, WeightedRandomSampler
from torchvision import transforms
from collections import Counter
import numpy as np
import random
import os
from torchvision.datasets import ImageFolder
from matplotlib import pyplot as plt
import pennylane as qml
from pennylane.qnn import TorchLayer
from tqdm.notebook import tqdm

#for loss function 
import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * ((1 - pt) ** self.gamma) * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Set seeds for reproducibility
def seed_all(seed=42):
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

seed_all(42)

# ========== DEVICE ==========
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ========== PARAMETERS ==========
n_qubits = 6
batch_size = 16
num_classes = 25
num_epochs = 50
lr = 0.0005

# ========== TRANSFORMS WITH DATA AUGMENTATION ==========
# ✅ For training (with augmentation)
train_transform = transforms.Compose([
    transforms.Grayscale(1),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# ✅ For validation and test (no augmentation)
eval_transform = transforms.Compose([
    transforms.Grayscale(1),
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])


# ========== DATASETS ==========
train_dataset = ImageFolder('/home/netsec1/dataset_folder/malimg_dataset/train', transform=train_transform)
val_dataset   = ImageFolder('/home/netsec1/dataset_folder/malimg_dataset/val', transform=eval_transform)
test_dataset  = ImageFolder('/home/netsec1/dataset_folder/malimg_dataset/test', transform=eval_transform)
print("**dataset loaded**")
# ========== CLASS WEIGHTS ==========
from sklearn.utils.class_weight import compute_class_weight

labels = [label for _, label in train_dataset.samples]
class_weights = compute_class_weight(class_weight='balanced',
                                     classes=np.unique(labels),
                                     y=labels)
class_wts = torch.tensor(class_weights, dtype=torch.float)

class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader   = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# ========== QUANTUM CIRCUIT ==========
dev = qml.device("default.qubit", wires=n_qubits)

@qml.qnode(dev, interface="torch")

def quantum_circuit(inputs, weights):
    for i in range(n_qubits):
        qml.RY(inputs[i], wires=i)
    
    for l in range(weights.shape[0]):
        for i in range(n_qubits):
            qml.RY(weights[l][i], wires=i)
        for i in range(n_qubits - 1):
            qml.CNOT(wires=[i, i+1])
    
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

weight_shapes = {"weights": (6, n_qubits)}


# ========== CNN + QNN MODEL ==========
class FeatureReduce(nn.Module):
    def __init__(self, final_dim, dropout=0.4):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 8, 3, stride=2, padding=1),    # 128 -> 64
            nn.BatchNorm2d(8),
            nn.ReLU(),
            nn.Dropout(dropout),

            nn.Conv2d(8, 16, 3, stride=2, padding=1),   # 64 -> 32
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Dropout(dropout),

            nn.Conv2d(16, 32, 3, stride=2, padding=1),  # 32 -> 16
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Dropout(dropout),

            nn.Conv2d(32, 64, 3, stride=2, padding=1),  # 16 -> 8
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(dropout),

            nn.Conv2d(64, 128, 3, stride=2, padding=1),  # ⬅️ Extra block: 8 -> 4
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1))                # 4×4 -> 1×1
        )
        self.fc = nn.Linear(128, final_dim)  # ⬅️ Changed from 64 to 128

    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

class HybridQNN(nn.Module):
    def __init__(self, n_qubits, num_classes):
        super().__init__()
        self.feature_extractor = FeatureReduce(final_dim=n_qubits)
        self.q_layer = TorchLayer(quantum_circuit, weight_shapes)

        # Adding 4-layer MLP after quantum layer
        self.classifier = nn.Sequential(
            nn.Linear(n_qubits, 64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, num_classes)
        )

    def forward(self, x):
        x = self.feature_extractor(x)
        x = torch.tanh(x)
        q_out = torch.stack([self.q_layer(f) for f in x])
        return self.classifier(q_out)

# ========== TRAINING ==========
print("Starting training")

# Applying gradient based pertubations of features
def gradient_noise_on_features(model, x, y, epsilon=0.1):
    """
    Compute gradient of loss w.r.t. extracted features and perturb them.
    Returns: perturbed feature tensor [B, n_qubits]
    """
    model.eval()
    x = x.clone().detach().requires_grad_(True)
    
    # Forward: extract features, apply tanh, quantum, classify
    feats = model.feature_extractor(x)  # pre-tanh features [B, n_qubits]
    feats_tanh = torch.tanh(feats)
    
    q_out = torch.stack([model.q_layer(f) for f in feats_tanh])
    logits = model.classifier(q_out)
    
    loss = F.cross_entropy(logits, y)
    loss.backward()
    
    # Get gradient w.r.t. input features
    feats_grad = x.grad.data
    x_pert = x + epsilon * feats_grad.sign()
    x_pert = torch.clamp(x_pert, -1, 1)
    
    # Re-extract features from perturbed image
    feats_pert = model.feature_extractor(x_pert)
    
    return feats_pert.detach()

# ── 2) Precompute class‐centroids in feature space ──────────────────────────
def compute_centroids(model, loader, device, num_classes):
    model.eval()
    sums = torch.zeros(num_classes, n_qubits, device=device)
    counts = torch.zeros(num_classes, device=device)
    with torch.no_grad():
        for x,y in loader:
            x,y = x.to(device), y.to(device)
            feats = model.feature_extractor(x)      # pre‐tanh features
            for c in range(num_classes):
                mask = (y==c)
                if mask.any():
                    sums[c] += feats[mask].sum(0)
                    counts[c] += mask.sum()
    return sums / counts.unsqueeze(1)

# ── 3) QNI perturbation function ────────────────────────────────────────────
def gradient_based_noise(model, x, y, epsilon=0.1):
    """
    x: input image batch [B, C, H, W]
    y: labels
    """
    x = x.clone().detach().requires_grad_(True)
    model.eval()

    # Get output logits
    logits = model(x)
    loss = F.cross_entropy(logits, y)

    # Compute gradient of loss w.r.t input
    loss.backward()
    grad = x.grad.data  # [B, C, H, W]

    # Normalize gradient and perturb input
    grad_sign = grad.sign()
    x_pert = x + epsilon * grad_sign
    x_pert = torch.clamp(x_pert, -1, 1)  # Keep within normalized bounds

    return x_pert.detach()

# … everything above stays the same up to compute_centroids …

# ── 4) Training loop with QNI ──────────────────────────────────────────────
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = HybridQNN(n_qubits, num_classes).to(device)
opt   = torch.optim.AdamW(model.parameters(), lr=5e-4, weight_decay=5e-3)
sched = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, 'min', patience=5)

# Initialize best validation accuracy
best_val_acc = 0.0
best_model_path = "best_QNI_model_2.pth"


# helper to evaluate on a loader
def evaluate(model, loader):
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            logits = model(x)
            preds = logits.argmax(1)
            correct += (preds == y).sum().item()
            total   += y.size(0)
    return correct/total

# initial centroids before training
centroids = compute_centroids(model, train_loader, device, num_classes)

for epoch in range(1, 51):
    # every 5 epochs, recompute centroids on the *current* model:
    if epoch % 5 == 0:
        centroids = compute_centroids(model, train_loader, device, num_classes)
    
    model.train()
    running_loss, running_correct, running_total = 0, 0, 0

    for x, y in tqdm(train_loader, desc=f"Epoch {epoch} [train]"):
        x, y = x.to(device), y.to(device)

        ### changed
        # Clean path
        feats = model.feature_extractor(x)
        clean_logits = model(x)
        loss_clean = F.cross_entropy(clean_logits, y)

        # Perturbed path (gradient-based on features)
        feats_pert = gradient_noise_on_features(model, x, y, epsilon=0.1)
        feats_pert_t = torch.tanh(feats_pert)
        q_out_pert = torch.stack([model.q_layer(f) for f in feats_pert_t])
        pert_logits = model.classifier(q_out_pert)
        loss_pert = F.cross_entropy(pert_logits, y)

        # Joint loss
        loss = 0.8 * loss_clean + 0.2 * loss_pert
        opt.zero_grad()
        loss.backward()
        opt.step()


        # track
        running_loss   += loss.item() * x.size(0)
        running_correct += (clean_logits.argmax(1) == y).sum().item()
        running_total   += y.size(0)

    # step scheduler on *average* training loss
    avg_train_loss = running_loss / running_total
    sched.step(avg_train_loss)

    train_acc = running_correct / running_total
    val_acc   = evaluate(model, val_loader)
    print(f"\nEpoch {epoch:2d} — train loss: {avg_train_loss:.4f}, "
      f"train acc: {train_acc:.4f}, val acc: {val_acc:.4f}")
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(), 
            'optimizer_state_dict': opt.state_dict(),
            'val_accuracy': val_acc
        }, best_model_path)F
        print(f"✅ Best model saved at epoch {epoch} with val_acc: {val_acc:.4f}\n")
    else:
        print()

**dataset loaded**
Starting training


Epoch 1 [train]:   0%|          | 0/467 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [5]:
## to evaluate the saved model

# === Evaluate saved model ===
model = HybridQNN(n_qubits, num_classes).to(device)
checkpoint = torch.load("best_QNI_model_2.pth", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

def evaluate(model, loader):
    total, correct = 0, 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            logits = model(x)
            preds = logits.argmax(1)
            correct += (preds == y).sum().item()
            total += y.size(0)
    return correct / total

val_acc = evaluate(model, val_loader)
test_acc = evaluate(model, test_loader)

print(f"✅ Validation Accuracy: {val_acc:.4f}")
print(f"✅ Test Accuracy: {test_acc:.4f}")


✅ Validation Accuracy: 0.9697
✅ Test Accuracy: 0.9594


In [10]:
## train the model again

# === Load saved model for continued training ===
model = HybridQNN(n_qubits, num_classes).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=5e-4, weight_decay=5e-3)

# Load checkpoint
checkpoint = torch.load("best_QNI_model_2.pth", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
opt.load_state_dict(checkpoint['optimizer_state_dict'])
start_epoch = checkpoint['epoch'] + 1
best_val_acc = checkpoint['val_accuracy']
print(f"✅ Model loaded. Resuming from epoch {start_epoch} with best val_acc = {best_val_acc:.4f}")

# Resume training for 10 more epochs
for epoch in range(start_epoch, start_epoch + 5):
    if epoch % 5 == 0:
        centroids = compute_centroids(model, train_loader, device, num_classes)

    model.train()
    running_loss, running_correct, running_total = 0, 0, 0

    for x, y in tqdm(train_loader, desc=f"Epoch {epoch} [train]"):
        x, y = x.to(device), y.to(device)

        # Clean path
        feats = model.feature_extractor(x)
        clean_logits = model(x)
        loss_clean = F.cross_entropy(clean_logits, y)

        # Perturbed path using gradient-based perturbation on features
        feats_pert = gradient_noise_on_features(model, x, y, epsilon=0.1)
        feats_pert_t = torch.tanh(feats_pert)
        q_out_pert = torch.stack([model.q_layer(f) for f in feats_pert_t])
        pert_logits = model.classifier(q_out_pert)
        loss_pert = F.cross_entropy(pert_logits, y)

        # Joint loss and backprop
        loss = 0.8 * loss_clean + 0.2 * loss_pert
        opt.zero_grad()
        loss.backward()
        opt.step()

        # Track stats
        running_loss += loss.item() * x.size(0)
        running_correct += (clean_logits.argmax(1) == y).sum().item()
        running_total += y.size(0)

    # Scheduler step
    avg_train_loss = running_loss / running_total
    sched.step(avg_train_loss)

    train_acc = running_correct / running_total
    val_acc = evaluate(model, val_loader)
    print(f"\nEpoch {epoch:2d} — train loss: {avg_train_loss:.4f}, "
          f"train acc: {train_acc:.4f}, val acc: {val_acc:.4f}")

    # Save best model if improved
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': opt.state_dict(),
            'val_accuracy': val_acc
        }, "best_QNI_model_2.pth")
        print(f"✅ Best model updated at epoch {epoch} with val_acc: {val_acc:.4f}\n")
    else:
        print()


✅ Model loaded. Resuming from epoch 37 with best val_acc = 0.9697


NameError: name 'feats_train' is not defined

In [8]:
from sklearn.metrics import classification_report
import torch

# Initialize the model
model = HybridQNN(n_qubits=n_qubits, num_classes=num_classes).to(device)

# ✅ Load only the model weights from the saved checkpoint
checkpoint = torch.load("best_QNI_model_2.pth", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])  # ⬅️ fix
model.eval()

all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Get class names from the test dataset
class_names = test_dataset.classes

print("Classification Report:")
print(classification_report(all_labels, all_preds, target_names=class_names))


Classification Report:
                precision    recall  f1-score   support

     Adialer.C       1.00      1.00      1.00        14
     Agent.FYI       1.00      1.00      1.00        13
     Allaple.A       1.00      0.99      0.99       296
     Allaple.L       1.00      1.00      1.00       160
 Alueron.gen!J       1.00      0.95      0.98        21
     Autorun.K       0.00      0.00      0.00        12
       C2LOP.P       0.72      0.81      0.76        16
   C2LOP.gen!g       0.78      0.70      0.74        20
Dialplatform.B       1.00      1.00      1.00        20
     Dontovo.A       1.00      1.00      1.00        17
      Fakerean       1.00      0.97      0.99        39
 Instantaccess       0.92      1.00      0.96        44
    Lolyda.AA1       1.00      0.95      0.98        22
    Lolyda.AA2       1.00      1.00      1.00        21
    Lolyda.AA3       1.00      1.00      1.00        13
     Lolyda.AT       1.00      1.00      1.00        17
   Malex.gen!J       0.9

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [9]:
## attacking the model with FGSM and PGD

import torch
import torch.nn.functional as F

# ===== FGSM Attack =====
def fgsm_attack(model, x, y, epsilon=0.1, device='cuda'):
    model.eval()
    x_adv = x.clone().detach().to(device).requires_grad_(True)
    y = y.to(device)

    logits = model(x_adv)
    loss = F.cross_entropy(logits, y)
    model.zero_grad()
    loss.backward()

    x_adv = x_adv + epsilon * x_adv.grad.sign()
    x_adv = torch.clamp(x_adv, min=-1.0, max=1.0)
    return x_adv.detach()

# ===== PGD Attack =====
def pgd_attack(model, x, y, eps=0.1, alpha=0.02, iters=10, device='cuda'):
    model.eval()
    x_orig = x.clone().detach().to(device)
    x_adv  = x_orig + torch.empty_like(x_orig).uniform_(-eps, eps)
    x_adv  = torch.clamp(x_adv, -1.0, 1.0).detach()

    y = y.to(device)
    for _ in range(iters):
        x_adv.requires_grad_(True)
        logits = model(x_adv)
        loss = F.cross_entropy(logits, y)
        model.zero_grad()
        loss.backward()

        x_adv = x_adv + alpha * x_adv.grad.sign()
        delta = torch.clamp(x_adv - x_orig, min=-eps, max=eps)
        x_adv = torch.clamp(x_orig + delta, -1.0, 1.0).detach()
    return x_adv

# ===== Evaluate Attack =====
def test_adversarial(model, loader, attack_fn, attack_name, **attack_kwargs):
    model.eval()
    total, correct = 0, 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        x_adv = attack_fn(model, x, y, **attack_kwargs)
        logits = model(x_adv)
        preds = logits.argmax(dim=1)
        correct += (preds == y).sum().item()
        total += y.size(0)
    acc = 100. * correct / total
    print(f"{attack_name} Adversarial Accuracy: {acc:.2f}%")

# ===== Load Model from best_QNI_model_2.pth =====
model = HybridQNN(n_qubits=n_qubits, num_classes=num_classes).to(device)
checkpoint = torch.load("best_QNI_model_2.pth", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# ===== Run Evaluations =====
# Clean accuracy
clean_acc = evaluate(model, test_loader)
print(f"\n✅ Clean Test Accuracy: {clean_acc*100:.2f}%")

# FGSM attack
test_adversarial(
    model, test_loader,
    attack_fn=fgsm_attack,
    attack_name="FGSM",
    epsilon=0.1,
    device=device
)

# PGD attack
test_adversarial(
    model, test_loader,
    attack_fn=pgd_attack,
    attack_name="PGD",
    eps=0.1,
    alpha=0.02,
    iters=10,
    device=device
)



✅ Clean Test Accuracy: 95.94%
FGSM Adversarial Accuracy: 41.73%
PGD Adversarial Accuracy: 25.91%
