In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple, deque
import random
import torch.optim as optim
from Env import Env
import math

OSError: [WinError 127] The specified procedure could not be found. Error loading "C:\Users\RAMYA\anaconda3\lib\site-packages\torch\lib\caffe2_detectron_ops.dll" or one of its dependencies.

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Transition tuple describing one lived experience: the state the agent was in,
# the state it reached, the action it took and the reward it received.
Transition = namedtuple(
    typename="Transition",
    field_names=["state", "next_state", "action", "reward"],
)
# Experience Replay Object


class ReplayBuffer:
    """Bounded FIFO store of ``Transition`` records used for experience replay."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

    def push(self, *args):
        """Build a ``Transition`` from the positional fields and store it."""
        self.buffer.append(Transition(*args))

    def sample(self, sample_size):
        """Return ``sample_size`` stored transitions drawn at random without replacement."""
        return random.sample(self.buffer, sample_size)

In [None]:
# Neural Network Architecture (Work In Progress)
class QubitQNetwork(nn.Module):
    """Two-layer fully connected Q-network: Linear -> ReLU -> Linear.

    Args:
        n_time_slots: width of the hidden layer.
        n_actions: width of the output layer (one value per action).
        n_features: number of input features per state row.
    """

    def __init__(self, n_time_slots, n_actions, n_features):
        super().__init__()
        self.l1 = nn.Linear(n_features, n_time_slots)
        self.l2 = nn.Linear(n_time_slots, n_actions)

    def forward(self, x):
        """Map a ``(batch, n_features)`` float tensor to ``(batch, n_actions)`` values."""
        # Move the input to wherever this network's parameters live instead of
        # relying on a module-level `device` global, so the class also works
        # when imported on its own or when a copy sits on a different device.
        x = x.to(self.l1.weight.device)
        x = F.relu(self.l1(x))
        return self.l2(x)


class WeightConstraint:
    """Callable that clamps a module's ``weight`` into ``[min_value, max_value]``.

    Intended for use with ``nn.Module.apply``. Modules that have no ``weight``
    attribute, or whose ``weight`` is ``None``, are left untouched.

    Args:
        min_value: lower clamp bound (default -1).
        max_value: upper clamp bound (default 1).
    """

    def __init__(self, min_value=-1, max_value=1):
        self.min_value = min_value
        self.max_value = max_value

    def __call__(self, module):
        # Some modules expose `weight` but set it to None (for example batch
        # norm built with affine=False); skip those instead of crashing.
        weight = getattr(module, 'weight', None)
        if weight is None:
            return
        # Replace the parameter's storage with the clamped values; this works
        # on `.data` so autograd does not record the constraint as an operation.
        weight.data = weight.data.clamp(self.min_value, self.max_value)

In [None]:
# HYPERPARAMETERS
# Number of transitions sampled per optimisation step; optimize_model() does
# nothing until the replay memory holds at least this many.
BATCH_SIZE = 64
# Discount factor multiplied into the target network's next-state values.
GAMMA = 0.999
# Epsilon-greedy schedule used by select_action():
#   eps = EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)
# i.e. the random-action probability decays from about EPS_START towards EPS_END.
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
# The target network is re-synchronised with the policy network on episodes
# whose index is a multiple of this value.
TARGET_UPDATE = 10
# Count of select_action() calls that drives the epsilon schedule; the
# training loop resets it to 0 at the end of every episode.
steps_done = 0

In [None]:
# Environment Variables
env = Env()
# Number of discrete actions: output width of the Q-networks and the range
# that random exploration draws from.
n_actions = env.n_actions # requires a working Env implementation
# Number of features per state: input width of the Q-networks.
n_features = env.n_features
# Passed to QubitQNetwork as the hidden-layer width; tied to the action count.
n_time_slots = n_actions

In [None]:
# Initializing neural nets
weight_constraint = WeightConstraint()

# Policy network: the one that is fitted on every optimisation step. Its
# output layer is clamped once up front.
policy_net = QubitQNetwork(n_time_slots, n_actions, n_features)
policy_net = policy_net.to(device)
policy_net.l2.apply(weight_constraint)

# Target network: supplies the next-state values for the loss and is only
# refreshed from the policy network periodically. It starts as an exact copy
# of the (already constrained) policy weights and stays in eval mode.
target_net = QubitQNetwork(n_time_slots, n_actions, n_features)
target_net = target_net.to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# Only the policy network's parameters are optimised.
optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayBuffer(capacity=10000)

In [None]:
def select_action(state):
    """Pick an action for ``state`` with an epsilon-greedy policy.

    Every call increments the global ``steps_done`` counter, which lowers the
    probability of a random action from about EPS_START towards EPS_END.

    Args:
        state: tensor holding one state row; it is cast to float before being
            fed to ``policy_net``.

    Returns:
        A ``(1, 1)`` int64 tensor holding the chosen action index. Both
        branches return int64 so the stored actions can be concatenated and
        used directly as a ``gather`` index.
    """
    global steps_done  # advancing this makes random actions progressively rarer
    sample = random.random()
    steps_done += 1
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    if sample > eps_threshold:
        # Exploit: index of the largest predicted value, no gradient tracking.
        with torch.no_grad():
            return policy_net(state.float()).max(1)[1].view(1, 1)
    # Explore: uniform random action. torch.long matches the dtype produced by
    # the greedy branch (int32 indices are rejected by Tensor.gather).
    return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)

In [None]:
def optimize_model():
    """Run one optimisation step of ``policy_net`` on a replayed minibatch.

    Returns immediately while the replay memory holds fewer than BATCH_SIZE
    transitions. Otherwise it samples BATCH_SIZE transitions and minimises the
    Huber loss between the policy network's value for the taken action and
    ``reward + GAMMA * max(target_net(next_state))``. The bootstrap term stays
    at zero for terminal transitions, which are stored with ``next_state=None``.
    Gradients are clamped element-wise to [-1, 1] and the optimizer is stepped
    exactly once.
    """
    if len(memory) < BATCH_SIZE:
        return
    # Re-apply the output-layer weight constraint before the forward pass.
    policy_net._modules['l2'].apply(weight_constraint)

    transitions = memory.sample(BATCH_SIZE)
    # Transpose: list of Transitions -> one Transition holding a tuple per field.
    batch = Transition(*zip(*transitions))

    non_final_mask = torch.tensor(
        [s is not None for s in batch.next_state], device=device, dtype=torch.bool
    )
    state_batch = torch.cat(batch.state)
    # gather() requires an int64 index; cast per item so stored actions of
    # differing integer dtypes can still be concatenated.
    action_batch = torch.cat([a.long() for a in batch.action])

    state_action_values = policy_net(state_batch.float()).gather(1, action_batch)
    # Align the reward dtype with the network output so that rewards created
    # from float64 values do not produce a mixed-dtype loss.
    reward_batch = torch.cat(batch.reward).to(state_action_values.dtype)

    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # torch.cat rejects an empty list, so skip the target pass entirely when
    # every sampled transition is terminal.
    if non_final_mask.any():
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        with torch.no_grad():
            next_state_values[non_final_mask] = target_net(non_final_next_states.float()).max(1)[0]
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model: clamp every gradient first, then take ONE step.
    # (Stepping inside the per-parameter loop would update the weights once
    # per parameter tensor, using gradients that are only partly clamped.)
    optimizer.zero_grad()
    loss.backward()
    nn.utils.clip_grad_value_(policy_net.parameters(), 1)
    optimizer.step()

In [None]:
# Training Loop
num_episodes = 50
for ith_episode in range(num_episodes):
    # Start each episode from the environment's reset state, shaped as a single
    # row matching the network's input width.
    state = torch.from_numpy(env.reset()).reshape(1, n_features)
    while True:
        action = select_action(state)
        # Plain Python index of the chosen action (calling .item(), not
        # passing the bound method itself).
        action_index = action.item()
        # Mean of the output-layer weights belonging to the chosen action.
        # .cpu() is required before .numpy() when the network lives on the GPU.
        coefficient = torch.mean(policy_net.l2.weight[action_index]).detach().cpu().numpy()
        new_state, reward, done, fid = env.step(action_index, coefficient)
        reward = torch.tensor([reward], device=device)
        if not done:
            new_state = torch.from_numpy(new_state).reshape(1, n_features)
        else:
            # Terminal transitions are stored with next_state=None so that
            # optimize_model() gives them no bootstrap value.
            new_state = None
        memory.push(state, new_state, action, reward)
        state = new_state
        optimize_model()
        if done:
            print(coefficient)
            break
    # NOTE(review): resetting the counter restarts the epsilon schedule at
    # EPS_START every episode, so exploration never decays across episodes.
    # Confirm this is intended; otherwise remove the reset.
    steps_done = 0
    # Periodically copy the policy weights into the target network.
    if ith_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())
    print("DONE")
print("Complete")