In [2]:
import os
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset, random_split
import torchvision
from torchvision import transforms
from PIL import Image
import pennylane as qml
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix

In [3]:
# ----------------------
# Configuration & Setup
# ----------------------
torch.manual_seed(42)
np.random.seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cuda


In [4]:
# --------------------
# Dataset Class
# --------------------
class SpectrogramDataset(Dataset):
    def __init__(self, soi_dir, cwi_dir, transform=None):
        self.filepaths = []
        self.labels = []
        
        # Load SOI images (label 0)
        for fname in os.listdir(soi_dir):
            self.filepaths.append(os.path.join(soi_dir, fname))
            self.labels.append(0)
            
        # Load CWI images (label 1)
        for fname in os.listdir(cwi_dir):
            self.filepaths.append(os.path.join(cwi_dir, fname))
            self.labels.append(1)
            
        self.transform = transform

    def __len__(self):
        return len(self.filepaths)

    def __getitem__(self, idx):
        img = Image.open(self.filepaths[idx]).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]

In [5]:
# ----------------------
# Data Preparation
# ----------------------
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Initialize dataset
soi_dir = '/home/HardDisk/yared/spectrogram-dataset/soi'
cwi_dir = '/home/HardDisk/yared/spectrogram-dataset/cwi'
full_dataset = SpectrogramDataset(soi_dir, cwi_dir, data_transforms)

# Split dataset
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# Create data loaders
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [6]:
# --------------------
# Quantum Components
# --------------------
n_qubits = 6
dev = qml.device("default.qubit", wires=n_qubits)

@qml.qnode(dev, interface="torch")
def quantum_circuit(inputs, weights):
    # Input encoding
    for i in range(n_qubits):
        qml.RY(inputs[i], wires=i)
    
    # Variational layers
    qml.StronglyEntanglingLayers(weights, wires=range(n_qubits))
    
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

In [7]:
# ----------------------
# Hybrid Model
# ----------------------
class HybridQNN(nn.Module):
    def __init__(self):
        super().__init__()
        # Classical components with frozen ResNet
        self.resnet = torchvision.models.resnet18(pretrained=True)
        for param in self.resnet.parameters():
            param.requires_grad = False
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])
        
        # Trainable post-processing
        self.post_resnet = nn.Sequential(
            nn.Linear(512, 64),
            nn.ReLU(),
            nn.Linear(64, n_qubits)
        )
        
        # Quantum components
        weight_shape = {"weights": (3, n_qubits, 3)}
        self.qlayer = qml.qnn.TorchLayer(quantum_circuit, weight_shapes=weight_shape)
        
        # Final classifier
        self.classifier = nn.Linear(n_qubits, 2)

    def forward(self, x):
        # Allow gradient flow through feature extractor
        with torch.set_grad_enabled(True):  # Critical for attack gradients
            x = self.resnet(x).flatten(1)
        x = self.post_resnet(x)
        x = self.qlayer(x)
        return self.classifier(x)

# Initialize model
model = HybridQNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)



In [8]:
# ----------------------
# Training Loop
# ----------------------
def train_model(model, epochs=25):
    train_losses, val_losses = [], []
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item() * inputs.size(0)

        # Validation phase
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)
        
        # Calculate metrics
        train_loss = running_loss / len(train_loader.dataset)
        val_loss = val_loss / len(test_loader.dataset)
        val_acc = correct / total
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        print(f"Epoch {epoch+1}/{epochs} | "
              f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | "
              f"Accuracy: {val_acc:.2%}")

    return train_losses, val_losses

# Start training
train_losses, val_losses = train_model(model)

Epoch 1/25 | Train Loss: 0.4528 | Val Loss: 0.3358 | Accuracy: 93.51%
Epoch 2/25 | Train Loss: 0.2801 | Val Loss: 0.2125 | Accuracy: 94.31%
Epoch 3/25 | Train Loss: 0.2121 | Val Loss: 0.2021 | Accuracy: 94.19%
Epoch 4/25 | Train Loss: 0.1888 | Val Loss: 0.2068 | Accuracy: 92.78%
Epoch 5/25 | Train Loss: 0.1871 | Val Loss: 0.1573 | Accuracy: 95.17%
Epoch 6/25 | Train Loss: 0.1690 | Val Loss: 0.1532 | Accuracy: 95.29%
Epoch 7/25 | Train Loss: 0.1504 | Val Loss: 0.1555 | Accuracy: 95.47%
Epoch 8/25 | Train Loss: 0.1509 | Val Loss: 0.1409 | Accuracy: 95.10%
Epoch 9/25 | Train Loss: 0.1524 | Val Loss: 0.1427 | Accuracy: 95.10%
Epoch 10/25 | Train Loss: 0.1438 | Val Loss: 0.1452 | Accuracy: 94.74%
Epoch 11/25 | Train Loss: 0.1398 | Val Loss: 0.1387 | Accuracy: 95.10%
Epoch 12/25 | Train Loss: 0.1197 | Val Loss: 0.1425 | Accuracy: 95.29%
Epoch 13/25 | Train Loss: 0.1308 | Val Loss: 0.1702 | Accuracy: 94.06%
Epoch 14/25 | Train Loss: 0.1218 | Val Loss: 0.1567 | Accuracy: 94.98%
Epoch 15/25 | T

#  Quantum-State Poisoning attack with adaptive perturbation

In [10]:
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
import numpy as np

def quantum_aware_attack(model, x, y_true, target_class=1, epsilon=0.3, n_iter=10, temp=0.2):
    """quantum-state poisoning attack with adaptive perturbation"""
    model.eval()
    device = next(model.parameters()).device
    
    # Store original quantum parameters
    quantum_weights = model.qlayer.weights
    q_weights_orig = quantum_weights.detach().clone()
    
    best_conf = 0
    best_pred = y_true.item()
    adaptive_eps = epsilon
    
    for _ in range(n_iter):
        # Enable gradient computation
        with torch.set_grad_enabled(True):
            x.requires_grad = True
            model.zero_grad()
            
            # Forward pass with current weights
            output = model(x)
            loss = nn.CrossEntropyLoss()(output, torch.tensor([target_class]).to(device))
            loss.backward()
            
            # Get quantum gradients
            grad_q = quantum_weights.grad
            if grad_q is None:
                continue
                
            # Quantum-aware perturbation
            q_sensitivity = torch.sigmoid(torch.abs(grad_q) / temp)
            
            # Entanglement pattern boosting
            weight_diff = quantum_weights - torch.mean(quantum_weights, dim=1, keepdim=True)
            ent_boost = 1 + torch.norm(weight_diff, p=2, dim=2)
            q_sensitivity *= ent_boost[:,:,None]
            
            # Apply adaptive perturbation
            delta = adaptive_eps * grad_q * q_sensitivity
            new_weights = q_weights_orig + delta
            
            # Evaluate perturbed model
            with torch.no_grad():
                quantum_weights.copy_(new_weights)
                conf = torch.softmax(model(x), 1)[0, target_class].item()
                current_pred = model(x).argmax().item()
                
                if conf > best_conf:
                    best_conf = conf
                    best_pred = current_pred
                
                # Dynamic epsilon adjustment
                adaptive_eps *= 0.8 if current_pred == target_class else 1.2
    
    # Restore original weights
    with torch.no_grad():
        quantum_weights.copy_(q_weights_orig)
        
    return best_pred, best_conf


In [11]:
def compute_metrics(model, test_loader, attack_strength=None, target_class=1):
    model.eval()
    all_preds = []
    all_labels = []
    all_confidences = []
    
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        batch_preds = []
        for i in range(len(inputs)):
            x = inputs[i].unsqueeze(0)
            y = labels[i]
            
            if attack_strength is None or y.item() == target_class:
                # Clean evaluation
                with torch.no_grad():
                    output = model(x)
                    pred = output.argmax().item()
                    conf = torch.softmax(output, 1)[0, pred].item()
            else:
                # Attack scenario
                pred, conf = quantum_aware_attack(
                    model, x, y, 
                    target_class=target_class,
                    epsilon=attack_strength,
                    n_iter=10,
                    temp=0.2
                )
            
            batch_preds.append(pred)
            all_confidences.append(conf)
        
        all_preds.extend(batch_preds)
        all_labels.extend(labels.cpu().numpy())
    
    # Calculate metrics
    metrics = {
        'accuracy': np.mean(np.array(all_preds) == np.array(all_labels)),
        'precision': precision_score(all_labels, all_preds, zero_division=0),
        'recall': recall_score(all_labels, all_preds, zero_division=0),
        'f1': f1_score(all_labels, all_preds, zero_division=0),
        'roc_auc': roc_auc_score(all_labels, all_preds),
        'avg_confidence': np.mean(all_confidences),
        'attack_success': np.mean([p == target_class for p, l in zip(all_preds, all_labels) if l != target_class])
    }
    
    return metrics


In [12]:
def evaluate_attack_scenarios(model, test_loader):
    attack_strengths = [0.0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1]
    results = {}
    
    print("\n" + "="*60)
    print("Quantum-Aware Attack Evaluation".center(60))
    print("="*60 + "\n")
    
    for eps in attack_strengths:
        key = 'Clean' if eps == 0.0 else f'Œµ={eps:.2f}'
        print(f"üîç Processing {key} scenario...")
        
        results[key] = compute_metrics(
            model, 
            test_loader, 
            attack_strength=None if eps == 0.0 else eps,
            target_class=1
        )
        
        # Print formatted results
        print(f"\nüìä {key} Results:")
        print("-"*50)
        for metric, value in results[key].items():
            if metric == 'attack_success':
                print(f"{'Attack Success Rate':>20}: {value:.2%}")
            else:
                print(f"{metric:>20}: {value:.4f}")
        print()
    
    return results


In [None]:
# Run evaluation
attack_results = evaluate_attack_scenarios(model, test_loader)

# Generate comparison table
print("\n" + "="*80)
print("Quantum Attack Performance Comparison".center(80))
print("="*80)
print(f"{'Scenario':<10}{'Accuracy':<12}{'F1':<12}{'ROC AUC':<12}{'Attack Success':<15}{'Avg Conf':<12}")
print("-"*80)

for scenario, metrics in attack_results.items():
    print(f"{scenario:<10}{metrics['accuracy']:.4f}{'':<2}{metrics['f1']:.4f}{'':<2}"
          f"{metrics['roc_auc']:.4f}{'':<2}{metrics['attack_success']:.2%}{'':<3}"
          f"{metrics['avg_confidence']:.4f}")



              Quantum-Aware Attack Evaluation               

üîç Processing Clean scenario...

üìä Clean Results:
--------------------------------------------------
            accuracy: 0.9547
           precision: 0.9624
              recall: 0.8668
                  f1: 0.9121
             roc_auc: 0.9271
      avg_confidence: 0.9645
 Attack Success Rate: 1.26%

üîç Processing Œµ=0.01 scenario...

üìä Œµ=0.01 Results:
--------------------------------------------------
            accuracy: 0.9590
           precision: 0.9796
              recall: 0.8668
                  f1: 0.9198
             roc_auc: 0.9301
      avg_confidence: 0.2711
 Attack Success Rate: 0.67%

üîç Processing Œµ=0.02 scenario...

üìä Œµ=0.02 Results:
--------------------------------------------------
            accuracy: 0.2760
           precision: 0.2546
              recall: 0.8668
                  f1: 0.3936
             roc_auc: 0.4615
      avg_confidence: 0.8767
 Attack Success Rate: 94.37%

