<a href="https://colab.research.google.com/github/Goutham345/Reinforcement_Learning/blob/main/Lab-06.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
import gymnasium as gym
import torch, torch.nn as nn, torch.optim as optim
from torch.distributions import Categorical
import numpy as np

# Experiment configuration shared by the cells below.
ENV_ID = "CartPole-v1"   # environment id handed to gym.make
GAMMA = 0.99             # discount factor (default for compute_returns)
LR = 1e-3                # learning rate given to the Adam optimizer
EPISODES = 1000          # episode budget; not referenced by the cells shown here
# Run on the GPU when CUDA is usable, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


In [11]:
class PolicyNet(nn.Module):
    """Small two-layer network that outputs action probabilities.

    Args:
        obs_dim: size of the last dimension of the input.
        hidden: width of the hidden layer.
        n_actions: number of probabilities produced per input.
    """

    def __init__(self, obs_dim, hidden=128, n_actions=2):
        super().__init__()
        # The attribute names fc1/fc2 determine the state_dict keys.
        self.fc1 = nn.Linear(in_features=obs_dim, out_features=hidden)
        self.fc2 = nn.Linear(in_features=hidden, out_features=n_actions)

    def forward(self, x):
        """Return probabilities that sum to 1 along the last dimension."""
        hidden_act = self.fc1(x).tanh()
        logits = self.fc2(hidden_act)
        return logits.softmax(dim=-1)


In [12]:
def select_action(policy, state):
    """Sample one action from the policy for a single observation.

    Args:
        policy: callable mapping a batched float tensor to action probabilities.
        state: one observation, in any form ``torch.tensor`` accepts.

    Returns:
        A pair ``(action, log_prob)``: the sampled action as a Python number
        and its log-probability as a tensor (with a leading batch dimension
        of size 1), kept as a tensor so gradients can flow through it.
    """
    obs = torch.tensor(state, dtype=torch.float32)
    batch = obs.unsqueeze(0).to(device)      # prepend a batch dimension of size 1
    dist = Categorical(policy(batch))        # distribution over the action probabilities
    sampled = dist.sample()
    log_prob = dist.log_prob(sampled)        # not detached: needed for the policy gradient
    return sampled.item(), log_prob


In [16]:
# Build the environment and a freshly initialised policy sized to its spaces.
env = gym.make(ENV_ID)
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n
policy = PolicyNet(obs_dim, n_actions=n_actions).to(device)

# Roll out one episode, recording what the policy-gradient update needs.
log_probs, rewards = [], []
state, _ = env.reset(seed=0)   # reset returns an (observation, info) pair
done = False
while not done:
    a, lp = select_action(policy, state)
    next_state, r, terminated, truncated, _ = env.step(a)
    log_probs.append(lp)     # shape-[1] tensor (select_action batches the state)
    rewards.append(r)
    state = next_state
    # The episode is over on either termination or truncation.
    done = terminated or truncated

In [17]:
def compute_returns(rewards, gamma=GAMMA):
    """Compute normalized discounted returns for one episode.

    For each step ``t`` the return is ``r_t + gamma * G_{t+1}``, accumulated
    backwards from the end of the episode.

    Args:
        rewards: sequence of per-step rewards, in time order.
        gamma: discount factor applied to future rewards.

    Returns:
        A float32 tensor on ``device`` with one entry per reward. When there
        are at least two entries it is standardized to zero mean and unit
        standard deviation; with fewer entries the raw discounted returns
        are returned unchanged.
    """
    R = 0
    returns = []
    # Accumulate back-to-front, appending and reversing once at the end;
    # inserting at index 0 on every step would be quadratic in episode length.
    for r in reversed(rewards):
        R = r + gamma * R
        returns.append(R)
    returns.reverse()
    returns = torch.tensor(returns, dtype=torch.float32).to(device)
    # The sample standard deviation of fewer than two values is NaN (and the
    # mean of zero values is NaN too), which would poison the loss and the
    # optimizer step. Skip normalization in that case.
    if returns.numel() < 2:
        return returns
    # normalize for stability
    return (returns - returns.mean()) / (returns.std() + 1e-8)


In [18]:
# One policy-gradient update from the episode collected above.
optimizer = optim.Adam(policy.parameters(), lr=LR)

returns = compute_returns(rewards, GAMMA)   # one entry per step, shape [T]
# REINFORCE loss: sum over steps of -(log_prob_t * return_t).
# Each log-prob is still attached to the graph, so the loss is differentiable.
loss_terms = [-lp * R for lp, R in zip(log_probs, returns)]
loss = torch.stack(loss_terms).sum()

# Clear stale gradients, backpropagate, then apply the Adam step.
optimizer.zero_grad()
loss.backward()
optimizer.step()

In [19]:
def train_reinforce(env_name='CartPole-v1', hidden=128, lr=1e-3, gamma=0.99, episodes=1000, seed=0,
                    print_every=100):
    """Train a PolicyNet with REINFORCE, one policy update per episode.

    Each episode is rolled out with the current policy, discounted returns
    are computed with ``compute_returns``, and a single Adam step is taken on
    the loss ``-sum_t(log_prob_t * return_t)``.

    Args:
        env_name: environment id passed to ``gym.make``.
        hidden: hidden-layer width of the policy network.
        lr: Adam learning rate.
        gamma: discount factor forwarded to ``compute_returns``.
        episodes: number of episodes to run.
        seed: seeds torch's global RNG and the first environment reset.
        print_every: print a progress line every this many episodes;
            ``0`` or ``None`` disables printing.

    Returns:
        ``(policy, running_rewards, mean_hist)`` where ``running_rewards``
        holds the undiscounted reward sum of each episode and ``mean_hist``
        holds, per episode, the mean of the most recent (up to 100) sums.
    """
    # Seed before building the network so initialization and action sampling
    # repeat for a given seed.
    torch.manual_seed(seed)
    env = gym.make(env_name)
    running_rewards = []
    mean_hist = []
    try:
        obs_dim = env.observation_space.shape[0]
        n_actions = env.action_space.n
        policy = PolicyNet(obs_dim, hidden, n_actions).to(device)
        optimizer = optim.Adam(policy.parameters(), lr=lr)

        for ep in range(episodes):
            # Seed the environment only on the first reset; later resets
            # continue from the environment's own RNG stream.
            state, _ = env.reset(seed=seed if ep == 0 else None)
            log_probs = []
            rewards = []
            done = False
            while not done:
                action, log_prob = select_action(policy, state)
                state, reward, terminated, truncated, _ = env.step(action)
                log_probs.append(log_prob)
                rewards.append(reward)
                done = terminated or truncated

            returns = compute_returns(rewards, gamma)
            # Flatten so the log-probs line up with the [T] returns regardless
            # of whether each one carries a leading batch dimension.
            stacked_log_probs = torch.stack(log_probs).reshape(-1)
            loss = -(stacked_log_probs * returns).sum()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            ep_return = float(sum(rewards))
            running_rewards.append(ep_return)
            window = running_rewards[-100:]
            mean_hist.append(sum(window) / len(window))

            if print_every and (ep + 1) % print_every == 0:
                print(f"Episode {ep + 1}/{episodes} | return {ep_return:.1f} "
                      f"| mean(last {len(window)}) {mean_hist[-1]:.1f}")
    finally:
        # Release the environment even if training is interrupted or fails.
        env.close()
    return policy, running_rewards, mean_hist
