In [None]:
#!pip install flappy-bird-gymnasium
#!pip install gymnasium
#!pip install matplotlib
#!pip install torch
#!pip install tqdm

# Cell 1: Import necessary libraries
import gymnasium as gym
import flappy_bird_gymnasium
import numpy as np
import matplotlib.pyplot as plt
import random
import torch
from torch import nn
import torch.nn.functional as F
from collections import deque
from IPython.display import clear_output, display, HTML
from datetime import datetime
import time
from matplotlib.animation import FuncAnimation
import os
import requests
from tqdm.notebook import tqdm

# Pick the compute device once: CUDA when PyTorch reports a usable GPU,
# otherwise the CPU. The rest of the notebook reads this module-level name.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Using device: {device}")

In [None]:
class DQN(nn.Module):
    """Two-layer fully connected Q-network.

    Parameters:
    - state_dim: Number of input features per observation
    - action_dim: Number of output Q-values (one per action)
    - hidden_dim: Width of the single hidden layer (default: 256)
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        # Layer attribute names are part of the saved state_dict keys,
        # so they must stay `fc1` and `output`.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        """Return the Q-values produced for input `x`."""
        hidden = F.relu(self.fc1(x))
        return self.output(hidden)

In [None]:
class ReplayMemory:
    """Bounded buffer of transitions with uniform random sampling.

    Parameters:
    - maxlen: Maximum number of transitions kept; once full, appending
      drops the oldest entry (standard `deque(maxlen=...)` behaviour)
    """

    def __init__(self, maxlen):
        self.memory = deque(maxlen=maxlen)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def append(self, transition):
        """Store one transition at the newest end of the buffer."""
        self.memory.append(transition)

    def sample(self, sample_size):
        """Return `sample_size` distinct stored transitions chosen at random."""
        return random.sample(self.memory, k=sample_size)


In [None]:
# Central configuration for the DQN agent. The Agent class copies these
# values into attributes when it is constructed.
hyperparameters = {
    # Gymnasium environment id passed to gym.make
    'env_id': 'FlappyBird-v0',

    # Capacity of the replay memory (oldest experiences are dropped first)
    'replay_memory_size': 100_000,

    # How many stored experiences are sampled for each learning update
    'mini_batch_size': 32,

    # Starting exploration rate (1.0 means every action is random)
    'epsilon_init': 1.0,

    # Multiplicative factor applied to epsilon to reduce exploration over time
    'epsilon_decay': 0.9995,

    # Floor for the exploration rate; epsilon never drops below this
    'epsilon_min': 0.05,

    # Step interval used to decide when the target network is refreshed
    # from the policy network
    'network_sync_rate': 10,

    # Step size used by the optimizer
    'learning_rate': 1e-4,

    # Discount factor applied to future rewards
    'discount_factor': 0.99,

    # Reward threshold. NOTE(review): not read by any code visible in this
    # notebook; the Agent uses 'stop_on_reward' instead -- confirm if needed.
    'max_reward_threshold': 10_000,

    # Number of units in the network's hidden layer
    'fc1_nodes': 256,

    # Extra keyword arguments forwarded to gym.make.
    # Here lidar observations are switched off, so the agent works from the
    # pipe locations (relative to the bird) rather than a lidar scan.
    'env_make_params': {'use_lidar': False},

    # An episode is cut short once its accumulated reward reaches this value
    'stop_on_reward': 10_000,
}


In [None]:

class Agent:
    """DQN agent configured from the module-level `hyperparameters` dict.

    The class bundles three jobs:
    - `train`: epsilon-greedy DQN training with a replay memory and a
      periodically synchronised target network, plotting progress inline.
    - `collect_gameplay_frames`: greedy rollout of a saved policy that records
      the rendered frames.
    - `display_gameplay_animation`: turns recorded frames into an HTML/JS
      animation for display in a notebook.
    """

    def __init__(self):
        # Hyperparameters
        self.env_id = hyperparameters['env_id']
        self.learning_rate_a = hyperparameters['learning_rate']
        self.discount_factor_g = hyperparameters['discount_factor']
        self.network_sync_rate = hyperparameters['network_sync_rate']
        self.replay_memory_size = hyperparameters['replay_memory_size']
        self.mini_batch_size = hyperparameters['mini_batch_size']
        self.epsilon_init = hyperparameters['epsilon_init']
        self.epsilon_decay = hyperparameters['epsilon_decay']
        self.epsilon_min = hyperparameters['epsilon_min']
        self.stop_on_reward = hyperparameters['stop_on_reward']
        self.fc1_nodes = hyperparameters['fc1_nodes']
        self.env_make_params = hyperparameters.get('env_make_params', {})

        # Neural Network loss function; the optimizer is created in train()
        # once the policy network exists.
        self.loss_fn = nn.MSELoss()
        self.optimizer = None

        # For visualisation in notebook (kept for compatibility; the methods
        # below create their own figures)
        self.fig = None
        self.ax = None

    def create_environment(self, render_mode="rgb_array"):
        """Create environment with specified render mode"""
        return gym.make(self.env_id, render_mode=render_mode, **self.env_make_params)

    def optimize(self, mini_batch, policy_dqn, target_dqn):
        """
        Run one gradient step of the policy network on a mini-batch.

        Parameters:
        - mini_batch: Sequence of (state, action, new_state, reward, terminated)
          tuples; the first four entries are tensors, the last is a boolean
        - policy_dqn: Network being trained
        - target_dqn: Network used to compute the bootstrap target

        Returns:
        - The loss value for this batch as a Python float
        """
        # Transpose the list of transitions into per-field tuples
        states, actions, new_states, rewards, terminations = zip(*mini_batch)

        # Stack tensors to create batch tensors
        states = torch.stack(states)
        actions = torch.stack(actions)
        new_states = torch.stack(new_states)
        rewards = torch.stack(rewards)
        # Place the flags on the same device as the batch itself rather than
        # relying on a module-level global.
        terminations = torch.tensor(terminations).float().to(states.device)

        with torch.no_grad():
            # Bootstrap target; (1 - terminations) zeroes the future term for
            # transitions that ended the episode.
            best_next_q = target_dqn(new_states).max(dim=1)[0]
            target_q = rewards + (1 - terminations) * self.discount_factor_g * best_next_q

        # Q value of the action actually taken. squeeze(dim=1) keeps the batch
        # dimension even for a single-element batch, so the loss never
        # broadcasts a 0-d tensor against a 1-d target.
        current_q = policy_dqn(states).gather(dim=1, index=actions.unsqueeze(dim=1)).squeeze(dim=1)

        # Compute loss
        loss = self.loss_fn(current_q, target_q)

        # Optimize the model (backpropagation)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    @staticmethod
    def _moving_average(values, window=100):
        """Trailing mean of `values`, averaging at most the last `window` entries at each index."""
        data = np.asarray(values, dtype=float)
        cumulative = np.concatenate(([0.0], np.cumsum(data)))
        ends = np.arange(1, len(data) + 1)
        starts = np.maximum(ends - window, 0)
        return (cumulative[ends] - cumulative[starts]) / (ends - starts)

    def _plot_progress(self, rewards_per_episode, epsilon_history, reward_title, epsilon_title):
        """Draw the reward curve (with moving average) next to the epsilon curve."""
        fig, (reward_ax, epsilon_ax) = plt.subplots(1, 2, figsize=(12, 5))

        # Plot rewards
        reward_ax.plot(rewards_per_episode, alpha=0.5, label='Episode Reward')
        reward_ax.plot(self._moving_average(rewards_per_episode), label='Moving Average')
        reward_ax.set_xlabel('Episode')
        reward_ax.set_ylabel('Reward')
        reward_ax.legend()
        reward_ax.set_title(reward_title)

        # Plot epsilon decay
        epsilon_ax.plot(epsilon_history)
        epsilon_ax.set_xlabel('Episode')
        epsilon_ax.set_ylabel('Epsilon')
        epsilon_ax.set_title(epsilon_title)

        fig.tight_layout()
        plt.show()
        # Release the figure explicitly so repeated redraws do not accumulate
        plt.close(fig)

    def train(self, num_episodes=1000, update_display_every=5, save_path=None):
        """
        Train the agent for a specified number of episodes

        Parameters:
        - num_episodes: Number of episodes to run
        - update_display_every: Redraw the progress plots every this many
          episodes; 0 or None disables the intermediate plots
        - save_path: Where to save the best-scoring policy weights
          (defaults to "best_flappy_model.pt")

        Returns:
        - The policy network as it stands after the last episode (the
          best-scoring weights are the ones written to disk)
        """
        # Create instance of environment
        env = self.create_environment()

        try:
            # Get state and action dimensions
            num_states = env.observation_space.shape[0]
            num_actions = env.action_space.n

            # Create neural networks; the target starts as a copy of the policy
            policy_dqn = DQN(num_states, num_actions, self.fc1_nodes).to(device)
            target_dqn = DQN(num_states, num_actions, self.fc1_nodes).to(device)
            target_dqn.load_state_dict(policy_dqn.state_dict())

            # Initialize optimizer
            self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.learning_rate_a)

            # Initialize replay memory
            memory = ReplayMemory(self.replay_memory_size)

            # Initialize epsilon
            epsilon = self.epsilon_init

            # Lists to track progress
            rewards_per_episode = []
            epsilon_history = []
            step_count = 0

            # Initialize best reward
            best_reward = -float('inf')
            best_model_path = save_path if save_path else "best_flappy_model.pt"

            # Training loop
            for episode in range(num_episodes):
                state, _ = env.reset()
                state = torch.tensor(state, dtype=torch.float, device=device)
                terminated = False
                truncated = False
                episode_reward = 0.0
                episode_loss = 0.0
                num_opt_steps = 0

                # Episode loop
                while not (terminated or truncated) and episode_reward < self.stop_on_reward:
                    # Select action based on epsilon-greedy
                    if random.random() < epsilon:
                        # Select random action
                        action = env.action_space.sample()
                        action = torch.tensor(action, dtype=torch.int64, device=device)
                    else:
                        # Select best action
                        with torch.no_grad():
                            action = policy_dqn(state.unsqueeze(dim=0)).squeeze().argmax()

                    # Execute action
                    new_state, reward, terminated, truncated, _ = env.step(action.item())

                    # Accumulate rewards
                    episode_reward += reward

                    # Convert new state and reward to tensors
                    new_state = torch.tensor(new_state, dtype=torch.float, device=device)
                    reward = torch.tensor(reward, dtype=torch.float, device=device)

                    # Save experience into memory. Only `terminated` is stored:
                    # a truncated episode still bootstraps from the next state.
                    memory.append((state, action, new_state, reward, terminated))

                    # Increment step counter
                    step_count += 1

                    # Train if enough experiences collected
                    if len(memory) > self.mini_batch_size:
                        mini_batch = memory.sample(self.mini_batch_size)
                        loss = self.optimize(mini_batch, policy_dqn, target_dqn)
                        episode_loss += loss
                        num_opt_steps += 1

                        # Decay epsilon
                        epsilon = max(epsilon * self.epsilon_decay, self.epsilon_min)

                        # Sync networks every `network_sync_rate` steps
                        if step_count >= self.network_sync_rate:
                            target_dqn.load_state_dict(policy_dqn.state_dict())
                            step_count = 0

                    # Move to next state
                    state = new_state

                # Calculate average loss for this episode
                avg_loss = episode_loss / max(1, num_opt_steps)

                # Store epsilon at the end of episode
                epsilon_history.append(epsilon)

                # Track rewards
                rewards_per_episode.append(episode_reward)

                # Update best model
                if episode_reward > best_reward:
                    best_reward = episode_reward
                    torch.save(policy_dqn.state_dict(), best_model_path)

                # Display progress
                if update_display_every and episode % update_display_every == 0:
                    clear_output(wait=True)
                    self._plot_progress(
                        rewards_per_episode,
                        epsilon_history,
                        reward_title=f'Episode {episode}/{num_episodes}, Best Reward: {best_reward:.1f}',
                        epsilon_title=f'Exploration Rate: {epsilon:.3f}, Avg Loss: {avg_loss:.4f}',
                    )
        finally:
            # Always release the environment, even if training is interrupted
            env.close()

        # Final update
        clear_output(wait=True)
        print(f"Training completed! Best reward: {best_reward:.1f}")
        self._plot_progress(
            rewards_per_episode,
            epsilon_history,
            reward_title=f'Training Complete, Best Reward: {best_reward:.1f}',
            epsilon_title=f'Final Exploration Rate: {epsilon:.3f}',
        )

        return policy_dqn

    # Run a trained model and display gameplay in notebook
    def run_trained_model(self, model_path="best_flappy_model.pt", max_steps=1000, fps=30):
        """
        Run a trained model and display gameplay in a Jupyter notebook
        using matplotlib animation instead of pygame window.

        Parameters:
        - model_path: Path to the saved model
        - max_steps: Maximum number of steps to run
        - fps: Frames per second for the display

        Returns:
        - An IPython HTML object containing the animation, or 0 if no frames
          were collected
        """
        # Play the game and record what the environment renders
        frames = self.collect_gameplay_frames(model_path, max_steps)

        # Display animation using matplotlib animation at the requested rate
        return self.display_gameplay_animation(frames, fps=fps)

    # Separate function to collect gameplay frames
    def collect_gameplay_frames(self, model_path="best_flappy_model.pt", max_steps=1000):
        """
        Collect frames from the agent playing the game

        Parameters:
        - model_path: Path to the saved policy weights
        - max_steps: Maximum number of environment steps to play

        Returns:
        - List of rendered frames, one per step taken
        """
        # Create environment with rgb_array render mode for notebook display
        env = self.create_environment(render_mode="rgb_array")

        try:
            # Get state and action dimensions
            num_states = env.observation_space.shape[0]
            num_actions = env.action_space.n

            # Create and load the trained model. map_location remaps the saved
            # tensors onto the current device, so a checkpoint written on a GPU
            # machine also loads on a CPU-only one.
            # NOTE(review): torch.load relies on unpickling; only load
            # checkpoints from sources you trust.
            policy_network = DQN(num_states, num_actions, self.fc1_nodes).to(device)
            policy_network.load_state_dict(torch.load(model_path, map_location=device))
            policy_network.eval()  # Set to evaluation mode

            # Initialize environment
            state, _ = env.reset()
            state = torch.tensor(state, dtype=torch.float, device=device)

            # Game loop variables
            terminated = False
            truncated = False
            total_reward = 0
            frames = []
            step = 0

            # Print progress message
            print("Collecting gameplay frames... Please wait.")

            # Collect frames while playing
            while not (terminated or truncated) and step < max_steps:
                # Select action greedily from the policy
                with torch.no_grad():
                    action = policy_network(state.unsqueeze(dim=0)).squeeze().argmax().item()

                # Execute action
                next_state, reward, terminated, truncated, _ = env.step(action)
                total_reward += reward

                # Convert state to tensor for next iteration
                state = torch.tensor(next_state, dtype=torch.float, device=device)

                # Capture frame
                frames.append(env.render())

                step += 1

                # Show progress periodically
                if step % 50 == 0:
                    print(f"Collected {step} frames, current reward: {total_reward:.1f}")
        finally:
            # Release the environment even if loading or playing fails
            env.close()

        print(f"Game completed! Total reward: {total_reward:.1f}, collected {len(frames)} frames")

        return frames

    # Display the collected frames as a smooth animation
    def display_gameplay_animation(self, frames, fps=None):
        """
        Display collected frames as a smooth animation in the notebook

        Parameters:
        - frames: Sequence of image arrays to show in order
        - fps: Playback rate in frames per second; None or a non-positive
          value falls back to a 20 ms delay between frames

        Returns:
        - An IPython HTML object containing the animation, or 0 if `frames`
          is empty
        """
        if not frames:
            print("No frames to display!")
            return 0

        print(f"Creating animation with {len(frames)} frames...")

        # Delay between frames in milliseconds
        if fps is not None and fps > 0:
            interval_ms = max(1, int(round(1000 / fps)))
        else:
            interval_ms = 20

        # Create figure for the animation
        fig, ax = plt.subplots(figsize=(8, 6))
        plt.close(fig)  # Close the figure to prevent it from displaying twice

        # Initialize with the first frame
        img = ax.imshow(frames[0])
        ax.axis('off')

        # Update function for animation
        def update(frame):
            img.set_array(frame)
            return [img]

        # Create the animation
        animation = FuncAnimation(
            fig, update, frames=frames,
            interval=interval_ms,
            blit=True
        )

        # Display the animation in the notebook
        return HTML(animation.to_jshtml())


In [None]:

# Cell 6: Functions for workshop usage
def run_short_training(num_episodes=7000):
    """
    Train a fresh Agent, refreshing the progress display every 2 episodes

    Parameters:
    - num_episodes: Number of episodes to train (default: 7000)

    Returns:
    - Tuple of (trained model, agent)
    """
    trainer = Agent()
    trained_policy = trainer.train(num_episodes=num_episodes, update_display_every=2)
    return trained_policy, trainer


def visualise_trained_agent(agent=None, model_path="best_flappy_model.pt"):
    """
    Play back a trained agent and return whatever its run_trained_model gives

    Parameters:
    - agent: Agent instance to use; a new Agent is created when None
    - model_path: Path to the trained model

    Returns:
    - The result of agent.run_trained_model(model_path=model_path)
    """
    player = Agent() if agent is None else agent
    return player.run_trained_model(model_path=model_path)


In [None]:

def download_pretrained_model(github_url, local_filename="pretrained_model.pt", timeout=30):
    """
    Download a pre-trained model from GitHub

    The file is streamed to a temporary "<local_filename>.part" path and only
    moved into place once the transfer finishes, so a failed download never
    leaves a truncated model at `local_filename`.

    Parameters:
    - github_url: URL to the model file on GitHub
    - local_filename: Where to save the downloaded model
    - timeout: Seconds to wait for the server before giving up (default: 30)

    Returns:
    - Path to the downloaded model, or None if the download failed
    """
    partial_filename = f"{local_filename}.part"

    try:
        print(f"Downloading pre-trained model from {github_url}...")

        # The context manager releases the streamed connection on every path;
        # the timeout keeps a stalled server from hanging the notebook.
        with requests.get(github_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # Raise an exception for HTTP errors

            # Get the total file size if available (0 means "unknown")
            total_size = int(response.headers.get('content-length', 0))

            with open(partial_filename, 'wb') as f:
                if total_size == 0:
                    # No content length header: still stream in chunks, just
                    # without a progress bar.
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                else:
                    # Use tqdm for progress bar
                    with tqdm(total=total_size, unit='B', unit_scale=True) as pbar:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                                pbar.update(len(chunk))

        # Publish the finished file under its final name
        os.replace(partial_filename, local_filename)

        print(f"Model successfully downloaded to {local_filename}")
        return local_filename

    except Exception as e:
        # Best-effort helper for notebook use: report the problem and signal
        # failure with None instead of raising.
        print(f"Error downloading model: {e}")
        try:
            if os.path.exists(partial_filename):
                os.remove(partial_filename)
        except OSError:
            pass  # Cleanup is best-effort; the failure was already reported
        return None

def load_and_run_pretrained_model(model_path="pretrained_model.pt"):
    """
    Run a saved model in the notebook if its file is present

    Parameters:
    - model_path: Path to the pre-trained model

    Returns:
    - The result of Agent.run_trained_model for that file, or None when the
      file does not exist
    """
    if os.path.exists(model_path):
        # File is there: hand it to a fresh agent for playback
        return Agent().run_trained_model(model_path=model_path)

    print(f"Model file {model_path} not found!")
    return None
