# A PyTorch Tutorial for Reinforcement Learning

Welcome to this interactive tutorial on using PyTorch to build and train Reinforcement Learning (RL) agents. The goal of this notebook is to guide you through the fundamentals of building RL agents that can learn to solve tasks in simulated environments from the [Gymnasium](https://gymnasium.farama.org/) library.

We will cover:
1.  Interacting with Gym environments.
2.  Building policy networks using `torch.nn`.
3.  Implementing a complete policy gradient agent (REINFORCE).
4.  Training the agent using PyTorch's automatic differentiation.
5.  Adapting the agent for continuous control tasks in MuJoCo.

This tutorial is designed to be hands-on. You will find several exercises where you'll need to fill in the missing code. This will help you solidify your understanding of the concepts.

Let's get started! We'll assume you are working on a machine without a dedicated GPU, so everything in this notebook runs on the CPU.

## Part 1: Setting up the Environment and Interacting with Gym

Before we can build an RL agent, we need to understand the environment it will interact with. We'll be using [Gymnasium](https://gymnasium.farama.org/) (a fork of OpenAI's Gym), which provides a wide variety of simulated environments for RL research. We will also use `torch` for building our neural networks and `matplotlib` for plotting our results. If you are using a MuJoCo environment, you will also need to have `mujoco` installed.

Let's start by importing the necessary libraries.

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical, Normal

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque

torch.manual_seed(0)

<torch._C.Generator at 0x1258bbb10>

A Gym environment has a few key methods and attributes:

*   `observation_space`: This defines the structure and range of the states you can observe.
*   `action_space`: This defines the set of possible actions the agent can take.
*   `reset()`: This resets the environment to a starting state and returns that initial observation.
*   `step(action)`: This executes an action in the environment and returns a tuple of `(observation, reward, terminated, truncated, info)`.
    *   `observation`: The new state of the environment.
    *   `reward`: The reward received for the last action.
    *   `terminated`: A boolean indicating if the episode has ended (e.g., the pole fell over).
    *   `truncated`: A boolean indicating if the episode was cut short (e.g., reached a time limit).
    *   `info`: A dictionary with auxiliary diagnostic information.

Let's see this in action with the `CartPole-v1` environment.

In [12]:
# Create the CartPole environment
env = gym.make('CartPole-v1')

# Get the state and action space sizes
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

print(f"State space size: {state_size}")
print(f"Action space size: {action_size}")

# Reset the environment
state, info = env.reset()
print(f"Initial state: {state}")

# Take a random action
action = env.action_space.sample()
next_state, reward, terminated, truncated, info = env.step(action)

print(f"Action taken: {action}")
print(f"Next state: {next_state}")
print(f"Reward: {reward}")
print(f"Terminated: {terminated}")
print(f"Truncated: {truncated}")
print(f"Info: {info}")

env.close()

State space size: 4
Action space size: 2
Initial state: [ 0.04181642  0.00337763  0.03376347 -0.01761198]
Action taken: 0
Next state: [ 0.04188398 -0.19221185  0.03341123  0.2855296 ]
Reward: 1.0
Terminated: False
Truncated: False
Info: {}
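
The single `step` call above extends to a full episode by looping until either `terminated` or `truncated` is set. The sketch below is one way to write that loop. To stay dependency-free it uses `ToyWalkEnv`, a hypothetical stand-in (not part of Gymnasium) that only mimics the `reset`/`step` return signature; with a real environment, only the construction line should need to change.

```python
import random


class ToyWalkEnv:
    """Hypothetical stand-in that mimics the Gymnasium reset/step contract."""

    def __init__(self, goal=3, max_steps=10):
        self.goal = goal            # position at which the task itself ends
        self.max_steps = max_steps  # time limit that cuts the episode short

    def reset(self, seed=None):
        self.position = 0
        self.steps = 0
        return self.position, {}    # (observation, info)

    def step(self, action):
        # action 1 moves one unit to the right, action 0 stays in place
        self.position += action
        self.steps += 1
        terminated = self.position >= self.goal
        truncated = (not terminated) and self.steps >= self.max_steps
        return self.position, 1.0, terminated, truncated, {}


def run_episode(env, seed=0):
    """Run one episode with uniformly random actions and return the total reward."""
    rng = random.Random(seed)
    observation, info = env.reset(seed=seed)
    total_reward, done = 0.0, False
    while not done:
        action = rng.choice([0, 1])
        observation, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        done = terminated or truncated
    return total_reward


print(run_episode(ToyWalkEnv()))
```

Keeping `terminated` and `truncated` separate until the final `done` check mirrors the distinction in the list above: the first is decided by the task, the second by a time limit.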


## Part 2: Building a Neural Network with PyTorch

Our RL agent will use a neural network to decide which action to take based on the current state. This network is called a **policy network**. For a given state, it will output a probability distribution over the possible actions.

We can easily create neural networks in PyTorch by creating a class that inherits from `torch.nn.Module`. We define the layers of our network in the `__init__` method and specify how data flows through them in the `forward` method.

### Exercise: Create a Policy Network

Now it's your turn! Your task is to create a simple policy network for the `CartPole-v1` environment. The network should take the state as input and output the probabilities for the two possible actions (move left or right).

The network should have:
*   An input layer that accepts the state (size `state_size`).
*   One hidden layer with 128 neurons and a ReLU activation function.
*   An output layer that produces the action probabilities (size `action_size`). We'll use a softmax activation on the output to ensure the probabilities sum to 1.
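
To make the softmax step in the last bullet concrete, here is a minimal plain-Python sketch of the mapping from raw output scores (logits) to probabilities. It is written without PyTorch so the arithmetic is visible; `F.softmax` applies the same mapping along the chosen dimension of a tensor. The logit values are made up for illustration.

```python
import math


def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    largest = max(logits)
    # Subtracting the maximum does not change the result but avoids overflow
    exps = [math.exp(z - largest) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


probs = softmax([2.0, 0.0])  # illustrative logits for "left" and "right"
print(probs)
print(sum(probs))
```

Note that the logits may be any real numbers, positive or negative; only their differences determine the resulting probabilities.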

Fill in the missing code in the cell below.

In [None]:
class Policy(nn.Module):
    """
    Fully connected neural network policy for reinforcement learning.
    Maps:
        State --> hidden dim --> Action
    """
    def __init__(self, state_size, action_size, hidden_size=128):
        super(Policy, self).__init__()
        # fully connected layers
        self.fc1 = nn.Linear(state_size, hidden_size)# ... Fill in the missing code: a linear layer from state_size to hidden_size
        self.fc2 = nn.Linear(hidden_size, action_size)# ... Fill in the missing code: a linear layer from hidden_size to action_size

    def forward(self, x):
        # ... Fill in the missing code: apply a ReLU activation to the first layer's output
        x = F.relu(self.fc1(x))# ...
        # ... Fill in the missing code: get the output from the second layer
        # (raw logits; no ReLU here, because it would zero out negative logits before the softmax)
        x = self.fc2(x)# ...
        # Apply a softmax to get action probabilities
        return F.softmax(x, dim=-1)

# Example of how to create the policy network
policy = Policy(state_size, action_size)
print(policy)

Policy(
  (fc1): Linear(in_features=4, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=2, bias=True)
)


## Part 3: Training a Simple Agent with Policy Gradients (REINFORCE)

Now that we have a policy network, we need a way to train it. We'll use a policy gradient algorithm called **REINFORCE**. The main idea behind REINFORCE is to increase the probability of actions that lead to high rewards and decrease the probability of actions that lead to low rewards.

To do this, we will:
1.  Run an episode using the current policy and collect the trajectory of states, actions, and rewards.
2.  For each step in the trajectory, calculate the **discounted future return**.
3.  Calculate the **policy loss**.
4.  Update the policy network's weights using backpropagation.
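
Step 2 is easiest to follow with numbers. The following sketch computes the discounted future return for every step by walking the reward list backwards and applying the recursion `G_t = r_t + gamma * G_{t+1}`. The function name `discounted_returns` and the example rewards are illustrative choices, not part of the notebook's code.

```python
def discounted_returns(rewards, gamma):
    """Return the discounted future return G_t for every step t."""
    returns = []
    running = 0.0
    for r in reversed(rewards):
        running = r + gamma * running  # G_t = r_t + gamma * G_{t+1}
        returns.append(running)
    returns.reverse()                  # restore chronological order
    return returns


print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]
```

Earlier steps receive larger returns because more future reward lies ahead of them; a smaller `gamma` shrinks the contribution of distant rewards.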

### Action Selection

Given the action probabilities from our policy network, we need a way to sample an action. We can use `torch.distributions.Categorical` to create a distribution object from which we can sample an action and also get the log probability of that action, which we'll need for the loss calculation.
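
As a rough picture of what those two operations compute, the plain-Python sketch below samples an action index in proportion to its probability and returns the natural log of that probability. In the notebook, `Categorical(probs).sample()` and `.log_prob(action)` play these roles on tensors and additionally keep the result connected to the network for backpropagation. The helper name `sample_action` and the probabilities are hypothetical.

```python
import math
import random


def sample_action(probs, rng):
    """Sample an action index according to probs; return it with its log probability."""
    action = rng.choices(range(len(probs)), weights=probs, k=1)[0]
    return action, math.log(probs[action])


rng = random.Random(0)
action, log_prob = sample_action([0.8, 0.2], rng)
print(action, log_prob)
```

Because a probability is at most 1, the log probability of a discrete action is never positive.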

### Exercise: Implement the REINFORCE Training Loop

Your next task is to implement the main training loop for the REINFORCE agent. We'll break this down into a few functions.

First, a function to collect a trajectory from the environment using the policy. Then, the main training loop that uses this trajectory to update the policy.

You will need to:
1.  In the `collect_trajectory` function, loop until the episode is done. In each step, get the action probabilities, sample an action, get its log probability, and store the log probability and the reward.
2.  In the main loop, after collecting a trajectory, calculate the discounted returns.
3.  Calculate the policy loss. The loss for each step is `-log_prob * return`. The total loss is the sum of these values.
4.  Use the optimizer to perform a gradient update.
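
Before filling in the cell, it may help to see steps 2 and 3 on plain numbers. The sketch below normalizes a list of returns and then sums `-log_prob * return` over the steps. It uses the sample standard deviation, which matches the default behavior of `Tensor.std()`. The helper names and the log-probability values are hypothetical, and the normalization is the optional stabilizing step used in the training cell, not a required part of REINFORCE.

```python
import statistics


def normalize(returns, eps=1e-6):
    """Shift returns to zero mean and scale by the sample standard deviation."""
    mean = statistics.mean(returns)
    std = statistics.stdev(returns)  # needs at least two values
    return [(g - mean) / (std + eps) for g in returns]


def reinforce_loss(log_probs, returns):
    """Sum of -log_prob * return over all steps of one trajectory."""
    return sum(-lp * g for lp, g in zip(log_probs, returns))


log_probs = [-0.2, -1.6, -0.7]  # hypothetical log probabilities of the sampled actions
returns = normalize([1.75, 1.5, 1.0])
print(returns)
print(reinforce_loss(log_probs, returns))
```

After normalization, steps with above-average returns get a positive weight and the rest a negative one, so minimizing the loss raises the probability of the former and lowers that of the latter.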

Fill in the missing code in the cell below.

In [9]:
x = torch.tensor([[1.0, 2.0, 3.0, 4.0]])  # Example input
z = x.unsqueeze(0)  # Add batch dimension
print("x:", x, x.shape)
print("z:", z, z.shape)
z[0, 0, 0].item() # only works if scalar is selected

x: tensor([[1., 2., 3., 4.]]) torch.Size([1, 4])
z: tensor([[[1., 2., 3., 4.]]]) torch.Size([1, 1, 4])


1.0

In [None]:
def collect_trajectory(policy, env):
    state, _ = env.reset()
    log_probs = []
    rewards = []
    done = False
    
    while not done:
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
        # ... Fill in the missing code: get action probabilities from the policy
        action_probs = policy(state_tensor)
        
        # Create a categorical distribution and sample an action
        m = Categorical(action_probs)
        action = m.sample()
        
        # ... Fill in the missing code: get the log probability of the sampled action
        log_probs.append(m.log_prob(action))
        
        # Take a step in the environment; .item() extracts the Python scalar from the action tensor
        state, reward, terminated, truncated, _ = env.step(action.item())
        rewards.append(reward)
        done = terminated or truncated
        
    return log_probs, rewards

def train(policy, env, optimizer, n_episodes=1000, gamma=0.99):
    scores = []
    scores_window = deque(maxlen=100)
    
    for i_episode in range(1, n_episodes + 1):
        log_probs, rewards = collect_trajectory(policy, env)
        
        # Calculate discounted returns
        returns = []
        R = 0
        for r in rewards[::-1]:
            R = r + gamma * R
            returns.insert(0, R)
        returns = torch.tensor(returns)
        # Normalize returns for better stability
        returns = (returns - returns.mean()) / (returns.std() + 1e-6) # add a small, negligible constant to prevent division by zero
        
        policy_loss = []
        # ... Fill in the missing code: calculate the policy loss for each step
        for log_prob, R in zip(log_probs, returns):
            policy_loss.append(-log_prob * R)
            
        # ... Fill in the missing code: sum the policy loss and perform a gradient update
        optimizer.zero_grad() # clear gradients left over from the previous update
        policy_loss = torch.cat(policy_loss).sum()
        # ...
        policy_loss.backward()
        # ...
        optimizer.step()
        
        scores.append(sum(rewards))
        scores_window.append(sum(rewards))
        
        if i_episode % 100 == 0:
            print(f'Episode {i_episode}\tAverage Score: {np.mean(scores_window):.2f}')
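        # Require a full 100-episode window before declaring success
        # (195.0 is the CartPole-v0 threshold; CartPole-v1 registers a reward threshold of 475.0)
        if len(scores_window) == 100 and np.mean(scores_window) >= 195.0:
            print(f'Environment solved in {i_episode-100:d} episodes!\tAverage Score: {np.mean(scores_window):.2f}')
            break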
            
    return scores

# Re-create the environment (the earlier demo cell closed it), then create the policy and optimizer
env = gym.make('CartPole-v1')
policy = Policy(state_size, action_size)
optimizer = optim.Adam(policy.parameters(), lr=1e-2)

# Train the agent
scores = train(policy, env, optimizer)

# Plot the scores
plt.plot(np.arange(len(scores)), scores)
plt.ylabel('Score')
plt.xlabel('Episode #')
plt.show()

env.close()


## Part 4: Continuous Control with MuJoCo

The `CartPole-v1` environment has a discrete action space (left or right). Many interesting problems, especially in robotics, have **continuous action spaces**, where actions are real-valued numbers (e.g., the amount of torque to apply to a motor).

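We'll use the `InvertedPendulum-v4` environment, one of Gymnasium's MuJoCo environments, to explore continuous control. The `v4` versions use the `mujoco` Python bindings rather than the older `mujoco-py` package. The goal is to keep a pendulum upright.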

To handle continuous actions, we need to modify our policy network. Instead of outputting probabilities for discrete actions, the network will output the parameters of a probability distribution, typically a **Gaussian (Normal) distribution**. The network will output the `mean` and `standard deviation` of the distribution, and we will sample an action from it.

### Exercise: Create a Policy Network for Continuous Actions

Your task is to create a policy network for the `InvertedPendulum-v4` environment.

The network should:
*   Take the state as input.
*   Have one hidden layer.
*   Have two output heads:
    *   One for the `mean` of the action distribution.
    *   One for the log of the `standard deviation` (`log_std`) of the action distribution. Predicting the log lets the layer output any real number; exponentiating it later guarantees a positive standard deviation and tends to be more stable numerically.
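
The effect of predicting `log_std` rather than the standard deviation itself can be seen in a few lines of plain Python. In this sketch, `gaussian_action` is a hypothetical helper and the `mean` and `log_std` values are made up; in the notebook they would come from the two output heads.

```python
import math
import random


def gaussian_action(mean, log_std, rng):
    """Sample one action from a Gaussian with the given mean and log standard deviation."""
    std = math.exp(log_std)  # exp maps any real number to a strictly positive value
    return rng.gauss(mean, std), std


rng = random.Random(0)
mean = 0.5  # hypothetical output of the mean head
for log_std in (-3.0, 0.0, 1.5):  # hypothetical outputs of the log_std head
    action, std = gaussian_action(mean, log_std, rng)
    print(f"log_std={log_std:+.1f}  std={std:.3f}  action={action:+.3f}")
```

A very negative `log_std` gives a narrow distribution whose samples stay close to the mean, while a large one spreads the samples out and encourages exploration.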

Fill in the missing code in the cell below.

In [None]:
# First, let's create the environment and get its properties
try:
    env_continuous = gym.make('InvertedPendulum-v4')
    continuous_state_size = env_continuous.observation_space.shape[0]
    continuous_action_size = env_continuous.action_space.shape[0]
    
    class ContinuousPolicy(nn.Module):
        def __init__(self, state_size, action_size, hidden_size=128):
            super(ContinuousPolicy, self).__init__()
            self.fc1 = nn.Linear(state_size, hidden_size)
            # ... Fill in the missing code: a linear layer for the mean
            self.fc_mean = nn.Linear(hidden_size, action_size)# ...
            # ... Fill in the missing code: a linear layer for the log standard deviation
            self.fc_log_std = nn.Linear(hidden_size, action_size)# ...

        def forward(self, x):
            x = F.relu(self.fc1(x))
            # ... Fill in the missing code: get the mean from the appropriate layer
            mean = self.fc_mean(x)# ...
            # ... Fill in the missing code: get the log_std from the appropriate layer
            log_std = self.fc_log_std(x)# ...
            std = torch.exp(log_std) # ensure std is positive
            return mean, std

    # Example of how to create the policy network
    continuous_policy = ContinuousPolicy(continuous_state_size, continuous_action_size)
    print(continuous_policy)
    env_continuous.close()

except ImportError:
    print("MuJoCo not installed, skipping continuous control part.")
except Exception as e:
    print(f"An error occurred with the MuJoCo environment: {e}")
    print("Skipping continuous control part.")

The training process is very similar to the discrete case. The main difference is how we calculate the log probability of an action, which is required for the policy gradient loss. Instead of `Categorical`, we'll use `torch.distributions.Normal` to create our distribution, sample an action, and compute its log probability.
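
For reference, the quantity that `Normal(mean, std).log_prob(action)` evaluates is the log density of a Gaussian, computed separately for each action dimension. The plain-Python sketch below writes that formula out; the function names are illustrative, and summing over dimensions assumes the action dimensions are treated as independent.

```python
import math


def normal_log_prob(action, mean, std):
    """Log density of a univariate Gaussian evaluated at action."""
    return (
        -((action - mean) ** 2) / (2 * std ** 2)
        - math.log(std)
        - 0.5 * math.log(2 * math.pi)
    )


def joint_log_prob(actions, means, stds):
    """Sum per-dimension log densities, treating the dimensions as independent."""
    return sum(normal_log_prob(a, m, s) for a, m, s in zip(actions, means, stds))


print(normal_log_prob(0.0, 0.0, 1.0))
print(joint_log_prob([0.1, -0.3], [0.0, 0.0], [1.0, 0.5]))
```

Unlike the log probability of a discrete action, a log density can be positive when the standard deviation is small, so positive values here are not a sign of a bug.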

### Exercise: Implement the Training Loop for Continuous Control

Now, adapt the training loop for the continuous control case. The structure is almost identical to the discrete version.

You will need to:
1.  In the `collect_trajectory_continuous` function, get the `mean` and `std` from the policy.
2.  Create a `Normal` distribution and sample an action.
3.  Get the log probability of the sampled action.
4.  The rest of the training loop (calculating returns and the policy update) remains the same.

Fill in the missing code in the cell below.

In [None]:
def collect_trajectory_continuous(policy, env):
    state, _ = env.reset()
    log_probs = []
    rewards = []
    done = False
    
    while not done:
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
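        # ... Fill in the missing code: get mean and std from the policy
        mean, std = policy(state_tensor)# ...
        
        # Create a normal distribution and sample an action
        m = Normal(mean, std)
        action = m.sample()
        # Keep the action inside the environment's bounds (cast the bounds to plain Python floats)
        action = torch.clamp(action, min=float(env.action_space.low[0]), max=float(env.action_space.high[0]))
        
        # ... Fill in the missing code: get the log probability of the sampled action
        # (summed over action dimensions so each step contributes one value)
        log_probs.append(m.log_prob(action).sum(dim=-1))# ...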
        
        # Take a step in the environment
        state, reward, terminated, truncated, _ = env.step(action.numpy().flatten())
        rewards.append(reward)
        done = terminated or truncated
        
    return log_probs, rewards

def train_continuous(policy, env, optimizer, n_episodes=2000, gamma=0.99):
    scores = []
    scores_window = deque(maxlen=100)
    
    for i_episode in range(1, n_episodes + 1):
        log_probs, rewards = collect_trajectory_continuous(policy, env)
        
        returns = []
        R = 0
        for r in rewards[::-1]:
            R = r + gamma * R
            returns.insert(0, R)
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-6)
        
        policy_loss = []
        for log_prob, R in zip(log_probs, returns):
            policy_loss.append(-log_prob * R)
            
        optimizer.zero_grad()
        policy_loss = torch.cat(policy_loss).sum()
        policy_loss.backward()
        optimizer.step()
        
        scores.append(sum(rewards))
        scores_window.append(sum(rewards))
        
        if i_episode % 100 == 0:
            print(f'Episode {i_episode}\tAverage Score: {np.mean(scores_window):.2f}')
            
    return scores

try:
    # Re-create the environment, since the cell that defined the policy class closed it
    env_continuous = gym.make('InvertedPendulum-v4')

    # Create the policy and optimizer
    continuous_policy = ContinuousPolicy(continuous_state_size, continuous_action_size)
    continuous_optimizer = optim.Adam(continuous_policy.parameters(), lr=1e-3)

    # Train the agent
    continuous_scores = train_continuous(continuous_policy, env_continuous, continuous_optimizer)

    # Plot the scores
    plt.plot(np.arange(len(continuous_scores)), continuous_scores)
    plt.ylabel('Score')
    plt.xlabel('Episode #')
    plt.show()

    env_continuous.close()
except NameError:
    print("Skipping continuous training because the continuous policy was not defined.")
except Exception as e:
    print(f"An error occurred during continuous training: {e}")

## Conclusion

Congratulations! You have successfully built and trained Reinforcement Learning agents for both discrete and continuous control tasks using PyTorch.

In this tutorial, you have learned how to:
*   Interact with Gym environments.
*   Build neural networks with `torch.nn.Module`.
*   Implement the REINFORCE algorithm from scratch.
*   Use PyTorch's automatic differentiation to train your policy network.
*   Adapt your agent for continuous action spaces.

### Further Learning

This tutorial covers the basics, but the field of Deep Reinforcement Learning is vast. Here are some topics you might want to explore next:

*   **Actor-Critic Methods**: These methods, like A2C and A3C, combine policy gradients with a value function to learn more efficiently.
*   **Proximal Policy Optimization (PPO)**: A state-of-the-art policy gradient method that is widely used and often more stable than REINFORCE.
*   **Deep Q-Networks (DQN)**: A popular value-based method for discrete action spaces.
*   **Soft Actor-Critic (SAC)**: An advanced off-policy actor-critic method for continuous control that incorporates entropy maximization for better exploration.

I hope this tutorial has provided you with a solid foundation for your journey into Reinforcement Learning!