# Context aware pulse gate calibration with Reinforcement Learning

This notebook takes advantage of pulse level simulation through Qiskit Dynamics module to perform context aware pulse gate calibration. The main difference with the other notebook that works for both ```DynamicsBackend``` and real backends is that we take advantage of JAX to increase significantly the simulation speed, which turns out to be a limiting factor when dealing with the way ```DynamicsBackend``` is originally designed.

In [2]:
# Qiskit imports
from qiskit import pulse, transpile
from qiskit.circuit import (
    ParameterVector,
    QuantumCircuit,
    QuantumRegister,
    Gate,
)
from qiskit.providers import Backend
from rl_qoc import QuantumEnvironment, ContextAwareQuantumEnvironment, QEnvConfig, GateTarget
from rl_qoc.config import (
    BackendConfig,
    ActionSpace,
    ExecutionConfig,
    BenchmarkConfig,
)
from torch.distributions import Normal

import numpy as np
import tqdm
from typing import Optional
from IPython.display import clear_output
import jax

jax.config.update("jax_enable_x64", True)
# tell JAX we are using CPU
jax.config.update("jax_platform_name", "cpu")
# import Array and set default backend
from qiskit_dynamics.array import Array

Array.set_default_backend("jax")

In [3]:
from qiskit.providers import BackendV1, BackendV2
from qiskit_experiments.calibration_management import Calibrations
from qiskit_experiments.calibration_management.basis_gate_library import (
    FixedFrequencyTransmon,
    EchoedCrossResonance,
)
from rl_qoc.helpers import get_ecr_params


def custom_schedule(
    backend: BackendV1 | BackendV2,
    physical_qubits: list,
    params: ParameterVector,
    keep_symmetry: bool = True,
):
    """
    Define parametrization of the pulse schedule characterizing the target gate.
    This function can be customized at will, however one shall recall to make sure that number of actions match the
    number of pulse parameters used within the function (throught the params argument).
        :param backend: IBM Backend on which schedule shall be added
        :param physical_qubits: Physical qubits on which custom gate is applied on
        :param params: Parameters of the Schedule/Custom gate
        :param keep_symmetry: Choose if the two parts of the ECR tone shall be jointly parametrized or not

        :return: Parametrized Schedule
    """
    # Load here all pulse parameters names that should be tuned during model-free calibration.
    # Here we focus on real time tunable pulse parameters (amp, angle, duration)
    pulse_features = ["amp", "angle", "tgt_amp", "tgt_angle"]

    # Uncomment line below to include pulse duration as tunable parameter
    # pulse_features.append("duration")
    duration_window = 0

    global action_space

    new_params, _, _ = get_ecr_params(backend, physical_qubits)

    qubits = tuple(physical_qubits)

    if keep_symmetry:  # Maintain symmetry between the two GaussianSquare pulses
        for sched in ["cr45p", "cr45m"]:
            for i, feature in enumerate(pulse_features):
                if feature != "duration":
                    new_params[(feature, qubits, sched)] += params[i]
                else:
                    new_params[(feature, qubits, sched)] += pulse.builder.seconds_to_samples(
                        duration_window * params[i]
                    )
    else:
        num_features = len(pulse_features)
        for i, sched in enumerate(["cr45p", "cr45m"]):
            for j, feature in enumerate(pulse_features):
                if feature != "duration":
                    new_params[(feature, qubits, sched)] += params[i * num_features + j]
                else:
                    new_params[(feature, qubits, sched)] += pulse.builder.seconds_to_samples(
                        duration_window * params[i * num_features + j]
                    )

    cals = Calibrations.from_backend(
        backend,
        [
            FixedFrequencyTransmon(["x", "sx"]),
            EchoedCrossResonance(["cr45p", "cr45m", "ecr"]),
        ],
        add_parameter_defaults=True,
    )

    # Retrieve schedule (for now, works only with ECRGate(), as no library yet available for CX)
    parametrized_schedule = cals.get_schedule("ecr", qubits, assign_params=new_params)
    return parametrized_schedule

In [4]:
# Pulse gate ansatz


def apply_parametrized_circuit(
    qc: QuantumCircuit,
    params: Optional[ParameterVector] = None,
    tgt_register: Optional[QuantumRegister] = None,
):
    """
    Define ansatz circuit to be played on Quantum Computer. Should be parametrized with Qiskit ParameterVector
    This function is used to run the QuantumCircuit instance on a Runtime backend
    :param qc: Quantum Circuit instance to add the gate on
    :param params: Parameters of the custom Gate
    :param tgt_register: Quantum Register formed of target qubits
    :return:
    """
    # qc.num_qubits
    global n_actions, backend, target

    gate, physical_qubits = target.gate, target.physical_qubits
    # x_pulse = backend.defaults().instruction_schedule_map.get('x', (qubit_tgt_register,)).instructions[0][1].pulse
    if params is None:
        params = ParameterVector("theta", action_space.n_actions)
    if tgt_register is None:
        tgt_register = qc.qregs[0]

    # Choose below which target gate you'd like to calibrate
    parametrized_gate = Gate("custom_ecr", 2, params=params.params)
    # parametrized_gate = gate.copy()
    # parametrized_gate.params = params.params
    parametrized_schedule = custom_schedule(
        backend=backend, physical_qubits=physical_qubits, params=params
    )
    qc.add_calibration(parametrized_gate, physical_qubits, parametrized_schedule)
    qc.append(parametrized_gate, tgt_register)

In [5]:
physical_qubits = [0, 1]
sampling_Paulis = 50
N_shots = 200
n_actions = 4  # Cf number of parameters in custom_schedule function above
action_space = ActionSpace(n_actions=n_actions)
abstraction_level = "pulse"

In [6]:
# Example of target gate
from qiskit.circuit.library.standard_gates import ECRGate

target = GateTarget(gate=ECRGate(), physical_qubits=physical_qubits)

In [7]:
# target = {"gate": XGate(), "register": [0]}

## Setting up Quantum Backend

### Simulation backend initialization: Qiskit Dynamics
If you want to run the algorithm over a simulation, one can use Qiskit Dynamics for pulse level simulation of quantum circuits. Below we set the ground for declaring a ```DynamicsBackend```.

This can be done in two ways: 

1. Declare a ```DynamicsBackend``` from a ```FakeBackend``` or ```IBMBackend``` instance and use the ```from_backend()``` method to retrieve the Hamiltonian description of such backend.
2. Alternatively, you can define your own custom Hamiltonian/Linblad that should be used to simulate the multiqubit system of interest, and feed it to a ```Solver``` instance which can be used to declare the ```DynamicsBackend```.
For more information you can check Qiskit Dynamics documentation (https://qiskit.org/documentation/dynamics/apidocs/backend.html)


In [8]:
from qiskit_ibm_runtime.fake_provider import FakeJakarta, FakeJakartaV2
from rl_qoc.helpers import get_control_channel_map
from qiskit_dynamics import DynamicsBackend

fake_backend = FakeJakarta()
fake_backend_v2 = FakeJakartaV2()
control_channel_map = get_control_channel_map(fake_backend, physical_qubits)
dt = fake_backend_v2.target.dt
print("Coupling Map: ", list(fake_backend_v2.coupling_map.get_edges()))

In [9]:
solver_options = {"method": "jax_odeint", "atol": 1e-3, "rtol": 1e-5, "hmax": dt}
dynamics_options = {
    "seed_simulator": None,  # "configuration": fake_backend.configuration(),
    "control_channel_map": control_channel_map,
    "solver_options": solver_options,
}
qubit_properties = fake_backend_v2.qubit_properties(physical_qubits)

In [10]:
from qiskit.quantum_info import Operator
from qiskit_dynamics import Solver

dt, v0, r0 = 1 / 4.5e9, 4.86e9, 0.22e9

X = Operator.from_label("X")
Y = Operator.from_label("Y")
Z = Operator.from_label("Z")
s_p = 0.5 * (X + 1j * Y)

solver_1q = Solver(
    static_hamiltonian=0.5 * 2 * np.pi * v0 * Z,
    hamiltonian_operators=[2 * np.pi * r0 * X],
    rotating_frame=0.5 * 2 * np.pi * v0 * Z,
    hamiltonian_channels=["d0"],
    channel_carrier_freqs={"d0": v0},
    dt=dt,
)

In [11]:
dim = 3

v0, v1 = 4.86e9, 4.97e9
anharm0, anharm1 = -0.32e9, -0.32e9
r0, r1 = 0.22e9, 0.26e9

J = 0.002e9

a = np.diag(np.sqrt(np.arange(1, dim)), 1)
adag = np.diag(np.sqrt(np.arange(1, dim)), -1)
N = np.diag(np.arange(dim))

ident = np.eye(dim, dtype=complex)
full_ident = np.eye(dim**2, dtype=complex)

N0, N1 = np.kron(ident, N), np.kron(N, ident)
a0, a1 = np.kron(ident, a), np.kron(a, ident)
a0dag, a1dag = np.kron(ident, adag), np.kron(adag, ident)

static_ham0 = 2 * np.pi * v0 * N0 + np.pi * anharm0 * N0 * (N0 - full_ident)
static_ham1 = 2 * np.pi * v1 * N1 + np.pi * anharm1 * N1 * (N1 - full_ident)

static_ham_full = static_ham0 + static_ham1 + 2 * np.pi * J * ((a0 + a0dag) @ (a1 + a1dag))

drive_op0 = 2 * np.pi * r0 * (a0 + a0dag)
drive_op1 = 2 * np.pi * r1 * (a1 + a1dag)

# build solver
dt = 1 / 4.5e9
solver_2q = Solver(
    static_hamiltonian=static_ham_full,
    hamiltonian_operators=[
        drive_op0,
        drive_op1,
        drive_op0,
        drive_op1,
        drive_op1,
        drive_op0,
    ],
    rotating_frame=static_ham_full,
    hamiltonian_channels=["d0", "d1", "u0", "u1", "u2", "u3"],
    channel_carrier_freqs={"d0": v0, "d1": v1, "u0": v1, "u1": v0, "u2": v0, "u3": v1},
    dt=dt,
    evaluation_mode="dense",
)
# Consistent solver option to use throughout notebook

In [12]:
solver_options = {"method": "jax_odeint", "atol": 1e-3, "rtol": 1e-5, "hmax": dt}
solver = solver_2q
custom_backend = DynamicsBackend(
    solver=solver,
    # target = fake_backend_v2.target,
    subsystem_dims=[dim, dim],  # for computing measurement data
    solver_options=solver_options,  # to be used every time run is called
)
backend_target = custom_backend.target
channel_freq = None
calibration_files = None
do_calibrations = True
# qubit properties

In [13]:
print(custom_backend.target)

## Select backend

In [14]:
# Choose backend among the set defined above
backend = custom_backend

print("Selected Backend: ", backend)
if isinstance(backend, DynamicsBackend):
    print("Subsystem dims: ", backend.options.subsystem_dims)
print("Backend options", backend.options)

In [15]:
backend.options.subsystem_dims

In [16]:
# Wrap all info in one QiskitConfig
backend_config = BackendConfig(parametrized_circuit=apply_parametrized_circuit, backend=backend, use_torch=True)
backend_config

## Declare QuantumEnvironment object
Running the box below declares the QuantumEnvironment instance.

If selected backend is a ```DynamicsBackend```, this declaration launches a series of single qubit gate calibrations (to calibrate X and SX gate). The reason for this is that the Estimator primitive, which enables the computation of Pauli expectation values, requires calibrated single qubit gates for doing Pauli basis rotations (SX and RZ, to perform Hadamard and S gates).

In [17]:

%%time
execution_config = ExecutionConfig(n_shots=N_shots, sampling_paulis=sampling_Paulis)
q_env_config = QEnvConfig(target=target, backend_config=backend_config,
                          action_space=action_space, execution_config=execution_config)
q_env = QuantumEnvironment(q_env_config)

In [18]:
print(q_env)

In [19]:
q_env.backend

In [20]:
q_env.estimator

In [21]:
# plot_gate_map(backend)

In [22]:
from qiskit.quantum_info import Statevector

y0 = Array(Statevector(np.array([1.0, 0.0])))
dt = 2.222e-10
rtol = 1e-5
atol = 1e-3

h_sched = backend.target.get_calibration("h", (0,))
x_sched = backend.target.get_calibration("x", (0,))
meas_sched = backend.target.get_calibration("measure", (0,))
# build a pulse schedule
with pulse.build(default_alignment="sequential") as schedule:
    pulse.call(x_sched)
    # pulse.call(h_sched)
    # pulse.shift_phase(np.pi, pulse.DriveChannel(0))
    # pulse.call(h_sched)

t_linspace = np.linspace(0.0, schedule.duration, 21)


def get_sched(param_values, observables, y0, options):
    x_sched = backend.target.get_calibration("x", (0,))
    h_sched = backend.target.get_calibration("h", (0,))
    meas_sched = backend.target.get_calibration("measure", (0,))
    # build a pulse schedule
    with pulse.build(default_alignment="sequential") as schedule:
        pulse.call(x_sched)
        # pulse.shift_phase(phase, pulse.DriveChannel(0))
        # pulse.call(h_sched)
        # pulse.call(meas_sched)
    results = solver.solve(
        t_span=[t_linspace[0], t_linspace[-1]],
        y0=y0,
        signals=schedule,
        t_eval=t_linspace,
        method="jax_odeint",
        atol=atol,
        rtol=rtol,
    )

    return results.y

In [23]:
from jax import vmap
from qiskit_dynamics.array import wrap

jit = wrap(jax.jit, decorator=True)
qd_vmap = wrap(vmap, decorator=True)

sched_fast = jit(get_sched)

In [24]:
# x_sched = backend.target.get_calibration("x", (0,))
# h_sched = backend.target.get_calibration("h", (0,))
# meas_sched = backend.target.get_calibration("measure", (0,))
# # build a pulse schedule
# with pulse.build(default_alignment="sequential") as schedule:
#     pulse.call(x_sched)
#     #pulse.shift_phase(phase, pulse.DriveChannel(0))
#     #pulse.call(h_sched)
#     #pulse.call(meas_sched)
# results = backend.options.solver.solve(t_span=[t_linspace[0], t_linspace[-1]],
#                                        y0=Statevector.from_label("0"),
#                                        signals=schedule,
#                                        t_eval=t_linspace,
#                                        method='jax_odeint',
#                                        atol=atol,
#                                        rtol=rtol
#                                        )

# Definition of Circuit context

Now that we have established our ```QuantumEnvironment```, we will now focus on the main research point of this paper, which is to calibrate the target gate based on its location within a specific circuit context. As we will use PyTorch to build the interface between our agent and our environment, we will wrap up our original environment within a ```TorchQuantumEnvironment``` object, which will build a suitable environment for dynamical and contextual gate calibration. But first, we define the quantum circuit in which our target operation will appear.

In [25]:
target_circuit = QuantumCircuit(2)
target_circuit.h(0)
target_circuit.cx(0, 1)
target_circuit.x([0, 1])
target_circuit.cx(0, 1)
target_circuit.draw("mpl")

To be able to see where our ECR gate shall appear in the circuit, we have to transpile this logical circuit to the backend. To ease the visualization, we add small functions to see the circuit only on relevant physical qubits.

In [26]:
from rl_qoc.helpers.circuit_utils import remove_unused_wires

transpiled_circ = transpile(
    target_circuit,
    backend,
    initial_layout=physical_qubits,
    basis_gates=["sx", "rz", "ecr", "x"],
    optimization_level=1,
)
remove_unused_wires(transpiled_circ).draw("mpl")

In [27]:
from qiskit import schedule

print(backend.dt)
schedule(transpiled_circ, backend).draw()

In [28]:
def schedule_from_qc(qc: QuantumCircuit, backend: Backend):
    def my_sched():
        sched = schedule(qc, backend)

        return sched

    return my_sched

# Definition of TorchQuantumEnvironment

To define the ```TorchQuantumEnvironment``` wrapper, we follow the Gym like definition, where we provide the structure of the observation and action spaces.

The class takes the following inputs:
- ```q_env: QuantumEnvironment```: the baseline object where the information about the backend and the target gate is
- ```circuit_context: QuantumCircuit```: The circuit in which the previously defined target operation is applied. Note that the class will automatically look for all instances of the gate within the circuit and build dedicated subcircuits (truncations) enabling the successive calibration of each gate instance. To be noted here: the gate calibration focuses only on the target qubits defined in the ```QuantumEnvironment```. In Qiskit, we typically look for the ```CircuitInstruction``` object composed of a ```Gate``` object and a set of target qubits on which the gate is applied (both defined in target).
- ```action_space```/```observation_space```: Spaces defining the range and shapes of possible actions/observations.
For now, the observation space is fixed to a set of two integers:
    - the first one indicates which random input state was selected at the beginning of the episode, so that the network have an extra information on the randomness source coming from the reward.
    - the second one indicates which instance of the gate it will calibrate. In the real-time use case, we would like the agent to generate on the fly random actions that will be applied directly for the next gate within the circuit execution. With Qiskit Runtime however, we are not able to generate those actions on the fly and will therefore load all actions associated to each instance of the gate prior to execution.


Moreover, we apply for now a sequential training loop, meaning that we will force the agent to focus on the calibration of the first gate (truncating the circuit just behind its execution) before starting to calibrate the second one (and so forth). The number of iterations per gate instance must be provided with the number ```training_steps_per_gate```.

Since we want to run a contextual gate calibration, we need to know exactly how the circuit will be transpiled on the backend. There is therefore an internal transpilation (without any optimization) that enables the retrieval of all timings of the logical gates indicated above. We also account for the local context happening on nearest neighbor qubits on the chip.

Moreover, as we run this sequential gate calibration for each instance of the target gate within the circuit, one can check the different circuit truncations the agent will go over.

Important note: The target type of the calibration must be a quantum Gate instance (it will not work if target is a quantum state).

In [29]:
%%time
# Circuit context
from gymnasium.spaces import Box

seed = 10
benchmark_cycle = 100
# tgt_instruction_counts = target_circuit.data.count(CircuitInstruction(target_gate, tgt_qubits))
tgt_instruction_counts = 2  # Number of times target Instruction is applied in Circuit
batchsize = 200  # Batch size (iterate over a bunch of actions per policy to estimate expected return) default 100
n_actions = 4  # Choose how many control parameters in pulse/circuit parametrization
min_bound_actions = -0.1
max_bound_actions = 0.1
scale_factor = 0.1

env = ContextAwareQuantumEnvironment(
    q_env_config=q_env_config,
    circuit_context=transpiled_circ,
    batch_size=batchsize,
    benchmark_cycle=benchmark_cycle,
    intermediate_rewards=False,
)

In [30]:
env.circuit_truncations[0].draw("mpl")

In [31]:
env.circuit_truncations[1].draw("mpl")

In [32]:
env.baseline_truncations[1].draw("mpl")

In [33]:
env.estimator.options

In [34]:
schedule(env.circuit_truncations[0], backend)

# Definition of the Agent

In [35]:
import torch
import torch.nn as nn

from rl_qoc.agent import ActorNetwork, CriticNetwork, PPOAgent

from rl_qoc.helpers import load_from_yaml_file

file_name = "../agent_config.yaml"

agent_config = load_from_yaml_file(file_name)

## Hyperparameters for training

In [36]:
from rl_qoc.agent import TrainFunctionSettings, TotalUpdates, TrainingConfig

total_updates = TotalUpdates(700)
training_config = TrainingConfig(
    training_constraint=total_updates,
    target_fidelities=[0.999, 0.9999],
    lookback_window=10,
    anneal_learning_rate=True,
)

train_function_settings = TrainFunctionSettings(
    plot_real_time=True,
    print_debug=True,
    num_prints=5,
    clear_history=True,
)

# Training
## Storage setup

In [37]:
ppo_agent = PPOAgent(agent_config, env)

## Main loop

In [38]:
ppo_agent.train(training_config, train_function_settings)

In [None]:
import matplotlib.pyplot as plt
plt.plot(ppo_agent.avg_return_history)

In [None]:
plt.plot(ppo_agent.benchmark_results["circuit_fidelity"])