# Deep Q-learning

Here I try to implement the Reinforcement Learning technique called Deep Q-learning (or Deep Q-Network, hence DQN) which uses a Neural Network (in our case will be a Parametrized Quantum Circuit) to approximate the Q-values.

This is the same technique used in the paper by Skolik. 

I will mostly copy the code from this great book *Hands on Machine Learning*, which has accompanying free tutorials here:  
https://github.com/ageron/handson-ml2

In [None]:
#General imports
import numpy as np
import matplotlib.pyplot as plt

#Operator Imports
from qiskit.aqua.operators import Z, I, StateFn, CircuitStateFn, SummedOp
from qiskit.aqua.operators.gradients import Gradient, NaturalGradient, QFI, Hessian

#Circuit imports
from qiskit.circuit import QuantumCircuit, QuantumRegister, Parameter, ParameterVector, ParameterExpression

#Qiskit imports
import qiskit as qk
from qiskit.circuit.library import TwoLocal
import qiskit_machine_learning as qkml
from qiskit.utils import QuantumInstance

# Fix seed for reproducibility
seed = 42
np.random.seed(42)

import torch 
import gym 

from torch.optim import Adam
from torch.nn import MSELoss

In [None]:
def encoding_circuit(inputs, encoding_weights, num_qubits = 4, gate = 'rx', *args):
    """
    Encode classical input data on a quantum circuit. 
    
    To be used inside the `parametrized_circuit` function. 
    
    Args
    -------
    
    Return
    -------
    
    
    TODO:
    1. Add choice of encoding gate: ry, rz, or rx.
    """
    
    qc = qk.QuantumCircuit(num_qubits)
    
    # Encode data with a RX rotation
    for i, data in enumerate(inputs): 
        qc.rx(encoding_weights[i]*inputs[i], i)
        
    return qc

def parametrized_circuit(num_qubits = 4, reuploading = False, reps = 2, insert_barriers = True, meas = False):
    """
    Create the Parameterized Quantum Circuit (PQC) for estimating Q-values.
    It is the same reported in arXiv:2104.15084 (Skolik et al.).
    
    Args
    -------
    
    Return
    -------
    """
    
    qr = qk.QuantumRegister(num_qubits, 'qr')
    qc = qk.QuantumCircuit(qr)
    
    if meas:
        qr = qk.QuantumRegister(num_qubits, 'qr')
        cr = qk.ClassicalRegister(num_qubits, 'cr')
        qc = qk.QuantumCircuit(qr,cr)
    
    
    if not reuploading:
        
        # Define a vector containg Inputs as parameters (*not* to be optimized)
        inputs = qk.circuit.ParameterVector('x', num_qubits)
        
        # Define a weights for the encoding
        encoding_weights = qk.circuit.ParameterVector('w', num_qubits)
        
        # Encode classical input data
        qc.compose(encoding_circuit(inputs, encoding_weights, num_qubits = num_qubits), inplace = True)
        if insert_barriers: qc.barrier()
        
        # Variational circuit
        qc.compose(TwoLocal(num_qubits, ['ry', 'rz'], 'cx', 'circular', 
               reps=reps, insert_barriers= insert_barriers, 
               skip_final_rotation_layer = True), inplace = True)
        if insert_barriers: qc.barrier()
        
        # Add final measurements
        if meas: qc.measure(qr,cr)
        
    elif reuploading:
        
        # Define a vector containg Inputs as parameters (*not* to be optimized)
        inputs = qk.circuit.ParameterVector('x', num_qubits)
        
        # Define a weights for the encoding
        encoding_weights = qk.circuit.ParameterVector('w', num_qubits)
        
        # Define a vector containng variational parameters
        θ = qk.circuit.ParameterVector('θ', 2 * num_qubits * reps)
        
        # Iterate for a number of repetitions
        for rep in range(reps):

            # Encode classical input data
            qc.compose(encoding_circuit(inputs, encoding_weights, num_qubits = num_qubits), inplace = True)
            if insert_barriers: qc.barrier()
                
            # Variational circuit (does the same as TwoLocal from Qiskit)
            for qubit in range(num_qubits):
                qc.ry(θ[qubit + 2*num_qubits*(rep)], qubit)
                qc.rz(θ[qubit + 2*num_qubits*(rep) + num_qubits], qubit)
            if insert_barriers: qc.barrier()
                
            # Add entanglers (this code is for a circular entangler)
            qc.cx(qr[-1], qr[0])
            for qubit in range(num_qubits-1):
                qc.cx(qr[qubit], qr[qubit+1])
            if insert_barriers: qc.barrier()
                        
        # (Optional) Add final measurements
        if meas: qc.measure(qr,cr)
        
    return qc

# Deep Q-network

In [None]:
env = gym.make("CartPole-v1")
input_shape = [4] # == env.observation_space.shape
n_outputs = 2 # == env.action_space.n

In [None]:
from collections import deque

replay_memory = deque(maxlen=2000)

In [None]:
from qiskit.utils import QuantumInstance
from qiskit_machine_learning.neural_networks import CircuitQNN
from qiskit_machine_learning.connectors import TorchConnector

from torch import Tensor
import torch
from torch.nn import MSELoss
from torch.optim import LBFGS, SGD, Adam, RMSprop

In [None]:
# Select the number of qubits
num_qubits = 4

# Generate some random inputs
inputs = np.random.rand(num_qubits)
inputs = np.random.rand(num_qubits)

# Generate the Parametrized Quantum Circuit (note the flag reuploading)
qc = parametrized_circuit(num_qubits = num_qubits, reuploading = True, reps = 6)


X = list(qc.parameters)[num_qubits: 2*num_qubits]
params = list(qc.parameters)[:num_qubits] + list(qc.parameters)[2 * num_qubits:]

qc.draw()

In [None]:
qi_qasm = QuantumInstance(qk.Aer.get_backend('qasm_simulator'))
qi = QuantumInstance(qk.Aer.get_backend('statevector_simulator'))

qnn = CircuitQNN(qc, input_params=X, weight_params=params, 
                 quantum_instance = qi)

# set up PyTorch module
initial_weights = 0.1*(2*np.random.rand(qnn.num_weights) - 1)
qnn_model = TorchConnector(qnn, initial_weights)

In [None]:
class post_processing_layer2(torch.nn.Module):
    def __init__(self):
        super().__init__()
        weights = torch.Tensor(2)
        self.weights = torch.nn.Parameter(weights)
        torch.nn.init.uniform_(self.weights, 1, 2)
        
    def forward(self, x):
        """description"""
        
        if len(x.shape) == 1:
            x = x.unsqueeze(0)
        
        x = x.float()
            
        mask_ZZ_12 = torch.tensor([1.,-1.,-1.,1.,1.,-1.,-1.,1.,1.,-1.,-1.,1.,1.,-1.,-1.,1.])
        expval_ZZ_12 = torch.tensordot(x, mask_ZZ_12, dims = 1).reshape(-1,1)
        
        mask_ZZ_34 = torch.tensor([-1.,-1.,-1.,-1.,1.,1.,1.,1.,-1.,-1.,-1.,-1.,1.,1.,1.,1.])
        expval_ZZ_34 = torch.tensordot(x, mask_ZZ_34, dims = 1).reshape(-1,1)
                        
        out = torch.cat((expval_ZZ_12, expval_ZZ_34),1)
                
        return self.weights * ((out + 1) / 2)

In [None]:
class post_processing_layer(torch.nn.Module):
    def __init__(self):
        super().__init__()
        weights = torch.Tensor(2)
        self.weights = torch.nn.Parameter(weights)
        torch.nn.init.uniform_(self.weights, 1, 90)
        
    def forward(self, x):
        """description"""
        
        if len(x.shape) == 1:
            x = x.unsqueeze(0)
                        
        ZZ_12_mask = torch.tensor([0,3,4,7,8,11,12,15]).repeat(x.shape[0], 1)
        ZZ_12_mask_not = torch.tensor([1,2,5,6,9,10,13,14]).repeat(x.shape[0], 1)
        # 
        ZZ_34_mask = torch.tensor([0,1,2,3,12,13,14,15]).repeat(x.shape[0], 1)
        ZZ_34_mask_not = torch.tensor([4,5,6,7,8,9,10,11]).repeat(x.shape[0], 1)
        #         
        ZZ12_expval = torch.sum(torch.gather(x, 1, ZZ_12_mask),  dim = 1, keepdims = True) - torch.sum(torch.gather(x, 1, ZZ_12_mask_not),  dim = 1, keepdims = True)
        ZZ34_expval = torch.sum(torch.gather(x, 1, ZZ_34_mask),  dim = 1, keepdims = True) - torch.sum(torch.gather(x, 1, ZZ_34_mask_not),  dim = 1, keepdims = True)
                
        # print(ZZ12_expval)
        
        out = torch.cat((ZZ12_expval, ZZ34_expval), 1)
        
        # print(x, x[:,0], self.weights)
        
        return self.weights * ((x[:,0:1] + 1) / 2)

## Training Functions

In [None]:
def epsilon_greedy_policy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)
    else:
        # print("Azione non-random", state[np.newaxis])
        with torch.no_grad():
            Q_values = model(torch.atan(Tensor(state))).detach().numpy()
        # print("Q_values = ", Q_values)
        return np.argmax(Q_values[0])

def sample_experiences(batch_size):
    indices = np.random.randint(len(replay_memory), size=batch_size)
    batch = [replay_memory[index] for index in indices]
    states, actions, rewards, next_states, dones = [
        np.array([experience[field_index] for experience in batch])
        for field_index in range(5)]
    return states, actions, rewards, next_states, dones

def play_one_step(env, state, epsilon):
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done, info = env.step(action)
    replay_memory.append((state, action, reward, next_state, done))
    return next_state, reward, done, info

def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones = experiences
    
    with torch.no_grad():
        next_Q_values = model(torch.atan(Tensor(next_states))).detach().numpy()
        
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = (rewards +
                       (1 - dones) * discount_rate * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1)
    
    loss = 0.0
    for j in range(target_Q_values.shape[0]):
        q_value = model(torch.atan(Tensor(states[j])))
        q_value = q_value[0, actions[j]]
        loss += (Tensor(target_Q_values[j]) - q_value).pow(2).sum()
    
    optimizer.zero_grad()
    loss.backward()
    
    optimizer.step()
    
    # OLD ROUTINE 
    #mask = torch.nn.functional.one_hot(Tensor(actions).long(), n_outputs)
    ## print("qui")
    #
    #all_Q_values = model(torch.atan(Tensor(states)))
    ## print("allQ:",all_Q_values)
    ## print("mask:",mask)
    #Q_values = torch.sum(all_Q_values * mask, dim=1, keepdims=True)
    ## print("taget_q:",target_Q_values)
    ## print("qval:",Q_values)
    #loss = loss_fn(Tensor(target_Q_values), Q_values)
    #
    #optimizer.zero_grad()
    #loss.backward()
    
    # optimizer.step()

# Training

In [None]:
classical_model = post_processing_layer2()
model = torch.nn.Sequential(qnn_model, classical_model)

batch_size = 10
discount_rate = 0.99
optimizer = Adam(model.parameters(), lr=1e-2)
loss_fn = MSELoss(reduction = "sum")

In [None]:
rewards = [] 
best_score = 0

for episode in range(300):
    obs = env.reset()    
    for step in range(200):
        epsilon = max(1 - episode / 200, 0.01)
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        if done:
            break
    rewards.append(step) # Not shown in the book
    if step >= best_score: # Not shown
        best_weights = model.state_dict # Not shown
        best_score = step # Not shown
    print("\rEpisode: {}, Steps: {}, eps: {:.3f}".format(episode, step + 1, epsilon), end="") # Not shown
    # print("Here!")
    if episode > 50:
        # print("here")
        training_step(batch_size)

#model.set_weights(best_weights)

In [None]:
with torch.no_grad():
    model.weight = best_weights

In [None]:
plt.figure(figsize=(8, 4))
plt.plot(rewards)
plt.xlabel("Episode", fontsize=14)
plt.ylabel("Sum of rewards", fontsize=14)
#save_fig("dqn_rewards_plot")
plt.show()

In [None]:
model.state_dict()

In [None]:
import matplotlib.pyplot as plt
import matplotlib as mpl

# To get smooth animations
import matplotlib.animation as animation
mpl.rc('animation', html='jshtml')

def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=False, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

In [None]:
env.seed(42)
state = env.reset()

frames = []

for step in range(200):
    action = epsilon_greedy_policy(state)
    state, reward, done, info = env.step(action)
    print(action, state, done)
    if done:
        print("End at step:", step)
        break
    img = env.render(mode="rgb_array")
    frames.append(img)
    
plot_animation(frames)