In [1]:
# ruff: noqa
import sys, os
sys.path.append(os.path.abspath("./../feedback-grape"))
sys.path.append(os.path.abspath("./../"))

# ruff: noqa
from feedback_grape.fgrape import optimize_pulse, evaluate_on_longer_time
from helpers import (
    init_grape_protocol,
    init_fgrape_protocol,
    test_implementations,
    generate_random_discrete_state,
    generate_random_bloch_state,
    generate_excited_state
)
from tqdm import tqdm
import jax
import jax.numpy as jnp
import numpy as np
from library.utils.FgResult_to_dict import FgResult_to_dict
import json

test_implementations()

jax.config.update("jax_enable_x64", True)

In [2]:
# Physical parameters
# (attention! #elements in density matrix grow as 4^N_chains)
N_chains = 3 # Number of parallel chains to simulate
gamma = 0.25 # Decay constant
generate_state = generate_random_discrete_state

# Training and evaluation parameters
model_type = "lut" # "lut" or "rnn"
training_params = {
    "N_samples": 3, # Number of random initial states to sample
    "N_training_iterations": 10000, # Number of training iterations
    "learning_rate": 0.01, # Learning rate
    "convergence_threshold": 1e-6,
    "batch_size": 8,
    "eval_batch_size": 8,
    "evaluation_time_steps": 50,
}
N_parallel_threads = 4 # Number of parallel threads for training
save_all_samples = True # wether to evaluate and save all training samples or only the best one

# Parameters to test

#num_time_steps : Number of time steps in the control pulse
#lut_depth : Depth of the lookup table for feedback
#reward_weights: Weights for the reward at each time step. Default only weights last timestep [0, 0, ... 0, 1]

experiments = [ # (timesteps, lut_depth, reward_weights, noise_level)
    (t, l, [0]*(t-w) + [1]*w, noise) for t in [3] for w in range(1, t+1) for l in [2] for noise in [0.0]
]

experiments_format = [ # Format of variables as tuples (name, type, required by grape, required by lut, required by rnn)
    ("t", int, True, True, True), # number timesteps (or segments)
    ("l", int, False, True, False), # LUT depth
    ("w", list, True, True, True), # reward weights
    ("noise", float, True, True, True), # noise level
]

# Check if experiment parameters are in specified format
for params in experiments:
    assert len(params) == len(experiments_format), "Experiment parameters length mismatch."
    for i, (param, (name, expected_type, req_grape, req_lut, req_rnn)) in enumerate(zip(params, experiments_format)):
        assert type(name) == str, "Parameter name must be a string."
        assert type(req_grape) == bool, "Requirement flags must be boolean."
        assert type(req_lut) == bool, "Requirement flags must be boolean."
        assert type(req_rnn) == bool, "Requirement flags must be boolean."
        assert type(param) == expected_type, f"Parameter '{name}' at index {i} must be of type {expected_type.__name__}."
        if type(param) == list:
            for j, item in enumerate(param):
                assert type(item) == int and 0 <= item <= 9, f"Item at index {j} in parameter '{name}' must be of type int and between 0 and 9."

def generate_param_paths(params, model_type, save_all_samples, s):
    assert model_type in ["grape", "lut", "rnn"], "Invalid model type."

    parts = []
    for (name, expected_type, req_grape, req_lut, req_rnn), param in zip(experiments_format, params):
        required = (model_type == "grape" and req_grape) or (model_type == "lut" and req_lut) or (model_type == "rnn" and req_rnn)
        if required:
            if expected_type == list:
                param_str = "".join(str(x) for x in param)
            else:
                param_str = str(param)
            parts.append(f"{name}={param_str}")

    pstr = f"{model_type}_" + "_".join(parts)
    
    if save_all_samples:
        pstr += f"_s={s}"
    
    return (
        "./optimized_architectures/" + pstr + ".json",
        "./optimized_architectures/" + pstr + "_training_data.json",
        "./evaluation_results/" + pstr + ".npz"
    )

if not os.path.exists("./evaluation_results"):
    os.makedirs("./evaluation_results")
if not os.path.exists("./optimized_architectures"):
    os.makedirs("./optimized_architectures")

In [3]:
from concurrent.futures import ThreadPoolExecutor
def run_experiment(params):
    num_time_steps, lut_depth, reward_weights, noise_level = params
    print(f"Evaluating {params}")
    noisy_state = lambda key: generate_state(key, N_chains=N_chains, noise_level=noise_level)
    pure_state = lambda key: generate_state(key, N_chains=N_chains, noise_level=0.0)

    best_result = None
    best_result_fidelities = None
    fidelities_each = []
    for s in tqdm(range(training_params["N_samples"])):
        system_params = init_fgrape_protocol(jax.random.PRNGKey(s), N_chains, gamma)
        # Train LUT
        result = optimize_pulse(
            U_0=noisy_state,
            C_target=pure_state,
            system_params=system_params,
            num_time_steps=num_time_steps,
            lut_depth=lut_depth,
            reward_weights=reward_weights,
            mode="lookup" if model_type == "lut" else "nn",
            goal="fidelity",
            max_iter=training_params["N_training_iterations"],
            convergence_threshold=training_params["convergence_threshold"],
            learning_rate=training_params["learning_rate"],
            evo_type="density",
            batch_size=training_params["batch_size"],
            eval_batch_size=1, # This evaluation is discarded
        )
        eval_result = evaluate_on_longer_time( # Evaluate on longer time and choose best LUT accordingly
            U_0 = pure_state,
            C_target = pure_state,
            system_params = system_params,
            optimized_trainable_parameters = result.optimized_trainable_parameters,
            num_time_steps = training_params["evaluation_time_steps"],
            evo_type = "density",
            goal = "fidelity",
            eval_batch_size = training_params["eval_batch_size"],
            mode = "lookup" if model_type == "lut" else "nn",
        )
        fidelities = eval_result.fidelity_each_timestep

        if best_result is None or np.mean(fidelities) > np.mean(best_result_fidelities):
            best_result = result
            best_result_fidelities = fidelities

        fidelities_each.append(float(np.mean(fidelities)))

        if save_all_samples:
            path1, path2, path3 = generate_param_paths(params, model_type, save_all_samples, s)
            with open(path1, "w") as f:
                json.dump(FgResult_to_dict(result), f)

            with open(path2, "w") as f:
                training_data = {
                    "average_fidelity": float(np.mean(fidelities)),
                    "training_params": training_params,
                }
                json.dump(training_data, f)

            np.savez(path3, fidelities_lut=np.array(fidelities))

    path1, path2, path3 = generate_param_paths(params, model_type, False, s)
    np.savez(path3, fidelities_lut=np.array(best_result_fidelities))

    with open(path1, "w") as f:
        json.dump(FgResult_to_dict(best_result), f)

    with open(path2, "w") as f:
        training_data = {
            "fidelities_each_sample": fidelities_each,
            "average_fidelity": float(np.mean(fidelities_each)),
            "training_params": training_params,
        }
        json.dump(training_data, f)

# Run experiments in parallel
if N_parallel_threads > 1:
    with ThreadPoolExecutor(max_workers=N_parallel_threads) as executor:
        executor.map(run_experiment, experiments)
else:
    for params in experiments:
        run_experiment(params)

# Play a sound when done
os.system('say "done."')

Evaluating (3, 2, [0, 0, 1], 0.0)
Evaluating (3, 2, [0, 1, 1], 0.0)
Evaluating (3, 2, [1, 1, 1], 0.0)


  0%|          | 0/3 [00:00<?, ?it/s]
[A
 33%|███▎      | 1/3 [04:25<08:50, 265.05s/it]
 67%|██████▋   | 2/3 [08:19<04:07, 247.15s/it]
100%|██████████| 3/3 [12:00<00:00, 240.14s/it]
100%|██████████| 3/3 [12:10<00:00, 243.65s/it]
100%|██████████| 3/3 [12:13<00:00, 244.34s/it]


0