In [1]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [3]:
class PolicyNetwork(nn.Module):
    def __init__(self, input_dim, hidden_sizes, output_dim):
        super(PolicyNetwork, self).__init__()
        layers = []
        prev_size = input_dim
        for hs in hidden_sizes:
            layers.append(nn.Linear(prev_size, hs))
            layers.append(nn.ReLU())
            prev_size = hs
        layers.append(nn.Linear(prev_size, output_dim))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        logits = self.model(x)
        return torch.softmax(logits, dim=-1)

In [4]:
def reinforce(env,
              input_dim,
              output_dim,
              hidden_sizes,
              alpha=0.001,
              episodes=500,
              gamma=0.99,
              log_dir="runs/reinforce"):
    """
    REINFORCE training loop.

    Args:
        env: OpenAI gym environment
        input_dim: dimension of input space
        output_dim: number of actions
        hidden_sizes: list of sizes for each hidden layer
        alpha: The learning rate
        episodes: number of training episodes
        gamma: discount factor
        log_dir: directory for tensorboard logs

    Returns:
        policy_network: the trained policy network
        rewards_per_episode: a list containing the total reward per episode
    """
    writer = SummaryWriter(log_dir=log_dir)

    policy_network = PolicyNetwork(input_dim, hidden_sizes, output_dim).to(device)
    optimizer = optim.Adam(policy_network.parameters(), lr=alpha)
    rewards_per_episode = []

    for episode in range(episodes):
        # generate a new episode
        state, _ = env.reset()
        done = False
        truncated = False
        episode_log_probs = []
        episode_rewards = []

        while not (done or truncated):
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            action_probs = policy_network(state_tensor)
            action_dist = torch.distributions.Categorical(action_probs)
            action = action_dist.sample()
            log_prob = action_dist.log_prob(action)

            next_state, reward, done, truncated, info = env.step(action.item())

            episode_log_probs.append(log_prob)
            episode_rewards.append(reward)

            state = next_state

        # compute the returns (G_t) for the episode (including gamma)
        returns = []
        G = 0
        for r in reversed(episode_rewards):
            G = r + gamma * G
            returns.append(G)

        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32, device=device)

        # compute policy gradient and update
        policy_loss = []
        for log_prob, R in zip(episode_log_probs, returns):
            policy_loss.append(-log_prob * R)
        policy_loss = torch.cat(policy_loss).sum()

        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        total_reward = sum(episode_rewards)
        rewards_per_episode.append(total_reward)

        print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward}")

        # log metrics to TensorBoard
        writer.add_scalar("Policy Loss", policy_loss.item(), episode)
        writer.add_scalar("Total Reward", total_reward, episode)

        # Early stopping if solved
        if len(rewards_per_episode) >= 100:
            avg_reward = np.mean(rewards_per_episode[-100:])
            print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward}, Average Reward: {avg_reward}")
            if avg_reward >= 475.0:
                print(f"Solved in {episode + 1} episodes!")
                break
        else:
            print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward}")

    writer.close()
    return policy_network, rewards_per_episode

In [5]:
def test_policy(env, policy_network, episodes=10):
    policy_network.eval()
    avg_reward = 0
    for ep in range(episodes):
        state, _ = env.reset()
        total_reward = 0
        done = False
        while not done:
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            with torch.no_grad():
                action_probs = policy_network(state_tensor)
            action = torch.argmax(action_probs).item()
            state, reward, done, truncated, info = env.step(action)
            total_reward += reward
            if total_reward >= 10000:
                break

        avg_reward += total_reward
        print(f"Test Episode {ep + 1}, Total Reward: {total_reward}")
    return avg_reward / episodes

In [6]:
def grid_search(env, input_dim, output_dim, hidden_sizes, gamma_values, alpha_values, episodes=2000):
    best_combination = None
    best_episodes = float('inf')
    best_policy_network = None
    best_reward = float('inf')
    results = []

    for gamma in gamma_values:
        for alpha in alpha_values:
            print(f"Training with gamma={gamma}, alpha={alpha}")
            policy_network, rewards = reinforce(
                env,
                input_dim=input_dim,
                output_dim=output_dim,
                hidden_sizes=hidden_sizes,
                alpha=alpha,
                episodes=episodes,
                gamma=gamma,
                log_dir=f"runs/reinforce_{alpha}_{gamma}"
            )

            avg_reward = np.mean(rewards[-100:])
            solved_episodes = len(rewards)
            results.append((gamma, alpha, solved_episodes, avg_reward))

            if avg_reward >= 475 and solved_episodes < best_episodes:
                best_combination = (gamma, alpha)
                best_episodes = solved_episodes
                best_policy_network = policy_network
                best_reward = avg_reward

            print(f"Gamma={gamma}, Alpha={alpha}, Solved in {solved_episodes} episodes")

    return best_policy_network, best_combination, best_episodes, best_reward, results

In [7]:
# Define the environment and input/output dimensions
env = gym.make("CartPole-v1")
input_dim = env.observation_space.shape[0]
output_dim = env.action_space.n
hidden_sizes = [16, 32, 16]

# Define the ranges for gamma and alpha
gamma_values = [0.95, 0.99, 0.995]
alpha_values = [0.001, 0.0005, 0.0001]

# Perform grid search
best_policy_network, best_params, best_episodes, best_reward,  all_results = grid_search(env, input_dim, output_dim, hidden_sizes, gamma_values, alpha_values, 1000)

# Print the best parameters
if best_params:
    print("\nBest Parameters:")
    print(f"Gamma: {best_params[0]}, Alpha: {best_params[1]}, Solved in {best_episodes} episodes\n")

for gamma, alpha, solved_episode, average_reward in all_results:
    print(f"Gamma: {gamma}, Alpha: {alpha}, Solved in: {solved_episode} episodes, Average Reward: {average_reward}")

Training with gamma=0.95, alpha=0.001


  if not isinstance(terminated, (bool, np.bool8)):


Episode 1/1000, Total Reward: 11.0
Episode 1/1000, Total Reward: 11.0
Episode 2/1000, Total Reward: 15.0
Episode 2/1000, Total Reward: 15.0
Episode 3/1000, Total Reward: 16.0
Episode 3/1000, Total Reward: 16.0
Episode 4/1000, Total Reward: 12.0
Episode 4/1000, Total Reward: 12.0
Episode 5/1000, Total Reward: 10.0
Episode 5/1000, Total Reward: 10.0
Episode 6/1000, Total Reward: 8.0
Episode 6/1000, Total Reward: 8.0
Episode 7/1000, Total Reward: 13.0
Episode 7/1000, Total Reward: 13.0
Episode 8/1000, Total Reward: 25.0
Episode 8/1000, Total Reward: 25.0
Episode 9/1000, Total Reward: 28.0
Episode 9/1000, Total Reward: 28.0
Episode 10/1000, Total Reward: 12.0
Episode 10/1000, Total Reward: 12.0
Episode 11/1000, Total Reward: 15.0
Episode 11/1000, Total Reward: 15.0
Episode 12/1000, Total Reward: 18.0
Episode 12/1000, Total Reward: 18.0
Episode 13/1000, Total Reward: 10.0
Episode 13/1000, Total Reward: 10.0
Episode 14/1000, Total Reward: 16.0
Episode 14/1000, Total Reward: 16.0
Episode 15/1

In [11]:
# test the trained policy
print("Testing the policy...")
test_avg_reward = test_policy(env, best_policy_network, episodes=10)
print(f"Average Test Reward: {test_avg_reward}")

Testing the policy...
Test Episode 1, Total Reward: 10000.0
Test Episode 2, Total Reward: 10000.0
Test Episode 3, Total Reward: 10000.0
Test Episode 4, Total Reward: 10000.0
Test Episode 5, Total Reward: 10000.0
Test Episode 6, Total Reward: 10000.0
Test Episode 7, Total Reward: 10000.0
Test Episode 8, Total Reward: 837.0
Test Episode 9, Total Reward: 10000.0
Test Episode 10, Total Reward: 10000.0
Average Test Reward: 9083.7


In [12]:
# save the policy
torch.save(best_policy_network.state_dict(), 'reinforce_policy.pth')