<a href="https://colab.research.google.com/github/AFNANAMIN/AI_Freelancing/blob/master/Reinforcement_Learning_with_Quantum_Computing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install PennyLane into the runtime (no version is pinned here; the recorded
# `qml.about()` output further down in this notebook reports version 0.8.1).
!pip install pennylane 



In [0]:
import pennylane as qml
from pennylane import numpy as np
from pennylane.optimize import NesterovMomentumOptimizer

import torch
import torch.nn as nn 
from torch.autograd import Variable

import matplotlib.pyplot as plt
from datetime import datetime
import pickle

import gym
import time
import random
from collections import namedtuple
from copy import deepcopy

In [0]:
from gym.envs import toy_text
class ShortestPathFrozenLake(toy_text.frozen_lake.FrozenLakeEnv):
	"""FrozenLake environment whose transition table carries reshaped rewards.

	Once the parent constructor has populated ``self.P``, every
	``(prob, next_state, reward, is_terminal)`` entry is rewritten so that the
	reward is decided solely by the map tile found at ``next_state``:

	* ``b'H'`` tile -> -0.2
	* ``b'G'`` tile -> 1.0
	* any other tile -> -0.01

	Probabilities, successor states and terminal flags are left untouched.
	"""

	def __init__(self, **kwargs):
		"""Create the parent environment and overwrite the rewards in ``self.P``.

		Args:
			**kwargs: forwarded unchanged to ``FrozenLakeEnv.__init__``.
		"""
		super(ShortestPathFrozenLake, self).__init__(**kwargs)

		for s in range(self.nS):
			for a in range(self.nA):
				reshaped = []
				for (prob, nxt, _, terminal) in self.P[s][a]:
					# Turn the flat state index into (row, col) grid coordinates.
					grid_row, grid_col = divmod(nxt, self.ncol)
					tile = self.desc[grid_row, grid_col]
					reward = -0.2 if tile == b'H' else (1. if tile == b'G' else -0.01)
					reshaped.append((prob, nxt, reward, terminal))
				self.P[s][a] = reshaped

In [0]:
from gym.envs.registration import register
# Register the custom environment under a Gym id so `gym.make` can build it.
# The entry point is the class object defined in the previous cell rather than
# a 'module:Class' string: a notebook has no importable `ShortestPathFrozenLake`
# module, so the string form only works if a separate .py file is on the path.
register(
    id='Deterministic-ShortestPath-4x4-FrozenLake-v0', # name given to this new environment
    entry_point=ShortestPathFrozenLake, # env entry point (callable returning the env)
    kwargs={'map_name': '4x4', 'is_slippery': False} # argument passed to the env
)


In [0]:
# One environment step as stored in the replay memory. The field order below
# is also the positional order expected when constructing a Transition.
Transition = namedtuple(
	'Transition',
	['state', 'action', 'reward', 'next_state', 'done'],
)

In [0]:
class ReplayMemory(object):
	"""Fixed-capacity ring buffer of ``Transition`` records.

	The buffer grows until it holds ``capacity`` entries; after that each new
	push overwrites the oldest entry.
	"""

	def __init__(self, capacity):
		"""Create an empty buffer.

		Args:
			capacity: maximum number of transitions kept at any time.
		"""
		self.capacity = capacity
		# Stored transitions, at most ``capacity`` of them.
		self.memory = []
		# Slot written by the next push; wraps around modulo ``capacity``.
		self.position = 0

	def push(self, *args):
		"""Saves a transition.

		Args:
			*args: positional fields of ``Transition``, in field order.

		Raises:
			TypeError: if ``args`` does not match the ``Transition`` fields. The
				buffer is left unchanged in that case.
		"""
		# Build the record before touching the buffer so that a bad argument
		# list cannot leave a ``None`` placeholder behind.
		transition = Transition(*args)
		if len(self.memory) < self.capacity:
			# Still filling up: ``position`` equals ``len(self.memory)`` here,
			# so appending writes exactly the slot ``position`` points at.
			self.memory.append(transition)
		else:
			self.memory[self.position] = transition
		self.position = (self.position + 1) % self.capacity

	def sample(self, batch_size):
		"""Return ``batch_size`` distinct stored transitions chosen at random.

		Raises:
			ValueError: propagated from ``random.sample`` when ``batch_size``
				exceeds the number of stored transitions.
		"""
		return random.sample(self.memory, batch_size)

	def output_all(self):
		"""Return the underlying list of stored transitions (not a copy)."""
		return self.memory

	def __len__(self):
		"""Number of transitions currently stored."""
		return len(self.memory)

In [0]:
def plotTrainingResultCombined(_iter_index, _iter_reward, _iter_total_steps, _fileTitle):
	"""Plot reward and step count against the iteration index and save a PNG.

	Args:
		_iter_index: x values shared by both curves.
		_iter_reward: y values drawn as a blue line labelled 'Reward'.
		_iter_total_steps: y values drawn as a red line labelled 'Total Steps'.
		_fileTitle: used as the plot title and as the file-name prefix; the
			file is saved as ``<_fileTitle>_NO<timestamp>.png``.
	"""
	figure, axes = plt.subplots()
	curves = (
		(_iter_reward, '-b', 'Reward'),
		(_iter_total_steps, '-r', 'Total Steps'),
	)
	for values, line_format, curve_label in curves:
		axes.plot(_iter_index, values, line_format, label=curve_label)
	axes.legend()
	axes.set(xlabel='Iteration Index', title=_fileTitle)

	# The timestamp is taken at save time so repeated calls do not overwrite each other.
	stamp = datetime.now().strftime("NO%Y%m%d%H%M%S")
	figure.savefig(_fileTitle + "_" + stamp + ".png")


In [0]:
def plotTrainingResultReward(_iter_index, _iter_reward, _iter_total_steps, _fileTitle):
	"""Plot the reward curve only and save it as a PNG.

	Args:
		_iter_index: x values of the curve.
		_iter_reward: y values drawn as a blue line labelled 'Reward'.
		_iter_total_steps: accepted for signature parity with
			``plotTrainingResultCombined``; not plotted here.
		_fileTitle: used as the plot title and as the file-name prefix; the
			file is saved as ``<_fileTitle>_REWARD_NO<timestamp>.png``.
	"""
	figure, axes = plt.subplots()
	axes.plot(_iter_index, _iter_reward, '-b', label='Reward')
	axes.legend()
	axes.set(xlabel='Iteration Index', title=_fileTitle)

	# The timestamp is taken at save time so repeated calls do not overwrite each other.
	stamp = datetime.now().strftime("NO%Y%m%d%H%M%S")
	figure.savefig(_fileTitle + "_REWARD" + "_" + stamp + ".png")

In [0]:
def decimalToBinaryFixLength(_length, _decimal):
	"""Encode a non-negative integer as an array of binary digits, left-padded with zeros.

	Args:
		_length: minimum number of digits in the result. Values that need more
			than ``_length`` bits are NOT truncated; all of their bits are returned.
		_decimal: value to encode; it is converted with ``int()`` first.

	Returns:
		1-d float array of 0.0/1.0 entries, most significant bit first. The
		dtype is float regardless of whether padding was needed.

	Raises:
		ValueError: if ``_decimal`` is negative.
	"""
	value = int(_decimal)
	if value < 0:
		# bin() of a negative number carries a '-' sign that cannot be decoded digit by digit.
		raise ValueError("decimalToBinaryFixLength expects a non-negative value, got " + str(_decimal))

	# zfill pads on the left and leaves longer strings untouched.
	bits = format(value, 'b').zfill(_length)
	return np.array([int(bit) for bit in bits], dtype=float)

In [0]:
## Simulator the circuit runs on: a FOUR qubit system
dev = qml.device('default.qubit', wires=4)

## Tensor type applied (via `.type(dtype)`) to the trainable torch parameters
dtype = torch.DoubleTensor

In [0]:
def statepreparation(a):
	"""Encode the entries of ``a`` onto the wires as rotation angles.

	Entry ``a[i]`` is applied to wire ``i`` as ``RX(pi * a[i])`` followed by
	``RZ(pi * a[i])``.

	Args:
		a: sequence of per-wire values; callers in this notebook pass the
			4-entry output of ``decimalToBinaryFixLength``.
	"""
	for wire, value in enumerate(a):
		rotation = np.pi * value
		qml.RX(rotation, wires=wire)
		qml.RZ(rotation, wires=wire)


def layer(W):
	""" Single layer of the variational circuit.

	Applies a chain of CNOTs (0->1, 1->2, 2->3) and then one ``Rot`` gate per
	wire, in wire order 0..3.

	Args:
		W (array[float]): 2-d array of variables for one layer; row ``i``
			supplies the three ``Rot`` angles for wire ``i``.
	"""
	# Entangle neighbouring wires.
	for control in range(3):
		qml.CNOT(wires=[control, control + 1])

	# Parameterised single-qubit rotation on each of the four wires.
	for wire in range(4):
		qml.Rot(W[wire, 0], W[wire, 1], W[wire, 2], wires=wire)


In [12]:
# Print PennyLane version, platform and installed-device details (see the output below).
qml.about()

Name: PennyLane
Version: 0.8.1
Summary: PennyLane is a Python quantum machine learning library by Xanadu Inc.
Home-page: https://github.com/XanaduAI/pennylane
Author: None
Author-email: None
License: Apache License 2.0
Location: /usr/local/lib/python3.6/dist-packages
Requires: scipy, networkx, appdirs, toml, autograd, semantic-version, numpy
Required-by: 
Platform info:           Linux-4.14.137+-x86_64-with-Ubuntu-18.04-bionic
Python version:          3.6.9
Numpy version:           1.18.2
Scipy version:           1.4.1
Installed devices:
- default.gaussian (PennyLane-0.8.1)
- default.qubit (PennyLane-0.8.1)
- default.tensor (PennyLane-0.8.1)
- default.tensor.tf (PennyLane-0.8.1)


In [0]:
@qml.qnode(dev, interface='torch')
def circuit(weights, angles=None):
	"""Quantum node: state preparation, stacked layers, then a PauliZ readout per wire.

	Args:
		weights: iterable of per-layer parameter arrays, each passed to ``layer``.
		angles: values handed to ``statepreparation``. It is passed by keyword
			by every caller in this notebook.
			NOTE(review): presumably PennyLane treats keyword arguments as
			non-trainable inputs - confirm for the installed version.

	Returns:
		List of four PauliZ expectation values, one for each of wires 0..3.
	"""
	statepreparation(angles)

	for layer_params in weights:
		layer(layer_params)

	readout = []
	for wire in range(4):
		readout.append(qml.expval(qml.PauliZ(wire)))
	return readout


def variational_classifier(var_Q_circuit, var_Q_bias , angles=None):
	"""The variational classifier: circuit output plus a bias.

	Args:
		var_Q_circuit: circuit weights, forwarded to ``circuit`` as ``weights``.
		var_Q_bias: bias added to the circuit output.
		angles: encoded input forwarded to ``circuit``.

	Returns:
		``circuit(var_Q_circuit, angles=angles) + var_Q_bias``.
	"""
	return circuit(var_Q_circuit, angles=angles) + var_Q_bias


def square_loss(labels, predictions):
	"""Sum of squared differences over paired items, divided by ``len(labels)``.

	Args:
		labels: sequence of target values.
		predictions: sequence of predicted values; pairs are formed with
			``zip``, so extra items on either side are ignored.

	Returns:
		The accumulated squared error divided by ``len(labels)``.
	"""
	squared_errors = ((target - guess) ** 2 for target, guess in zip(labels, predictions))
	return sum(squared_errors) / len(labels)

In [0]:
def abs_loss(labels, predictions):
	""" Absolute-error loss function
	Args:
		labels (tensor[float]): 1-d tensor of labels
		predictions (tensor[float]): 1-d tensor of predictions
	Returns:
		tensor: sum of ``|predictions - labels|`` divided by ``len(labels)``
	"""
	return torch.sum(torch.abs(predictions - labels)) / len(labels)

def huber_loss(labels, predictions):
	"""Smooth-L1 (Huber-style) loss between predictions and labels.

	Args:
		labels: tensor of target values.
		predictions: tensor of predicted values, same shape as ``labels``.

	Returns:
		Scalar tensor produced by ``nn.SmoothL1Loss()`` with its defaults.
	"""
	criterion = nn.SmoothL1Loss()
	# nn.SmoothL1Loss is called as (input, target): the predictions are the
	# differentiable input and the labels are the target.
	return criterion(predictions, labels)


def cost(var_Q_circuit, var_Q_bias, features, labels):
	"""Cost (error) function to be minimized.

	Args:
		var_Q_circuit: circuit weights of the Q-network being trained.
		var_Q_bias: bias of the Q-network being trained.
		features: iterable of records exposing ``.state`` and ``.action``.
		labels: target values, one per record in ``features``.

	Returns:
		``square_loss`` between ``labels`` and the network output selected by
		each record's action.
	"""
	predictions = []
	for transition in features:
		# States are encoded as 4 binary digits before entering the circuit.
		encoded_state = decimalToBinaryFixLength(4, transition.state)
		q_values = variational_classifier(var_Q_circuit = var_Q_circuit, var_Q_bias = var_Q_bias, angles = encoded_state)
		predictions.append(q_values[transition.action])

	return square_loss(labels, predictions)


#############################

def epsilon_greedy(var_Q_circuit, var_Q_bias, epsilon, n_actions, s, train=False):
	"""
	Pick an action for state ``s``: greedy on the circuit output, or random.

	@param var_Q_circuit circuit weights used to evaluate the state
	@param var_Q_bias bias added to the circuit output
	@param epsilon for exploration; the greedy branch is taken with
		probability (epsilon/n_actions) + (1 - epsilon)
	@param n_actions number of actions to draw from when acting randomly
	@param s current state as a decimal index (encoded with 4 binary digits)
	@param train if true then no random actions selected
	@return the chosen action as a torch tensor
	"""
	act_greedily = train or np.random.rand() < ((epsilon/n_actions)+(1-epsilon))

	if not act_greedily:
		# Exploration: uniform draw, wrapped so both branches return a torch tensor.
		return torch.tensor(np.random.randint(0, n_actions))

	# Exploitation: the variational classifier output is a torch tensor.
	q_values = variational_classifier(var_Q_circuit = var_Q_circuit, var_Q_bias = var_Q_bias, angles = decimalToBinaryFixLength(4,s))
	return torch.argmax(q_values)





In [0]:
def deep_Q_Learning(alpha, gamma, epsilon, episodes, max_steps, n_tests, render = False, test=False):
	"""Train the variational circuit as a Q-function with replay memory and a target copy.

	Args:
		alpha: accepted but not read by this function; the RMSprop learning
			rate is fixed at 0.01 below.
		gamma: discount factor applied to the target parameters' best
			next-state value.
		epsilon: initial exploration parameter handed to ``epsilon_greedy``;
			divided by ``episode/100 + 1`` each time an episode terminates.
		episodes: number of episodes to run.
		max_steps: upper bound on environment steps per episode.
		n_tests: not read by this function.
		render: when True, render the environment at every step and at termination.
		test: not read by this function.

	Returns:
		tuple: ``(timestep_reward, iter_index, iter_reward, iter_total_steps,
		var_Q_circuit, var_Q_bias)`` - per-episode total rewards, episode
		indices, total rewards again, step counts, and the trained parameters.

	NOTE(review): statistics are appended (and epsilon decayed) only when the
	environment reports ``done``; an episode that hits ``max_steps`` first is
	absent from the returned lists.
	"""
	env = gym.make('Deterministic-ShortestPath-4x4-FrozenLake-v0')

	n_states, n_actions = env.observation_space.n, env.action_space.n
	print("NUMBER OF STATES:" + str(n_states))
	print("NUMBER OF ACTIONS:" + str(n_actions))

	num_qubits = 4
	num_layers = 2

	# Trainable parameters: small random circuit weights and a zero bias per action.
	var_init_circuit = Variable(torch.tensor(0.01 * np.random.randn(num_layers, num_qubits, 3), device='cpu').type(dtype), requires_grad=True)
	var_init_bias = Variable(torch.tensor([0.0, 0.0, 0.0, 0.0], device='cpu').type(dtype), requires_grad=True)

	var_Q_circuit = var_init_circuit
	var_Q_bias = var_init_bias

	# Frozen copies used to compute the regression targets.
	var_target_Q_circuit = var_Q_circuit.clone().detach()
	var_target_Q_bias = var_Q_bias.clone().detach()

	opt = torch.optim.RMSprop([var_Q_circuit, var_Q_bias], lr=0.01, alpha=0.99, eps=1e-08, weight_decay=0, momentum=0, centered=False)

	## NEed to move out of the function
	TARGET_UPDATE = 20
	batch_size = 5
	##

	# Counts environment steps across episodes since the last target refresh.
	target_update_counter = 0

	iter_index = []
	iter_reward = []
	iter_total_steps = []

	timestep_reward = []

	memory = ReplayMemory(80)

	for episode in range(episodes):
		print(f"Episode: {episode}")
		# Output a s in decimal format
		s = env.reset()
		# Doing epsilon greedy action selection
		# With var_Q
		a = epsilon_greedy(var_Q_circuit = var_Q_circuit, var_Q_bias = var_Q_bias, epsilon = epsilon, n_actions = n_actions, s = s).item()
		t = 0
		total_reward = 0
		done = False

		while t < max_steps:
			if render:
				print("###RENDER###")
				env.render()
				print("###RENDER###")
			t += 1

			target_update_counter += 1

			# Execute the action
			s_, reward, done, info = env.step(a)

			total_reward += reward
			# The next action is chosen before the update so it uses the pre-update parameters.
			a_ = epsilon_greedy(var_Q_circuit = var_Q_circuit, var_Q_bias = var_Q_bias, epsilon = epsilon, n_actions = n_actions, s = s_).item()

			memory.push(s, a, reward, s_, done)

			if len(memory) > batch_size:

				batch_sampled = memory.sample(batch_size = batch_size)

				# Targets: reward plus the discounted best next-state value from the
				# frozen target parameters; the bootstrap term is zeroed on terminal transitions.
				Q_target = [item.reward + (1 - int(item.done)) * gamma * torch.max(variational_classifier(var_Q_circuit = var_target_Q_circuit, var_Q_bias = var_target_Q_bias, angles=decimalToBinaryFixLength(4,item.next_state))) for item in batch_sampled]

				def closure():
					opt.zero_grad()
					loss = cost(var_Q_circuit = var_Q_circuit, var_Q_bias = var_Q_bias, features = batch_sampled, labels = Q_target)
					loss.backward()
					return loss
				opt.step(closure)

				# The targets for the whole replay memory used to be recomputed here on
				# every step but were never read; that evaluation has been removed.

			if target_update_counter > TARGET_UPDATE:
				print("UPDATEING TARGET CIRCUIT...")

				var_target_Q_circuit = var_Q_circuit.clone().detach()
				var_target_Q_bias = var_Q_bias.clone().detach()

				target_update_counter = 0

			s, a = s_, a_

			if done:
				if render:
					print("###FINAL RENDER###")
					env.render()
					print("###FINAL RENDER###")
					print(f"This episode took {t} timesteps and reward: {total_reward}")
				# Shrink epsilon faster as the episode index grows.
				epsilon = epsilon / ((episode/100) + 1)
				print(f"This episode took {t} timesteps and reward: {total_reward}")
				timestep_reward.append(total_reward)
				iter_index.append(episode)
				iter_reward.append(total_reward)
				iter_total_steps.append(t)
				break

	return timestep_reward, iter_index, iter_reward, iter_total_steps, var_Q_circuit, var_Q_bias






In [0]:
if __name__ =="__main__":
	# Hyper-parameters for this run.
	alpha = 0.4
	gamma = 0.999
	epsilon = 1.
	episodes = 50
	max_steps = 2500
	n_tests = 2

	timestep_reward, iter_index, iter_reward, iter_total_steps , var_Q_circuit, var_Q_bias = deep_Q_Learning(
		alpha,
		gamma,
		epsilon,
		episodes,
		max_steps,
		n_tests,
		test = False,
	)

	print(timestep_reward)

	## Drawing Training Result ##
	# Prefix shared by the pickled artefacts; the figure uses its own fixed title.
	file_title = 'VQDQN_Frozen_Lake_NonSlip_Dynamic_Epsilon_RMSProp' + datetime.now().strftime("NO%Y%m%d%H%M%S")

	plotTrainingResultReward(
		_iter_index = iter_index,
		_iter_reward = iter_reward,
		_iter_total_steps = iter_total_steps,
		_fileTitle = 'Quantum_DQN_Frozen_Lake_NonSlip_Dynamic_Epsilon_RMSProp',
	)

	## Saving the model
	# Each artefact is pickled (binary) into its own "<file_title><suffix>.txt" file.
	artefacts = (
		("_var_Q_circuit", var_Q_circuit),
		("_var_Q_bias", var_Q_bias),
		("_iter_reward", iter_reward),
	)
	for suffix, payload in artefacts:
		with open(file_title + suffix + ".txt", "wb") as fp:
			pickle.dump(payload, fp)



NUMBER OF STATES:16
NUMBER OF ACTIONS:4
Episode: 0
This episode took 3 timesteps and reward: -0.22
Episode: 1
This episode took 10 timesteps and reward: -0.29000000000000004
Episode: 2
UPDATEING TARGET CIRCUIT...
This episode took 9 timesteps and reward: -0.28
Episode: 3
This episode took 3 timesteps and reward: -0.22
Episode: 4
This episode took 4 timesteps and reward: -0.23
Episode: 5
This episode took 12 timesteps and reward: -0.31
Episode: 6
UPDATEING TARGET CIRCUIT...
This episode took 4 timesteps and reward: -0.23
Episode: 7
UPDATEING TARGET CIRCUIT...
This episode took 21 timesteps and reward: -0.4
Episode: 8
UPDATEING TARGET CIRCUIT...
This episode took 24 timesteps and reward: -0.43000000000000005
Episode: 9
This episode took 3 timesteps and reward: -0.22
Episode: 10
This episode took 2 timesteps and reward: -0.21000000000000002
Episode: 11
UPDATEING TARGET CIRCUIT...
UPDATEING TARGET CIRCUIT...
UPDATEING TARGET CIRCUIT...
UPDATEING TARGET CIRCUIT...
This episode took 91 times