In [None]:
import os
import shutil

from google.colab import drive, files

# Every artifact produced by this notebook is written below this Drive folder.
SAVE_DIR = "/content/drive/MyDrive/highway_results/lidar_dqn"

# Drive has to be mounted before the folder under /content/drive can be created.
drive.mount('/content/drive')
os.makedirs(SAVE_DIR, exist_ok=True)

In [None]:
# Install the simulator and RL libraries that the import cell below relies on.
# NOTE(review): versions are unpinned, so a later run may pull different releases — consider pinning.
!pip install highway-env stable-baselines3 gymnasium

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import torch

from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import BaseCallback, CheckpointCallback
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv

import highway_env

## Environment Setup

In [None]:
# Observation settings: LidarObservation with 64 cells, range 60, normalised.
lidar_observation = {
    "type": "LidarObservation",
    "cells": 64,
    "maximum_range": 60,
    "normalize": True
}

# Action settings: DiscreteMetaAction.
meta_action = {
    "type": "DiscreteMetaAction",
}

# Full configuration dictionary handed to the environment by create_env().
env_config = {
    "observation": lidar_observation,
    "action": meta_action,
    "lanes_count": 4,
    "duration": 40,
    "collision_reward": -1.0,
    "reward_speed_range": [20, 30],
    "simulation_frequency": 15,
    "policy_frequency": 5,
}

def create_env():
    """Return a new "highway-fast-v0" environment built from ``env_config``.

    Rendering is disabled (``render_mode=None``). Each call creates an
    independent environment instance.
    """
    return gym.make("highway-fast-v0", config=env_config, render_mode=None)

# Smoke test: build one environment, report what the agent will see and can do,
# then release it again.
test_env = create_env()
first_reset = test_env.reset()
obs = first_reset[0]  # reset() returns (observation, info); only the observation is needed
print(f"Observation shape: {obs.shape}")
print(f"Action space: {test_env.action_space}")
test_env.close()

## Callbacks

In [None]:
class RewardLoggerCallback(BaseCallback):
    """Record the undiscounted return of every episode finished during training.

    The callback accumulates the per-step rewards exposed through
    ``self.locals['rewards']`` and, whenever ``self.locals['dones']`` flags the
    end of an episode, appends the accumulated return to ``episode_rewards``.
    Every 100 completed episodes the list is written to
    ``<save_path>/episode_rewards.npy``.

    One running return is kept per environment of the vectorised env, so
    episodes from every sub-environment are recorded (not only index 0).

    Args:
        save_path: Directory in which ``episode_rewards.npy`` is written.
        verbose: Verbosity level forwarded to ``BaseCallback``.

    Attributes:
        episode_rewards: List of finished-episode returns, in completion order.
        current_rewards: Running return of the episode(s) in progress. It is the
            scalar 0 before the first step and afterwards an array with one
            entry per environment.
    """

    def __init__(self, save_path, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []
        # Scalar 0 broadcasts against the first reward vector, so the number of
        # environments does not have to be known at construction time.
        self.current_rewards = 0
        self.save_path = save_path

    def _on_step(self) -> bool:
        """Accumulate this step's rewards and log any episode that just ended.

        Returns:
            Always ``True`` so that training continues.
        """
        step_rewards = np.atleast_1d(np.asarray(self.locals['rewards'], dtype=np.float64))
        step_dones = np.atleast_1d(np.asarray(self.locals['dones'], dtype=bool))

        self.current_rewards = self.current_rewards + step_rewards

        for env_idx in np.flatnonzero(step_dones):
            self.episode_rewards.append(float(self.current_rewards[env_idx]))
            self.current_rewards[env_idx] = 0.0
            # Checked per finished episode so that a multiple of 100 is never
            # skipped when several environments finish on the same step.
            if len(self.episode_rewards) % 100 == 0:
                np.save(f"{self.save_path}/episode_rewards.npy", self.episode_rewards)
        return True

## DQN Model

In [None]:
# The training environment is a DummyVecEnv holding one create_env() instance.
env = DummyVecEnv([create_env])

policy_kwargs = {"net_arch": [256, 256]}

# All DQN keyword arguments in one place so they are easy to scan and tweak.
dqn_hyperparams = {
    "policy_kwargs": policy_kwargs,
    "learning_rate": 5e-4,
    "buffer_size": 15000,
    "learning_starts": 200,
    "batch_size": 64,
    "gamma": 0.99,
    "train_freq": 4,
    "gradient_steps": 1,
    "target_update_interval": 1000,
    "exploration_fraction": 0.3,
    "exploration_final_eps": 0.05,
    "verbose": 1,
    "tensorboard_log": f"{SAVE_DIR}/tensorboard/",
    "device": "auto",
}

model = DQN("MlpPolicy", env, **dqn_hyperparams)

## Training

In [None]:
TOTAL_TIMESTEPS = 200000

# Episode returns are collected by the custom logger; CheckpointCallback is
# configured with save_freq=25000 and writes into SAVE_DIR.
reward_callback = RewardLoggerCallback(SAVE_DIR)
checkpoint_callback = CheckpointCallback(
    save_path=SAVE_DIR,
    name_prefix="highway_lidar_dqn",
    save_freq=25000,
)
training_callbacks = [reward_callback, checkpoint_callback]

model.learn(total_timesteps=TOTAL_TIMESTEPS, callback=training_callbacks, progress_bar=True)

# Persist the final policy first, then the complete list of episode returns.
final_model_path = f"{SAVE_DIR}/highway_lidar_final_1"
model.save(final_model_path)

episode_rewards_path = f"{SAVE_DIR}/episode_rewards.npy"
np.save(episode_rewards_path, reward_callback.episode_rewards)

## Learning Curve (ID 1)

In [None]:
# Learning curve: raw per-episode returns plus a rolling mean that exposes the trend.
rewards = np.array(reward_callback.episode_rewards)

window = 50

plt.figure(figsize=(12, 6))
plt.plot(rewards, alpha=0.3, color='blue', label='Episode Reward')

# np.convolve(mode='valid') only yields len(rewards) - window + 1 points when the
# series is at least one window long. For a shorter series it returns a different
# length (and raises on an empty one), which would not line up with the x-range
# below, so the overlay is skipped in that case.
if len(rewards) >= window:
    rolling_mean = np.convolve(rewards, np.ones(window)/window, mode='valid')
    plt.plot(range(window-1, len(rewards)), rolling_mean, color='blue', linewidth=2, label=f'Rolling Mean ({window} ep)')
else:
    rolling_mean = np.array([])
    print(f"Only {len(rewards)} episodes recorded; need at least {window} for the rolling mean.")

plt.xlabel('Episode')
plt.ylabel('Mean Episodic Reward (Return)')
plt.title('Highway LiDAR DQN - Learning Curve (ID 1)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Save before show(): the figure is written to Drive and then displayed inline.
plt.savefig(f"{SAVE_DIR}/highway_lidar_learning_curve_1.png", dpi=150)
plt.show()

## Performance Test (ID 2)

In [None]:
# Performance test: run the saved policy greedily and record each episode's return.
eval_model = DQN.load(f"{SAVE_DIR}/highway_lidar_final_1")

eval_env = create_env()

n_episodes = 500
test_rewards = []

# try/finally guarantees the environment is released even if an episode raises.
try:
    for ep in range(n_episodes):
        obs, _ = eval_env.reset()
        done = truncated = False
        episode_reward = 0

        # An episode ends on termination (done) or on truncation.
        while not (done or truncated):
            action, _ = eval_model.predict(obs, deterministic=True)
            # predict() returns the action as a NumPy array; hand the environment
            # a plain Python int for its discrete action.
            obs, reward, done, truncated, _ = eval_env.step(int(action))
            episode_reward += reward

        test_rewards.append(episode_reward)
finally:
    eval_env.close()

# The file name follows n_episodes so it cannot drift from the actual episode count
# (for n_episodes = 500 this is still "test_rewards_500ep.npy").
np.save(f"{SAVE_DIR}/test_rewards_{n_episodes}ep.npy", test_rewards)
print(f"Mean: {np.mean(test_rewards):.2f}, Std: {np.std(test_rewards):.2f}")

In [None]:
# Violin plot of the evaluation returns with quartile lines and a mean/std note.
plt.figure(figsize=(8, 8))

parts = plt.violinplot([test_rewards], positions=[1], showmeans=True, showmedians=False)
for pc in parts['bodies']:
    pc.set_facecolor('steelblue')
    pc.set_alpha(0.7)

# Summary statistics: the three quartiles come from a single percentile call.
quartile1, median, quartile3 = np.percentile(test_rewards, [25, 50, 75])
mean_reward = np.mean(test_rewards)
std_reward = np.std(test_rewards)

# Horizontal markers at the 25th, 50th and 75th percentiles.
plt.hlines([quartile1, median, quartile3], 0.8, 1.2, colors='blue', linewidth=1.5)

plt.ylabel('Episodic Reward (Return)')
plt.title('Performance Test - 500 Episodes (ID 2)')
plt.xticks([1], ['Highway LiDAR\nDQN Agent'])

# Text annotation to the right of the violin, vertically centred on the mean.
plt.text(
    1.3,
    mean_reward,
    f'Mean: {mean_reward:.2f}\nStd: {std_reward:.2f}',
    verticalalignment='center',
    fontsize=10,
)

plt.tight_layout()

plt.savefig(f"{SAVE_DIR}/highway_lidar_performance_2.png", dpi=150)
plt.show()

## Download Results

In [None]:
import glob

# Print the name of everything stored directly under SAVE_DIR.
for result_name in map(os.path.basename, glob.glob(f"{SAVE_DIR}/*")):
    print(f"  {result_name}")

# Zip the whole results folder and hand the archive to the browser for download.
# make_archive returns the path of the archive it wrote.
archive_path = shutil.make_archive('/content/highway_lidar_final', 'zip', SAVE_DIR)
files.download(archive_path)