# Loading the modules And the Environment

In [2]:
import torch
import numpy as np
import gym
from collections import deque
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import time
import pygame

In [3]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device_name = torch.cuda.get_device_name(torch.cuda.current_device())
device_name

'NVIDIA GeForce RTX 3050 Laptop GPU'

In [4]:
env_name = "CartPole-v1"
env = gym.make("CartPole-v1", render_mode='human')
eval_env = gym.make("CartPole-v1")

In [5]:
s_size = env.observation_space.shape[0]
a_size = env.action_space.n

In [6]:
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample()) # Get a random observation

_____OBSERVATION SPACE_____ 

The State Space is:  4
Sample observation [ 4.6879525e+00  1.6285270e+38  3.7221727e-01 -1.3018692e+38]


In [7]:
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample()) # Take a random action


 _____ACTION SPACE_____ 

The Action Space is:  2
Action Space Sample 1


# Building the Network

In [30]:
class policy(nn.Module):
    def __init__(self, s_size, a_size, h_size) -> None:
        super().__init__() # calling the super class
        
        #Network layers 
        self.L1 = nn.Linear(s_size, h_size)
        self.L2 = nn.Linear(h_size, a_size)
    
    def forward(self, x):
        # the forward pass of the network
        x = F.relu(self.L1(x))
        x = self.L2(x)
        x = F.softmax(x,dim=1)
        return x
    
    
    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device) # setting the state to tensor with the input shape
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

In [9]:
state = env.reset()
print(state)
env.close()

(array([-0.03125834,  0.03659783,  0.0251511 , -0.00340792], dtype=float32), {})


In [10]:
state = np.asanyarray(state[0])
state

array([-0.03125834,  0.03659783,  0.0251511 , -0.00340792], dtype=float32)

In [11]:
debug_policy = policy(s_size, 16, a_size).to(device)

In [12]:
temp_action, temp_prob = debug_policy.random_act(state)
temp_action

9

# Building the Algorithm

In [13]:
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    scores_deque = deque(maxlen=100)
    scores = []
    
    for i_episode in range(1, n_training_episodes+1):
        saved_log_probs = []
        rewards = []
        state = env.reset()
        state = np.asanyarray(state[0])
        
        

        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            
            state, reward, done, _, _ = env.step(action)
            
            
            rewards.append(reward)
            
            if done:
                break
            
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))
        
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        
        
        for t in range(n_steps-1, 0, -1):
            current_t = (returns[0] if len(returns) > 0 else 0)
            returns.appendleft(current_t * gamma + rewards[t])
        
        eps = np.finfo(np.float32).eps.item() # getting the smallest positive float
        
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)
        
        policy_loss = []
        
        for action_log_prob, disconted_reward in zip(saved_log_probs, returns):
            policy_loss.append(- action_log_prob * disconted_reward)
        
        policy_loss = torch.cat(policy_loss).sum()
        
        
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()
        
        
        if i_episode % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))
        
    return scores

# Training

In [14]:
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 500,
    "n_evaluation_episodes": 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_name,
    "state_space": s_size,
    "action_space": a_size,
}

In [15]:
env = gym.make(cartpole_hyperparameters["env_id"])

In [16]:
cartpole_policy = policy(cartpole_hyperparameters["state_space"], cartpole_hyperparameters["action_space"], cartpole_hyperparameters["h_size"]).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])

In [17]:
scores = reinforce( cartpole_policy,
                    cartpole_optimizer,
                    cartpole_hyperparameters["n_training_episodes"], 
                    cartpole_hyperparameters["max_t"],
                    cartpole_hyperparameters["gamma"], 
                    100)

  if not isinstance(terminated, (bool, np.bool8)):


Episode 100	Average Score: 66.07
Episode 200	Average Score: 717.14
Episode 300	Average Score: 984.37
Episode 400	Average Score: 879.98
Episode 500	Average Score: 793.53
Episode 600	Average Score: 962.70
Episode 700	Average Score: 1000.00
Episode 800	Average Score: 985.97
Episode 900	Average Score: 922.32
Episode 1000	Average Score: 1000.00


#### It's obvious that i used high learning rate so i decreased it from 0.001 to 0.0001, and i will decrease the training episodes from 1000 to 500

In [18]:
cartpole_hyperparameters['lr'] = 1e-3
cartpole_hyperparameters['n_training_episodes'] = 500

In [None]:
scores = reinforce( cartpole_policy,
                    cartpole_optimizer,
                    cartpole_hyperparameters["n_training_episodes"], 
                    cartpole_hyperparameters["max_t"],
                    cartpole_hyperparameters["gamma"], 
                    100)

# Evaluating

In [19]:
eval_env = gym.make(env_name)

In [20]:
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
  """
  Evaluate the agent for ``n_eval_episodes`` episodes and returns average reward and std of reward.
  :param env: The evaluation environment
  :param n_eval_episodes: Number of episode to evaluate the agent
  :param policy: The Reinforce agent
  """
  episode_rewards = []
  for episode in range(n_eval_episodes):
    state = env.reset()
    state = np.asanyarray(state[0])
    step = 0
    done = False
    total_rewards_ep = 0
    
    for step in range(max_steps):
      action, temp = policy.act(state)
      new_state, reward, done, info, _ = env.step(action)
      total_rewards_ep += reward
        
      if done:
        break
      state = new_state
    episode_rewards.append(total_rewards_ep)
  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)

  return mean_reward, std_reward

In [21]:
evaluate_agent(eval_env, 
               cartpole_hyperparameters["max_t"], 
               cartpole_hyperparameters["n_evaluation_episodes"],
               cartpole_policy)

  if not isinstance(terminated, (bool, np.bool8)):


(1000.0, 0.0)

In [29]:
eval_env = gym.make("CartPole-v1", render_mode = 'human')
state = eval_env.reset()
eval_env.render()


cartpole_policy.eval()
while True:
    action, _ = cartpole_policy.act(state[0])
    state, _, done, _ , _ = eval_env.step(action)
    if done:
        break

AttributeError: 'policy' object has no attribute 'act'

In [9]:
pygame.quit()

In [272]:
torch.save(cartpole_policy.state_dict(), 'CartPole-v1.pth')