In [None]:
# Cell 1: Environment Setup and Configuration
import pennylane as qml
from pennylane import numpy as pnp
import numpy as np
from pennylane.templates.layers import StronglyEntanglingLayers
import random
import torch

# Check for CUDA availability to enable GPU acceleration
torch_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Device name:", torch.cuda.get_device_name(0))
    print("Torch CUDA version:", torch.version.cuda)

CUDA available: True
Device name: NVIDIA GeForce RTX 4060 Ti
Torch CUDA version: 12.1


In [None]:
# Cell 2: Initialize Quantum Device
# Using 'lightning.qubit' for high-performance state-vector simulation.
# 'shots=None' indicates exact analytical expectation values (ideal simulation).
dev = qml.device("lightning.qubit", wires=6, shots=None)

In [None]:
# Cell 3: Parameter Counting Helper
# Counts the number of rotation gates in the gatestream to determine parameter size.

def check_np(list):
    """
    Counts the number of parameterized gates (Rotations) in the gate sequence.
    Args:
        list: The gatestream (list of gates).
    Returns:
        num: Total number of rotation parameters required.
    """
    num = 0
    for r in list:
        # Check if the gate is a rotation type ("Rot")
        if r[0] == "Rot": 
            num += 1
    return num

In [None]:
# Cell 4: Dynamic Ansatz Construction (RL-driven)
# Constructs the variational circuit based on the gate sequence (gatestream) provided by the RL agent.

def ansatz(W, gatestream):
    """
    Builds the quantum circuit dynamically.
    Args:
        W: Trainable parameters for rotation gates.
        gatestream: List of gates (Action history from RL agent).
    """
    w_cnt = 0
    for gate in gatestream:
        # If the gate is a parameterized rotation
        if gate[0] == "Rot":
            # Apply Pauli Rotation: Param W[w_cnt], Axis gate[1], Target wire gate[2]
            qml.PauliRot(W[w_cnt], gate[1], wires=gate[2])
            w_cnt += 1

        # If the gate is CNOT (Entanglement)
        elif gate[0] == "CNOT":
            qml.CNOT(wires=[gate[1], gate[2]])

In [None]:
# Cell 5: State Preparation
# Encodes classical input data into quantum states.

def statepreparation(x):
    """
    Embeds the input feature vector x into the quantum state amplitudes.
    Method: Amplitude Embedding (requires normalized input).
    """
    qml.AmplitudeEmbedding(x, wires=range(6), normalize=True)

In [None]:
# Cell 6: Quantum Circuit Execution (QNode)
# Defines the full quantum circuit pipeline: Encoding -> Fixed Entanglement -> Dynamic RL Ansatz -> Measurement.

@qml.qnode(dev, interface="torch", diff_method="adjoint")
def circuit(weights_ent, weights_rl, x, gatestream):
    """
    Executes the quantum circuit.
    Args:
        weights_ent: Parameters for the fixed StronglyEntanglingLayers.
        weights_rl: Parameters for the dynamic gates determined by the RL agent.
        x: Input feature vector.
        gatestream: Sequence of gates (architecture) defined by the RL agent.
    """
    
    # 1. State Preparation (Data Encoding)
    statepreparation(x)
    
    # 2. Fixed Entanglement Layer (Pre-processing)
    # Provides a base level of entanglement using StronglyEntanglingLayers.
    StronglyEntanglingLayers(weights_ent, wires=range(6))
    
    # 3. Dynamic Ansatz (RL-Constructed)
    # Applies the specific sequence of gates generated by the RL agent.
    ansatz(weights_rl, gatestream)

    # 4. Measurement
    # Returns the expectation value of the Pauli-Z operator on the first qubit.
    return qml.expval(qml.PauliZ(0))

In [None]:
# Cell 7: Hybrid Variational Classifier
# Combines the quantum circuit output with a classical bias parameter.

def variational_classifier(weights_ent, weights_rl, bias, x, gatestream):
    """
    Computes the final prediction of the hybrid model.
    Formula: f(x) = <Z> + b
    Args:
        weights_ent: Parameters for fixed entanglement.
        weights_rl: Parameters for RL-generated gates.
        bias: Classical bias term.
        x: Input data.
        gatestream: Quantum circuit architecture.
    """
    return circuit(weights_ent, weights_rl, x, gatestream) + bias

In [None]:
# Cell 8: Mean Squared Error (MSE) Loss
# Standard loss function for regression-based quantum classification.

def square_loss(labels, predictions):
    """
    Computes the Mean Squared Error between labels and predictions.
    Includes tensor type/device compatibility checks.
    """
    # Ensure predictions are a tensor
    preds  = torch.stack(predictions).squeeze() if isinstance(predictions, list) else predictions
    # Match label tensor properties to predictions
    labels = torch.as_tensor(labels, dtype=preds.dtype, device=preds.device).squeeze()
    
    # Calculate Mean Squared Error
    return ((labels - preds) ** 2).mean()

In [None]:
# Cell 9: Classification Accuracy (Sign-based)
# Measures accuracy by comparing the sign of predictions with true labels {-1, 1}.

def accuracy_sign(labels, predictions):
    """
    Computes accuracy for binary classification.
    Method: Sign thresholding (pred >= 0 -> Class 1, else -> Class -1).
    Args:
        labels: True labels tensor.
        predictions: Model output tensor (expectation values).
    """
    # Ensure tensor compatibility
    preds  = torch.stack(predictions).squeeze() if isinstance(predictions, list) else predictions
    labels = labels.to(dtype=preds.dtype, device=preds.device).squeeze()
    
    # Convert continuous predictions to discrete class labels {1.0, -1.0}
    pred_labels = torch.where(preds >= 0, 1.0, -1.0)
    
    # Calculate mean accuracy
    return (pred_labels == labels).float().mean()

In [None]:
# Cell 10: Cost Function Calculation
# Computes the total cost (loss) over a batch of data by aggregating individual predictions.

def cost(weights_ent, weights_rl, bias, X, Y, gatestream):
    """
    Calculates the loss for a given dataset batch.
    Args:
        X: Batch of input data.
        Y: Batch of true labels.
        gatestream: Circuit structure.
    """
    # Generate predictions for each sample in the batch
    predictions = [variational_classifier(weights_ent, weights_rl, bias, x, gatestream) for x in X]
    
    # Calculate Square Loss
    return square_loss(Y, predictions)

In [None]:
# Cell 11: Circuit Optimization Function (Reward Calculation)
# Trains the specific circuit architecture proposed by the RL agent to evaluate its performance.

def opt_classifier(gatestream, weights_ent=None, iters=15, draw=False):
    """
    Performs short-term training (optimization) of the quantum circuit.
    Args:
        gatestream: Circuit architecture from RL agent.
        weights_ent: Initial weights for SEL (transfer learning context).
        iters: Number of optimization steps (epochs/iterations).
        draw: Boolean flag to return the circuit diagram.
    Returns:
        out_list: Training logs [iter, cost, train_acc, val_acc].
        draw_p: Circuit visual (text based).
        figset: Data and weights snapshots.
        weights_ent: Updated SEL weights.
    """

    # Load Dataset (Updated path consistency)
    # Assumes data is in 'dataset' folder
    import os
    X = torch.from_numpy(np.load(os.path.join("dataset", "data_speck.npy"))).float()
    Y = torch.from_numpy(np.load(os.path.join("dataset", "labels_speck.npy"))).float()
    Y = Y * 2 - 1 # Rescale labels from [0, 1] to [-1, 1] for Pauli-Z expectation

    # Reproducibility
    np.random.seed(0)
    
    # Data Splitting (80% Train, 20% Validation)
    num_data = len(Y)
    num_train = int(0.8 * num_data)
    indices = torch.randperm(num_data, device=Y.device) # Random permutation
    X_train = X[indices[:num_train]]
    Y_train = Y[indices[:num_train]]
    X_val   = X[indices[num_train:]]
    Y_val   = Y[indices[num_train:]]

    dev_t = X.device # Detect device (CPU/GPU)

    # Initialize or Load Weights for Entanglement Layer
    if weights_ent is None:
        weights_ent = torch.randn((2, 6, 3), device=dev_t) * 0.01
    else:
        weights_ent = weights_ent.to(dev_t)
    weights_ent = weights_ent.clone().detach().requires_grad_(True)

    # Initialize Weights for RL-Generated Ansatz
    num_rot_params = check_np(gatestream) # Count required parameters
    if num_rot_params > 0:
        weights_rl = (torch.randn(num_rot_params, device=dev_t) * 0.01).requires_grad_(True)
    else:
        weights_rl = torch.zeros(0, device=dev_t, requires_grad=True)

    # Classical Bias
    bias = torch.tensor(0.0, device=dev_t, requires_grad=True)

    # Optimizer (SGD with Momentum)
    opt = torch.optim.SGD([weights_ent, weights_rl, bias], lr=0.01, momentum=0.9)
    batch_size = 10

    out_list = []

    # Optimization Loop
    for it in range(iters):

        # Mini-batch Selection
        batch_idx = torch.randint(0, num_train, (batch_size,), device=dev_t)
        X_batch, Y_batch = X_train[batch_idx], Y_train[batch_idx]

        # Update Step
        opt.zero_grad()
        loss = cost(weights_ent, weights_rl, bias, X_batch, Y_batch, gatestream)
        loss.backward()
        opt.step()

        # Evaluation (No Gradients)
        with torch.no_grad():
            
            # Training Metrics
            vals_tr  = [variational_classifier(weights_ent, weights_rl, bias, x, gatestream) for x in X_train]
            preds_tr = torch.stack(vals_tr).squeeze()
            acc_train = (torch.sign(preds_tr) == torch.sign(Y_train)).float().mean()

            # Validation Metrics
            vals_val  = [variational_classifier(weights_ent, weights_rl, bias, x, gatestream) for x in X_val]
            preds_val = torch.stack(vals_val).squeeze()
            acc_val   = (torch.sign(preds_val) == torch.sign(Y_val)).float().mean()
            cost_val  = ((Y_val - preds_val) ** 2).mean()

        # Log Progress
        out_list.append([it + 1, float(cost_val), float(acc_train), float(acc_val)])

        # Early Stopping Condition (Threshold for Success)
        if float(acc_val) >= 0.65:
            break

    # Circuit Drawing (Optional)
    if draw and len(X_val) > 0:
        x_draw     = X_val[0].detach().cpu().numpy()
        w_ent_draw = weights_ent.detach().cpu().numpy()
        w_rl_draw  = weights_rl.detach().cpu().numpy()
        draw_p = qml.draw(circuit)(w_ent_draw, w_rl_draw, x_draw, gatestream)
    else:
        draw_p = None    

    # Snapshot of Data and Weights
    figset = [weights_ent.detach(), weights_rl.detach(), bias.detach(), X_train, Y_train, X_val, Y_val]
    
    return out_list, draw_p, figset, weights_ent.detach()

In [None]:
# Cell 12: Gate to Observation Vector Conversion
# Encodes symbolic gate representations into numerical observation vectors for the RL agent.
# Encoding format: [is_Rotation, is_CNOT, Property_1 (Axis/Control), Property_2 (Target)]

def gate_to_obs(gate):
    """
    Converts a gate tuple into a fixed-size observation vector.
    Args:
        gate: Tuple describing the gate (e.g., ('Rot', 'X', 0) or ('CNOT', 0, 1)).
    Returns:
        ob: List [Type_Rot, Type_CNOT, Param1, Param2].
    """
    
    # Initialize observation vector
    ob = [0, 0, 0, 0]
    
    # Case 1: Rotation Gate
    if gate[0] == 'Rot':
        ob[0] = 1 # Set Rotation flag
        
        # Encode Rotation Axis (X=1, Y=2, Z=3)
        if gate[1] == 'X': ob[2] = 1
        elif gate[1] == 'Y': ob[2] = 2
        elif gate[1] == 'Z': ob[2] = 3
        
        ob[3] = gate[2] # Target wire index
    
    # Case 2: CNOT Gate
    elif gate[0] == 'CNOT':
        ob[1] = 1 # Set CNOT flag
        ob[2] = gate[1] # Control wire index
        ob[3] = gate[2] # Target wire index
    
    return ob

In [None]:
# Cell 13: Environment State Update
# Updates the internal state (gatestream and observation history) based on the agent's selected action.

def update_obs(act, step, obs, gatestream, gates):
    """
    Updates the environment state after an action is performed.
    Args:
        act: Index of the selected action from the 'gates' list.
        step: Current time step index.
        obs: Current observation history (state buffer).
        gatestream: List of gates currently in the circuit.
        gates: List of all available candidate gates.
    """
    
    # 1. Update Circuit Architecture
    gatestream.append(gates[act]) # Append the selected gate to the circuit
    
    # 2. Update Observation Vector
    ob = gate_to_obs(gates[act])  # Convert the gate info to a numerical observation
    obs[step] = ob                # Record the observation at the current step
    step += 1                     # Increment step counter

    return step, obs, gatestream

In [None]:
# Cell 14: Reward Function Definition
# Calculates the multi-objective reward signal for the RL agent.
# Components: Accuracy, Cost, Gate Distribution (Variance), Redundancy (Duplicate), Connectivity, and Circuit Depth.

def cal_reward(steps, obs, outs):
    """
    Computes the reward for the current step based on circuit performance and structural constraints.
    Args:
        steps: Current step count in the episode.
        obs: Observation history (sequence of gates applied).
        outs: Optimization logs containing [iter, cost, train_acc, val_acc].
    Returns:
        list: Individual reward components.
        float: Total weighted reward.
    """

    ## 1. Accuracy Reward
    # Calculate average validation accuracy from the optimization trajectory.
    acc = [row[2] for row in outs]
    acc_m = sum(acc) / len(acc)

    ## 2. Cost Reward
    # Inverse of the average loss (Lower loss -> Higher reward).
    cost = [row[1] for row in outs]
    cost_m = 1 / (sum(cost) / len(cost))

    ## 3. Qubit Usage Variance Reward
    # Encourages uniform distribution of gates across all qubits (prevents neglecting specific qubits).
    pop_list = [0, 0, 0, 0, 0, 0] # Counter for each qubit (0~5)
    for row in obs:
        if row[1] == 1: # If CNOT gate
            pop_list[row[2]] += 1 # Control qubit
            pop_list[row[3]] += 1 # Target qubit
        elif row[0] == 1: # If Rotation gate
            pop_list[row[3]] += 1 # Target qubit
    
    # Calculate score based on variance (Low variance = High score)
    pop_r = (2 - np.var(pop_list)) / 2    

    ## 4. Redundancy Penalty (Duplicate Check)
    # Penalizes consecutive identical operations on the same qubits to avoid redundant cycles.
    dup_r = 0
    
    # Case A: Current gate is Rotation
    if obs[steps-1][0] == 1:
        tc = obs[steps-1][3] # Target qubit index
        tc_list = []
        # Filter history for gates involving this qubit
        for row in obs:
            if row[1] == 1:
                if row[2] == tc or row[3] == tc: tc_list.append(row)
            elif row[0] == 1:
                if row[3] == tc: tc_list.append(row)
        
        # Check if the last two operations are identical
        if len(tc_list) > 1:
            if tc_list[-1] == tc_list[-2]: dup_r = -10
            
    # Case B: Current gate is CNOT
    elif obs[steps-1][1] == 1:
        # Check history for Control Qubit
        tc = obs[steps-1][2]
        tc_list_c = []
        for row in obs:
            if row[1] == 1:
                if row[2] == tc or row[3] == tc: tc_list_c.append(row)
            elif row[0] == 1:
                if row[3] == tc: tc_list_c.append(row)
        
        # Check history for Target Qubit
        tc = obs[steps-1][3]
        tc_list_t = []
        for row in obs:
            if row[1] == 1:
                if row[2] == tc or row[3] == tc: tc_list_t.append(row)
            elif row[0] == 1:
                if row[3] == tc: tc_list_t.append(row)
        
        # Apply penalty if redundancy is detected in both control and target histories
        if len(tc_list_c) > 1 and len(tc_list_t) > 1:
            if tc_list_c[-1] == tc_list_c[-2] and tc_list_t[-1] == tc_list_t[-2]: dup_r = -10

    ## 5. Gate Type Reward
    # Incentivizes specific gate types (Rotation vs CNOT).
    if obs[steps-1][0] == 1: # If Rotation
        gate_r = 1
        rot_r = 1
    else:
        gate_r = 0
        rot_r = 0

    ## 6. Connectivity Reward (CNOT Distance)
    # Encourages local connectivity (shorter distance between control and target).
    if obs[steps-1][1] == 1:   
        cnot_r = 1 / abs(obs[steps-1][2]-obs[steps-1][3])
    else: cnot_r = 0    

    ## 7. Circuit Depth Efficiency
    # Penalizes longer circuits (Encourages finding solutions with fewer steps).
    steps_r = (30 - steps) / 30
    
    # Return component list and final weighted sum
    # Weighted Sum Formula: Accuracy * 30 + Cost * 2 + Variance * 3 + Steps * 5 + ...
    return [acc_m, cost_m, gate_r, rot_r, cnot_r, steps_r, pop_r, dup_r], \
           (acc_m - 0.5)*2 * 15 + cost_m * 2 + gate_r * 3 + rot_r + cnot_r + steps_r * 5 + pop_r * 3 + dup_r

In [None]:
# Cell 15: Quantum Architecture Search Environment (RL Environment)
# Defines the interaction environment for the RL agent, including action space,
# state transitions, reward calculation, and termination conditions.

class qc:
    def __init__(self):
        """
        Initializes the environment.
        Defines the Action Space (Gate Pool) and Maximum Circuit Depth.
        """
        # Define Action Space: Pool of available gates
        # - Rotations (X, Y, Z) on all 6 qubits
        # - CNOTs on all possible connected pairs
        self.gates = [['Rot','X', 0], ['Rot','X', 1], ['Rot','X', 2], ['Rot','X', 3], ['Rot','X', 4], ['Rot','X', 5],
         ['Rot','Y', 0], ['Rot','Y', 1], ['Rot','Y', 2], ['Rot','Y', 3], ['Rot','Y', 4], ['Rot','Y', 5],
         ['Rot','Z', 0], ['Rot','Z', 1], ['Rot','Z', 2], ['Rot','Z', 3], ['Rot','Z', 4], ['Rot','Z', 5],
         ['CNOT', 0, 1],  ['CNOT', 0, 2],  ['CNOT', 0, 3], ['CNOT', 0, 4], ['CNOT', 0, 5],
         ['CNOT', 1, 0],  ['CNOT', 1, 2],  ['CNOT', 1, 3], ['CNOT', 1, 4], ['CNOT', 1, 5],
         ['CNOT', 2, 0],  ['CNOT', 2, 1],  ['CNOT', 2, 3], ['CNOT', 2, 4], ['CNOT', 2, 5],
         ['CNOT', 3, 0],  ['CNOT', 3, 1],  ['CNOT', 3, 2], ['CNOT', 3, 4], ['CNOT', 3, 5],
         ['CNOT', 4, 0],  ['CNOT', 4, 1],  ['CNOT', 4, 2], ['CNOT', 4, 3], ['CNOT', 4, 5],
         ['CNOT', 5, 0],  ['CNOT', 5, 1],  ['CNOT', 5, 2], ['CNOT', 5, 3], ['CNOT', 5, 4]] 
        
        self.len_qc = 30 # Maximum number of steps (max circuit depth)
        self.act_space = len(self.gates) # Size of the action space
    
        self.weights_ent = None # Weights for the fixed entanglement layer

    def reset(self, weights_ent=None):
        """
        Resets the environment to the initial state.
        Args:
            weights_ent: Optional pre-trained weights for the SEL layer (Transfer Learning).
        """
        self.steps = 0
        self.obs = [[0] * 4 for _ in range(self.len_qc)] # Reset observation buffer
        self.gatestream = [] # Reset circuit architecture
        self.reward = -1
        self.term = -1 # Termination flag
        self.done = 0  # Success flag (Accuracy threshold met)
        
        # Initialize SEL weights (Random initialization or Load provided weights)
        if weights_ent is not None:
            self.weights_ent = weights_ent.detach().clone()
        elif self.weights_ent is None:
            self.weights_ent = (torch.randn((2, 6, 3)) * 0.01).detach()
        return

    def step(self, act):
        """
        Executes one time step within the environment.
        1. Updates the circuit with the selected action.
        2. Optimizes the new circuit (Short-term training).
        3. Calculates the reward based on performance and structure.
        4. Checks for termination (Max steps or Target accuracy).
        """
        # Validate action index
        if act > self.act_space-1 or act < 0:
            print("out of action space")
            return 0
        
        # Check if max steps reached
        if self.steps > self.len_qc-1:
            print("out of qc length")
            return 0
        
        # 1. State Update (Add gate and update observation)
        self.steps, self.obs, self.gatestream = update_obs(act, self.steps, self.obs, self.gatestream, self.gates)
        
        # 2. Circuit Optimization (Evaluate current architecture)
        self.outs, self.draw, self.figset, self.weights_ent = opt_classifier(
            self.gatestream,
            weights_ent=self.weights_ent,
            iters=15, draw=False
        )
        
        # 3. Reward Calculation
        self.rlist, self.reward = cal_reward(self.steps, self.obs, self.outs)

        # 4. Check Termination Conditions
        # Terminate if max steps reached or goal achieved
        if self.steps == self.len_qc or self.done == 1:
            self.term = 1
        else: self.term = 0

        # Check for Early Stopping (Target Accuracy Reached)
        acc_val_max   = max(row[3] for row in self.outs)
        if acc_val_max >= 0.65:  # Threshold: 65% validation accuracy
            self.done = 1
            self.term = 1
        
        return 1

    def gs_step(self, gs):
        """
        Helper method to evaluate a specific gate stream manually.
        Useful for debugging or validating a fixed architecture.
        """
        self.outs, self.draw, self.figset, self.weights_ent = opt_classifier(
            gs, weights_ent=self.weights_ent, iters=15, draw=False
        )
        return 1

    def sample(self):
        """
        Randomly samples an action from the action space.
        """
        return random.randint(0, self.act_space-1)
    
    def showdb(self, figset, gatestream):
        """
        Visualizes the Decision Boundary using PCA.
        Projects the high-dimensional quantum states onto 2D space.
        """
        weights_ent, weights_rl, bias, X_train, Y_train, X_val, Y_val = figset
        
        # Combine Train and Val data for visualization
        X_all = torch.cat([X_train, X_val], dim=0)
        Y_all = torch.cat([Y_train, Y_val], dim=0)
        split = len(X_train)

        # Generate Predictions
        preds = [torch.sign(variational_classifier(weights_ent, weights_rl, bias, x, gatestream)) for x in X_all]
        preds = torch.stack(preds).squeeze().cpu().numpy()

        # PCA Projection
        from sklearn.decomposition import PCA
        pca = PCA(n_components=2)
        X_2d = pca.fit_transform(X_all.cpu().numpy())

        # Determine Correct/Wrong predictions
        correct = (preds == Y_all.cpu().numpy())

        # Plotting
        plt.figure(figsize=(6,6))
        plt.scatter(X_2d[correct, 0], X_2d[correct, 1], c="g", label="Correct", alpha=0.7)
        plt.scatter(X_2d[~correct, 0], X_2d[~correct, 1], c="r", label="Wrong", alpha=0.7, marker="x")
    
        # Draw vertical line separating Train (left) and Val (right) implicitly by index if sorted, 
        # or just as a visual marker for the split point in the array.
        plt.axvline(X_2d[split, 0], color="gray", linestyle="--", alpha=0.3)

        plt.legend()
        plt.title("Classification results (PCA 2D projection)")
        plt.show()