In [1]:
import gym
import math
import random
import numpy as np
import matplotlib
from collections import namedtuple
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T

from acrobot.acrobot import AcroEnvNew

# Seed every random number generator this notebook draws from so that
# Python, NumPy and PyTorch random draws are repeatable after a kernel restart.
# NOTE(review): the environment's own RNG (if it has one) is not seeded here;
# seed it through the env API if AcroEnvNew exposes one.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# 'MyAcrobot-v2' is presumably registered by the `acrobot.acrobot` import above
# -- TODO confirm. `.unwrapped` strips the gym wrappers from the created env.
env = gym.make('MyAcrobot-v2').unwrapped
device = torch.device("cpu") ## Using CPU

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [2]:
### Architecture of the deep Q-network (DQN)
### This sample architecture is adapted from the PyTorch DQN tutorial:
### https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

class DQN(nn.Module):
    """Convolutional Q-network mapping an image batch to one Q-value per action.

    Three stride-2, kernel-5 convolutions (each followed by batch norm and
    ReLU) feed a single linear head.

    Args:
        h: Height in pixels of the input images. Defaults to 40.
        w: Width in pixels of the input images. Defaults to 80.
        outputs: Number of discrete actions (size of the output layer).
            Defaults to 3.

    With the defaults the flattened feature size is 32 * 2 * 7 = 448, i.e.
    ``DQN()`` builds exactly the same layers as the previous hard-coded
    ``nn.Linear(448, 3)`` head.
    """

    def __init__(self, h=40, w=80, outputs=3):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        def conv_out(size, kernel_size=5, stride=2):
            # Output length of an un-padded convolution along one dimension.
            return (size - (kernel_size - 1) - 1) // stride + 1

        # The head's input size depends on the image size, so derive it from
        # the three convolutions instead of hard-coding it.
        conv_h = conv_out(conv_out(conv_out(h)))
        conv_w = conv_out(conv_out(conv_out(w)))
        self.head = nn.Linear(conv_h * conv_w * 32, outputs)

    def forward(self, x):
        """Return Q-values of shape (batch, outputs) for a (batch, 3, h, w) input."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        # Flatten everything except the batch dimension before the linear head.
        return self.head(x.view(x.size(0), -1))
    
# One step of experience: the state acted in, the action taken, the state
# reached and the reward received.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    """Fixed-capacity ring buffer of ``Transition`` records.

    Once ``capacity`` transitions have been stored, each new one overwrites
    the oldest entry.

    Args:
        capacity: Maximum number of transitions kept; must be positive.

    Raises:
        ValueError: If ``capacity`` is not positive (such a buffer could never
            accept a ``push``).
    """

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError(
                "ReplayMemory capacity must be positive, got {!r}".format(capacity))
        self.capacity = capacity
        self.memory = []
        # Index of the slot the next push() writes to.
        self.position = 0

    def push(self, *args):
        """Saves a transition.

        ``args`` are the ``Transition`` fields in order:
        state, action, next_state, reward.
        """
        if len(self.memory) < self.capacity:
            # Still filling up: grow the list by one slot.
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        # Wrap around so the oldest entry is overwritten next.
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

In [3]:
# Screen-capture settings used to build the image-based states
# (differences between consecutive rendered frames).
screen_width = 600
# Resize so the smaller image edge is 40 px, using bicubic interpolation.
# Image.BICUBIC is the canonical name for this filter; the old Image.CUBIC
# alias is deprecated and no longer exists in recent Pillow releases.
resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.BICUBIC),
                    T.ToTensor()])

def get_cart_location():
    """Convert the first component of ``env.state`` to a horizontal pixel index.

    The value is scaled from a world span of 2.4 * 2 units onto
    ``screen_width`` pixels and offset so that 0 maps to the screen centre.

    NOTE(review): this helper comes from a cart-pole example; for an Acrobot
    environment ``env.state[0]`` is presumably a joint angle rather than a
    cart position -- confirm that this is the intended crop centre.

    Returns:
        int: Pixel column used as the centre of the cropped view.
    """
    # World span of 2.4 * 2 units (originally env.x_threshold * 2).
    pixels_per_unit = screen_width / (2.4 * 2)
    return int(env.state[0] * pixels_per_unit + screen_width / 2.0)


def get_image():
    """Render the environment and return a cropped, resized frame tensor.

    The RGB frame is put in channel-first order, cut down to rows 238..397
    and to a 320-pixel-wide window around ``get_cart_location()`` (capped at
    column 340), scaled to [0, 1] floats and passed through ``resize``.

    Returns:
        torch.Tensor: The processed frame with a leading batch dimension,
        moved to ``device``.
    """
    # Rendered frame is height x width x channel; torch wants channel first.
    frame = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Keep only the horizontal band of rows that is of interest.
    frame = frame[:, 238:398]

    view_width = 320
    half_view = view_width // 2
    # Never centre the window further right than column 340.
    centre = min(get_cart_location(), 340)

    if centre < half_view:
        # Too close to the left edge: take the leftmost window.
        columns = slice(view_width)
    elif centre > screen_width - half_view:
        # Too close to the right edge: take the rightmost window.
        columns = slice(-view_width, None)
    else:
        columns = slice(centre - half_view, centre + half_view)
    frame = frame[:, :, columns]

    # Scale to float32 in [0, 1] and wrap as a tensor.
    frame = torch.from_numpy(np.ascontiguousarray(frame, dtype=np.float32) / 255)
    # Resize, then add the batch dimension (BCHW).
    return resize(frame).unsqueeze(0).to(device)


In [4]:
### Initialize the policy and target networks.
### policy_net is the network being trained; target_net starts as an exact copy
### of its weights and is switched to evaluation mode.
policy_net = DQN().to(device)
target_net = DQN().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

### Optimizer: RMSprop over the policy network's parameters, library defaults
optimizer = torch.optim.RMSprop(policy_net.parameters())

## Replay buffer: keeps at most the 10000 most recent transitions to sample from
memory = ReplayMemory(10000)

### Exploration rate for epsilon-greedy action selection
### (probability of taking a uniformly random action)
EPS = 0.1
# steps_done = 0  (step counter for an epsilon-decay schedule; unused while EPS is fixed)
def get_action(state):
    """Choose an action for ``state`` with a fixed-epsilon greedy policy.

    With probability ``EPS`` a uniformly random action index in {0, 1, 2} is
    returned; otherwise the action with the highest Q-value predicted by
    ``policy_net``. Epsilon is constant (no decay schedule).

    Args:
        state: Batched state tensor accepted by ``policy_net``.

    Returns:
        torch.Tensor: A 1x1 tensor holding the chosen action index.
    """
    # Explore: one draw decides, a second draw picks the random action.
    if random.random() <= EPS:
        return torch.tensor([[random.randrange(3)]], device=device, dtype=torch.long)

    # Exploit: index of the largest Q-value, without tracking gradients.
    with torch.no_grad():
        return policy_net(state).max(1)[1].view(1, 1)

In [5]:
### This training step is adapted from the PyTorch reinforcement-learning (DQN) tutorial for cartpole
### Reference: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
### Comments are added by me and the code has been thoroughly analyzed
BATCH_SIZE = 128   # transitions sampled from replay memory per optimization step
GAMMA = 0.999      # discount factor applied to next-state values
def train_model():
    """Run one optimization step of ``policy_net`` on a replay-memory batch.

    Does nothing until ``memory`` holds at least ``BATCH_SIZE`` transitions.
    Otherwise samples a batch, builds the targets
    ``reward + GAMMA * max_a Q_target(next_state, a)`` (with value 0 for
    terminal transitions, whose ``next_state`` is ``None``), minimises the
    Huber loss between them and ``Q_policy(state, action)``, and applies one
    optimizer step with every gradient element clamped to [-1, 1].

    Returns:
        None
    """
    ### Skip the optimization until the memory holds enough samples
    if len(memory) < BATCH_SIZE:
        return
    ### Randomly choose BATCH_SIZE samples
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for
    # detailed explanation): a list of Transitions becomes one Transition of tuples.
    batch = Transition(*zip(*transitions))

    # Mask of non-terminal transitions. Indexing with a uint8 mask is
    # deprecated in current PyTorch, so use a boolean mask whenever this
    # PyTorch build provides torch.bool.
    mask_dtype = getattr(torch, 'bool', torch.uint8)
    non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                  device=device, dtype=mask_dtype)
    non_final_next_states = [s for s in batch.next_state if s is not None]
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s, a) from the policy network for the actions that were actually taken
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Value of each next state according to the target network; terminal
    # transitions keep the value 0. If every sampled transition is terminal
    # there is nothing to evaluate (and torch.cat would fail on an empty list).
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    if non_final_next_states:
        next_state_values[non_final_mask] = target_net(
            torch.cat(non_final_next_states)).max(1)[0].detach()

    # Expected Q values: discounted next-state value plus the reward
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    ### Evaluate objective function based on huber loss->
    ### acts as mean squared loss, i.e., (\delta)^2/2 if |\delta|<1
    ### acts as mean absolute loss, i.e., |\delta|-0.5 elsewhere
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    ### Optimize the model (RMSprop optimizer)
    optimizer.zero_grad()
    loss.backward()

    ### Clamp each gradient element to [-1, 1] before the update; parameters
    ### that received no gradient are skipped.
    for param in policy_net.parameters():
        if param.grad is not None:
            param.grad.data.clamp_(-1, 1)
    optimizer.step()

In [6]:
episode_durations = []   # number of steps each finished episode lasted
total_eps = 8            # number of training episodes
TARGET_UPDATE = 5        # copy policy_net into target_net every this many episodes

for i_ep in range(total_eps):
    ### Reset the environment before the start of each episode
    print('episode_no: ', i_ep)
    env.reset()

    ### The state is the difference between the current and the previous rendered image,
    ### not the observation returned by env.step (theta_1, theta_2, dtheta_1, dtheta_2)
    prev_im = get_image()
    cur_im = get_image()
    state = cur_im - prev_im

    tt = 0
    while True:
        tt = tt + 1
        ### Get action based on e-greedy algorithm
        action = get_action(state)
        ### Perform environment step to get the reward and done condition
        _, reward, done, _ = env.step(action.item())
        # Store the reward as float32 whatever numeric type the env returns,
        # so it combines with the float32 next-state values in train_model().
        reward = torch.tensor([reward], device=device, dtype=torch.float32)

        if not done:
            # The current image becomes the previous one and a fresh frame is
            # captured, so next_state is the frame-to-frame difference.
            # (Without refreshing cur_im every difference would be taken
            # against the first frame of the episode.)
            prev_im = cur_im
            cur_im = get_image()
            next_state = cur_im - prev_im
        else:
            print('episode_done: ', done)
            episode_durations.append(tt)
            # Terminal transitions are marked by a missing next state.
            next_state = None

        ### Store the transition for the current step in memory
        memory.push(state, action, next_state, reward)

        # Next state now becomes current state
        state = next_state

        ### Train the neural network in this step by performing one optimization step
        train_model()

        ### Finally if done, then break out of the loop after the training step.
        ### This marks the end of the episode after which the state of the acrobot is again reset
        if done:
            break

    # Update the target network
    if i_ep % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()
env.close()

episode_no:  0
episode_done:  True
episode_no:  1
episode_done:  True
episode_no:  2
episode_done:  True
episode_no:  3
episode_done:  True
episode_no:  4
episode_done:  True
episode_no:  5
episode_done:  True
episode_no:  6
episode_done:  True
episode_no:  7
episode_done:  True
Complete
