# Ligthning Fast One-Shot Side Entanglement Optimization

## Imports

In [63]:
import numpy as np
import random
import itertools
import math
import time
from typing import Optional, Tuple, cast, List, Dict
from bitarray.util import urandom, count_xor
from bitarray import frozenbitarray
from qiskit import Aer, QuantumCircuit, execute, QuantumRegister, ClassicalRegister
from qiskit.result.result import Result
from qiskit.aqua.components.optimizers import SLSQP, L_BFGS_B, CRS, DIRECT_L, DIRECT_L_RAND, ESCH, ISRES

## General auxiliary functions

In [30]:
def get_combinations_two_etas_without_repeats_from_etas(angles_etas: List[float]) -> List[Tuple[float, float]]:
    """ from a given list of attenuations factors create a
        list of all combinatorial pairs of possible etas
        without repeats
        For us it is the same testing first eta 0.1 and second eta 0.2
        than first eta 0.2 and second eta 0.1
        Though, we will always put the greater value as the first pair element
    """
    # when there is only one element, we add the same element
    if len(angles_etas) == 1:
        angles_etas.append(angles_etas[0])
    # get combinations of two etas without repeats
    eta_pairs = list(itertools.combinations(angles_etas, 2))

    return reorder_pairs(eta_pairs)

In [31]:
def reorder_pairs(pairs: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ reorder received pairs setting first the element of the tuple
        as greater or equal de second one
    """
    reordered_pairs = pairs
    for idx, pair in enumerate(pairs):
        reordered_pairs[idx] = reorder_pair(pair)
    return reordered_pairs

In [32]:
def reorder_pair(pair: Tuple[float, float]) -> Tuple[float, float]:
    if pair[0] < pair[1]:
        return (pair[1], pair[0])
    return pair

## Auxiliary functions for preparing the optimization (until the cost function)

In [45]:
def find_optimal_configurations(optimization_setup: dict, clone_setup: Optional[dict]=None) -> dict:
    """ Finds out the optimal configuration for each pair of attenuation levels
        using the configured optimization algorithm """
    eta_pairs_idx_to_optimize, optimal_configurations = _select_eta_pairs_to_optimize(clone_setup)

    print(f'number of eta_pairs_idx_to_optimize: {len(eta_pairs_idx_to_optimize)} -> {eta_pairs_idx_to_optimize}')

    program_start_time = time.time()
    print("Starting the execution")

    for eta_pair_idx in eta_pairs_idx_to_optimize:
        start_time = time.time()
        GLOBAL_ETA_PAIR = GLOBAL_ETA_PAIRS[eta_pair_idx]
        result = _compute_best_configuration(optimization_setup)
        optimal_configurations['probabilities'].append(result['best_probability'])
        optimal_configurations['configurations'].append(result['best_configuration'])
        optimal_configurations['best_algorithm'].append(result['best_algorithm'])
        optimal_configurations['number_calls_made'].append(result['number_calls_made'])
        end_time = time.time()
        print(f"Pair of etas # {eta_pair_idx} of {len(eta_pairs_idx_to_optimize)}, time taken this pair of etas: " +
              f'{np.round((end_time - start_time)/60, 0)} minutes' +
              f' and {np.round((end_time - start_time) % 60, 0)} seconds')
        print("total minutes taken this pair of etas: ", int(np.round((end_time - start_time) / 60)))
        print("total time taken so far: " +
              f'{np.round((end_time - program_start_time)/60, 0)} minutes' +
              f' and {np.round((end_time - program_start_time) % 60, 0)} seconds')
    end_time = time.time()
    print("total minutes of execution time: ", int(np.round((end_time - program_start_time) / 60)))
    print(f'Number eta pairs optimized: {len(eta_pairs_idx_to_optimize)}' +
          f'from the total eta pairs: {len(GLOBAL_ETA_PAIRS)} ')
    optimal_configurations['eta_pairs'] = GLOBAL_ETA_PAIRS

    return optimal_configurations

In [35]:
    def _select_eta_pairs_to_optimize(clone_setup: Optional[dict]) -> Tuple[List[int], dict]:
        """ from the given clone setup, select the eta pairs to be optimized,
            and set the non computed pairs configuration as default values   """
        eta_pair_idx_init, eta_pair_idx_end = _set_eta_pair_index_bounds(clone_setup)
        index_dict = _build_eta_pair_index_lists(eta_pair_idx_init, eta_pair_idx_end)
        default_optimal_configurations = _set_default_optimal_configurations(index_dict['eta_pair_idx_to_skip'])

        return (index_dict['eta_pair_idx_to_compute'],
                default_optimal_configurations)

In [49]:
def _set_default_optimal_configurations(eta_pair_idx_to_skip: List[int]) -> dict:
    """ Return the optimal configurations set to default values for the indexes to be skipped """
    elements_to_skip = len(eta_pair_idx_to_skip)

    configurations: List[dict] = []
    for eta_pair_idx in eta_pair_idx_to_skip:
        one_configuration = {'state_probability': 0,
                             'angle_rx': 0,
                             'angle_ry': 0,
                             'eta_pair': GLOBAL_ETA_PAIRS[eta_pair_idx]}
        configurations.append(one_configuration)

    return {'eta_pairs': [],
            'best_algorithm': ['NA'] * elements_to_skip,
            'probabilities': [0] * elements_to_skip,
            'configurations': configurations,
            'number_calls_made': [0] * elements_to_skip}

In [47]:
def _set_eta_pair_index_bounds(clone_setup: Optional[dict]) -> Tuple[int, int]:
    """ set the first and last eta pair index from which to optimize the configuration """
    total_eta_pairs = len(GLOBAL_ETA_PAIRS)

    if clone_setup is None or clone_setup['total_clones'] <= 1:
        return (0, total_eta_pairs)

    eta_pair_idx_init = int(np.floor(clone_setup['id_clone'] * total_eta_pairs / clone_setup['total_clones']))
    eta_pair_idx_end = min(int((clone_setup['id_clone'] + 1) *
                               total_eta_pairs / clone_setup['total_clones']), total_eta_pairs)
    return (eta_pair_idx_init, eta_pair_idx_end)

def _build_eta_pair_index_lists(eta_pair_idx_init: int, eta_pair_idx_end: int) -> Dict:
    """ create two lists with the the eta pair index to be computed and the index to be skipped """
    first_part_to_skip = list(range(0, eta_pair_idx_init))
    last_part_to_skip = list(range(eta_pair_idx_end, len(GLOBAL_ETA_PAIRS)))

    return {
        'eta_pair_idx_to_compute': list(range(eta_pair_idx_init, eta_pair_idx_end)),
        'eta_pair_idx_to_skip': first_part_to_skip + last_part_to_skip
    }

In [67]:
def _compute_best_configuration(optimization_setup: dict) -> dict:
    """ Find out the best configuration with a global pair of etas (channels) trying out
        a list of specified optimization algorithm """
    optimizer_algorithms = optimization_setup['optimizer_algorithms']
    optimizer_iterations = optimization_setup['optimizer_iterations']
    best_probability = 0
    best_configuration = []
    best_optimizer_algorithm = ""
    number_calls_made = 0

    for optimizer_algorithm, max_evals in zip(optimizer_algorithms, optimizer_iterations):
        print("Analyzing Optimizer Algorithm: ", optimizer_algorithm)
        if optimizer_algorithm == 'SLSQP':
            optimizer = SLSQP(maxiter=max_evals)
        if optimizer_algorithm == 'L_BFGS_B':
            optimizer = L_BFGS_B(maxfun=max_evals, maxiter=max_evals)
        if optimizer_algorithm == 'CRS':
            optimizer = CRS(max_evals=max_evals)
        if optimizer_algorithm == 'DIRECT_L':
            optimizer = DIRECT_L(max_evals=max_evals)
        if optimizer_algorithm == 'DIRECT_L_RAND':
            optimizer = DIRECT_L_RAND(max_evals=max_evals)
        if optimizer_algorithm == 'ESCH':
            optimizer = ESCH(max_evals=max_evals)
        if optimizer_algorithm == 'ISRES':
            optimizer = ISRES(max_evals=max_evals)

        ret = optimizer.optimize(num_vars=len(optimization_setup['initial_parameters']),
                                 objective_function=_cost_function,
                                 variable_bounds=optimization_setup['variable_bounds'],
                                 initial_point=optimization_setup['initial_parameters'])
        print("Best Average Probability:", 1 - ret[1])
        if (1 - ret[1]) > best_probability:
            best_configuration = ret[0]
            best_probability = 1 - ret[1]
            number_calls_made = ret[2]
            best_optimizer_algorithm = optimizer_algorithm

    # Print results
    print("Final Best Optimizer Algorithm: ", best_optimizer_algorithm)
    print("Final Best Average Probability:", best_probability)
    print("Number of cost function calls made:", number_calls_made)
    print("Parameters Found: state_probability = " + " = " + str(best_configuration[0]) +
          ", " + u"\u03D5" + "rx = " + str(int(math.degrees(best_configuration[1]))) + u"\u00B0" +
          ", " + u"\u03D5" + "ry = " + str(int(math.degrees(best_configuration[2]))) + u"\u00B0" +
          ", " + u"\u03B7" + u"\u2080" + " = " + str(int(math.degrees(GLOBAL_ETA_PAIR[0]))) + u"\u00B0" +
          ", " + u"\u03B7" + u"\u2081" + " = " + str(int(math.degrees(GLOBAL_ETA_PAIR[1]))) + u"\u00B0")
    
    best_configuration_dict = {'state_probability': best_configuration[0],
                               'angle_rx': best_configuration[1],
                               'angle_ry': best_configuration[2],
                               'eta_pair': GLOBAL_ETA_PAIR} 
    
    return {'best_algorithm': best_optimizer_algorithm,
            'best_probability': best_probability,
            'best_configuration': best_configuration_dict,
            'number_calls_made': number_calls_made}

In [42]:
def _cost_function(params: List[float]) -> float:
    """ Computes the cost of running a specific configuration for the number of plays
        defined in the optimization setup.
        Cost is computed as 1 (perfect probability) - average success probability for
        all the plays with the given configuration
        Returns the Cost (error probability).
    """
    configuration = {
        'state_probability': params[0],
        'angle_rx': params[1],
        'angle_ry': params[2],
        'eta_pair': GLOBAL_ETA_PAIR}

    return 1 - compute_new_average_success_probability(configuration=configuration,
                                                       plays=GLOBAL_PLAYS)

## Auxiliary functions for fast compute the circuit and success average probability

In [71]:
def compute_new_average_success_probability(configuration: dict,
                                            plays: Optional[int] = 100,) -> float:
    """ Computes the average success probability of running a specific configuration
        for the number of plays defined in the configuration.
    """
    random_etas, eta_shots = _get_random_etas_and_eta_shots(plays)
    guesses_eta = _run_all_circuits_and_return_guess(configuration, eta_shots)
    return _check_guesses_and_return_average_success_probability(plays, random_etas, guesses_eta)

In [69]:
def _get_random_etas_and_eta_shots(plays: int) -> Tuple[frozenbitarray, Tuple[int, int]]:
    """ create a random bit string with length the number of plays and calculate the
        number of shots for each eta based on the random string value (0 -> eta0, 1 -> eta1)
    """
    random_etas = frozenbitarray(urandom(plays))
    eta1_shots = random_etas.count()
    eta0_shots = plays - eta1_shots
    return (random_etas, (eta0_shots, eta1_shots))

In [13]:
def _check_guesses_and_return_average_success_probability(plays: int, 
                                                          random_etas: frozenbitarray, 
                                                          guesses_eta: Tuple[List[int],List[int]]) -> float:
    """ check the guessed etas with the initial random etas and compute the average with a xor bitwise operation """
    guesses = frozenbitarray([guesses_eta[random_eta].pop(0) for random_eta in random_etas])
    return 1 - (count_xor(random_etas, guesses) / plays)

In [14]:
def _run_all_circuits_and_return_guess(configuration: dict,
                                       eta_shots: Optional[Tuple[int, int]] = (1, 1)) -> Tuple[List[int],
                                                                                               List[int]]:
    """ Create a pair of Quantum Circuits, in its transpiled form, from a given configuration """
    if eta_shots is None:
        eta_shots = (1, 1)

    reordered_configuration = _reorder_configuration(configuration)
    eta_memories = [cast(Result, execute(_create_one_circuit(reordered_configuration, eta),
                                         backend=GLOBAL_BACKEND,
                                         shots=eta_shots[idx],
                                         memory=True).result()).get_memory()
                    for idx, eta in enumerate(reordered_configuration['eta_pair'])]

    if len(eta_memories) > 2:
        raise ValueError('Results must have length 2')
    guesses = [_get_guesses_from_one_eta_memories(one_eta_memories)
               for one_eta_memories in eta_memories]
    if len(guesses) > 2:
        raise ValueError('Guesses must have length 2')
    return (guesses[0], guesses[1])

In [17]:
def _reorder_configuration(configuration: dict):
    return {'state_probability': configuration['state_probability'],
            'angle_rx': configuration['angle_rx'],
            'angle_ry': configuration['angle_ry'],
            'eta_pair': reorder_pair(configuration['eta_pair'])
    }

In [65]:
def _create_one_circuit(configuration: dict,
                        eta: float) -> QuantumCircuit:
    """ Creates one circuit from a given configuration and eta """
    qreg_q = QuantumRegister(3, 'q')
    creg_c = ClassicalRegister(2, 'c')

    initial_state = _prepare_initial_state_entangled(configuration['state_probability'])

    circuit = QuantumCircuit(qreg_q, creg_c)
    circuit.initialize(initial_state, [0, 1])
    circuit.reset(qreg_q[2])
    circuit.cry(2 * eta, qreg_q[1], qreg_q[2])
    circuit.cx(qreg_q[2], qreg_q[1])
    circuit.rx(configuration['angle_rx'], qreg_q[1])
    circuit.ry(configuration['angle_ry'], qreg_q[1])
    circuit.barrier()
    circuit.measure([0, 1], creg_c)
    return circuit

In [20]:
def _prepare_initial_state_entangled(state_probability: float) -> Tuple[complex, complex, complex, complex]:
    """ Prepare initial state: computing 'y' as the amplitudes  """
    # ORIGINAL
    #  return (0, np.sqrt(state_probability), np.sqrt(1 - state_probability), 0)
    return (0, np.sqrt(1 - state_probability), np.sqrt(state_probability), 0)

In [21]:
def _get_guesses_from_one_eta_memories(one_eta_memories: List[str]) -> List[int]:
    return [_guess_eta_from_counts(one_eta_memory) for one_eta_memory in one_eta_memories]

In [22]:
def _guess_eta_from_counts(counts: str) -> int:
    """ Decides which eta was used on the real execution from the 'counts' measured
        based on the guess strategy that is required to use
    """
    return _guess_eta_used_two_bit_strategy(counts)

In [24]:
def _guess_eta_used_two_bit_strategy(counts: str) -> int:
    """ Decides which eta was used on the real execution from the two 'counts' measured
        Qubits order MATTER!!!!
        "01" means that:
          the LEFTMOST bit (0) corresponds to the measurement of the qubit that goes THROUGH the channel
          and the RIGHTMOST bit (1) corresponds to the measurement of the qubit that goes OUTSIDE the channel
        Remember that we are only sending |01> + |10> entangles states
        Setting eta0 >= eta1:
            * outcome 00 -> eta0 as the most probable (more attenuation)
            * outcome 01 -> we do not know if there has been attenuation. 50% chance, random choice
            * outcome 10 -> eta1 as the most probable (less attenuation)
            * outcome 11 -> not possible, but in case we get it (from noisy simulation), 50% chance, random choice
    """
    if len(counts) != 2:
        raise ValueError('counts MUST be a two character length string')
    if counts == "00":
        return 0
    if counts == "10":
        return 1
    if counts == "01" or counts == "11":
        return random.choice([0, 1])
    raise ValueError("Accepted counts are '00', '01', '10', '11'")

## Global Variables

In [41]:
GLOBAL_BACKEND=Aer.get_backend('qasm_simulator')
GLOBAL_ETA_PAIR = (0,0)
optimization_setup = { 'optimizer_algorithms': ['CRS'],
                        'optimizer_iterations': [100],
                        'attenuation_angles': np.append(np.arange(0, np.pi/2, np.pi/2/20), np.pi/2),
                        'initial_parameters': [0, 0, 0],
                        'variable_bounds': [(0, 1),   # amplitude_probability
                                            (0, 2*np.pi),  # rx
                                            (0, 2*np.pi)], # ry
                        'plays': 100}
GLOBAL_ETA_PAIRS = get_combinations_two_etas_without_repeats_from_etas(optimization_setup['attenuation_angles'])
GLOBAL_PLAYS = optimization_setup['plays']

## Launch Optimization

In [72]:
results = find_optimal_configurations(optimization_setup=optimization_setup)

number of eta_pairs_idx_to_optimize: 210 -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209]
Starting the ex

SystemError: <built-in function get_default_endian> returned a result with an error set