# Notebook cell In [4]:
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import gymnasium as gym
import ale_py
from collections import deque
import time

# Register Atari environments: hand the ale_py module to gymnasium's
# register_envs at import time, before any environment is created below.
gym.register_envs(ale_py)

# Custom frame stacking to replace gymnasium's FrameStack
class CustomFrameStack:
    """Minimal frame-stacking wrapper used in place of gymnasium's FrameStack.

    Keeps the ``num_stack`` most recent observations of the wrapped
    environment and returns them stacked along a new trailing axis.
    """

    def __init__(self, env, num_stack=4):
        """Wrap ``env`` and prepare an empty buffer of ``num_stack`` frames."""
        self.env = env
        self.num_stack = num_stack
        # Bounded buffer: appending past num_stack entries drops the oldest.
        self.frames = deque(maxlen=num_stack)
        # The advertised space is fixed at 84x84 uint8 frames, num_stack deep.
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=(84, 84, num_stack),
            dtype=np.uint8,
        )
        self.action_space = env.action_space

    def _stacked(self):
        """Return the buffered frames joined along a new last axis."""
        return np.stack(tuple(self.frames), axis=-1)

    def reset(self, **kwargs):
        """Reset the wrapped env and fill the buffer with the first frame.

        Returns:
            ``(stacked_frames, info)`` where ``info`` comes from the wrapped
            environment's ``reset``.
        """
        first_obs, info = self.env.reset(**kwargs)
        # Repeat the initial observation so the stack is full from step 0.
        self.frames.extend([first_obs] * self.num_stack)
        return self._stacked(), info

    def step(self, action):
        """Step the wrapped env and push the new frame onto the stack.

        Returns:
            ``(stacked_frames, reward, terminated, truncated, info)``.
        """
        transition = self.env.step(action)
        obs, reward, terminated, truncated, info = transition
        self.frames.append(obs)
        return self._stacked(), reward, terminated, truncated, info

    def close(self):
        """Close the wrapped environment."""
        return self.env.close()

    def render(self):
        """Render the wrapped environment."""
        return self.env.render()

# Configuration parameters
class DQNConfig:
    """Plain container for the DQN hyperparameters and run settings."""

    def __init__(self):
        # --- Environment ---
        self.env_name = "SpaceInvadersNoFrameskip-v4"
        self.render_mode = None  # Use "human" to watch the game
        self.num_stack = 4

        # --- Optimisation / exploration ---
        self.gamma = 0.99  # Discount factor
        self.learning_rate = 0.00025
        self.batch_size = 32
        self.epsilon_start = 1.0
        self.epsilon_min = 0.1
        self.epsilon_decay_frames = 1_000_000  # Length of the linear epsilon decay
        self.target_update_freq = 10_000  # Steps between target-network syncs

        # --- Replay memory ---
        self.replay_memory_size = 100_000  # Smaller than the original paper to save memory
        self.min_replay_memory_size = 10_000  # Training starts after this many frames

        # --- Training limits ---
        self.total_frames = 5_000_000  # Training budget (the paper used 50M)
        self.max_episode_length = 10_000  # Cap on frames per episode

        # --- Checkpointing ---
        self.save_freq = 50_000  # Frames between periodic saves
        self.checkpoint_dir = "checkpoints"

        # --- Game specific ---
        self.num_actions = 6  # Space Invaders has 6 actions

    def epsilon_by_frame(self, frame):
        """Return the exploration rate for ``frame`` under linear decay.

        Epsilon falls linearly from ``epsilon_start`` and is floored at
        ``epsilon_min``.
        """
        span = self.epsilon_start - self.epsilon_min
        decayed = self.epsilon_start - frame * span / self.epsilon_decay_frames
        if decayed > self.epsilon_min:
            return decayed
        return self.epsilon_min

# Deep Q-Network architecture
def create_q_model(num_actions, learning_rate=0.00025, input_shape=(84, 84, 4)):
    """Build and compile the convolutional Q-network.

    The network is three convolutional layers followed by a 512-unit dense
    layer and a linear output with one Q-value per action. It is compiled
    with Adam (gradient norm clipped to 1.0) and a Huber loss.

    Args:
        num_actions: Number of discrete actions, i.e. the output width.
        learning_rate: Adam learning rate. Defaults to the value that was
            previously hard-coded here, so existing callers are unaffected.
        input_shape: Shape of one stacked observation, without the batch
            axis. Defaults to the previously hard-coded ``(84, 84, 4)``.

    Returns:
        A compiled ``keras.Sequential`` model.
    """
    model = keras.Sequential([
        layers.Input(shape=input_shape),
        # Observations are raw pixel intensities (create_env in this file
        # uses scale_obs=False), so map 0..255 to 0..1 inside the model.
        # Do not feed inputs that are already scaled to [0, 1].
        layers.Rescaling(1.0 / 255.0),
        layers.Conv2D(32, kernel_size=8, strides=4, activation="relu"),
        layers.Conv2D(64, kernel_size=4, strides=2, activation="relu"),
        layers.Conv2D(64, kernel_size=3, strides=1, activation="relu"),
        layers.Flatten(),
        layers.Dense(512, activation="relu"),
        # Linear head: Q-values are unbounded regression targets.
        layers.Dense(num_actions, activation="linear"),
    ])
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate, clipnorm=1.0),
        loss=keras.losses.Huber(),
    )
    return model

# Create environment with preprocessing
def create_env(config):
    """Build the preprocessed, frame-stacked environment described by ``config``.

    Reads ``config.env_name``, ``config.render_mode`` and ``config.num_stack``.
    """
    # Atari preprocessing settings: skip 4 frames per step, 84x84 grayscale
    # frames left unscaled, and a life loss reported as terminal (this helps
    # with training).
    preprocessing_options = dict(
        frame_skip=4,
        screen_size=84,
        grayscale_obs=True,
        scale_obs=False,
        terminal_on_life_loss=True,
    )
    base_env = gym.make(config.env_name, render_mode=config.render_mode)
    preprocessed_env = gym.wrappers.AtariPreprocessing(base_env, **preprocessing_options)
    # Stack frames with the local wrapper rather than gymnasium's FrameStack.
    return CustomFrameStack(preprocessed_env, config.num_stack)

# Experience replay memory
class ReplayMemory:
    """Fixed-capacity store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A bounded deque discards the oldest transition once full.
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def add(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns:
            Five NumPy arrays: states, actions, rewards, next_states, dones.
        """
        picked = random.sample(self.memory, batch_size)
        # Transpose the list of transitions into one array per field.
        states, actions, rewards, next_states, dones = (
            np.array(column) for column in zip(*picked)
        )
        return states, actions, rewards, next_states, dones

# Main training function
def train_dqn():
    """Train a DQN agent using the settings in ``DQNConfig``.

    Runs epsilon-greedy episodes until ``config.total_frames`` agent steps
    have been taken. Transitions go into a replay memory; every 4th frame,
    once the memory holds more than ``config.min_replay_memory_size``
    transitions, the online network is fitted on a sampled minibatch against
    targets from a separate target network. The target network is synced
    every ``config.target_update_freq`` frames. Models are written under
    ``config.checkpoint_dir`` periodically, whenever an episode matches or
    beats the best reward so far, and once at the end.

    Returns:
        tuple: ``(model, metrics)``. ``metrics`` is a dict with keys
        ``"rewards"``, ``"avg_q"`` and ``"loss"`` (one entry per episode)
        and ``"epsilon"`` (one entry per frame).
    """
    # Initialize configuration
    config = DQNConfig()

    # Create checkpoint directory
    os.makedirs(config.checkpoint_dir, exist_ok=True)

    # Set up environment
    env = create_env(config)

    # Everything after the environment exists runs under try/finally so the
    # environment is closed even if training is interrupted or fails.
    try:
        # Create Q-networks; the target network starts as a copy of the
        # online network.
        model = create_q_model(config.num_actions)
        target_model = create_q_model(config.num_actions)
        target_model.set_weights(model.get_weights())

        # Initialize replay memory
        memory = ReplayMemory(config.replay_memory_size)

        # Training metrics
        frame_count = 0
        episode_count = 0
        rewards_history = []
        epsilon_history = []
        avg_q_history = []
        loss_history = []

        # Start training
        print("Starting training...")
        start_time = time.time()

        while frame_count < config.total_frames:
            episode_count += 1
            state, _ = env.reset()
            episode_reward = 0
            episode_loss = []
            episode_q_values = []

            for step in range(config.max_episode_length):
                frame_count += 1
                epsilon = config.epsilon_by_frame(frame_count)
                epsilon_history.append(epsilon)

                # Select action: epsilon-greedy policy
                if random.random() < epsilon:
                    action = random.randrange(config.num_actions)
                else:
                    state_tensor = tf.convert_to_tensor(state)
                    state_tensor = tf.expand_dims(state_tensor, 0)
                    q_values = model(state_tensor, training=False)
                    episode_q_values.append(float(tf.reduce_mean(q_values)))
                    # Plain int so both branches yield the same action type.
                    action = int(tf.argmax(q_values[0]).numpy())

                # Take action and observe result
                next_state, reward, terminated, truncated, _ = env.step(action)
                episode_reward += reward

                # Store only true termination as the done flag. A truncated
                # episode was cut short rather than finished, so its last
                # transition must still bootstrap from next_state.
                memory.add(state, action, reward, next_state, terminated)

                # Move to next state
                state = next_state

                # Train every 4 frames once memory has enough samples
                if frame_count % 4 == 0 and len(memory) > config.min_replay_memory_size:
                    # Sample from replay memory
                    states, actions, rewards, next_states, dones = memory.sample(config.batch_size)

                    # Calculate target Q values. The model is called directly
                    # rather than through predict(), which adds per-call
                    # overhead for a single small batch.
                    future_rewards = target_model(next_states, training=False).numpy()
                    # (1 - dones) removes the bootstrap term for terminal
                    # transitions.
                    target_q_values = rewards + config.gamma * np.max(future_rewards, axis=1) * (1 - dones)

                    # Update Q-value for actions taken: the one-hot mask
                    # picks out the Q-value of the action actually played.
                    masks = tf.one_hot(actions, config.num_actions)
                    with tf.GradientTape() as tape:
                        q_values = model(states)
                        q_action = tf.reduce_sum(q_values * masks, axis=1)
                        loss = tf.reduce_mean(keras.losses.huber(target_q_values, q_action))

                    # Backpropagation
                    grads = tape.gradient(loss, model.trainable_variables)
                    model.optimizer.apply_gradients(zip(grads, model.trainable_variables))
                    episode_loss.append(float(loss))

                # Update target network
                if frame_count % config.target_update_freq == 0:
                    target_model.set_weights(model.get_weights())
                    print(f"Updated target network at frame {frame_count}")

                # Save model
                if frame_count % config.save_freq == 0:
                    model_path = os.path.join(config.checkpoint_dir, f"model_frame_{frame_count}.keras")
                    model.save(model_path)
                    print(f"Model saved at {model_path}")

                # Show progress
                if frame_count % 10000 == 0:
                    elapsed_time = time.time() - start_time
                    avg_reward = np.mean(rewards_history[-100:]) if rewards_history else 0
                    print(f"Frame: {frame_count}/{config.total_frames} | "
                          f"Episode: {episode_count} | "
                          f"Epsilon: {epsilon:.4f} | "
                          f"Avg Reward (last 100): {avg_reward:.2f} | "
                          f"Time elapsed: {elapsed_time:.2f}s")

                if terminated or truncated:
                    break

            # Record episode statistics
            rewards_history.append(episode_reward)
            avg_q = np.mean(episode_q_values) if episode_q_values else 0
            avg_q_history.append(avg_q)
            avg_loss = np.mean(episode_loss) if episode_loss else 0
            loss_history.append(avg_loss)

            print(f"Episode {episode_count} finished | "
                  f"Frames: {frame_count} | "
                  f"Reward: {episode_reward} | "
                  f"Avg Q: {avg_q:.4f} | "
                  f"Avg Loss: {avg_loss:.4f}")

            # Save best model: needs at least 10 episodes of history, then
            # saves whenever this episode matches or beats all earlier ones.
            if len(rewards_history) >= 10 and episode_reward >= max(rewards_history[:-1]):
                model_path = os.path.join(config.checkpoint_dir, "best_model.keras")
                model.save(model_path)
                print(f"Saved best model with reward {episode_reward}")

        # Save final model
        final_model_path = os.path.join(config.checkpoint_dir, "final_model.keras")
        model.save(final_model_path)
        print(f"Training completed. Final model saved at {final_model_path}")
    finally:
        env.close()

    return model, {
        "rewards": rewards_history,
        "epsilon": epsilon_history,
        "avg_q": avg_q_history,
        "loss": loss_history
    }

# Function to evaluate trained model
def evaluate_model(model_path, num_episodes=10):
    """Play a saved model greedily and report the reward of each episode.

    The environment is created with ``render_mode="human"`` so the games are
    displayed. Actions are always the argmax of the model's output, with no
    exploration.

    Args:
        model_path: Path of a saved Keras model to load.
        num_episodes: Number of episodes to play.

    Returns:
        list: Total reward of each episode, in play order. Empty when
        ``num_episodes`` is 0.
    """
    config = DQNConfig()
    config.render_mode = "human"  # Set to human to visualize

    env = create_env(config)
    total_rewards = []

    # try/finally releases the environment even if loading the model or
    # stepping the environment raises.
    try:
        model = keras.models.load_model(model_path)

        for episode in range(num_episodes):
            state, _ = env.reset()
            total_reward = 0

            while True:
                state_tensor = tf.convert_to_tensor(state)
                state_tensor = tf.expand_dims(state_tensor, 0)
                # The network outputs one Q-value per action, not probabilities.
                q_values = model(state_tensor, training=False)
                action = int(tf.argmax(q_values[0]).numpy())

                state, reward, terminated, truncated, _ = env.step(action)
                total_reward += reward

                if terminated or truncated:
                    break

            total_rewards.append(total_reward)
            print(f"Episode {episode+1}: Reward = {total_reward}")
    finally:
        env.close()

    # Averaging an empty list would produce nan and a runtime warning.
    if total_rewards:
        print(f"Average reward over {num_episodes} episodes: {np.mean(total_rewards)}")
    else:
        print("No episodes were run; nothing to average.")

    return total_rewards

# Run training only when this file is executed directly, not when imported.
# The trained model and the metrics dict returned by train_dqn are bound to
# the module-level names `model` and `metrics`.
if __name__ == "__main__":
    model, metrics = train_dqn()

# Notebook output from running the cell above:
# DependencyNotInstalled: opencv-python package not installed, run `pip install "gymnasium[other]"` to get dependencies for atari