In [None]:
# Install the latest version of the mqt-predictor
!cd .. && cd .. && uv pip install -e .

In [None]:
import ast
import copy
import pickle
import random
import zipfile
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import qiskit.qasm2
import scipy as sp
from matplotlib.ticker import MultipleLocator
from scipy.stats import pearsonr
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from sklearn.model_selection import KFold, ParameterGrid, train_test_split

from mqt.bench.devices import get_device_by_name
from mqt.predictor.reg import hellinger_distance
from mqt.predictor.reward import (
    calc_device_specific_features,
    estimated_success_probability,
    expected_fidelity,
)

### IMPORT AND PREPARE DATA

In [None]:
# Get the path to the current data.zip file
data_dir = Path.cwd() / "data"

# Ensure the zip file exists
zip_file_path = Path.cwd() / "data.zip"
if not Path.exists(zip_file_path):
    msg = f"Zip file not found at {zip_file_path}"
    raise FileNotFoundError(msg)

# Unzip the data files into the current directory
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
    zip_ref.extractall(data_dir)  # Extracts into the current directory

print(f"Files extracted from {zip_file_path} into the current directory.")

In [None]:
# Dictionaries to store the calculated figures of merit and Hellinger distance
depth, esp, fidelity, num_gates, hellinger = {}, {}, {}, {}, {}

qpus = ["apollo", "qexa"]
for qpu in qpus:
    depth[qpu], esp[qpu], fidelity[qpu], num_gates[qpu], hellinger[qpu] = {}, {}, {}, {}, {}
    circuits = []  # TODO: only for debugging

    try:  # Import QPU
        device = get_device_by_name("iqm_" + qpu)
    except Exception:
        # Device 'iqm_qexa' calibration data not yet "publicly" available
        print(f"Device 'iqm_{qpu}' not found among available providers.")
        print("Using precomputed ESP and fidelity values with historical calibration data.")
        device = None

    # Get a list of all execution files
    execution_files = sorted((data_dir / "execution" / qpu).glob("*.txt"))
    print(f"Found {len(execution_files)} execution files for {qpu}")

    # Iterate over the execution files
    for execution_file in execution_files:
        # file_name is sth. like 'wstate_indep_qiskit_20'
        file_name = execution_file.stem
        try:
            # Load the corresponding circuit
            circuit = qiskit.qasm2.load(data_dir / "qasm" / f"{file_name}.qasm")

            # Load the noiseless simulation file
            noiseless_file = data_dir / "simulation" / qpu / f"{file_name}.txt"

            # Open the files and read the counts
            with Path.open(noiseless_file, encoding="utf-8") as f:
                noiseless_counts = ast.literal_eval(f.read())
            with Path.open(execution_file, encoding="utf-8") as f:
                execution_counts = ast.literal_eval(f.read())

            num_execution_shots = sum(execution_counts.values())
            num_noiseless_shots = sum(noiseless_counts.values())

            # Transform counts to probabilities
            noiseless_probs_org = {k: v / num_noiseless_shots for k, v in noiseless_counts.items()}

            # Only keep the states that are distinguishable with the number of execution shots
            noiseless_probs = {k: v for k, v in noiseless_probs_org.items() if v * num_execution_shots >= 1}

            # Get the states and their noiseless probabilities
            states = sorted(set(noiseless_counts.keys()).union(execution_counts.keys()))
            noiseless_probs_all = np.array([noiseless_probs.get(state, 0) for state in states])

            # Get the execution probabilities
            execution_counts_all = np.array([execution_counts.get(state, 0) for state in states])
            execution_probs_all = execution_counts_all / num_execution_shots

            # If they do not resemble a probability distribution, skip the file (e.g., due to too few shots)
            if not np.isclose(sum(noiseless_probs_all), 1, 0.05) or not np.isclose(sum(execution_probs_all), 1, 0.05):
                # print(f"Skipping {file_name} because probabilities do not sum to 1")
                continue

            # File paths for the figures of merit
            path = data_dir / "foms" / qpu / file_name

            num_gates_file = path.with_name(f"{path.name}_num_gates.txt")
            depth_file = path.with_name(f"{path.name}_depth.txt")
            fidelity_file = path.with_name(f"{path.name}_fidelity.txt")
            esp_file = path.with_name(f"{path.name}_esp.txt")

            # Hellinger distance will be saved as labels
            labels_file = data_dir / "labels" / qpu / file_name
            hellinger_file = labels_file.with_name(f"{labels_file.name}_hellinger.txt")

            # Calculate all values and save them to a file or load them from the file if it exists

            # Depth
            if not depth_file.exists():
                d = circuit.depth()
                with Path.open(depth_file, "w", encoding="utf-8") as f:
                    f.write(str(d))
            else:
                with Path.open(depth_file, encoding="utf-8") as f:
                    d = int(f.read())
            if d > 1000:
                # print(f"Skipping {file_name} because depth is too high")
                continue
            depth[qpu][file_name] = d

            # Fidelity
            if device and not fidelity_file.exists():
                fidelity[qpu][file_name] = expected_fidelity(circuit, device)
                with Path.open(fidelity_file, "w", encoding="utf-8") as f:
                    f.write(str(fidelity[qpu][file_name]))
            else:
                with Path.open(fidelity_file, encoding="utf-8") as f:
                    fidelity[qpu][file_name] = float(f.read())

            # ESP
            if device and not esp_file.exists():
                esp[qpu][file_name] = estimated_success_probability(circuit, device)
                with Path.open(esp_file, "w", encoding="utf-8") as f:
                    f.write(str(esp[qpu][file_name]))
            else:
                with Path.open(esp_file, encoding="utf-8") as f:
                    esp[qpu][file_name] = float(f.read())

            # Number of gates
            if not num_gates_file.exists():
                num_gates[qpu][file_name] = sum(circuit.count_ops().values())
                with Path.open(num_gates_file, "w", encoding="utf-8") as f:
                    f.write(str(num_gates[qpu][file_name]))
            else:
                with Path.open(num_gates_file, encoding="utf-8") as f:
                    num_gates[qpu][file_name] = int(f.read())

            # Execution vs noiseless Hellinger distance
            if not hellinger_file.exists():
                hellinger[qpu][file_name] = hellinger_distance(noiseless_probs_all, execution_probs_all)
                with Path.open(hellinger_file, "w", encoding="utf-8") as f:
                    f.write(str(hellinger[qpu][file_name]))
            else:
                with Path.open(hellinger_file, encoding="utf-8") as f:
                    hellinger[qpu][file_name] = float(f.read())

            circuits.append(circuit)

        except Exception as e:
            print(f"Error processing {qpu}: {e}")
            continue

    print(
        f"Stored {len(depth[qpu])} files for {qpu}, after removing too high depths and non-probability distributions. \n"
    )

### EVALUATION OF CURRENT FIGURES OF MERIT

In [None]:
cols = ["#FE6100", "#648FFF"]
names = ["Q20-A", "Q20-B"]

In [None]:
fig, axs = plt.subplots(2, 2, figsize=(9, 9))
data = {}

for i, qpu in enumerate(qpus):
    # Collect data for plot
    depth_list = list(depth[qpu].values())
    esp_list = list(esp[qpu].values())
    fidelity_list = list(fidelity[qpu].values())
    num_gates_list = list(num_gates[qpu].values())
    execution_noiseless_dist_list = list(hellinger[qpu].values())

    data[qpu] = [
        (num_gates_list, execution_noiseless_dist_list, "Gate Count", "Hellinger Distance"),
        (depth_list, execution_noiseless_dist_list, "Circuit Depth", "Hellinger Distance"),
        (esp_list, execution_noiseless_dist_list, "Estimated Success Probability", "Hellinger Distance"),
        (fidelity_list, execution_noiseless_dist_list, "Expected Fidelity", "Hellinger Distance"),
    ]

    for ax, (x, y, xlabel, ylabel) in zip(axs.flatten(), data[qpu], strict=False):
        # Normalize for Pearson correlation
        x_max, y_max = np.max(x), np.max(y)
        x_norm, y_norm = np.array(x) / x_max, np.array(y) / y_max

        # Calculate Pearson correlation
        slope, intercept, rval, pval, std_err = sp.stats.linregress(x_norm, y_norm)
        x_plot = np.linspace(np.min(x) - 1, np.max(x) + 1, 100)
        y_plot = (slope * (x_plot / x_max) + intercept) * y_max
        ax.plot(x_plot, y_plot, color=cols[i], linewidth=0.5, alpha=0.5, linestyle="--")

        # Should be the same as the calculated Pearson correlation
        r, p = pearsonr(x_norm, y_norm)
        assert np.isclose(r, rval), f"r: {r}, rval: {rval}"
        assert np.isclose(p, pval), f"p: {p}, pval: {pval}"

        # Plot the data
        pearson_label = names[i] + f": Pearson $r = {r:.2f}$"
        ax.scatter(x, y, color=cols[i], marker=".", label=pearson_label, alpha=0.5)

        # Adjust x-axis
        if xlabel == "Gate Count":
            ax.set_xlim(0, 2000)
            ax.xaxis.set_major_locator(MultipleLocator(500))
        elif xlabel == "Circuit Depth":
            ax.set_xlim(0, 1000)
            ax.xaxis.set_major_locator(MultipleLocator(250))
        else:
            ax.set_xlim(0, 1)
        ax.set_xlabel(xlabel, labelpad=1)

        # Adjust y-axis
        ax.set_ylim(0.0, 1.0)
        ax.set_ylabel(ylabel, labelpad=1)
        ax.yaxis.set_major_locator(MultipleLocator(0.2))

        # Adjust tick size and tick spacing
        ax.tick_params(axis="both", which="major", width=0.25)

        # Set border line width
        for spine in ax.spines.values():
            spine.set_linewidth(0.25)

        ax.legend(scatterpoints=1)  # Adjust legend text size
        legend = ax.legend(markerscale=2.0)

        print(qpu, xlabel, "correlation", np.round(r, 3))

fig.tight_layout(pad=0.1, h_pad=0.2, w_pad=0)
plt.savefig(data_dir / "figures/correlation.pdf", bbox_inches="tight")

### PROPOSED APPROACH

In [None]:
X = {qpu: [] for qpu in qpus}
Y = {qpu: [] for qpu in qpus}

# Both QPUs have the same architecture (only different calibration data)
device = get_device_by_name("iqm_apollo")

for qpu in qpus:
    # save the training data to
    path = data_dir / "features" / qpu

    native_gates = ["r", "cz"]  # iqm gates

    # Get a list of all execution files
    qasm_files = sorted((data_dir / "execution" / qpu).glob("*.txt"))

    # Iterate over the execution files
    for qasm_file in qasm_files:
        # file_name is sth. like 'qft_10'
        file_name = qasm_file.stem

        try:
            circ = qiskit.qasm2.load(data_dir / "qasm" / f"{file_name}.qasm")
            try:
                y = hellinger[qpu][file_name]
                Y[qpu].append(y)
            except KeyError:
                continue  # No Hellinger distance available, e.g. if depth was too high

            file = path / f"{file_name}.txt"
            if not file.exists():
                x_dict = calc_device_specific_features(circ, device)
                x = list(x_dict.values())
                X[qpu].append(x)

                np.savetxt(file, np.array(x))
                # print(f"Saved features for {file_name}")
            else:
                X[qpu].append(np.loadtxt(file))
                # print(f"Loaded features for {file_name}")

        except Exception as e:
            print(f"Error processing circuit {qasm_file}: {e}")
            continue

    print(f"Extracted features and labels for {len(X[qpu])} files for {qpu}")

In [None]:
# Indicate which features to use for the prediction
non_zero_vector = {qpu: {} for qpu in qpus}

for qpu in qpus:
    gate_dict = {"R Gate Count": True, "CZ Gate Count": True}
    num_qubits = 20  # Same for both IQM QPUs
    qubit_dict = {f"Qubit{i}": True for i in range(num_qubits)}

    supermarq_plus_dict = {
        "Circuit Depth": True,
        "Number of Qubits": False,
        "Critical Depth": True,
        "Entanglement Ratio": True,
        "Parallelism": True,
        "Liveness": True,
        "Directed Program Comm.": True,
        "Single-Qubit Gate Ratio": True,
        "Multi-Qubit Gate Ratio": True,
    }

    non_zero_vector[qpu] = {**gate_dict, **qubit_dict, **supermarq_plus_dict}

In [None]:
X_train, X_test, y_train, y_test = {}, {}, {}, {}

for qpu in qpus:
    X_arr = np.array(X[qpu])
    Y_arr = np.array(Y[qpu])

    # Mask the features
    mask = np.array(list(non_zero_vector[qpu].values()))
    X_masked = X_arr[:, mask]

    # Split the data into training and test sets
    X_train[qpu], X_test[qpu], y_train[qpu], y_test[qpu] = train_test_split(
        X_masked, Y_arr, test_size=0.2, random_state=123
    )
    print(f"Training data shape for {qpu}: features {X_train[qpu].shape}, labels {y_train[qpu].shape}")
    print(f"Test data shape for {qpu}: featuers {X_test[qpu].shape}, labels {y_test[qpu].shape} \n")

# TO BE DELETED! ONLY FOR TESTING

In [None]:
device = get_device_by_name("iqm_apollo")
quantum_circuits = circuits

# prepare the distribution
rng = np.random.Generator(np.random.PCG64(123))
noisy_distribution = [rng.random() for _ in range(len(circuits))]
noiseless_distribution = [rng.random() for _ in range(len(circuits))]
# normalize
noisy_distribution = noisy_distribution / np.sum(noisy_distribution)
noiseless_distribution = noiseless_distribution / np.sum(noiseless_distribution)

noisy_distributions = [noisy_distribution for _ in range(len(circuits))]
noiseless_distributions = [noiseless_distribution for _ in range(len(circuits))]

In [None]:
from mqt.predictor.reward import calc_device_specific_features

feature_vector_list = []
for qc in quantum_circuits:
    feature_dict = calc_device_specific_features(qc, device)
    feature_vector = np.array(list(feature_dict.values()))
    feature_vector_list.append(feature_vector)

In [None]:
from mqt.predictor.ml import hellinger_distance

labels_list = []
for noisy, noiseless in zip(noisy_distributions, noiseless_distributions, strict=False):
    distance_label = hellinger_distance(noisy, noiseless)
    labels_list.append(distance_label)

In [None]:
from mqt.predictor.ml import Predictor
from mqt.predictor.reward import estimated_hellinger_distance

training_data = [(feat, label) for feat, label in zip(feature_vector_list, labels_list, strict=False)]

pred = Predictor(figure_of_merit="hellinger_distance")
pred.save_training_data(training_data)

pred.train_random_forest_model(device=device)

In [None]:
print(estimated_hellinger_distance(circuits[0], device))

### MODEL TRAINING

In [None]:
# Helper functions

# Save best parameter index to file
def save_best_params(idx: int, best_params: dict, path: str) -> None:
    file = {idx: best_params}
    with Path.open(path, "w", encoding="utf-8") as f:
        f.write(str(file))


# Load best parameter index from file
def load_best_params(path: str) -> tuple[int, dict]:
    with Path.open(path, encoding="utf-8") as f:
        file = eval(f.read())
    idx, best_params = next(iter(file.keys())), next(iter(file.values()))
    return idx, best_params


seed = 2
gen = np.random.default_rng(seed)
random.seed(seed)

In [None]:
models = {
    "RandomForestRegressor": {
        "model": RandomForestRegressor(random_state=seed, verbose=0, n_jobs=5),
        "params": {
            "n_estimators": [30, 40, 50, 60],  # Number of trees in the forest; more trees reduce variance.
            "criterion": ["absolute_error", "squared_error"],  # Metric to evaluate splits; affects tree quality.
            "max_depth": [
                10,
                20,
                30,
                None,
            ],  # Max depth of trees; smaller values prevent overly complex trees (reduce overfitting).
            "min_samples_split": [
                4,
                5,
                0.1,
            ],  # Min samples to split an internal node; higher values reduce sensitivity to noise.
            "min_samples_leaf": [
                2,
                3,
                4,
                5,
                0.1,
            ],  # Min samples in a leaf node; larger values create simpler, generalized trees.
            # This is the main RANDOM FOREST parameter
            "max_features": [
                "sqrt",
                "log2",
                None,
            ],  # Max features to consider at each split; lower values reduce overfitting by randomness.
            "max_leaf_nodes": [
                None,
                20,
                40,
                60,
                80,
                100,
            ],  # Max leaf nodes in the tree; fewer nodes limit model complexity.
            "min_impurity_decrease": [
                0.01,
                0.1,
                0.2,
            ],  # Min impurity decrease for a split; higher values prevent splits on minor gains.
            "ccp_alpha": [
                0.01,
                0.05,
            ],  # Complexity parameter for pruning; larger values simplify the tree (reduce overfitting).
            "max_samples": [
                0.5,
                1.0,
            ],  # Max samples to draw for training each tree; smaller values increase diversity, reducing overfitting.
            "monotonic_cst": [
                [  # Monotonic constraints; improves interpretability, not directly related to overfitting.
                    # -1 means decreasing, 0 means no constraint, and 1 means increasing
                    # Gate counts
                    1,  # R gate count
                    1,  # CZ gate count
                    # Qubit counts
                    0,  # Qubit 0
                    0,  # Qubit 1
                    0,  # Qubit 2
                    0,  # Qubit 3
                    0,  # Qubit 4
                    0,  # Qubit 5
                    0,  # Qubit 6
                    0,  # Qubit 7
                    0,  # Qubit 8
                    0,  # Qubit 9
                    0,  # Qubit 10
                    0,  # Qubit 11
                    0,  # Qubit 12
                    0,  # Qubit 13
                    0,  # Qubit 14
                    0,  # Qubit 15
                    0,  # Qubit 16
                    0,  # Qubit 17
                    0,  # Qubit 18
                    0,  # Qubit 19
                    # Supermarq+
                    1,  # Circuit depth
                    0,  # Critical depth
                    1,  # Entanglement ratio
                    0,  # Parallelism
                    0,  # Liveness
                    0,  # Directed program communication
                    0,  # Single qubit gates ratio
                    0,  # Multi qubit gates ratio
                ],
                None,
            ],
        },
    }
}

NOTE: Grid search / training takes time!
If training is disabled, the best parameters will be loaded from previous search.

In [None]:
enable_training = False

if enable_training:
    # Initialize dictionary to store the best model for each QPU
    best_model = dict.fromkeys(qpus)

    # Path to save the best parameters
    trainnig_path = data_dir / "training"

    for qpu in qpus:
        best_score = -np.inf

        # Load the best parameters if they exist
        filename = f"{trainnig_path}/{qpu}_best_params.txt"
        if Path.exists(filename):
            idx, best_params = load_best_params(filename)
            print(f"Loaded best parameters for {qpu}: {best_params}")
            continue
        else:
            idx = 0
            best_params = {}
            print("No best parameters found")

        for model_info in models.values():
            param_grid = ParameterGrid(model_info["params"])
            gird = list(param_grid)
            len_grid = len(gird)

            for i, params in enumerate(gird[idx:]):
                j = i + idx
                print(f"Progress: {j}/{len_grid}")
                model = model_info["model"].set_params(**params)

                # Initialize cross-validation
                kf = KFold(n_splits=3, shuffle=True, random_state=42)
                cv_scores = []

                # Perform cross-validation
                for train_index, val_index in kf.split(X_train[qpu]):
                    X_train_split, X_val_split = X_train[qpu][train_index], X_train[qpu][val_index]
                    y_train_split, y_val_split = y_train[qpu][train_index], y_train[qpu][val_index]

                    # Train and validate
                    model.fit(X_train_split, y_train_split)
                    predictions = model.predict(X_val_split)
                    score = pearsonr(predictions, y_val_split)[0]  # Pearson correlation coefficient
                    cv_scores.append(score)

                # Compute average score across all folds
                mean_score = np.mean(cv_scores)

                # Update the best model with a deep copy
                if mean_score >= best_score:
                    best_model[qpu] = copy.deepcopy(model)  # Ensure an independent copy is stored
                    best_score = mean_score

                    print(f"Parameters: {params}")
                    print(f"Cross-Validation Mean Score: {mean_score}")
                    print(f"Associated Test Score: {pearsonr(best_model[qpu].predict(X_test[qpu]), y_test[qpu])[0]}")

                    # Save the best parameters to a file
                    save_best_params(i, params, filename)

        print(f"Best overall model on {qpu}: {best_model[qpu]}")

In [None]:
# Load the best parameters
training_path = data_dir / "training"
best_model = dict.fromkeys(qpus)

for qpu in qpus:
    # Load the best parameters if they exist
    filename = training_path / f"{qpu}_best_params.txt"
    if Path.exists(filename):
        idx, best_params = load_best_params(filename)
        best_model[qpu] = RandomForestRegressor(random_state=seed, verbose=0, n_jobs=5).set_params(**best_params)
        print(f"Loaded best parameters for {qpu}: {best_params}")
    else:
        print("No best parameters found")

In [None]:
model_path = data_dir.parent / "trained_models"
model_path.mkdir(parents=True, exist_ok=True)
trained_models = {}
train_model = True

if train_model:
    for qpu in qpus:
        trained_models[qpu] = best_model[qpu].fit(X_train[qpu], y_train[qpu])

        # save model to file
        if best_model[qpu] is not None:
            with Path.open(model_path / f"iqm_{qpu}_rf_regressor.pkl", "wb") as f:
                pickle.dump(best_model[qpu], f)
            with Path.open(model_path / f"iqm_{qpu}_non_zero_indices.pkl", "wb") as f:
                pickle.dump(non_zero_vector[qpu], f)
        print(f"Saved model and index_dict to {model_path}")

else:  # load model from file
    trained_models = {}
    for qpu in qpus:
        with Path.open(model_path / f"iqm_{qpu}_rf_regressor.pkl", "rb") as f:
            trained_models[qpu] = pickle.load(f)
        with Path.open(model_path / f"iqm_{qpu}_non_zero_indices.pkl", "rb") as f:
            non_zero_vector[qpu] = pickle.load(f)
        print(f"Loaded model and index_dict from {model_path}")

In [None]:
fig, axs = plt.subplots(figsize=(5, 5))

data = {}
for i, qpu in enumerate(qpus):
    # Collect data for plot
    model_prediction = list(trained_models[qpu].predict(X_test[qpu]))
    true_labels = list(y_test[qpu])

    ax = axs
    x = model_prediction
    y = true_labels
    xlabel = "Figure of Merit"
    ylabel = "Hellinger distance"

    # Normalize for Pearson correlation
    x_max, y_max = np.max(x), np.max(y)
    x_norm, y_norm = np.array(x) / x_max, np.array(y) / y_max

    # Calculate Pearson correlation
    slope, intercept, rval, pval, std_err = sp.stats.linregress(x_norm, y_norm)
    x_plot = np.linspace(np.min(x) - 1, np.max(x) + 1, 100)
    y_plot = (slope * (x_plot / x_max) + intercept) * y_max
    ax.plot(x_plot, y_plot, color=cols[i], linewidth=0.5, alpha=0.5, linestyle="--")

    # Should be the same as the calculated Pearson correlation
    r, p = pearsonr(x_norm, y_norm)
    assert np.isclose(r, rval), f"r: {r}, rval: {rval}"
    assert np.isclose(p, pval), f"p: {p}, pval: {pval}"

    print(f"{qpu}:  {xlabel}:")
    print(f"Pearson r: {rval:.3f}, pval: {pval:.3f}\n")

    pearson_label = names[i] + f": Pearson $r = {r:.3f}$"

    # Plot the data
    ax.scatter(x, y, color=cols[i], marker=".", label=pearson_label, alpha=0.7)

    # Adjust x-axis
    if xlabel == "Number of gates":
        ax.set_xlim(0, 2000)
        ax.xaxis.set_major_locator(MultipleLocator(500))
    elif xlabel == "Circuit depth":
        ax.set_xlim(0, 1000)
        ax.xaxis.set_major_locator(MultipleLocator(250))
    else:
        ax.set_xlim(0, 1)
    ax.set_xlabel(xlabel, labelpad=1)

    # Adjust y-axis
    ax.set_ylim(0.0, 1.0)
    ax.set_ylabel(ylabel, labelpad=1)
    ax.yaxis.set_major_locator(MultipleLocator(0.2))

    # Adjust tick size and tick spacing
    ax.tick_params(axis="both", which="major", width=0.25)

    # Set border line width
    for spine in ax.spines.values():
        spine.set_linewidth(0.25)

    ax.legend(scatterpoints=1)  # Adjust legend text size

    legend = ax.legend(markerscale=2.0)

plt.savefig(data_dir / "figures" / " prediction.pdf", bbox_inches="tight")

### FEATURE IMPORTANCE

In [None]:
def group_qubit_features(imps: list, names: list) -> tuple[np.ndarray, list]:
    """Groups qubit features and returns the grouped importances."""
    qubit_indices = [i for i, name in enumerate(names) if name.startswith("Qubit")]
    qubit_importances = imps[qubit_indices]
    qubit_importances_avg = np.average(qubit_importances, axis=0)

    # Remove individual qubit importances
    new_imps = [imps[i] for i in range(len(imps)) if i not in qubit_indices]
    new_names = [name for i, name in enumerate(names) if i not in qubit_indices]

    # Append the grouped qubit importances
    new_imps.append(qubit_importances_avg)
    new_names.append("Average qubit features")

    return np.array(new_imps), new_names

In [None]:
# Initialize dictionaries for feature importances and permutation importances
feature_importances = {}
perm_importances = {}

# Extract feature names (assuming they are the same for all QPUs)
feature_names = [name for name in non_zero_vector[qpus[0]] if non_zero_vector[qpus[0]][name]]

# Compute feature importance and permutation importance for each QPU
for qpu in qpus:
    feature_importances[qpu] = trained_models[qpu].feature_importances_

    perm_importances[qpu] = permutation_importance(
        trained_models[qpu], X_test[qpu], y_test[qpu], n_repeats=10, random_state=42
    )["importances_mean"]

# Group qubit features
grouped_feature_importances = {}
grouped_perm_importances = {}

# Also store the feature names after grouping
grouped_feature_names = None
grouped_perm_feature_names = None

for qpu in qpus:
    grouped_feature_importances[qpu], grouped_feature_names = group_qubit_features(
        feature_importances[qpu], feature_names
    )
    grouped_perm_importances[qpu], grouped_perm_feature_names = group_qubit_features(
        perm_importances[qpu], feature_names
    )

# Ensure all QPUs have the same feature ordering after grouping
if grouped_feature_names != grouped_perm_feature_names:
    msg = "Feature names mismatch after grouping"
    raise ValueError(msg)

# Sort by increasing importance for permutation importance plot
sorted_perm_indices = np.argsort(sum(grouped_perm_importances[qpu] for qpu in qpus))
sorted_perm_features = [grouped_perm_feature_names[i] for i in sorted_perm_indices]

# Sort by increasing importance for feature importance plot
sorted_indices = np.argsort(sum(grouped_feature_importances[qpu] for qpu in qpus))
sorted_features = [grouped_feature_names[i] for i in sorted_indices]

# Normalize feature importances
for qpu in qpus:
    grouped_feature_importances[qpu] /= grouped_feature_importances[qpu].sum()
    grouped_perm_importances[qpu] /= grouped_perm_importances[qpu].sum()

# Plot settings
width = 0.35  # Bar width
fig, axs = plt.subplots(1, 2, figsize=(8, 5), dpi=300)

# Plot feature importance
x = np.arrange(len(sorted_features))
for i, qpu in enumerate(qpus):
    axs[0].barh(
        x - width / 2 + i * width,
        grouped_feature_importances[qpu][sorted_indices],
        width,
        label=names[i],
        color=cols[i],
        alpha=0.9,
    )

axs[0].set_yticks(x)
axs[0].set_yticklabels(sorted_features)
axs[0].set_xlabel("Feature Importance")
axs[0].legend(loc="lower right")

for spine in axs[0].spines.values():
    spine.set_linewidth(0.25)
axs[0].tick_params(axis="both", which="major", width=0.25)

# Plot permutation importance
x = np.arrange(len(sorted_perm_features))
for i, qpu in enumerate(qpus):
    axs[1].barh(
        x - width / 2 + i * width,
        grouped_perm_importances[qpu][sorted_perm_indices],
        width,
        label=names[i],
        color=cols[i],
        alpha=0.9,
    )

axs[1].set_yticks(x)
axs[1].set_yticklabels(sorted_perm_features)
axs[1].set_xlabel("Permutation Importance")
axs[1].legend(loc="lower right")

for spine in axs[1].spines.values():
    spine.set_linewidth(0.25)
axs[1].tick_params(axis="both", which="major", width=0.25)

# Adjust layout and save the plot
fig.tight_layout()
plt.savefig(data_dir / "figures" / "importance.pdf", bbox_inches="tight")