In [1]:
import gymnasium as gym
import numpy as np
import cv2
import os
from tqdm import trange

from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage, VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy

SEED = 42
log_dir = "./dqn_carracing_tensorboard/"
os.makedirs(log_dir, exist_ok=True)

# --- Grayscale Wrapper ---
class GrayScaleObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(96, 96, 1), dtype=np.uint8
        )

    def observation(self, obs):
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        gray = np.expand_dims(gray, axis=-1)
        return gray

# --- Discretized Action Wrapper ---
class DiscretizedCarRacing(gym.ActionWrapper):
    def __init__(self, env):
        super().__init__(env)
        self.actions = [
            np.array([0.0, 0.0, 0.0]),   # No action
            np.array([0.0, 1.0, 0.0]),   # Gas
            np.array([0.0, 0.0, 0.8]),   # Brake
            np.array([-1.0, 1.0, 0.0]),  # Left + Gas
            np.array([1.0, 1.0, 0.0]),   # Right + Gas
            np.array([-1.0, 0.0, 0.8]),  # Left + Brake
            np.array([1.0, 0.0, 0.8]),   # Right + Brake
        ]
        self.action_space = gym.spaces.Discrete(len(self.actions))

    def action(self, act):
        return self.actions[act]

# --- Environment Factory ---
def make_env():
    env = gym.make("CarRacing-v3", render_mode="rgb_array", continuous=True)
    env = DiscretizedCarRacing(env)
    env = GrayScaleObservationWrapper(env)
    env.reset(seed=SEED)
    env.action_space.seed(SEED)
    return env

# --- Vectorized + Transposed + FrameStacked Env ---
env = DummyVecEnv([make_env])
env = VecTransposeImage(env)  # Converts (H, W, C) -> (C, H, W)
env = VecFrameStack(env, n_stack=4, channels_order='first')  # Output: (4, 96, 96)

# --- Model ---
model = DQN(
    "CnnPolicy",
    env,
    seed=SEED,
    learning_rate=1e-4,
    buffer_size=50000,
    learning_starts=1000,
    batch_size=32,
    train_freq=4,
    target_update_interval=1000,
    exploration_fraction=0.1,
    exploration_final_eps=0.02,
    tensorboard_log=log_dir,
    verbose=0
)

# --- Training ---
total_timesteps = 1_000_000
chunk_size = 10000

with trange(0, total_timesteps, chunk_size, desc="Training DQN on CarRacing") as pbar:
    for _ in pbar:
        model.learn(total_timesteps=chunk_size, reset_num_timesteps=False, tb_log_name="DQN_CarRacing_Stacked")

# --- Save Model ---
model.save("dqn_carracing_stacked")

# --- Evaluate ---
model = DQN.load("dqn_carracing_stacked", env=env)
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=5, render=True, deterministic=True)
print(f"Mean reward: {mean_reward:.2f} ± {std_reward:.2f}")


Training DQN on CarRacing:   0%|                                                               | 0/100 [03:17<?, ?it/s]


KeyboardInterrupt: 