In [78]:
import time
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Normal
import copy

In [None]:
# FROM TP5
def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    """Initialise a layer's parameters in place and hand the layer back.

    The weight tensor is filled with an orthogonal matrix scaled by ``std``
    (passed as the gain of ``orthogonal_``) and every bias entry is set to
    ``bias_const``.

    Args:
        layer: module exposing ``weight`` and ``bias`` tensors (e.g. ``nn.Linear``).
        std: gain applied to the orthogonal weight matrix.
        bias_const: constant written to every bias entry.

    Returns:
        The very same ``layer`` object, so the call can be nested inside
        ``nn.Sequential(...)``.
    """
    weight, bias = layer.weight, layer.bias
    nn.init.orthogonal_(weight, gain=std)
    nn.init.constant_(bias, val=bias_const)
    return layer

class Agent(nn.Module):
    """Diagonal-Gaussian policy over a continuous action space.

    An MLP maps a flattened observation to the mean of a Normal distribution;
    the log standard deviation is a learned, state-independent parameter.
    Sampled actions are squashed by ``transform_action`` before being returned.

    Args:
        env: object exposing ``observation_space.shape`` and
            ``action_space.shape`` (only these two attributes are read).
    """

    def __init__(self, env):
        super().__init__()

        # Size of one flattened observation. Kept on the instance so that
        # get_action can reshape image observations to (batch, obs_dim).
        self.obs_dim = int(np.prod(env.observation_space.shape))
        action_dim = env.action_space.shape[0]

        self.actor = nn.Sequential(
            layer_init(nn.Linear(self.obs_dim, 64)),
            nn.Tanh(),
            layer_init(nn.Linear(64, 64)),
            nn.Tanh(),
            # Small gain on the output layer keeps the initial means near zero.
            layer_init(nn.Linear(64, action_dim), std=0.01),
        )

        # log(std) of the Gaussian, one entry per action dimension (std starts at 1).
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def transform_action(self, action):
        """Squash a raw Gaussian sample into bounded action ranges.

        The first component (steering) goes through ``tanh`` -> (-1, 1); every
        remaining component (gas and brake, when present) goes through
        ``sigmoid`` -> (0, 1).

        Args:
            action: tensor whose last dimension indexes the action components.

        Returns:
            A new tensor with the same shape as ``action``; the input is not
            modified.
        """
        steering = torch.tanh(action[..., :1])
        # Empty slice when there is a single action component; cat handles it.
        pedals = torch.sigmoid(action[..., 1:])
        return torch.cat((steering, pedals), dim=-1)

    def get_action(self, obs, action=None):
        """Sample (or evaluate) an action for the given observation(s).

        Args:
            obs: observation tensor. Tensors with more than two dimensions
                (a single image or a batch of images) are flattened to
                ``(batch, obs_dim)``; tensors with one or two dimensions are
                passed to the network unchanged.
            action: optional raw (pre-squash) action. When given, it is
                evaluated instead of drawing a new sample.

        Returns:
            Tuple ``(transformed_action, log_prob, entropy)``:
            the squashed action with singleton dimensions removed, and the
            log-probability and entropy summed over the action dimension.

        Note:
            ``log_prob`` is computed on the raw sample, before squashing, while
            the returned action is the squashed one. Passing a previously
            returned (squashed) action back in as ``action`` therefore does not
            reproduce the original log-probability.
        """
        if obs.dim() > 2:
            # reshape(-1, obs_dim) keeps one row per observation, so a batch of
            # images (B, H, W, C) becomes (B, obs_dim) instead of one giant row,
            # and (unlike view) it also accepts non-contiguous tensors.
            obs = obs.reshape(-1, self.obs_dim)

        # Mean of the Gaussian.
        mean = self.actor(obs)
        # Standard deviation: exponentiate log_std and broadcast to mean's shape.
        std = torch.exp(self.log_std).expand_as(mean)

        dist = Normal(mean, std)

        if action is None:
            action = dist.sample()

        # Sum over the action dimension: components are independent.
        log_prob = dist.log_prob(action).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)

        transformed_action = self.transform_action(action)

        return transformed_action.squeeze(), log_prob, entropy


In [None]:
# --- Hyper-parameters --------------------------------------------------------
LEARNING_RATE = 3e-4
NUM_EPOCH = 10      # not used in this cell yet (no policy-update step here)
N_ITERATION = 10
N_STEPS = 128       # environment steps per rollout
N_SAMPLE = 64       # rollouts per group (all start from the same seeded reset)
BATCH_SIZE = 64     # not used in this cell yet
SEED = 0            # base seed for torch and for the environment resets

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed torch so that network initialisation and action sampling are repeatable.
torch.manual_seed(SEED)

env = gym.make(
    "CarRacing-v3",
    continuous=True,
    lap_complete_percent=0.95,
    domain_randomize=False,
    render_mode="rgb_array",
)

print("Observation space", env.observation_space.shape)
print("Action space", env.action_space.shape)

# GRPO algorithm: https://www.youtube.com/watch?v=YCawyzAOg1Y
agent = Agent(env).to(device)
optimizer = optim.Adam(agent.parameters(), lr=LEARNING_RATE)

# Rollout buffers, indexed as [rollout index, step index].
actions = torch.zeros(N_SAMPLE, N_STEPS, env.action_space.shape[0], device=device)
log_probs = torch.zeros(N_SAMPLE, N_STEPS, device=device)
entropies = torch.zeros(N_SAMPLE, N_STEPS, device=device)
rewards = torch.zeros(N_SAMPLE, N_STEPS, device=device)
next_observations = torch.zeros((N_SAMPLE, N_STEPS) + env.observation_space.shape, device=device)
dones = torch.zeros(N_SAMPLE, N_STEPS, device=device)
advantages = torch.zeros(N_SAMPLE, N_STEPS, device=device)

global_step = 0  # total number of environment transitions collected
start_time = time.time()
observation, info = env.reset(seed=SEED)
observation = torch.as_tensor(observation, dtype=torch.float32, device=device)
print("Observation shape", observation.shape)

# `iteration` rather than `iter`: the latter would shadow the builtin for every
# later cell of the notebook.
for iteration in range(1, N_ITERATION + 1):
    print("New iteration")

    # Every rollout of the group is reset with the same seed so that their
    # rewards are comparable; the seed changes from one iteration to the next.
    group_seed = SEED + iteration

    # Buffers are reused across iterations: clear them so that rollouts that
    # end early do not expose stale data from a previous iteration.
    for buffer in (actions, log_probs, entropies, rewards, next_observations, dones, advantages):
        buffer.zero_()

    # There is no critic: the baseline comes from the group of N_SAMPLE rollouts.
    with torch.no_grad():
        for i in range(N_SAMPLE):
            observation, info = env.reset(seed=group_seed)
            observation = torch.as_tensor(observation, dtype=torch.float32, device=device)

            for step in range(N_STEPS):
                global_step += 1

                # NOTE: get_action returns the squashed action, while log_prob
                # refers to the pre-squash sample.
                action, log_prob, entropy = agent.get_action(observation)
                next_observation, reward, terminated, truncated, info = env.step(action.cpu().numpy())
                done = terminated or truncated
                next_observation = torch.as_tensor(next_observation, dtype=torch.float32, device=device)

                rewards[i, step] = reward
                next_observations[i, step] = next_observation
                dones[i, step] = done
                actions[i, step] = action
                log_probs[i, step] = log_prob
                entropies[i, step] = entropy

                if done:
                    # The episode is over: the environment must be reset before
                    # it is stepped again. Mark the remaining slots as done;
                    # their rewards stay at zero.
                    dones[i, step:] = 1.0
                    break

                # Feed the fresh observation to the policy on the next step.
                observation = next_observation

    # Group-relative advantage, computed per step across the N_SAMPLE rollouts:
    # A_{i,t} = (R_{i,t} - mean_i(R_{.,t})) / (std_i(R_{.,t}) + eps)
    mean_rewards = rewards.mean(dim=0, keepdim=True)
    std_rewards = rewards.std(dim=0, keepdim=True) + 1e-8
    advantages.copy_((rewards - mean_rewards) / std_rewards)

    # Stop after the first iteration: this cell has no policy-update step yet,
    # so further iterations would only re-collect data with the same policy.
    break
        


Observation space (96, 96, 3)
Action space (3,)
Observation shape torch.Size([96, 96, 3])
New iteration
