In [6]:
import sys

# Extra module search directory, presumably where the local `analytics`
# package imported further down lives (verify on your machine).
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
ANALYTICS_ROOT = r"C:\Users\shrey"

# Guarded so that re-running this cell does not stack duplicate sys.path entries.
if ANALYTICS_ROOT not in sys.path:
    sys.path.append(ANALYTICS_ROOT)

In [7]:
import numpy as np
import matplotlib.pyplot as plt
import json
import pandas as pd

from qiskit import QuantumCircuit, QuantumRegister, ClassicalRegister
from qiskit.converters import circuit_to_dag
from qiskit.transpiler import Target, Layout
from qiskit.circuit.library import QFT
from qiskit.circuit import Delay
from qiskit.circuit.library import XGate, YGate, ZGate
from qiskit.circuit import Gate
from qiskit.circuit.random import random_circuit
from qiskit.quantum_info.operators import SparsePauliOp
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager

from analytics.qubit_idling import analyze_qubit_idling, analyze_qubit_activity
from analytics.backend_characterization import extract_backend_metrics

In [8]:
from qiskit_ibm_runtime import QiskitRuntimeService

# Name of the saved runtime account and of the device requested from it.
ACCOUNT_NAME = "Reserach_IITJ"
BACKEND_NAME = "ibm_torino"

service = QiskitRuntimeService(name=ACCOUNT_NAME)
backend = service.backend(BACKEND_NAME)



In [9]:
from qiskit.transpiler import PassManager
from qiskit.transpiler.passes.scheduling import ALAPScheduleAnalysis
from qiskit.transpiler.passes import PadDynamicalDecoupling
from qiskit.transpiler import InstructionDurations
from qiskit.circuit.library import XGate

# Gate sequence handed to the padding pass: two consecutive X gates.
DD_SEQUENCE = [XGate(), XGate()]

# Two-stage pass manager: an ALAP scheduling analysis followed by the
# dynamical-decoupling padding pass, both fed the backend's instruction durations.
_alap_analysis = ALAPScheduleAnalysis(backend.instruction_durations)
_dd_padding = PadDynamicalDecoupling(
    durations=backend.instruction_durations,
    dd_sequence=DD_SEQUENCE,
)
DD_PM = PassManager([_alap_analysis, _dd_padding])


def dynamical_decoupling_preprocess(
    input_circuit: QuantumCircuit, dd_pass_manager=DD_PM
) -> QuantumCircuit:
    """Run a dynamical-decoupling pass manager over a circuit.

    Args:
        input_circuit (QuantumCircuit): circuit handed to ``dd_pass_manager``.
        dd_pass_manager: pass manager to run; defaults to the ``DD_PM`` object
            that exists when this function is defined.

    Returns:
        QuantumCircuit: the result of ``dd_pass_manager.run(input_circuit)``.
    """
    padded_circuit = dd_pass_manager.run(input_circuit)
    return padded_circuit

In [10]:
from qiskit_aer import AerSimulator
# Aer simulator built with default arguments; idle_estimator is constructed on it.
simulator = AerSimulator()
# Aer simulator configured from the hardware backend via AerSimulator.from_backend.
backend_sim = AerSimulator.from_backend(backend)

In [11]:
from qiskit_ibm_runtime import EstimatorV2, EstimatorOptions

# Options for the hardware estimator WITHOUT runtime dynamical decoupling:
# resilience level 0, DD disabled, gate and measurement twirling enabled.
estimator_op = EstimatorOptions()
estimator_op.resilience_level = 0
estimator_op.dynamical_decoupling.enable = False
estimator_op.twirling.enable_gates = True
estimator_op.twirling.enable_measure = True

# Estimator bound to the hardware backend with the options above.
estimator_no_dd = EstimatorV2(mode = backend, options=estimator_op)

In [12]:
# Options for the hardware estimator WITH runtime dynamical decoupling:
# same as the no-DD configuration except dynamical_decoupling.enable is True.
# Note: this rebinds the name `estimator_op` to a fresh options object.
estimator_op = EstimatorOptions()
estimator_op.resilience_level = 0
estimator_op.dynamical_decoupling.enable = True
estimator_op.twirling.enable_gates = True
estimator_op.twirling.enable_measure = True

# Estimator bound to the hardware backend with the options above.
estimator_dd = EstimatorV2(mode = backend, options=estimator_op)

In [13]:
# Estimator running on the default AerSimulator instance (`simulator`), with no options set.
# NOTE(review): presumably the noiseless reference ("ideal") estimator — confirm the intended name.
idle_estimator = EstimatorV2(mode = simulator)

In [14]:
# Column schema of the accumulated results table (one row per circuit and qubit).
_RESULT_COLUMNS = (
    "circuit_id",
    "qubit",
    "marginal_exp_error_no_dd",
    "marginal_exp_error_dd",
    "marginal_improvment",
    "expectation_error_no_dd",
    "expectation_error_dd",
    "overall_improvment",
)

# Empty table that later cells append per-circuit results to.
all_circuits_df = pd.DataFrame(columns=list(_RESULT_COLUMNS))

# --- Function to store metrics for one circuit ---
def store_circuit_results(qubit_act, qubit_idle, marginal_exp_error_no_dd, marginal_exp_error_dd, expectation_error_no_dd, expectation_error_dd, circuit_id):
    """Build a per-qubit results table for a single circuit.

    Args:
        qubit_act: mapping whose ``"sparsity_by_layer"`` entry is itself a
            mapping; its keys are the qubits that get a row. A missing entry
            produces an empty table.
        qubit_idle: accepted for call compatibility; not read by this function.
        marginal_exp_error_no_dd: mapping qubit -> error value without DD.
        marginal_exp_error_dd: mapping qubit -> error value with DD.
        expectation_error_no_dd: circuit-level error without DD, repeated on every row.
        expectation_error_dd: circuit-level error with DD, repeated on every row.
        circuit_id: label written to the ``circuit_id`` column of every row.

    Returns:
        pandas.DataFrame: one row per qubit, always carrying the full column
        schema (also when there are no rows).
    """
    columns = [
        "circuit_id", "qubit",
        "marginal_exp_error_no_dd", "marginal_exp_error_dd", "marginal_improvment",
        "expectation_error_no_dd", "expectation_error_dd", "overall_improvment",
    ]

    rows = []
    for qubit in qubit_act.get("sparsity_by_layer", {}):
        error_no_dd = marginal_exp_error_no_dd.get(qubit)
        error_dd = marginal_exp_error_dd.get(qubit)

        # A qubit missing from either mapping has no measurable improvement.
        # Substituting 0 for the missing value would fabricate a number that
        # contradicts the None stored in the neighbouring error columns.
        if error_no_dd is None or error_dd is None:
            marginal_improvement = None
        else:
            marginal_improvement = error_no_dd - error_dd

        rows.append({
            "circuit_id": circuit_id,
            "qubit": qubit,
            "marginal_exp_error_no_dd": error_no_dd,
            "marginal_exp_error_dd": error_dd,
            "marginal_improvment": marginal_improvement,
            "expectation_error_no_dd": expectation_error_no_dd,
            "expectation_error_dd": expectation_error_dd,
            "overall_improvment": expectation_error_no_dd - expectation_error_dd,
        })

    # Passing the schema explicitly keeps the columns present for an empty result.
    return pd.DataFrame(rows, columns=columns)

In [15]:
def ghz_circuit(num_qubits, measure=True):
    """
    Build a GHZ-state preparation circuit on ``num_qubits`` qubits.

    The circuit is H on qubit 0, a CX chain (i -> i + 1), then a barrier.
    With ``measure=True`` the circuit also gets ``num_qubits`` classical bits
    and every qubit is measured into the classical bit of the same index;
    otherwise it has no classical bits.
    """
    clbit_count = num_qubits if measure else 0
    circuit = QuantumCircuit(num_qubits, clbit_count)

    # Superposition on the first qubit, then entangle neighbours along the chain.
    circuit.h(0)
    for control, target in zip(range(num_qubits - 1), range(1, num_qubits)):
        circuit.cx(control, target)

    circuit.barrier()

    if measure:
        all_indices = range(num_qubits)
        circuit.measure(all_indices, all_indices)

    return circuit

def echo_circuit(n):
    """
    Build an n-qubit echo circuit.

    Sequence: GHZ preparation, barrier, inverse of the preparation,
    X on every qubit, then ``measure_all``.
    """
    # GHZ preparation: H on qubit 0 followed by a nearest-neighbour CX chain.
    preparation = QuantumCircuit(n)
    preparation.h(0)
    for target in range(1, n):
        preparation.cx(target - 1, target)

    # Forward preparation, barrier, then its inverse.
    circuit = QuantumCircuit(n)
    circuit.compose(preparation, inplace=True)
    circuit.barrier()
    undo_preparation = preparation.inverse()
    circuit.compose(undo_preparation, inplace=True)

    # Flip every qubit before measuring all of them.
    circuit.x(range(n))
    circuit.measure_all()

    return circuit

In [16]:
def compute_expectation(num_qubits, quantum_circuit, custom_layout, name):
    """Transpile a circuit and compare Z expectation values with and without DD.

    The circuit is transpiled for the module-level ``backend`` with the given
    initial layout, and a DD-padded copy is produced with
    ``dynamical_decoupling_preprocess``. Three estimators are then run:
    ``idle_estimator`` (reference values), ``estimator_no_dd`` on the plain
    transpiled circuit, and ``estimator_dd`` on the DD-padded circuit. Errors
    are absolute differences from the reference values.

    Args:
        num_qubits: number of qubits in ``quantum_circuit``; sets the observable width.
        quantum_circuit: circuit to transpile and evaluate.
        custom_layout: mapping passed to ``Layout(...)`` as the initial layout.
        name: circuit identifier stored in the ``circuit_id`` column.

    Returns:
        pandas.DataFrame: the per-qubit table built by ``store_circuit_results``.
    """
    pm = generate_preset_pass_manager(
        optimization_level=3,
        seed_transpiler=42,
        backend=backend,
        initial_layout=Layout(custom_layout),
        layout_method="sabre",
        scheduling_method='alap'
    )

    transpiled_circ = pm.run(quantum_circuit)
    # NOTE(review): estimator_dd also enables runtime dynamical decoupling, so
    # this circuit may be decoupled twice — confirm that is intended.
    transpiled_circ_dd = dynamical_decoupling_preprocess(transpiled_circ)

    # The backend metrics are not used below; the call is retained in case it
    # has side effects — TODO confirm and drop if it is pure.
    backend_prop = extract_backend_metrics(transpiled_circ, backend)
    qubit_act = analyze_qubit_activity(transpiled_circ, backend.target)
    qubit_idle = analyze_qubit_idling(transpiled_circ, backend)

    # Observable 0 is Z on every qubit; observable i + 1 is Z on virtual qubit i.
    # Pauli label strings are little-endian (rightmost character = qubit 0), so
    # the single-qubit terms are built from explicit qubit indices. Building
    # them as 'I'*i + 'Z' + ... would place Z on qubit num_qubits - 1 - i and
    # attribute each marginal to the mirrored qubit.
    observables = [SparsePauliOp('Z' * num_qubits, coeffs=[1.0])]
    observables.extend(
        SparsePauliOp.from_sparse_list([("Z", [i], 1.0)], num_qubits=num_qubits)
        for i in range(num_qubits)
    )
    transpiled_observables = [ob.apply_layout(transpiled_circ.layout) for ob in observables]

    # --- Run the reference, no-DD and DD estimators ---
    idle_exp = idle_estimator.run([(transpiled_circ, transpiled_observables)]).result()[0].data.evs
    exp_no_dd = estimator_no_dd.run([(transpiled_circ, transpiled_observables)]).result()[0].data.evs
    exp_dd = estimator_dd.run([(transpiled_circ_dd, transpiled_observables)]).result()[0].data.evs

    # layout[i] is the index that virtual qubit i ends up on after transpilation.
    layout = transpiled_circ.layout.final_index_layout()
    marginal_exp_error_no_dd = {}
    marginal_exp_error_dd = {}

    for i in range(num_qubits):
        # Index i + 1 skips the all-Z observable stored at position 0.
        marginal_exp_error_no_dd[layout[i]] = np.abs(exp_no_dd[i + 1] - idle_exp[i + 1])
        marginal_exp_error_dd[layout[i]] = np.abs(exp_dd[i + 1] - idle_exp[i + 1])

    df_first_circuit = store_circuit_results(
        qubit_act,
        qubit_idle,
        marginal_exp_error_no_dd,
        marginal_exp_error_dd,
        np.abs(exp_no_dd[0] - idle_exp[0]),
        np.abs(exp_dd[0] - idle_exp[0]),
        name,
    )
    return df_first_circuit

In [17]:
# Physical-qubit placements to evaluate, one GHZ circuit per entry.
coupling_qubits_list = [[0,1,2,3], [0,2,4,6], [0,3,6,9], [0,4,8,12]]

# Collect the per-circuit tables and concatenate once at the end instead of
# growing the frame inside the loop.
circuit_frames = []
for coupling_qubits in coupling_qubits_list:
    # Circuit size follows the layout rather than a hard-coded 4.
    num_layout_qubits = len(coupling_qubits)
    rand_circ = ghz_circuit(num_layout_qubits)
    qr = rand_circ.qregs[0]
    custom_layout = {qr[i]: coupling_qubits[i] for i in range(num_layout_qubits)}
    df_first_circuit = compute_expectation(num_layout_qubits, rand_circ, custom_layout, f"GHZ_{coupling_qubits}")
    circuit_frames.append(df_first_circuit)

# Rows already in all_circuits_df are kept. The empty template is skipped so
# pandas does not warn about concatenating an empty frame. The last circuit is
# appended exactly once.
existing_frames = [] if all_circuits_df.empty else [all_circuits_df]
all_circuits_df = pd.concat(existing_frames + circuit_frames, ignore_index=True)


In [18]:
# Write the accumulated results table to CSV (index column included, as no index argument is passed).
# NOTE(review): absolute, machine-specific output path.
all_circuits_df.to_csv("E:\\Desktop\\Torino_estimator_cross_talk_051225.csv")