In [24]:
!pip install highway-env
!pip install stable-baselines3

Collecting stable-baselines3
  Downloading stable_baselines3-2.6.0-py3-none-any.whl.metadata (4.8 kB)
Downloading stable_baselines3-2.6.0-py3-none-any.whl (184 kB)
Installing collected packages: stable-baselines3
Successfully installed stable-baselines3-2.6.0


In [30]:
# Imports
import highway_env
import gymnasium as gym
import matplotlib.pyplot as plt
%matplotlib inline
from IPython.display import clear_output

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque



from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

In [32]:
env = gym.make('highway-v0', render_mode='rgb_array')

env.unwrapped.config["observation"]["type"] = "Kinematics"
env.unwrapped.config["vehicles_count"] = 10
env.unwrapped.config["duration"] = 40
env.reset()

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
print(state_dim, action_dim)


5 5


In [43]:
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )
    
    def forward(self, x):
        return self.net(x)

In [44]:
class ReplayBuffer:
    def __init__(self, capacity=100_000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            torch.tensor(np.array(states), dtype=torch.float32),
            torch.tensor(actions, dtype=torch.long),
            torch.tensor(rewards, dtype=torch.float32),
            torch.tensor(np.array(next_states), dtype=torch.float32),
            torch.tensor(dones, dtype=torch.float32)
        )


    def __len__(self):
        return len(self.buffer)


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
q_net = DQN(state_dim, action_dim).to(device)
target_q_net = DQN(state_dim, action_dim).to(device)
target_q_net.load_state_dict(q_net.state_dict())

optimizer = optim.Adam(q_net.parameters(), lr=1e-4)
buffer = ReplayBuffer()
batch_size = 64
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.05
epsilon_decay = 0.995
target_update_freq = 10
num_episodes = 500

rewards_history = []

for episode in range(num_episodes):
    obs, _ = env.reset()
    total_reward = 0

    for t in range(200):
        # Epsilon-greedy action selection
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                state_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).to(device)
                q_values = q_net(state_tensor)
                action = q_values.argmax().item()

        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        buffer.push(obs, action, reward, next_obs, done)

        obs = next_obs
        total_reward += reward

        # Training
        if len(buffer) > batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)

            states = states.to(device)
            actions = actions.to(device)
            rewards = rewards.to(device)
            next_states = next_states.to(device)
            dones = dones.to(device)

            # Q(s, a)
            q_values = q_net(states).gather(1, actions.unsqueeze(1)).squeeze()

            # max_a' Q_target(s', a')
            with torch.no_grad():
                max_next_q_values = target_q_net(next_states).max(1)[0]
                target = rewards + gamma * max_next_q_values * (1 - dones)

            loss = nn.MSELoss()(q_values, target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if done:
            break

    # Update target network
    if episode % target_update_freq == 0:
        target_q_net.load_state_dict(q_net.state_dict())

    # Decay epsilon
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    rewards_history.append(total_reward)
    print(f"Episode {episode}, Total Reward: {total_reward:.2f}, Epsilon: {epsilon:.2f}")


Episode 0, Total Reward: 4.78, Epsilon: 0.99
Episode 1, Total Reward: 9.84, Epsilon: 0.99
Episode 2, Total Reward: 10.64, Epsilon: 0.99
Episode 3, Total Reward: 1.96, Epsilon: 0.98
Episode 4, Total Reward: 3.84, Epsilon: 0.98
Episode 5, Total Reward: 5.11, Epsilon: 0.97
Episode 6, Total Reward: 8.41, Epsilon: 0.97
Episode 7, Total Reward: 2.46, Epsilon: 0.96


RuntimeError: Index tensor must have the same number of dimensions as input tensor

: 

In [None]:
plt.plot(rewards_history)
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.title("DQN Learning Progress")
plt.grid()
plt.show()

In [29]:
env = make_vec_env("highway-v0", n_envs=4)

model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10) # total_timesteps=100_000
model.save("ppo_lane_keeping")

Using cpu device


KeyboardInterrupt: 