In [1]:
import qutip as qt
import numpy as np
import scqubits as scq
import matplotlib.pyplot as plt
import itertools
import warnings
import os
import time

In [2]:
levels = 6
fluxonium = scq.Fluxonium(EJ=8.9, EC=2.5, EL=0.5, flux=0.48, cutoff=110)
c_ops = None  # will be initialized once below

In [3]:
def init_c_ops():
    gamma_ij = {}
    for j in range(1, levels):
        for i in range(j):
            t1 = fluxonium.t1_capacitive(j, i, Q_cap=1e5)
            if t1 is not None and t1 > 0:
                rate = 1.0 / t1
                gamma_ij[(i, j)] = rate
                gamma_ij[(j, i)] = rate
    c_ops_local = []
    for (i, j), gamma in gamma_ij.items():
        cop = np.sqrt(gamma) * qt.basis(levels, i) * qt.basis(levels, j).dag()
        c_ops_local.append(cop)
    return c_ops_local

In [4]:
def evolve(omega_d, t_g):
    global c_ops
    if c_ops is None:
        c_ops = init_c_ops()

    evals, evecs = fluxonium.eigensys(evals_count=levels)
    n_op_energy_basis = qt.Qobj(fluxonium.process_op(fluxonium.n_operator(), energy_esys=(evals, evecs)))
    H0 = qt.Qobj(np.diag(evals))
    A = 0.1
    drive_op = n_op_energy_basis
    H = [H0, [A * drive_op, 'cos(wd * t)']]
    args = {'wd': omega_d}
    options = qt.Options(nsteps=1000000, store_states=True, atol=1e-10, rtol=1e-9)

    propagator = qt.propagator(H, t_g, args=args, options=options, c_ops=c_ops)
    propagator_kraus = qt.to_kraus(propagator)
    propagator_2x2 = [qt.Qobj(k.full()[:2, :2]) for k in propagator_kraus]
    p_2x2_super = qt.kraus_to_super(propagator_2x2)
    fidelity = qt.average_gate_fidelity(p_2x2_super, qt.sigmax())
    print("completed iteration")
    return fidelity

def wrapped_evolve(args):
    return evolve(*args)


## Serial Execution

In [5]:
# if __name__ == "__main__":
#     evals, _ = fluxonium.eigensys(evals_count=levels)
#     omega_d_base = evals[1] - evals[0]

#     omega_d_array = np.linspace(omega_d_base - 0.005, omega_d_base + 0.005, 10)
#     peak_time_noise = 559.5559555955596  # previously determined
#     t_g_array = np.linspace(0.99 * peak_time_noise, 1.01 * peak_time_noise, 10)
#     param_pairs = list(itertools.product(omega_d_array, t_g_array))
#     print(f"Total simulations to run: {len(param_pairs)}")

#     results_flat = []
#     for (omega_d, t_g) in param_pairs:
#         print(f"Running: omega_d={omega_d:.5f}, t_g={t_g:.2f}")
#         fidelity = evolve(omega_d, t_g)
#         results_flat.append(fidelity)

#     results = np.reshape(results_flat, (len(omega_d_array), len(t_g_array)))

#     max_idx = np.unravel_index(np.argmax(results), results.shape)
#     max_value = results[max_idx]
#     omega_d_best = omega_d_array[max_idx[0]]
#     t_g_best = t_g_array[max_idx[1]]

#     print("\n=== Final Results ===")
#     print(f"Best fidelity: {max_value}")
#     print(f"Found at omega_d = {omega_d_best}, t_g = {t_g_best}")
#     print(f"Indices in results array: {max_idx}")

## Parallel Execution

In [6]:
from joblib import Parallel, delayed
from tqdm.notebook import tqdm  # Better in Jupyter

In [7]:
# if __name__ == "__main__":
#     evals, _ = fluxonium.eigensys(evals_count=levels)
#     omega_d_base = evals[1] - evals[0]

#     omega_d_array = np.linspace(omega_d_base - 0.005, omega_d_base + 0.005, 10)
#     peak_time_noise = 559.5559555955596  
#     t_g_array = np.linspace(0.99 * peak_time_noise, 1.01 * peak_time_noise, 10)
#     param_pairs = list(itertools.product(omega_d_array, t_g_array))
#     print(f"Total simulations to run: {len(param_pairs)}")

#     # Parallel execution using joblib
#     results_flat = Parallel(n_jobs=-1)(
#         delayed(evolve)(omega_d, t_g)
#         for (omega_d, t_g) in tqdm(param_pairs, desc="Running simulations")
#     )

#     results = np.reshape(results_flat, (len(omega_d_array), len(t_g_array)))

#     max_idx = np.unravel_index(np.argmax(results), results.shape)
#     max_value = results[max_idx]
#     omega_d_best = omega_d_array[max_idx[0]]
#     t_g_best = t_g_array[max_idx[1]]

#     print("\n=== Final Results ===")
#     print(f"Best fidelity: {max_value}")
#     print(f"Found at omega_d = {omega_d_best}, t_g = {t_g_best}")
#     print(f"Indices in results array: {max_idx}")

In [8]:
# results

### Param map rework for windows

In [9]:
def parallel_map_qutip(task, values, task_args=tuple(), task_kwargs={}, **kwargs):
    os.environ["QUTIP_IN_PARALLEL"] = "TRUE"
    kw = _default_kwargs()
    if "num_cpus" in kwargs:
        kw["num_cpus"] = kwargs["num_cpus"]

    nfinished = [0]

    try:
        pool = mp.Pool(processes=kw["num_cpus"])

        async_res = [
            pool.apply_async(
                task, (value,) + task_args, task_kwargs
            )
            for value in values
        ]

 

        # while not all([ar.ready() for ar in async_res]):
        #     for ar in async_res:
        #         ar.wait(timeout=0.1)
        start_time = time.time()
        timeout_sec = 60  # 1 minute timeout for debugging
        while not all([ar.ready() for ar in async_res]):
            print(f"{sum(ar.ready() for ar in async_res)}/{len(async_res)} tasks completed...")
            time.sleep(1)
            if time.time() - start_time > timeout_sec:
                raise TimeoutError("parallel_map_qutip is hanging or a worker failed.")

 

        pool.terminate()
        pool.join()
        # return results

 

    except KeyboardInterrupt as e:
        os.environ["QUTIP_IN_PARALLEL"] = "FALSE"
        pool.terminate()
        pool.join()
        raise e


try:
    # pathos implementation is much more robust - should install if not present
    import pathos.multiprocessing as mp
except ImportError:
    # but default to std library version
    print(
        "using std lib version of multiprocessing; consider installing pathos; it's much more robust"
    )
    import multiprocessing as mp
    


def varg_opt(data, axis=None, opt_fun=np.nanargmin):
    """
    Return an index of a (possibly) multi-dimensional array of the element that
    optimizes a given function along with the optimal value.
    """
    index = arg_opt(data, axis=axis, opt_fun=opt_fun)
    return index, data[index]

def parallel_map_adapter(f, iterable):
    return parallel_map_qutip(f, list(iterable))


In [10]:
def param_map(f, parameters, map_fun=map, dtype=object):

    dims_list = [len(i) for i in parameters]
    total_dim = np.prod(dims_list)
    parameters_prod = tuple(itertools.product(*parameters))

    data = np.empty(total_dim, dtype=dtype)
    # for i, d in enumerate(map_fun(f, parameters_prod)):
    #     data[i] = d
    for i, d in enumerate(map_fun(lambda args: f(*args), parameters_prod)):
        data[i] = d
    print("complete param map")
    return np.reshape(data, dims_list)

In [11]:
def parallel_map_qutip_2(task, values, task_args=tuple(), task_kwargs={}, **kwargs):
    os.environ["QUTIP_IN_PARALLEL"] = "TRUE"
    kw = _default_kwargs()
    if "num_cpus" in kwargs:
        kw["num_cpus"] = kwargs["num_cpus"]

    try:
        pool = mp.Pool(processes=kw["num_cpus"])
        async_res = [
            pool.apply_async(
                task, (value,) + task_args, task_kwargs
            )
            for value in values
        ]

        # Collect results
        results = [ar.get() for ar in async_res]

        # Wait for all tasks to complete
        start_time = time.time()
        timeout_sec = 600  # Increased timeout for complex simulations
        while not all([ar.ready() for ar in async_res]):
            print(f"{sum(ar.ready() for ar in async_res)}/{len(async_res)} tasks completed...")
            time.sleep(1)
            if time.time() - start_time > timeout_sec:
                raise TimeoutError("parallel_map_qutip is hanging or a worker failed.")

        pool.close()
        pool.join()
        os.environ["QUTIP_IN_PARALLEL"] = "FALSE"
        return results

    except KeyboardInterrupt as e:
        os.environ["QUTIP_IN_PARALLEL"] = "FALSE"
        pool.terminate()
        pool.join()
        raise e
    except Exception as e:
        os.environ["QUTIP_IN_PARALLEL"] = "FALSE"
        pool.terminate()
        pool.join()
        raise e

In [12]:
evals, _ = fluxonium.eigensys(evals_count=levels)
omega_d_base = evals[1] - evals[0]

omega_d_array = np.linspace(omega_d_base - 0.005, omega_d_base + 0.005, 10)
peak_time_noise = 559.5559555955596  # previously determined
t_g_array = np.linspace(0.99 * peak_time_noise, 1.01 * peak_time_noise, 10)
param_pairs = list(itertools.product(omega_d_array, t_g_array))
print(f"Total simulations to run: {len(param_pairs)}")

warnings.filterwarnings(
    "ignore",
    module="qutip.*"  # Regex pattern to match all warnings from qutip
)
scq.settings.T1_DEFAULT_WARNING=False

try:
    # pathos implementation is much more robust - should install if not present
    import pathos.multiprocessing as mp
except ImportError:
    # but default to std library version
    print(
        "using std lib version of multiprocessing; consider installing pathos; it's much more robust"
    )
    import multiprocessing as mp

def _default_kwargs():
    return {"num_cpus": os.cpu_count() or 1}

#single process
# fidelity_results = param_map(evolve, [omega_d_array, t_g_array])

#parallel process
# fidelity_results = param_map(wrapped_evolve, [omega_d_array, t_g_array], map_fun=parallel_map_qutip)

fidelity_results = param_map(wrapped_evolve, [omega_d_array, t_g_array], map_fun=parallel_map_qutip)

Total simulations to run: 100
0/100 tasks completed...


TypeError: 'NoneType' object is not iterable

In [None]:
fidelity_results

--------------

## parallel_map from qutip parallel

In [24]:
import datetime
from ipyparallel import Client


def parallel_map(task, values, task_args=None, task_kwargs=None,
                 client=None, view=None, progress_bar=None,
                 show_scheduling=False, **kwargs):
    """
    Call the function ``task`` for each value in ``values`` using a cluster
    of IPython engines. The function ``task`` should have the signature
    ``task(value, *args, **kwargs)``.

    The ``client`` and ``view`` are the IPython.parallel client and
    load-balanced view that will be used in the parfor execution. If these
    are ``None``, new instances will be created.

    Parameters
    ----------

    task: a Python function
        The function that is to be called for each value in ``task_vec``.

    values: array / list
        The list or array of values for which the ``task`` function is to be
        evaluated.

    task_args: list / dictionary
        The optional additional argument to the ``task`` function.

    task_kwargs: list / dictionary
        The optional additional keyword argument to the ``task`` function.

    client: IPython.parallel.Client
        The IPython.parallel Client instance that will be used in the
        parfor execution.

    view: a IPython.parallel.Client view
        The view that is to be used in scheduling the tasks on the IPython
        cluster. Preferably a load-balanced view, which is obtained from the
        IPython.parallel.Client instance client by calling,
        view = client.load_balanced_view().

    show_scheduling: bool {False, True}, default False
        Display a graph showing how the tasks (the evaluation of ``task`` for
        for the value in ``task_vec1``) was scheduled on the IPython engine
        cluster.

    show_progressbar: bool {False, True}, default False
        Display a HTML-based progress bar during the execution of the parfor
        loop.

    Returns
    --------
    result : list
        The result list contains the value of
        ``task(value, task_args, task_kwargs)`` for each
        value in ``values``.

    """
    submitted = datetime.datetime.now()

    if task_args is None:
        task_args = tuple()

    if task_kwargs is None:
        task_kwargs = {}

    if client is None:
        client = Client()

        # make sure qutip is available at engines
        dview = client[:]
        dview.block = True
        dview.execute("from qutip import *")

    if view is None:
        view = client.load_balanced_view()

    ar_list = [view.apply_async(task, value, *task_args, **task_kwargs)
               for value in values]

    if progress_bar is None:
        view.wait(ar_list)
    else:
        if progress_bar is True:
            progress_bar = HTMLProgressBar()

        n = len(ar_list)
        progress_bar.start(n)
        while True:
            n_finished = sum([ar.progress for ar in ar_list])
            progress_bar.update(n_finished)

            if view.wait(ar_list, timeout=0.5):
                progress_bar.update(n)
                break
        progress_bar.finished()

    if show_scheduling:
        metadata = [[ar.engine_id,
                     (ar.started - submitted).total_seconds(),
                     (ar.completed - submitted).total_seconds()]
                    for ar in ar_list]
        _visualize_parfor_data(metadata)

    return [ar.get() for ar in ar_list]


In [25]:
results = parallel_map(
    task=wrapped_evolve,
    values=param_pairs,
    progress_bar=True
)


Waiting for connection file: ~\.ipython\profile_default\security\ipcontroller-client.json


OSError: Connection file '~\\.ipython\\profile_default\\security\\ipcontroller-client.json' not found.
You have attempted to connect to an IPython Cluster but no Controller could be found.
Please double-check your configuration and ensure that a cluster is running.