
# QuantumLeak Project Notebook

Đây là phiên bản Jupyter Notebook được tạo tự động từ dự án Python gốc.
Notebook này chứa tất cả mã nguồn cần thiết và được cấu trúc để chạy trong các môi trường như Google Colab.

**Hướng dẫn:**
1.  Chạy các cell theo thứ tự từ trên xuống dưới.
2.  Cell đầu tiên sẽ cài đặt tất cả các thư viện cần thiết.
3.  Các cell tiếp theo sẽ sử dụng "magic command" `%%writefile` để tái tạo lại cấu trúc file của dự án.
4.  Cell cuối cùng chứa mã từ `main.py` để bạn có thể chạy các thử nghiệm.


In [None]:
import sys
!{sys.executable} -m pip install pennylane pennylane-lightning torch torchvision pandas numpy scikit-learn tqdm joblib matplotlib


## 1. Tái tạo cấu trúc file của dự án

Các cell dưới đây sẽ tạo ra các thư mục và file `.py` cần thiết cho dự án.


In [None]:
!mkdir -p ./attacks

In [None]:
%%writefile ./attacks/quantum_leak.py

import torch
import torch.nn as nn
import pennylane as qml
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset
from models.qnn_leak import QNN
from utils.visualization import plot_model_comparison
from utils.training import evaluate_model_with_metrics
from abc import ABC, abstractmethod

class HuberLoss(nn.Module):
    """Mean Huber loss: quadratic for small residuals, linear for large ones.

    Args:
        delta: Absolute-residual threshold at which the loss switches from the
            quadratic branch to the linear branch.
    """

    def __init__(self, delta=1.0):
        super().__init__()
        self.delta = delta

    def forward(self, outputs, targets):
        """Return the Huber loss averaged over every element of ``outputs - targets``."""
        residual = outputs - targets
        magnitude = residual.abs()
        quadratic_branch = 0.5 * residual ** 2
        # The constant offset makes the two branches meet at |residual| == delta.
        linear_branch = self.delta * magnitude - 0.5 * self.delta ** 2
        per_element = torch.where(magnitude <= self.delta, quadratic_branch, linear_branch)
        return per_element.mean()

class ModelExtraction(ABC):
    def __init__(self, victim_model, n_qubits=4, query_budget=6000, device="cuda", save_path="./results", circuit_device="default.mixed"):
        self.victim_model = victim_model
        self.n_qubits = n_qubits
        self.query_budget = query_budget
        self.device = torch.device(device if torch.cuda.is_available() else "cpu")
        self.save_path = save_path
        os.makedirs(save_path, exist_ok=True)
        self.circuit_device = circuit_device
        self.qnn_zoo = self._create_qnn_zoo()

    def _create_quantum_circuit(self, n_layers):
        """Build the layered RY / ring-CZ substitute circuit as a Torch QNode.

        Each of the ``n_layers`` layers applies one RY rotation per wire and
        then a CZ between every wire and its successor (wrapping around), so
        the circuit consumes ``n_layers * n_qubits`` weights.

        Returns:
            A QNode taking ``(inputs, weights)`` and returning one PauliZ
            expectation value per wire.
        """
        dev = qml.device(self.circuit_device, wires=self.n_qubits)

        @qml.qnode(dev, interface="torch", diff_method="parameter-shift")
        def circuit(inputs, weights):
            wires = range(self.n_qubits)
            qml.AmplitudeEmbedding(inputs, wires=wires, normalize=True)
            for layer in range(n_layers):
                # Weights are stored flat; each layer owns a block of n_qubits entries.
                offset = layer * self.n_qubits
                for wire in wires:
                    qml.RY(weights[offset + wire], wires=wire)
                for wire in wires:
                    qml.CZ(wires=[wire, (wire + 1) % self.n_qubits])
            return [qml.expval(qml.PauliZ(wire)) for wire in wires]

        return circuit

    def _create_a1_circuit(self):
        """Build the "A1" substitute circuit as a Torch QNode.

        Two repetitions of: RX, RY, RZ on every wire (three consecutive weights
        per wire), followed by a ring of CNOTs. The circuit therefore reads
        ``2 * 3 * n_qubits`` weights.

        Returns:
            A QNode taking ``(inputs, weights)`` and returning one PauliZ
            expectation value per wire.
        """
        dev = qml.device(self.circuit_device, wires=self.n_qubits)

        @qml.qnode(dev, interface="torch", diff_method="parameter-shift")
        def circuit(inputs, weights):
            wires = range(self.n_qubits)
            qml.AmplitudeEmbedding(inputs, wires=wires, normalize=True)
            for repetition in range(2):
                for wire in wires:
                    # Flat layout: [repetition][wire][RX, RY, RZ].
                    base = 3 * (repetition * self.n_qubits + wire)
                    qml.RX(weights[base], wires=wire)
                    qml.RY(weights[base + 1], wires=wire)
                    qml.RZ(weights[base + 2], wires=wire)
                for wire in wires:
                    qml.CNOT(wires=[wire, (wire + 1) % self.n_qubits])
            return [qml.expval(qml.PauliZ(wire)) for wire in wires]

        return circuit

    def _create_a2_circuit(self):
        """Build the "A2" substitute circuit as a Torch QNode.

        Two repetitions of a fixed rotation pattern (RX on wire 0, RY on wire 1,
        RZ on wire 2, RX on wire 3) followed by a ring of CNOTs; eight weights
        are read in total.

        NOTE(review): the rotations address wires 0-3 explicitly, so this
        circuit appears to assume at least four qubits - confirm before using
        a different ``n_qubits``.

        Returns:
            A QNode taking ``(inputs, weights)`` and returning one PauliZ
            expectation value per wire.
        """
        dev = qml.device(self.circuit_device, wires=self.n_qubits)

        @qml.qnode(dev, interface="torch", diff_method="parameter-shift")
        def circuit(inputs, weights):
            qml.AmplitudeEmbedding(inputs, wires=range(self.n_qubits), normalize=True)
            # One rotation gate per fixed wire, applied in this order.
            rotation_pattern = ((qml.RX, 0), (qml.RY, 1), (qml.RZ, 2), (qml.RX, 3))
            for repetition in range(2):
                base = repetition * len(rotation_pattern)
                for offset, (gate, wire) in enumerate(rotation_pattern):
                    gate(weights[base + offset], wires=wire)
                for wire in range(self.n_qubits):
                    qml.CNOT(wires=[wire, (wire + 1) % self.n_qubits])
            return [qml.expval(qml.PauliZ(wire)) for wire in range(self.n_qubits)]

        return circuit

    def _create_qnn_zoo(self):
        """Instantiate every substitute architecture and return them by name.

        Returns:
            A list of ``(name, model)`` pairs in the order ``L1``, ``L2``,
            ``L3``, ``A1``, ``A2``. Each model is a ``QNN`` moved to
            ``self.device`` whose ``q_params`` is replaced by a fresh
            N(0, pi) sample of the size its circuit reads.
        """
        def build(name, circuit, n_layers, n_params):
            # Construct the model, then overwrite q_params with a vector sized for the circuit.
            model = QNN(self.n_qubits, circuit, n_layers).to(self.device)
            initial_angles = np.random.normal(0, np.pi, (n_params,))
            model.q_params = nn.Parameter(
                torch.tensor(initial_angles, dtype=torch.float32,
                             device=self.device, requires_grad=True)
            )
            return name, model

        zoo = [
            build(f'L{depth}', self._create_quantum_circuit(depth), depth, depth * self.n_qubits)
            for depth in [1, 2, 3]
        ]
        zoo.append(build('A1', self._create_a1_circuit(), 6, 12 * 2))
        zoo.append(build('A2', self._create_a2_circuit(), 4, 4 * 2))
        return zoo

    def qnnaas_predict(self, model, images, device):
        """Query ``model`` and return a two-column probability-style output.

        The model is switched to eval mode and run without gradient tracking.
        Its output is treated as the class-1 score; the returned tensor
        concatenates ``1 - output`` (column 0) and ``output`` (column 1) along
        dimension 1.
        """
        model.eval()
        with torch.no_grad():
            class_one = model(images.to(device))
            return torch.cat([1 - class_one, class_one], dim=1)

    def query_victim(self, data_loader, n_rounds=3):
        self.victim_model.eval()
        query_dataset = []
        samples_per_round = self.query_budget // n_rounds
        spam_noise = 0.0054
        gate_1q_noise = 0.00177
        gate_2q_noise = 0.0287
        crosstalk_noise = 0.2
        with torch.no_grad():
            for round in range(n_rounds):
                samples_collected = 0
                for inputs, _ in data_loader:
                    inputs = inputs.to(self.device)
                    outputs = self.qnnaas_predict(self.victim_model, inputs, self.device)
                    noise_scale = spam_noise + gate_1q_noise + gate_2q_noise + crosstalk_noise
                    noise = torch.randn_like(outputs) * noise_scale
                    noisy_outputs = outputs + noise
                    noisy_outputs = torch.clamp(noisy_outputs, 0, 1)
                    query_dataset.append((inputs.cpu().numpy(), noisy_outputs.cpu().numpy()))
                    samples_collected += inputs.size(0)
                    if samples_collected >= samples_per_round:
                        break
                print(f"Completed query round {round + 1}/{n_rounds}")
        return query_dataset

    def generate_adversarial_samples(self, model, inputs, targets, epsilon=0.01):
        model.eval()
        inputs = inputs.clone().detach().to(self.device)
        with torch.no_grad():
            outputs = self.qnnaas_predict(model, inputs, self.device)
            probs = outputs[:, 1]
            boundary_mask = (probs >= 0.4) & (probs <= 0.6)
            if not boundary_mask.any():
                return inputs
            boundary_inputs = inputs[boundary_mask]
            noise = epsilon * torch.randn_like(boundary_inputs)
            adv_inputs = boundary_inputs + noise
            adv_inputs = torch.clamp(adv_inputs, 0, 1)
            inputs[boundary_mask] = adv_inputs
        return inputs

    @abstractmethod
    def train(self, *args, **kwargs):
        pass

    @abstractmethod
    def evaluate(self, *args, **kwargs):
        pass

class CloudLeak(ModelExtraction):
    def pretrain(self, public_loader, n_epochs=10, batch_size=8, architecture='L2', save=True):
        """Pretrain one zoo architecture on public data and optionally save it.

        The selected model's ``q_params`` is re-sampled from N(0, pi) before
        training. The zoo models are ``QNN`` instances whose ``forward``
        already ends in a sigmoid, so their outputs are probabilities: the
        loss is plain binary cross-entropy and predictions are obtained by
        thresholding the output at 0.5 (no second sigmoid).

        Args:
            public_loader: Iterable of ``(inputs, targets)`` batches.
            n_epochs: Number of passes over ``public_loader``.
            batch_size: Accepted for API compatibility; not used here.
            architecture: Name of the zoo entry to train.
            save: When true, write the state dict to
                ``<save_path>/pretrained_cloudleak_<architecture>.pth``.

        Returns:
            ``(model, history)`` where ``history`` holds per-epoch
            ``pretrain_loss`` and ``pretrain_accuracy`` (in percent).
        """
        # Outputs are already probabilities, so BCEWithLogitsLoss would squash them twice.
        criterion = nn.BCELoss()
        architecture_map = dict(self.qnn_zoo)
        model = architecture_map[architecture]
        model.q_params = nn.Parameter(
            torch.normal(mean=0, std=np.pi, size=(model.n_layers * self.n_qubits,), device=self.device)
        )
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        history = {'pretrain_loss': [], 'pretrain_accuracy': []}
        for epoch in tqdm(range(n_epochs), desc="Pretraining CloudLeak"):
            model.train()
            running_loss = 0.0
            correct = 0
            total = 0
            n_batches = 0
            for inputs, targets in public_loader:
                inputs = inputs.to(self.device)
                targets = targets.to(self.device).view(-1, 1).float()
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
                n_batches += 1
                # Threshold the probability directly; sigmoid(p) >= 0.5 holds for every p in [0, 1].
                predicted = (outputs.detach() >= 0.5).float()
                target_labels = (targets >= 0.5).float()
                total += targets.size(0)
                correct += (predicted == target_labels).sum().item()
            # Guard against an empty loader instead of dividing by zero.
            avg_loss = running_loss / max(n_batches, 1)
            accuracy = 100 * correct / total if total else 0.0
            history['pretrain_loss'].append(avg_loss)
            history['pretrain_accuracy'].append(accuracy)
            print(f"Pretrain Epoch {epoch+1}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")
        if save:
            save_path = os.path.join(self.save_path, f'pretrained_cloudleak_{architecture}.pth')
            torch.save(model.state_dict(), save_path)
            print(f"Pretrained model saved to {save_path}")
        return model, history

    def load_pretrained_model(self, architecture='L2'):
        """Load saved pretraining weights into the requested zoo model.

        Returns:
            The zoo model with the checkpoint loaded, or ``None`` when
            ``<save_path>/pretrained_cloudleak_<architecture>.pth`` does not exist.

        Raises:
            ValueError: If ``architecture`` is not a zoo entry.
        """
        zoo = dict(self.qnn_zoo)
        if architecture not in zoo:
            raise ValueError(f"Architecture {architecture} not found in QNN zoo.")
        model = zoo[architecture]
        pretrained_path = os.path.join(self.save_path, f'pretrained_cloudleak_{architecture}.pth')
        if not os.path.exists(pretrained_path):
            print(f"No pretrained model found at {pretrained_path}")
            return None
        checkpoint = torch.load(pretrained_path, map_location=self.device)
        model.load_state_dict(checkpoint)
        print(f"Loaded pretrained model from {pretrained_path}")
        return model

    def train(self, query_dataset, out_of_domain_loader, n_epochs=20, batch_size=8, architecture='L2', loss_type='nll'):
        """Fine-tune a (pre)trained substitute on out-of-domain, query and jittered data.

        A pretrained checkpoint is loaded when available; otherwise the model
        is pretrained on up to 3000 out-of-domain samples first. The training
        set is the concatenation of up to 500 out-of-domain samples, every
        queried sample with the victim's class-1 score as soft label, and the
        boundary-jittered copies of the queried samples with the same labels.

        The zoo models end in a sigmoid, so the non-Huber loss is plain binary
        cross-entropy on probabilities, and accuracy compares the thresholded
        output with the thresholded (possibly soft) target.

        Args:
            query_dataset: List of ``(inputs, outputs)`` array pairs as
                returned by ``query_victim``.
            out_of_domain_loader: Iterable of ``(inputs, targets)`` batches.
            n_epochs: Number of fine-tuning epochs.
            batch_size: Batch size of the loaders built here.
            architecture: Name of the zoo entry to train.
            loss_type: ``'huber'`` selects ``HuberLoss(delta=0.5)``; any other
                value selects binary cross-entropy.

        Returns:
            ``(model, history)`` where ``history`` merges the pretraining
            history (empty lists when a checkpoint was loaded) with per-epoch
            ``train_loss`` and ``train_accuracy``.
        """
        # Outputs are already probabilities, so BCEWithLogitsLoss would squash them twice.
        criterion = HuberLoss(delta=0.5) if loss_type == 'huber' else nn.BCELoss()
        model = self.load_pretrained_model(architecture)
        if model is None:
            public_inputs, public_targets = [], []
            gathered = 0
            for inputs, targets in out_of_domain_loader:
                public_inputs.append(inputs.cpu().numpy())
                public_targets.append(targets.cpu().numpy())
                gathered += len(inputs)
                # Only the first 3000 samples are kept, so stop reading once they are in.
                if gathered >= 3000:
                    break
            public_inputs = np.concatenate(public_inputs)[:3000]
            public_targets = np.concatenate(public_targets)[:3000]
            public_dataset = TensorDataset(
                torch.tensor(public_inputs, dtype=torch.float32),
                torch.tensor(public_targets, dtype=torch.float32).view(-1, 1)
            )
            public_loader = DataLoader(public_dataset, batch_size=batch_size, shuffle=True)
            model, pretrain_history = self.pretrain(public_loader, n_epochs=10, batch_size=batch_size, architecture=architecture)
        else:
            pretrain_history = {'pretrain_loss': [], 'pretrain_accuracy': []}
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        history = {'train_loss': [], 'train_accuracy': []}
        out_inputs, out_targets = [], []
        gathered = 0
        for inputs, targets in out_of_domain_loader:
            out_inputs.append(inputs.cpu().numpy())
            out_targets.append(targets.cpu().numpy())
            # Keep a running count rather than re-concatenating every batch.
            gathered += len(inputs)
            if gathered >= 500:
                break
        out_inputs = np.concatenate(out_inputs)[:500]
        out_targets = np.concatenate(out_targets)[:500].reshape(-1, 1)
        query_inputs = np.concatenate([item[0] for item in query_dataset])
        # Column 1 of the victim output is the class-1 score used as soft label.
        query_outputs = np.concatenate([item[1][:, 1:2] for item in query_dataset])
        query_inputs_tensor = torch.tensor(query_inputs, dtype=torch.float32, device=self.device)
        query_targets_tensor = torch.tensor(query_outputs, dtype=torch.float32, device=self.device)
        adv_inputs = self.generate_adversarial_samples(self.victim_model, query_inputs_tensor, query_targets_tensor)
        adv_inputs = adv_inputs.cpu().numpy()
        adv_outputs = query_outputs
        synthetic_inputs = np.concatenate([out_inputs, query_inputs, adv_inputs])
        synthetic_outputs = np.concatenate([out_targets, query_outputs, adv_outputs])
        synthetic_dataset = TensorDataset(
            torch.tensor(synthetic_inputs, dtype=torch.float32),
            torch.tensor(synthetic_outputs, dtype=torch.float32).view(-1, 1)
        )
        synthetic_loader = DataLoader(synthetic_dataset, batch_size=batch_size, shuffle=True)
        for epoch in tqdm(range(n_epochs), desc="Training CloudLeak"):
            model.train()
            running_loss = 0.0
            correct = 0
            total = 0
            for inputs, targets in synthetic_loader:
                inputs = inputs.to(self.device)
                targets = targets.to(self.device).view(-1, 1).float()
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
                # Threshold both sides: targets may be soft victim scores.
                predicted = (outputs.detach() >= 0.5).float()
                target_labels = (targets >= 0.5).float()
                total += targets.size(0)
                correct += (predicted == target_labels).sum().item()
            avg_loss = running_loss / max(len(synthetic_loader), 1)
            accuracy = 100 * correct / total if total else 0.0
            history['train_loss'].append(avg_loss)
            history['train_accuracy'].append(accuracy)
            print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")
        return model, {**pretrain_history, **history}

    def evaluate(self, model, test_loader):
        """Return the metrics computed by ``evaluate_model_with_metrics`` for ``model``."""
        return evaluate_model_with_metrics(model, test_loader, self.device)

class QuantumLeak(ModelExtraction):
    def __init__(self, victim_model, n_qubits=4, query_budget=6000, n_committee=3, device="cuda", save_path="./results", circuit_device="default.mixed"):
        """Set up the base attack and remember the default committee size.

        Args:
            n_committee: Default number of ensemble members to train.
            (all other arguments are forwarded unchanged to ``ModelExtraction``)
        """
        super().__init__(
            victim_model,
            n_qubits=n_qubits,
            query_budget=query_budget,
            device=device,
            save_path=save_path,
            circuit_device=circuit_device,
        )
        # Filled in by train_ensemble.
        self.ensemble_models = []
        self.n_committee = n_committee

    def train(self, query_dataset, out_of_domain_loader, n_epochs=30, batch_size=8, architecture='L2', loss_type='huber'):
        """Train the committee with ``train_ensemble`` using ``self.n_committee`` members.

        ``out_of_domain_loader`` is accepted but not used.
        """
        return self.train_ensemble(
            query_dataset,
            n_epochs=n_epochs,
            batch_size=batch_size,
            architecture=architecture,
            loss_type=loss_type,
            n_committee=self.n_committee,
        )

    def train_ensemble(self, query_dataset, n_epochs=30, batch_size=8, architecture='L2', loss_type='huber', n_committee=None):
        """Train a committee of substitute QNNs on bootstrap resamples of the queries.

        Every member is a fresh ``QNN`` sharing the zoo entry's circuit, with
        ``q_params`` sampled from N(0, pi). Each member draws (with
        replacement) ``len(query_dataset) // n_committee`` query batches - at
        least one - and is trained on the victim's class-1 score as soft label.

        The models end in a sigmoid, so the non-Huber loss is plain binary
        cross-entropy on probabilities and predictions are obtained by
        thresholding the output at 0.5 (no second sigmoid).

        Args:
            query_dataset: List of ``(inputs, outputs)`` array pairs as
                returned by ``query_victim``.
            n_epochs: Epochs per committee member.
            batch_size: Batch size of each member's loader.
            architecture: Name of the zoo entry providing circuit and depth.
            loss_type: ``'huber'`` selects ``HuberLoss(delta=0.5)``; any other
                value selects binary cross-entropy.
            n_committee: Number of members; falls back to ``self.n_committee``
                when ``None`` (or 0).

        Returns:
            ``(ensemble_models, history)``; ``history`` concatenates the
            per-epoch ``train_loss`` / ``train_accuracy`` of all members.

        Raises:
            ValueError: If ``architecture`` is unknown or ``query_dataset`` is empty.
        """
        n_committee = n_committee or self.n_committee
        # Outputs are already probabilities, so BCEWithLogitsLoss would squash them twice.
        criterion = HuberLoss(delta=0.5) if loss_type == 'huber' else nn.BCELoss()
        architecture_map = dict(self.qnn_zoo)
        if architecture not in architecture_map:
            raise ValueError(f"Architecture {architecture} not found in QNN zoo.")
        dataset_size = len(query_dataset)
        if dataset_size == 0:
            raise ValueError("query_dataset is empty; nothing to train the ensemble on.")
        # Never draw an empty subset, even with more members than query batches.
        subset_size = max(1, dataset_size // n_committee)
        ensemble_models = []
        history = {'train_loss': [], 'train_accuracy': []}
        base_model = architecture_map[architecture]
        base_circuit = base_model.quantum_circuit
        vqc_layers = base_model.n_layers
        # A1 and A2 read a fixed number of weights; layered circuits read layers * qubits.
        param_size = 12 * 2 if architecture == 'A1' else (4 * 2 if architecture == 'A2' else vqc_layers * self.n_qubits)
        for i in range(n_committee):
            model = QNN(self.n_qubits, base_circuit, vqc_layers).to(self.device)
            model.q_params = nn.Parameter(
                torch.normal(mean=0, std=np.pi, size=(param_size,)).to(self.device)
            )
            optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
            subset_indices = np.random.choice(dataset_size, subset_size, replace=True)
            subset_data = [query_dataset[idx] for idx in subset_indices]
            subset_inputs = np.concatenate([item[0] for item in subset_data])
            # Column 1 of the victim output is the class-1 score used as soft label.
            subset_outputs = np.concatenate([item[1][:, 1:2] for item in subset_data])
            subset_dataset = TensorDataset(
                torch.tensor(subset_inputs, dtype=torch.float32),
                torch.tensor(subset_outputs, dtype=torch.float32).view(-1, 1)
            )
            subset_loader = DataLoader(subset_dataset, batch_size=batch_size, shuffle=True)
            for epoch in range(n_epochs):
                model.train()
                running_loss = 0.0
                correct = 0
                total = 0
                for batch_inputs, batch_targets in subset_loader:
                    batch_inputs = batch_inputs.to(self.device)
                    batch_targets = batch_targets.to(self.device).view(-1, 1).float()
                    optimizer.zero_grad()
                    batch_outputs = model(batch_inputs)
                    loss = criterion(batch_outputs, batch_targets)
                    loss.backward()
                    optimizer.step()
                    running_loss += loss.item()
                    # Threshold the probability directly; sigmoid(p) >= 0.5 holds for every p in [0, 1].
                    predicted = (batch_outputs.detach() >= 0.5).float()
                    target_labels = (batch_targets >= 0.5).float()
                    total += batch_targets.size(0)
                    correct += (predicted == target_labels).sum().item()
                avg_loss = running_loss / max(len(subset_loader), 1)
                accuracy = 100 * correct / total if total else 0.0
                history['train_loss'].append(avg_loss)
                history['train_accuracy'].append(accuracy)
                print(f"Committee {i+1}, Epoch {epoch+1}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")
            ensemble_models.append(model)
        self.ensemble_models = ensemble_models
        return ensemble_models, history

    def evaluate(self, test_loader):
        """Return the ensemble metrics from ``evaluate_model_with_metrics`` for the trained committee."""
        return evaluate_model_with_metrics(
            self.ensemble_models, test_loader, self.device, is_ensemble=True
        )

    def ablation_study(self, data_loader, test_loader, out_of_domain_loader, query_budgets=(1500, 3000, 6000), architectures=('L1', 'L2', 'L3', 'A1', 'A2'), committee_numbers=(3, 5, 7), epochs=30):
        """Run the full grid of attack schemes and write the results to CSV.

        For every query budget the victim is queried once; then every
        combination of architecture, committee size and scheme
        (``Single-N``, ``Single-H``, ``Ens-N``, ``Ens-H``) is trained and
        evaluated. ``Single-*`` schemes use a ``CloudLeak`` attack, ``Ens-*``
        schemes use this object's ensemble. The ``-N`` suffix selects the
        ``'nll'`` loss type and ``-H`` the ``'huber'`` loss type.

        ``self.query_budget`` is temporarily overwritten during the sweep and
        restored afterwards, also when the sweep fails.

        NOTE(review): the ``Single-*`` schemes do not depend on the committee
        size but are retrained for each one; the plots filter on the
        ``Committee`` column, so the rows are kept.

        Args:
            data_loader: Loader used to query the victim.
            test_loader: Loader used for evaluation.
            out_of_domain_loader: Loader providing public data for CloudLeak.
            query_budgets: Budgets to sweep (defaults are immutable tuples).
            architectures: Zoo entries to sweep.
            committee_numbers: Committee sizes to sweep.
            epochs: Training epochs for the ensemble schemes.

        Returns:
            The list of result dicts; the same rows are saved to
            ``<save_path>/ablation_results.csv``.
        """
        results = []
        cloud_leak = CloudLeak(
            self.victim_model, self.n_qubits, self.query_budget, self.device, self.save_path, self.circuit_device
        )
        public_inputs, public_targets = [], []
        gathered = 0
        for inputs, targets in out_of_domain_loader:
            public_inputs.append(inputs.cpu().numpy())
            public_targets.append(targets.cpu().numpy())
            gathered += len(inputs)
            # Only the first 3000 samples are kept, so stop reading once they are in.
            if gathered >= 3000:
                break
        public_inputs = np.concatenate(public_inputs)[:3000]
        public_targets = np.concatenate(public_targets)[:3000]
        public_dataset = TensorDataset(
            torch.tensor(public_inputs, dtype=torch.float32),
            torch.tensor(public_targets, dtype=torch.float32)
        )
        public_loader = DataLoader(public_dataset, batch_size=8, shuffle=True)
        cloud_leak.pretrain(public_loader, n_epochs=10, batch_size=8, architecture='L2')
        original_budget = self.query_budget
        try:
            for query_budget in query_budgets:
                # query_victim reads the budget from the instance.
                self.query_budget = query_budget
                query_dataset = self.query_victim(data_loader, n_rounds=3)
                for architecture in architectures:
                    for n_committee in committee_numbers:
                        for scheme in ['Single-N', 'Single-H', 'Ens-N', 'Ens-H']:
                            loss_type = 'nll' if scheme.endswith('-N') else 'huber'
                            if 'Single' in scheme:
                                model, _ = cloud_leak.train(
                                    query_dataset, out_of_domain_loader, n_epochs=20,
                                    batch_size=8, architecture=architecture, loss_type=loss_type
                                )
                                metrics = cloud_leak.evaluate(model, test_loader)
                            else:
                                self.train_ensemble(
                                    query_dataset, n_epochs=epochs, batch_size=8, architecture=architecture,
                                    loss_type=loss_type, n_committee=n_committee
                                )
                                metrics = self.evaluate(test_loader)
                            results.append({
                                'Query Budget': query_budget,
                                'Architecture': architecture,
                                'Committee': n_committee,
                                'Scheme': scheme,
                                'Accuracy': metrics['accuracy'],
                                'Precision': metrics['precision'],
                                'Recall': metrics['recall'],
                                'F1': metrics['f1']
                            })
                            print(f"Query Budget: {query_budget}, Architecture: {architecture}, Committee: {n_committee}, "
                                  f"Scheme: {scheme}, Accuracy: {metrics['accuracy']:.2f}%, "
                                  f"Precision: {metrics['precision']:.2f}, Recall: {metrics['recall']:.2f}, F1: {metrics['f1']:.2f}")
        finally:
            # Do not leak the last swept budget into later query_victim calls.
            self.query_budget = original_budget
        results_df = pd.DataFrame(results)
        results_df.to_csv(os.path.join(self.save_path, 'ablation_results.csv'), index=False)
        return results

    def plot_ablation_results(self, results_df):
        """Render figures 4-8 from an ablation results table into ``self.save_path``.

        ``results_df`` must provide the columns ``Query Budget``,
        ``Architecture``, ``Committee``, ``Scheme`` and ``Accuracy``. Each
        figure is written as ``figure<N>.png`` and closed afterwards.
        """
        import matplotlib.pyplot as plt

        def select(*conditions):
            # Rows matching every (column, value) pair.
            mask = None
            for column, value in conditions:
                term = results_df[column] == value
                mask = term if mask is None else mask & term
            return results_df[mask]

        def finish(xlabel, title, filename, legend=False):
            # Shared axis labelling, saving and clean-up for every figure.
            plt.xlabel(xlabel)
            plt.ylabel('Accuracy (%)')
            plt.title(title)
            if legend:
                plt.legend()
            plt.savefig(os.path.join(self.save_path, filename))
            plt.close()

        def budget_curves(title, filename):
            # Accuracy against query budget for the layered architectures (Ens-H, 5 members).
            plt.figure(figsize=(10, 6))
            for arch in ['L1', 'L2', 'L3']:
                rows = select(('Architecture', arch), ('Scheme', 'Ens-H'), ('Committee', 5))
                plt.plot(rows['Query Budget'].values, rows['Accuracy'].values, marker='o', label=f'{arch}')
            finish('Query Budget', title, filename, legend=True)

        # Figure 4: schemes compared at budget 6000, L2, 5 committee members.
        plt.figure(figsize=(10, 6))
        rows = select(('Query Budget', 6000), ('Architecture', 'L2'), ('Committee', 5))
        plt.bar(rows['Scheme'].values, rows['Accuracy'].values, color=['blue', 'green', 'orange', 'red'])
        finish('Attack Scheme', 'Figure 4: Comparison of Attack Schemes', 'figure4.png')

        # Figure 5: architectures compared for Ens-H at budget 6000, 5 members.
        plt.figure(figsize=(10, 6))
        rows = select(('Query Budget', 6000), ('Scheme', 'Ens-H'), ('Committee', 5))
        plt.bar(rows['Architecture'].values, rows['Accuracy'].values, color='purple')
        finish('VQC Architecture', 'Figure 5: Impact of VQC Ansatz', 'figure5.png')

        # Figure 6: committee sizes compared for Ens-H, L2 at budget 6000.
        plt.figure(figsize=(10, 6))
        rows = select(('Query Budget', 6000), ('Architecture', 'L2'), ('Scheme', 'Ens-H'))
        plt.plot(rows['Committee'].values, rows['Accuracy'].values, marker='o', label='Ens-H')
        finish('Number of Committee Members', 'Figure 6: Impact of Committee Number', 'figure6.png', legend=True)

        # Figures 7 and 8 plot the same selection under different titles.
        budget_curves('Figure 7: Performance with Different VQC Layers', 'figure7.png')
        budget_curves('Figure 8: Performance with Query Budgets', 'figure8.png')

In [None]:
!mkdir -p ./configs

In [None]:
%%writefile ./configs/config.py

 
import os
import torch

# --- Filesystem locations ---
SAVE_PATH = "./saved_models"
QUANTUM_LEAK_SAVE_PATH = "./results"
DATA_PATH = "./data"

# --- Hyperparameters of the victim models ---
N_EPOCHS = 30
N_LAYERS = 2
N_TRAIN = 5000
N_TEST = 1000
N_QUBITS = 4
BATCH_SIZE = 8
LEARNING_RATE = 0.001
PREPROCESS = True

# --- Hyperparameters of the extraction ("leak") experiments ---
N_LEAK_EPOCHS = 30
N_LEAK_LAYERS = 4
N_LEAK_TRAIN = 50000
N_LEAK_TEST = 10000
N_LEAK_QUBITS = 4
LEAK_BATCH_SIZE = 8
LEAK_QUERY_BUDGET = 6000
LEAK_N_COMMITTEE = 3

# --- Devices: both choices follow CUDA availability ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
QUANTUM_DEVICE = "lightning.qubit" if torch.cuda.is_available() else "default.qubit"

# --- Create every output directory at import time ---
for _directory in (
    SAVE_PATH,
    QUANTUM_LEAK_SAVE_PATH,
    DATA_PATH,
    SAVE_PATH + "/quanv",
    SAVE_PATH + "/basic_qnn",
    SAVE_PATH + "/circuit14",
    SAVE_PATH + "/transfer_learning",
):
    os.makedirs(_directory, exist_ok=True)
del _directory

In [None]:
!mkdir -p ./models

In [None]:
%%writefile ./models/basic_qnn.py

 
import torch
import torch.nn as nn

class BasicQNN(nn.Module):
    """Hybrid classifier: conv + linear front end, quantum circuit, linear read-out.

    Args:
        n_qubits: Number of features fed to the circuit, number of trainable
            circuit weights, and number of values expected back from it.
        quantum_circuit: Callable ``(inputs, weights)`` returning one value
            per qubit, either as a tensor or as a sequence of scalars.
    """

    def __init__(self, n_qubits, quantum_circuit):
        super(BasicQNN, self).__init__()
        self.conv = nn.Conv2d(1, 16, kernel_size=3, stride=2)
        self.pre_net = nn.Linear(16 * 15 * 15, n_qubits)
        self.q_params = nn.Parameter(torch.randn(n_qubits))
        self.post_net = nn.Linear(n_qubits, 1)
        self.quantum_circuit = quantum_circuit

    def _run_circuit(self, sample):
        """Evaluate the circuit on one sample and return a flat float32 tensor."""
        result = self.quantum_circuit(sample, self.q_params)
        if not isinstance(result, torch.Tensor):
            # Stack instead of torch.tensor(...): stacking keeps the autograd graph.
            result = torch.stack([
                torch.as_tensor(value, dtype=torch.float32, device=sample.device)
                for value in result
            ])
        return result.reshape(-1).to(dtype=torch.float32, device=sample.device)

    def forward(self, x):
        """Return a sigmoid score of shape ``(batch, 1)``.

        The flattening to ``16 * 15 * 15`` features fixes the spatial size the
        convolution must produce (15 x 15).
        """
        x = torch.relu(self.conv(x))
        x = x.view(-1, 16 * 15 * 15)
        x = torch.tanh(self.pre_net(x))
        rows = [self._run_circuit(sample) for sample in x]
        if rows:
            q_out = torch.stack(rows)
        else:
            # Empty batch: keep a well-formed (0, n_qubits) tensor for the read-out.
            q_out = x.new_zeros((0, self.post_net.in_features))
        return torch.sigmoid(self.post_net(q_out))

In [None]:
%%writefile ./models/circuit14.py

 
import torch
import torch.nn as nn
import pennylane as qml

class QuantumLayer(nn.Module):
    """Hybrid classifier built around an externally supplied three-argument circuit.

    Args:
        n_qubits: Number of wires; also the input width of the read-out layer.
        n_layers: Number of circuit layers (first dimension of both weight tensors).
        quantum_circuit: Callable ``(inputs, weights, crx_weights)`` returning
            a sequence of tensors that can be stacked.
        device_name: PennyLane device created and stored as ``self.dev``.
    """

    def __init__(self, n_qubits, n_layers, quantum_circuit, device_name="lightning.qubit"):
        super().__init__()
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        # Classical front end: convolution, then a projection to 16 features.
        self.conv = nn.Conv2d(1, 16, kernel_size=3, stride=2)
        self.pre_net = nn.Linear(16 * 15 * 15, 16)
        # Trainable circuit parameters.
        self.weights = nn.Parameter(torch.randn(n_layers, n_qubits, 3))
        self.crx_weights = nn.Parameter(torch.randn(n_layers, 6))
        self.fc = nn.Linear(n_qubits, 1)
        self.dev = qml.device(device_name, wires=n_qubits, shots=None)
        self.quantum_circuit = quantum_circuit

    def forward(self, inputs):
        """Return a sigmoid score of shape ``(batch, 1)``."""
        features = torch.relu(self.conv(inputs)).view(-1, 16 * 15 * 15)
        features = torch.tanh(self.pre_net(features))
        # Unit L2 norm per sample before the features reach the circuit.
        features = torch.nn.functional.normalize(features, p=2, dim=1, eps=1e-8)

        measured = torch.stack([
            torch.stack(self.quantum_circuit(sample, self.weights, self.crx_weights)).float()
            for sample in features
        ])
        return torch.sigmoid(self.fc(measured))

In [None]:
%%writefile ./models/qnn_leak.py

 
import torch
import torch.nn as nn
import pennylane as qml
import numpy as np

class QNN(nn.Module):
    """Hybrid classifier: 8x8 average pooling, linear layer, quantum circuit, read-out.

    Args:
        n_qubits: Number of values expected back from the circuit.
        quantum_circuit: Callable ``(inputs, weights)`` returning one value
            per qubit, either as a tensor or as a sequence of scalars.
        n_layers: Circuit depth; ``q_params`` is created with
            ``n_layers * n_qubits`` entries sampled from N(0, pi).
    """

    def __init__(self, n_qubits, quantum_circuit, n_layers):
        super(QNN, self).__init__()
        self.pool = nn.AvgPool2d(kernel_size=8, stride=8)
        self.pre_net = nn.Linear(16, 16)
        self.q_params = nn.Parameter(torch.normal(mean=0, std=np.pi, size=(n_layers * n_qubits,)))
        self.post_net = nn.Linear(n_qubits, 1)
        self.quantum_circuit = quantum_circuit
        self.n_qubits = n_qubits
        self.n_layers = n_layers

    def _run_circuit(self, sample):
        """Evaluate the circuit on one sample and return a flat float32 tensor."""
        result = self.quantum_circuit(sample, self.q_params)
        if not isinstance(result, torch.Tensor):
            # Stack instead of torch.tensor(...): stacking keeps the autograd graph,
            # so q_params and pre_net actually receive gradients.
            result = torch.stack([
                torch.as_tensor(value, dtype=torch.float32, device=sample.device)
                for value in result
            ])
        return result.reshape(-1).to(dtype=torch.float32, device=sample.device)

    def forward(self, x):
        """Return a sigmoid score of shape ``(batch, 1)``.

        The pooled input is flattened to 16 features per sample, passed
        through ``tanh(pre_net)`` and L2-normalised before the circuit.
        """
        x = self.pool(x)
        x = x.view(-1, 16)
        x = torch.tanh(self.pre_net(x))
        x = torch.nn.functional.normalize(x, p=2, dim=1)
        rows = [self._run_circuit(sample) for sample in x]
        if rows:
            q_out = torch.stack(rows)
        else:
            # Empty batch: keep a well-formed (0, n_qubits) tensor for the read-out.
            q_out = x.new_zeros((0, self.n_qubits))
        return torch.sigmoid(self.post_net(q_out))

In [None]:
%%writefile ./models/quanv_model.py

 
import torch
import torch.nn as nn

class QuanvModel(nn.Module):
    """Small CNN over 4-channel inputs ending in a single sigmoid score.

    Two conv(3x3) -> ReLU -> max-pool(2x2) stages are followed by two fully
    connected layers; ``fc1`` expects ``64 * 2 * 2`` flattened features.
    """

    def __init__(self):
        super().__init__()
        # First convolutional stage.
        self.conv1 = nn.Conv2d(4, 32, kernel_size=3, stride=1, padding=0)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Second convolutional stage.
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head.
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(64 * 2 * 2, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return a sigmoid score of shape ``(batch, 1)``."""
        stages = ((self.conv1, self.pool1), (self.conv2, self.pool2))
        for conv, pool in stages:
            x = pool(self.relu(conv(x)))
        hidden = self.relu(self.fc1(self.flatten(x)))
        return self.sigmoid(self.fc2(hidden))

In [None]:
%%writefile ./models/transfer_learning.py

 
import torch
import torch.nn as nn
import pennylane as qml
from torchvision.models import resnet18, ResNet18_Weights

class DressedQuantumCircuit(nn.Module):
    """Quantum circuit "dressed" with a linear layer before and after it.

    Args:
        n_qubits: Number of wires; 512 input features are projected to this width.
        n_layers: Number of variational layers (RY on every wire, then a CNOT chain).
        device_name: PennyLane device name used for the circuit.
    """

    def __init__(self, n_qubits, n_layers, device_name="default.qubit"):
        super(DressedQuantumCircuit, self).__init__()
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.pre_net = nn.Linear(512, n_qubits)
        self.post_net = nn.Linear(n_qubits, 1)
        # nn.Parameter enables gradients itself; no requires_grad flag needed.
        self.weights = nn.Parameter(torch.randn(n_layers, n_qubits))
        self.dev = qml.device(device_name, wires=n_qubits)
        self.quantum_circuit = self._create_quantum_circuit()

    def _create_quantum_circuit(self):
        """Build the QNode: Hadamard + input-scaled RY encoding, then the variational layers."""
        # Local import: this module never imports numpy, so the former ``np.pi``
        # raised NameError as soon as the circuit was executed.
        import math

        @qml.qnode(self.dev, interface="torch", diff_method="parameter-shift")
        def quantum_circuit(inputs, weights):
            for i in range(self.n_qubits):
                qml.Hadamard(wires=i)
                qml.RY(inputs[i] * math.pi / 2, wires=i)
            for layer in range(self.n_layers):
                for i in range(self.n_qubits):
                    qml.RY(weights[layer, i], wires=i)
                for i in range(self.n_qubits - 1):
                    qml.CNOT(wires=[i, i + 1])
            return [qml.expval(qml.PauliZ(i)) for i in range(self.n_qubits)]
        return quantum_circuit

    def _run_circuit(self, sample):
        """Evaluate the circuit on one sample and return a flat float32 tensor."""
        result = self.quantum_circuit(sample, self.weights)
        if not isinstance(result, torch.Tensor):
            # Stack instead of torch.tensor(...): stacking keeps the autograd graph,
            # so ``weights`` and ``pre_net`` actually receive gradients.
            result = torch.stack([
                torch.as_tensor(value, dtype=torch.float32, device=sample.device)
                for value in result
            ])
        return result.reshape(-1).to(dtype=torch.float32, device=sample.device)

    def forward(self, x):
        """Return a sigmoid score of shape ``(batch, 1)``."""
        x = self.pre_net(x)
        rows = [self._run_circuit(sample) for sample in x]
        if rows:
            q_out = torch.stack(rows)
        else:
            # Empty batch: keep a well-formed (0, n_qubits) tensor for the read-out.
            q_out = x.new_zeros((0, self.n_qubits), dtype=torch.float32)
        return torch.sigmoid(self.post_net(q_out))

class CQTransferLearningModel(nn.Module):
    """ImageNet-pretrained ResNet-18 backbone feeding a dressed quantum circuit.

    The backbone's first convolution is replaced by a freshly initialised
    single-input-channel one, its classifier is replaced by ``nn.Identity``,
    and every backbone parameter (including the new convolution) is frozen.
    Only the ``DressedQuantumCircuit`` head remains trainable.

    Args:
        n_qubits: Number of qubits of the quantum head.
        n_layers: Number of variational layers of the quantum head.
    """

    def __init__(self, n_qubits, n_layers):
        super().__init__()
        backbone = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
        # Single input channel instead of the three the stock ResNet expects.
        first_conv = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        nn.init.kaiming_normal_(first_conv.weight, mode='fan_out', nonlinearity='relu')
        backbone.conv1 = first_conv
        # Expose the pooled features directly instead of class scores.
        backbone.fc = nn.Identity()
        for frozen in backbone.parameters():
            frozen.requires_grad = False
        self.resnet = backbone
        self.quantum_net = DressedQuantumCircuit(n_qubits, n_layers)

    def forward(self, x):
        """Return the quantum head's output for the backbone features of ``x``."""
        return self.quantum_net(self.resnet(x))

In [None]:
!mkdir -p ./quantum_circuits

In [None]:
%%writefile ./quantum_circuits/basic_qnn_circuit.py

 
import pennylane as qml
import torch
from configs.config import N_QUBITS, N_LAYERS, QUANTUM_DEVICE

def create_quantum_circuit(n_qubits=N_QUBITS, n_layers=N_LAYERS, device_name=QUANTUM_DEVICE):
    """Build the basic QNN circuit as a Torch-interface QNode.

    Each of the ``n_layers`` repetitions applies, per qubit, an RX by the
    matching input feature and an RY by the matching weight, followed by a
    chain of CZ gates between neighbouring qubits.  The weights are indexed
    by qubit only, so every repetition reuses the same RY angles.

    Args:
        n_qubits: Number of wires.
        n_layers: Number of encode/rotate/entangle repetitions.
        device_name: PennyLane device name.

    Returns:
        A QNode ``circuit(inputs, weights)`` returning one Pauli-Z expectation
        value per qubit.
    """
    wires = range(n_qubits)
    neighbour_pairs = [[wire, wire + 1] for wire in range(n_qubits - 1)]
    dev = qml.device(device_name, wires=n_qubits)

    @qml.qnode(dev, interface="torch", diff_method="parameter-shift")
    def circuit(inputs, weights):
        for _ in range(n_layers):
            for wire in wires:
                qml.RX(inputs[..., wire], wires=wire)
                qml.RY(weights[wire], wires=wire)
            for pair in neighbour_pairs:
                qml.CZ(wires=pair)
        return [qml.expval(qml.PauliZ(wire)) for wire in wires]

    return circuit

In [None]:
%%writefile ./quantum_circuits/circuit14_circuit.py

 
import pennylane as qml
import torch
from configs.config import N_QUBITS, N_LAYERS, QUANTUM_DEVICE

def create_circuit14(n_qubits=N_QUBITS, n_layers=N_LAYERS, device_name=QUANTUM_DEVICE):
    """Build the "circuit 14" ansatz as a Torch-interface QNode.

    The input is amplitude-embedded (normalised, zero-padded).  Every layer
    applies RX/RY/RZ on each qubit and then a sequence of CRX gates that
    consumes ``n_qubits + 2`` entries of ``crx_weights[layer]``.

    Args:
        n_qubits: Number of wires.
        n_layers: Number of variational layers.
        device_name: PennyLane device name (created with ``shots=None``).

    Returns:
        A QNode ``circuit_14(inputs, weights, crx_weights)`` returning one
        Pauli-Z expectation value per qubit.
    """
    dev = qml.device(device_name, wires=n_qubits, shots=None)

    # [control, target] pairs in the order the crx_weights columns are used:
    # the nearest-neighbour chain, the wrap-around link, then the two links
    # from qubits 0 and 1 to the qubit three positions further (mod n_qubits).
    crx_pairs = [[qubit, qubit + 1] for qubit in range(n_qubits - 1)]
    crx_pairs.append([n_qubits - 1, 0])
    crx_pairs.extend([qubit, (qubit + 3) % n_qubits] for qubit in (0, 1))

    @qml.qnode(dev, interface='torch', diff_method='parameter-shift')
    def circuit_14(inputs, weights, crx_weights):
        qml.AmplitudeEmbedding(inputs, wires=range(n_qubits), normalize=True, pad_with=0.)
        for layer in range(n_layers):
            for qubit in range(n_qubits):
                angles = weights[layer, qubit]
                qml.RX(angles[0], wires=qubit)
                qml.RY(angles[1], wires=qubit)
                qml.RZ(angles[2], wires=qubit)
            for column, pair in enumerate(crx_pairs):
                qml.CRX(crx_weights[layer, column], wires=pair)
        return [qml.expval(qml.PauliZ(wire)) for wire in range(n_qubits)]

    return circuit_14

In [None]:
%%writefile ./quantum_circuits/quanv_circuit.py

 
import pennylane as qml
import numpy as np
from configs.config import N_LAYERS, N_QUBITS, QUANTUM_DEVICE

def create_quanv_circuit():
    """Build the fixed random 4-qubit circuit used as a quanvolution kernel.

    The four inputs are encoded as RY rotations by ``pi * phi[j]``, followed
    by a ``RandomLayers`` template whose parameters are drawn once, when this
    function is called, from ``U(0, 2*pi)`` with shape ``(N_LAYERS, 4)``.

    Returns:
        A QNode ``circuit(phi)`` returning four Pauli-Z expectation values.
    """
    import warnings

    n_wires = 4
    try:
        dev = qml.device(QUANTUM_DEVICE, wires=n_wires)
    except Exception as err:
        # Best-effort fallback kept from the original, but no longer a bare
        # ``except`` (which also trapped KeyboardInterrupt/SystemExit) and no
        # longer silent, so a misconfigured device name is visible.
        warnings.warn(
            f"Could not create PennyLane device {QUANTUM_DEVICE!r} ({err}); "
            "falling back to 'default.qubit'.",
            RuntimeWarning,
        )
        dev = qml.device("default.qubit", wires=n_wires)
    rand_params = np.random.uniform(high=2 * np.pi, size=(N_LAYERS, n_wires))

    @qml.qnode(dev)
    def circuit(phi):
        for j in range(n_wires):
            qml.RY(np.pi * phi[j], wires=j)
        qml.templates.RandomLayers(rand_params, wires=list(range(n_wires)))
        return [qml.expval(qml.PauliZ(j)) for j in range(n_wires)]
    return circuit

In [None]:
%%writefile ./quantum_circuits/transfer_learning_circuit.py

 
import pennylane as qml
import torch
from configs.config import N_QUBITS, N_LAYERS, QUANTUM_DEVICE

def create_transfer_learning_circuit(n_qubits=N_QUBITS, n_layers=N_LAYERS, device_name=QUANTUM_DEVICE):
    """Build the transfer-learning variational circuit as a Torch QNode.

    Each qubit gets a Hadamard and an RY by ``inputs[i] * pi / 2``; each of
    the ``n_layers`` layers then applies a trainable RY per qubit followed by
    a CNOT chain between neighbouring qubits.

    Args:
        n_qubits: Number of wires.
        n_layers: Number of trainable layers.
        device_name: PennyLane device name.

    Returns:
        A QNode ``quantum_circuit(inputs, weights)`` returning one Pauli-Z
        expectation value per qubit.
    """
    # This module imports pennylane and torch only, so the original ``np.pi``
    # raised NameError when the circuit ran; use the stdlib constant instead.
    import math

    dev = qml.device(device_name, wires=n_qubits)

    @qml.qnode(dev, interface="torch", diff_method="parameter-shift")
    def quantum_circuit(inputs, weights):
        for i in range(n_qubits):
            qml.Hadamard(wires=i)
            qml.RY(inputs[i] * math.pi / 2, wires=i)
        for layer in range(n_layers):
            for i in range(n_qubits):
                qml.RY(weights[layer, i], wires=i)
            for i in range(n_qubits - 1):
                qml.CNOT(wires=[i, i + 1])
        return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]
    return quantum_circuit

In [None]:
!mkdir -p ./utils

In [None]:
%%writefile ./utils/training.py

import torch
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score

def train_model(model, train_loader, val_loader, device, criterion, n_epochs, learning_rate, save_path):
    """Train ``model`` with Adam and keep the best checkpoint by validation accuracy.

    Args:
        model: Module whose output is compared against ``labels.view(-1, 1)``.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Loader passed to ``evaluate_model_with_metrics`` each epoch.
        device: Device the model and batches are moved to.
        criterion: Loss function called as ``criterion(outputs, targets)``.
        n_epochs: Number of passes over ``train_loader``.
        learning_rate: Adam learning rate.
        save_path: Directory that receives ``best_model.pth``.

    Returns:
        ``(train_losses, val_accuracies)``: per-epoch mean training loss and
        validation accuracy (percent).
    """
    import os

    # torch.save does not create missing directories; without this the first
    # improvement crashed the run when the experiment folder did not exist.
    if save_path:
        os.makedirs(save_path, exist_ok=True)
    model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    train_losses = []
    val_accuracies = []
    # Start below any reachable accuracy so a checkpoint is always written,
    # even if validation accuracy never rises above 0%.
    best_val_accuracy = float("-inf")
    best_model_path = f"{save_path}/best_model.pth"
    for epoch in range(n_epochs):
        model.train()
        running_loss = 0.0
        n_batches = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.view(-1, 1).float())
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            n_batches += 1
        # Counting batches avoids a ZeroDivisionError on an empty loader.
        avg_train_loss = running_loss / n_batches if n_batches else float("nan")
        train_losses.append(avg_train_loss)
        val_metrics = evaluate_model_with_metrics(model, val_loader, device, criterion)
        val_accuracies.append(val_metrics['accuracy'])
        print(f"Epoch {epoch+1}/{n_epochs}, Train Loss: {avg_train_loss:.4f}, "
              f"Val Accuracy: {val_metrics['accuracy']:.2f}%, "
              f"Precision: {val_metrics['precision']:.2f}, "
              f"Recall: {val_metrics['recall']:.2f}, "
              f"F1: {val_metrics['f1']:.2f}")
        if val_metrics['accuracy'] > best_val_accuracy:
            best_val_accuracy = val_metrics['accuracy']
            torch.save(model.state_dict(), best_model_path)
    return train_losses, val_accuracies

def evaluate_model_with_metrics(models, data_loader, device, criterion=None, is_ensemble=False,
                                outputs_are_probabilities=None):
    """Evaluate one model (or an averaged ensemble) on a binary task.

    Args:
        models: A model or a list/tuple of models.  Unless ``is_ensemble`` is
            set, only the first model is evaluated.
        data_loader: Iterable of ``(inputs, labels)`` batches.
        device: Device the models and batches are moved to.
        criterion: Optional loss.  When given, the sample-weighted mean loss
            is reported under ``'loss'``.
        is_ensemble: Average the outputs of all ``models`` before
            thresholding.
        outputs_are_probabilities: ``True`` if the models already emit
            probabilities, ``False`` if they emit logits.  ``None`` (default)
            infers it from ``criterion``: ``torch.nn.BCELoss`` only accepts
            inputs in [0, 1], so it implies probabilities; any other
            criterion keeps the previous behaviour of applying a sigmoid.

    Returns:
        dict with ``'accuracy'`` (percent), ``'precision'``, ``'recall'``,
        ``'f1'`` and, when ``criterion`` is given, ``'loss'``.
    """
    if not isinstance(models, (list, tuple)):
        models = [models]
    for model in models:
        model.to(device)
        model.eval()
    if outputs_are_probabilities is None:
        # Applying a sigmoid to a value already in [0, 1] yields >= 0.5 for
        # every sample, i.e. every prediction would be the positive class.
        outputs_are_probabilities = isinstance(criterion, torch.nn.BCELoss)
    all_preds = []
    all_labels = []
    total_loss = 0.0 if criterion is not None else None
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            # reshape(-1) instead of squeeze(): a batch of one must stay 1-D
            # so that len() and list.extend() keep working.
            labels = labels.float().reshape(-1)
            if is_ensemble:
                outputs = torch.stack([model(inputs) for model in models]).mean(dim=0)
            else:
                outputs = models[0](inputs)
            probs = outputs if outputs_are_probabilities else torch.sigmoid(outputs)
            predicted = (probs.reshape(-1) >= 0.5).float()
            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())
            if criterion is not None:
                loss = criterion(outputs, labels.view(-1, 1)).item()
                total_loss += loss * len(labels)
    accuracy = 100 * np.mean(np.array(all_preds) == np.array(all_labels))
    precision = precision_score(all_labels, all_preds, average='binary', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='binary', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='binary', zero_division=0)
    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1
    }
    if criterion is not None:
        metrics['loss'] = total_loss / len(all_labels)
    return metrics

In [None]:
%%writefile ./utils/visualization.py

 
import matplotlib.pyplot as plt
import os
import torch
import numpy as np

def plot_training_history(history, save_path, title="Training History"):
    """Save a two-panel figure of training loss and test accuracy.

    Args:
        history: Mapping that may contain ``'train_loss'`` and/or
            ``'test_accuracy'`` sequences; a panel is drawn only for keys
            that are present.
        save_path: Directory that receives the PNG.
        title: Used for the file name only (lower-cased, spaces replaced by
            underscores, ``.png`` appended).
    """
    # (history key, subplot slot, curve label, panel title, y-axis label)
    panels = (
        ('train_loss', 1, 'Training Loss', 'Model Loss', 'Loss'),
        ('test_accuracy', 2, 'Test Accuracy', 'Model Accuracy', 'Accuracy (%)'),
    )
    plt.figure(figsize=(12, 4))
    for key, slot, curve_label, heading, y_label in panels:
        if key not in history:
            continue
        plt.subplot(1, 2, slot)
        plt.plot(history[key], label=curve_label)
        plt.title(heading)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
    plt.tight_layout()
    file_name = title.lower().replace(" ", "_") + '.png'
    plt.savefig(os.path.join(save_path, file_name))
    plt.close()

def plot_quanv_visualization(train_images, q_train_images, save_path, n_samples=4, n_channels=4):
    """Save a grid comparing input images with their quanvolved channels.

    Row 0 shows channel 0 of the first ``n_samples`` entries of
    ``train_images`` (indexed as ``[k, :, :, 0]``); rows 1..``n_channels``
    show the matching channels of ``q_train_images``, a tensor that is
    transposed from ``(N, C, H, W)`` to ``(N, H, W, C)`` before plotting.

    Args:
        train_images: Array-like of input images, channel-last.
        q_train_images: Torch tensor of quanvolved images, channel-first.
        save_path: Directory that receives ``quanv_visualization.png``.
        n_samples: Number of columns (images) to draw.
        n_channels: Number of output channels to draw.
    """
    # Only the plotted samples are converted; detach() keeps .numpy() valid
    # for tensors that still require grad.
    q_train_images_np = q_train_images[:n_samples].detach().cpu().numpy().transpose(0, 2, 3, 1)
    # squeeze=False keeps ``axes`` two-dimensional even for a single column,
    # where plt.subplots would otherwise return a 1-D array and the
    # ``axes[row, col]`` indexing below would fail.
    fig, axes = plt.subplots(1 + n_channels, n_samples, figsize=(10, 10), squeeze=False)
    axes[0, 0].set_ylabel("Input")
    for c in range(n_channels):
        axes[c + 1, 0].set_ylabel(f"Output [ch. {c}]")
    for k in range(n_samples):
        # Only the left-most column keeps its y-axis (and therefore labels).
        if k != 0:
            for row in range(1 + n_channels):
                axes[row, k].yaxis.set_visible(False)
        axes[0, k].imshow(train_images[k, :, :, 0], cmap="gray")
        for c in range(n_channels):
            axes[c + 1, k].imshow(q_train_images_np[k, :, :, c], cmap="gray")
    plt.tight_layout()
    plt.savefig(os.path.join(save_path, 'quanv_visualization.png'))
    plt.close()

def plot_model_comparison(accuracies, labels, save_path, title="Model Comparison"):
    """Save a bar chart of accuracies as ``model_comparison.png``.

    Args:
        accuracies: Bar heights, in percent.
        labels: One tick label per bar.
        save_path: Directory that receives the PNG.
        title: Figure title (the file name does not depend on it).
    """
    bar_colors = ['blue', 'orange', 'green']
    plt.figure(figsize=(10, 6))
    plt.bar(labels, accuracies, color=bar_colors)
    plt.ylabel('Accuracy (%)')
    plt.title(title)
    target_file = os.path.join(save_path, 'model_comparison.png')
    plt.savefig(target_file)
    plt.close()

In [None]:
%%writefile main.py

import torch
import pandas as pd
import numpy as np
from configs.config import *
from data.data_pipeline import CIFAR10DataPipeline, get_cq_dataloaders, create_out_of_domain_loader
from models.quanv_model import QuanvModel
from models.basic_qnn import BasicQNN
from models.circuit14 import QuantumLayer
from models.transfer_learning import CQTransferLearningModel
from quantum_circuits.quanv_circuit import create_quanv_circuit
from quantum_circuits.basic_qnn_circuit import create_quantum_circuit
from quantum_circuits.circuit14_circuit import create_circuit14
from quantum_circuits.transfer_learning_circuit import create_transfer_learning_circuit
from utils.training import train_model, evaluate_model_with_metrics
from utils.visualization import plot_training_history, plot_quanv_visualization, plot_model_comparison
from attacks.quantum_leak import CloudLeak, QuantumLeak, HuberLoss
from torch.utils.data import TensorDataset, DataLoader

def run_quanv_experiment():
    """Train and plot the quanvolution model on quantum-preprocessed images.

    Steps: materialise the train/test loaders as NumPy arrays, convert them
    to channel-last, run ``pipeline.preprocess_quanv`` with the quanvolution
    circuit, wrap the result in new loaders, train ``QuanvModel`` with
    ``BCELoss`` and save the training-history and quanvolution figures under
    ``SAVE_PATH + "/quanv"``.
    """
    def collect(loader):
        # Concatenate every batch of a loader into (images, labels) arrays.
        image_batches, label_batches = [], []
        for images, labels in loader:
            image_batches.append(images.cpu().numpy())
            label_batches.append(labels.cpu().numpy())
        return np.concatenate(image_batches), np.concatenate(label_batches)

    def images_to_device(array):
        # Channel-last array -> channel-first float tensor on DEVICE.
        return torch.tensor(array, dtype=torch.float32).clone().detach().permute(0, 3, 1, 2).to(DEVICE)

    def labels_to_device(array):
        # Labels -> float column vector on DEVICE.
        return torch.tensor(array, dtype=torch.float32).clone().detach().view(-1, 1).to(DEVICE)

    output_dir = SAVE_PATH + "/quanv"
    pipeline = CIFAR10DataPipeline(n_train=N_TRAIN, n_test=N_TEST, save_path=SAVE_PATH)
    train_loader, test_loader = get_cq_dataloaders(pipeline, batch_size=BATCH_SIZE, device=torch.device(DEVICE))
    train_images, train_labels = collect(train_loader)
    test_images, test_labels = collect(test_loader)
    train_images = train_images.transpose(0, 2, 3, 1)
    test_images = test_images.transpose(0, 2, 3, 1)

    quanv_circuit = create_quanv_circuit()
    q_train_images, q_test_images = pipeline.preprocess_quanv(train_images, test_images, quanv_circuit, load_from_drive=True)
    q_train_images = images_to_device(q_train_images)
    q_test_images = images_to_device(q_test_images)
    train_labels = labels_to_device(train_labels)
    test_labels = labels_to_device(test_labels)

    train_loader = DataLoader(TensorDataset(q_train_images, train_labels), batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
    test_loader = DataLoader(TensorDataset(q_test_images, test_labels), batch_size=BATCH_SIZE, shuffle=False, drop_last=True)

    model = QuanvModel().to(DEVICE)
    criterion = torch.nn.BCELoss()
    train_losses, val_accuracies = train_model(model, train_loader, test_loader, torch.device(DEVICE), criterion, N_EPOCHS, LEARNING_RATE, output_dir)
    history = {'train_loss': train_losses, 'test_accuracy': val_accuracies}
    plot_training_history(history, output_dir, "Quanv Training History")
    plot_quanv_visualization(train_images, q_train_images, output_dir)

def run_basic_qnn_experiment():
    """Train the basic QNN with ``BCEWithLogitsLoss`` and plot its history.

    The checkpoint and the figure are written under
    ``SAVE_PATH + "/basic_qnn"``.
    """
    pipeline = CIFAR10DataPipeline(n_train=N_TRAIN, n_test=N_TEST, save_path=SAVE_PATH)
    train_loader, test_loader = get_cq_dataloaders(pipeline, batch_size=BATCH_SIZE, device=torch.device(DEVICE))
    quantum_circuit = create_quantum_circuit()
    model = BasicQNN(N_QUBITS, quantum_circuit).to(DEVICE)
    criterion = torch.nn.BCEWithLogitsLoss()
    # train_model returns a (losses, accuracies) tuple, while
    # plot_training_history looks up 'train_loss' / 'test_accuracy' keys.
    # Passing the tuple straight through produced an empty figure.
    train_losses, val_accuracies = train_model(model, train_loader, test_loader, torch.device(DEVICE), criterion, N_EPOCHS, LEARNING_RATE, SAVE_PATH + "/basic_qnn")
    plot_training_history({'train_loss': train_losses, 'test_accuracy': val_accuracies}, SAVE_PATH + "/basic_qnn", "Basic QNN Training History")

def run_circuit14_experiment():
    """Train the circuit-14 ``QuantumLayer`` with ``BCELoss`` and plot its history.

    The checkpoint and the figure are written under
    ``SAVE_PATH + "/circuit14"``.
    """
    torch_device = torch.device(DEVICE)
    output_dir = SAVE_PATH + "/circuit14"

    pipeline = CIFAR10DataPipeline(n_train=N_TRAIN, n_test=N_TEST, save_path=SAVE_PATH)
    train_loader, test_loader = get_cq_dataloaders(pipeline, batch_size=BATCH_SIZE, device=torch_device)
    model = QuantumLayer(N_QUBITS, N_LAYERS, create_circuit14(), QUANTUM_DEVICE).to(DEVICE)
    train_losses, val_accuracies = train_model(
        model, train_loader, test_loader, torch_device,
        torch.nn.BCELoss(), N_EPOCHS, LEARNING_RATE, output_dir,
    )
    history = {'train_loss': train_losses, 'test_accuracy': val_accuracies}
    plot_training_history(history, output_dir, "Circuit14 Training History")

def run_transfer_learning_experiment():
    """Train the classical-quantum transfer-learning model and plot its history.

    Uses the ``N_LEAK_TRAIN`` / ``N_LEAK_TEST`` dataset sizes and ``BCELoss``;
    the checkpoint and the figure are written under
    ``SAVE_PATH + "/transfer_learning"``.
    """
    torch_device = torch.device(DEVICE)
    output_dir = SAVE_PATH + "/transfer_learning"

    pipeline = CIFAR10DataPipeline(n_train=N_LEAK_TRAIN, n_test=N_LEAK_TEST, save_path=SAVE_PATH)
    train_loader, test_loader = get_cq_dataloaders(pipeline, batch_size=BATCH_SIZE, device=torch_device)
    model = CQTransferLearningModel(N_QUBITS, N_LAYERS).to(DEVICE)
    train_losses, val_accuracies = train_model(
        model, train_loader, test_loader, torch_device,
        torch.nn.BCELoss(), N_EPOCHS, LEARNING_RATE, output_dir,
    )
    history = {'train_loss': train_losses, 'test_accuracy': val_accuracies}
    plot_training_history(history, output_dir, "Transfer Learning Training History")

def _load_victim_model(model_type):
    """Build the victim model for ``model_type`` and load its best checkpoint.

    Raises:
        ValueError: If ``model_type`` is not one of ``"basic_qnn"``,
            ``"circuit14"`` or ``"transfer_learning"``.
    """
    import os  # main.py does not import os at module level

    if model_type == "basic_qnn":
        quantum_circuit = create_quantum_circuit()
        victim_model = BasicQNN(N_LEAK_QUBITS, quantum_circuit).to(DEVICE)
    elif model_type == "circuit14":
        # The original call omitted the circuit, so QUANTUM_DEVICE landed in
        # the circuit slot (compare run_circuit14_experiment).  The circuit is
        # built with the leak dimensions so it matches the layer.
        # NOTE(review): the checkpoint only loads if these equal the
        # N_QUBITS / N_LAYERS the victim was trained with -- confirm in config.
        quantum_circuit = create_circuit14(n_qubits=N_LEAK_QUBITS, n_layers=N_LEAK_LAYERS)
        victim_model = QuantumLayer(N_LEAK_QUBITS, N_LEAK_LAYERS, quantum_circuit, QUANTUM_DEVICE).to(DEVICE)
    elif model_type == "transfer_learning":
        victim_model = CQTransferLearningModel(N_LEAK_QUBITS, N_LEAK_LAYERS).to(DEVICE)
    else:
        raise ValueError("Invalid model type")
    checkpoint = os.path.join(SAVE_PATH + "/" + model_type, "best_model.pth")
    # map_location lets a checkpoint saved on GPU be restored on a CPU runtime.
    victim_model.load_state_dict(torch.load(checkpoint, map_location=torch.device(DEVICE)))
    return victim_model


def _print_metrics(label, metrics):
    """Print accuracy, precision, recall and F1 of ``metrics`` on one line."""
    print(f"{label} Metrics: Accuracy: {metrics['accuracy']:.2f}%, "
          f"Precision: {metrics['precision']:.2f}, Recall: {metrics['recall']:.2f}, F1: {metrics['f1']:.2f}")


def run_leak_experiment(model_type="basic_qnn"):
    """Run the CloudLeak / QuantumLeak extraction attacks against a victim.

    Loads the trained victim for ``model_type``, queries it, trains the four
    substitute schemes (Single-N, Single-H, Ens-N, Ens-H), prints their
    metrics, writes ``table2.csv`` / ``table3.csv`` / ``table4.csv`` to
    ``QUANTUM_LEAK_SAVE_PATH``, runs the ablation study and saves the model
    comparison plot.

    Args:
        model_type: ``"basic_qnn"``, ``"circuit14"`` or
            ``"transfer_learning"``.

    Raises:
        ValueError: If ``model_type`` is not recognised.
    """
    import os  # main.py does not import os at module level

    pipeline = CIFAR10DataPipeline(n_train=N_LEAK_TRAIN, n_test=N_LEAK_TEST, save_path=SAVE_PATH)
    train_loader, test_loader = get_cq_dataloaders(pipeline, batch_size=LEAK_BATCH_SIZE, device=torch.device(DEVICE))
    out_of_domain_loader = create_out_of_domain_loader(pipeline, batch_size=LEAK_BATCH_SIZE, n_samples=3000)
    victim_model = _load_victim_model(model_type)
    victim_model.eval()
    cloud_leak = CloudLeak(
        victim_model,
        n_qubits=N_LEAK_QUBITS,
        query_budget=LEAK_QUERY_BUDGET,
        device=DEVICE,
        save_path=QUANTUM_LEAK_SAVE_PATH,
        circuit_device=QUANTUM_DEVICE
    )
    quantum_leak = QuantumLeak(
        victim_model,
        n_qubits=N_LEAK_QUBITS,
        query_budget=LEAK_QUERY_BUDGET,
        n_committee=LEAK_N_COMMITTEE,
        device=DEVICE,
        save_path=QUANTUM_LEAK_SAVE_PATH,
        circuit_device=QUANTUM_DEVICE
    )
    query_dataset = cloud_leak.query_victim(train_loader, n_rounds=3)
    query_outputs = np.concatenate([item[1][:, 1] for item in query_dataset])
    print(f"Query dataset noise check - Mean probability: {query_outputs.mean():.4f}, Std: {query_outputs.std():.4f}")

    # --- Single substitute models (CloudLeak) ---
    print("Training CloudLeak Single-N...")
    single_n_model, _ = cloud_leak.train(
        query_dataset, out_of_domain_loader, n_epochs=20, batch_size=LEAK_BATCH_SIZE, architecture='L2', loss_type='nll'
    )
    single_n_metrics = cloud_leak.evaluate(single_n_model, test_loader)
    _print_metrics("CloudLeak Single-N", single_n_metrics)
    print("Training CloudLeak Single-H...")
    single_h_model, _ = cloud_leak.train(
        query_dataset, out_of_domain_loader, n_epochs=20, batch_size=LEAK_BATCH_SIZE, architecture='L2', loss_type='huber'
    )
    single_h_metrics = cloud_leak.evaluate(single_h_model, test_loader)
    _print_metrics("CloudLeak Single-H", single_h_metrics)

    # --- Ensembles (QuantumLeak); each is evaluated right after training ---
    print("Training QuantumLeak Ens-N...")
    quantum_leak.train(
        query_dataset, out_of_domain_loader, n_epochs=N_LEAK_EPOCHS, batch_size=LEAK_BATCH_SIZE, architecture='L2', loss_type='nll'
    )
    ens_n_metrics = quantum_leak.evaluate(test_loader)
    _print_metrics("QuantumLeak Ens-N", ens_n_metrics)
    print("Training QuantumLeak Ens-H...")
    quantum_leak.train(
        query_dataset, out_of_domain_loader, n_epochs=N_LEAK_EPOCHS, batch_size=LEAK_BATCH_SIZE, architecture='L2', loss_type='huber'
    )
    ens_h_metrics = quantum_leak.evaluate(test_loader)
    _print_metrics("QuantumLeak Ens-H", ens_h_metrics)

    victim_criterion = HuberLoss(delta=0.5)
    victim_metrics = evaluate_model_with_metrics(victim_model, test_loader, torch.device(DEVICE), victim_criterion)
    _print_metrics("Victim QNN", victim_metrics)

    # (table column, metrics key) pairs shared by Table II and Table IV.
    metric_columns = (('Precision', 'precision'), ('Recall', 'recall'), ('F1', 'f1'))
    attack_metrics = [single_n_metrics, single_h_metrics, ens_n_metrics, ens_h_metrics]

    table2_data = {
        'Scheme': ['Single-N', 'Single-H', 'Ens-N', 'Ens-H'],
        'Accuracy': [m['accuracy'] for m in attack_metrics],
    }
    for column, key in metric_columns:
        table2_data[column] = [m[key] for m in attack_metrics]
    table2_df = pd.DataFrame(table2_data)
    table2_df.to_csv(os.path.join(QUANTUM_LEAK_SAVE_PATH, 'table2.csv'), index=False)
    print("\nTable II:")
    print(table2_df)

    table3_data = {
        'Method': ['CloudLeak', 'QuantumLeak'],
        'In-domain Images': [6000, 6000],
        'Out-of-domain Images': [512, 0],
        'Query Rounds': [3, 3],
        'VQC Architecture': ['L2', 'L2']
    }
    table3_df = pd.DataFrame(table3_data)
    table3_df.to_csv(os.path.join(QUANTUM_LEAK_SAVE_PATH, 'table3.csv'), index=False)
    print("\nTable III:")
    print(table3_df)

    all_metrics = [victim_metrics] + attack_metrics
    table4_data = {
        'Model': ['Victim QNN', 'CloudLeak Single-N', 'CloudLeak Single-H', 'QuantumLeak Ens-N', 'QuantumLeak Ens-H'],
        'Accuracy (%)': [m['accuracy'] for m in all_metrics],
    }
    for column, key in metric_columns:
        table4_data[column] = [m[key] for m in all_metrics]
    table4_data['Notes'] = [
        'Original model, no query noise',
        'Trained with noisy queries (SPAM: 0.54%, 1Q: 0.177%, 2Q: 2.87%, Crosstalk: 20%)',
        'Trained with noisy queries, Huber loss',
        'Ensemble of 5 models, noisy queries, BCE loss',
        'Ensemble of 5 models, noisy queries, Huber loss'
    ]
    table4_df = pd.DataFrame(table4_data)
    table4_df.to_csv(os.path.join(QUANTUM_LEAK_SAVE_PATH, 'table4.csv'), index=False)
    print("\nTable IV: Comparison of Victim QNN, CloudLeak, and QuantumLeak")
    print(table4_df)

    print("\nRunning ablation study for Figure 5, 6, 7, 8...")
    # The return value is not used; the results are read back from the CSV.
    quantum_leak.ablation_study(
        train_loader, test_loader, out_of_domain_loader,
        query_budgets=[1500, 3000, 6000],
        architectures=['L1', 'L2', 'L3', 'A1', 'A2'],
        committee_numbers=[3, 5, 7],
        epochs=N_LEAK_EPOCHS
    )
    results_df = pd.read_csv(os.path.join(QUANTUM_LEAK_SAVE_PATH, 'ablation_results.csv'))
    quantum_leak.plot_ablation_results(results_df)
    plot_model_comparison(
        [victim_metrics['accuracy'], single_h_metrics['accuracy'], ens_h_metrics['accuracy']],
        ['Victim QNN', 'CloudLeak Single-H', 'QuantumLeak Ens-H'],
        QUANTUM_LEAK_SAVE_PATH,
        f"{model_type.capitalize()} Model Comparison"
    )

if __name__ == "__main__":
    # Experiments executed, in order, when main.py is run as a script.
    # Uncomment an entry to enable it.
    enabled_experiments = (
        # run_quanv_experiment,
        # run_basic_qnn_experiment,
        run_circuit14_experiment,
        run_transfer_learning_experiment,
        # lambda: run_leak_experiment("basic_qnn"),
        # lambda: run_leak_experiment("circuit14"),
        # lambda: run_leak_experiment("transfer_learning"),
    )
    for experiment in enabled_experiments:
        experiment()


## 2. Chạy các thử nghiệm

Cell dưới đây cho phép bạn chạy các thử nghiệm chính của dự án.
Bỏ comment (xóa dấu `#`) ở dòng tương ứng với thử nghiệm bạn muốn chạy.

**Lưu ý:**
- Một bản vá lỗi cho `QuantumLayer.forward` đã được áp dụng tự động để tránh lỗi `ValueError` khi chuẩn hóa vector.
- Bạn có thể chạy lại cell này nhiều lần để thực hiện các thử nghiệm khác nhau mà không cần chạy lại các cell phía trên.


In [None]:

# Import các hàm từ file main.py vừa được tạo
from main import (
    run_quanv_experiment,
    run_basic_qnn_experiment,
    run_circuit14_experiment,
    run_transfer_learning_experiment,
    run_leak_experiment
)

# --- AUTOMATIC PATCH ---
# Replace QuantumLayer.forward to work around the ValueError raised when the
# input vector is normalized.
try:
    import torch
    from models.circuit14 import QuantumLayer

    def forward_patched(self, inputs):
        """Replacement forward pass for ``QuantumLayer``.

        conv -> ReLU -> flatten to 16 * 15 * 15 features -> pre_net -> tanh,
        L2-normalise each row, evaluate the quantum circuit once per sample,
        then ``fc`` and a sigmoid.
        """
        features = torch.relu(self.conv(inputs)).view(-1, 16 * 15 * 15)
        features = torch.tanh(self.pre_net(features))
        # Normalize each row; eps guards against division by zero.
        features = torch.nn.functional.normalize(features, p=2, dim=1, eps=1e-8)

        expectation_rows = [
            torch.stack(self.quantum_circuit(features[row], self.weights, self.crx_weights)).float()
            for row in range(features.size(0))
        ]
        return self.fc(torch.stack(expectation_rows)).sigmoid()

    QuantumLayer.forward = forward_patched
    print("✅ Đã áp dụng bản vá cho QuantumLayer.forward để sửa lỗi ValueError.")
except ImportError:
    print("⚠️ Không tìm thấy QuantumLayer, bỏ qua việc vá lỗi.")
except Exception as e:
    print(f"❌ Lỗi khi áp dụng bản vá: {e}")


# --- CHOOSE THE EXPERIMENT TO RUN ---
# Uncomment (remove the leading #) the lines you want to execute.
# Run only one experiment at a time so the output is easy to follow.

print("\n--- Bắt đầu chạy thử nghiệm Circuit14 ---")
run_circuit14_experiment()

# print("\n--- Bắt đầu chạy thử nghiệm Transfer Learning ---")
# run_transfer_learning_experiment()

# print("\n--- Bắt đầu chạy thử nghiệm Quanv ---")
# run_quanv_experiment()

# print("\n--- Bắt đầu chạy thử nghiệm Basic QNN ---")
# run_basic_qnn_experiment()

# print("\n--- Bắt đầu chạy thử nghiệm Leak trên Basic QNN ---")
# run_leak_experiment("basic_qnn")

# print("\n--- Bắt đầu chạy thử nghiệm Leak trên Circuit14 ---")
# run_leak_experiment("circuit14")

# print("\n--- Bắt đầu chạy thử nghiệm Leak trên Transfer Learning ---")
# run_leak_experiment("transfer_learning")
