In [3]:
# !pip install mitiq
# !pip uninstall qiskit-terra -y
# !pip uninstall qiskit-machine-learning -y
# !pip uninstall qiskit-ibm-runtime -y
# !pip uninstall qiskit_aer -y
# !pip install qiskit-machine-learning qiskit-ibm-runtime qiskit-aer
# !pip install qiskit==1.1.2

Found existing installation: qiskit-machine-learning 0.8.2
Uninstalling qiskit-machine-learning-0.8.2:
  Successfully uninstalled qiskit-machine-learning-0.8.2
Found existing installation: qiskit-ibm-runtime 0.37.0
Uninstalling qiskit-ibm-runtime-0.37.0:
  Successfully uninstalled qiskit-ibm-runtime-0.37.0
Found existing installation: qiskit-aer 0.12.1
Uninstalling qiskit-aer-0.12.1:
  Successfully uninstalled qiskit-aer-0.12.1
Defaulting to user installation because normal site-packages is not writeable
Collecting qiskit-machine-learning
  Using cached qiskit_machine_learning-0.8.2-py3-none-any.whl.metadata (13 kB)
Collecting qiskit-ibm-runtime
  Using cached qiskit_ibm_runtime-0.37.0-py3-none-any.whl.metadata (20 kB)
Collecting qiskit-aer
  Using cached qiskit_aer-0.17.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.2 kB)
Collecting qiskit>=1.0 (from qiskit-machine-learning)
  Using cached qiskit-2.0.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

In [1]:
import qiskit
import qiskit_machine_learning
import qiskit_ibm_runtime
import qiskit_aer
import mitiq

print('Qiskit Version :', qiskit.version.get_version_info())
print('Qiskit Machine Learning Version :', qiskit_machine_learning.__version__)
print('Qiskit-IBM-runtime Version :', qiskit_ibm_runtime.__version__)
print('Qiskit-Aer Version :', qiskit_aer.__version__)
print('Mitiq Version :', mitiq.__version__)

Qiskit Version : 1.1.2
Qiskit Machine Learning Version : 0.8.2
Qiskit-IBM-runtime Version : 0.37.0
Qiskit-Aer Version : 0.17.0
Mitiq Version : 0.43.0


In [2]:
from qiskit import QuantumCircuit, transpile
from qiskit.circuit import Parameter
from qiskit.quantum_info import SparsePauliOp
from qiskit_machine_learning.neural_networks import EstimatorQNN, SamplerQNN, NeuralNetwork
from qiskit.circuit.library import ZFeatureMap
from qiskit_machine_learning.connectors import TorchConnector
from qiskit_machine_learning.utils import algorithm_globals
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_ibm_runtime import Estimator as RuntimeEstimator
# from qiskit.primitives import BackendEstimatorV2 as Estimator
# from qiskit.primitives import Estimator

from mitiq import zne
from mitiq.interface.mitiq_qiskit.qiskit_utils import initialized_depolarizing_noise

import random
import numpy as np
import pandas as pd
import torch
import torchvision
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor, Compose
from torch.utils.data import DataLoader, TensorDataset, random_split
import torch.nn as nn
from torch.optim import Adam
import torch.nn.functional as F
from tqdm import tqdm
from sklearn.metrics import roc_auc_score
import copy
import time
from typing import Any, Optional, Tuple, Callable
import itertools

# IBM Quantum Hardware Backend

In [3]:
token = 'cb4921703fe282148f320172517ac70528ebb67187d712333db2622503d323ff4e3680292cb77b69c8f344599281f9c4d43808b1d7478f1b01509b1c3017afa8'
service = QiskitRuntimeService(
    channel='ibm_quantum',
    instance='yonsei-dedicated/internal/default',
    token=token,
)

# Or save your credentials on disk.
# QiskitRuntimeService.save_account(channel='ibm_quantum', instance='yonsei-dedicated/internal/default', token='<IBM Quantum API key>')

backend = service.least_busy(operational=True, simulator=False)

In [4]:
# estimator = Estimator(backend=backend)
# sampler = Sampler(backend)

algorithm_globals.random_seed = 12345
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = "cpu"
print("Running on ", device)

Running on  cuda


# QCNN Model

In [5]:
def conv_layer(num_qubits: int, name: str = "conv") -> QuantumCircuit:
    """
    Create a 'convolution' layer on `num_qubits`.
    Fill in whatever parametric gates / entangling pattern
    you like, sized to `num_qubits`.
    """
    qc = QuantumCircuit(num_qubits, name=name)
    # ------------------------------------------------
    # Example (placeholder): a layer of Rx, Rz gates
    # + ring entangling with CNOTs
    # ------------------------------------------------
    for q in range(num_qubits):
        theta = Parameter(f"θ_{name}_{q}")  # Qiskit parameter
        phi   = Parameter(f"φ_{name}_{q}")  # Qiskit parameter
        qc.ry(theta, q)
        qc.rz(phi,   q)
    # Entangle in a ring, e.g. CNOT(q, q+1 mod num_qubits)
    for q in range(num_qubits):
        qc.cx(q, (q+1) % num_qubits)

    return qc


def pool_layer(left_qubits: list[int], right_qubits: list[int], name: str = "pool") -> QuantumCircuit:
    """
    Pool pairs of qubits, one from `left_qubits` and one from `right_qubits`.
    Typically merges 2 qubits -> 1 logical qubit by a controlled operation.

    Note: The circuit you return must have a total 'width' equal to the
    sum of len(left_qubits) + len(right_qubits).
    """
    n_left = len(left_qubits)
    n_right = len(right_qubits)
    qc = QuantumCircuit(n_left + n_right, name=name)
    # Simple example: controlled-Z from each left qubit to each right qubit
    for i in range(min(n_left, n_right)):
        qc.cz(i, n_left + i)

    return qc

In [6]:
def build_qcnn_circuit(
    n_qubits: int,
    n_layers: int,
    conv_layer_func=conv_layer,
    pool_layer_func=pool_layer
):
    """
    Build the QCNN circuit (but do *not* wrap it in EstimatorQNN yet).
    Returns (circuit, input_params, weight_params, observable).
    """

    # 1) Feature map
    feature_map = ZFeatureMap(n_qubits)

    # 2) Ansatz - the parametric part
    ansatz = QuantumCircuit(n_qubits, name="Ansatz")

    active_qubits = list(range(n_qubits))
    for layer_index in range(n_layers):
        layer_name = layer_index + 1
        # Convolution
        conv_qc = conv_layer_func(len(active_qubits), f"c{layer_name}")
        ansatz.compose(conv_qc, qubits=active_qubits, inplace=True)

        # Pooling
        half = len(active_qubits) // 2
        left_block = active_qubits[:half]
        right_block = active_qubits[half:]
        pool_qc = pool_layer_func(left_block, right_block, f"p{layer_name}")
        ansatz.compose(pool_qc, qubits=active_qubits, inplace=True)
        active_qubits = right_block

    # 3) Combine feature map + ansatz into one circuit
    full_circuit = QuantumCircuit(n_qubits, name="QCNN")
    full_circuit.compose(feature_map, range(n_qubits), inplace=True)
    full_circuit.compose(ansatz,      range(n_qubits), inplace=True)

    # 4) Define observable (e.g., measure Z on first qubit)
    obs_label = "Z" + "I" * (n_qubits - 1)
    observable = SparsePauliOp.from_list([(obs_label, 1.0)])

    # Identify which parameters belong to the feature map vs. ansatz
    input_params  = list(feature_map.parameters)
    weight_params = list(ansatz.parameters)

    return full_circuit, input_params, weight_params, observable

In [46]:
class RuntimeNeuralNetwork(NeuralNetwork):
    def __init__(self, circuit, observable, input_params, weight_params, backend_name):
        super().__init__(
            num_inputs=len(input_params),
            num_weights=len(weight_params),
            sparse=False,
            output_shape=(1,),
            input_gradients=False
        )
        self._circuit = circuit
        self._observable = observable
        self._input_params = input_params
        self._weight_params = weight_params

        self._estimator = RuntimeEstimator(
            mode=backend_name
        )

    def _forward(self, input_data: np.ndarray, weights: np.ndarray):
        batch_size = input_data.shape[0]
        # input_data -> shape (batch_size, num_inputs)
        input_data = np.array(input_data, dtype=float).reshape(batch_size, -1)
        weights = np.array(weights, dtype=float).flatten()  # shape (num_weights,)
        # Build a list of param_values for each item in the batch
        param_values_list = []
        for b in range(batch_size):
            param_values = np.concatenate([input_data[b], weights])
            param_values_list.append(param_values.tolist())
        # Pass them all in one call
        pubs = []
        for pvals in param_values_list:
            pubs.append((self._circuit, self._observable, pvals))
        job = self._estimator.run(pubs)
        res = job.result()

        outputs = []
        for pub_res in res:
            # pub_res is a PubResult
            val = float(pub_res.data.evs)  # shape=() => single scalar
            outputs.append(val)

        outputs = np.array(outputs).reshape((batch_size, 1))

        return outputs
    
    def _backward(self, input_data, weights):
        batch_size = input_data.shape[0]
        # Return dummy zeros (no analytic gradient).
        d_input = np.zeros((batch_size, self._num_inputs), dtype=float)
        d_weights = np.zeros((batch_size, self._num_weights), dtype=float)
        return d_input, d_weights

In [47]:
def reduce_qubits(circuit):
    """Create a new circuit with only the qubits actually used by instructions."""
    used_qubits = set()
    for instr, qbs, cbs in circuit.data:
        used_qubits.update(qbs)

    # Sort by the actual index in this circuit
    used_qubits = sorted(used_qubits, key=lambda qb: circuit.find_bit(qb).index)

    # Map old qubit objects -> new indices [0..(len-1)]
    qubit_map = {qb: i for i, qb in enumerate(used_qubits)}

    # Build a new circuit with fewer qubits
    new_circ = QuantumCircuit(len(used_qubits), name=circuit.name)
    for instr, qbs, cbs in circuit.data:
        new_qbs = [qubit_map[qb] for qb in qbs]
        new_circ.append(instr, new_qbs, cbs)

    # Copy over metadata if desired
    new_circ.metadata = circuit.metadata
    return new_circ

In [48]:
class HybridQCNN(nn.Module):
    def __init__(self, n_qubits=8, n_layers=2, input_dim=784, backend=None):
        super().__init__()

        # A classical front-end to reduce 784 -> n_qubits
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, n_qubits)
        )

        # 1) Build the raw QCNN circuit + param metadata
        raw_circuit, input_params, weight_params, observable = build_qcnn_circuit(
            n_qubits=n_qubits,
            n_layers=n_layers,
            conv_layer_func=conv_layer,
            pool_layer_func=pool_layer
        )

        # 2) Transpile the circuit for the chosen backend
        #    If you're using Aer locally, pass backend=Aer.get_backend('aer_simulator')
        #    If you're using IBM hardware, pass backend=service.backend("ibmq_lima"), etc.
        my_layout = qiskit.transpiler.Layout({raw_circuit.qubits[i]: i for i in range(6)})
        transpiled_circuit = transpile(raw_circuit, backend=backend,
                                       initial_layout=my_layout, layout_method="sabre",      # or "trivial", "dense"
                                       routing_method="sabre",
                                       optimization_level=3,)
        reduced_circuit = reduce_qubits(transpiled_circuit)

        # 3) Build the QNN with the transpiled circuit
        qnn = RuntimeNeuralNetwork(
            circuit=reduced_circuit,
            observable=observable,
            input_params=input_params,
            weight_params=weight_params,
            backend_name=backend
        )

        # 4) Wrap with TorchConnector => a PyTorch module
        self.qnn = TorchConnector(qnn)

    def forward(self, x):
        """
        x shape: (batch_size, 784) for MNIST
        """
        # Classical part
        reduced_x = self.fc(x)

        # Quantum part => outputs shape (batch_size, 1)
        output = self.qnn(reduced_x)

        # Squeeze the last dim to get (batch_size,)
        return output.squeeze(1)

# MNIST

In [10]:
def load_mnist_binary(seed, n_train, n_valtest, device, batch_size, classes=(0, 1)):
    # Set random seed for reproducibility
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)

    # Load dataset with transformation
    transform = Compose([ToTensor(), lambda x: x.view(-1)])  # Flatten MNIST images
    data_train = MNIST(root='./data', train=True, download=True, transform=transform)
    data_test = MNIST(root='./data', train=False, download=True, transform=transform)
    input_dim = 28 * 28

    # Filter for binary classes
    train_mask = (data_train.targets == classes[0]) | (data_train.targets == classes[1])
    test_mask = (data_test.targets == classes[0]) | (data_test.targets == classes[1])
    X_train = data_train.data[train_mask].float() / 255.0  # Normalize pixel values to [0, 1]
    y_train = data_train.targets[train_mask].clone().detach()
    X_test = data_test.data[test_mask].float() / 255.0
    y_test = data_test.targets[test_mask].clone().detach()

    # Binarize labels
    y_train = (y_train == classes[1]).long()
    y_test = (y_test == classes[1]).long()

    # Shuffle data
    shuffle_idx = torch.randperm(len(y_train))
    X_train = X_train[shuffle_idx]
    y_train = y_train[shuffle_idx]

    shuffle_idx2 = torch.randperm(len(y_test))
    X_test = X_test[shuffle_idx2]
    y_test = y_test[shuffle_idx2]

    # Flatten images
    X_train = X_train.view(-1, 28*28)
    X_test = X_test.view(-1, 28*28)

    # Create TensorDatasets
    train_X = X_train.to(device)
    train_y = y_train.to(device)
    test_X = X_test.to(device)
    test_y = y_test.to(device)

    train_dataset = TensorDataset(train_X, train_y)
    valtest_dataset = TensorDataset(test_X, test_y)

    # Equally split validation and test sets
    val_size = int(0.5 * len(valtest_dataset))
    test_size = len(valtest_dataset) - val_size
    val_dataset, test_dataset = random_split(valtest_dataset, [val_size, test_size])

    # DataLoader parameters
    params = {'shuffle': True, 'batch_size': batch_size} if batch_size > 0 else {'shuffle': True}
    test_params = {'shuffle': False, 'batch_size': batch_size} if batch_size > 0 else {'shuffle': False}

    train_loader = DataLoader(train_dataset, **params)
    val_loader = DataLoader(val_dataset, **test_params)
    test_loader = DataLoader(test_dataset, **test_params)
    
    return train_loader, val_loader, test_loader, input_dim

# Training Function

In [34]:
################################# Calculate Running Time ########################################
def epoch_time(start_time: float, end_time: float) -> Tuple[float, float]:
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs


################################# Performance & Density Matrices ################################
# Training loop
def train_perf(model, dataloader, optimizer, criterion):
    model.train()
    train_loss = 0.0
    all_labels = []
    all_outputs = []
    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)  # Ensure that data is on the same device (GPU or CPU)
        labels = labels.float()   # Ensure labels are of type float for BCEWithLogitsLoss
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        
        # Collect labels and outputs for AUROC
        all_labels.append(labels.cpu().numpy())
        all_outputs.append(outputs.detach().cpu().numpy())       
        
    # Calculate train AUROC
    all_labels = np.concatenate(all_labels)
    all_outputs = np.concatenate(all_outputs)
    train_auroc = roc_auc_score(all_labels, all_outputs)
    
    return train_loss / len(dataloader), train_auroc


# Validation/Test loop
def evaluate_perf(model, dataloader, criterion):
    model.eval()
    running_loss = 0.0
    all_labels = []
    all_outputs = []
    with torch.no_grad():
        for inputs, labels in tqdm(dataloader):
            inputs, labels = inputs.to(device), labels.to(device)  # Ensure that data is on the same device (GPU or CPU)
            labels = labels.float()   # Ensure labels are of type float for BCEWithLogitsLoss
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            # Collect labels and outputs for AUROC
            all_labels.append(labels.cpu().numpy())
            all_outputs.append(outputs.cpu().numpy())

    all_labels = np.concatenate(all_labels)
    all_outputs = np.concatenate(all_outputs)
    auroc = roc_auc_score(all_labels, all_outputs)
    
    return running_loss / len(dataloader), auroc

In [35]:
def QuantumCNN_run(n_qubits, n_layers, input_dim, num_epochs):
    print("Running on ", device)
    model = HybridQCNN(n_qubits, n_layers, input_dim, backend).to(device)
    criterion = nn.BCEWithLogitsLoss()  # Use BCEWithLogitsLoss for binary classification
    # criterion = nn.CrossEntropyLoss()   # Loss function for multi-class classification
    optimizer = Adam(model.parameters(), lr=0.001, weight_decay=1e-4, eps=1e-8)
    # optimizer = COBYLA(maxiter=200)
        
    # Training process
    train_metrics, valid_metrics, test_metrics = [], [], []
        
    for epoch in range(num_epochs):
        start_time = time.time()
        
        train_loss, train_auc = train_perf(model, train_loader, optimizer, criterion)
        train_metrics.append({'epoch': epoch + 1, 'train_loss': train_loss, 'train_auc': train_auc})    
    
        valid_loss, valid_auc = evaluate_perf(model, val_loader, criterion)
        valid_metrics.append({'epoch': epoch + 1, 'valid_loss': valid_loss, 'valid_auc': valid_auc})
    
        end_time = time.time()
        epoch_mins, epoch_secs = epoch_time(start_time, end_time)
        print(f"Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s")
        print(f"Train Loss: {train_loss:.4f}, AUC: {train_auc:.4f} | Validation Loss: {valid_loss:.4f}, AUC: {valid_auc:.4f}")

    # Final evaluation on the test set
    test_loss, test_auc = evaluate_perf(model, test_loader, criterion)
    print(f"Test Loss: {test_loss:.4f}, AUC: {test_auc:.4f}")
    test_metrics.append({'epoch': num_epochs, 'test_loss': test_loss, 'test_auc': test_auc}) 

    # Combine all metrics into a pandas DataFrame
    metrics = []
    for epoch in range(num_epochs):
        metrics.append({
            'epoch': epoch + 1,
            'train_loss': train_metrics[epoch]['train_loss'],
            'train_auc': train_metrics[epoch]['train_auc'],
            'valid_loss': valid_metrics[epoch]['valid_loss'],
            'valid_auc': valid_metrics[epoch]['valid_auc'],
            'test_loss': test_metrics[0]['test_loss'],
            'test_auc': test_metrics[0]['test_auc'],
        })
    # Convert to DataFrame
    metrics_df = pd.DataFrame(metrics)
    # Save to CSV
    # csv_filename = f"QuantumCNN_performance.csv"
    # metrics_df.to_csv(csv_filename, index=False)
    # print(f"Metrics saved to {csv_filename}")
        
    return test_loss, test_auc

# Run Model

In [49]:
train_loader, val_loader, test_loader, input_dim = load_mnist_binary(seed=2025, n_train=70, n_valtest=30, device=device, batch_size=32)

In [50]:
QuantumCNN_run(n_qubits=6, n_layers=2, input_dim=input_dim, num_epochs=5)

Running on  cuda


  1%|          | 3/396 [2:43:42<357:25:19, 3274.10s/it]


KeyboardInterrupt: 

# ZNE Error Mitigation

In [None]:
class HybridQCNN(nn.Module):
    def __init__(self, n_qubits=8, n_layers=2, input_dim=784):
        super().__init__()
        # self.conv1 = nn.Conv2d(1, 2, kernel_size=5)
        # self.conv2 = nn.Conv2d(2, 16, kernel_size=5)
        # self.dropout = nn.Dropout2d()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, n_qubits)
        )
        qnn = build_qcnn(n_qubits=n_qubits, n_layers=n_layers, estimator=estimator)
        zne.execute_with_zne(circuit, ibmq_executor)
        self.qnn = TorchConnector(qnn)  # Apply torch connector, weights chosen
        # uniformly at random from interval [-1,1].
        # self.fc2 = nn.Linear(1, 1)  # 1-dimensional output from QNN

    def forward(self, x):
        reduced_x = self.fc(x)
        output = self.qnn(reduced_x)  # apply QNN
        
        return output.squeeze(1)