Implementation of a VQC for reinforcement learning on OpenAI Gym's Frozen Lake environment

Based off of https://github.com/ycchen1989/Var-QuantumCircuits-DeepRL/blob/master/Code/QML_DQN_FROZEN_LAKE.py

Implemented using Qiskit and PyTorch

In [175]:
import torch
import random
from qiskit import QuantumCircuit
from qiskit.circuit import Parameter
from qiskit.visualization import plot_histogram
from qiskit.compiler import transpile
from qiskit import Aer
import gym
import numpy as np

In [1]:
class ReplayMemory():

    # Initialize our replay memory
    def __init__(self, capacity):
        self.transitions = []
        self.capacity = capacity

    # Add a transition value to our memory
    def store_transition(self, transition):
        if len(self.transitions) < self.capacity:
            self.transitions.append(transition)
    
    # Sample a random batch from our memory
    def sample(self, batch_size):
        return random.sample(self.transitions, batch_size)

In [302]:
class Agent():
    
    def __init__(self, numQubits=4, depth=1):
        # Number of qubits used (# of wires)
        self.numQubits = numQubits

        # Number of times to apply the CNOT / rotation module
        self.depth = depth

        # Action-value function approximator
        self.qc = QuantumCircuit(numQubits, numQubits)

        # State encoding parameters
        # thetas: parameters used to store angles to rotate in the x direction by
        # phis: parameters used to store angles to rotate in the z direction by
        self.thetas = [Parameter('theta_{}'.format(i)) for i in range(self.numQubits)]
        self.phis = [Parameter('phi_{}'.format(i)) for i in range(self.numQubits)]

        # Creates rotations to be used in getting outputs
        self.alpha_rotations = [Parameter('alpha_{}'.format(i)) for i in range(self.numQubits)]
        self.beta_rotations = [Parameter('beta_{}'.format(i)) for i in range(self.numQubits)]
        self.gamma_rotations = [Parameter('gamma_{}'.format(i)) for i in range(self.numQubits)]

        # Creates a backend to run the circuit on
        self.backend = Aer.get_backend('qasm_simulator')

        # Initialize state preparation gates
        self.state_preparation()

        # Create a layer
        self.init_layer()

        # Initialize random parameters
        rand_alphas, rand_betas, rand_gammas = np.random.rand(3, 4) * 2 - 1
        self.bind_layer(rand_alphas, rand_betas, rand_gammas)

        # Initialize measurement
        self.init_measurement()

    # State is a decimal value from 0 to 16
    # Converts this decimal value to a binary list
    def get_binary_state_encoding(self, state):
        encoding = [int(i) for i in bin(state)[2:]]
        while len(encoding) < self.numQubits:
            encoding = [0] + encoding
        return encoding

    # Creates a parameterized state encoding circuit
    def state_preparation(self):
        
        # Initialize circuit with params
        for wire in range(self.numQubits):
            self.qc.rx(np.pi * self.thetas[wire], wire)
        
        for wire in range(self.numQubits):
            self.qc.rz(np.pi * self.phis[wire], wire)
    
    # Binds theta values and phi values to the quantum circuit
    # state: index of the state
    def bind_state_preparation_parameters(self, state, circuit):
        angles = self.get_binary_state_encoding(state)

        # Make sure the number of theta values and phi values are equal to the number of qubits in the circuit
        assert len(angles) == self.numQubits

        circuit = circuit.bind_parameters(dict(zip(self.thetas, angles)))
        circuit = circuit.bind_parameters(dict(zip(self.phis, angles)))

        return circuit
    
    # Creates a rotation layer
    def init_layer(self):
        # Create CNOT gates at each layer of the circuit
        for wire in range(self.numQubits - 1):
            self.qc.cx(wire, wire + 1)
        
        # Create rotations at each level of the circuit
        for wire in range(self.numQubits):
            self.qc.rx(self.alpha_rotations[wire], wire)
            self.qc.ry(self.beta_rotations[wire], wire)
            self.qc.rz(self.gamma_rotations[wire], wire)

    # Binds one layer of our parameterized circuit to parameters
    # alpha: array storing rotation values in x direction
    # beta: array storing rotation values in y direction
    # gamma: array storing rotation values in z direction
    def bind_layer(self, alphas, betas, gammas):
        # Length of alpha, beta, and gamma must be the same as the number of wires in the circuit
        assert len(alphas) == self.numQubits                     
        assert len(betas) == self.numQubits
        assert len(gammas) == self.numQubits

        self.qc = self.qc.bind_parameters(dict(zip(self.alpha_rotations, alphas)))
        self.qc = self.qc.bind_parameters(dict(zip(self.beta_rotations, betas)))
        self.qc = self.qc.bind_parameters(dict(zip(self.gamma_rotations, gammas)))
    
    # Adds a measurement layer to the end of the circuit
    def init_measurement(self):
        for wire in range(self.numQubits):
            self.qc.measure(wire, wire)

    # Outputs a score for each action
    # Four actions
    # If expectation value from qubit 0 is highest, then action selected is LEFT
    # If expectation value from qubit 1 is highest, then action selected is DOWN
    # If expectation value from qubit 2 is highest, then action selected is RIGHT
    # If expectation value from qubit 3 is highest, then action selected is UP
    # state: the state to select an action from
    # num_iterations: the number of times to use to calculate our expectation values
    # epsilon: percentage of the time to choose a random action
    def select_action(self, state, num_iterations=10, epsilon=0.1):
        if np.random.rand() < epsilon:
            return np.random.randint(0, 4)

        # Binds the state preparation parameters to the state we want to select the best action from
        bound_copy = self.qc.copy()
        bound_copy = self.bind_state_preparation_parameters(state, bound_copy)

        # Create a job to run on our circuit
        job = self.backend.run(transpile(bound_copy, self.backend), shots=num_iterations)

        # Get the result of our job
        results = job.result()

        # Get the number of times each result appeared
        result_counts = results.get_counts(bound_copy)

        # For each result we got, get the number of times each bit appeared
        # The bit that was 1 most often is the index of our selected action
        counts = [0] * self.numQubits
        for output in result_counts.keys():
            for wire in range(self.numQubits):
                counts[wire] += int(output[wire]) * result_counts[output]

        return np.argmax(counts)

In [303]:
class Runner():

    # Initialize the runner class
    # num_episodes: the number of episodes to run
    # epsilon: the percentage of the time to select a random action
    # capacity: the capacity of the replay memory
    def __init__(self, num_episodes=100, epsilon=0.1, capacity=1000):
        self.num_episodes = num_episodes
        self.epsilon = epsilon
        self.memory = ReplayMemory(capacity)
        self.agent = Agent(numQubits=4, depth=1)
        self.env = gym.make('FrozenLake-v1', is_slippery=False)
        self.terminal_state = 15
    
    # Run the algorithm for num_episodes
    # num_episodes: the number of episodes to run (M)
    def run(self):
        for ep in range(self.num_episodes):
            initial_state = self.env.reset()[0]
            print(initial_state)
            terminated, truncated = False, False
            while not terminated and not truncated:
                action = self.agent.select_action(initial_state)
                observation, reward, terminated, truncated, info = self.env.step(action)
            
            # Penalize for falling in a hole
            if observation != self.terminal_state:
                reward -= 1


In [304]:
r = Runner()
r.run()

0
(0, 0.0, False, False, {'prob': 1.0})
(4, 0.0, False, False, {'prob': 1.0})
(8, 0.0, False, False, {'prob': 1.0})
(8, 0.0, False, False, {'prob': 1.0})
(4, 0.0, False, False, {'prob': 1.0})
(8, 0.0, False, False, {'prob': 1.0})
(12, 0.0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True, False, {'prob': 1.0})
(12, 0, True

KeyboardInterrupt: 

In [295]:
env = gym.make('FrozenLake-v1', is_slippery=False)

In [299]:
print(env.reset())
while True:
    action = int(input())
    print(env.step(action))

(0, {'prob': 1})
(4, 0.0, False, False, {'prob': 1.0})
(5, 0.0, True, False, {'prob': 1.0})


ValueError: invalid literal for int() with base 10: ''