In [23]:
import torch
from torch.autograd import Variable
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import pennylane as qml
import pandas as pd
import random
import gym
import time
from qiskit import QuantumCircuit, QuantumRegister, ClassicalRegister, assemble, Aer

In [34]:
# --- Hyperparameters -------------------------------------------------------
CIRCUIT_SIZE = 4       # width of the zero-padded bit string built by `encoder`
MAX_ITERATIONS = 5     # number of episodes run by the training loop
MAX_STEPS = 250        # upper bound on steps within one episode
BATCHSIZE = 5          # transitions sampled from `memory` per optimisation step
TARGET_MAX = 20        # environment steps between target-parameter refreshes
GAMMA = 0.99           # factor applied to the target circuit's max output

# --- Field positions inside a stored transition tuple ----------------------
# Transitions are stored as (state_t, action, reward, state_nt, done).
STATE_T, ACTION, REWARD, STATE_NT, DONE = range(5)

In [35]:
# Simulator backend for the circuit; the wire count comes from the shared
# CIRCUIT_SIZE constant so it cannot drift from the width used by the encoder.
dev = qml.device('qiskit.aer', wires=CIRCUIT_SIZE)

def layer(weights):
    """Apply one variational layer: a CNOT chain followed by per-wire rotations.

    Args:
        weights: 2-D array-like indexed as ``weights[wire, k]`` with
            ``k`` in ``0..2``; row ``wire`` holds the three angles passed to
            ``qml.Rot`` on that wire. The number of rows determines how many
            wires the layer acts on (4 rows reproduces the original layout).
    """
    n_wires = len(weights)

    # Entanglement block: CNOT between each pair of neighbouring wires,
    # applied in ascending order (0->1, 1->2, ...).
    for control in range(n_wires - 1):
        qml.CNOT(wires=[control, control + 1])

    # Parameterised single-qubit rotation on every wire
    # (the original author labels this a "u3 gate").
    for wire in range(n_wires):
        qml.Rot(weights[wire, 0], weights[wire, 1], weights[wire, 2], wires=wire)

In [53]:
def encoder(encodings):
    """Return the positions of the '1' bits of an integer.

    The integer is rendered as a binary string zero-padded to
    ``CIRCUIT_SIZE`` characters; position 0 is the leftmost (most
    significant) character of that string.

    Args:
        encodings: integer to convert.

    Returns:
        List of string positions whose character is '1', in ascending order.
    """
    bit_string = f'{encodings:0{CIRCUIT_SIZE}b}'
    active_positions = []
    for position, bit in enumerate(bit_string):
        if bit == '1':
            active_positions.append(position)
    return active_positions

@qml.qnode(dev, interface='torch')
def qc(weights, encoding = None):
    """Variational circuit: optional state encoding followed by stacked layers.

    Args:
        weights: iterable of per-layer weight blocks; each block is passed
            unchanged to ``layer``.
        encoding: optional integer. Its binary representation (see
            ``encoder``) selects the wires that receive an RX(pi) followed by
            an RZ(pi) before the layers run. ``None`` skips the encoding step.

    Returns:
        Pauli-Z expectation value of every wire, one entry per wire.
    """
    # Encoding: compare against None explicitly so that the valid state 0 is
    # handled by the same code path as every other state (it selects no wires).
    if encoding is not None:
        for wire in encoder(encoding):
            qml.RX(np.pi, wires=wire)
            qml.RZ(np.pi, wires=wire)

    # Layer-wise application of the variational blocks.
    for layer_weights in weights:
        layer(layer_weights)

    return [qml.expval(qml.PauliZ(wire)) for wire in range(CIRCUIT_SIZE)]
    
def variational_qc(weights, bias, encoding = None):
    """Evaluate ``qc`` and shift its output by ``bias``.

    Args:
        weights: circuit weights forwarded to ``qc``.
        bias: value added to the circuit output.
        encoding: optional integer forwarded to ``qc`` as its encoding.

    Returns:
        The output of ``qc`` plus ``bias``.
    """
    circuit_output = qc(weights, encoding = encoding)
    return circuit_output + bias

# Smoke test of the circuit with small random weights (4 layers x 4 wires x 3
# angles) and a zero bias. Tensors are created with requires_grad=True directly
# instead of going through the deprecated torch.autograd.Variable wrapper.
var_init_circuit = torch.tensor(0.01 * np.random.randn(4, 4, 3), device='cpu', requires_grad=True)
var_init_bias = torch.tensor([0.0, 0.0, 0.0, 0.0], device='cpu', requires_grad=True)

variational_qc(var_init_circuit, var_init_bias, 10)

tensor([-1.0000,  1.0000, -0.9980,  1.0000], dtype=torch.float64,
       grad_fn=<AddBackward0>)

In [44]:
def mse(targs, preds):
    """Mean of the squared differences between paired targets and predictions.

    Pairs are formed with ``zip``; the sum of squared differences is divided
    by ``len(targs)``.

    Args:
        targs: sequence of target values.
        preds: sequence of predicted values.

    Returns:
        Sum of ``(target - prediction) ** 2`` over the pairs, divided by the
        number of targets.
    """
    total = 0
    for target, prediction in zip(targs, preds):
        total = total + (target - prediction) ** 2
    return total / len(targs)

def cost(weights, bias, batch_features, v_targets):
    """Mean squared error between target values and the circuit's predictions.

    Args:
        weights: circuit weights forwarded to ``variational_qc``.
        bias: bias forwarded to ``variational_qc``.
        batch_features: sequence of transition tuples; the entries at
            ``STATE_T`` and ``ACTION`` are read from each tuple.
        v_targets: target values, one per transition.

    Returns:
        ``mse(v_targets, predictions)``, where each prediction is the circuit
        output at the index of the action stored in the transition.
    """
    v_preds = []
    for transition in batch_features:
        outputs = variational_qc(weights, bias, encoding=transition[STATE_T])
        v_preds.append(outputs[transition[ACTION]])
    return mse(v_targets, v_preds)

In [45]:
# Build the 4x4 FrozenLake environment with slipping disabled.
env = gym.make(
    "FrozenLake-v1",
    map_name = '4x4',
    is_slippery = False,
)
# Reset once and show the map so the layout is visible in the notebook.
state = env.reset()
env.render()


[41mS[0mFFF
FHFH
FFFH
HFFG


In [46]:
# Shape of the trainable circuit: number of stacked layers and wires per layer.
n_layers, n_qubits = 2, 4
# Environment steps taken since the target parameters were last refreshed.
targ_counter = 0
# Most recently sampled batch of transitions.
sampled_vs = list()
# Replay memory: maps (episode, step) -> transition tuple.
memory = dict()

In [47]:
# Trainable parameters of the online circuit. Tensors are created with
# requires_grad=True directly; the torch.autograd.Variable wrapper is deprecated.
param = torch.tensor(0.01 * np.random.randn(n_layers, n_qubits, 3), device='cpu', requires_grad=True)
bias = torch.tensor([0.0, 0.0, 0.0, 0.0], device='cpu', requires_grad=True)

# Frozen copies used to compute the training targets; they are refreshed
# periodically from the online parameters inside the training loop.
param_targ = param.detach().clone()
bias_targ = bias.detach().clone()

# Only the online parameters are handed to the optimizer.
opt = torch.optim.Adam([param, bias], lr = 0.01)

In [54]:
# Training loop: act greedily with the online circuit, store every transition
# in `memory`, and fit the online parameters on random mini-batches against
# targets computed with the frozen target parameters.
for i in range(MAX_ITERATIONS):
    start = time.time()
    state_t = env.reset()
    total_reward = 0
    done = False

    for t in range(MAX_STEPS):
        print(f'Episode: {i}, Steps: {t}')

        # Greedy action: index of the largest circuit output for this state.
        # NOTE(review): there is no exploration term; confirm this is intended.
        acts = variational_qc(param, bias, encoding=state_t)
        print(f'type of acts: {type(acts)}')
        act_t = torch.argmax(acts).item()
        print(f'act_t: {act_t} type of act_t: {type(act_t)}')
        state_nt, reward, done, info = env.step(action=act_t)
        total_reward += reward
        targ_counter += 1

        # The action is re-derived from the circuit at the top of every step,
        # so no "next action" is computed here; doing so would cost one extra
        # circuit evaluation per step whose result is never used.
        memory[i, t] = (state_t, act_t, reward, state_nt, done)

        if len(memory) >= BATCHSIZE:
            print('Optimizing...')
            sampled_vs = [memory[k] for k in random.sample(list(memory), BATCHSIZE)]
            # Target: the reward alone for terminal transitions, otherwise the
            # reward plus GAMMA times the largest target-circuit output for
            # the next state.
            v_targets = [
                (s[REWARD] if s[DONE]
                 else s[REWARD] + GAMMA*torch.max(variational_qc(param_targ, bias_targ, encoding=s[STATE_NT])))
                for s in sampled_vs
            ]

            opt.zero_grad()
            loss = cost(param, bias, sampled_vs, v_targets)
            loss.backward()
            opt.step()

        # Update parameters in the target circuit. `>=` (rather than `==`)
        # keeps the refresh from being skipped if the counter ever starts
        # above the threshold, e.g. after re-running cells out of order.
        if targ_counter >= TARGET_MAX:
            param_targ = param.clone().detach()
            bias_targ = bias.clone().detach()
            targ_counter = 0

        state_t = state_nt

        if done:
            break
    end = time.time()
    print(f'time for current episode {end - start}')
    print(f'total reward for current episode {total_reward}')
    # Author's timing note: 1 episode or 250 steps cost 27.5 min

Episode: 0, Steps: 0
type of acts: <class 'torch.Tensor'>
act_t: 2 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 1
type of acts: <class 'torch.Tensor'>
act_t: 2 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 2
type of acts: <class 'torch.Tensor'>
act_t: 3 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 3
type of acts: <class 'torch.Tensor'>
act_t: 3 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 4
type of acts: <class 'torch.Tensor'>
act_t: 0 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 5
type of acts: <class 'torch.Tensor'>
act_t: 2 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 6
type of acts: <class 'torch.Tensor'>
act_t: 0 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 7
type of acts: <class 'torch.Tensor'>
act_t: 2 type of act_t: <class 'int'>
Optimizing...
Episode: 0, Steps: 8
type of acts: <class 'torch.Tensor'>
act_t: 0 type of act_t: <class 'int'>
Optimizing...
Episode: 0

KeyboardInterrupt: 