# Aux

In [None]:
import numpy as np
import pennylane as qml 
import torch

def create_circuit(n_qubits,n_layers=None,circ = "simplified_two_design",fim=False, shots=None):
    """Build a PennyLane QNode for one of several named ansatz circuits.

    Args:
        n_qubits (int): Number of wires of the device and of every ansatz.
        n_layers (int, optional): Number of RZ/RY layers. Only read by the
            ``"rzry"`` circuit, for which it is required.
        circ (str): Name of the circuit. One of ``"rzry"``,
            ``"simplified_two_design"``, ``"special_unitary"``,
            ``"simpleRZRY"``, ``"RY"``, ``"ghz"``, ``"random_product_state"``,
            ``"SEL"`` or ``"RL"``.
        fim (bool): When False the QNode is created with the torch interface
            and ``diff_method="backprop"``; when True it is created with
            PennyLane's default QNode settings.
        shots (int, optional): Forwarded unchanged to ``qml.device``.

    Returns:
        qml.QNode: The selected circuit bound to a ``default.qubit.torch``
        device. The arguments the QNode accepts depend on ``circ``.

    Raises:
        ValueError: If ``circ`` is not a known circuit name, or if
            ``circ == "rzry"`` and ``n_layers`` is None.
    """

    def RZRY(params):
        """Hadamards, ``n_layers`` rounds of RZ/RY per qubit, all-to-all CNOTs; returns probabilities."""
        #qml.SpecialUnitary(params, wires=range(n_qubits))
        #qml.SimplifiedTwoDesign(initial_layer_weights=init_params, weights=params, wires=range(n_qubits))
        #qml.AngleEmbedding(params,wires=range(n_qubits))
        for q in range(n_qubits):
            qml.Hadamard(wires=q)

        for w in range(n_layers):
            for q in range(n_qubits):
                # params is indexed as a flat sequence: two angles per qubit per layer.
                index = w * (2*n_qubits) + q * 2
                qml.RZ(params[index],wires=q)
                qml.RY(params[index + 1],wires=q)

        qml.broadcast(qml.CNOT , wires=range(n_qubits), pattern="all_to_all")

        return qml.probs(wires=range(n_qubits))

    def S2D(init_params,params,measurement_qubits=0,prod_approx=False):
        """SimplifiedTwoDesign ansatz measured on the first ``measurement_qubits`` wires."""
        #qml.SpecialUnitary(params, wires=range(n_qubits))
        qml.SimplifiedTwoDesign(initial_layer_weights=init_params, weights=params, wires=range(n_qubits))

        #qml.broadcast(qml.CNOT , wires=range(n_qubits), pattern="all_to_all")
        if not prod_approx:
            # Joint distribution over the measured wires.
            return qml.probs(wires=list(range(measurement_qubits)))
        else:
            # One single-wire distribution per measured wire.
            return [qml.probs(i) for i in range(measurement_qubits)]

    def SU(params):
        """SpecialUnitary on all wires; returns the expectation of Z on every wire (tensor product)."""
        qml.SpecialUnitary(params, wires=range(n_qubits))

        ZZ = qml.operation.Tensor(qml.PauliZ(0), qml.PauliZ(1))
        for i in range(2,n_qubits):
            ZZ = qml.operation.Tensor(ZZ, qml.PauliZ(i))

        return qml.expval(ZZ)

    def simmpleRZRY(params,cnots=True):
        """One Hadamard/RZ/RY layer; ``params[0]`` feeds RZ and ``params[1]`` feeds RY."""
        qml.broadcast(qml.Hadamard, wires=range(n_qubits), pattern="single")
        qml.broadcast(qml.RZ, wires=range(n_qubits), pattern="single", parameters=params[0])
        qml.broadcast(qml.RY, wires=range(n_qubits), pattern="single", parameters=params[1])
        if cnots:
            qml.broadcast(qml.CNOT, wires=range(n_qubits), pattern="chain")

            # With the CNOT chain only the last wire is measured.
            return qml.expval(qml.PauliZ(n_qubits-1))
        else:
            ZZ = qml.operation.Tensor(qml.PauliZ(0), qml.PauliZ(1))
            for i in range(2,n_qubits):
                ZZ = qml.operation.Tensor(ZZ, qml.PauliZ(i))

            return qml.expval(ZZ)

    def RY(params,y=True,probs=False,prod=False, entanglement=None):
        """One RY rotation per wire, optionally followed by all-to-all CNOTs."""
        #qml.broadcast(qml.Hadamard, wires=range(n_qubits), pattern="single")
        qml.broadcast(qml.RY, wires=range(n_qubits), pattern="single", parameters=params)
        #qml.broadcast(qml.CZ, wires=range(n_qubits), pattern="all_to_all")

        if entanglement=="all_to_all":
            qml.broadcast(qml.CNOT, wires=range(n_qubits), pattern="all_to_all")

        if y==True:
            # NOTE(review): despite the name, this list holds PauliZ operators, and
            # a plain Python list is handed to qml.expval - confirm this is intended.
            #YY = qml.operation.Tensor(qml.PauliY(0), qml.PauliY(1))
            YY = [qml.PauliZ(0), qml.PauliZ(1)]
            for i in range(2,n_qubits):
                #YY = qml.operation.Tensor(YY, qml.PauliY(i))
                YY.append(qml.PauliZ(i))

            #return [qml.expval(i) for i in YY]
            return qml.expval(YY)

        elif probs==False:

            ZZ = qml.operation.Tensor(qml.PauliZ(0), qml.PauliZ(1))
            #ZZ = [qml.PauliZ(0), qml.PauliZ(1)]
            for i in range(2,n_qubits):
                ZZ = qml.operation.Tensor(ZZ, qml.PauliZ(i))
                #ZZ.append(qml.PauliZ(i))

            #return [qml.expval(i) for i in ZZ]
            return qml.expval(ZZ)

        else:
            if prod:
                return [qml.probs(i) for i in range(n_qubits)]
            else:
                return qml.probs(wires=range(n_qubits))

    def GHZ(params,measurement_qubits=0):
        """RY on wire 0 followed by a CNOT chain; returns probabilities of the first wires."""
        qml.RY(params,wires=0)
        qml.broadcast(qml.CNOT, wires=range(n_qubits), pattern="chain")

        return qml.probs(wires=range(measurement_qubits))

    def random_product_state(params,gate_sequence=None):
        """RY(pi/4) on every wire, then per-layer single-qubit gates looked up in ``gate_sequence``.

        ``gate_sequence`` is indexed with the string key ``"{layer}{wire}"``.
        """
        # NOTE(review): this circuit has no return statement, so it produces no
        # measurement - confirm how it is meant to be used as a QNode.
        for i in range(n_qubits):
            qml.RY(np.pi / 4, wires=i)

        for ll in range(len(params)):

            for i in range(n_qubits):
                gate_sequence["{}{}".format(ll,i)](params[ll][i], wires=i)

            #for i in range(n_qubits - 1):
                #qml.CZ(wires=[i, i + 1])

    def SEL(params, measurement_qubits=0):
        """StronglyEntanglingLayers ansatz; returns probabilities of the first wires."""
        qml.StronglyEntanglingLayers(params, wires=range(n_qubits))
        return qml.probs(wires=range(measurement_qubits))

    def RL(params, measurement_qubits=0):
        """RandomLayers ansatz with CZ imprimitives; returns probabilities of the first wires."""
        qml.RandomLayers(params, ratio_imprim=0.8 ,imprimitive=qml.CZ, wires=range(n_qubits))
        return qml.probs(wires=range(measurement_qubits))

    circuits = {
        "rzry": RZRY,
        "simplified_two_design": S2D,
        "special_unitary": SU,
        "simpleRZRY": simmpleRZRY,
        "RY": RY,
        "ghz": GHZ,
        "random_product_state": random_product_state,
        "SEL": SEL,
        "RL": RL,
    }

    # Validate before touching the device so that a typo in `circ` produces a
    # clear message instead of an UnboundLocalError further down.
    if circ not in circuits:
        raise ValueError(
            "Unknown circuit '{}'. Expected one of: {}".format(circ, ", ".join(circuits))
        )
    if circ == "rzry" and n_layers is None:
        raise ValueError("The 'rzry' circuit requires n_layers to be set.")
    qcircuit = circuits[circ]

    dev = qml.device("default.qubit.torch", wires=n_qubits, shots=shots)

    if not fim:
        circuit = qml.QNode(qcircuit, dev,interface="torch", diff_method="backprop")
    else:
        circuit = qml.QNode(qcircuit, dev)

    return circuit

def compute_gradient(log_prob, w):
    """Compute gradient of the log probability with respect to weights.

    Any gradient already stored on ``w`` is zeroed first, then ``log_prob`` is
    back-propagated with ``retain_graph=True`` so the same graph can be
    differentiated again by the caller.

    Args:
    - log_prob (torch.Tensor): The log probability tensor.
    - w (torch.Tensor): The weights tensor, with requires_grad=True.

    Returns:
    - numpy.ndarray: The gradient of log_prob with respect to w, flattened.
      The array is an independent copy: later calls that zero or update
      ``w.grad`` do not change it.

    Raises:
    - ValueError: If no gradient was produced for ``w``.
    """
    if w.grad is not None:
        w.grad.zero_()
    log_prob.backward(retain_graph=True)

    if w.grad is None:
        raise ValueError("The gradient for the given log_prob with respect to w is None.")

    # reshape() also handles non-contiguous gradients, cpu() handles gradients
    # living on another device, and copy() breaks the memory sharing between the
    # returned array and w.grad (which is modified in place on the next call).
    return w.grad.detach().reshape(-1).cpu().numpy().copy()

def policy(probs, policy_type="contiguous-like", n_actions=2, n_qubits=1):
    """Map basis-state probabilities to action probabilities.

    Args:
        probs: Indexable sequence of basis-state probabilities (index ``i`` is
            interpreted through its ``n_qubits``-wide binary representation).
        policy_type (str): ``"contiguous-like"`` returns ``probs`` unchanged;
            ``"parity-like"`` groups basis states by parity bits (see below).
        n_actions (int): Number of actions; ``int(log2(n_actions))`` parity
            bits are computed per basis state.
        n_qubits (int): Width used for the binary representation of ``i``.

    Returns:
        ``probs`` itself for ``"contiguous-like"``, otherwise a new
        ``torch.Tensor`` of length ``n_actions``.

    Raises:
        ValueError: If ``policy_type`` is not one of the two supported names
            (previously this case silently returned ``None``).
    """
    if policy_type == "contiguous-like":
        return probs
    if policy_type != "parity-like":
        raise ValueError(
            "Unknown policy_type '{}': expected 'contiguous-like' or 'parity-like'".format(policy_type)
        )

    n_bits = int(np.log2(n_actions))
    action_probs = torch.zeros(n_actions)
    for i in range(len(probs)):
        bitstring = np.binary_repr(i, width=n_qubits)
        # Bit m of the action is the parity of the bitstring with its last m
        # characters removed (m == 0 uses the whole bitstring).
        parities = [
            (bitstring if m == 0 else bitstring[:-m]).count("1") % 2
            for m in range(n_bits)
        ]
        action_index = int("".join(str(bit) for bit in parities), 2)
        action_probs[action_index] += probs[i]

    return action_probs
    
def compute_policy_and_gradient(args):
    """Sample random weights, evaluate the policy and return the gradient norm.

    Args:
        args: Six-element sequence
            ``(n_qubits, shapes, circuit_name, n_actions, policy_type, clamp)``.
            ``shapes`` lists the shapes of the weight arrays to sample
            uniformly from ``[-pi, pi)``; ``clamp`` is either None or the
            lower bound used to clamp the policy probabilities into
            ``[clamp, 1]``.

    Returns:
        float: L2 norm of the gradient of the log-probability of one sampled
        action with respect to the trainable weights.
    """
    n_qubits, shapes, circuit_name, n_actions, policy_type, clamp = args

    # The parity policy reads every wire; otherwise log2(n_actions) wires suffice.
    measure_qubits = n_qubits if policy_type == "parity-like" else int(np.log2(n_actions))

    qc = create_circuit(n_qubits, circ=circuit_name, fim=False, shots=None)

    weights = [np.random.uniform(-np.pi,np.pi,size=shape) for shape in shapes]
    if circuit_name == "simplified_two_design":
        # The initial layer is kept fixed; only the layer weights are differentiated.
        weights_tensor_init = torch.tensor(weights[0], requires_grad=False)
        weights_tensor_params = torch.tensor(weights[1], requires_grad=True)
        probs = qc(weights_tensor_init,weights_tensor_params, measurement_qubits=measure_qubits)
    else:
        weights_tensor_params = torch.tensor(weights, requires_grad=True)
        probs = qc(weights_tensor_params, measurement_qubits=measure_qubits)

    pi = policy(probs, policy_type=policy_type, n_actions=n_actions, n_qubits=n_qubits)
    if clamp is not None:
        pi = torch.clamp(pi, clamp, 1)

    dist = torch.distributions.Categorical(probs=pi)
    sampled_action = dist.sample()
    log_prob = dist.log_prob(sampled_action)

    return np.linalg.norm(compute_gradient(log_prob, weights_tensor_params), 2)


In [None]:
def reinforce(policy, optimizer, env, n_episodes=1000, max_t=1000, gamma=1.0, print_every=5):
    """Train ``policy`` with the REINFORCE policy-gradient algorithm.

    Args:
        policy: Object exposing ``sample(state_tensor)``, which returns a
            3-tuple whose first two items are the action and its
            log-probability, and a ``gradient_list`` attribute.
        optimizer: Torch optimizer updating the policy parameters.
        env: Environment whose ``reset()`` returns a sequence with the
            observation at index 0 and whose ``step(action)`` returns the
            5-tuple ``(state, reward, terminated, truncated, info)``.
        n_episodes (int): Number of episodes to run (unless solved earlier).
        max_t (int): Maximum number of steps per episode.
        gamma (float): Discount factor.
        print_every (int): Logging period, and size of the moving-average window.

    Returns:
        tuple: ``(scores, policy.gradient_list, average_scores)`` where
        ``scores`` holds the total reward of each episode and
        ``average_scores`` the moving average after each episode.
    """
    scores_deque = deque(maxlen=print_every)
    scores = []
    average_scores = []
    runtime_sum = 0
    # Episodes are numbered 1..n_episodes inclusive.
    for e in range(1, n_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()
        # Collect trajectory
        for t in range(max_t):
            # reset() returns (observation, info); step() returns the bare observation.
            if t == 0:
                state_tensor = torch.tensor(state[0]).float()
            else:
                state_tensor = torch.tensor(state).float()
            action, log_prob, _, = policy.sample(state_tensor)
            saved_log_probs.append(log_prob)
            state, reward, terminated, truncated, _ = env.step(action)
            rewards.append(reward)
            # An episode ends on termination OR truncation (e.g. a time limit).
            if terminated or truncated:
                break

        # Total episode reward and its moving average
        episode_score = sum(rewards)
        scores_deque.append(episode_score)
        scores.append(episode_score)
        average_scores.append(np.mean(scores_deque))

        # Discounted returns, built back to front and then reversed
        R = 0
        returns = []
        for r in reversed(rewards):
            R = r + gamma * R
            returns.append(R)
        returns.reverse()
        returns = torch.tensor(returns)
        # std() of a single value is NaN, which would poison the update, so
        # standardisation is skipped for one-step episodes.
        if len(returns) > 1:
            returns = (returns - returns.mean()) / (returns.std() + np.finfo(np.float32).eps)

        policy_loss = [-log_prob * R for log_prob, R in zip(saved_log_probs, returns)]
        policy_unsqueezed = [torch.unsqueeze(loss, 0) for loss in policy_loss]
        policy_sum = torch.cat(policy_unsqueezed).sum()

        # Backpropagation (timed)
        start_time = time.time()
        optimizer.zero_grad()
        policy_sum.backward()
        optimizer.step()
        end_time = time.time()
        runtime = end_time-start_time

        runtime_sum += runtime
        if e % print_every == 0:
            print('Episode {}\tLast reward: {:.2f}\tLast {}\tEpisodes average reward: {:.2f}\tRuntime: {:.2f}'.format(e, scores_deque[-1], print_every, np.mean(scores_deque), runtime_sum))
            runtime_sum = 0
        if np.mean(scores_deque) == 500:
            print('Environment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(e, np.mean(scores_deque)))
            break
    return scores, policy.gradient_list, average_scores

In [None]:
    def save_training_data(self):
        '''
        Appends the data of the current episode to a JSON file.

        One record is written per call: the score window, the runtime, the
        loss value and the first two entries returned by
        ``self.policy.get_gradients()``. The record is appended to the list
        stored in ``<self.folder_path>/<self.file_name>.json``; the file is
        created if it does not exist yet.
        '''
        base_directory = os.path.dirname(__file__)
        run_folder = f"{str(self.env_name)}_{self.policy.policy.post_processing}_{self.policy.circuit.n_layers}"
        folder_path = os.path.join(base_directory, run_folder)
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)

        episode_data = [self.scores_deque,
                        self.runtime,
                        self.loss.item(),
                        tensor_to_list(self.policy.get_gradients()[0]),
                        tensor_to_list(self.policy.get_gradients()[1])]

        # NOTE(review): the directory created above is the local `folder_path`,
        # but the file is written under `self.folder_path` - confirm both refer
        # to the same location. The local value is never None.
        if folder_path is None:
            return

        file_path = os.path.join(self.folder_path, f"{self.file_name}.json")
        if os.path.exists(file_path):
            # Read-modify-write: load the existing records and append this one.
            with open(file_path, 'r') as f:
                records = json.load(f)
            records.append(episode_data)
        else:
            records = [episode_data]

        with open(file_path, 'w') as f:
            json.dump(records, f, indent=4)

# Policy Gradient Algorithm

## Imports and utils

In [1]:
import pennylane as qml
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from functools import partial

from torch.utils.tensorboard import SummaryWriter

import gym
from collections import deque

from tensorboard.backend.event_processing import event_accumulator
import os
import json
import time
from datetime import datetime
import ray

import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (10, 5)

In [2]:
#utils
def create_directory(path):
    """Create ``path`` (including missing parents) if needed and return it."""
    directory = path
    os.makedirs(directory, exist_ok=True)
    return directory

def tensor_to_list(tensor):
    """
    Recursively convert tensors and numpy arrays into plain Python values.

    Lists, tuples and dicts are traversed (keeping their container type), numpy
    arrays and torch tensors become nested lists, and numpy scalars become
    Python scalars. Any other value is returned unchanged.
    """
    if isinstance(tensor, list):
        return [tensor_to_list(t) for t in tensor]
    if isinstance(tensor, tuple):
        # Tuples may hide tensors too; keep the tuple type for the caller.
        return tuple(tensor_to_list(t) for t in tensor)
    if isinstance(tensor, dict):
        return {key: tensor_to_list(value) for key, value in tensor.items()}
    if isinstance(tensor, (np.ndarray, torch.Tensor)):
        return tensor.tolist()
    if isinstance(tensor, np.generic):
        # Numpy scalars such as np.float32 are not JSON serialisable.
        return tensor.item()
    return tensor
    
def create_optimizer_with_lr(params, lr_list, use_amsgrad=False):
    """Create an Adam optimizer with one parameter group per learning rate.

    Args:
        params: Iterable whose items each become the ``'params'`` entry of one
            parameter group.
        lr_list: Iterable of learning rates, one per item of ``params``.
        use_amsgrad (bool): Forwarded to ``torch.optim.Adam`` as ``amsgrad``.

    Returns:
        torch.optim.Adam: The configured optimizer.

    Raises:
        ValueError: If ``params`` and ``lr_list`` differ in length. A plain
            ``zip`` would silently drop the surplus entries, leaving some
            parameters untrained.
    """
    params = list(params)
    lr_list = list(lr_list)
    if len(params) != len(lr_list):
        raise ValueError(
            f"params and lr_list must have the same length, got {len(params)} and {len(lr_list)}"
        )

    param_groups = [{'params': p, 'lr': lr} for p, lr in zip(params, lr_list)]
    return torch.optim.Adam(param_groups, amsgrad=use_amsgrad)

def get_function_representation(func):
    """Return a readable string identifying a callable.

    - ``functools.partial`` objects are shown as
      ``partial(<module>.<name>, args=..., keywords=...)``.
    - Lambdas are shown as ``<module>.<lambda>`` followed by their optional
      ``description`` attribute.
    - Other callables are shown as ``<module>.<name>``; callable objects that
      have no ``__name__`` (e.g. instances defining ``__call__``) are named
      after their class instead of raising ``AttributeError``.
    - Non-callables yield ``"Unknown Function Type"``.
    """
    if not callable(func):
        return "Unknown Function Type"

    def qualified_name(obj):
        # Callable instances carry no __name__; fall back to their class.
        if not hasattr(obj, "__name__"):
            obj = type(obj)
        return f"{getattr(obj, '__module__', None)}.{obj.__name__}"

    # Handle functools.partial objects separately
    if isinstance(func, partial):
        # Represent the wrapped function together with the bound arguments
        return f"partial({qualified_name(func.func)}, args={func.args}, keywords={func.keywords})"

    # Handle lambda functions
    if getattr(func, "__name__", None) == "<lambda>":
        return f"{func.__module__}.<lambda>" + (getattr(func, 'description', ''))

    # Handle regular functions and other callables
    return qualified_name(func)

In [3]:
#measures

def measure_probs(qubits):
    """Return the probability measurement over wires ``0 .. qubits - 1``."""
    measured_wires = range(qubits)
    return qml.probs(wires=measured_wires)

def two_measure_expval(qubits):
    """Return the expectation values of +Z...Z and -Z...Z.

    The observable is the product of PauliZ on every wire ``0 .. qubits - 1``;
    the result is a two-element list with the expectation of that observable
    followed by the expectation of its negation.
    """
    # Build Z_0 @ Z_1 @ ... @ Z_{qubits-1} from left to right.
    observable = qml.PauliZ(0)
    for wire in range(1, qubits):
        observable = observable @ qml.PauliZ(wire)

    return [qml.expval(observable), qml.expval(-observable)]

def three_measure_expval(qubits):
    """Return three expectation values built from Pauli observables.

    - 1 qubit: ``Z0``, ``X0`` and ``-Z0``.
    - 2 qubits: ``Z0``, ``Z0 @ Z1`` and ``Z1``.
    - 3 or more qubits: ``Z`` on the first wire, the product of ``Z`` on all
      interior wires, and ``Z`` on the last wire.

    Args:
        qubits (int): Number of qubits of the circuit (at least 1).

    Returns:
        list: Three ``qml.expval`` measurements, in the order listed above.

    Raises:
        ValueError: If ``qubits`` is smaller than 1.
    """
    expvals = []

    if qubits == 1:
        first_observable = qml.PauliZ(0)
        middle_observable = qml.PauliX(0)
        last_observable = -qml.PauliZ(0)
    elif qubits == 2:
        first_observable = qml.PauliZ(0)
        middle_observable = qml.PauliZ(0) @ qml.PauliZ(1)
        last_observable = qml.PauliZ(1)
    elif qubits >= 3:
        # With exactly 3 qubits the loop below is empty and the middle
        # observable is simply Z on wire 1.
        first_observable = qml.PauliZ(0)
        middle_observable = qml.PauliZ(1)
        for i in range(2, qubits - 1):
            middle_observable = middle_observable @ qml.PauliZ(i)
        last_observable = qml.PauliZ(qubits - 1)
    else:
        raise ValueError("Unsupported number of qubits: at least 1 qubit is required")

    expvals.append(qml.expval(first_observable))
    expvals.append(qml.expval(middle_observable))
    expvals.append(qml.expval(last_observable))

    return expvals

In [4]:
#vqc operations

def strong_entangling(n_qubits, layer, entanglement_gate=None):
    """Entangle every qubit with the qubit ``layer + 1`` positions further on (cyclically).

    Args:
        n_qubits (int): Number of wires.
        layer (int): Layer index; determines the distance between control and
            target wires.
        entanglement_gate (callable, optional): Two-wire gate called as
            ``gate(wires=[qubit, target])``. Defaults to ``qml.CZ``, the gate
            that was previously hard-coded.
    """
    gate = qml.CZ if entanglement_gate is None else entanglement_gate
    for qubit in range(n_qubits):
        target = (qubit + layer + 1) % n_qubits
        # Skip the degenerate case where the offset wraps onto the same wire.
        if target != qubit:
            gate(wires=[qubit, target])

def entangle_gate_calc(pattern, n_qubits):
    """Return the number of gates applied by a broadcast pattern.

    Args:
        pattern (str): One of 'single', 'double', 'double_odd', 'chain',
            'ring', 'pyramid' or 'all_to_all'.
        n_qubits (int): Number of wires the pattern is applied to.

    Returns:
        int: Gate count for the pattern (always a plain Python ``int``).

    Raises:
        ValueError: If ``pattern`` is not one of the supported names.
    """
    gate_counts = {
        # A single-wire unitary on each qubit
        'single': n_qubits,
        # A two-wire unitary on floor(n_qubits / 2) pairs
        'double': n_qubits // 2,
        # A two-wire unitary on floor((n_qubits - 1) / 2) pairs, starting with the second wire
        'double_odd': (n_qubits - 1) // 2,
        # A two-wire unitary on all (n_qubits - 1) neighbouring pairs
        'chain': n_qubits - 1,
        # A two-wire unitary on all n_qubits neighbouring pairs, including last to first
        'ring': n_qubits,
        # Sum of the first (n_qubits - 1) integers.
        # NOTE(review): qml.broadcast's 'pyramid' pattern may apply fewer gates
        # than this - verify against the PennyLane documentation.
        'pyramid': (n_qubits - 1) * n_qubits // 2,
        # Number of combinations of n_qubits taken 2 at a time
        'all_to_all': n_qubits * (n_qubits - 1) // 2,
    }

    if pattern not in gate_counts:
        raise ValueError("Pattern must be one of 'single', 'double', 'double_odd', 'chain', 'ring', 'pyramid', or 'all_to_all'.")

    return int(gate_counts[pattern])

## Circuits

TODO: write dedicated error-handling functions to make the code cleaner.

In [5]:
class JerbiModel(nn.Module):
    '''
    Creates a parametrized quantum circuit based on the work in https://doi.org/10.48550/arXiv.2103.05577.

    The circuit applies a Hadamard on every qubit, then ``n_layers`` blocks of
    (RZ/RY rotations, optional entanglement, RY/RZ input encoding) and a final
    RZ/RY rotation layer, and is wrapped in a ``qml.qnn.TorchLayer``.

    For detailed information about the parameters, call the info() method.
    '''

    def __init__(self, 
                n_qubits,
                n_layers, 
                device = "default.qubit",
                shots = None,
                diff_method = 'best', 
                entanglement = True,
                entanglement_pattern = "all_to_all", 
                entanglement_gate = qml.CZ, 
                input_scaling = True, 
                input_init = None, 
                weight_init = None, 
                measure = two_measure_expval):
        super(JerbiModel, self).__init__()

        self.n_qubits = n_qubits
        self.n_layers = n_layers
        # Name of the PennyLane device, passed to qml.device.
        self.device = device
        self.shots = shots
        self.diff_method = diff_method
        self.entanglement = entanglement
        self.entanglement_pattern = entanglement_pattern
        self.entanglement_gate = entanglement_gate
        self.input_scaling = input_scaling
        self.input_init = input_init
        self.weight_init = weight_init
        self.measure = measure

        self.circuit = self.generate_circuit()
    
    def generate_circuit(self):
        '''
        Builds the QNode and wraps it in a TorchLayer.

        Sets ``self.weight_shapes``, ``self.init_method`` and ``self.qnode``.

        Returns the ``qml.qnn.TorchLayer`` wrapping the QNode.
        Raises ValueError if ``n_layers`` is below 1.
        '''
        # Fail before a device is allocated.
        if self.n_layers < 1:
            raise ValueError("Number of layers can't take values below 1")

        if self.shots is None:
            dev = qml.device(self.device, wires=self.n_qubits)
        else:
            dev = qml.device(self.device, wires=self.n_qubits, shots=self.shots)
        
        self.weight_shapes = {
            "input_params": (self.n_layers, self.n_qubits, 2),
            "params": (self.n_layers + 1, self.n_qubits, 2)   
        }
        
        # A None entry cannot be called as an initialiser, so fall back to the
        # uniform [0, 2*pi] initialisation when the user did not provide one.
        default_init = partial(torch.nn.init.uniform_, b=2 * np.pi)
        self.init_method = {
            "input_params": default_init if self.input_init is None else self.input_init,
            "params": default_init if self.weight_init is None else self.weight_init
        }
        
        @qml.qnode(dev, interface='torch', diff_method=self.diff_method)
        def qnode(inputs, params, input_params):
            # In case the number of qubits is greater than the input size AND it is multiple of the input length, 
            # then tile input tensor 'multiplier' times
            if self.n_qubits > len(inputs) and self.n_qubits % len(inputs) == 0:
                multiplier = self.n_qubits // len(inputs)
                inputs = torch.cat([inputs] * multiplier)

            # If the number of qubits is not equal to the input length and not a multiple of the input length, raise an error
            elif self.n_qubits != len(inputs) and self.n_qubits % len(inputs) != 0:
                raise ValueError('Number of qubits cannot be divided by input lenght')

            # Apply Hadamard gates to every qubits
            qml.broadcast(qml.Hadamard, wires=range(self.n_qubits), pattern="single")
            
            # Iterate through layers
            for layer in range(self.n_layers):
                # Iterate through qubits
                for wire in range(self.n_qubits):
                    # Parameterized block
                    qml.RZ(params[layer][wire][0], wires=wire)
                    qml.RY(params[layer][wire][1], wires=wire)

                # Entanglement between qubits
                if self.entanglement:
                    if self.entanglement_pattern == 'strong_entangling':
                        strong_entangling(self.n_qubits, layer, self.entanglement_gate)
                    else:
                        qml.broadcast(self.entanglement_gate, wires=range(self.n_qubits), pattern=self.entanglement_pattern)

                # If input scaling set to True, multiply inputs by weights
                if self.input_scaling:
                    for wire in range(self.n_qubits):
                        qml.RY(input_params[layer][wire][0] * inputs[wire], wires=wire)
                        qml.RZ(input_params[layer][wire][1] * inputs[wire], wires=wire)
                # If input scaling set to False, use only the inputs
                else:
                    for wire in range(self.n_qubits):
                        qml.RY(inputs[wire], wires=wire)
                        qml.RZ(inputs[wire], wires=wire)
                
            # Last parameterized layer
            for wire in range(self.n_qubits):
                qml.RZ(params[-1][wire][0], wires=wire)
                qml.RY(params[-1][wire][1], wires=wire)

            return self.measure(self.n_qubits)

        self.qnode = qnode

        model = qml.qnn.TorchLayer(qnode, weight_shapes=self.weight_shapes, init_method=self.init_method)  
        
        return model
    
    def forward(self, inputs):
        ''' 
        Runs the wrapped TorchLayer on ``inputs`` and returns its output.
        '''
        return self.circuit(inputs)

    def _example_arguments(self):
        '''
        Builds sample arguments ``(inputs, params, input_params)`` for the raw QNode.

        Inputs are ``0.1 * i`` for each qubit; weights are freshly initialised
        with the configured initialisers.
        '''
        inputs = torch.tensor([0.1 * i for i in range(self.n_qubits)], dtype=torch.float32)

        initialized_params = {}
        for key, shape in self.weight_shapes.items():
            init = self.init_method[key]
            # A non-callable entry is used as the weight value itself.
            initialized_params[key] = init(torch.empty(shape)) if callable(init) else init

        return inputs, initialized_params["params"], initialized_params["input_params"]
    
    def visualize_circuit(self):
        '''
        Draws the circuit with matplotlib using sample inputs and weights.
        '''
        qml.draw_mpl(self.qnode)(*self._example_arguments())
        
    def circuit_spectrum(self):
        '''
        Prints the frequency spectrum reported by qml.fourier.circuit_spectrum for each input.
        '''
        res = qml.fourier.circuit_spectrum(self.qnode)(*self._example_arguments())
        for inp, freqs in res.items():
            print(f"{inp}: {freqs}")       
       
    @classmethod
    def info(cls):        
        '''
        Provides a summary of the JerbiModel class and its parameters/methods.
        '''
        info_text = """
        Creates the Parameterized Quantum Circuit based on the design in https://doi.org/10.48550/arXiv.2103.05577.

        Parameters:
        ----------
        n_qubits (int): 
            Number of qubits used in the quantum circuit.
        
        n_layers (int): 
            Number of layers in the quantum circuit. Each layer typically consists of parameterized rotations followed by entanglement gates.
        
        shots (int, optional): 
            Number of times the circuit gets executed (repeated measurements). If None, the circuit is executed with analytic calculations (no shot noise).
        
        diff_method (str): 
            Differentiation method used for training the model. Common options are 'best', 'parameter-shift', 'backprop', etc.

        entanglement (bool):
            If True, entanglement between qubits is implemented. The entanglement pattern and gate is defined in entanglement_pattern and entanglement_gate, respectively.
        
        entanglement_pattern (str): 
            Entanglement pattern used in the circuit, such as 'chain', 'ring', 'all_to_all', etc., as defined by qml.broadcast patterns.
        
        entanglement_gate (function): 
            Quantum gate used for entanglement, such as qml.CZ or qml.CNOT. This gate will be applied between qubits according to the specified entanglement pattern.
        
        input_scaling (bool): 
            If True, input parameters are scaled by additional learnable parameters (input_params). The input is multiplied by these parameters before being applied to the qubits.
                
        input_init (function): 
            Function to initialize the input scaling parameters, such as torch.nn.init.uniform_, torch.nn.init.ones_, and function defined by the user.
        
        weight_init (function): 
            Function to initialize the weights of the quantum circuit, such as torch.nn.init.uniform_, torch.nn.init.normal_, and function defined by the user.
                
        measure (function): 
            Measurement function that takes the number of qubits as an argument and returns the measurement result. Common choices are measure_probs, two_measure_expval, or any user-defined measurement function.

        Returns:
        -------
        JerbiModel: 
            An instance of the JerbiModel class representing the quantum neural network.
        """
        print(info_text)

In [6]:
class UQC_FullEnc(nn.Module):
    '''
    Creates a parameterized quantum circuit (Universal Quantum Classifier) based in https://doi.org/10.22331/q-2020-02-06-226.

    In every layer each qubit receives an RZ rotation whose angle is the dot
    product of the full input vector with a per-qubit weight vector plus a
    bias, followed by a trainable RY rotation and optional entanglement.

    Parameters:
        n_qubits (int): Number of qubits.
        n_layers (int): Number of layers.
        state_dim (int): Length of the input vector.
        device (str): Name of the PennyLane device, passed to qml.device.
        shots (int, optional): Number of shots; when None the device is
            created without a shots argument.
        diff_method (str): Differentiation method of the QNode.
        entanglement (bool): Whether entangling gates are applied after each layer.
        entanglement_pattern (str): 'strong_entangling' or a qml.broadcast pattern.
        entanglement_gate (function): Two-wire gate used for entanglement.
        input_init, weight_init, bias_init (function, optional): Initialisers
            of the "weights", "params" and "bias" tensors; None selects a
            uniform initialisation over [0, 2*pi].
        measure (function): Called with the number of qubits; returns the measurement.
    '''

    def __init__(self, 
                n_qubits, 
                n_layers, 
                state_dim,
                device,
                shots = None, 
                diff_method = 'best', 
                entanglement = True,
                entanglement_pattern = "all_to_all", 
                entanglement_gate = qml.CZ, 
                input_init = None,
                weight_init = None,
                bias_init = None,
                measure = two_measure_expval):
        super(UQC_FullEnc, self).__init__()

        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.state_dim = state_dim
        self.device = device
        self.shots = shots
        self.diff_method = diff_method
        self.entanglement = entanglement
        self.entanglement_pattern = entanglement_pattern
        self.entanglement_gate = entanglement_gate
        self.input_init = input_init
        self.weight_init = weight_init
        self.bias_init = bias_init
        self.measure = measure
        self.input_scaling = None

        self.circuit = self.generate_circuit()
    
    def generate_circuit(self):
        '''
        Builds the QNode and wraps it in a TorchLayer.

        Sets ``self.weight_shapes``, ``self.init_method`` and ``self.qnode``
        and returns the ``qml.qnn.TorchLayer``.
        '''
        if self.shots is None:
            dev = qml.device(self.device, wires=self.n_qubits)
        else:
            dev = qml.device(self.device, wires=self.n_qubits, shots=self.shots)
        
        self.weight_shapes = {
            "weights": (self.n_layers, self.n_qubits, self.state_dim),
            "params": (self.n_layers, self.n_qubits, 1),
            "bias": (self.n_layers, self.n_qubits)
        }
        
        # A None entry cannot be called as an initialiser, so fall back to the
        # uniform [0, 2*pi] initialisation when the user did not provide one.
        default_init = partial(torch.nn.init.uniform_, b=2 * np.pi)
        self.init_method = {
            "weights": default_init if self.input_init is None else self.input_init,
            "params": default_init if self.weight_init is None else self.weight_init,
            "bias": default_init if self.bias_init is None else self.bias_init
        }
        
        @qml.qnode(dev, interface='torch', diff_method=self.diff_method)
        def qnode(inputs, weights, params, bias):

            # Iterate through layers
            for layer in range(self.n_layers):
                # Iterate through qubits
                for wire in range(self.n_qubits):
                    # Dot product of the (detached) inputs with this qubit's weights,
                    # plus the bias, gives the encoding angle
                    weighted_input = torch.dot(inputs.clone().detach(), weights[layer][wire])
                    angle = weighted_input + bias[layer][wire]

                    # Encode the angle with an RZ rotation, then apply the trainable RY
                    qml.RZ(angle, wires=wire)

                    qml.RY(params[layer][wire][0], wires=wire)
                # Entanglement between qubits
                if self.entanglement:
                    if self.entanglement_pattern == 'strong_entangling':
                        # The gate must be passed explicitly: it is a required
                        # argument of strong_entangling.
                        strong_entangling(self.n_qubits, layer, self.entanglement_gate)
                    else:
                        qml.broadcast(self.entanglement_gate, wires=range(self.n_qubits), pattern=self.entanglement_pattern)

            return self.measure(self.n_qubits)

        self.qnode = qnode

        model = qml.qnn.TorchLayer(self.qnode, weight_shapes=self.weight_shapes, init_method=self.init_method)
        
        return model


    def forward(self, inputs):
        ''' 
        Runs the wrapped TorchLayer on ``inputs`` and returns its output.
        '''
        return self.circuit(inputs)
    
    def visualize_circuit(self):
        '''
        Draws the circuit with matplotlib using sample inputs and freshly initialised weights.
        '''
        inputs = torch.tensor([0.1 * i for i in range(self.state_dim)], dtype=torch.float32)
        
        # Initialize all parameters using the configured initialization methods
        initialized_params = {}
        for key, shape in self.weight_shapes.items():
            init = self.init_method[key]
            # A non-callable entry is used as the weight value itself.
            initialized_params[key] = init(torch.empty(shape)) if callable(init) else init

        # Draw the circuit
        qml.draw_mpl(self.qnode)(inputs, 
                                 initialized_params["weights"], 
                                 initialized_params["params"], 
                                 initialized_params["bias"])

In [7]:
class UQC_PartialEnc(nn.Module):
    '''
    Creates a parameterized quantum circuit (Universal Quantum Classifier) based in https://doi.org/10.22331/q-2020-02-06-226.

    The input vector is split into ``n_qubits`` consecutive chunks. In every
    layer each qubit receives an RZ rotation whose angle is the dot product of
    its own chunk with a per-qubit weight vector plus a bias, followed by a
    trainable RY rotation and optional entanglement.

    Parameters:
        n_qubits (int): Number of qubits.
        n_layers (int): Number of layers.
        state_dim (int): Length of the input vector. Each qubit's weight
            vector has ``int(state_dim / n_qubits)`` entries.
        device (str): Name of the PennyLane device, passed to qml.device.
        shots (int, optional): Number of shots; when None the device is
            created without a shots argument.
        diff_method (str): Differentiation method of the QNode.
        entanglement (bool): Whether entangling gates are applied after each layer.
        entanglement_pattern (str): 'strong_entangling' or a qml.broadcast pattern.
        entanglement_gate (function): Two-wire gate used for entanglement.
        input_init, weight_init, bias_init (function, optional): Initialisers
            of the "weights", "params" and "bias" tensors; None selects a
            uniform initialisation over [0, 2*pi].
        measure (function): Called with the number of qubits; returns the measurement.
    '''

    def __init__(self, 
                n_qubits, 
                n_layers, 
                state_dim, 
                device,
                shots = None, 
                diff_method = 'best', 
                entanglement = True,
                entanglement_pattern = "all_to_all", 
                entanglement_gate = qml.CZ, 
                input_init = None,
                weight_init = None,
                bias_init = None,
                measure = two_measure_expval):
        super(UQC_PartialEnc, self).__init__()

        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.state_dim = state_dim
        self.device = device
        self.shots = shots
        self.diff_method = diff_method
        self.entanglement = entanglement
        self.entanglement_pattern = entanglement_pattern
        self.entanglement_gate = entanglement_gate
        self.input_init = input_init
        self.weight_init = weight_init
        self.bias_init = bias_init
        self.measure = measure
        self.input_scaling = None

        self.circuit = self.generate_circuit()
    
    def generate_circuit(self):
        '''
        Builds the QNode and wraps it in a TorchLayer.

        Sets ``self.weight_shapes``, ``self.init_method`` and ``self.qnode``
        and returns the ``qml.qnn.TorchLayer``.
        '''
        if self.shots is None:
            dev = qml.device(self.device, wires=self.n_qubits)
        else:
            dev = qml.device(self.device, wires=self.n_qubits, shots=self.shots)
        
        # NOTE(review): the per-qubit weight length assumes state_dim is a
        # multiple of n_qubits; otherwise the chunks produced in the QNode do
        # not all match this length.
        self.weight_shapes = {
            "weights": (self.n_layers, self.n_qubits, int(self.state_dim/self.n_qubits)),
            "params": (self.n_layers, self.n_qubits, 1),
            "bias": (self.n_layers, self.n_qubits)
        }
        
        # A None entry cannot be called as an initialiser, so fall back to the
        # uniform [0, 2*pi] initialisation when the user did not provide one.
        default_init = partial(torch.nn.init.uniform_, b=2 * np.pi)
        self.init_method = {
            "weights": default_init if self.input_init is None else self.input_init,
            "params": default_init if self.weight_init is None else self.weight_init,
            "bias": default_init if self.bias_init is None else self.bias_init
        }
        
        @qml.qnode(dev, interface='torch', diff_method=self.diff_method)
        def qnode(inputs, weights, params, bias):

            # One chunk of the input per qubit (torch counterpart of np.array_split)
            separate_inputs = torch.tensor_split(inputs, self.n_qubits)
            # Iterate through layers
            for layer in range(self.n_layers):
                # Iterate through qubits
                for wire in range(self.n_qubits):
                    # Dot product of this qubit's input chunk with its weights,
                    # plus the bias, gives the encoding angle
                    weighted_input = torch.dot(separate_inputs[wire], weights[layer][wire])
                    angle = weighted_input + bias[layer][wire]

                    # Encode the angle with an RZ rotation, then apply the trainable RY
                    qml.RZ(angle, wires=wire)
                    qml.RY(params[layer][wire][0], wires=wire)
                # Entanglement between qubits
                if self.entanglement:
                    if self.entanglement_pattern == 'strong_entangling':
                        # The gate must be passed explicitly: it is a required
                        # argument of strong_entangling.
                        strong_entangling(self.n_qubits, layer, self.entanglement_gate)
                    else:
                        qml.broadcast(self.entanglement_gate, wires=range(self.n_qubits), pattern=self.entanglement_pattern)

            return self.measure(self.n_qubits)

        self.qnode = qnode

        model = qml.qnn.TorchLayer(self.qnode, weight_shapes=self.weight_shapes, init_method=self.init_method)
        
        return model

    def forward(self, inputs):
        ''' 
        Runs the wrapped TorchLayer on ``inputs`` and returns its output.
        '''
        return self.circuit(inputs)
    
    def visualize_circuit(self):
        '''
        Draws the circuit with matplotlib using sample inputs and freshly initialised weights.
        '''
        inputs = torch.tensor([0.1 * i for i in range(self.state_dim)], dtype=torch.float32)
        
        # Initialize all parameters using the configured initialization methods
        initialized_params = {}
        for key, shape in self.weight_shapes.items():
            init = self.init_method[key]
            # A non-callable entry is used as the weight value itself.
            initialized_params[key] = init(torch.empty(shape)) if callable(init) else init

        # Draw the circuit
        qml.draw_mpl(self.qnode)(inputs, 
                                 initialized_params["weights"], 
                                 initialized_params["params"], 
                                 initialized_params["bias"])

In [8]:
class TfqTutorial(nn.Module):
    '''
    Creates a parameterized quantum circuit based on the TensorFlow Quantum tutorial in https://www.tensorflow.org/quantum/tutorials/quantum_reinforcement_learning

    Every layer applies an RX-RY-RZ rotation block on each qubit, an optional
    entangling stage and an RX input-encoding stage. A final RX-RY-RZ block is
    applied before the measurement.

    For detailed information about the parameters, call the info() method.
    '''

    def __init__(self, 
                n_qubits,
                n_layers, 
                device,
                shots = None, 
                diff_method = 'best', 
                entanglement = True,
                entanglement_pattern = "all_to_all", 
                entanglement_gate = qml.CZ, 
                input_scaling = True, 
                input_init = None, 
                weight_init = None, 
                measure = two_measure_expval):
        super(TfqTutorial, self).__init__()

        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.device = device
        self.shots = shots
        self.diff_method = diff_method
        self.entanglement = entanglement
        self.entanglement_pattern = entanglement_pattern
        self.entanglement_gate = entanglement_gate
        self.input_scaling = input_scaling
        self.input_init = input_init
        self.weight_init = weight_init
        self.measure = measure

        self.circuit = self.generate_circuit()

    @staticmethod
    def _default_init(tensor):
        '''
        Fallback initializer used when no init function is given: fills the
        tensor in place with values drawn uniformly from [0, 2*pi).
        '''
        return torch.nn.init.uniform_(tensor, b=2 * np.pi)

    def generate_circuit(self):
        '''
        Builds the device and the QNode and wraps the QNode in a TorchLayer.

        Side effects: sets self.weight_shapes, self.init_method and self.qnode.

        Returns the qml.qnn.TorchLayer that forward() calls.
        Raises ValueError if n_layers is below 1.
        '''
        if self.shots is None:
            dev = qml.device(self.device, wires=self.n_qubits)
        else:
            dev = qml.device(self.device, wires=self.n_qubits, shots=self.shots)
        
        if self.n_layers < 1:
            raise ValueError("Number of layers can't take values below 1")
        
        # Key order is kept as-is: it fixes the order of the layer's parameters
        self.weight_shapes = {
            "input_params": (self.n_layers, self.n_qubits, 1),
            "params": (self.n_layers + 1, self.n_qubits, 3)
        }
        
        # A None entry would be called as a function later on, so the default
        # (None) initializers are replaced by a uniform fallback here
        self.init_method = {
            "input_params": self.input_init if self.input_init is not None else self._default_init,
            "params": self.weight_init if self.weight_init is not None else self._default_init,
        }
        
        @qml.qnode(dev, interface='torch', diff_method=self.diff_method)
        def qnode(inputs, params, input_params):
            n_inputs = len(inputs)

            # The number of qubits must be a multiple of the input length
            if self.n_qubits % n_inputs != 0:
                raise ValueError('Number of qubits cannot be divided by input lenght')

            # More qubits than inputs: tile the input so every qubit gets a value
            if self.n_qubits > n_inputs:
                inputs = torch.cat([inputs] * (self.n_qubits // n_inputs))
            
            # Iterate through layers
            for layer in range(self.n_layers):
                # Parameterized block
                for wire in range(self.n_qubits):
                    qml.RX(params[layer][wire][0], wires=wire)
                    qml.RY(params[layer][wire][1], wires=wire)
                    qml.RZ(params[layer][wire][2], wires=wire)

                # Entanglement between qubits
                if self.entanglement:
                    if self.entanglement_pattern == 'strong_entangling':
                        strong_entangling(self.n_qubits, layer)
                    else:
                        qml.broadcast(self.entanglement_gate, wires=range(self.n_qubits), pattern=self.entanglement_pattern)

                # Input encoding: inputs are multiplied by the learnable
                # input_params when input scaling is on, used as-is otherwise
                for wire in range(self.n_qubits):
                    if self.input_scaling:
                        qml.RX(input_params[layer][wire][0] * inputs[wire], wires=wire)
                    else:
                        qml.RX(inputs[wire], wires=wire)
                
            # Last parameterized layer
            for wire in range(self.n_qubits):
                qml.RX(params[-1][wire][0], wires=wire)
                qml.RY(params[-1][wire][1], wires=wire)
                qml.RZ(params[-1][wire][2], wires=wire)

            return self.measure(self.n_qubits)

        self.qnode = qnode

        model = qml.qnn.TorchLayer(qnode, weight_shapes=self.weight_shapes, init_method=self.init_method)  
        
        return model
    
    def forward(self, inputs):
        '''
        Runs the TorchLayer built by generate_circuit() on the inputs and returns its output.
        '''
        return self.circuit(inputs)

    def _sample_parameters(self):
        '''
        Builds one freshly initialized tensor per entry of weight_shapes.
        An init entry that already is a tensor is used directly.
        '''
        sampled = {}
        for key, shape in self.weight_shapes.items():
            init = self.init_method[key]
            sampled[key] = init if isinstance(init, torch.Tensor) else init(torch.empty(shape))
        return sampled
    
    def visualize_circuit(self):
        '''
        Draws the circuit with matplotlib using dummy inputs (0.0, 0.1, ...)
        and freshly initialized parameters.
        '''
        inputs = torch.tensor([0.1 * i for i in range(self.n_qubits)], dtype=torch.float32)
        initialized_params = self._sample_parameters()

        # Draw the circuit
        qml.draw_mpl(self.qnode)(inputs, 
                                 initialized_params["params"], 
                                 initialized_params["input_params"])

    def circuit_spectrum(self):
        '''
        Prints the frequency spectrum reported by qml.fourier.circuit_spectrum
        for dummy inputs and freshly initialized parameters.
        '''
        inputs = torch.tensor([0.1 * i for i in range(self.n_qubits)], dtype=torch.float32)
        initialized_params = self._sample_parameters()

        # Compute the spectrum of the circuit
        res = qml.fourier.circuit_spectrum(self.qnode)(inputs, initialized_params["params"], initialized_params["input_params"])
        for inp, freqs in res.items():
            print(f"{inp}: {freqs}")       

    @classmethod
    def info(cls):        
        '''
        Provides a summary of the TfqTutorial class and its parameters/methods.
        '''
        info_text = """
        Creates a parameterized quantum circuit based on the TensorFlow Quantum tutorial in https://www.tensorflow.org/quantum/tutorials/quantum_reinforcement_learning.

        Parameters:
        ----------
        n_qubits (int): 
            Number of qubits used in the quantum circuit.
        
        n_layers (int): 
            Number of layers in the quantum circuit (at least 1). Each layer consists of parameterized rotations, an optional entanglement stage and an input-encoding stage.
        
        device (str): 
            Name of the PennyLane device the circuit runs on, such as 'default.qubit'.
        
        shots (int, optional): 
            Number of times the circuit gets executed (repeated measurements). If None, the circuit is executed with analytic calculations (no shot noise).
        
        diff_method (str): 
            Differentiation method used for training the model. Common options are 'best', 'parameter-shift', 'backprop', etc.
        
        entanglement (bool): 
            If True, an entanglement stage is applied in every layer.
        
        entanglement_pattern (str): 
            Entanglement pattern used in the circuit, such as 'chain', 'ring', 'all_to_all', etc., as defined by qml.broadcast patterns, or 'strong_entangling'.
        
        entanglement_gate (function): 
            Quantum gate used for entanglement, such as qml.CZ or qml.CNOT. This gate will be applied between qubits according to the specified entanglement pattern.
        
        input_scaling (bool): 
            If True, input parameters are scaled by additional learnable parameters (input_params). The input is multiplied by these parameters before being applied to the qubits.
                
        input_init (function, optional): 
            Function to initialize the input scaling parameters, such as torch.nn.init.uniform_, torch.nn.init.ones_, and function defined by the user. If None, a uniform initialization in [0, 2*pi) is used.
        
        weight_init (function, optional): 
            Function to initialize the weights of the quantum circuit, such as torch.nn.init.uniform_, torch.nn.init.normal_, and function defined by the user. If None, a uniform initialization in [0, 2*pi) is used.
                
        measure (function): 
            Measurement function that takes the number of qubits as an argument and returns the measurement result, such as measure_probs, two_measure_expval, or any user-defined measurement function.

        Returns:
        -------
        TfqTutorial: 
            An instance of the TfqTutorial class representing the quantum neural network.
        """
        print(info_text)

## Policy

In [9]:
class PolicyPostProcessing(nn.Module):
    '''
    Turns the output of the circuit into a policy over the actions.

    Determines the type of policy used based on the arguments:

        n_qubits(int) = Number of qubits of the circuit
        n_actions(int) = Number of actions
        policy_type(str) = Type of policy ('raw_contiguous', 'raw_parity', 'softmax')
        beta_scheduling(bool) = Beta is increased by beta_schedule() if set to True
        beta(float) = Beta parameter or inverse temperature (used only for softmax)
        increase_rate(float) = Amount added to beta by each beta_schedule() call
        output_scaling(bool) = Output parameters are used if True (only by softmax)
        output_init(function) = How the output parameters are initialized
    '''
    
    def __init__(self, n_qubits, n_actions, policy_type = 'raw_contiguous', 
                 beta_scheduling = False, beta = 1, increase_rate = 0.0005, 
                 output_scaling = False, output_init = torch.nn.init.ones_):
        super(PolicyPostProcessing, self).__init__()

        self.n_qubits = n_qubits
        self.n_actions = n_actions
        self.policy_type = policy_type
        self.beta_scheduling = beta_scheduling
        self.beta = beta
        self.increase_rate = increase_rate
        self.output_scaling = output_scaling
        self.output_init = output_init

        if self.output_scaling == True:
            self.output_params = nn.parameter.Parameter(torch.Tensor(self.n_actions), requires_grad=True)
            self.output_init(self.output_params)
        else:
            # Register the real parameter name as empty so the attribute always exists
            self.register_parameter('output_params', None)
            # Historic (misnamed) placeholder, kept for backward compatibility
            self.register_parameter('w_input', None)

    def forward(self,probs):
        '''
        Applies the post-processing selected by policy_type and returns the policy tensor.
        Raises ValueError for an unknown policy_type.
        '''
        if self.policy_type == 'raw_contiguous':
            policy = self.raw_contiguous(probs)
        elif self.policy_type == 'raw_parity':
            policy = self.raw_parity(probs)
        elif self.policy_type == 'softmax':
            policy = self.softmax(probs)
        else:
            raise ValueError("Invalid post-processing method specified.")
        return policy

    def raw_contiguous(self,probs):
        '''
        Splits probs into n_actions contiguous sections and sums each of them.
        Always returns exactly n_actions entries.
        '''
        # Ensure the number of actions does not exceed the number of basis states (determined by n_qubits)
        if np.log2(self.n_actions) > self.n_qubits:
            raise ValueError('Number of actions exceeds the number of basis states!')

        # tensor_split always yields n_actions sections; torch.chunk may yield
        # fewer when the length is not divisible by n_actions
        sections = torch.tensor_split(probs, self.n_actions)

        # Sum each section and stack the sums into a single tensor
        return torch.stack([torch.sum(section) for section in sections])

    def raw_parity(self,probs):
        '''
        Splits probs into n_actions // 2 contiguous chunks and, inside each
        chunk, sums the even-indexed and the odd-indexed entries separately.

        Raises NotImplementedError if n_actions is not a power of two (>= 2)
        and ValueError if it exceeds the number of basis states.
        '''
        # Logarithm of the number of actions
        log_n_actions = np.log2(self.n_actions)

        # Check if the number of actions is a power of 2 (validated before any integer conversion)
        if log_n_actions < 1.0 or not float(log_n_actions).is_integer():
            raise NotImplementedError('Number of actions needs to be a power of two!')

        # Ensure the number of actions does not exceed the number of basis states
        if log_n_actions > self.n_qubits:
            raise ValueError('Number of actions exceeds number of basis states!')

        # With two actions there is a single chunk holding all the probabilities
        summed_tensors = []
        for tensor in torch.chunk(probs, self.n_actions // 2):
            summed_tensors.append(torch.sum(tensor[::2]))   # Even indexed elements
            summed_tensors.append(torch.sum(tensor[1::2]))  # Odd indexed elements

        return torch.stack(summed_tensors)
    
    def softmax(self, probs):
        '''
        Applies a softmax to probs scaled by beta (and by the output
        parameters when output scaling is on). The input tensor is not modified.
        '''
        if self.output_scaling == True:
            # Out-of-place product: an in-place `*=` would overwrite the caller's tensor
            probs = probs * self.output_params

        scaled_output = probs * self.beta
        softmax_output = F.softmax(scaled_output, dim=0)
        return softmax_output
    
    def beta_schedule(self):
        '''
        Adds increase_rate to beta when beta scheduling is on and the policy is a softmax one.
        '''
        if self.beta_scheduling == True:
            if self.policy_type == 'softmax' or self.policy_type == 'softmax_probs':
                self.beta += self.increase_rate

    @classmethod
    def info(cls):
        '''
        Provides a summary of the PolicyPostProcessing class and its parameters/methods.
        '''
        info_text = """
        PolicyPostProcessing Class:

        Determines the type of policy used based on the arguments:

        Parameters:
        ----------
        n_qubits (int): 
            Number of qubits of the circuit whose output is post-processed.
        
        n_actions (int): 
            Number of actions available for the agent to choose from.
        
        policy_type (str): 
            Type of policy applied to the probability distribution:
            - 'raw_contiguous': Sums up contiguous chunks of probabilities.
            - 'raw_parity': Sums up even- and odd-indexed probabilities of contiguous chunks.
            - 'softmax': Applies a softmax function to the scaled probabilities.
        
        beta_scheduling (bool): 
            If True, updates the inverse temperature parameter (beta) after each episode. Used only for 'softmax'.
        
        beta (float): 
            Inverse temperature parameter used for scaling probabilities in the softmax policy.
        
        increase_rate (float): 
            Amount added to beta at the end of each episode, if beta_scheduling is True.
        
        output_scaling (bool): 
            If True, scales the output probabilities by learnable parameters (used only for 'softmax').
        
        output_init (function): 
            Initialization function for output parameters, such as torch.nn.init.uniform_, torch.nn.init.ones_, etc.
        
        Methods:
        -------
        forward(probs):
            Applies the chosen policy_type and returns the policy tensor.
        
        raw_contiguous(probs):
            Sums up contiguous chunks of probabilities and returns the resulting tensor.
        
        raw_parity(probs):
            Sums up even- and odd-indexed probabilities of contiguous chunks and returns the policy tensor.
        
        softmax(probs):
            Applies a softmax function to the scaled probabilities and returns the policy tensor.
        
        beta_schedule():
            Updates the beta parameter if beta_scheduling is True. Only applicable for the 'softmax' method.
        """
        print(info_text)

## Quantum Policy

In [10]:
class QuantumPolicy(nn.Module):
    '''
    Policy made of a circuit module followed by a post-processing module.
    '''
    
    def __init__(self, circuit, post_processing):
        super(QuantumPolicy, self).__init__()
        self.circuit = circuit
        self.post_processing = post_processing

    def sample(self, inputs):
        '''
        Samples an action from the action probability distribution

        Returns the action (int), its log probability and the policy tensor.
        '''
        policy = self.forward(inputs)
        dist = torch.distributions.Categorical(policy)
        action = dist.sample()
        return action.item(), dist.log_prob(action), policy
    
    def forward(self, inputs):
        '''
        Input state is fed to the circuit - its output is then fed to the post processing 
        '''
        probs = self.circuit.forward(inputs)
        return self.post_processing.forward(probs)

    def _all_parameters(self):
        '''
        Yields the circuit parameters followed by the post-processing parameters.
        '''
        yield from self.circuit.parameters()
        yield from self.post_processing.parameters()
    
    def get_parameters(self):
        '''
        Returns the values of each set of parameters as flattened numpy arrays
        '''
        # .cpu() is required before .numpy() when the policy lives on a GPU;
        # flatten() already returns a copy, so no clone is needed
        return [param.detach().cpu().numpy().flatten() for param in self._all_parameters()]
    
    def get_gradients(self):
        '''
        Returns the gradient values of each set of parameters as flattened
        tensors (zeros for parameters without a gradient)
        '''
        gradients = []
        for param in self._all_parameters():
            grad = param.grad if param.grad is not None else torch.zeros_like(param)
            gradients.append(torch.flatten(grad.clone().detach()))
        return gradients

## Agents

TODO: adjust the verbosity levels: level 1 prints rewards; level 2 prints rewards and runtimes; level 3 prints rewards, runtimes and loss.

TODO: print the average of the last 100 episodes separately from the per-episode output:
Episode 100: reward
Last 100 

In [11]:
import warnings
class ReinforceAgent:
    '''
    Policy-gradient agent: collects episodes with the given policy, and every
    batch_size episodes updates the policy from the standardized discounted
    returns of the collected batch.
    '''

    def __init__(self, 
                policy, 
                policy_optimizer, 
                env_name, 
                n_episodes, 
                max_t, 
                gamma = 0.98, 
                baseline = True, 
                batch_size = 10, 
                normalize = False,
                print_every = 100, 
                verbose = 1):
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.policy = policy.to(self.device)
        self.policy_optimizer = policy_optimizer
        self.env = gym.make(env_name)
        self.env_name = env_name
        self.n_episodes = n_episodes
        self.max_t = max_t
        self.gamma = gamma
        self.baseline = baseline
        self.batch_size = batch_size
        self.normalize = normalize
        self.print_every = print_every
        self.verbose = verbose

        self.solved = False
        self.scores = deque(maxlen=100)
        self.batch_log_probs = []
        self.batch_rewards = []
        self.loss = torch.tensor(0.0)

    def train(self, run_count=None, rundate = None, path=None, tensorboard=False):
        '''
        Runs n_episodes episodes, updating the policy every batch_size episodes
        while the environment is not solved.

        Data is saved to disk only when both run_count and path are given.
        With tensorboard=True scalars are also written to a SummaryWriter.
        '''
        save_to_disk = run_count is not None and path is not None
        run_path = None

        if save_to_disk:
            data_path = create_directory(os.path.join(path, 'data'))
            env_folder = create_directory(os.path.join(data_path, self.env_name))
            experiment_folder_name = f"{self.policy.circuit.__class__.__name__}_{self.policy.circuit.n_qubits}qubits_{self.policy.circuit.n_layers}layer_{rundate}"
            experiment_path = create_directory(os.path.join(env_folder, experiment_folder_name))
            run_path = create_directory(os.path.join(experiment_path, f'run_{str(run_count)}'))
            self.save_agent_data(experiment_path)
        
        # With run_path left as None the writer falls back to its default log directory
        writer = SummaryWriter(log_dir=run_path) if tensorboard else None

        try:
            for i in range(1, self.n_episodes + 1):

                start_time = time.time()
                self.get_trajectory()

                self.env_solved_verification()

                if i % self.batch_size == 0 and self.solved is False:
                    self.update_policy()
                    self.policy.post_processing.beta_schedule()
 
                end_time = time.time()
                self.runtime = end_time - start_time

                if writer is not None:
                    self.writer_function(writer, i)
                if save_to_disk:
                    self.save_data(run_path, i)

                # i is already 1-based, so it is the episode number as-is
                if i % self.print_every == 0 and self.verbose == 1:
                    print('Episode {}\tLast reward: {:.2f}\tRuntime: {:.2f}s\t Last {} Episodes average reward: {:.2f}\t '.format(i, self.scores[-1], self.runtime, str(100), np.mean(self.scores)))
                elif self.verbose == 1:
                    print('Episode {}\tLast reward: {:.2f}\tRuntime: {:.2f}s\t '.format(i, self.scores[-1], self.runtime))
        finally:
            # Close the writer even if training is interrupted
            if writer is not None:
                writer.close()

    def get_trajectory(self):
        '''
        Gets a trajectory based on the running policy until the episode ends
        (terminated or truncated) or max_t steps are reached
        '''
        log_probs = []
        rewards = []
        state, _ = self.env.reset()
        for t in range(self.max_t):
            state_tensor = torch.tensor(self.normalize_state(state)).float().to(self.device)
            action, log_prob, _ = self.policy.sample(state_tensor)
            state, reward, terminated, truncated, _ = self.env.step(action)
            
            log_probs.append(log_prob)
            rewards.append(reward)

            # Stop on either end-of-episode flag returned by the environment
            if terminated or truncated:
                break

        self.scores.append(sum(rewards))

        if self.solved is True:
            # No more updates once solved: do not accumulate trajectories
            self.batch_log_probs = []
            self.batch_rewards = []
        else:
            self.batch_log_probs.append(log_probs)
            self.batch_rewards.append(rewards)
      
    def update_policy(self):
        '''
        Computes the loss and gradients and updates the policy via gradient methods

        Does nothing (besides clearing the batch) when there is nothing to learn from.
        '''
        all_returns = []
        # Compute the standardized discounted returns of each episode in the batch
        for episode_rewards in self.batch_rewards:
            R = 0.0
            ep_return = []
            for r in reversed(episode_rewards):
                R = r + self.gamma * R
                ep_return.append(R)
            ep_return.reverse()
            
            ep_return = torch.tensor(ep_return, dtype=torch.float32, device=self.device)
            # std() of a single element is NaN, which would poison the whole loss
            std = ep_return.std() if ep_return.numel() > 1 else 0.0
            ep_return = (ep_return - ep_return.mean()) / (std + 1e-8)  # Standardize returns
            all_returns.append(ep_return)

        policy_loss = []
        if all_returns:
            if self.baseline:
                # Mean over episodes of the summed returns, computed on the tensors' device
                baseline = torch.stack([ep_return.sum() for ep_return in all_returns]).mean()
            else:
                baseline = 0.0
            for log_probs, ep_returns in zip(self.batch_log_probs, all_returns):
                for log_prob, ret in zip(log_probs, ep_returns):
                    policy_loss.append(-log_prob * (ret - baseline))

        if policy_loss:
            self.loss = torch.stack(policy_loss).mean()

            self.policy_optimizer.zero_grad()
            self.loss.backward()
            self.policy_optimizer.step()

        self.batch_log_probs = []
        self.batch_rewards = []

    def normalize_state(self, state):
        '''
        Converts Acrobot states to [theta1, theta2, state[4], state[5]] and, when
        normalize is True, divides every feature by the largest absolute value.
        '''
        if self.env_name in ('Acrobot-v0', 'Acrobot-v1'):
            theta1 = np.arccos(state[0])
            theta2 = np.arccos(state[2])
            state = [theta1,theta2,state[4],state[5]]

        if self.normalize == True:
            # Compute the maximum absolute value of all features in the current state
            max_abs_value = max(abs(value) for value in state)

            # An all-zero state is returned unscaled instead of dividing by zero
            if max_abs_value == 0:
                return np.array(state)

            # Normalize each feature by the maximum absolute value
            state = np.array([value / max_abs_value for value in state])

        return state
    
    def env_solved_verification(self):
        '''
        Marks the environment as solved when the mean of the stored scores
        exceeds the reward threshold; warns when no threshold is known.
        '''
        if self.env.spec.reward_threshold is not None:
            if np.mean(self.scores) > self.env.spec.reward_threshold:
                self.solved = True
        elif self.env_name == 'Acrobot-v1':
            # Exact comparison: `in ('Acrobot-v1')` was a substring test on a string
            if np.mean(self.scores) > -125:
                self.solved = True
        else:              
            warnings.warn(f"No reward threshold defined for environment {self.env_name}. "
                          "Consider specifying a solved condition explicitly.",
                          UserWarning
            )

    def save_agent_data(self,main_path):
        '''
        Stores the agent, circuit and policy settings into a json file
        (agent_characteristics.json) inside main_path
        '''
        circuit = self.policy.circuit
        post_processing = self.policy.post_processing

        agent_variables = {
            "Model": circuit.__class__.__name__,
            "Number of Qubits": circuit.n_qubits,
            "Number of Layers": circuit.n_layers,
            "Shots": circuit.shots,
            "Differentiation Method": circuit.diff_method,
            "Entanglement": circuit.entanglement,
            "Entanglement Pattern": circuit.entanglement_pattern,
            "Entanglement Gate": get_function_representation(circuit.entanglement_gate),
            "Input Scaling": circuit.input_scaling,
            "Input Initialization": get_function_representation(circuit.input_init),
            "Weight Initialization": get_function_representation(circuit.weight_init),
            "Measure": get_function_representation(circuit.measure),
            "Policy Post Processing": post_processing.policy_type,
            "Optimizers": str(self.policy_optimizer),
            "Envinronment": str(self.env_name),
            "Gamma (discounting factor)": self.gamma,
            "Batch Size": str(self.batch_size),
            "Baseline": str(self.baseline)
        }
        if post_processing.policy_type in ['softmax', 'softmax_probs']:
            agent_variables.update({
                "Softmax scheduling (in case policy is softmax)": f"{post_processing.beta_scheduling}. Starting beta: {post_processing.beta}. Increase rate: {post_processing.increase_rate}",
                "Softmax output scaling (in case policy is softmax)": f"{post_processing.output_scaling}. Output Initialization: {get_function_representation(post_processing.output_init)}",
            })
        with open(os.path.join(main_path, "agent_characteristics.json"), "w") as f:
            json.dump(agent_variables, f, indent=4)
    
    def save_data(self, run_path, iteration):
        '''
        Saves the data into a .npz file for each episode, where gradients are stored as a list of lists.

        The file is re-read and re-written on every call.
        '''
        data_file = os.path.join(run_path, "run_data.npz")
        
        # Load existing data if the file exists (closing the archive afterwards)
        if os.path.exists(data_file):
            with np.load(data_file, allow_pickle=True) as data:
                old_episode_reward = data['episode_reward'].tolist()
                old_loss = data['loss'].tolist()
                old_runtime = data['runtime'].tolist()
                old_params_gradients = data['params_gradients'].tolist()
        else:
            old_episode_reward = []
            old_loss = []
            old_runtime = []
            old_params_gradients = []

        # Add new episode data
        old_episode_reward.append(self.scores[-1])
        old_runtime.append(self.runtime)

        # The loss is only recorded on episodes where an update took place
        if iteration % self.batch_size == 0 and iteration != 0 and self.solved is False:
            old_loss.append(self.loss.item())

        current_episode_gradients = [
            param.grad.detach().cpu().numpy().flatten()
            for param in self.policy.parameters()
            if param.grad is not None
        ]
        old_params_gradients.append(current_episode_gradients)
            
        # Save data to .npz file
        np.savez_compressed(data_file,
                            episode_reward=np.array(old_episode_reward),
                            loss=np.array(old_loss),
                            runtime=np.array(old_runtime),
                            params_gradients=np.array(old_params_gradients, dtype=object))  # Use dtype=object to handle lists

    def writer_function(self, writer, iteration):
        '''
        Stores data into a tensorboard session
        '''
        # Episode reward
        writer.add_scalar("Episode Reward", self.scores[-1], global_step=iteration)
        # Episode runtime
        writer.add_scalar("Runtime", self.runtime, global_step=iteration)
        # Loss
        writer.add_scalar("Loss", self.loss.item(), global_step=iteration)


# Single agent

## Jerbi Model

In [None]:
# Builds a JerbiModel circuit with the settings below and draws it.
n_qubits = 6
n_layers = 4
shots = None
diff_method = 'best' 
entanglement = True
entanglement_pattern = "all_to_all"
entanglement_gate = qml.CZ
input_scaling = True
input_init = torch.nn.init.ones_
# NOTE(review): normal_(-np.pi, np.pi) draws from a normal distribution with mean -pi and
# std pi, while the next cell uses uniform_(-np.pi, np.pi) - confirm which one is intended.
weight_init = lambda shape, dtype=torch.float: torch.FloatTensor(shape).normal_(-np.pi, np.pi)
policy_circuit_measure = three_measure_expval
# NOTE(review): unlike the JerbiModel call in the next cell, no device argument is passed
# here - confirm against the JerbiModel signature (not visible in this part of the file).
policy_circuit = JerbiModel(n_qubits, n_layers, shots, diff_method, 
                     entanglement, entanglement_pattern, entanglement_gate, 
                     input_scaling, input_init, weight_init, policy_circuit_measure)
policy_circuit.visualize_circuit()
#policy_circuit.circuit_spectrum()

In [None]:
# --- Policy circuit (Jerbi model) ---
n_qubits = 4
n_layers = 5
device = 'default.qubit'
shots = None
diff_method = 'backprop' 
entanglement = True
entanglement_pattern = "all_to_all"
entanglement_gate = qml.CZ
input_scaling = True
input_init = partial(torch.nn.init.normal_, mean=0.0, std=0.01)
# Initial values drawn uniformly between -pi and pi.
weight_init = lambda shape, dtype=torch.float: torch.FloatTensor(shape).uniform_(-np.pi, np.pi)
policy_circuit_measure = measure_probs
policy_circuit = JerbiModel(n_qubits, n_layers, device, shots, diff_method, 
                     entanglement, entanglement_pattern, entanglement_gate, 
                     input_scaling, input_init, weight_init, policy_circuit_measure)


# --- Post-processing of the circuit output ---
n_actions = 2
post_processing = 'raw_parity'
beta_scheduling = False
beta = 1
increase_rate = 0.0005
output_scaling = True
output_init = torch.nn.init.ones_
policy_post_process = PolicyPostProcessing(n_qubits, n_actions, 
                         post_processing, 
                         beta_scheduling, 
                         beta, increase_rate, 
                         output_scaling, 
                         output_init)

# Circuit + post-processing
policy = QuantumPolicy(policy_circuit,policy_post_process)

# --- Optimizer: one learning rate per parameter group ---
policy_lr_list= [0.1, 0.01, 0.1]  # [weights, input_weights, output_weights]
policy_params = list(policy_circuit.parameters()) + list(policy_post_process.parameters())
policy_optimizer= create_optimizer_with_lr(policy_params, policy_lr_list, use_amsgrad=True)

# --- Agent and environment ---
env_name = 'CartPole-v1'
n_episodes = 1000
max_t = 500
gamma = 0.98
print_every = 100
verbose = 1
baseline = True
batch_size = 10
# The other ReinforceAgent calls in this notebook pass `normalize` between
# batch_size and print_every; without it print_every/verbose shift one slot.
# True matches the multi-agent Jerbi cells.
normalize = True
reinforce_update = ReinforceAgent(policy, 
                                  policy_optimizer, 
                                  env_name, 
                                  n_episodes, 
                                  max_t, 
                                  gamma, 
                                  baseline, 
                                  batch_size, 
                                  normalize,
                                  print_every, 
                                  verbose)
reinforce_update.train()

## UQC

In [None]:
# Build a UQC policy circuit and draw it (no training in this cell).
n_qubits = 5
n_layers = 5
state_dim = 4
# Simulator name; the UQC_FullEnc calls in the training cells of this
# notebook pass it right after state_dim.
device = 'default.qubit'
shots = None
diff_method = 'best' 
entanglement = True
entanglement_pattern = "all_to_all"
entanglement_gate = qml.CZ
# Input and variational weights both start uniformly between -pi and pi.
input_init = lambda shape, dtype=torch.float: torch.FloatTensor(shape).uniform_(-np.pi, np.pi)
weight_init = lambda shape, dtype=torch.float: torch.FloatTensor(shape).uniform_(-np.pi, np.pi)
bias_init = torch.nn.init.zeros_
policy_circuit_measure = two_measure_expval
# Argument order follows the other UQC_FullEnc calls in this notebook:
# (..., entanglement_gate, input_init, weight_init, bias_init, measure).
policy_circuit = UQC_FullEnc(n_qubits,
                            n_layers, 
                            state_dim, 
                            device,
                            shots, 
                            diff_method, 
                            entanglement, 
                            entanglement_pattern, 
                            entanglement_gate, 
                            input_init,
                            weight_init,
                            bias_init, 
                            policy_circuit_measure)

policy_circuit.visualize_circuit()

In [None]:
# --- Policy circuit (UQC, full encoding) ---
n_qubits = 4
n_layers = 5
# Acrobot-v1 observations have six components (CartPole-v1 has four).
state_dim = 6
# Same device/diff_method pairing as the multi-agent Acrobot UQC cell.
device = 'lightning.qubit'
shots = None
diff_method = 'adjoint' 
entanglement = True
entanglement_pattern = "all_to_all"
entanglement_gate = qml.CZ
input_init = partial(torch.nn.init.normal_, mean=0.0, std=0.01)
# Initial values drawn uniformly between -pi and pi.
weight_init = lambda shape, dtype=torch.float: torch.FloatTensor(shape).uniform_(-np.pi, np.pi)
bias_init = torch.nn.init.zeros_
policy_circuit_measure = three_measure_expval
policy_circuit = UQC_FullEnc(n_qubits,
                            n_layers, 
                            state_dim, 
                            device,
                            shots, 
                            diff_method, 
                            entanglement, 
                            entanglement_pattern, 
                            entanglement_gate, 
                            input_init,
                            weight_init,
                            bias_init, 
                            policy_circuit_measure)

# --- Post-processing of the circuit output ---
n_actions = 3
post_processing = 'softmax'
beta_scheduling = False
beta = 1
increase_rate = 0.0005
output_scaling = True
output_init = torch.nn.init.ones_
policy_post_process = PolicyPostProcessing( n_qubits,
                                            n_actions, 
                                            post_processing, 
                                            beta_scheduling, 
                                            beta, increase_rate, 
                                            output_scaling, 
                                            output_init)

# Circuit + post-processing
policy = QuantumPolicy(policy_circuit,policy_post_process)

# --- Optimizer: one learning rate per parameter group ---
# NOTE(review): presumably ordered [input_weights, weights, bias, output_weights];
# confirm against create_optimizer_with_lr.
policy_lr_list= [0.1, 0.01, 0.1, 0.1]
policy_params = list(policy_circuit.parameters()) + list(policy_post_process.parameters())
policy_optimizer= create_optimizer_with_lr(policy_params, policy_lr_list, use_amsgrad=True)

# --- Agent and environment ---
env_name = 'Acrobot-v1'
n_episodes = 1000
max_t = 500
gamma = 0.98
print_every = 100
verbose = 1
baseline = True
batch_size = 10
normalize = True
reinforce_update = ReinforceAgent(policy, 
                                  policy_optimizer, 
                                  env_name, 
                                  n_episodes, 
                                  max_t, 
                                  gamma, 
                                  baseline, 
                                  batch_size, 
                                  normalize,
                                  print_every, 
                                  verbose)
reinforce_update.train()

## TFQ

In [None]:
# --- Policy circuit (TFQ-tutorial ansatz) ---
n_qubits = 4
n_layers = 5
# The other TfqTutorial calls in this notebook pass a device name right after
# n_layers; 'lightning.qubit' is the device this notebook pairs with 'adjoint'.
device = 'lightning.qubit'
shots = None
diff_method = 'adjoint' 
entanglement = True
entanglement_pattern = "ring"
entanglement_gate = qml.CZ
input_scaling = True
input_init = torch.nn.init.ones_
# Initial values drawn uniformly between -pi and pi.
weight_init = lambda shape, dtype=torch.float: torch.FloatTensor(shape).uniform_(-np.pi, np.pi)
policy_circuit_measure = two_measure_expval
policy_circuit = TfqTutorial(n_qubits, n_layers, device, shots, diff_method, 
                     entanglement, entanglement_pattern, entanglement_gate, 
                     input_scaling, input_init, weight_init, policy_circuit_measure)


# --- Post-processing of the circuit output ---
n_actions = 2
post_processing = 'softmax'
beta_scheduling = False
beta = 1
increase_rate = 0.0005
output_scaling = True
output_init = torch.nn.init.ones_
# n_qubits comes first, as in every other PolicyPostProcessing call here.
policy_post_process = PolicyPostProcessing(n_qubits, n_actions, 
                         post_processing, 
                         beta_scheduling, 
                         beta, increase_rate, 
                         output_scaling, 
                         output_init)

# Circuit + post-processing
policy = QuantumPolicy(policy_circuit,policy_post_process)

# --- Optimizer: one learning rate per parameter group ---
policy_lr_list= [0.01, 0.1, 0.1]  # [weights, input_weights, output_weights]
policy_params = list(policy_circuit.parameters()) + list(policy_post_process.parameters())
policy_optimizer= create_optimizer_with_lr(policy_params, policy_lr_list, use_amsgrad=True)

# --- Agent and environment ---
env_name = 'CartPole-v1'
n_episodes = 1000
max_t = 500
gamma = 0.98
print_every = 100
verbose = 1
baseline = True
batch_size = 10
# The other ReinforceAgent calls in this notebook pass `normalize` between
# batch_size and print_every; True matches the multi-agent TFQ cells.
normalize = True
reinforce_update = ReinforceAgent(policy, 
                                  policy_optimizer, 
                                  env_name, 
                                  n_episodes, 
                                  max_t, 
                                  gamma, 
                                  baseline, 
                                  batch_size, 
                                  normalize,
                                  print_every, 
                                  verbose)
reinforce_update.train()

# Multiple agents

## UQC

In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a UQC policy and train one ReinforceAgent on CartPole-v1.

    The circuit output is post-processed with the 'raw_contiguous' setting.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
    # Path settings: directory two levels above the working directory.
    base_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))

    # VQC settings
    n_qubits = 4

    def uniform_angles(shape, dtype=torch.float):
        # New float tensor filled uniformly between -pi and pi.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = UQC_FullEnc(
        n_qubits,
        5,                      # n_layers
        4,                      # state_dim
        'default.qubit',        # device
        None,                   # shots
        'backprop',             # diff_method
        True,                   # entanglement
        "all_to_all",           # entanglement_pattern
        qml.CZ,                 # entanglement_gate
        partial(torch.nn.init.normal_, mean=0.0, std=0.01),  # input_init
        uniform_angles,         # weight_init
        torch.nn.init.zeros_,   # bias_init
        measure_probs,          # policy_circuit_measure
    )

    # Post processing settings
    post_process = PolicyPostProcessing(
        n_qubits,
        2,                      # n_actions
        'raw_contiguous',       # post_processing
        False,                  # beta_scheduling
        1,                      # beta
        0.003,                  # increase_rate
        True,                   # output_scaling
        torch.nn.init.ones_,    # output_init
    )

    # Circuit + post processing
    policy = QuantumPolicy(circuit, post_process)

    # Gradient learning rates
    learning_rates = [0.1, 0.01, 0.1, 0.1]  # [weights, params, bias, output_weights]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, learning_rates, use_amsgrad=True)

    # Agent and environment settings
    agent = ReinforceAgent(
        policy,
        optimizer,
        'CartPole-v1',          # env_name
        1000,                   # n_episodes
        500,                    # max_t
        0.98,                   # gamma
        True,                   # baseline
        10,                     # batch_size
        False,                  # normalize
        100,                    # print_every
        1,                      # verbose
    )
    agent.train(file_name, rundate, base_dir, True)

    return f"Agent {file_name!s}: {agent.solved!s}"

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 3
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()

In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a UQC policy and train one ReinforceAgent on CartPole-v1.

    The circuit output is post-processed with the 'raw_parity' setting.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
    # Path settings: directory two levels above the working directory.
    base_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))

    # VQC settings
    n_qubits = 4

    def uniform_angles(shape, dtype=torch.float):
        # New float tensor filled uniformly between -pi and pi.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = UQC_FullEnc(
        n_qubits,
        5,                      # n_layers
        4,                      # state_dim
        'default.qubit',        # device
        None,                   # shots
        'backprop',             # diff_method
        True,                   # entanglement
        "all_to_all",           # entanglement_pattern
        qml.CZ,                 # entanglement_gate
        partial(torch.nn.init.normal_, mean=0.0, std=0.01),  # input_init
        uniform_angles,         # weight_init
        torch.nn.init.zeros_,   # bias_init
        measure_probs,          # policy_circuit_measure
    )

    # Post processing settings
    post_process = PolicyPostProcessing(
        n_qubits,
        2,                      # n_actions
        'raw_parity',           # post_processing
        False,                  # beta_scheduling
        1,                      # beta
        0.003,                  # increase_rate
        True,                   # output_scaling
        torch.nn.init.ones_,    # output_init
    )

    # Circuit + post processing
    policy = QuantumPolicy(circuit, post_process)

    # Gradient learning rates
    learning_rates = [0.1, 0.01, 0.1, 0.1]  # [weights, params, bias, output_weights]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, learning_rates, use_amsgrad=True)

    # Agent and environment settings
    agent = ReinforceAgent(
        policy,
        optimizer,
        'CartPole-v1',          # env_name
        1000,                   # n_episodes
        500,                    # max_t
        0.98,                   # gamma
        True,                   # baseline
        10,                     # batch_size
        False,                  # normalize
        100,                    # print_every
        1,                      # verbose
    )
    agent.train(file_name, rundate, base_dir, True)

    return f"Agent {file_name!s}: {agent.solved!s}"

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 3
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()

## Jerbi

In [12]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a Jerbi-model policy and train one ReinforceAgent on CartPole-v1.

    The circuit output is post-processed with the 'raw_contiguous' setting.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
    # Path settings: directory two levels above the working directory.
    base_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))

    # VQC settings
    n_qubits = 4

    def uniform_angles(shape, dtype=torch.float):
        # New float tensor filled uniformly between -pi and pi.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = JerbiModel(
        n_qubits,
        5,                      # n_layers
        'default.qubit',        # device
        None,                   # shots
        'backprop',             # diff_method
        True,                   # entanglement
        "all_to_all",           # entanglement_pattern
        qml.CZ,                 # entanglement_gate
        True,                   # input_scaling
        partial(torch.nn.init.normal_, mean=0.0, std=0.01),  # input_init
        uniform_angles,         # weight_init
        measure_probs,          # policy_circuit_measure
    )

    # Post processing settings
    post_process = PolicyPostProcessing(
        n_qubits,
        2,                      # n_actions
        'raw_contiguous',       # post_processing
        False,                  # beta_scheduling
        1,                      # beta
        0.003,                  # increase_rate
        True,                   # output_scaling
        torch.nn.init.ones_,    # output_init
    )

    # Circuit + post processing
    policy = QuantumPolicy(circuit, post_process)

    # Gradient learning rates
    learning_rates = [0.1, 0.01, 0.1]  # [input_weights, weight, output_weights]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, learning_rates, use_amsgrad=True)

    # Agent and environment settings
    agent = ReinforceAgent(
        policy,
        optimizer,
        'CartPole-v1',          # env_name
        1000,                   # n_episodes
        500,                    # max_t
        0.98,                   # gamma
        True,                   # baseline
        10,                     # batch_size
        True,                   # normalize
        100,                    # print_every
        1,                      # verbose
    )
    agent.train(file_name, rundate, base_dir, True)

    return f"Agent {file_name!s}: {agent.solved!s}"

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 4
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()


In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a Jerbi-model policy and train one ReinforceAgent on CartPole-v1.

    The circuit output is post-processed with the 'raw_parity' setting.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
    # Path settings: directory two levels above the working directory.
    base_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))

    # VQC settings
    n_qubits = 4

    def uniform_angles(shape, dtype=torch.float):
        # New float tensor filled uniformly between -pi and pi.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = JerbiModel(
        n_qubits,
        5,                      # n_layers
        'default.qubit',        # device
        None,                   # shots
        'backprop',             # diff_method
        True,                   # entanglement
        "all_to_all",           # entanglement_pattern
        qml.CZ,                 # entanglement_gate
        True,                   # input_scaling
        partial(torch.nn.init.normal_, mean=0.0, std=0.01),  # input_init
        uniform_angles,         # weight_init
        measure_probs,          # policy_circuit_measure
    )

    # Post processing settings
    post_process = PolicyPostProcessing(
        n_qubits,
        2,                      # n_actions
        'raw_parity',           # post_processing
        False,                  # beta_scheduling
        1,                      # beta
        0.003,                  # increase_rate
        True,                   # output_scaling
        torch.nn.init.ones_,    # output_init
    )

    # Circuit + post processing
    policy = QuantumPolicy(circuit, post_process)

    # Gradient learning rates
    learning_rates = [0.1, 0.01, 0.1]  # [input_weights, weight, output_weights]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, learning_rates, use_amsgrad=True)

    # Agent and environment settings
    agent = ReinforceAgent(
        policy,
        optimizer,
        'CartPole-v1',          # env_name
        1000,                   # n_episodes
        500,                    # max_t
        0.98,                   # gamma
        True,                   # baseline
        10,                     # batch_size
        True,                   # normalize
        100,                    # print_every
        1,                      # verbose
    )
    agent.train(file_name, rundate, base_dir, True)

    return f"Agent {file_name!s}: {agent.solved!s}"

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 2
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()


## TFQ

In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a TfqTutorial policy and train one ReinforceAgent on CartPole-v1.

    The circuit output is post-processed with the 'raw_contiguous' setting.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
    # Path settings: directory two levels above the working directory.
    base_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))

    # VQC settings
    n_qubits = 4

    def uniform_angles(shape, dtype=torch.float):
        # New float tensor filled uniformly between -pi and pi.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = TfqTutorial(
        n_qubits,
        5,                      # n_layers
        'default.qubit',        # device
        None,                   # shots
        'backprop',             # diff_method
        True,                   # entanglement
        "all_to_all",           # entanglement_pattern
        qml.CZ,                 # entanglement_gate
        True,                   # input_scaling
        partial(torch.nn.init.normal_, mean=0.0, std=0.01),  # input_init
        uniform_angles,         # weight_init
        measure_probs,          # policy_circuit_measure
    )

    # Post processing settings
    post_process = PolicyPostProcessing(
        n_qubits,
        2,                      # n_actions
        'raw_contiguous',       # post_processing
        False,                  # beta_scheduling
        1,                      # beta
        0.003,                  # increase_rate
        True,                   # output_scaling
        torch.nn.init.ones_,    # output_init
    )

    # Circuit + post processing
    policy = QuantumPolicy(circuit, post_process)

    # Gradient learning rates
    learning_rates = [0.1, 0.01, 0.1]  # [input_weights, weight, output_weights]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, learning_rates, use_amsgrad=True)

    # Agent and environment settings
    agent = ReinforceAgent(
        policy,
        optimizer,
        'CartPole-v1',          # env_name
        1000,                   # n_episodes
        500,                    # max_t
        0.98,                   # gamma
        True,                   # baseline
        10,                     # batch_size
        True,                   # normalize
        100,                    # print_every
        1,                      # verbose
    )
    agent.train(file_name, rundate, base_dir, True)

    return f"Agent {file_name!s}: {agent.solved!s}"

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 2
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()


In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a TfqTutorial policy and train one ReinforceAgent on CartPole-v1.

    The circuit output is post-processed with the 'raw_parity' setting.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
    # Path settings: directory two levels above the working directory.
    base_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))

    # VQC settings
    n_qubits = 4

    def uniform_angles(shape, dtype=torch.float):
        # New float tensor filled uniformly between -pi and pi.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = TfqTutorial(
        n_qubits,
        5,                      # n_layers
        'default.qubit',        # device
        None,                   # shots
        'backprop',             # diff_method
        True,                   # entanglement
        "all_to_all",           # entanglement_pattern
        qml.CZ,                 # entanglement_gate
        True,                   # input_scaling
        partial(torch.nn.init.normal_, mean=0.0, std=0.01),  # input_init
        uniform_angles,         # weight_init
        measure_probs,          # policy_circuit_measure
    )

    # Post processing settings
    post_process = PolicyPostProcessing(
        n_qubits,
        2,                      # n_actions
        'raw_parity',           # post_processing
        False,                  # beta_scheduling
        1,                      # beta
        0.003,                  # increase_rate
        True,                   # output_scaling
        torch.nn.init.ones_,    # output_init
    )

    # Circuit + post processing
    policy = QuantumPolicy(circuit, post_process)

    # Gradient learning rates
    learning_rates = [0.1, 0.01, 0.1]  # [input_weights, weight, output_weights]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, learning_rates, use_amsgrad=True)

    # Agent and environment settings
    agent = ReinforceAgent(
        policy,
        optimizer,
        'CartPole-v1',          # env_name
        1000,                   # n_episodes
        500,                    # max_t
        0.98,                   # gamma
        True,                   # baseline
        10,                     # batch_size
        True,                   # normalize
        100,                    # print_every
        1,                      # verbose
    )
    agent.train(file_name, rundate, base_dir, True)

    return f"Agent {file_name!s}: {agent.solved!s}"

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 2
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()


### Data Reup

## Acrobot tests

### Jerbi

In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a Jerbi-model policy and train one ReinforceAgent on Acrobot-v1.

    The circuit output is post-processed with the 'raw_contiguous' setting
    over three actions.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
    # Path settings: directory two levels above the working directory.
    base_dir = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))

    # VQC settings
    # NOTE(review): Acrobot-v1 observations have six components; confirm that
    # JerbiModel accepts them with only four qubits.
    n_qubits = 4

    def uniform_angles(shape, dtype=torch.float):
        # New float tensor filled uniformly between -pi and pi.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = JerbiModel(
        n_qubits,
        5,                      # n_layers
        'default.qubit',        # device
        None,                   # shots
        'backprop',             # diff_method
        True,                   # entanglement
        "all_to_all",           # entanglement_pattern
        qml.CZ,                 # entanglement_gate
        True,                   # input_scaling
        partial(torch.nn.init.normal_, mean=0.0, std=0.01),  # input_init
        uniform_angles,         # weight_init
        measure_probs,          # policy_circuit_measure
    )

    # Post processing settings
    post_process = PolicyPostProcessing(
        n_qubits,
        3,                      # n_actions
        'raw_contiguous',       # post_processing
        False,                  # beta_scheduling
        1,                      # beta
        0.003,                  # increase_rate
        True,                   # output_scaling
        torch.nn.init.ones_,    # output_init
    )

    # Circuit + post processing
    policy = QuantumPolicy(circuit, post_process)

    # Gradient learning rates
    learning_rates = [0.1, 0.01, 0.1]  # [input_weights, weight, output_weights]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, learning_rates, use_amsgrad=True)

    # Agent and environment settings
    agent = ReinforceAgent(
        policy,
        optimizer,
        'Acrobot-v1',           # env_name
        1000,                   # n_episodes
        500,                    # max_t
        0.98,                   # gamma
        True,                   # baseline
        10,                     # batch_size
        True,                   # normalize
        100,                    # print_every
        1,                      # verbose
    )
    agent.train(file_name, rundate, base_dir, True)

    return f"Agent {file_name!s}: {agent.solved!s}"

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 1
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()


### UQC

In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Ray task: build a UQC policy and train one ReinforceAgent on Acrobot-v1.

    The circuit runs on 'lightning.qubit' with adjoint differentiation and
    its output is post-processed with the 'softmax' setting over three
    actions.

    Args:
        file_name: Agent identifier; forwarded to ``train`` and echoed in the
            returned summary.
        rundate: Run timestamp string; forwarded to ``train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` using the agent's ``solved``
        attribute after training.
    """
#   Path settings
    current_dir = os.getcwd()
    two_levels_up = os.path.abspath(os.path.join(current_dir, "../../"))

#   VQC settings
    n_qubits = 4
    n_layers = 5
    # Acrobot-v1 observations have six components; the previous value of 4
    # was the CartPole-v1 observation size.
    state_dim = 6
    device = 'lightning.qubit'
    shots = None
    diff_method = 'adjoint' 
    entanglement = True
    entanglement_pattern = "all_to_all"
    entanglement_gate = qml.CZ
    input_init = partial(torch.nn.init.normal_, mean=0.0, std=0.01)
    # Initial values drawn uniformly between -pi and pi.
    weight_init = lambda shape, dtype=torch.float: torch.FloatTensor(shape).uniform_(-np.pi, np.pi)
    bias_init = torch.nn.init.zeros_
    policy_circuit_measure = three_measure_expval
    policy_circuit = UQC_FullEnc(n_qubits,
                                n_layers, 
                                state_dim, 
                                device,
                                shots, 
                                diff_method, 
                                entanglement, 
                                entanglement_pattern, 
                                entanglement_gate, 
                                input_init,
                                weight_init,
                                bias_init,
                                policy_circuit_measure)
    
#   Post processing settings
    n_actions = 3
    post_processing = 'softmax'
    beta_scheduling = False
    beta = 1
    increase_rate = 0.003
    output_scaling = True
    output_init = torch.nn.init.ones_
    policy_post_process = PolicyPostProcessing( n_qubits,
                                                n_actions, 
                                                post_processing, 
                                                beta_scheduling, 
                                                beta, increase_rate, 
                                                output_scaling, 
                                                output_init)
#   Circuit + Post processing
    policy = QuantumPolicy(policy_circuit,policy_post_process)

#   Gradient learning rates
    policy_lr_list= [0.1, 0.01, 0.1, 0.1]  # [weights, params, bias, output_weights]
    policy_params = list(policy_circuit.parameters()) + list(policy_post_process.parameters())
    policy_optimizer = create_optimizer_with_lr(policy_params, policy_lr_list, use_amsgrad=True)

#   Agent and environment settings
    env_name = 'Acrobot-v1'
    n_episodes = 20
    max_t = 500
    gamma = 0.98
    print_every = 100
    verbose = 1
    baseline = True
    batch_size = 10
    normalize = True
    reinforce_update = ReinforceAgent(policy, policy_optimizer, env_name, n_episodes, max_t, gamma, baseline, batch_size, normalize, print_every, verbose)
    reinforce_update.train(file_name, rundate, two_levels_up, True)

    return ('Agent ' + str(file_name) + ': ' + str(reinforce_update.solved))

if __name__ == "__main__":

    # One timestamp labels every agent launched from this cell.
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 4
    all_results = []

    for run_index in range(2):
        # Agent ids keep counting across runs instead of restarting at 0.
        start_agent_index = run_index * num_agents
        results = [
            train_agents.remote(str(agent_id), rundate)
            for agent_id in range(start_agent_index, start_agent_index + num_agents)
        ]

        # Wait for the whole batch before launching the next run.
        completed_results = ray.get(results)
        all_results += completed_results
        print(f"Results for run {run_index}: {completed_results}")

    # Shutdown Ray after all tasks are complete
    ray.shutdown()

In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Train one REINFORCE agent with a ``UQC_FullEnc`` policy on Acrobot-v1.

    Decorated with ``ray.remote``, so callers launch it via
    ``train_agents.remote(...)``.

    Args:
        file_name: Identifier of this agent; forwarded to
            ``ReinforceAgent.train`` and echoed in the returned summary.
        rundate: Run timestamp string; forwarded to ``ReinforceAgent.train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` where ``<solved>`` is the
        agent's ``solved`` attribute after training.
    """
    # Directory two levels above the current working directory; handed to
    # `train` below.
    two_levels_up = os.path.abspath(os.path.join(os.getcwd(), "../../"))

    # ---- Variational circuit settings ----
    # NOTE(review): Gymnasium documents Acrobot-v1 observations as
    # 6-dimensional; confirm state_dim = 4 is what UQC_FullEnc expects.
    n_qubits, n_layers, state_dim = 10, 5, 4
    device, shots, diff_method = 'lightning.qubit', None, 'adjoint'
    entanglement, entanglement_pattern, entanglement_gate = True, "all_to_all", qml.CZ

    # Parameter initialisers handed to the circuit.
    input_init = partial(torch.nn.init.normal_, mean=0.0, std=0.01)
    bias_init = torch.nn.init.zeros_

    def weight_init(shape, dtype=torch.float):
        # Uniform in [-pi, pi]; `dtype` is accepted but not used.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = UQC_FullEnc(
        n_qubits, n_layers, state_dim,
        device, shots, diff_method,
        entanglement, entanglement_pattern, entanglement_gate,
        input_init, weight_init, bias_init,
        three_measure_expval,
    )

    # ---- Post-processing settings ----
    n_actions = 3
    post_processing = 'softmax'
    beta_scheduling, beta, increase_rate = False, 1, 0.003
    output_scaling, output_init = True, torch.nn.init.ones_
    post_process = PolicyPostProcessing(
        n_qubits, n_actions, post_processing,
        beta_scheduling, beta, increase_rate,
        output_scaling, output_init,
    )

    # Circuit + post-processing form the policy.
    policy = QuantumPolicy(circuit, post_process)

    # ---- Optimizer ----
    # Learning rates: [weights, params, bias, output_weights]
    lr_per_group = [0.1, 0.01, 0.1, 0.1]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, lr_per_group, use_amsgrad=True)

    # ---- Agent and environment settings ----
    env_name, n_episodes, max_t = 'Acrobot-v1', 1000, 500
    gamma, baseline, batch_size, normalize = 0.98, True, 10, True
    print_every, verbose = 100, 1
    agent = ReinforceAgent(
        policy, optimizer, env_name, n_episodes, max_t, gamma,
        baseline, batch_size, normalize, print_every, verbose,
    )
    agent.train(file_name, rundate, two_levels_up, True)

    return f'Agent {file_name!s}: {agent.solved!s}'

if __name__ == "__main__":
    # Driver: submit the agents to Ray in batches and collect the summary
    # string returned by each `train_agents` task.
    all_results = []
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 4

    try:
        for run_index in range(2):
            # Agent ids keep counting across batches (0-3, then 4-7).
            start_agent_index = run_index * num_agents

            results = [
                train_agents.remote(str(start_agent_index + i), rundate)
                for i in range(num_agents)
            ]

            # Block until every task of this batch has finished.
            completed_results = ray.get(results)
            all_results.extend(completed_results)
            print(f"Results for run {run_index}: {completed_results}")
    finally:
        # Shut Ray down even when a task raised, so that no worker processes
        # are left running after a failed batch.
        ray.shutdown()

### TFQ

In [None]:
@ray.remote
def train_agents(file_name, rundate):
    """Train one REINFORCE agent with a ``TfqTutorial`` policy on Acrobot-v1.

    Decorated with ``ray.remote``, so callers launch it via
    ``train_agents.remote(...)``.

    Args:
        file_name: Identifier of this agent; forwarded to
            ``ReinforceAgent.train`` and echoed in the returned summary.
        rundate: Run timestamp string; forwarded to ``ReinforceAgent.train``.

    Returns:
        str: ``'Agent <file_name>: <solved>'`` where ``<solved>`` is the
        agent's ``solved`` attribute after training.
    """
    # Directory two levels above the current working directory; handed to
    # `train` below.
    two_levels_up = os.path.abspath(os.path.join(os.getcwd(), "../../"))

    # ---- Variational circuit settings ----
    # NOTE(review): Gymnasium documents Acrobot-v1 observations as
    # 6-dimensional; confirm how TfqTutorial consumes them with n_qubits = 4.
    n_qubits, n_layers = 4, 5
    device, shots, diff_method = 'default.qubit', None, 'backprop'
    entanglement, entanglement_pattern, entanglement_gate = True, "all_to_all", qml.CZ

    # Input scaling flag and parameter initialisers handed to the circuit.
    input_scaling = True
    input_init = partial(torch.nn.init.normal_, mean=0.0, std=0.01)

    def weight_init(shape, dtype=torch.float):
        # Uniform in [-pi, pi]; `dtype` is accepted but not used.
        return torch.FloatTensor(shape).uniform_(-np.pi, np.pi)

    circuit = TfqTutorial(
        n_qubits, n_layers, device, shots, diff_method,
        entanglement, entanglement_pattern, entanglement_gate,
        input_scaling, input_init, weight_init,
        measure_probs,
    )

    # ---- Post-processing settings ----
    n_actions = 3
    post_processing = 'raw_contiguous'
    beta_scheduling, beta, increase_rate = False, 1, 0.003
    output_scaling, output_init = True, torch.nn.init.ones_
    post_process = PolicyPostProcessing(
        n_qubits, n_actions, post_processing,
        beta_scheduling, beta, increase_rate,
        output_scaling, output_init,
    )

    # Circuit + post-processing form the policy.
    policy = QuantumPolicy(circuit, post_process)

    # ---- Optimizer ----
    # Learning rates: [input_weights, weight, output_weights]
    lr_per_group = [0.1, 0.01, 0.1]
    trainable = [*circuit.parameters(), *post_process.parameters()]
    optimizer = create_optimizer_with_lr(trainable, lr_per_group, use_amsgrad=True)

    # ---- Agent and environment settings ----
    env_name, n_episodes, max_t = 'Acrobot-v1', 1000, 500
    gamma, baseline, batch_size, normalize = 0.98, True, 10, True
    print_every, verbose = 100, 1
    agent = ReinforceAgent(
        policy, optimizer, env_name, n_episodes, max_t, gamma,
        baseline, batch_size, normalize, print_every, verbose,
    )
    agent.train(file_name, rundate, two_levels_up, True)

    return f'Agent {file_name!s}: {agent.solved!s}'

if __name__ == "__main__":
    # Driver: submit the agents to Ray in batches and collect the summary
    # string returned by each `train_agents` task.
    all_results = []
    rundate = datetime.now().strftime('%Y-%m-%d_%H.%M.%S')
    num_agents = 2

    try:
        for run_index in range(1):
            # Agent ids keep counting across batches; only one batch runs here.
            start_agent_index = run_index * num_agents

            results = [
                train_agents.remote(str(start_agent_index + i), rundate)
                for i in range(num_agents)
            ]

            # Block until every task of this batch has finished.
            completed_results = ray.get(results)
            all_results.extend(completed_results)
            print(f"Results for run {run_index}: {completed_results}")
    finally:
        # Shut Ray down even when a task raised, so that no worker processes
        # are left running after a failed batch.
        ray.shutdown()


## Turn Off PC

In [None]:
import os

# Wait 100 seconds before issuing the shutdown command.
time.sleep(100)

# Windows `shutdown` syntax: /s powers the machine off, /t 1 sets a
# one-second timeout before it happens.
os.system("shutdown /s /t 1")