# ConvPyBoy: Training AI to Play Tetris

This notebook introduces ConvPyBoy, a project that trains reinforcement learning agents to play Tetris using PyBoy (a Game Boy emulator) and convolutional neural networks. We'll cover:

1. Setting up the environment
2. Understanding the CNN architecture
3. Creating and training a DQN agent
4. Testing the agent's performance
5. Evolutionary training and pre-trained agents

Let's get started!

## 1. Setup and Dependencies

First, let's import the necessary packages and modules for this project.

In [None]:
import os
import sys
import torch
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from tqdm import tqdm
from IPython.display import clear_output

# Make sure the project root is in the path
sys.path.append('..')

# Import project modules
from environment.tetris_env import TetrisEnv
from models.cnn_model import TetrisCNN, DuelingTetrisCNN
from agents.dqn_agent import DQNAgent
from utils.device_utils import get_device, print_device_info

### Check Compute Device

Next, let's check what compute device (CPU or GPU) we'll be using.

In [None]:
device = get_device()
print_device_info()
print(f"Using device: {device}")

## 2. The Tetris Environment

The `TetrisEnv` class provides a reinforcement learning-friendly interface to the PyBoy Tetris emulation. Let's create an instance and explore how it works.

In [None]:
# Path to the Tetris ROM file
ROM_PATH = "../tetris.gb"

# Create the environment
env = TetrisEnv(rom_path=ROM_PATH, headless=False, down_sample=2, frameskip=4)

### Environment Parameters

- **rom_path**: Path to the Tetris ROM file
- **headless**: Whether to run in headless mode (no visualization)
- **down_sample**: Factor by which to downsample the screen
- **frameskip**: Number of frames to skip between observations
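
To make `down_sample` and `frameskip` concrete, here is a minimal NumPy sketch. It assumes that downsampling is done by strided slicing and that a frameskip of 4 means the same action is held for 4 emulator frames while rewards are summed; `TetrisEnv` may implement either differently. The 144×160 frame is the Game Boy's native screen size, and `fake_emulator_tick` is a hypothetical stand-in for the emulator, not a PyBoy call.

```python
import numpy as np

def downsample(frame, factor):
    """Keep every `factor`-th pixel along height and width."""
    return frame[::factor, ::factor]

def fake_emulator_tick(action):
    """Hypothetical stand-in for advancing the emulator by one frame."""
    frame = np.zeros((144, 160, 3), dtype=np.uint8)  # Game Boy screen, H x W x C
    reward = 0.25                                    # made-up per-frame reward
    return frame, reward

def step_with_frameskip(action, frameskip, factor):
    """Repeat one action for `frameskip` frames and return the last frame."""
    total_reward = 0.0
    frame = None
    for _ in range(frameskip):
        frame, reward = fake_emulator_tick(action)
        total_reward += reward
    return downsample(frame, factor), total_reward

obs, reward = step_with_frameskip(action=0, frameskip=4, factor=2)
print(obs.shape, reward)
```

Under these assumptions, a downsampling factor of 2 halves each spatial dimension, so the agent sees a 72×80 image instead of 144×160, and each call to `step` covers four emulator frames.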

### Environment Interface

The environment follows the classic Gym-style interface:
- `reset()`: Resets the environment and returns the initial observation
- `step(action)`: Takes an action and returns (next_state, reward, done, info)
- `close()`: Closes the environment

Let's see what the initial observation looks like.

In [None]:
# Reset environment and get initial state
initial_state = env.reset()
print(f"Initial state shape: {initial_state.shape}")

# Visualize initial state
plt.figure(figsize=(5, 5))
plt.imshow(initial_state)
plt.title("Tetris Initial State")
plt.axis('off')
plt.show()

### Available Actions

The environment supports the following actions:

In [None]:
print("Available actions:")
for action_id, action_name in env.ACTIONS.items():
    print(f"{action_id}: {action_name}")

print(f"\nTotal number of actions: {len(env.ACTIONS)}")

### Manual Play Test

Let's test the environment by stepping through a few random actions and displaying the resulting frames.

In [None]:
def play_random_steps(env, num_steps=10):
    """Play a few random steps and visualize the results"""
    # Reset environment
    state = env.reset()
    total_reward = 0
    
    # Play random actions
    for step in range(num_steps):
        # Choose random action
        action = np.random.randint(0, len(env.ACTIONS))
        
        # Take action
        next_state, reward, done, info = env.step(action)
        
        # Accumulate reward
        total_reward += reward
        
        # Print step info
        print(f"Step {step+1}: Action={env.ACTIONS[action]}, Reward={reward:.2f}, Score={info['score']}, Done={done}")
        
        # Visualize state
        plt.figure(figsize=(5, 5))
        plt.imshow(next_state)
        plt.title(f"Step {step+1} - Action: {env.ACTIONS[action]}")
        plt.axis('off')
        plt.show()
        
        # Update state
        state = next_state
        
        # Break if game is done
        if done:
            print("Game over!")
            break
            
    print(f"\nTotal reward: {total_reward:.2f}")
    
# Play 5 random steps
play_random_steps(env, num_steps=5)

In [None]:
# Close the environment
env.close()

## 3. The CNN Model Architecture

The project uses convolutional neural networks (CNNs) to process the game screen and estimate a value for each possible action. Let's examine the model architecture.

In [None]:
# Create a temporary environment to get state dimensions
env = TetrisEnv(rom_path=ROM_PATH, headless=True, down_sample=2)
initial_state = env.reset()

# Read everything we need from the environment before closing it
input_height, input_width, input_channels = initial_state.shape  # (H, W, C)
num_actions = len(env.ACTIONS)
env.close()

print(f"Input dimensions: {initial_state.shape}")
print(f"Number of actions: {num_actions}")

# Create the CNN model
cnn_model = TetrisCNN(input_channels=input_channels, 
                       input_height=input_height, 
                       input_width=input_width, 
                       num_actions=num_actions)

# Print model architecture
print("\nCNN Model Architecture:")
print(cnn_model)

### Model Architecture Explanation

The CNN model consists of:

1. **Convolutional layers**: Three convolutional layers that extract spatial features from the game screen:
   - Conv1: 32 filters with 5×5 kernel, stride 2, padding 2
   - Conv2: 64 filters with 4×4 kernel, stride 2, no padding
   - Conv3: 64 filters with 3×3 kernel, stride 1, no padding

2. **Fully connected layers**: Two fully connected layers that map extracted features to action values:
   - FC1: Maps flattened features to 512 units
   - FC2: Maps 512 units to action space size
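
To see how many features reach FC1, we can trace the feature-map size through the three convolutional layers with the standard output-size formula `floor((n + 2p - k) / s) + 1`. This sketch assumes a 72×80 input (a 144×160 Game Boy frame with `down_sample=2`), dilation 1, and no pooling layers between the convolutions; check the printed model above for the real values.

```python
def conv_out(size, kernel, stride, padding=0):
    """Output length of one spatial dimension after a convolution."""
    return (size + 2 * padding - kernel) // stride + 1

# (name, out_channels, kernel, stride, padding), as listed above
LAYERS = [
    ("conv1", 32, 5, 2, 2),
    ("conv2", 64, 4, 2, 0),
    ("conv3", 64, 3, 1, 0),
]

def feature_map_shapes(height, width):
    """Return (name, channels, height, width) after each conv layer."""
    shapes = []
    for name, channels, kernel, stride, padding in LAYERS:
        height = conv_out(height, kernel, stride, padding)
        width = conv_out(width, kernel, stride, padding)
        shapes.append((name, channels, height, width))
    return shapes

shapes = feature_map_shapes(72, 80)  # assumed input size
for name, c, h, w in shapes:
    print(f"{name}: {c} x {h} x {w}")

_, c, h, w = shapes[-1]
flat_features = c * h * w
print(f"Flattened features into FC1: {flat_features}")
```

For the assumed input, the feature maps shrink to 36×40, then 17×19, then 15×17, so FC1 would receive 64 × 15 × 17 = 16,320 inputs.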

Let's also look at the dueling architecture, a common variant of the DQN network.

In [None]:
# Create the Dueling CNN model
dueling_cnn = DuelingTetrisCNN(input_channels=input_channels, 
                               input_height=input_height, 
                               input_width=input_width, 
                               num_actions=num_actions)

# Print model architecture
print("\nDueling CNN Architecture:")
print(dueling_cnn)

### Dueling Architecture Explanation

The Dueling CNN separates the estimation of the state value and the action advantages, which can lead to better policy evaluation. It consists of:

1. **Convolutional layers**: Same as the base CNN model

2. **Value stream**: Estimates the value of being in the given state
   - FC from 512 to 256 units
   - Output layer with a single value

3. **Advantage stream**: Estimates the relative advantage of each action
   - FC from 512 to 256 units
   - Output layer with one value per action

The dueling architecture then combines the two streams into a single Q-value for each action.
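
A common way to combine the streams is `Q(s, a) = V(s) + A(s, a) - mean(A(s, ·))`, where subtracting the mean advantage keeps the split between value and advantage well defined. The NumPy sketch below illustrates that rule on made-up numbers; whether `DuelingTetrisCNN` uses the mean, the max, or another aggregation is an assumption here, so check `models/cnn_model.py` for the actual rule.

```python
import numpy as np

def dueling_q_values(value, advantages):
    """Combine a state value and per-action advantages into Q-values.

    value:      array of shape (batch, 1)
    advantages: array of shape (batch, num_actions)
    """
    value = np.asarray(value, dtype=np.float64)
    advantages = np.asarray(advantages, dtype=np.float64)
    return value + advantages - advantages.mean(axis=1, keepdims=True)

# Made-up outputs of the two streams for one state and four actions
value = np.array([[2.0]])
advantages = np.array([[1.0, 0.0, -1.0, 4.0]])

q = dueling_q_values(value, advantages)
print(q)                 # Q-values: 2, 1, 0, 5
print(q.argmax(axis=1))  # greedy action index: 3
```

Note that the mean of the resulting Q-values equals the state value (2.0 here), which is the property the mean subtraction enforces.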

## 4. DQN Agent Implementation

Let's examine the DQN (Deep Q-Network) agent that uses our CNN models for learning.

In [None]:
# Import the DQNAgent class
from agents.dqn_agent import DQNAgent

# Create a DQN agent
agent = DQNAgent(
    state_shape=(input_channels, input_height, input_width),
    num_actions=num_actions,
    learning_rate=0.0001,
    dueling=True  # Use dueling architecture
)

print(f"DQN Agent created with {'dueling' if agent.dueling else 'standard'} architecture")
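
The internals of `DQNAgent.learn()` are not shown in this notebook. As a rough guide, a standard DQN update regresses `Q(s, a)` toward the target `r + gamma * max_a' Q_target(s', a')`, dropping the bootstrap term when the episode has ended. The sketch below computes that target with NumPy on made-up numbers; the function name and inputs are hypothetical, and the agent's real implementation may differ (for example, by using Double DQN).

```python
import numpy as np

def td_targets(rewards, next_q_target, dones, gamma=0.99):
    """Standard DQN targets for a batch of transitions.

    rewards:       shape (batch,)
    next_q_target: target-network Q-values for next states, shape (batch, num_actions)
    dones:         shape (batch,), True where the episode ended
    """
    rewards = np.asarray(rewards, dtype=np.float64)
    dones = np.asarray(dones, dtype=np.float64)
    max_next_q = np.asarray(next_q_target, dtype=np.float64).max(axis=1)
    # No bootstrapping from terminal states
    return rewards + gamma * (1.0 - dones) * max_next_q

rewards = [1.0, 0.0]
next_q = [[0.5, 2.0], [3.0, 1.0]]
dones = [False, True]

targets = td_targets(rewards, next_q, dones)
print(targets)
```

The first transition bootstraps from the best next-state value (1.0 + 0.99 × 2.0 ≈ 2.98), while the second is terminal, so its target is just the reward (0.0).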

## 5. Training the Agent

Let's implement a simplified training loop to train our agent on the Tetris environment.

In [None]:
def train_agent(env, agent, num_episodes=10, max_steps=1000, epsilon_start=1.0, epsilon_final=0.1, epsilon_decay=0.995):
    """Train the agent on the environment"""
    
    # Lists to store metrics
    scores = []
    episode_lengths = []
    epsilon_values = []
    loss_values = []
    
    # Current exploration rate
    epsilon = epsilon_start
    
    # Training loop
    for episode in range(1, num_episodes + 1):
        # Reset environment
        state = env.reset()
        score = 0
        episode_loss = []
        
        # Episode loop
        for step in range(1, max_steps + 1):
            # Select action
            action = agent.act(state, epsilon)
            
            # Take action
            next_state, reward, done, info = env.step(action)
            
            # Store experience in replay buffer
            agent.remember(state, action, reward, next_state, done)
            
            # Learn from experience
            if len(agent.memory) > agent.batch_size:
                loss = agent.learn()
                episode_loss.append(loss)
            
            # Update state and score
            state = next_state
            score += reward
            
            # Break if game is done
            if done:
                break
        
        # Update exploration rate
        epsilon = max(epsilon_final, epsilon * epsilon_decay)
        
        # Store metrics
        scores.append(score)
        episode_lengths.append(step)
        epsilon_values.append(epsilon)
        loss_values.append(np.mean(episode_loss) if episode_loss else 0)
        
        # Print progress
        print(f"Episode {episode}/{num_episodes} - Score: {score:.2f}, Steps: {step}, Epsilon: {epsilon:.4f}, Avg Loss: {loss_values[-1]:.6f}")
        
        # Plot progress every few episodes
        if episode % 5 == 0 or episode == num_episodes:
            clear_output(wait=True)
            plot_training_results(scores, episode_lengths, epsilon_values, loss_values)
            print(f"Episode {episode}/{num_episodes} - Score: {score:.2f}, Steps: {step}, Epsilon: {epsilon:.4f}, Avg Loss: {loss_values[-1]:.6f}")
    
    return scores, episode_lengths, epsilon_values, loss_values

def plot_training_results(scores, episode_lengths, epsilon_values, loss_values):
    """Plot training metrics"""
    plt.figure(figsize=(15, 10))
    
    # Plot scores
    plt.subplot(2, 2, 1)
    plt.plot(scores)
    plt.title('Episode Scores')
    plt.xlabel('Episode')
    plt.ylabel('Score')
    
    # Plot episode lengths
    plt.subplot(2, 2, 2)
    plt.plot(episode_lengths)
    plt.title('Episode Lengths')
    plt.xlabel('Episode')
    plt.ylabel('Steps')
    
    # Plot epsilon values
    plt.subplot(2, 2, 3)
    plt.plot(epsilon_values)
    plt.title('Exploration Rate (Epsilon)')
    plt.xlabel('Episode')
    plt.ylabel('Epsilon')
    
    # Plot loss values
    plt.subplot(2, 2, 4)
    plt.plot(loss_values)
    plt.title('Average Loss')
    plt.xlabel('Episode')
    plt.ylabel('Loss')
    
    plt.tight_layout()
    plt.show()

### Start Training

Now, let's train our agent on the Tetris environment. Note that this can take significant time and computational resources.

In [None]:
# Create a new environment for training
env = TetrisEnv(rom_path=ROM_PATH, headless=True, down_sample=2, frameskip=4)

# Create a new agent
agent = DQNAgent(
    state_shape=(input_channels, input_height, input_width),
    num_actions=num_actions,
    learning_rate=0.0001,
    memory_size=10000,
    batch_size=64,
    gamma=0.99,
    target_update=10,
    dueling=True
)

# Train the agent
scores, episode_lengths, epsilon_values, loss_values = train_agent(
    env=env,
    agent=agent,
    num_episodes=20,  # Start with a small number for demonstration
    max_steps=2000,
    epsilon_start=1.0,
    epsilon_final=0.1,
    epsilon_decay=0.95
)

# Close the environment
env.close()
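
It is worth checking how far the exploration rate actually decays in a run this short. The sketch below assumes the per-episode update used in `train_agent` above, `epsilon = max(epsilon_final, epsilon * epsilon_decay)`, and computes the value after a given number of episodes as well as how many episodes are needed to reach the floor.

```python
import math

def epsilon_after(episodes, start=1.0, final=0.1, decay=0.95):
    """Exploration rate after `episodes` multiplicative decay steps."""
    return max(final, start * decay ** episodes)

def episodes_to_floor(start=1.0, final=0.1, decay=0.95):
    """Smallest number of episodes after which epsilon sits at `final`."""
    return math.ceil(math.log(final / start) / math.log(decay))

print(f"Epsilon after 20 episodes: {epsilon_after(20):.4f}")
print(f"Episodes needed to reach the floor: {episodes_to_floor()}")
```

With `epsilon_decay=0.95`, epsilon is still about 0.36 after 20 episodes and only reaches 0.1 after 45, so the agent in this demonstration run is still taking random actions roughly a third of the time when training ends.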

### Save the Trained Agent

After training, let's save our agent for later use.

In [None]:
# Create save directory if it doesn't exist
save_dir = "../data/agents"
os.makedirs(save_dir, exist_ok=True)

# Generate timestamp for unique filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
save_path = os.path.join(save_dir, f"notebook_agent_{timestamp}.pt")

# Save the agent
agent.save(save_path)
print(f"Agent saved to {save_path}")

## 6. Testing the Agent

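Let's test an agent on the Tetris environment. The cells below load one of the pre-trained agents; to evaluate the agent trained above instead, pass `agent` rather than `test_agent` to the test function.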

In [None]:
# Define the path to a pre-trained agent
pretrained_path = "../data/agents/best_agent_gen3.pt"  # Using one of the pre-trained agents

# Create a new agent and load the pre-trained weights
test_agent = DQNAgent(
    state_shape=(input_channels, input_height, input_width),
    num_actions=num_actions,
    dueling=True
)

# Load pre-trained weights
test_agent.load(pretrained_path)
print(f"Loaded pre-trained agent from {pretrained_path}")

In [None]:
def test_agent_performance(env, agent, num_episodes=3, max_steps=5000, render=True):
    """Test the agent's performance"""
    all_scores = []
    all_steps = []
    
    for episode in range(1, num_episodes + 1):
        state = env.reset()
        total_reward = 0
        step = 0
        
        print(f"\nEpisode {episode}/{num_episodes}")
        
        for step in range(1, max_steps + 1):
            # Choose action with no exploration (epsilon=0)
            action = agent.act(state, epsilon=0)
            
            # Take action
            next_state, reward, done, info = env.step(action)
            
            # Update total reward and state
            total_reward += reward
            state = next_state
            
            # Print progress every 100 steps
            if step % 100 == 0:
                print(f"Step {step} - Score: {info['score']}, Reward: {total_reward:.2f}")
                
                # Render the current state
                if render:
                    plt.figure(figsize=(5, 5))
                    plt.imshow(next_state)
                    plt.title(f"Episode {episode} - Step {step}\nScore: {info['score']}")
                    plt.axis('off')
                    plt.show()
            
            # Break if game is done
            if done:
                break
        
        # Record episode stats
        all_scores.append(info['score'])
        all_steps.append(step)
        
        print(f"Episode {episode} finished - Score: {info['score']}, Steps: {step}, Total Reward: {total_reward:.2f}")
    
    # Print overall stats
    print(f"\nOverall performance over {num_episodes} episodes:")
    print(f"Average score: {np.mean(all_scores):.2f}")
    print(f"Average steps: {np.mean(all_steps):.2f}")
    print(f"Best score: {np.max(all_scores)}")
    
    return all_scores, all_steps

In [None]:
# Create a new environment for testing
env = TetrisEnv(rom_path=ROM_PATH, headless=False, down_sample=2, frameskip=4)

# Test the agent
test_scores, test_steps = test_agent_performance(
    env=env,
    agent=test_agent,
    num_episodes=1,  # Test for 1 episode for demonstration
    max_steps=2000,
    render=True
)

# Close the environment
env.close()

## 7. Evolutionary Training

This project also supports evolutionary training through the `Population` class. Let's briefly explore how this works.

In [None]:
# Import the Population class
from agents.population import Population

# Create a small population for demonstration
population = Population(
    state_shape=(input_channels, input_height, input_width),
    num_actions=num_actions,
    population_size=5,  # Small population for demonstration
    mutation_rate=0.05,
    mutation_scale=0.1,
    elite_count=1,
    save_dir="../data/agents",
    training_steps_per_episode=16
)

print(f"Created population with {population.population_size} agents")

### Evolutionary Training Process

The evolutionary training process implemented in this project follows these steps:

1. **Initialize population**: Create a population of agents with random neural networks
2. **Evaluate fitness**: Test each agent's performance on the environment
3. **Selection**: Select the best-performing agents for reproduction
4. **Reproduction**: Create new agents by combining and mutating the selected agents
5. **Repeat**: Evaluate the new population and continue the process

Because selection depends only on each agent's fitness score, this approach can explore the parameter space and find good solutions without requiring gradient information.
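
The steps above can be sketched on a toy problem. The code below is not the project's `Population` class: it evolves plain weight vectors against a hypothetical fitness function, keeps the elites unchanged, and fills the rest of the population with mutated copies (crossover is omitted for brevity). The `mutation_rate`, `mutation_scale`, and `elite_count` values mirror the ones used above; how `Population` applies them is an assumption.

```python
import numpy as np

def mutate(weights, rng, mutation_rate=0.05, mutation_scale=0.1):
    """Add Gaussian noise to a random subset of the weights."""
    mask = rng.random(weights.shape) < mutation_rate
    noise = rng.normal(0.0, mutation_scale, size=weights.shape)
    return weights + mask * noise

def next_generation(population, fitness, rng, elite_count=1):
    """Keep the elites as-is and refill with mutated copies of them."""
    order = np.argsort(fitness)[::-1]  # indices sorted best-first
    elites = [population[i] for i in order[:elite_count]]
    children = [
        mutate(elites[rng.integers(len(elites))], rng)
        for _ in range(len(population) - elite_count)
    ]
    return elites + children

def toy_fitness(weights):
    """Hypothetical fitness: higher when all weights are close to 1.0."""
    return -float(np.sum((weights - 1.0) ** 2))

rng = np.random.default_rng(0)
population = [rng.normal(size=8) for _ in range(5)]

best_per_generation = []
for _ in range(50):
    fitness = [toy_fitness(w) for w in population]
    best_per_generation.append(max(fitness))
    population = next_generation(population, fitness, rng)

print(f"Best fitness, first generation: {best_per_generation[0]:.3f}")
print(f"Best fitness, last generation:  {best_per_generation[-1]:.3f}")
```

Because the elite agent is carried over unchanged, the best fitness can never get worse from one generation to the next. In the real project, the fitness would come from playing Tetris rather than from a closed-form function.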

## 8. Analyzing Pre-trained Agents

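Let's start by listing the pre-trained agents available in the agents directory. Any of them can be loaded and evaluated with `test_agent_performance` from Section 6 to compare their performance and behavior.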

In [None]:
# List available pre-trained agents
agents_dir = "../data/agents"
# Sort the filenames so the listing order is stable across runs
pretrained_agents = sorted(f for f in os.listdir(agents_dir) if f.endswith(".pt"))

print("Available pre-trained agents:")
for i, agent_file in enumerate(pretrained_agents):
    print(f"{i+1}. {agent_file}")

## 9. Conclusion

In this notebook, we explored the ConvPyBoy project for training AI agents to play Tetris using PyBoy and reinforcement learning. We covered:

1. Setting up the Tetris environment with PyBoy
2. Understanding the CNN architecture used for processing game screens
3. Training DQN agents to play Tetris
4. Testing pre-trained agents
5. Introduction to evolutionary training

The project demonstrates how deep reinforcement learning can be applied to classic games, combining modern AI techniques with retro gaming emulation. The agents learn directly from pixel inputs, with the aim of developing effective strategies for playing Tetris.

### Next Steps

If you want to continue exploring this project, here are some suggestions:

1. Train agents for longer periods to achieve higher scores
2. Experiment with different CNN architectures or hyperparameters
3. Try different reinforcement learning algorithms like PPO or A3C
4. Apply the same approach to other Game Boy games
5. Implement reward shaping to guide the learning process more effectively