In [1]:
import numpy as np
import random
import gymnasium as gym
import ale_py
from collections import deque
import os
import cv2
import time
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam



In [2]:
def process_frame(frame, shape=(84, 84)):
    """
    Preprocess a single frame:
    - Convert to grayscale
    - Resize to desired shape
    - Normalize pixel values
    """
    # Convert to grayscale
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

    # Resize
    resized = cv2.resize(gray, shape, interpolation=cv2.INTER_AREA)

    # Normalize
    normalized = resized.astype(np.float32) / 255.0

    # Reshape to match model input
    return normalized.reshape((*shape, 1))

In [3]:
def create_replay_buffer(capacity):
    """Create a new experience replay buffer"""
    return deque(maxlen=capacity)

def add_to_buffer(buffer, state, action, reward, next_state, done):
    """Add transition to replay buffer"""
    buffer.append((state, action, reward, next_state, done))

def sample_from_buffer(buffer, batch_size):
    """Sample batch of transitions from replay buffer"""
    batch = random.sample(buffer, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)
    return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)

In [4]:
def build_model(state_shape, n_actions, learning_rate=0.00025):
    """Build CNN model for DQN"""
    model = Sequential([
        Conv2D(32, (8, 8), strides=(4, 4), activation='relu', input_shape=state_shape),
        Conv2D(64, (4, 4), strides=(2, 2), activation='relu'),
        Conv2D(64, (3, 3), strides=(1, 1), activation='relu'),
        Flatten(),
        Dense(512, activation='relu'),
        Dense(n_actions)
    ])

    model.compile(optimizer=Adam(learning_rate=learning_rate), loss='mse')
    return model

In [5]:
def update_target_model(model, target_model):
    """Copy weights from main model to target model"""
    target_model.set_weights(model.get_weights())

In [6]:
def choose_action(state, model, n_actions, epsilon):
    """Choose action using epsilon-greedy policy"""
    if np.random.rand() <= epsilon:
        return random.randrange(n_actions)

    q_values = model.predict(state[np.newaxis, ...], verbose=0)
    return np.argmax(q_values[0])

In [7]:
def train_model(model, target_model, replay_buffer, batch_size, gamma):
    """Train model using experience replay"""
    if len(replay_buffer) < batch_size:
        return 0  # Return loss value (0 if no training happened)

    # Sample batch from replay buffer
    states, actions, rewards, next_states, dones = sample_from_buffer(replay_buffer, batch_size)

    # Get target Q values from target model
    target_q_values = target_model.predict(next_states, verbose=0)
    max_target_q = np.max(target_q_values, axis=1)

    # Calculate target using Bellman equation
    targets = rewards + gamma * max_target_q * (1 - dones)

    # Get current Q values and update with targets
    q_values = model.predict(states, verbose=0)

    # Update only the Q values for the actions taken
    for i, action in enumerate(actions):
        q_values[i][action] = targets[i]

    # Train the model
    history = model.fit(states, q_values, epochs=1, verbose=0)
    return history.history['loss'][0] if 'loss' in history.history else 0

In [8]:
def decay_epsilon(epsilon, epsilon_min, epsilon_decay):
    """Decay exploration rate"""
    if epsilon > epsilon_min:
        return epsilon * epsilon_decay
    return epsilon

In [9]:
def load_model_weights(model, target_model, filepath):
    """Load model weights from file"""
    model.load_weights(filepath)
    update_target_model(model, target_model)

In [10]:
def save_model_weights(model, filepath):
    """Save model weights to file"""
    model.save_weights(filepath)

In [11]:
def train_skiing_agent(num_episodes=50000, target_update_freq=1000,
                      save_freq=100, render=False, model_path="skiing_dqn_model.h5"):
    """Train DQN agent on Skiing environment"""
    # Create Atari environment
    env = gym.make("ALE/Skiing-v5")

    # Define parameters
    processed_frame_shape = (84, 84, 1)  # Single frame, not stacked
    n_actions = env.action_space.n
    learning_rate = 0.00025
    gamma = 0.99  # Discount factor
    epsilon = 1.0  # Initial exploration rate
    epsilon_min = 0.1
    epsilon_decay = 0.995
    buffer_size = 100000
    batch_size = 32

    # Create models and replay buffer
    model = build_model(processed_frame_shape, n_actions, learning_rate)
    target_model = build_model(processed_frame_shape, n_actions, learning_rate)
    update_target_model(model, target_model)
    replay_buffer = create_replay_buffer(buffer_size)

    # Training statistics
    episode_rewards = []
    step_counter = 0

    # Main training loop
    for episode in range(num_episodes):
        start_time = time.time()

        # Reset environment and preprocess initial frame
        observation, info = env.reset()
        state = process_frame(observation)

        done = False
        episode_reward = 0
        step = 0

        # Episode loop
        while not done:
            if render:
                env.render()

            # Choose and perform action
            action = choose_action(state, model, n_actions, epsilon)
            observation, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            # Process new frame
            next_state = process_frame(observation)

            # Store transition
            add_to_buffer(replay_buffer, state, action, reward, next_state, done)

            # Train network
            train_model(model, target_model, replay_buffer, batch_size, gamma)

            # Update state and counters
            state = next_state
            episode_reward += reward
            step += 1
            step_counter += 1

            # Update target network periodically
            if step_counter % target_update_freq == 0:
                update_target_model(model, target_model)

        # Decay epsilon after each episode
        epsilon = decay_epsilon(epsilon, epsilon_min, epsilon_decay)

        # End of episode processing
        episode_rewards.append(episode_reward)

        # Calculate average reward over last 100 episodes
        avg_reward = np.mean(episode_rewards[-100:]) if len(episode_rewards) >= 100 else np.mean(episode_rewards)

        # Display progress
        duration = time.time() - start_time
        print(f"Episode: {episode+1}/{num_episodes}, Steps: {step}, Reward: {episode_reward:.2f}, "
              f"Avg Reward: {avg_reward:.2f}, Epsilon: {epsilon:.4f}, Duration: {duration:.2f}s")

        # Save model periodically
        if (episode + 1) % save_freq == 0:
            save_model_weights(model, f"{model_path}_episode_{episode+1}")
            print(f"Model saved at episode {episode+1}")

    # Final save
    save_model_weights(model, model_path)
    env.close()

    return model, target_model, episode_rewards

In [12]:
# Train agent
NUM_EPISODES = 10000
RENDER_TRAINING = False
print("Starting training...")
model, target_model, rewards = train_skiing_agent(
        num_episodes=NUM_EPISODES,
        render=RENDER_TRAINING,
        model_path=MODEL_PATH
 )
print("Training and testing completed.")

Starting training...


NameError: name 'MODEL_PATH' is not defined

In [None]:
def test_agent(model_path, num_episodes):
    """Test trained DQN agent on Skiing environment"""
    
    # Define parameters
    processed_frame_shape = (84, 84, 1)  # Match training shape
    n_actions = env.action_space.n
    epsilon_test = 0.01  # Small epsilon for some exploration during testing

    # Create models
    model = build_model(processed_frame_shape, n_actions)
    target_model = build_model(processed_frame_shape, n_actions)
    
    # Load trained model
    load_model_weights(model, target_model, model_path)

    # Test loop
    for episode in range(num_episodes):
        # Reset environment
        observation, info = env.reset()
        state = process_frame(observation)  # Single frame processing

        done = False
        episode_reward = 0
        step = 0

        # Episode loop
        while not done:

            # Choose action
            action = choose_action(state, model, n_actions, epsilon_test)
            observation, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Process new frame
            next_state = process_frame(observation)  # Single frame processing

            # Update state and counters
            state = next_state
            episode_reward += reward
            step += 1

        print(f"Test Episode: {episode+1}/{num_episodes}, Steps: {step}, Reward: {episode_reward:.2f}")

    env.close()

In [None]:
# Test agent
env = gym.make("ALE/Skiing-v5", render_mode="human")
obsersvation, info = env.reset()
MODEL_PATH = "skiing_dqn_model.h5_episode_10.weights.h5"
test_agent(MODEL_PATH, num_episodes=10)  # Test for 10 episodes