In [12]:
pip install gym[box2d]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
import numpy as np
import random
import gym
import matplotlib.pyplot as plt
env = gym.make("LunarLander-v2")

  deprecation(
  deprecation(


In [15]:
class Actor(nn.Module):
  def __init__(self,state_size,action_size,limits):
    super(Actor,self).__init__()
    self.device      = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.state_size  = state_size
    self.action_size = action_size
    self.limits      = limits
    self.lin1 = nn.Linear(self.state_size , 32)
    self.lin2 = nn.Linear(32,64)
    self.lin3 = nn.Linear(64,128)
    self.lin6 = nn.Linear(128,32)
    self.lin7 = nn.Linear(32,self.action_size)
    self.to(self.device)
  def action_limiting(self,action):
    action = torch.round(action).int()
    if action <= self.limits[1] and action >= self.limits[0]:
      action = action
    else:
      action = []
      for i  in range(self.action_size):
        action.append(1)
      action = torch.tensor(action,dtype = torch.int)
    return action
  def forward(self,x):
    x  = f.relu(self.lin1(x))
    x  = f.relu(self.lin2(x))
    x  = f.relu(self.lin3(x))
    x  = f.relu(self.lin6(x))
    x  = f.relu(self.lin7(x))
    x  = self.action_limiting(x)
    return x

In [None]:
class Critic(nn.Module):
  def __init__(self,state_size,action_size):
    super(Critic,self).__init__()
    self.device      = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.state_size  = state_size
    self.action_size = action_size
    self.lin1 = nn.Linear(self.state_size , 32)
    self.lin2 = nn.Linear(32,64)
    self.lin3 = nn.Linear(64,128)
    self.lin6 = nn.Linear(128,32)
    self.lin7 = nn.Linear(32,1)
  def forward(self,x):
    x  = f.relu(self.lin1(x))
    x  = f.relu(self.lin2(x))
    x  = f.relu(self.lin3(x))
    x  = f.relu(self.lin6(x))
    x  = f.relu(self.lin7(x))
    return x

In [18]:
class Agent:
  def __init__(self,state_size,action_size,limits,env):
    self.device      = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.state_size  = state_size
    self.action_size = action_size
    self.limits      = limits
    self.lr1         = 0.00009
    self.lr2         = 0.00005
    self.gamma       = 0.99
    self.lamda       = 0.95
    self.clip        = 0.2
    self.steps       = 350
    self.episodes    = 10000000
    self.count       = 0
    self.c1          = 0.5
    self.c2          = 0.001
    self.env         = env
    self.actor       = Actor(self.state_size,self.action_size,self.limits).to(self.device)
    self.critic      = Critic(self.state_size,self.action_size).to(self.device)
    self.actor_optim = optim.Adam(self.actor.parameters() ,lr = self.lr1)
    self.critic_optim= optim.Adam(self.critic.parameters(),lr = self.lr2)
    self.states      = []
    self.actions     = []
    self.loss        = []
    self.episode     = []
    self.rewards     = []
  def choose_action(self,state):
    state = torch.tensor(state,dtype = torch.float32).to(self.device)
    action = self.actor(state)
    return action
  def gae_return(self,reward,value,next_value,done):
    returns = []
    gae     = 0
    delta   = reward + self.gamma * next_value * (1-done) -value
    for i in range(self.action_size):
      gae   = delta + self.gamma*self.lamda * delta 
      gae   += value
      returns.append(gae)
    returns = torch.tensor(returns,dtype = torch.float).to(self.device)
    return returns
  def learn(self,state,next_state,reward,done,action,count):
    self.actor.train()
    self.critic.train()
    self.states.append(state)
    self.actions.append(action)
    self.rewards.append(reward)
    prev_action  = self.actions[count]
    log_prob     = f.softmax(action.float())
    log_old_prob = f.softmax(prev_action.float())
    ratio        = log_prob / log_old_prob
    state        = torch.tensor(state,dtype = torch.float).to(self.device)
    next_state   = torch.tensor(next_state,dtype = torch.float).to(self.device)
    value        = self.critic(state).to(self.device)
    next_value   = self.critic(next_state).to(self.device)
    returns      = self.gae_return(reward,value,next_value,done) 
    advantage    = returns -value
    s1           = ratio * advantage
    s2           = torch.clamp(ratio,1-self.clip , 1+self.clip)
    actor_loss   = torch.min(s1,s2)
    actor_loss   = actor_loss.mean()
    critic_loss  = (returns - value)**2
    entropy      = 1
    loss         = actor_loss - self.c1*critic_loss# + self.c2 * entropy
    loss         = loss.mean()
    loss.rquires_grad = True
    self.loss.append(loss.cpu().detach().numpy())
    torch.save(self.actor.state_dict() , "actor.pth")
    torch.save(self.critic.state_dict() , "critic.pth")
    self.actor_optim.zero_grad()
    self.critic_optim.zero_grad()
    loss.backward()
    self.actor_optim.step()
    self.critic_optim.step()
    return loss,value
  def ploting(self):
    plt.plot(self.episode,self.loss)
    plt.xlabel("episodes")
    plt.ylabel("loss")
    plt.savefig("loss.png")
    plt.close()
    plt.plot(self.episode,self.rewards)
    plt.xlabel("episodes")
    plt.ylabel("rewards")
    plt.savefig("reward.png")
    plt.close()
  def train(self):
    count = 0
    for i in range(self.episodes):
      state = self.env.reset()
      if count==0:
        self.actions.append(torch.tensor(1))
      else:
        pass
      for step in range(self.steps):
        action = self.choose_action(state)
        next_state,reward,done,info = self.env.step(action.item())
        if done == False:
          state = next_state
          self.episode.append(count)
          loss,value = self.learn(state,next_state,reward,done,action,count-1)
          self.ploting()
          state = next_state
          count +=1
          print("episode:{}/10000000 || step: {}/350 || reward : {} || loss:{} || action: {}  || value: {}".format(i,step,reward,loss.item(),action.item(),value.item()))

In [19]:
state_size  = 8
action_size = 1
limits      = [0,3]
agent       = Agent(state_size,action_size,limits,env)

In [None]:
if __name__ =="__main__":
  agent.train()

  log_prob     = f.softmax(action.float())
  log_old_prob = f.softmax(prev_action.float())


episode:0/10000000 || step: 0/350 || reward : -0.3138887704526212 || loss:-0.7953526377677917 || action: 0  || value: 0.023995932191610336
episode:0/10000000 || step: 1/350 || reward : -0.3523952709215621 || loss:-0.918448805809021 || action: 0  || value: 0.02505512535572052
episode:0/10000000 || step: 2/350 || reward : -0.35842818580550784 || loss:-0.9382716417312622 || action: 0  || value: 0.026162132620811462
episode:0/10000000 || step: 3/350 || reward : -0.3707028035420308 || loss:-0.9789901971817017 || action: 0  || value: 0.027269916608929634
episode:0/10000000 || step: 4/350 || reward : -0.3861794992844523 || loss:-1.031130313873291 || action: 0  || value: 0.028357334434986115
episode:0/10000000 || step: 5/350 || reward : -0.4048573491324703 || loss:-1.095248818397522 || action: 0  || value: 0.0294111967086792
episode:0/10000000 || step: 6/350 || reward : -0.4267333862970304 || loss:-1.1720114946365356 || action: 0  || value: 0.030449092388153076
episode:0/10000000 || step: 7/35

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
