In [1]:
import math
import random

import gymnasium as gym
import numpy as np
from mad_pod_racing import MapPodRacing

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal, Categorical
from IPython.display import clear_output
import matplotlib.pyplot as plt

  from pkg_resources import resource_stream, resource_exists


In [2]:
gym.register(
        id="gymnasium_env/MapPodRacing-v0",
        entry_point=MapPodRacing,
        max_episode_steps=1000,  # Prevent infinite episodes
)

env_name = 'gymnasium_env/MapPodRacing-v0'
env = gym.make(env_name)

In [3]:
class RL(nn.Module):
    def __init__(self, action_num=8, hidden_size=256):
        super(RL, self).__init__()
        self.fc1 = nn.Linear(8, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_num)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        logits = self.fc2(x)
        return Categorical(logits = logits)

In [4]:
def calc_returns(rewards, gamma = 0.99):
    returns = []
    delta = 0
    for reward in rewards[::-1]:
        #Bug fixed on this line
        delta = reward + gamma*delta
        returns.insert(0, delta)
    return returns

def test_agent():
    done = False
    total_reward = 0
    obs, info = env.reset()
    observation = torch.FloatTensor(obs).unsqueeze(0)

    with torch.no_grad():
        while not done:
            dist = rl_model(observation)
            action = dist.sample().cpu().item()
            observation, reward, done, truncated, info = env.step(action)
            
            observation = torch.FloatTensor(observation).unsqueeze(0)
            total_reward += reward
            
    return total_reward

In [5]:
rl_model = RL()
lr = 1e-4
optimizer = optim.SGD(rl_model.parameters(), lr=lr)

In [6]:
max_steps = 10000
rollouts = 0
step = 0
score_logger = []

In [7]:
while step < max_steps:
    obs, info = env.reset()
    observation =  torch.FloatTensor(obs).unsqueeze(0)
    done = False
    rewards = []
    log_probs = []
    
    while not done:
        dist = rl_model(observation)
        action = dist.sample()
        log_prob = dist.log_prob(action.unsqueeze(0))
        
        observation, reward, done, truncated, info = env.step(action.cpu().item())
        
        observation = torch.FloatTensor(observation).unsqueeze(0)
        reward = torch.FloatTensor([reward]).unsqueeze(0)

        rewards.append(reward)
        log_probs.append(log_prob)
        step +=1
    
    returns = calc_returns(rewards)
    
    returns = torch.cat(returns, 1)
    returns /= returns.max()
    log_probs = torch.cat(log_probs, 1)
    
    action_loss = - (log_probs * returns).sum()
    
    optimizer.zero_grad()
    action_loss.backward()
    optimizer.step()
    rollouts += 1
    
    if rollouts % 10 == 0:
        new_lr = ((max_steps - step)/max_steps) * lr
        optimizer.param_groups[0]["lr"] = new_lr
        
        score_logger.append(np.mean([test_agent() for _ in range(10)]))
        clear_output(True)
        plt.plot(score_logger)
        plt.show()
    
env.close()

ValueError: Expected parameter logits (Tensor of shape (1, 8)) of distribution Categorical(logits: torch.Size([1, 8])) to satisfy the constraint IndependentConstraint(Real(), 1), but found invalid values:
tensor([[nan, nan, nan, nan, nan, nan, nan, nan]], grad_fn=<SubBackward0>)

In [None]:
plt.plot(score_logger)