In [1]:
import gymnasium as gym
import highway_env
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt
from collections import deque
import random
import os

# Set seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Check if CUDA is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
class Memory:
    """Experience replay buffer to store and sample experiences."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        return (np.array(state), np.array(action), 
                np.array(reward, dtype=np.float32), 
                np.array(next_state), np.array(done, dtype=np.uint8))
    
    def __len__(self):
        return len(self.buffer)

class Actor(nn.Module):
    """Actor network for determining the actions to take with Layer Normalization."""
    def __init__(self, state_dim, action_dim, hidden_dim=512, log_std_min=-20, log_std_max=2):
        super(Actor, self).__init__()
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max
        
        # Feature extraction layers with Layer Normalization
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.ln1 = nn.LayerNorm(hidden_dim)
        
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.ln2 = nn.LayerNorm(hidden_dim)
        
        # Mean and log_std output layers
        self.mu = nn.Linear(hidden_dim, action_dim)
        self.log_std = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = F.relu(self.ln1(self.fc1(state)))
        x = F.relu(self.ln2(self.fc2(x)))
        
        mu = self.mu(x)
        
        # Constrain log_std within reasonable range
        log_std = self.log_std(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
        
        return mu, log_std
    
    def sample(self, state):
        mu, log_std = self.forward(state)
        std = log_std.exp()
        
        # Use reparameterization trick
        normal = Normal(mu, std)
        z = normal.rsample()
        
        # Squash using tanh to bound actions between -1 and 1
        action = torch.tanh(z)
        
        # Calculate log probability, incorporating the Jacobian adjustment for tanh
        log_prob = normal.log_prob(z) - torch.log(1 - action.pow(2) + 1e-6)
        log_prob = log_prob.sum(1, keepdim=True)
        
        return action, log_prob

class Critic(nn.Module):
    """Critic network for estimating value functions with Layer Normalization."""
    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super(Critic, self).__init__()
        
        # Q1 architecture with Layer Normalization
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.ln1 = nn.LayerNorm(hidden_dim)
        
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.ln2 = nn.LayerNorm(hidden_dim)
        
        self.q1 = nn.Linear(hidden_dim, 1)
        
        # Q2 architecture (for twin Q-learning) with Layer Normalization
        self.fc3 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.ln3 = nn.LayerNorm(hidden_dim)
        
        self.fc4 = nn.Linear(hidden_dim, hidden_dim)
        self.ln4 = nn.LayerNorm(hidden_dim)
        
        self.q2 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], 1)
        
        # Q1 estimate
        q1 = F.relu(self.ln1(self.fc1(sa)))
        q1 = F.relu(self.ln2(self.fc2(q1)))
        q1 = self.q1(q1)
        
        # Q2 estimate
        q2 = F.relu(self.ln3(self.fc3(sa)))
        q2 = F.relu(self.ln4(self.fc4(q2)))
        q2 = self.q2(q2)
        
        return q1, q2

In [3]:
class SAC:
    """Soft Actor-Critic algorithm."""
    def __init__(self, state_dim, action_dim, action_space, hidden_dim=256, lr=3e-4, gamma=0.99, tau=0.005, alpha=0.2, auto_entropy_tuning=True):
        self.gamma = gamma
        self.tau = tau
        self.alpha = alpha
        self.auto_entropy_tuning = auto_entropy_tuning
        self.action_space = action_space
        
        # Initialize networks
        self.actor = Actor(state_dim, action_dim, hidden_dim).to(device)
        self.critic = Critic(state_dim, action_dim, hidden_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim, hidden_dim).to(device)
        
        # Copy parameters from critic to target
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(param.data)
        
        # Setup optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)
        
        # Automatic entropy tuning
        if auto_entropy_tuning:
            self.target_entropy = -torch.prod(torch.Tensor(action_space.shape).to(device)).item()
            self.log_alpha = torch.zeros(1, requires_grad=True, device=device)
            self.alpha_optimizer = optim.Adam([self.log_alpha], lr=lr)
    
    def select_action(self, state, evaluate=False):
        state = torch.FloatTensor(state).to(device)
        state = state.view(1, -1)

        if evaluate:
            # Use mean action for evaluation (no exploration)
            with torch.no_grad():
                mu, _ = self.actor(state)
                return torch.tanh(mu).cpu().numpy()[0]
        else:
            # Sample action for training
            with torch.no_grad():
                action, _ = self.actor.sample(state)
                return action.cpu().numpy()[0]
    
    def train(self, memory, batch_size=256):
        if len(memory) < batch_size:
            return
        
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = memory.sample(batch_size)
        
        # Convert to tensor
        state_batch = torch.FloatTensor(state_batch).to(device)
        action_batch = torch.FloatTensor(action_batch).to(device)
        reward_batch = torch.FloatTensor(reward_batch).to(device).unsqueeze(1)
        next_state_batch = torch.FloatTensor(next_state_batch).to(device)
        done_batch = torch.FloatTensor(done_batch).to(device).unsqueeze(1)
        
        with torch.no_grad():
            next_state_batch = next_state_batch.view(next_state_batch.size(0), -1)
            next_action, next_log_prob = self.actor.sample(next_state_batch)
            next_q1, next_q2 = self.critic_target(next_state_batch, next_action)
            next_q = torch.min(next_q1, next_q2) - self.alpha * next_log_prob
            target_q = reward_batch + (1 - done_batch) * self.gamma * next_q
        
        # Current Q estimates
        state_batch = state_batch.view(state_batch.size(0), -1)
        current_q1, current_q2 = self.critic(state_batch, action_batch)
        
        # Critic loss
        critic_loss = F.mse_loss(current_q1, target_q) + F.mse_loss(current_q2, target_q)
        
        # Update critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.critic.parameters(), max_norm=1.0)
        self.critic_optimizer.step()
        
        # Actor loss
        action, log_prob = self.actor.sample(state_batch)
        q1, q2 = self.critic(state_batch, action)
        min_q = torch.min(q1, q2)
        actor_loss = (self.alpha * log_prob - min_q).mean()
        
        # Update actor
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.actor.parameters(), max_norm=1.0)
        self.actor_optimizer.step()
        
        # Update alpha if needed
        if self.auto_entropy_tuning:
            alpha_loss = -(self.log_alpha * (log_prob + self.target_entropy).detach()).mean()
            self.alpha_optimizer.zero_grad()
            alpha_loss.backward()
            self.alpha_optimizer.step()
            self.alpha = self.log_alpha.exp()
        
        # Soft update of target networks
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(target_param.data * (1.0 - self.tau) + param.data * self.tau)
        
        return critic_loss.item(), actor_loss.item()
    
    def save(self, directory):
        if not os.path.exists(directory):
            os.makedirs(directory)
        
        torch.save(self.actor.state_dict(), os.path.join(directory, "actor.pth"))
        torch.save(self.critic.state_dict(), os.path.join(directory, "critic.pth"))
        torch.save(self.critic_target.state_dict(), os.path.join(directory, "critic_target.pth"))
        
        if self.auto_entropy_tuning:
            torch.save(self.log_alpha, os.path.join(directory, "log_alpha.pth"))
    
    def load(self, directory):
        self.actor.load_state_dict(torch.load(os.path.join(directory, "actor.pth")))
        self.critic.load_state_dict(torch.load(os.path.join(directory, "critic.pth")))
        self.critic_target.load_state_dict(torch.load(os.path.join(directory, "critic_target.pth")))
        
        if self.auto_entropy_tuning and os.path.exists(os.path.join(directory, "log_alpha.pth")):
            self.log_alpha = torch.load(os.path.join(directory, "log_alpha.pth"))
            self.alpha = self.log_alpha.exp()

In [14]:
class NormalizedEnv(gym.Wrapper):
    """Environment wrapper for normalizing observations and rewards."""
    def __init__(self, env):
        super(NormalizedEnv, self).__init__(env)
        self.state_mean = np.zeros(self.observation_space.shape[0])
        self.state_std = np.ones(self.observation_space.shape[0])
        self.state_buffer = deque(maxlen=10000)
        self.reward_scale = 1.0
        
    def normalize_state(self, state):
        return (state - self.state_mean) / (self.state_std + 1e-8)
    
    def update_state_stats(self):
        if len(self.state_buffer) > 0:
            states = np.array(self.state_buffer)
            self.state_mean = np.mean(states, axis=0)
            self.state_std = np.std(states, axis=0) + 1e-8
    
    def step(self, action):
        try:
            action = np.array(action[0])
        except:
            action = np.array(action)
        next_state, reward, done, truncated, info = self.env.step(action)
        self.state_buffer.append(next_state)
        
        return self.normalize_state(next_state), reward * self.reward_scale, done, truncated, info
    
    def reset(self, **kwargs):
        state, info = self.env.reset(**kwargs)
        self.state_buffer.append(state)
        self.update_state_stats()
        
        return self.normalize_state(state), info

In [5]:
# Custom reward shaping
class RewardShapingEnv(gym.Wrapper):
    """Environment wrapper for shaping rewards."""
    def __init__(self, env):
        super(RewardShapingEnv, self).__init__(env)
        self.prev_pos = None
        self.prev_velocity = None
        self.track_progress = 0
    
    def reset(self, **kwargs):
        state, info = self.env.reset(**kwargs)
        self.prev_pos = state[:2] 
        self.prev_velocity = state[2:]
        self.track_progress = 0
        return state, info
    
    def step(self, action):
        next_state, reward, done, truncated, info = self.env.step(action)
        
        # Extract current position and velocity
        curr_pos = next_state[:2]
        curr_velocity = next_state[2:]
        
        # Basic reward from environment
        shaped_reward = reward 
        
        # Reward for making progress on the track (distance traveled in desired direction)
        if self.prev_pos is not None:
            progress = np.linalg.norm(curr_pos - self.prev_pos)
            self.track_progress += progress
            shaped_reward += progress * 0.1  # Small reward for progress
        
        if info["rewards"].get("on_road_reward", True) :
            shaped_reward += 0.5
        elif info["rewards"].get("on_road_reward", False) :
            shaped_reward -= 10

        # Reward for maintaining speed
        speed = np.linalg.norm(curr_velocity)
        shaped_reward += 0.1 * speed 
        
        # Penalty for abrupt changes in velocity (smooth driving)
        if self.prev_velocity is not None:
            velocity_change = np.linalg.norm(curr_velocity - self.prev_velocity)
            shaped_reward -= 0.01 * velocity_change  
        
        # Additional success reward
        if done and reward > 0:
            shaped_reward += 20.0  # Bonus for completing the track
        
        # Update previous values for next step
        self.prev_pos = curr_pos
        self.prev_velocity = curr_velocity
        
        return next_state, shaped_reward, done, truncated, info


In [None]:
def plot_rewards(rewards, window_size=100):
    """Plot rewards over time with a rolling average."""
    plt.figure(figsize=(10, 5))
    plt.plot(rewards, label='Reward per Episode')
    
    if len(rewards) >= window_size:
        rolling_mean = [np.mean(rewards[max(0, i-window_size):i]) for i in range(1, len(rewards)+1)]
        plt.plot(range(len(rolling_mean)), rolling_mean, label=f'{window_size}-Episode Moving Average')
    
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('Training Progress')
    plt.legend()
    plt.grid(True)
    plt.savefig('training_rewards.png')
    plt.close()

def evaluate_policy(agent, env, eval_episodes=10):
    """Evaluate the agent's performance without exploration."""
    avg_reward = 0.0
    successes = 0
    
    for _ in range(eval_episodes):
        state, _ = env.reset()
        episode_reward = 0.0
        done = False
        truncated = False
        
        while not (done or truncated):
            action = agent.select_action(state, evaluate=True)
            state, reward, done, truncated, _ = env.step(action)
            episode_reward += reward
            
            if done and reward > 0:
                successes += 1
        
        avg_reward += episode_reward
    
    avg_reward /= eval_episodes
    success_rate = successes / eval_episodes
    
    print(f"Evaluation over {eval_episodes} episodes: {avg_reward:.3f} average reward, {success_rate:.2f} success rate")
    return avg_reward, success_rate

def train_agent(config):
    # Create environment
    env = gym.make("racetrack-v0", render_mode="rgb_array")
    env.unwrapped.config.update(config)
    env.reset()
    env = RewardShapingEnv(env)  # Apply reward shaping
    env = NormalizedEnv(env)     # Apply observation normalization
    
    # Get state and action dimensions
    print(env.observation_space.shape)
    obs_dim = 432
    action_dim = env.action_space.shape[0]
    action_high = env.action_space.high[0]
    
    print(f"State dimension: {obs_dim}")
    print(f"Action dimension: {action_dim}")
    print(f"Action high: {action_high}")

    # Hyperparameters
    lr = 3e-4
    gamma = 0.99
    tau = 0.005
    alpha = 0.2
    max_episodes = 500
    max_steps = 100
    batch_size = 256
    updates_per_step = 1
    eval_interval = 20
    
    # Initialize agent
    agent = SAC(
        state_dim=obs_dim,
        action_dim=action_dim,
        action_space=env.action_space,
        hidden_dim=256,
        lr=lr,
        gamma=gamma,
        tau=tau,
        alpha=alpha,
        auto_entropy_tuning=True
    )
    
    # Initialize replay buffer
    memory = Memory(capacity=100000)
    
    # Tracking metrics
    rewards = []
    eval_rewards = []
    eval_success_rates = []
    best_eval_reward = -float('inf')
    
    # Main training loop
    total_steps = 0
    for episode in range(1, max_episodes + 1):
        state, _ = env.reset()
        episode_reward = 0
        episode_steps = 0
        done = False
        truncated = False
        
        while not (done or truncated) and episode_steps < max_steps:
            if total_steps < 250:  # Initial exploration
                action = env.action_space.sample()
            else:
                action = agent.select_action(state)
            
            next_state, reward, done, truncated, _ = env.step(action)
            episode_reward += reward
            
            # Store transition in memory
            memory.push(state, action, reward, next_state, float(done))
            
            # Update agent
            if len(memory) > batch_size:
                for _ in range(updates_per_step):
                    agent.train(memory, batch_size)
            
            state = next_state
            episode_steps += 1
            total_steps += 1
        
        rewards.append(episode_reward)
        
        # Print episode stats
        print(f"Episode: {episode}, Total Steps: {total_steps}, Reward: {episode_reward:.3f}")
        
        # Evaluate and save best model
        if episode % eval_interval == 0:
            avg_reward, success_rate = evaluate_policy(agent, env)
            eval_rewards.append(avg_reward)
            eval_success_rates.append(success_rate)
            
            # Save if it's the best model so far
            if avg_reward > best_eval_reward:
                best_eval_reward = avg_reward
                agent.save('./models/best_model')
                print(f"New best model saved with reward: {best_eval_reward:.3f}")
            
            # Save latest model
            agent.save('./models/latest_model')
            
            # Plot training progress
            plot_rewards(rewards)
    
    # Final evaluation
    print("Training finished. Final evaluation:")
    evaluate_policy(agent, env, eval_episodes=20)
    
    # Save final model
    agent.save('./models/final_model')
    
    # Close environment
    env.close()
    
    return agent, rewards, eval_rewards, eval_success_rates

In [None]:
racing_config = {
    'action': {
        'lateral': True,
        'longitudinal': True,
        'target_speeds': [0, 30, 50, 80],
        'type': 'ContinuousAction'
    },
    'duration': 120,
    'controlled_vehicles': 1,
    'observation': {
        'align_to_vehicle_axes': True,
        'as_image': False,
        'features': ['presence', 'velocity', 'acceleration'],
        'grid_size': [[-30, 30], [-30, 30]],
        'grid_step': [5, 5],
        'type': 'OccupancyGrid'
    },
    'other_vehicles': 1,
    'other_vehicles_type': 'highway_env.vehicle.behavior.AggressiveVehicle',
    'render_agent': True,
    'manual_control': False
    }
    
agent, rewards, eval_rewards, eval_success_rates = train_agent(config=racing_config)

# Plot final training results
plot_rewards(rewards)

# Plot evaluation metrics
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(list(range(0, len(rewards), 20)), eval_rewards)
plt.xlabel('Episode')
plt.ylabel('Evaluation Reward')
plt.title('Evaluation Rewards')
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(list(range(0, len(rewards), 20)), eval_success_rates)
plt.xlabel('Episode')
plt.ylabel('Success Rate')
plt.title('Success Rate')
plt.grid(True)

plt.tight_layout()
plt.savefig('evaluation_metrics.png')
plt.close()

# Test the best model
print("Testing the best model:")
env = gym.make("racetrack-v0", render_mode="human")
# env.unwrapped.config.update(racing_config)
env = RewardShapingEnv(env)
env = NormalizedEnv(env)

# Load best model
state_dim = 432
action_dim = env.action_space.shape[0]
best_agent = SAC(state_dim, action_dim, env.action_space)
best_agent.load('./models_test/best_model')

# Run some test episodes
for episode in range(5):
    state, _ = env.reset()
    done = False
    truncated = False
    episode_reward = 0
    
    while not (done or truncated):
        action = best_agent.select_action(state, evaluate=True)
        state, reward, done, truncated, _ = env.step(action)
        episode_reward += reward
    
    print(f"Test Episode {episode+1}: Reward = {episode_reward:.3f}")

env.close()

(2, 12, 12)
State dimension: 432
Action dimension: 1
Action high: 1.0


IndexError: invalid index to scalar variable.

In [None]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv

import highway_env  # noqa: F401


TRAIN = True

if __name__ == "__main__":
    n_cpu = 6
    batch_size = 64
    env = make_vec_env("racetrack-v0", n_envs=n_cpu, vec_env_cls=SubprocVecEnv)
    model = PPO(
        "MlpPolicy",
        env,
        policy_kwargs=dict(net_arch=[dict(pi=[256, 256], vf=[256, 256])]),
        n_steps=batch_size * 12 // n_cpu,
        batch_size=batch_size,
        n_epochs=10,
        learning_rate=5e-4,
        gamma=0.9,
        verbose=2,
    )
    # Train the model
    if TRAIN:
        model.learn(total_timesteps=int(1e5))
        model.save("racetrack_ppo/model")
        del model

    # Run the algorithm
    model = PPO.load("racetrack_ppo/model", env=env)

    env = gym.make("racetrack-v0")

    for episode in range(10):
        done = truncated = False
        obs, info = env.reset()
        while not (done or truncated):
            # Predict
            action, _states = model.predict(obs, deterministic=True)
            # Get reward
            obs, reward, done, truncated, info = env.step(action)
            # Render
            env.render()
    env.close()