In [1]:
pip install "numpy<2"

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install ale_py

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install "gymnasium[atari]"

Note: you may need to restart the kernel to use updated packages.


In [4]:
import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Conv2D, Flatten, Dense
from keras.optimizers import Adam
import cv2
import random
from collections import deque
import time
import os
import ale_py

In [5]:
# Preprocessing function
def preprocess_frame(frame):
    """Convert an RGB frame to a normalized grayscale image.

    The spatial dimensions of the input are kept as-is (the notebook feeds
    210x160 Atari frames).

    Args:
        frame: RGB image array accepted by ``cv2.cvtColor``.

    Returns:
        2-D ``float32`` array with values scaled to the range [0, 1].
    """
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    # Cast before dividing: dividing the uint8 image directly yields float64,
    # which doubles the size of every frame kept in the replay buffer even
    # though the network computes in float32 anyway.
    return gray_frame.astype(np.float32) / 255.0

In [6]:
def build_model(action_size):
    """Build and compile the convolutional Q-network.

    The network takes a single-channel 210x160 image and outputs one linear
    Q-value per action.

    Args:
        action_size: Number of units in the output layer (one per action).

    Returns:
        A compiled ``Sequential`` model using MSE loss and the Adam optimizer.
    """
    # Three strided convolutions followed by a dense head.
    network_layers = [
        Conv2D(32, (8, 8), strides=(4, 4), activation='relu', input_shape=(210, 160, 1)),
        Conv2D(64, (4, 4), strides=(2, 2), activation='relu'),
        Conv2D(64, (3, 3), strides=(1, 1), activation='relu'),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(action_size, activation='linear'),
    ]
    q_network = Sequential(network_layers)

    optimizer = Adam(learning_rate=0.00025)
    q_network.compile(optimizer=optimizer, loss='mse')
    return q_network

In [7]:
# Experience replay buffer: stores past transitions so training can sample them at random
def create_memory(capacity=100000):
    """Return an empty, bounded replay buffer.

    Args:
        capacity: Maximum number of items retained; once full, appending a
            new item discards the oldest one.

    Returns:
        An empty ``collections.deque`` with ``maxlen`` set to ``capacity``.
    """
    replay_buffer = deque((), maxlen=capacity)
    return replay_buffer

def add_to_memory(memory, state, action, reward, next_state, done):
    """Append one transition to the replay buffer.

    The transition is stored as the tuple
    ``(state, action, reward, next_state, done)``.

    Args:
        memory: Buffer exposing ``append`` (e.g. the deque from ``create_memory``).
        state: Observation before the action.
        action: Action that was taken.
        reward: Reward received for the action.
        next_state: Observation after the action.
        done: Flag marking the end of the episode.
    """
    transition = (state, action, reward, next_state, done)
    memory.append(transition)

def sample_from_memory(memory, batch_size):
    """Draw a random minibatch of stored transitions without replacement.

    Args:
        memory: Replay buffer to sample from.
        batch_size: Number of items to draw.

    Returns:
        A list of ``batch_size`` distinct entries from ``memory``.

    Raises:
        ValueError: If ``batch_size`` exceeds the number of stored items
            (raised by ``random.sample``).
    """
    minibatch = random.sample(memory, k=batch_size)
    return minibatch

In [8]:
# Epsilon-greedy policy -- balances exploration and exploitation
def epsilon_greedy_action(model, state, epsilon, action_size):
    """Choose an action with an epsilon-greedy policy.

    With probability ``epsilon`` a uniformly random action is returned
    (exploration); otherwise the action with the highest predicted Q-value
    for ``state`` is returned (exploitation).

    Args:
        model: Q-network; called as ``model(batch, training=False)`` and
            expected to return one row of Q-values per batch item.
        state: A single observation, without a batch dimension.
        epsilon: Exploration probability.
        action_size: Number of discrete actions.

    Returns:
        The chosen action index as a plain Python ``int`` in both branches.
    """
    if np.random.random() <= epsilon:
        return random.randrange(action_size)  # Explore: choose random action

    # Exploit: choose best action. Add the batch dimension the network expects.
    batch = np.expand_dims(np.asarray(state, dtype=np.float32), axis=0)
    # Call the model directly rather than via predict(): this runs once per
    # environment step on a single observation, where predict()'s per-call
    # setup cost dominates.
    q_values = np.asarray(model(batch, training=False))[0]
    # int() keeps the return type identical to the exploration branch.
    return int(np.argmax(q_values))

In [12]:
def _build_td_targets(current_q, next_q, actions, rewards, terminals, gamma):
    """Return the Q-value regression targets for one replay minibatch."""
    targets = np.array(current_q, copy=True)
    rows = np.arange(len(actions))
    # Terminal transitions get no bootstrapped future value.
    not_terminal = 1.0 - np.asarray(terminals, dtype=targets.dtype)
    bootstrap = gamma * np.max(next_q, axis=1) * not_terminal
    # Only the Q-value of the action actually taken is moved toward its target;
    # every other output keeps the network's own prediction (zero error).
    targets[rows, actions] = rewards + bootstrap
    return targets


def train_dqn(episodes=10000, 
              max_steps=50000, 
              batch_size=32, 
              gamma=0.99, 
              epsilon_start=1.0, 
              epsilon_end=0.1, 
              epsilon_decay=0.995,
              update_target_freq=10000,
              memory_capacity=100000,
              save_freq=100):
    """Train a DQN model on Frogger with reward shaping to encourage faster completion.

    Args:
        episodes: Number of episodes to run.
        max_steps: Maximum environment steps per episode.
        batch_size: Number of transitions sampled per gradient update.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon_start: Initial exploration probability.
        epsilon_end: Floor for the exploration probability.
        epsilon_decay: Multiplicative decay applied to epsilon after each episode.
        update_target_freq: Copy the main network's weights to the target
            network every this many environment steps.
        memory_capacity: Maximum number of transitions kept for replay.
        save_freq: Save a checkpoint every this many episodes.

    Returns:
        The trained main model. Checkpoints and the final model are written
        to the ``frogger_model`` directory.
    """
    # Create environment
    env = gym.make('ALE/Frogger-v5')
    try:
        action_size = env.action_space.n

        # Create main and target models
        main_model = build_model(action_size)
        target_model = build_model(action_size)
        target_model.set_weights(main_model.get_weights())

        # Experience replay memory
        memory = create_memory(capacity=memory_capacity)

        total_steps = 0
        epsilon = epsilon_start

        save_dir = "frogger_model"
        os.makedirs(save_dir, exist_ok=True)

        for episode in range(1, episodes + 1):
            frame, info = env.reset()
            state = preprocess_frame(frame)
            state = np.expand_dims(state, axis=-1)

            episode_reward = 0
            steps_taken = 0  # stays defined for the log line even if max_steps == 0
            # NOTE(review): the progress term relies on the env reporting
            # "player_y" in `info`. If the key is absent, both lookups fall back
            # to the same value and progress_reward is always 0 -- confirm.
            previous_row = info.get("player_y", 0)

            for _ in range(max_steps):
                action = epsilon_greedy_action(main_model, state, epsilon, action_size)
                next_frame, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

                next_state = preprocess_frame(next_frame)
                next_state = np.expand_dims(next_state, axis=-1)

                # --- Custom reward shaping ---
                time_penalty = -0.01
                completion_bonus = 50 if terminated and reward > 0 else 0

                current_row = info.get("player_y", previous_row)
                progress_reward = (previous_row - current_row) * 0.1  # Encourage upward movement
                previous_row = current_row

                # Adjust the reward
                reward += time_penalty + completion_bonus + progress_reward
                # --------------------------------

                # Store `terminated`, not `done`: a truncated episode was cut
                # off rather than finished, so its last transition should still
                # bootstrap from the next state's value.
                add_to_memory(memory, state, action, reward, next_state, terminated)

                state = next_state
                episode_reward += reward
                total_steps += 1
                steps_taken += 1

                if len(memory) > batch_size:
                    minibatch = sample_from_memory(memory, batch_size)

                    states = np.array([experience[0] for experience in minibatch])
                    actions = np.array([experience[1] for experience in minibatch])
                    rewards = np.array([experience[2] for experience in minibatch])
                    next_states = np.array([experience[3] for experience in minibatch])
                    terminals = np.array([experience[4] for experience in minibatch])

                    current_q_values = main_model.predict(states, verbose=0)
                    next_q_values = target_model.predict(next_states, verbose=0)

                    target_q_values = _build_td_targets(
                        current_q_values, next_q_values, actions, rewards, terminals, gamma
                    )

                    main_model.fit(states, target_q_values, epochs=1, verbose=0)

                if total_steps % update_target_freq == 0:
                    target_model.set_weights(main_model.get_weights())
                    print(f"Target network updated at step {total_steps}")

                if done:
                    break

            if epsilon > epsilon_end:
                # Clamp so the final decay step cannot undershoot the floor.
                epsilon = max(epsilon_end, epsilon * epsilon_decay)

            print(f"Episode: {episode}, Reward: {episode_reward:.2f}, Epsilon: {epsilon:.4f}, Steps: {steps_taken}")

            if episode % save_freq == 0:
                main_model.save(f"{save_dir}/frogger_dqn_episode_{episode}.h5")
                print(f"Model saved at episode {episode}")

        main_model.save(f"{save_dir}/frogger_dqn_final.h5")
        print("Training completed!")

        return main_model
    finally:
        # Release the emulator even if training is interrupted or raises.
        env.close()

In [1]:
# Main execution
if __name__ == "__main__":
    # Hyperparameters for this run; edit the values here to experiment.
    run_config = dict(
        episodes=1000,
        max_steps=10000,
        batch_size=1,
        gamma=0.99,
        epsilon_start=1.0,
        epsilon_end=0.01,
        epsilon_decay=0.995,
        update_target_freq=1000,
        memory_capacity=50000,
        save_freq=50,
    )
    train_dqn(**run_config)

NameError: name 'train_dqn' is not defined