In [None]:
import gymnasium as gym
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_atari_env
from stable_baselines3.common.vec_env import (
    DummyVecEnv,
    VecFrameStack,
    VecTransposeImage,
)
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
from stable_baselines3.common.monitor import Monitor
import numpy as np
import os
import ale_py
import torch
import time
import cv2

In [7]:
def preprocess_frame(frame):    
	# Convert to grayscale    
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)    
    
    # Resize the frame    
    resized_frame = cv2.resize(gray_frame, (84, 84))    
    
    # Normalize pixel values   
    normalized_frame = resized_frame / 255.0    
    
    return normalized_frame

In [8]:
def make_env(env_id: str, n_stack: int, seed=None):
    env = gym.make(env_id)
    # Apply custom preprocessing *first*
    # Add other wrappers if needed (e.g., Monitor for logging)
    # Note: If using NoFrameskip, you might add MaxAndSkipEnv here
    # env = MaxAndSkipEnv(env, skip=4) # Example if starting with NoFrameskip

    # env = PongHitRewardWrapper(env)
    env = Monitor(env)
    env = DummyVecEnv([lambda: env])  # Wrap in vectorized environment
    env = VecFrameStack(env, n_stack=n_stack)
    env = VecTransposeImage(env)
    return env

**Make atari env already does some preporcessing like making the images cropped or making them grayscale**

In [9]:
# Configuration
SEED = 42
TOTAL_TIMESTEPS = 500_000  # Total training timesteps, it seems they even use 10_000_000 steps, but they se frame skip with atariwrapper (talking about stable baselines zoo)
EVAL_FREQ = 10_000  # Evaluate every N timesteps
LOG_DIR = "./logs/dqn_pong_full_vision/"
MODEL_SAVE_PATH = "./models/dqn_pong_full_vision"
TRAIN = True  # Set to False to load and evaluate a trained model
N_STACK = 4

ENV_ID = "PongNoFrameskip-v4"

device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

def main():
    gym.register_envs(ale_py)

    # Create environment using RAM observations instead of pixels
    env = make_env(ENV_ID, N_STACK)
    env = make_atari_env(ENV_ID, n_envs=4) 
    env = VecFrameStack(env, n_stack=N_STACK)
    env = VecTransposeImage(env)
    
    # Callback for evaluation during training
    eval_env = make_env(ENV_ID, N_STACK)
    eval_env = make_atari_env(ENV_ID, n_envs=1)
    eval_env = VecFrameStack(eval_env, n_stack=N_STACK)
    eval_env = VecTransposeImage(eval_env)
    
    # Stop training when mean reward reaches 18 (Pong is scored between -21 and +21)
    callback_on_best = StopTrainingOnRewardThreshold(reward_threshold=18, verbose=1)
    eval_callback = EvalCallback(
        eval_env,
        callback_on_new_best=callback_on_best,
        best_model_save_path=MODEL_SAVE_PATH,
        log_path=LOG_DIR,
        eval_freq=EVAL_FREQ,
        verbose=1,
    )
    
    if TRAIN:
        # Create model with MLP policy instead of CNN
        model = DQN(
            "CnnPolicy",
            env,
            verbose=0,
            buffer_size=10_000,
            learning_starts=100_000,
            batch_size=32,
            gamma=0.99,
            train_freq=4,
            gradient_steps=1,
            target_update_interval=1_000,
            exploration_fraction=0.1,
            exploration_final_eps=0.01,
            learning_rate=1e-4,
            seed=SEED,
            tensorboard_log=LOG_DIR,
            device=device,
            # optimize_memory_usage=True,
        )
        
        # Train the model
        model.learn(
            total_timesteps=TOTAL_TIMESTEPS,
            callback=eval_callback,
            progress_bar=True
        )
        
        # Save the final model
        model.save(f"{MODEL_SAVE_PATH}_final")
    else:
        # Load the trained model
        model = DQN.load(f"{MODEL_SAVE_PATH}_best", env=env)
    
    # Evaluate the model
    mean_reward, std_reward = evaluate_model(model, eval_env, n_eval_episodes=10)
    print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")
    
    # Close environments
    env.close()
    eval_env.close()

def evaluate_model(model, eval_env, n_eval_episodes=10):
    """
    Evaluate a RL model
    :param model: (BaseAlgorithm) The RL model
    :param eval_env: (gym.Env) The evaluation environment
    :param n_eval_episodes: (int) Number of episodes to evaluate
    :return: (float, float) Mean reward and standard deviation
    """
    episode_rewards = []
    for _ in range(n_eval_episodes):
        obs = eval_env.reset()
        done = False
        total_reward = 0.0
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, done, _ = eval_env.step(action)
            total_reward += reward
        episode_rewards.append(total_reward)
    
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)
    
    return mean_reward, std_reward

# Create directories if they don't exist
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(os.path.dirname(MODEL_SAVE_PATH), exist_ok=True)

main()

Output()

Mean reward: -21.00 +/- 0.00


In [None]:

MODEL_PATH = "./models/dqn_pong_full_vision_final"  # Update this to your model path
NUM_EPISODES = 1  # Number of games to play
RENDER_DELAY = 0.05  # Delay between frames (in seconds) for better visualization
N_STACK = 4

def load_model(env):
    # Load the trained model
    model = DQN.load(MODEL_PATH, env=env)
    return model

def watch_agent_play():
    # Create environment
    env = gym.make("PongDeterministic-v4", render_mode="human")
    # env = PongHitRewardWrapper(env)
    env = DummyVecEnv([lambda: env])  # Wrap in vectorized environment
    env = VecFrameStack(env, n_stack=N_STACK)
    
    
    # Load the trained model
    model = load_model(env)
    
    for episode in range(1, NUM_EPISODES + 1):
        # obs, _ = env.reset()
        obs = env.reset()
        done = False
        total_reward = 0
        frames = 0
        
        while not done:
            # Show the game screen
            env.render()
            
            # Get action from the model
            action, _ = model.predict(obs, deterministic=True)
            
            # Take the action
            # obs, reward, done, truncated, info = env.step(action)
            obs, reward, done, info = env.step(action)
            # done = done or truncated
            
            total_reward += reward
            frames += 1
            
            # Add small delay to make the game watchable
            time.sleep(RENDER_DELAY)
        
        print(f"Episode {episode}: Total reward: {total_reward}, Frames: {frames}")
    
    env.close()

In [None]:
watch_agent_play()

Episode 1: Total reward: [-15.7], Frames: 888


: 