In [1]:
#Google colab file 
#Created by ashutoshtiwari13
import warnings
warnings.filterwarnings('ignore')

import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
import threading 

import numpy as np 
from collections import namedtuple, deque
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab
from itertools import cycle, count 

In [2]:
import matplotlib
import gym
import os
import io
import sys
import time 

%matplotlib inline

In [3]:
class CartPoleEnvV1(gym.Wrapper):
  """CartPole wrapper that replaces the reward with a sparse failure penalty.

  A step is rewarded -1 when the resulting state has the cart outside
  ``x_threshold`` or the pole beyond ``theta_threshold_radians`` (both read
  from the unwrapped environment); every other step is rewarded 0.
  """

  def __init__(self, env):
    gym.Wrapper.__init__(self, env)

  def reset(self, **kwargs):
    """Reset the wrapped environment and return whatever it returns."""
    return self.env.reset(**kwargs)

  def step(self, action):
    """Step the wrapped environment and substitute the shaped reward.

    Expects the wrapped ``step`` to return the 4-tuple
    ``(next_state, reward, done, info)`` and ``next_state`` to unpack into
    ``(x, x_dot, theta, theta_dot)``.

    Returns:
      ``(next_state, reward, done, info)`` where ``reward`` is -1 if the
      failure thresholds are exceeded and 0 otherwise.
    """
    next_state, reward, done, info = self.env.step(action)
    x, _x_dot, theta, _theta_dot = next_state

    limits = self.env.unwrapped
    # abs(v) > t is the same test as (v < -t or v > t).
    pole_fell = (abs(x) > limits.x_threshold
                 or abs(theta) > limits.theta_threshold_radians)

    # The environment's own reward is discarded on purpose.
    reward = -1 if pole_fell else 0
    return next_state, reward, done, info

In [4]:
class CartPoleEnvV2(gym.Wrapper):
  """CartPole wrapper that only reshapes the reward of the final step.

  Non-terminal steps keep the wrapped environment's reward. On the step where
  ``done`` is true the reward becomes 0 if the cart/pole thresholds were
  exceeded, otherwise ``self.env._max_episode_steps``.
  """

  def __init__(self, env):
    gym.Wrapper.__init__(self, env)

  def reset(self, **kwargs):
    """Reset the wrapped environment and return whatever it returns."""
    return self.env.reset(**kwargs)

  def step(self, action):
    """Step the wrapped environment and adjust the terminal reward.

    Expects the wrapped ``step`` to return the 4-tuple
    ``(next_state, reward, done, info)`` and ``next_state`` to unpack into
    ``(x, x_dot, theta, theta_dot)``.

    Returns:
      ``(next_state, reward, done, info)`` with ``reward`` rewritten only when
      ``done`` is true.
    """
    next_state, reward, done, info = self.env.step(action)

    if done:
      x, _x_dot, theta, _theta_dot = next_state
      limits = self.env.unwrapped
      # abs(v) > t is the same test as (v < -t or v > t).
      pole_fell = (abs(x) > limits.x_threshold
                   or abs(theta) > limits.theta_threshold_radians)
      if pole_fell:
        reward = 0
      else:
        # Episode ended without crossing a threshold.
        # NOTE(review): _max_episode_steps is presumably supplied by a
        # time-limit wrapper around the env -- confirm it is always present.
        reward = self.env._max_episode_steps
    return next_state, reward, done, info

In [5]:
class NN_arch(nn.Module):
  """Fully connected policy network mapping a state to action logits.

  Layout: ``input -> hidden_layers... -> output``. ``activation`` is applied
  after every layer except the output layer. The logits are used as the
  parameters of a categorical distribution over discrete actions.
  """

  def __init__(self, input_dim, output_dim, hidden_dim=(32, 32), activation=F.relu):
    """Create the linear layers.

    Args:
      input_dim: number of input features.
      output_dim: number of logits produced (one per action).
      hidden_dim: sequence of hidden layer widths; needs at least one entry.
      activation: callable applied to the output of each non-final layer.
    """
    super(NN_arch, self).__init__()
    self.activation = activation
    self.input = nn.Linear(input_dim, hidden_dim[0])
    # One Linear per consecutive pair of hidden widths.
    self.hidden_layers = nn.ModuleList(
        [nn.Linear(n_in, n_out)
         for n_in, n_out in zip(hidden_dim[:-1], hidden_dim[1:])]
    )
    self.output = nn.Linear(hidden_dim[-1], output_dim)

  def stateChange(self, state):
    """Return the action logits for ``state``.

    A ``state`` that is not already a ``torch.Tensor`` is converted to a
    float32 tensor and given a leading dimension of size 1. Tensors are used
    exactly as passed.
    """
    x = state
    if not isinstance(x, torch.Tensor):
      x = torch.tensor(x, dtype=torch.float32)
      x = x.unsqueeze(0)

    x = self.activation(self.input(x))
    for hidden_layer in self.hidden_layers:
      x = self.activation(hidden_layer(x))
    return self.output(x)

  def forward(self, state):
    """``nn.Module`` entry point so ``model(state)`` works; delegates to ``stateChange``."""
    return self.stateChange(state)

  def forwardPassAction(self, state):
    """Sample an action and return the quantities needed for training.

    Calls ``.item()`` on the sampled action, so it expects a single state
    rather than a batch.

    Returns:
      A tuple ``(action, is_exploratory, logpa, entropy)``:
        action: sampled action as a Python number, usable as an env action.
        is_exploratory: True when the sampled action is not the argmax of
          the logits.
        logpa: log-probability of the sampled action, with a trailing
          dimension added.
        entropy: entropy of the action distribution, with a trailing
          dimension added.
    """
    # stateChange returns logits, i.e. preferences over actions.
    logits = self.stateChange(state)
    # Sample the action from the resulting probability distribution.
    dist = torch.distributions.Categorical(logits=logits)
    action = dist.sample()
    # Log-probability of that action, shaped for training.
    logpa = dist.log_prob(action).unsqueeze(-1)
    # Entropy of the policy at this state.
    entropy = dist.entropy().unsqueeze(-1)
    # The sample is exploratory when it differs from the greedy choice.
    is_exploratory = action != np.argmax(logits.detach().numpy())
    return action.item(), is_exploratory.item(), logpa, entropy

  def actionSample(self, state):
    """Sample an action for ``state`` from the policy and return it as a Python number."""
    logits = self.stateChange(state)
    dist = torch.distributions.Categorical(logits=logits)
    action = dist.sample()
    return action.item()

  def greedyAction(self, state):
    """Return the index of the largest logit for ``state`` (the greedy action)."""
    logits = self.stateChange(state)
    return np.argmax(logits.detach().numpy())

In [None]:
class REINFORCE():
  def __init__(self, policy_model_fn , policy_optimizer_fn, policy_optimizer_lr):
    self.policy_model_fn =policy_model_fn
    self.policy_optimizer_fn =policy_optimizer_fn
    self.policy_optimizer_fn = policy_optimizer_fn

  def optimize_model(self):
    T = len(self.rewards)
    #logspace helps in setting up gamma values ans returns a series of timestep[ 1, 0.99, 0.9801]
    discounts = np.logspace(0,T ,num=T, base =self.gamma, endpoint =False)
    #sum of discounted returns fro all timesteps
    returns = np.array([np.sum(discounts[:T-t] * self.rewards[t:]) for t in range(T)])

    """
    Policy loss :
    Log probability of the actions selected weighted by the returns obtained after that action
    was selected. We need to minimize the loss, so using negative mean. Also, to account fro
    discounted policy gradients, we multiply the returns by the discounts
    """

    discounts = torch.FloatTensor(discounts).unsqueeze(1)
    returns = torch.FloatTensor(returns).unsqueeze(1)
    self.logpas = torch.cat(self.logpas)

    policy_loss = -(discounts * returns * self.logpas).mean()

    #Step1 : zero the gradient in the optimizer
    #Step2 : back propagation
    #Step3 : step in the direction of the gradient
    self.policy_optimizer.zero_grad()
    policy_loss.backward()
    self.policy_optimizer.step()

  def interaction_step(self,state, env):
      



