In [None]:
!pip uninstall -y qiskit qiskit-terra qiskit-aer qiskit-ibm-runtime torch torchvision torchaudio

!pip install 'qiskit==2.1.1' --no-cache-dir
!pip install 'qiskit-aer==0.17.1' --no-cache-dir
!pip install 'qiskit-ibm-runtime==0.40.1' --no-cache-dir
!pip install 'qiskit-addon-utils==0.1.1' --no-cache-dir

!pip install 'torch==2.4.1' --index-url https://download.pytorch.org/whl/cpu --no-cache-dir
!pip install 'gymnasium==0.29.1' 'numpy==1.26.4' 'scipy==1.13.1' 'matplotlib==3.9.2' --no-cache-dir

!pip install --upgrade mitiq --no-cache-dir

print("Installation complete.")

[0mFound existing installation: torch 2.9.0+cu126
Uninstalling torch-2.9.0+cu126:
  Successfully uninstalled torch-2.9.0+cu126
Found existing installation: torchvision 0.24.0+cu126
Uninstalling torchvision-0.24.0+cu126:
  Successfully uninstalled torchvision-0.24.0+cu126
Found existing installation: torchaudio 2.9.0+cu126
Uninstalling torchaudio-2.9.0+cu126:
  Successfully uninstalled torchaudio-2.9.0+cu126
Collecting qiskit==2.1.1
  Downloading qiskit-2.1.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting rustworkx>=0.15.0 (from qiskit==2.1.1)
  Downloading rustworkx-0.17.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting stevedore>=3.0.0 (from qiskit==2.1.1)
  Downloading stevedore-5.6.0-py3-none-any.whl.metadata (2.3 kB)
Downloading qiskit-2.1.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.5/7.5 MB[0m [31m85.0 MB/s[0m eta [

Collecting mitiq
  Downloading mitiq-0.48.1-py3-none-any.whl.metadata (16 kB)
Collecting cirq-core<1.7.0,>=1.6.0 (from mitiq)
  Downloading cirq_core-1.6.1-py3-none-any.whl.metadata (4.8 kB)
Collecting duet>=0.2.8 (from cirq-core<1.7.0,>=1.6.0->mitiq)
  Downloading duet-0.2.9-py3-none-any.whl.metadata (2.3 kB)
Downloading mitiq-0.48.1-py3-none-any.whl (336 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m336.8/336.8 kB[0m [31m15.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading cirq_core-1.6.1-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m115.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading duet-0.2.9-py3-none-any.whl (29 kB)
Installing collected packages: duet, cirq-core, mitiq
Successfully installed cirq-core-1.6.1 duet-0.2.9 mitiq-0.48.1
Installation complete.


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import time
import warnings
warnings.filterwarnings('ignore')

from qiskit import QuantumCircuit, transpile
from qiskit.quantum_info import SparsePauliOp
from qiskit.circuit.library import PauliEvolutionGate
from qiskit.synthesis import LieTrotter
from qiskit.transpiler import CouplingMap
from qiskit.exceptions import QiskitError

from qiskit_addon_utils.problem_generators import generate_xyz_hamiltonian

# Qiskit Runtime & Aer
from qiskit_aer import AerSimulator
from qiskit_aer.noise import NoiseModel, depolarizing_error
from qiskit_ibm_runtime.debug_tools import Neat

# Mitiq
import mitiq
from mitiq.zne.scaling import fold_gates_at_random
from mitiq.zne.inference import RichardsonFactory

# PyTorch & RL
import torch
import torch.nn as nn
import torch.nn.functional as F
import gymnasium as gym
from gymnasium import spaces

print("All imports successful.")

All imports successful.


In [None]:
NOISE_LEVEL = 0.02
SHOTS = 4096

# Define Noise Model on HARDWARE Basis Gates
nm = NoiseModel()
err1 = depolarizing_error(NOISE_LEVEL, 1)
err2 = depolarizing_error(NOISE_LEVEL * 2, 2)

nm.add_all_qubit_quantum_error(err1, ['sx', 'x', 'rz', 'id'])
nm.add_all_qubit_quantum_error(err2, ['cx'])

# Backends
backend_noisy = AerSimulator(noise_model=nm)
backend_ideal = AerSimulator()

# Executors
def executor_ideal(circuit, observable):
    from qiskit_aer.primitives import Estimator
    est = Estimator()
    job = est.run(circuits=[circuit], observables=[observable], shots=SHOTS)
    return job.result().values[0]

def executor_noisy(circuit, observable):
    from qiskit_aer.primitives import Estimator
    est = Estimator(backend_options={"noise_model": nm})
    job = est.run(circuits=[circuit], observables=[observable], shots=SHOTS)
    return job.result().values[0]

# Extrapolation Functions
def linear_extrapolate(scale_factors, exp_vals):
    if len(scale_factors) < 2: return exp_vals[0]
    z = np.polyfit(scale_factors, exp_vals, 1)
    return z[1]

def polynomial_extrapolate(scale_factors, exp_vals, degree=2):
    if len(scale_factors) < degree + 1: return exp_vals[0]
    z = np.polyfit(scale_factors, exp_vals, degree)
    return z[degree]

def exponential_extrapolate(scale_factors, exp_vals):
    try:
        valid = [(x, y) for x, y in zip(scale_factors, exp_vals) if y > 1e-5]
        if len(valid) < 2: return exp_vals[0]
        x, y = zip(*valid)
        coeffs = np.polyfit(x, np.log(y), 1)
        return np.exp(coeffs[1])
    except:
        return exp_vals[0]

def richardson_extrapolate(scale_factors, exp_vals):
    try:
        return RichardsonFactory(scale_factors).extrapolate(scale_factors, exp_vals)
    except:
        return exp_vals[0]

print("Backends and executors ready.")

Backends and executors ready.


In [None]:
class HamiltonianGenerator:
    def __init__(self, seed=42):
        self.rng = np.random.RandomState(seed)

    def generate_base_circuit(self, num_qubits=4, trotter_steps=2):
        layout = [(i, i+1) for i in range(num_qubits-1)]
        cmap = CouplingMap(layout)

        ham = generate_xyz_hamiltonian(
            cmap,
            coupling_constants=(
                self.rng.uniform(0.1, 0.5), self.rng.uniform(0.1, 0.5), self.rng.uniform(0.1, 0.5)
            ),
            ext_magnetic_field=(
                self.rng.uniform(0.05, 0.2), self.rng.uniform(0.05, 0.2), self.rng.uniform(0.05, 0.2)
            )
        )

        evo_gate = PauliEvolutionGate(
            ham,
            time=self.rng.uniform(0.2, 1.0),
            synthesis=LieTrotter(reps=trotter_steps)
        )

        circuit = QuantumCircuit(num_qubits)
        circuit.append(evo_gate, range(num_qubits))
        return circuit.decompose().decompose()

gen = HamiltonianGenerator(seed=42)
print("Generator ready.")

Generator ready.


In [None]:
class ZNEEnvironment(gym.Env):
    def __init__(self, generator, backend):
        super().__init__()
        self.generator = generator

        # Initialize NEAT
        self.neat = Neat(backend)
        self.target_basis = ['cx', 'sx', 'rz', 'x', 'id']

        # ... (options initialization remains the same) ...
        self.noisefactor_options = [
            [1.0, 1.1, 1.2], [1.0, 1.2, 1.3], [1.0, 1.3, 1.5], [1.0, 1.4, 1.6],
            [1.0, 1.5, 1.8], [1.0, 1.7, 1.9], [1.0, 2.0, 2.5], [1.0, 2.2, 2.7],
            [1.0, 2.5, 2.9], [1, 2, 3], [1, 2, 4], [1, 3, 5],
            [1.0, 3.0, 3.5], [1.0, 3.2, 3.8], [1.0, 3.5, 3.9], [1.0, 4.0, 5.0],
            [1.0, 4.5, 5.0], [1, 3, 5, 7], [1, 5, 7], [1.0, 6.0, 7.0],
            [1, 1.5, 2.5, 4], [1, 2, 3, 5], [1, 3, 5, 7], [1, 1.2, 2.5, 5],
            [1, 1.5, 3, 7], [1.0, 1.8, 2.5], [1.0, 2.0, 3.5], [1.0, 3.0, 5.0],
            [1.0, 2.5, 4.0], [1.0, 3.5, 6.0]
        ]
        self.extrapolators = ['linear', 'polynomial', 'exponential', 'richardson']

        n_actions = len(self.noisefactor_options) * len(self.extrapolators)
        self.action_space = spaces.Discrete(n_actions)
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(6,), dtype=np.float32)

        self.num_qubits = 4
        self.trotter_steps = 2

    def reset(self, seed=None, options=None):
        if seed is not None:
            self.generator.rng = np.random.RandomState(seed)

        nq = self.num_qubits

        # 1. Generate & Transpile Circuit (H-gate fix)
        raw_circuit = self.generator.generate_base_circuit(nq, self.trotter_steps)
        self.base_u = transpile(raw_circuit, basis_gates=self.target_basis, optimization_level=1)

        # 2. Define Observable
        raw_obs = SparsePauliOp(['I'*i + 'Z' + 'I'*(nq-1-i) for i in range(nq)], coeffs=[1.0/nq]*nq)

        # 3. Cliffordization using NEAT
        pubs = [(self.base_u, [raw_obs])] # List wrap fix
        clifford_pubs = self.neat.to_clifford(pubs)

        self.circuit = clifford_pubs[0].circuit

        # --- FINAL FIX: TYPE CASTING & RECONSTRUCTION ---
        neat_obs_container = clifford_pubs[0].observables[0]

        if isinstance(neat_obs_container, dict):
            # the structure is {'PauliString': coefficient}
            # must explicitly cast coefficients to complex to satisfy Qiskit's low-level constructor.

            try:
                # 1. Extract and cast to list of (Pauli, complex coeff) tuples
                pauli_list_tuples = [
                    (pauli_str, complex(coeff))
                    for pauli_str, coeff in neat_obs_container.items()
                ]
                # 2. Use the stable from_list constructor
                self.observable = SparsePauliOp.from_list(pauli_list_tuples)
            except Exception as e:
                # If extraction fails (e.g., nested keys are back), raise an informative QiskitError.
                raise QiskitError(f"Failed to reconstruct Observable due to incompatible data format or casting error. Keys: {list(neat_obs_container.keys())}. Underlying Error: {e}")

        else:
            # Fallback: Object is already a SparsePauliOp instance
            self.observable = SparsePauliOp.from_list(neat_obs_container.to_list())

        # 4. Features & Ideal Value
        ops = self.circuit.count_ops()
        self.features = {
            'qubits': nq,
            'cx_gates': ops.get('cx', 0) + ops.get('cz', 0),
            'single_gates': sum(ops.get(g, 0) for g in ['sx', 'x', 'rz', 'id']),
            'depth': self.circuit.depth(),
            'total_gates': sum(ops.values()),
            'ham_terms': 3*nq + 3*(nq-1)
        }

        self.ideal = executor_ideal(self.circuit, self.observable)

        state = np.array(list(self.features.values()), dtype=np.float32)
        return state, {}

    def step(self, action):
        template_idx = action // len(self.extrapolators)
        extrap_idx = action % len(self.extrapolators)
        scales = self.noisefactor_options[template_idx]
        extrap_name = self.extrapolators[extrap_idx]

        # Fold the Clifford circuit
        folded_circs = [fold_gates_at_random(self.circuit, s) for s in scales]
        noisy_vals = [executor_noisy(fc, self.observable) for fc in folded_circs]

        # Extrapolate
        if extrap_name == 'linear': mitigated = linear_extrapolate(scales, noisy_vals)
        elif extrap_name == 'polynomial': mitigated = polynomial_extrapolate(scales, noisy_vals)
        elif extrap_name == 'exponential': mitigated = exponential_extrapolate(scales, noisy_vals)
        else: mitigated = richardson_extrapolate(scales, noisy_vals)

        error = abs(mitigated - self.ideal)
        reward = -error

        state = np.array(list(self.features.values()), dtype=np.float32)
        return state, reward, True, False, {'error': error}

env = ZNEEnvironment(gen, backend_noisy)
print(f"Env initialized. Actions: {env.action_space.n}")

Env initialized. Actions: 120


In [None]:
class DuelingDQN(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        self.feature = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        self.value = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, 1)
        )
        self.advantage = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, action_dim)
        )

    def forward(self, x):
        features = self.feature(x)
        value = self.value(features)
        advantage = self.advantage(features)
        return value + (advantage - advantage.mean(dim=1, keepdim=True))

class PrioritizedReplayBuffer:
    def __init__(self, capacity, alpha=0.6, beta=0.4, beta_increment=0.001):
        self.capacity = capacity
        self.alpha = alpha
        self.beta = beta
        self.beta_increment = beta_increment
        self.buffer = []
        self.priorities = np.zeros(capacity, dtype=np.float32)
        self.position = 0

    def push(self, transition, td_error):
        priority = (abs(td_error) + 1e-5) ** self.alpha
        if len(self.buffer) < self.capacity:
            self.buffer.append(transition)
        else:
            self.buffer[self.position] = transition
        self.priorities[self.position] = priority
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        buffer_size = len(self.buffer)
        priorities = self.priorities[:buffer_size]
        probs = priorities / priorities.sum()
        indices = np.random.choice(buffer_size, batch_size, p=probs)
        weights = (buffer_size * probs[indices]) ** (-self.beta)
        weights /= weights.max()
        self.beta = min(1.0, self.beta + self.beta_increment)
        batch = [self.buffer[i] for i in indices]
        return batch, indices, torch.FloatTensor(weights)

    def update_priorities(self, indices, td_errors):
        for idx, td_error in zip(indices, td_errors):
            self.priorities[idx] = (abs(td_error) + 1e-5) ** self.alpha

    def __len__(self):
        return len(self.buffer)

class DoubleDQNAgent:
    def __init__(self, state_dim, action_dim, lr=1e-4, gamma=0.99, tau=0.005, buffer_size=50000):
        self.action_dim = action_dim
        self.gamma = gamma
        self.tau = tau
        self.q_network = DuelingDQN(state_dim, action_dim)
        self.target_network = DuelingDQN(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
        self.memory = PrioritizedReplayBuffer(buffer_size)
        self.epsilon = 1.0
        self.epsilon_min = 0.05
        self.epsilon_decay = 0.9995

    def select_action(self, state, training=True):
        if training and random.random() < self.epsilon:
            return random.randrange(self.action_dim)
        with torch.no_grad():
            return self.q_network(torch.FloatTensor(state).unsqueeze(0)).argmax().item()

    def store_transition(self, state, action, reward, next_state, done):
        with torch.no_grad():
            current_q = self.q_network(torch.FloatTensor(state).unsqueeze(0))[0, action]
            next_q = self.target_network(torch.FloatTensor(next_state).unsqueeze(0)).max().item()
            td_error = reward + self.gamma * next_q * (1 - done) - current_q.item()
        self.memory.push((state, action, reward, next_state, done), td_error)

    def update(self, batch_size=64):
        if len(self.memory) < batch_size: return 0.0
        batch, indices, weights = self.memory.sample(batch_size)
        states = torch.FloatTensor([t[0] for t in batch])
        actions = torch.LongTensor([t[1] for t in batch])
        rewards = torch.FloatTensor([t[2] for t in batch])
        next_states = torch.FloatTensor([t[3] for t in batch])
        dones = torch.FloatTensor([t[4] for t in batch])

        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
        with torch.no_grad():
            next_actions = self.q_network(next_states).argmax(1)
            next_q = self.target_network(next_states).gather(1, next_actions.unsqueeze(1)).squeeze()
            target_q = rewards + self.gamma * next_q * (1 - dones)

        td_errors = (target_q - current_q.squeeze()).detach().numpy()
        self.memory.update_priorities(indices, td_errors)
        loss = (weights * F.mse_loss(current_q.squeeze(), target_q, reduction='none')).mean()

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), 10.0)
        self.optimizer.step()

        for target_param, param in zip(self.target_network.parameters(), self.q_network.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        return loss.item()

agent = DoubleDQNAgent(state_dim=6, action_dim=env.action_space.n)
print("RL Agent initialized.")

RL Agent initialized.


In [None]:
def train_double_dqn(env, agent, episodes=20000, curriculum_stages=4):
    """
    Training loop for Double DQN with PER and curriculum learning.
    """
    from collections import defaultdict
    import numpy as np
    import time

    train_rewards = []
    train_errors = []
    action_counts = defaultdict(int)

    # Define a complexity schedule based on the original notebook's logic
    episodes_per_stage = episodes // curriculum_stages
    complexity_schedule = [(2, 4), (3, 4), (4, 4), (4, 4)] # (trotter_steps, num_qubits)

    print(f"Training {episodes} episodes across {curriculum_stages} stages...")
    start_time = time.time()

    for episode in range(episodes):
        stage = min(episode // episodes_per_stage, curriculum_stages - 1)
        trotter, nq = complexity_schedule[stage]

        # --- Environment Setup ---
        env.num_qubits = nq
        env.trotter_steps = trotter
        state, _ = env.reset()

        # --- Agent Action ---
        action = agent.select_action(state, training=True)
        action_counts[action] += 1

        # --- Environment Step (Single step episode) ---
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        # --- Update Agent (FIXED METHOD NAME) ---
        agent.store_transition(state, action, reward, next_state, done)
        loss = agent.update(batch_size=64)

        # --- Logging ---
        train_rewards.append(reward)
        train_errors.append(info['error'])

        if (episode + 1) % 500 == 0:
            print(f"Ep {episode+1:5d} | Stage {stage+1} ({trotter}T,{nq}q) | Reward: {np.mean(train_rewards[-500:]):+.4f} | Error: {np.mean(train_errors[-500:]):.4f} | ε: {agent.epsilon:.3f}")

    print(f"Complete in {(time.time()-start_time)/60:.1f}m")
    print("Action distribution (Top 5):", dict(sorted(action_counts.items(), key=lambda item: item[1], reverse=True)[:5]))
    return train_rewards, train_errors

# Usage (run after agent/env init)
train_rewards, train_errors = train_double_dqn(env, agent, episodes=20000, curriculum_stages=4)

Training 20000 episodes across 4 stages...
Ep   500 | Stage 1 (2T,4q) | Reward: -0.0026 | Error: 0.0026 | ε: 0.804
Ep  1000 | Stage 1 (2T,4q) | Reward: -0.0023 | Error: 0.0023 | ε: 0.626
Ep  1500 | Stage 1 (2T,4q) | Reward: -0.0021 | Error: 0.0021 | ε: 0.487
Ep  2000 | Stage 1 (2T,4q) | Reward: -0.0024 | Error: 0.0024 | ε: 0.380
Ep  2500 | Stage 1 (2T,4q) | Reward: -0.0030 | Error: 0.0030 | ε: 0.296
Ep  3000 | Stage 1 (2T,4q) | Reward: -0.0034 | Error: 0.0034 | ε: 0.230
Ep  3500 | Stage 1 (2T,4q) | Reward: -0.0019 | Error: 0.0019 | ε: 0.179
Ep  4000 | Stage 1 (2T,4q) | Reward: -0.0018 | Error: 0.0018 | ε: 0.140
Ep  4500 | Stage 1 (2T,4q) | Reward: -0.0019 | Error: 0.0019 | ε: 0.109
Ep  5000 | Stage 1 (2T,4q) | Reward: -0.0023 | Error: 0.0023 | ε: 0.085
Ep  5500 | Stage 2 (3T,4q) | Reward: -0.0013 | Error: 0.0013 | ε: 0.066
Ep  6000 | Stage 2 (3T,4q) | Reward: -0.0015 | Error: 0.0015 | ε: 0.051
Ep  6500 | Stage 2 (3T,4q) | Reward: -0.0016 | Error: 0.0016 | ε: 0.050
Ep  7000 | Stage 2 (3

In [None]:
import torch

# Define the filename for the checkpoint
CHECKPOINT_PATH = '/content/zne_rl_agent_checkpoint.pth'

# Save the model state dictionaries and other critical training parameters
torch.save({
    'q_network_state_dict': agent.q_network.state_dict(),
    'target_network_state_dict': agent.target_network.state_dict(),
    'optimizer_state_dict': agent.optimizer.state_dict(),
    'epsilon': agent.epsilon,
}, CHECKPOINT_PATH)

print(f"Model checkpoint successfully saved to {CHECKPOINT_PATH}")

Model checkpoint successfully saved to /content/zne_rl_agent_checkpoint.pth


In [None]:

noisefactor_options = [
    [1.0, 1.1, 1.2], [1.0, 1.2, 1.3], [1.0, 1.3, 1.5], [1.0, 1.4, 1.6],
    [1.0, 1.5, 1.8], [1.0, 1.7, 1.9], [1.0, 2.0, 2.5], [1.0, 2.2, 2.7],
    [1.0, 2.5, 2.9], [1, 2, 3], [1, 2, 4], [1, 3, 5],
    [1.0, 3.0, 3.5], [1.0, 3.2, 3.8], [1.0, 3.5, 3.9], [1.0, 4.0, 5.0],
    [1.0, 4.5, 5.0], [1, 3, 5, 7], [1, 5, 7], [1.0, 6.0, 7.0],
    [1, 1.5, 2.5, 4], [1, 2, 3, 5], [1, 3, 5, 7], [1, 1.2, 2.5, 5],
    [1, 1.5, 3, 7], [1.0, 1.8, 2.5], [1.0, 2.0, 3.5], [1.0, 3.0, 5.0],
    [1.0, 2.5, 4.0], [1.0, 3.5, 6.0]
]
extrapolators = ['linear', 'polynomial', 'exponential', 'richardson']
top_actions = {76: 642, 42: 607, 52: 578, 101: 561, 97: 510}

print("## Policy Breakdown (Top 5 Actions)")
print("-----------------------------------")
print(f"{'Rank':<4} | {'Action ID':<10} | {'Extrapolator':<15} | {'Noise Factors':<25} | {'Count':<5}")
print("---------------------------------------------------------------------")

for rank, (action_id, count) in enumerate(top_actions.items(), 1):
    action_id = int(action_id)

    # Calculate Indices
    extrap_index = action_id % len(extrapolators)
    template_index = action_id // len(extrapolators)

    # Retrieve Options
    extrapolator_name = extrapolators[extrap_index]
    factor_template = noisefactor_options[template_index]

    print(f"{rank:<4} | {action_id:<10} | {extrapolator_name:<15} | {str(factor_template):<25} | {count:<5}")

print("-----------------------------------")
print("Analysis: The agent favors high-density linear/polynomial extrapolation.")

## Policy Breakdown (Top 5 Actions)
-----------------------------------
Rank | Action ID  | Extrapolator    | Noise Factors             | Count
---------------------------------------------------------------------
1    | 76         | linear          | [1.0, 6.0, 7.0]           | 642  
2    | 42         | exponential     | [1, 2, 4]                 | 607  
3    | 52         | linear          | [1.0, 3.2, 3.8]           | 578  
4    | 101        | polynomial      | [1.0, 1.8, 2.5]           | 561  
5    | 97         | polynomial      | [1, 1.5, 3, 7]            | 510  
-----------------------------------
Analysis: The agent favors high-density linear/polynomial extrapolation.


In [None]:
from google.colab import files
import os

CHECKPOINT_FILENAME = 'zne_rl_agent_checkpoint.pth'

# Check if the file already exists (e.g., if re-running the cell)
if not os.path.exists(CHECKPOINT_FILENAME):
    print(f"Please upload your saved model file: {CHECKPOINT_FILENAME}")
    uploaded = files.upload()

    if CHECKPOINT_FILENAME not in uploaded:
        print("Upload failed or incorrect file name. Please try again.")
    else:
        print(f"File '{CHECKPOINT_FILENAME}' uploaded successfully.")
else:
    print(f"Checkpoint file '{CHECKPOINT_FILENAME}' already exists locally.")

Please upload your saved model file: zne_rl_agent_checkpoint.pth


Saving zne_rl_agent_checkpoint.pth to zne_rl_agent_checkpoint.pth
File 'zne_rl_agent_checkpoint.pth' uploaded successfully.


In [None]:
import torch
import os

CHECKPOINT_PATH = '/content/zne_rl_agent_checkpoint.pth'

if os.path.exists(CHECKPOINT_PATH):
    # Load the entire checkpoint dictionary
    checkpoint = torch.load(CHECKPOINT_PATH)

    # Load weights into the Q-network and Target Network
    agent.q_network.load_state_dict(checkpoint['q_network_state_dict'])
    agent.target_network.load_state_dict(checkpoint['target_network_state_dict'])

    # load the optimizer state, useful if resuming training
    if 'optimizer_state_dict' in checkpoint:
        agent.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # Set epsilon to its last saved value, then immediately to 0 for testing
    agent.epsilon = checkpoint.get('epsilon', 0.05)

    # Ensure testing is done greedily
    agent.epsilon = 0.0

    print("Model weights loaded successfully.")
    print("Agent is set to greedy testing mode (epsilon=0.0).")

else:
    print(f" Error: Checkpoint file not found at {CHECKPOINT_PATH}. Please ensure you ran the upload block above.")

Model weights loaded successfully.
Agent is set to greedy testing mode (epsilon=0.0).


In [None]:
def test_agent_final(env, agent, episodes=4000, curriculum_stages=4):

    from collections import defaultdict
    import time
    import numpy as np

    agent.epsilon = 0.0 # Set agent to greedy mode for testing

    # --- Lists to Store All Errors and Actions ---
    agent_errors = []
    agent_actions = []
    baseline_standard = []   # [1, 2, 3] Richardson
    baseline_aggressive = [] # [1, 3, 5, 7] Richardson
    baseline_fine = []       # [1.0, 1.1, 1.2] Linear
    action_counts = defaultdict(int)

    # Calculate curriculum details
    episodes_per_stage = episodes // curriculum_stages
    complexity_schedule = [(2, 4), (3, 4), (4, 4), (4, 4)]

    print("\n" + "="*70)
    print(f"TESTING TRAINED AGENT ON {episodes} UNSEEN CIRCUITS")
    print("======================================================================")
    start_time = time.time()

    # Baselines Indices (Hardcoded from environment options)
    ACT_STD = 9 * 4 + 3    # [1, 2, 3] Richardson
    ACT_AGG = 17 * 4 + 3   # [1, 3, 5, 7] Richardson
    ACT_FINE = 0           # [1.0, 1.1, 1.2] Linear

    for stage in range(curriculum_stages):
        trotter, nq = complexity_schedule[stage]

        # --- ENFORCE CURRICULUM FOR TESTING ---
        env.num_qubits = nq
        env.trotter_steps = trotter

        print(f"Testing Stage {stage+1}/{curriculum_stages}: {episodes_per_stage} episodes at {trotter}T, {nq}q...")

        for ep in range(episodes_per_stage):
            # Generate new circuit based on current stage complexity (NEAT runs inside reset)
            state, _ = env.reset()

            # 1. Agent's Action (Greedy)
            agent_action = agent.select_action(state, training=False)
            agent_actions.append(agent_action)
            action_counts[agent_action] += 1
            _, _, _, _, info_ag = env.step(agent_action)
            agent_errors.append(info_ag['error'])

            # 2. Baseline 1: Standard Richardson
            _, _, _, _, info_std = env.step(ACT_STD)
            baseline_standard.append(info_std['error'])

            # 3. Baseline 2: Aggressive Richardson
            _, _, _, _, info_agg = env.step(ACT_AGG)
            baseline_aggressive.append(info_agg['error'])

            # 4. Baseline 3: Fine Linear
            _, _, _, _, info_fine = env.step(ACT_FINE)
            baseline_fine.append(info_fine['error'])

            if (ep + 1) % 500 == 0:
                print(f"  Progress: {ep+1} / {episodes_per_stage} tested.")


    test_time = (time.time() - start_time) / 60

    # --- Final Statistics Calculation ---
    agent_mean = np.mean(agent_errors)
    agent_std = np.std(agent_errors)
    agent_median = np.median(agent_errors)

    std_mean = np.mean(baseline_standard)
    agg_mean = np.mean(baseline_aggressive)
    fine_mean = np.mean(baseline_fine)

    vs_standard = (std_mean - agent_mean) / std_mean * 100
    vs_aggressive = (agg_mean - agent_mean) / agg_mean * 100
    vs_fine = (fine_mean - agent_mean) / fine_mean * 100

    # --- Print Verbose Output ---
    print("\n" + "="*70)
    print(f"FINAL TEST RESULTS (Total {episodes} Circuits)")
    print("="*70)
    print(f"Test Duration: {test_time:.1f} minutes")
    print(f"\nMean Error (± std | median):")
    print(f"  Trained Agent:           {agent_mean:.4f} ± {agent_std:.4f} | {agent_median:.4f}")
    print(f"  Baseline [1,2,3]+Rich:   {std_mean:.4f}")
    print(f"  Baseline [1,3,5,7]+Rich: {agg_mean:.4f}")
    print(f"  Baseline [1.0,1.1,1.2]+Lin: {fine_mean:.4f}")

    print(f"\nAgent Improvement:")
    print(f"  vs [1,2,3]+Rich:   {vs_standard:+.2f}%")
    print(f"  vs [1,3,5,7]+Rich: {vs_aggressive:+.2f}%")
    print(f"  vs [1.0,1.1,1.2]+Lin: {vs_fine:+.2f}%")

    # Top actions
    action_usage = np.zeros(agent.action_dim)
    for a in agent_actions:
        action_usage[a] += 1

    top_5 = np.argsort(action_usage)[-5:][::-1]

    print(f"\nTop 5 Actions Used by Agent:")
    for i, act_idx in enumerate(top_5, 1):
        template_idx = act_idx // 4
        extrap_idx = act_idx % 4
        # Note: 'env.noisefactor_options' and 'env.extrapolators' must be accessible in scope
        factors = env.noisefactor_options[template_idx]
        extrap = env.extrapolators[extrap_idx]
        count = int(action_usage[act_idx])
        pct = count / episodes * 100
        print(f"  {i}. Action {act_idx}: {factors} + {extrap} ({count} uses, {pct:.2f}%)")

    print("======================================================================")

    return {
        'agent_errors': agent_errors,
        'agent_actions': agent_actions,
        'baseline_standard': baseline_standard,
        'baseline_aggressive': baseline_aggressive,
        'baseline_fine': baseline_fine,
        'agent_mean': agent_mean,
        'std_mean': std_mean,
        'agg_mean': agg_mean,
        'fine_mean': fine_mean
    }

# Execute the testing function
test_results = test_agent_final(env, agent, episodes=4000, curriculum_stages=4)


TESTING TRAINED AGENT ON 4000 UNSEEN CIRCUITS
Testing Stage 1/4: 1000 episodes at 2T, 4q...
  Progress: 500 / 1000 tested.
  Progress: 1000 / 1000 tested.
Testing Stage 2/4: 1000 episodes at 3T, 4q...
  Progress: 500 / 1000 tested.
  Progress: 1000 / 1000 tested.
Testing Stage 3/4: 1000 episodes at 4T, 4q...
  Progress: 500 / 1000 tested.
  Progress: 1000 / 1000 tested.
Testing Stage 4/4: 1000 episodes at 4T, 4q...
  Progress: 500 / 1000 tested.
  Progress: 1000 / 1000 tested.

FINAL TEST RESULTS (Total 4000 Circuits)
Test Duration: 165.0 minutes

Mean Error (± std | median):
  Trained Agent:           0.0014 ± 0.0054 | 0.0000
  Baseline [1,2,3]+Rich:   0.0014
  Baseline [1,3,5,7]+Rich: 0.0014
  Baseline [1.0,1.1,1.2]+Lin: 0.0015

Agent Improvement:
  vs [1,2,3]+Rich:   +0.88%
  vs [1,3,5,7]+Rich: +2.20%
  vs [1.0,1.1,1.2]+Lin: +5.52%

Top 5 Actions Used by Agent:
  1. Action 49: [1.0, 3.0, 3.5] + polynomial (4000 uses, 100.00%)
  2. Action 119: [1.0, 3.5, 6.0] + richardson (0 uses, 0