In [2]:
!pip install torch
!pip install torchinfo
!pip install tqdm
!pip install matplotlib
!pip install swig
!pip install gym



In [3]:
%%capture
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip install pyvirtualdisplay
!pip install pyglet==1.5.1

In [4]:
import torch
import numpy as np
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from torchinfo import summary

In [5]:
import abc
from abc import ABC, abstractmethod
import typing
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from itertools import count
from collections import deque

In [6]:
import gym

In [7]:
# prompt: write me line to check the device for cpu pytorch

if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")
print(device)

cuda:0


  and should_run_async(code)


In [8]:
class NeuralNet(nn.Module):

  def __init__(self, input_dims, num_actions, hidden_units):
    super(NeuralNet, self).__init__()
    self.num_actions = num_actions
    self.input_dims = input_dims
    self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
    self.fc_layer_one = nn.Linear(in_features=self.input_dims, out_features=hidden_units)
    self.fc_layer_two = nn.Linear(in_features=hidden_units, out_features=self.num_actions)
   # self._init_weights()

  # def _init_weights(self):
  #   nn.init.zeros_(self.fc_layer_one.weight)
  #   nn.init.zeros_(self.fc_layer_two.weight)

  def forward(self, input):
    x = F.relu(self.fc_layer_one(input))
    x = self.fc_layer_two(x)
    return F.softmax(x,dim=1)





##Neural Net Test Area

In [9]:
import gym
env = gym.make('CartPole-v1', new_step_api=True)

# Reset the environment to get the initial state
state = env.reset()
num_actions = env.action_space.n
state_dim = env.observation_space.shape[0]
print("Initial state:", state)
print("Number of Actions:", num_actions)
print("State dimension:", state_dim)

learner = NeuralNet(state_dim,num_actions,16)
state_tensor = torch.tensor(state).unsqueeze(0)
print(f"Input Tensor shape: {state_tensor.shape}")
#Learner Output
out = learner(state_tensor)


#print(f"Output Tensor shape: {out.shape}")
print(f"Ouput from the Net: {out}")


# Optionally, you can take an action and observe the next state and reward
# action = env.action_space.sample()  # Sample a random action
# next_state, reward, done, info = env.step(action)

# print("Next state:", next_state)
# print("Reward:", reward)
# print("Done:", done)
# print("Info:", info)


# Close the environment
env.close()


Initial state: [0.03377339 0.03123699 0.03177733 0.01959691]
Number of Actions: 2
State dimension: 4
Input Tensor shape: torch.Size([1, 4])
Ouput from the Net: tensor([[0.4674, 0.5326]], grad_fn=<SoftmaxBackward0>)


In [10]:
class Policy:
  def __init__(self, state_dim: int, num_actions: int, hidden_units: int):
    self._input_dim = state_dim
    self._num_actions = num_actions
    self._hidden_units = hidden_units
    self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
    self._network = NeuralNet(self._input_dim, self._num_actions, self._hidden_units)
    self._network.to(self.device)

  def get_network(self):
    return self._network

  def act(self, state: np.ndarray):
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
    probs = self._network(state_tensor).cpu()

    action_distribution = Categorical(probs)

    action_sample = action_distribution.sample()

    #log_prob = action_distribution.log_prob(action_sample)

    return action_sample.item(), action_distribution.log_prob(action_sample)




##Policy Test Area


In [11]:
import gym
env = gym.make('CartPole-v1', new_step_api=True)

# Reset the environment to get the initial state
state = env.reset()
num_actions = env.action_space.n
state_dim = env.observation_space.shape[0]
print("Initial state:", state)
print("Number of Actions:", num_actions)
print("State dimension:", state_dim)

policy = Policy(state_dim,num_actions,128)
state_tensor = torch.tensor(state).unsqueeze(0)
print(f"Input Tensor shape: {state_tensor.shape}")
#Learner Output
outa, logout = policy.act(state)

#print(f"Output Tensor shape: {out.shape}")
print(f"first :Ouput from the Net - Action : {outa}")
print(f"first :Ouput from the Net - Log Prob : {type(logout)}")



# Optionally, you can take an action and observe the next state and reward
# action = env.action_space.sample()  # Sample a random action
# next_state, reward, done, info = env.step(action)

# print("Next state:", next_state)
# print("Reward:", reward)
# print("Done:", done)
# print("Info:", info)


# Close the environment
env.close()


Initial state: [-0.01476944 -0.00631044  0.03936973 -0.01567288]
Number of Actions: 2
State dimension: 4
Input Tensor shape: torch.Size([1, 4])
first :Ouput from the Net - Action : 1
first :Ouput from the Net - Log Prob : <class 'torch.Tensor'>


In [32]:
class AgentReinforce:
  def __init__(self, env: gym.Env, step_size: float, gamma: float):
    self._env = env
    self._num_actions = env.action_space.n
    self._state_dim = env.observation_space.shape[0]
    self._policy = Policy(self._state_dim, self._num_actions,16)
    self._network = self._policy.get_network()
    self._optimizer = optim.Adam(self._network.parameters(), lr=step_size)
    self._gamma = gamma
    self.device = "cuda:0" if torch.cuda.is_available() else "cpu"

  def _update_agent(self, action_log_probs: list, expected_return: np.ndarray):
    #policy_loss = -torch.sum(action_log_probs * expected_return)
    policy_loss = []
    for action_probs, discounted_return in zip(action_log_probs, expected_return):
      policy_loss.append(action_probs* discounted_return * (-1))
    policy_loss = torch.cat(policy_loss).sum()




# Compute the policy loss using element-wise multiplication and sum along the new dimension

    self._optimizer.zero_grad()
    policy_loss.backward()
    self._optimizer.step()

  def _compute_discounted_reward(self, rewards_array: list, gamma: float):

    num_trajec = len(rewards_array)
    discounted_return = deque(maxlen=num_trajec)
    #set final discounted return to zero; for ease computation
    discounted_return.append(0)
    # traverse backwards through the rewards array to compute..
    # the expected return at the current time step
    for reward in reversed(rewards_array):
      #expected return at the current time step
      current_expectation = (gamma * discounted_return[0]) + reward
      #latest discount return always at the front of the queue and oldest return at the back of the queue
      discounted_return.appendleft(current_expectation)

    return np.array(discounted_return)

  def _generate_trajectories(self):
    states = []
    actions = []
    action_log_probs = []
    rewards =[]
    done = False
    state = self._env.reset()
    while not done:
      action, action_log_prob = self._policy.act(state)
      states.append(state)
      actions.append(action)
      action_log_probs.append(action_log_prob)
      state, reward, done, _, _ = self._env.step(action)
      rewards.append(reward)
      if np.sum(rewards) > 500:
        done = True

    return states, rewards, actions, action_log_probs

  def _process_trajectories(self, states: list, actions: list, rewards: list):
    states_array = np.array(states)
    actions_array = np.array(actions)
    rewards_array = np.array(rewards)
    return states_array, actions_array, rewards_array

  def train(self, training_episodes: int):
    scores_deque = deque(maxlen=100)
    scores = []
    for episode in range(training_episodes):
      states, rewards, actions, action_log = self._generate_trajectories()
      #print(f"Shape of Action Log Prob: {action_log.shape}")
      states_array, actions_array, rewards_array = self._process_trajectories(states,actions,rewards)
      scores_deque.append(np.sum(rewards_array))
      scores.append(np.sum(rewards_array))
      discounted_return = self._compute_discounted_reward(rewards,self._gamma)
      discounted_return = torch.tensor(discounted_return, requires_grad=True)
      #print(f"Shape of Discounted return: {discounted_return.shape}")
      #computed discounted rewards are normalized to stabilze training : from huggingface tutorials
      eps = np.finfo(np.float32).eps.item()
      discounted_return = (discounted_return - discounted_return.mean()) / (discounted_return.std() + eps)
      # action_log_tensor = torch.tensor(action_log)
      self._update_agent(action_log,discounted_return)


      if episode != 0 and episode % 100 == 0:
        print(f"Episode {episode}\tAverage Score: {np.mean(scores_deque)}")

  def evaluate(self, test_episodes: int):
    scores = []
    for episode in range(test_episodes):
      state = self._env.reset()
      episode_rewards = []
      done = False
      time_step = 0
      while not done:
        time_step += 1
        action, _ = self._policy.act(state)
        next_state, reward, termi, trunc, _ = self._env.step(action)
        episode_rewards.append(reward)
        done = termi or trunc
        if np.sum(episode_rewards) > 510:
          print(f"Test No: {episode} Environment Solved in {time_step} \t Score : {np.sum(episode_rewards)}")
          done = True

        state = next_state
      print(f"Logger Data: Episode {episode}\t Score: {np.sum(episode_rewards)}")




  and should_run_async(code)


##Agent Test Area

In [33]:
# prompt: write me a code to test my agent reinforce class generate rraj method
import gym
env = gym.make('CartPole-v1', new_step_api=True)

agent = AgentReinforce(env,1e-2,1.0)
#agent.train(1000)

In [34]:
#Before training
agent.evaluate(20)

  if not isinstance(terminated, (bool, np.bool8)):


Logger Data: Episode 0	 Score: 14.0
Logger Data: Episode 1	 Score: 12.0
Logger Data: Episode 2	 Score: 9.0
Logger Data: Episode 3	 Score: 17.0
Logger Data: Episode 4	 Score: 13.0
Logger Data: Episode 5	 Score: 20.0
Logger Data: Episode 6	 Score: 21.0
Logger Data: Episode 7	 Score: 12.0
Logger Data: Episode 8	 Score: 33.0
Logger Data: Episode 9	 Score: 21.0
Logger Data: Episode 10	 Score: 17.0
Logger Data: Episode 11	 Score: 23.0
Logger Data: Episode 12	 Score: 20.0
Logger Data: Episode 13	 Score: 11.0
Logger Data: Episode 14	 Score: 14.0
Logger Data: Episode 15	 Score: 10.0
Logger Data: Episode 16	 Score: 18.0
Logger Data: Episode 17	 Score: 18.0
Logger Data: Episode 18	 Score: 33.0
Logger Data: Episode 19	 Score: 17.0


In [35]:
agent.train(1000)

Episode 100	Average Score: 61.64
Episode 200	Average Score: 222.18
Episode 300	Average Score: 444.29
Episode 400	Average Score: 501.0
Episode 500	Average Score: 488.58
Episode 600	Average Score: 277.67
Episode 700	Average Score: 102.13
Episode 800	Average Score: 161.12
Episode 900	Average Score: 460.42


In [36]:
agent.evaluate(20)

Logger Data: Episode 0	 Score: 500.0
Logger Data: Episode 1	 Score: 500.0
Logger Data: Episode 2	 Score: 500.0
Logger Data: Episode 3	 Score: 147.0
Logger Data: Episode 4	 Score: 500.0
Logger Data: Episode 5	 Score: 137.0
Logger Data: Episode 6	 Score: 500.0
Logger Data: Episode 7	 Score: 119.0
Logger Data: Episode 8	 Score: 279.0
Logger Data: Episode 9	 Score: 500.0
Logger Data: Episode 10	 Score: 500.0
Logger Data: Episode 11	 Score: 500.0
Logger Data: Episode 12	 Score: 500.0
Logger Data: Episode 13	 Score: 500.0
Logger Data: Episode 14	 Score: 500.0
Logger Data: Episode 15	 Score: 500.0
Logger Data: Episode 16	 Score: 500.0
Logger Data: Episode 17	 Score: 143.0
Logger Data: Episode 18	 Score: 500.0
Logger Data: Episode 19	 Score: 500.0


In [None]:
# cartpole_hyperparameters = {
#     "h_size": 16,
#     "n_training_episodes": 1000,
#     "n_evaluation_episodes": 10,
#     "max_t": 1000,
#     "gamma": 1.0,
#     "lr": 1e-2,
#     "env_id": env_id,
#     "state_space": s_size,
#     "action_space": a_size,
# }

In [None]:
# policy_loss = []
# for log_prob, disc_return in zip(log_probs, discount_return):
#   policy_loss.append(-log_prob * disc_return)
#   print(policy_loss)
#policy_loss = torch.cat(policy_loss).sum()
