In [1]:
import gym
import numpy as np
import pennylane as qml
import torch
import torch.nn as nn
from torch.nn.parameter import Parameter

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class Agent:
    def __init__(self,
                 net,
                 action_space=None,
                 exploration_initial_eps=None,
                 exploration_decay=None,
                 exploration_final_eps=None):

        self.net = net
        self.action_space = action_space
        self.exploration_initial_eps = exploration_initial_eps
        self.exploration_decay = exploration_decay
        self.exploration_final_eps = exploration_final_eps
        self.epsilon = 0.

    def __call__(self, state, device=torch.device('cpu')):
        if np.random.random() < self.epsilon:
            action = self.get_random_action()
        else:
            action = self.get_action(state, device)

        return action

    def get_random_action(self):
        action = self.action_space.sample()
        return action

    def get_action(self, state, device=torch.device('cpu')):
        if not isinstance(state, torch.Tensor):
            state = torch.tensor(np.array([state]))

        if device.type != 'cpu':
            state = state.cuda(device)

        q_values = self.net.eval()(state)
        _, action = torch.max(q_values, dim=1)
        return int(action.item())

    def update_epsilon(self, step):
        self.epsilon = max(
            self.exploration_final_eps, self.exploration_final_eps +
            (self.exploration_initial_eps - self.exploration_final_eps) *
            self.exploration_decay**step)
        return self.epsilon


In [3]:
def test_agent(env, agent, n_eval_episodes):
    episode_steps = []
    episode_reward = []

    for _ in range(n_eval_episodes):
        state,_ = env.reset()
        done = False
        episode_steps.append(0)
        episode_reward.append(0)
        for i in range(100):
            action = agent.get_action(state)
            next_state, reward, terminated, truncated ,_= env.step(action)
            done = terminated or truncated
            #state = next_state
            episode_steps[-1] += 1
            episode_reward[-1] += reward
            if done:
                break
            else:
                state = next_state

    episode_steps = np.mean(episode_steps)
    episode_reward = np.mean(episode_reward)
    return {'steps': episode_steps, 'reward': episode_reward}

In [5]:
from collections import namedtuple

Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'done', 'next_state'))


class ReplayMemory(object):
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.buffer_size:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.buffer_size

    def sample(self, batch_size, device):
        indices = np.random.choice(len(self), batch_size, replace=False)
        states, actions, rewards, dones, next_states = zip(
            *[self.memory[idx] for idx in indices])
        states = torch.from_numpy(np.array(states)).to(device)
        actions = torch.from_numpy(np.array(actions)).to(device)
        rewards = torch.from_numpy(np.array(rewards,
                                            dtype=np.float32)).to(device)
        dones = torch.from_numpy(np.array(dones, dtype=np.int32)).to(device)
        next_states = torch.from_numpy(np.array(next_states)).to(device)
        return states, actions, rewards, dones, next_states

    def __len__(self):
        return len(self.memory)


In [6]:
import gym.spaces

class BinaryWrapper(gym.ObservationWrapper):
    def __init__(self, env):
        super(BinaryWrapper, self).__init__(env)
        self.bits = int(np.ceil(np.log2(env.observation_space.n)))
        self.observation_space = gym.spaces.MultiBinary(self.bits)

    def observation(self, obs):
        binary = map(float, "{0:b}".format(int(obs)).zfill(self.bits))
        return np.array(list(binary))


In [7]:
"""def encode(n_qubits, inputs):
    for wire in range(n_qubits):
        qml.RX(inputs[wire], wires=wire)"""


"""def layer(n_qubits, y_weight, z_weight):
    for wire, y_weight in enumerate(y_weight):
        qml.RY(y_weight, wires=wire)
    for wire, z_weight in enumerate(z_weight):
        qml.RZ(z_weight, wires=wire)
    for wire in range(n_qubits):
        qml.CZ(wires=[wire, (wire + 1) % n_qubits])


def measure(n_qubits):
    return [qml.expval(qml.PauliZ(wire)) for wire in range(n_qubits)]"""


def get_model(n_qubits, n_layers, data_reupload):
    dev = qml.device("default.qubit", wires=n_qubits)
    shapes = {
        "y_weights": (n_layers, n_qubits),
    }

    @qml.qnode(dev, interface='torch')
    def circuit(inputs, y_weights, z_weights):
        for layer_idx in range(n_layers):
            if (layer_idx == 0) or data_reupload:
                for wire in range(n_qubits):
                    qml.RX(inputs[wire], wires=wire)
            for wire, y_weight in enumerate(y_weight):
                qml.RY(y_weight, wires=wire)
            for wire in range(n_qubits):
                qml.CZ(wires=[wire, (wire + 1) % n_qubits])
        return [qml.expval(qml.PauliZ(wire)) for wire in range(n_qubits)]

    model = qml.qnn.TorchLayer(circuit, shapes)

    return model


class QuantumNet(nn.Module):
    def __init__(self, n_layers):
        super(QuantumNet, self).__init__()
        self.n_qubits = 4
        self.n_actions = 4
        self.q_layers = get_model(n_qubits=self.n_qubits,
                                  n_layers=n_layers,
                                  data_reupload=False)

    def forward(self, inputs):
        inputs = inputs * np.pi
        outputs = self.q_layers(inputs)
        outputs = (1 + outputs) / 2
        return outputs
