**AlphaZero-like algorithm for Qiskit game**

We need to define an environment that is able to interact with OpenAI. It contains necessarily init, reset, render and close. We also introduce the methods step, calc_reward and others that are instrumental.

In [1]:
from qiskit import *
import re
import numpy as np
import math
from copy import copy, deepcopy 
import gym
from gym import spaces


from fastai.text import *

In [17]:
class QiskitGameEnv(gym.Env):
    '''
        The game starts in state |+>|->|+>|->|+>|-> and the objective of each player is to measure as many 0 or 1 as possible
    '''
    metadata = {'render.modes': ['human']}

    def __init__(self):

        self.max_turns = 10
        self.qubits = 6
        self.turn = True
        self.objective = 1 #Assume 1 means that we want to measure 1
        self.adversary_objective = -self.objective
        self.temperature = .001

        #self.P = P # Is there a difference between a given state and the environment? - > probably yes. Correct this
        #self.v = v

        self.gates = ['H','X','Z','CX','CZ','M']

        self.simulator = Aer.get_backend('qasm_simulator')
        self.circuit = QuantumCircuit(self.qubits,self.qubits) #self.qubit qubits and self.qubit bits to store the result

        self.viewer = None
        self.step_count = 0
        self.measured = []
        
        self.action_space = [self.gates, spaces.Discrete(self.qubits), spaces.Discrete(self.qubits)]
        #first we indicate the gate, then the first qubit to which it applies, and latter the second qubit it is applied to.

        self.seed()

    def step(self, action):
        
        # The format of action is (gate, qubit1, qubit2)

        self.step_count += 1

        if action[0] not in self.gates:
            raise Exception('Not valid gate!')
        
        if action[1] in self.measured:
            raise Exception('Already measured qubit!')

        if action[1] not in self.measured:
            if action[0] == 'H':
                self.circuit.h(action[1]) #apply Hadamard to qubit action[1]

            elif action[0] == 'M':
                self.circuit.measure(action[1],action[1]) #measures qubit in action[1] and saves the result to bit action[1]
                #self.measured += [action[1]] # This qubit was measured

            elif action[0] == 'X':
                self.circuit.x(action[1]) #apply X to qubit action[1]

            elif action[0] == 'Z':
                self.circuit.z(action[1]) #apply Z to qubit action[1]

            elif action[0] == 'CX':
                self.circuit.cx(action[1],action[2]) #apply CX from qubit action[2] to qubit action[1]

            elif action[0] == 'CZ':
                self.circuit.cz(action[1],action[2]) #apply CZ from qubit action[2] to qubit action[1]

    def reset(self):

        self.step_count = 0

        self.circuit = QuantumCircuit(self.qubits,self.qubits)

        for qubit in range(0,self.qubits):

            if qubit % 2 == 1:
                self.circuit.x(qubit) #Apply X on the odd qubits

            self.circuit.h(qubit) #Apply H on all qubits

        self.measured = []  # This is a list of what qubits has been measured

        return self.circuit # The initial state is |+>|->|+>|->|+>|->




    def render(self, mode='human'):
        return self.circuit.draw()

    def calc_reward(self):  #Tengo que enredar con medidas parciales a ver qué pasa: si no se ha medido el output es 0

        circ = deepcopy(self.circuit) #To run a simulation we use a copy

        # Use Aer's qasm_simulator
        backend_sim = Aer.get_backend('qasm_simulator')

        # Execute the circuit on the qasm simulator.
        # We've set the number of repeats of the circuit
        # to be 1024, which is the default.
        job_sim = execute(circ, backend_sim, shots=1024)

        # Grab the results from the job.
        result_sim = job_sim.result()

        counts = result_sim.get_counts(circ)

        self.counts = counts
        reward = 0
        measured = self.measured_qubits(self.circuit) 
        
        for key in counts.keys():
            counter = circ.n_qubits - 1 # Due to the structure of the representation of qubits, we start by the biggest and go to the lowest
            key_reward = 0
            for digit in str(key):
                if counter in measured: 
                    key_reward += 2*int(digit)-1
                counter -= 1
            reward += key_reward * counts[key]

            if reward == 0:
                reward = 2*(float(np.random.rand(1))-0.5)
                reward *= .001
        self.reward = reward
        del circ

    
    def count_letter(string,letter):
        count = 0
        for any_letter in string:
            if any_letter == letter:
                count += 1
        return count

    #def next_state(self,action):
    def measured_qubits(self):
        measured_qubits = { qarg for (inst, qargs, cargs) in self.circuit.data for qarg in qargs if inst.name == 'measure' }
        list_measured_qubits = []
        for qubit in measured_qubits:
            list_measured_qubits += [qubit.index]
        return list_measured_qubits

    def close(self):
        return

Next we want to define Node(), Edge() and the MonteCarlo Tree Search, MCTS(). Node can calculate the reward, calc_reward, and if it has never been previously expanded, expand the children, calc_children.

In [7]:
class Node():
    def __init__(self,circuit,parent=None,previous_edge = None, value=0,children = []):
        '''
        circuit: class QuantumCircuit
        parent: class Node
        children: list of Nodes
        '''
        if type(circuit) != QuantumCircuit:
            raise Exception('Not a circuit!')

        #super().__init__()
        self.value = value
        #self.probabilities = probabilities
        self.children = children #Tuple (edge, circuit)
        self.parent = parent
        self.circuit = circuit
        self.one_qubit_gates = ['H','X','Z','M']
        self.two_qubit_gates = ['CX','CZ']
        self.gates = self.one_qubit_gates +  self.two_qubit_gates
        self.measured = self.measured_qubits(self.circuit) #calculates self.measured
        self.reward = None
        self.previous_edge = previous_edge
        
    
    def calc_reward(self):  
        '''
        Calculates the reward of a circuit simulating it once
        '''
        measured = self.measured_qubits(self.circuit) #Figure out which qubits have been measured
        circ = deepcopy(self.circuit) #To run an experiment we use a copy

        # Use Aer's qasm_simulator
        backend_sim = Aer.get_backend('qasm_simulator')

        # Execute the circuit on the qasm simulator.
        # We've set the number of repeats of the circuit
        # to be 1024, which is the default.
        job_sim = execute(circ, backend_sim, shots=1024)

        # Grab the results from the job.
        result_sim = job_sim.result()

        counts = result_sim.get_counts(circ) 

        self.counts = counts #----
        #print('counts: ',counts)
        reward = 0
        
        
        for key in counts.keys():
            counter = circ.n_qubits - 1 # Due to the structure of the representation of qubits, we start by the biggest and go to the lowest
            key_reward = 0
            for digit in str(key):
                if counter in self.measured: 
                    key_reward += 2*int(digit)-1
                counter -= 1
            reward += key_reward * counts[key]

        
        if reward == 0:
            reward = 2*(float(np.random.rand(1))-0.5)
            reward *= .001

        self.reward = reward

        del circ

    def calc_children(self):
        '''
        Expands one layer of the MCTS
        '''
        
        self.children = []
        number_of_qubits = self.circuit.n_qubits
        
        for gate in self.one_qubit_gates:
            
            for qubit1 in range(number_of_qubits):

                if qubit1 not in self.measured:

                    new_circuit = deepcopy(self.circuit) #'''Need to clone it'''
                    
                    if gate is 'H':
                        new_circuit.h(qubit1)
                    elif gate is 'M':
                        new_circuit.measure(qubit1,qubit1) 
                    elif gate is 'X':
                        new_circuit.x(qubit1)
                    elif gate is 'Z':
                        new_circuit.z(qubit1)
                    
                    qubit2 = -1


    
                    new_node = Node(new_circuit, parent = self)
                    new_edge = Edge(self,(gate,qubit1,qubit2),new_node,P=1,N=0) # Important to call the NN to calculate P!!!
                    new_node.previous_edge = new_edge

                    self.children += [(new_node,new_edge)]
        
            
        for gate in self.two_qubit_gates:
            
            for qubit1 in range(number_of_qubits):
                
                for qubit2 in range(number_of_qubits):
                    
                    if (qubit1 != qubit2) and (qubit1 not in self.measured) and (qubit2 not in self.measured):
                
                        new_circuit = deepcopy(self.circuit)

                        if gate is 'CX':
                            new_circuit.cx(qubit1,qubit2)
                        elif gate is 'CZ':
                            new_circuit.cz(qubit1,qubit2)
                            
                        new_node = Node(new_circuit, parent = self)
                        new_edge = Edge(self,(gate,qubit1,qubit2),new_node,P=1,N=0) # Important to call the NN to calculate P!!!
                        new_node.previous_edge = new_edge

                        self.children += [(new_node,new_edge)]
                

                # edge = Edge(circuit,(gate,qubit1,qubit2),P,N+1)
                
    def measured_qubits(self,circuit):
        measured_qubits = { qarg for (inst, qargs, cargs) in circuit.data for qarg in qargs if inst.name == 'measure' }
        list_measured_qubits = []
        for qubit in measured_qubits:
            list_measured_qubits += [qubit.index]   
        return list_measured_qubits   

Edge should be able to calculate $Q$ and $U$. The key here is that for each edge we should feed the number of times we have explored this edge, $N$, and $P$, calculated by the Neural Network initially.

In [8]:
class Edge():
    def __init__(self, state, action, next_state, P = 1, N = 0):
        '''
        state: class Node
        action: tuple (string, number, possible number)
        '''
        #print('creating one new edge')
        if type(state) != Node:
            #print(type(state))
            raise Exception('Not a node!')

        self.circuit = state.circuit
        self.state = state
        self.N = N
        self.P = P # Set P = None once we define P_function()
        self.action = action         
        self.next_state = next_state
        #print(next_state.circuit) # 


        #self.Q_function()
        self.Q = 0
        self.U_function()

    def Q_function(self):
        #next_state = deepcopy(self.next_state)
        sum_values = self.sum_of_values(self.next_state)
        self.Q =  sum_values / (self.N)

    def U_function(self): #Need P that comes from the NN
        self.U = self.P / (1+self.N)
        
    def sum_of_values(self,state): #Returns \sum V(s') such that s,a eventually reaches s'
        '''
        state: class Node
        '''
        if type(state) != Node:
            raise Exception('state is not a node for sum_of_values')
        list_of_states = [state]
        value = state.value
        for s in state.children:
            if s[0] not in list_of_states:
                value_add = self.sum_of_values(s[0])
                value += value_add
                list_of_states += [s[0]]
        return value
    
    '''
    def P_function(self,NN):
        #Calculates the probability that in state self.circuit one takes self.action
        
    '''

Finally, we want to define the MCTS. We need functions to select a new node to which one wants to move. One would also like to have a rollout policy, a backup and a calc_reward.

In [12]:
class MCTS():
    def __init__(self,root_node,n_iterations=1,depth=5,temperature=.001):
        self.root_node = root_node
        self.n_iterations = n_iterations
        self.depth = depth
        self.temperature = temperature
        
        
    def play(self):
        for i in range(0,self.n_iterations):
            node = self.root_node
            

            #Expand
            for j in range(0,self.depth):
                print('expanded ',j,' times')
                node, _ = self.select(node) 
                

            #Rollout
            self.rollout(node)
            
            #Backup
            self.backup(node)
        
        children = self.root_node.children
        
        options = []
        edges = []
        for child in children:
            options += [child[0]]
            edges += [child[1]]

        action_probabilities = []
        sumN = sum((edge.N) for edge in edges)
        action_probabilities += [(float(edge.N)/sumN)**(1/self.temperature) for edge in edges]



        return action_probabilities, edges, options
    
    def select(self,node): # np.radom.choice(action , number_of_items_to_pick = 1, P)
        #print(node.circuit)
        if node.children == []:
            print('calculating children')
            node.calc_children() 


        children = node.children

        options = []
        edges = []
        for child in children:    
            options += [child[0]]
            edges += [child[1]]

        
        probabilities = []
        

        for edge in edges:
            probabilities += [edge.P]
            
        index = np.argmax(edge.Q+edge.U for edge in edges) #Since at the beginning we have no idea of what is best, it makes sense that index = 0
        #print(index)
        new_node, new_edge = children[index]
        
        new_edge.N += 1
        new_edge.Q_function()
        new_edge.U_function()
        
        #print(new_node.circuit)
        return new_node, new_edge

    def rollout(self,rollout_node):
        node = deepcopy(rollout_node)
        for i in range(0,5): 
            
            if node.children == []:
                node.calc_children()
            
            children = node.children
            options = []
            edges = []
            for child in children:
                options += [child[0]]
                edges += [child[1]]
        
            probabilities = []
            probabilities += (edge.P for edge in edges) # Is this notation right?
            
            #for edge in edges:
            #    probabilities += [edge.P]
            '''To eliminate when the NN predicts probabilities, if needed'''
            #print('probabilities',probabilities)
            suma = sum(probabilities)
            probabilities = [probability/suma for probability in probabilities]  # Ensure that probabilities are norm-1 normalized

            #print(probabilities)
            #print(node.children)
            indices = np.arange(len(node.children))
            #print(indices)
            index = int(np.random.choice(indices, 1, probabilities))
            #print(index)
            node, edge = node.children[index]
            #print(node,edge)
        node.calc_reward()
        #print('rollout node reward', node.reward)
        #print(node.circuit)
        rollout_node.value += node.reward
        del node
            
             
        
    def backup(self,node):

        while node.previous_edge != None:
            previous_edge = node.previous_edge
            #print('Q value before', previous_edge.Q)
            previous_edge.Q_function()
            #print('Q value after', previous_edge.Q)
            node = previous_edge.state


To define the neural network we need to format the input and output. The input is (state,action_probabilities, success_probability). That is, with a state, one should predict action probabilities, and success probabilities.

In [14]:
def play_once():
    data_DL =[] # Data used to train the NN
    game = QiskitGameEnv() #Environment
    game.reset() #Initialize environment

    
    while len(game.measured) < 6:   #How do we update game.measured

        node = deepcopy(Node(game.circuit))
        MonteCarlo = MCTS(node) #Create the MonteCarlo game

        #Run MonteCarlo
        action_probabilities, edges, nodes = MonteCarlo.play() 

        #Saving the data
        data_DL +=[(game.circuit,action_probabilities, edges, nodes)]

        #Choose next step
        indices = np.arange(len(edges))
        #print(action_probabilities)

        index = int(np.random.choice(indices, 1, action_probabilities))
        edge = edges[index]
        action = edge.action
        game.step(action)
        del node
        
        print(game.circuit)
        
        if game.step_count > 5:
            break
    return None   
    #return data_DL
    
    
    

The main algorithm, still to be completed. Perhaps a Transformer should work as DL architercture to predict steps.

In [18]:
if __name__ == '__main__':
    game = QiskitGameEnv()
    # Initialize the NN -> Recall to modify the P_function() in Edge()
    for j in range(0,100)
        data_dL = []
        for i in range(0,100):
            data_dL += play_once()
        # train the NN with data_dL
        # Collect some statistics
    