# Hardware considerations and limitations for classical feedforward and control flow

[Classical feedforward and control flow](/build/classical-feedforward-and-control-flow) shows how to use Qiskit to build circuits that involve classical feedforward and control flow, also known as dynamic circuits. When actually running such circuits on quantum hardware, there are several considerations to be aware of.

## Does not work with primitives

Currently, circuits with classical control flow cannot be executed with the Qiskit Runtime primitives. The only way to run them on hardware is to use the `backend.run` method, where `backend` is an `IBMBackend` object.

## Must pass `dynamic=True` to `backend.run`

When running a circuit with classical control flow using `backend.run`, you must specify the `dynamic=True` flag. For example:

```python
job = backend.run(circuit, dynamic=True)
```

## Memory limits and latency in control hardware

![Diagram showing control hardware architecture](/images/run/rta-architecture.png)

Running circuits on quantum processors involves not only the qubits themselves, but also a system of classical electronics and computers to generate and receive waveforms and orchestrate the control logic. When a job is submitted to the IBM Quantum service, it is processed into multiple classical programs that must be distributed between two kinds of units: central controllers and qubit controllers (see diagram above). A job may fail if it exceeds certain limitations of these controllers. There are two kinds of limitations to be aware of:

- **Limited working memory**. This primarily affects the central controllers, and jobs will fail if they cause this memory limit to be exceeded.
- **Latency caused by classical computation**. Running circuits that use classical feedforward and control flow involves performing classical computation during the course of the circuit execution. Due to the limited coherence time of qubits, there is a limited time budget for performing these computations. A job may fail at compile time if the compilation detects that the classical computation overhead is too large.

The memory requirements and classical latencies of a job are affected by the following factors:

- **Number of circuits**. When multiple circuits are submitted in a single job, they are concatenated into a single large circuit, with qubit initialization operations between them. Qubit initialization is implemented as a conditional reset on all qubits used in the large circuit.
  - Central controller: Memory usage scales proportionally with the number of circuits.
- **Amount of control flow**.
  - Central controller: Memory usage scales proportionally with the number of control flow decisions.
  - Qubit controller: A control flow construct with too many or too large logic branches may not be realizable.
- **Resets**.
  - Central controller: Memory usage scales proportionally with the number of resets.
- **Measurements**.
  - Central controller: Memory usage scales proportionally with the number of measurements used by the central controller for control flow.
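The factors listed above can be tallied before submitting a job. The following is a minimal sketch, not an official cost model: it assumes each circuit is summarized as a dictionary of operation names to counts (for example, the kind of mapping returned by Qiskit's `QuantumCircuit.count_ops()`), and the function name `tally_job` and the grouping of names are illustrative assumptions. It counts every measurement, so it gives an upper bound on the measurements that actually feed control flow.

```python
from collections import Counter

# Operation names as Qiskit reports them for control-flow instructions.
# Treating them as one "control_flow" bucket is an assumption of this sketch.
CONTROL_FLOW_OPS = {"if_else", "switch_case", "while_loop", "for_loop"}


def tally_job(op_counts_per_circuit):
    """Sum the operations that drive central-controller memory usage across a job."""
    totals = Counter()
    for op_counts in op_counts_per_circuit:
        totals["circuits"] += 1
        for name, count in op_counts.items():
            if name in CONTROL_FLOW_OPS:
                totals["control_flow"] += count
            elif name in ("reset", "measure"):
                totals[name] += count
    return dict(totals)


# Hypothetical per-circuit operation counts, for illustration only.
job = [{"x": 3, "measure": 2, "if_else": 1}, {"reset": 4, "measure": 1}]
summary = tally_job(job)
print(summary)
```

Comparing these totals between a job that succeeds and one that fails can hint at which factor is responsible.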

In the rest of this section we provide some example code that you can use to probe these limits yourself.

In [None]:
from qiskit_ibm_provider import IBMProvider

hub = "ibm-q-internal"
group = "deployed"
project = "default"

provider = IBMProvider(instance=f"{hub}/{group}/{project}")

backend_name = "ibmq_jakarta"
backend = provider.get_backend(backend_name)

### Test `num_qubits` x `num_circuits`

Here, we run a sweep to determine job limits as a function of the number of qubits and the number of circuits.

In [None]:
import numpy as np

# Number of qubits to sweep over
num_qubit_steps = 5
# Number of circuits to sweep over
num_circuit_steps = 5

max_circuits = 300
num_qubits = backend.num_qubits
shots = 100

save = True

qubit_steps = np.concatenate(
    [
        [1],
        np.arange(0, num_qubits, step=int(num_qubits / num_qubit_steps))[
            1 : num_qubit_steps - 1
        ],
        [num_qubits],
    ]
)
circuit_steps = np.concatenate(
    [
        [1],
        np.arange(0, max_circuits, step=int(max_circuits / num_circuit_steps))[
            1 : num_circuit_steps - 1
        ],
        [max_circuits],
    ]
)
print(f"Qubit steps: {qubit_steps}")
print(f"Circuit steps: {circuit_steps}")
print(f"Total number of experiments to be run: {len(qubit_steps)*len(circuit_steps)}")
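The sweep points above are built by concatenating `1`, a few evenly spaced interior values, and the maximum. On a small device this construction can repeat a value (for example, `1` appears twice when the stride is 1), and the stride becomes zero if the maximum is smaller than the number of steps. The sketch below shows one possible way to generate the same kind of points with duplicates removed, using only the standard library; the helper name `sweep_points` is hypothetical.

```python
def sweep_points(max_value, num_steps):
    """Return sorted, unique sweep points from 1 to max_value (inclusive)."""
    if max_value < 1 or num_steps < 1:
        raise ValueError("max_value and num_steps must be positive")
    # Guard against a zero stride when max_value < num_steps.
    stride = max(max_value // num_steps, 1)
    # Keep at most num_steps - 2 interior points, as in the sweep above.
    interior = range(stride, max_value, stride)[: max(num_steps - 2, 0)]
    return sorted({1, *interior, max_value})


print(sweep_points(300, 5))
print(sweep_points(7, 5))
```

Because duplicates are removed, the number of experiments is best computed from the length of the returned lists rather than from the requested number of steps.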

In [None]:
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister, transpile


def build_qubit_by_circuit_qcs(num_circuits, num_qubits, backend):
    """Build a list of `num_circuits` transpiled circuits, each trivially using `num_qubits` qubits."""

    qr = QuantumRegister(num_qubits)
    cr = ClassicalRegister(num_qubits)
    qc = QuantumCircuit(qr, cr)
    qc.x(qr)
    qc.measure(qr, cr)

    qc = transpile(qc, backend)

    qcs = [qc.copy() for i in range(num_circuits)]

    return qcs

In [None]:
from datetime import datetime
from pathlib import Path
import pandas as pd

from qiskit_ibm_provider.job.exceptions import (
    IBMJobFailureError,
    IBMJobInvalidStateError,
)

col_names = [
    "job_id",
    "time",
    "num_circuits",
    "num_qubits",
    "num_conditionals",
    "num_resets",
    "success",
]
circuit_qubit_df = pd.DataFrame(columns=col_names)
circuit_qubit_path = Path(
    f"circuit_qubit_sweep_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.df"
).resolve()

# Run
circuit_qubit_jobs = {}
for num_circuits in circuit_steps:
    for num_qubits in qubit_steps:
        qcs = build_qubit_by_circuit_qcs(num_circuits, num_qubits, backend)
        job = backend.run(qcs, dynamic=True, shots=shots)
        print(
            f"Running - num_circuits: {num_circuits}, num_qubits: {num_qubits}, job id: {job.job_id()}"
        )
        circuit_qubit_jobs[(num_circuits, num_qubits)] = job

# Fetch
for num_circuits in circuit_steps:
    for num_qubits in qubit_steps:
        job = circuit_qubit_jobs[(num_circuits, num_qubits)]

        success = 0
        try:
            print(
                f"Awaiting result - num_circuits: {num_circuits}, num_qubits: {num_qubits}, job id: {job.job_id()}"
            )
            result = job.result()
            success = 1

        except IBMJobFailureError:
            success = -1
        except IBMJobInvalidStateError:
            success = -1

        circuit_qubit_df.loc[len(circuit_qubit_df)] = {
            "job_id": job.job_id(),
            "time": datetime.now(),
            "num_circuits": num_circuits,
            "num_qubits": num_qubits,
            "num_conditionals": 0,
            "num_resets": 0,
            "success": success,
        }

if save:
    print(f"Saving data to {circuit_qubit_path}")
    circuit_qubit_df.to_csv(circuit_qubit_path)

In [None]:
circuit_qubit_df

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap


plt.rcParams["figure.figsize"] = (12, 12)

cmap = LinearSegmentedColormap.from_list("rg", ["r", "w", "g"], N=256)
linewidths = 0.1

In [None]:
ax = plt.axes()

heat_data = circuit_qubit_df.drop_duplicates(
    subset=["num_qubits", "num_circuits"], keep="first"
).pivot(columns="num_circuits", index="num_qubits", values="success")
sns.heatmap(heat_data, cmap=cmap, ax=ax, linewidths=0.1, vmin=-1, vmax=1)
ax.set_title("Comparing execution success as a function of num_qubits & num_circuits")
ax.invert_yaxis()
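The heatmap shows where jobs start to fail, but the same boundary can also be read off the recorded rows directly. The sketch below is one possible approach, assuming the results are available as a list of dictionaries (for a pandas DataFrame, `df.to_dict("records")` produces this shape). The function name `largest_passing` is hypothetical and the records are made up for illustration; they are not measured results.

```python
def largest_passing(records, group_key, sweep_key):
    """For each group_key value, return the largest sweep_key value whose job succeeded."""
    best = {}
    for row in records:
        if row["success"] == 1:
            group = row[group_key]
            best[group] = max(best.get(group, 0), row[sweep_key])
    return best


# Made-up records for illustration only.
records = [
    {"num_qubits": 1, "num_circuits": 60, "success": 1},
    {"num_qubits": 1, "num_circuits": 300, "success": 1},
    {"num_qubits": 7, "num_circuits": 60, "success": 1},
    {"num_qubits": 7, "num_circuits": 300, "success": -1},
]
limits = largest_passing(records, "num_qubits", "num_circuits")
print(limits)
```

Groups with no successful job are absent from the result, which distinguishes "never passed" from "passed at the smallest size".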

### Test `num_conditional` x `num_circuits`

Here, we run a sweep to determine job limits as a function of the number of conditionals and the number of circuits.

In [None]:
import numpy as np

# Number of conditionals to sweep over
num_conditional_steps = 5
# Number of circuits to sweep over
num_circuit_steps = 4

max_conditionals = 500
max_circuits_cond = 10

# Add a 20us delay between measurements to ensure we don't overflow the buffer
# and can stream results back to the device.
cond_delay = 20e-6

MAX_REGISTER_SIZE = 64  # Pending resolution of https://github.ibm.com/IBM-Q-Restricted-System/qic-rta-driver/issues/949
circuit_cond_num_qubits = min(backend.num_qubits, MAX_REGISTER_SIZE)

save = True

cond_steps = np.concatenate(
    [
        [1],
        np.arange(
            0, max_conditionals, step=int(max_conditionals / num_conditional_steps)
        )[1 : num_conditional_steps - 1],
        [max_conditionals],
    ]
)
circuit_steps = np.concatenate(
    [
        [1],
        np.arange(
            0, max_circuits_cond, step=int(max_circuits_cond / num_circuit_steps)
        )[1 : num_circuit_steps - 1],
        [max_circuits_cond],
    ]
)
print(f"Conditional steps: {cond_steps}")
print(f"Circuit steps: {circuit_steps}")
print(
    f"Number of qubits to measure/make a decision based on: {circuit_cond_num_qubits}"
)
print(f"Total number of experiments to be run: {len(cond_steps)*len(circuit_steps)}")

In [None]:
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister, transpile


def build_cond_by_circuit_qcs(num_circuits, num_conds, num_qubits, backend):
    """Build a list of `num_circuits` transpiled circuits, each containing `num_conds` conditionals."""

    qr = QuantumRegister(num_qubits)
    cr = ClassicalRegister(num_qubits)
    qc = QuantumCircuit(qr, cr)
    for i in range(num_conds):
        qc.measure(qr, cr)
        with qc.if_test((cr, 1)):
            qc.x(qr)
        qc.barrier(qr)
        qc.delay(cond_delay, qr, unit="s")

    qc = transpile(qc, backend)

    qcs = [qc.copy() for i in range(num_circuits)]

    return qcs
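Each iteration of the loop above appends a fixed delay, so the idle time added to a circuit grows linearly with the number of conditionals. The short sketch below works through that arithmetic in integer microseconds; it counts only the inserted delays and ignores measurement and gate durations. The names `COND_DELAY_US` and `inserted_delay_us` are illustrative.

```python
COND_DELAY_US = 20  # corresponds to cond_delay = 20e-6 seconds in the sweep setup


def inserted_delay_us(num_conds, delay_us=COND_DELAY_US):
    """Total delay in microseconds added per circuit by the per-conditional delays."""
    return num_conds * delay_us


for n in (1, 100, 500):
    print(f"{n} conditionals: {inserted_delay_us(n)} us of added delay")
```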

In [None]:
from datetime import datetime
from pathlib import Path
import pandas as pd

from qiskit_ibm_provider.job.exceptions import (
    IBMJobFailureError,
    IBMJobInvalidStateError,
)

col_names = [
    "job_id",
    "time",
    "num_circuits",
    "num_qubits",
    "num_conditionals",
    "num_resets",
    "success",
]
circuit_cond_df = pd.DataFrame(columns=col_names)
circuit_cond_path = Path(
    f"circuit_cond_sweep_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.df"
).resolve()

# Run
circuit_conds_jobs = {}
for num_circuits in circuit_steps:
    for num_conds in cond_steps:
        qcs = build_cond_by_circuit_qcs(
            num_circuits, num_conds, circuit_cond_num_qubits, backend
        )
        job = backend.run(qcs, dynamic=True, shots=shots)
        print(
            f"Running - num_circuits: {num_circuits}, num_conditionals: {num_conds}, job id: {job.job_id()}"
        )
        circuit_conds_jobs[(num_circuits, num_conds)] = job

# Fetch
for num_circuits in circuit_steps:
    for num_conds in cond_steps:
        job = circuit_conds_jobs[(num_circuits, num_conds)]

        success = 0
        try:
            print(
                f"Awaiting result - num_circuits: {num_circuits}, num_conditionals: {num_conds}, job id: {job.job_id()}"
            )
            result = job.result()
            success = 1

        except IBMJobFailureError:
            success = -1
        except IBMJobInvalidStateError:
            success = -1

        circuit_cond_df.loc[len(circuit_cond_df)] = {
            "job_id": job.job_id(),
            "time": datetime.now(),
            "num_circuits": num_circuits,
            "num_qubits": circuit_cond_num_qubits,
            "num_conditionals": num_conds,
            "num_resets": 0,
            "success": success,
        }

if save:
    print(f"Saving data to {circuit_cond_path}")
    circuit_cond_df.to_csv(circuit_cond_path)

In [None]:
circuit_cond_df

In [None]:
ax = plt.axes()
heat_data = circuit_cond_df.drop_duplicates(
    subset=["num_conditionals", "num_circuits"], keep="first"
).pivot(index="num_circuits", columns="num_conditionals", values="success")
sns.heatmap(heat_data, cmap=cmap, ax=ax, linewidths=0.1, vmin=-1, vmax=1)
ax.set_title(
    "Comparing execution success as a function of num_conditionals & num_circuits"
)
ax.invert_yaxis()

### Test `num_resets` x `num_qubits`

Here, we run a sweep to determine job limits as a function of the number of resets and the number of qubits.

In [None]:
import numpy as np

# Number of resets to sweep over
num_reset_steps = 8
# Number of qubits to sweep over
num_qubit_steps = 3

max_resets = 1000
max_qubits = backend.num_qubits

# No delay is required here: reset measurement results are not
# transmitted back to the host controller, which is where the
# bottleneck caused by too many mid-circuit measurements (MCM) occurs.
# cond_delay = 20e-6


save = True

reset_steps = np.concatenate(
    [
        [1],
        np.arange(0, max_resets, step=int(max_resets / num_reset_steps))[
            1 : num_reset_steps - 1
        ],
        [max_resets],
    ]
)
qubit_steps = np.concatenate(
    [
        [1],
        np.arange(0, max_qubits, step=int(max_qubits / num_qubit_steps))[
            1 : num_qubit_steps - 1
        ],
        [max_qubits],
    ]
)
print(f"Reset steps: {reset_steps}")
print(f"Qubit steps: {qubit_steps}")
print(f"Total number of experiments to be run: {len(reset_steps)*len(qubit_steps)}")

In [None]:
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister, transpile


def build_reset_by_qubits_qc(num_resets, num_qubits, backend):
    """Build a single circuit with `num_resets` and `num_qubits` and a final measurement"""

    qr = QuantumRegister(num_qubits)
    cr = ClassicalRegister(num_qubits)
    qc = QuantumCircuit(qr, cr)
    for i in range(num_resets):
        qc.reset(qr)
        qc.barrier(qr)
    qc.measure_all()

    qc = transpile(qc, backend)

    return qc

In [None]:
from datetime import datetime
from pathlib import Path
import pandas as pd

from qiskit_ibm_provider.job.exceptions import (
    IBMJobFailureError,
    IBMJobInvalidStateError,
)

col_names = [
    "job_id",
    "time",
    "num_circuits",
    "num_qubits",
    "num_conditionals",
    "num_resets",
    "success",
]
qubit_reset_df = pd.DataFrame(columns=col_names)
qubit_reset_path = Path(
    f"qubit_reset_sweep_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.df"
).resolve()

# Run
qubit_resets_jobs = {}
for num_qubits in qubit_steps:
    for num_resets in reset_steps:
        qc = build_reset_by_qubits_qc(num_resets, num_qubits, backend)
        # Disable init_qubits as this would insert additional resets to initialize
        # our qubits which would impact the data we gather.
        job = backend.run(qc, dynamic=True, shots=shots, init_qubits=False)
        print(
            f"Running - num_qubits: {num_qubits}, num_resets: {num_resets}, job id: {job.job_id()}"
        )
        qubit_resets_jobs[(num_qubits, num_resets)] = job

# Fetch
for num_qubits in qubit_steps:
    for num_resets in reset_steps:
        job = qubit_resets_jobs[(num_qubits, num_resets)]

        success = 0
        try:
            print(
                f"Awaiting result - num_qubits: {num_qubits}, num_resets: {num_resets}, job id: {job.job_id()}"
            )
            result = job.result()
            success = 1

        except IBMJobFailureError:
            success = -1
        except IBMJobInvalidStateError:
            success = -1

        qubit_reset_df.loc[len(qubit_reset_df)] = {
            "job_id": job.job_id(),
            "time": datetime.now(),
            "num_circuits": 1,
            "num_qubits": num_qubits,
            "num_conditionals": 0,
            "num_resets": num_resets,
            "success": success,
        }

if save:
    print(f"Saving data to {qubit_reset_path}")
    qubit_reset_df.to_csv(qubit_reset_path)

In [None]:
qubit_reset_df

In [None]:
ax = plt.axes()
heat_data = qubit_reset_df.drop_duplicates(
    subset=["num_qubits", "num_resets"], keep="first"
).pivot(index="num_resets", columns="num_qubits", values="success")
sns.heatmap(heat_data, cmap=cmap, ax=ax, linewidths=0.1, vmin=-1, vmax=1)
ax.set_title("Comparing execution success as a function of num_qubits & num_resets")
ax.invert_yaxis()

### Test the limits of the switch statement as a function of `num_cases` x `num_operations`

Here, we run a sweep to determine job limits as a function of the number of switch statement cases and the number of gates.

In [None]:
import numpy as np

# Number of switch cases to sweep over
num_case_steps = 6
# Number of gates per-qubit to sweep over
num_gate_steps = 4

max_cases = 2**11 - 1  # Include default case
max_gates = 2**8

shots = 100

switch_case_num_qubits = backend.num_qubits

save = True

case_steps = np.concatenate(
    [
        [1],
        np.arange(0, max_cases, step=int(max_cases / num_case_steps))[
            1 : num_case_steps - 1
        ],
        [max_cases],
    ]
)
gate_steps = np.concatenate(
    [
        [1],
        np.arange(0, max_gates, step=int(max_gates / num_gate_steps))[
            1 : num_gate_steps - 1
        ],
        [max_gates],
    ]
)
print(f"Switch case steps: {case_steps}")
print(f"Gate counts per switch case steps: {gate_steps}")
print(f"Total number of experiments to be run: {len(case_steps)*len(gate_steps)}")

In [None]:
from qiskit import ClassicalRegister, QuantumCircuit, QuantumRegister, transpile


def build_switch_cases_by_gates_qc(num_cases, num_gates, num_qubits, backend):
    """Build a single transpiled circuit with one switch statement of `num_cases` cases, each applying `num_gates` layers of X gates."""

    n_measures = max(int(np.ceil(np.log2(num_cases))), 1)
    qr = QuantumRegister(num_qubits)
    cr = ClassicalRegister(n_measures)
    cr_res = ClassicalRegister(num_qubits)
    qc = QuantumCircuit(qr, cr, cr_res)

    for i in range(n_measures):
        if i % num_qubits == 0:
            # Prepare measurement register randomly
            qc.h(qr)
            qc.barrier(qr)
        qc.measure(i % num_qubits, cr[i])

    qc.barrier(qr)

    with qc.switch(cr) as case:
        # metaprogram case loops
        for case_idx in range(num_cases):
            with case(case_idx):
                # metaprogram gates within case
                for num_gate in range(num_gates):
                    qc.x(qr)
                    qc.barrier(qr)
        with case(case.DEFAULT):
            pass

    qc.barrier(qr)
    qc.measure(qr, cr_res)

    qc = transpile(qc, backend, optimization_level=0)

    return qc
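The function above sizes the switch target register as `max(ceil(log2(num_cases)), 1)` bits, because a register of n bits can distinguish at most 2^n values. As a hedged alternative that avoids floating-point logarithms, the same quantity can be computed with integer arithmetic; `bits_for_cases` is a hypothetical helper name used only in this sketch.

```python
def bits_for_cases(num_cases):
    """Number of classical bits needed to address num_cases distinct switch values."""
    if num_cases < 1:
        raise ValueError("num_cases must be at least 1")
    # (n - 1).bit_length() equals ceil(log2(n)) for n >= 1, using integers only.
    return max((num_cases - 1).bit_length(), 1)


for n in (1, 2, 5, 2**11 - 1):
    print(f"{n} cases need {bits_for_cases(n)} classical bit(s)")
```

For the largest sweep point, `2**11 - 1` explicit cases plus the default case exactly fill the 2048 values of an 11-bit register.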

In [None]:
from datetime import datetime
from pathlib import Path
import pandas as pd

from qiskit_ibm_provider.job.exceptions import (
    IBMJobFailureError,
    IBMJobInvalidStateError,
)

col_names = [
    "job_id",
    "time",
    "num_circuits",
    "num_qubits",
    "num_conditionals",
    "num_resets",
    "num_switches",
    "num_cases",
    "num_gates",
    "success",
]
case_gates_df = pd.DataFrame(columns=col_names)
case_gates_path = Path(
    f"case_gates_sweep_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.df"
).resolve()

# Run
case_gates_jobs = {}
for num_cases in case_steps:
    for num_gates in gate_steps:
        start = datetime.now()
        qc = build_switch_cases_by_gates_qc(
            num_cases, num_gates, switch_case_num_qubits, backend
        )
        build = datetime.now()
        job = backend.run(qc, dynamic=True, shots=shots, init_qubits=False)
        submit = datetime.now()
        print(
            f"Running - num_cases: {num_cases}, num_gates: {num_gates}, job id: {job.job_id()}, build time: {(build-start).total_seconds()}, submit time: {(submit-build).total_seconds()}"
        )
        case_gates_jobs[(num_cases, num_gates)] = job

# Fetch
for num_cases in case_steps:
    for num_gates in gate_steps:
        job = case_gates_jobs[(num_cases, num_gates)]

        success = 0
        try:
            print(
                f"Awaiting result - num_cases: {num_cases}, num_gates: {num_gates}, job id: {job.job_id()}"
            )
            result = job.result()
            success = 1

        except IBMJobFailureError:
            success = -1
        except IBMJobInvalidStateError:
            success = -1

        case_gates_df.loc[len(case_gates_df)] = {
            "job_id": job.job_id(),
            "time": datetime.now(),
            "num_circuits": 1,
            "num_qubits": switch_case_num_qubits,
            "num_conditionals": 1,
            "num_resets": 0,
            "num_switches": 1,
            "num_cases": num_cases,
            "num_gates": num_gates,
            "success": success,
        }

if save:
    print(f"Saving data to {case_gates_path}")
    case_gates_df.to_csv(case_gates_path)

In [None]:
case_gates_df

In [None]:
ax = plt.axes()
heat_data = case_gates_df.drop_duplicates(
    subset=["num_cases", "num_gates"], keep="first"
).pivot(index="num_cases", columns="num_gates", values="success")
sns.heatmap(heat_data, cmap=cmap, ax=ax, linewidths=0.1, vmin=-1, vmax=1)
ax.set_title("Comparing execution success as a function of num_cases & num_gates")
ax.invert_yaxis()