In [3]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import random
import pygame
from stable_baselines3 import DQN, PPO, A2C
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import BaseCallback
import os
import matplotlib.pyplot as plt
from IPython.display import display, clear_output

# Custom callback to log every `x` steps to TensorBoard
class CustomTensorBoardCallback(BaseCallback):
    def __init__(self, save_freq: int, verbose=0):
        super().__init__(verbose)
        self.save_freq = save_freq

    def _on_step(self) -> bool:
        # Log to TensorBoard every `save_freq` steps
        if self.num_timesteps % self.save_freq == 0:
            self.logger.record('timesteps', self.num_timesteps)
            self.logger.dump(self.num_timesteps)
        return True

class FlappyBirdEnv(gym.Env):
    """Custom Environment for Flappy Bird Game."""

    def __init__(self):
        super().__init__()
        self.screen_width = 288
        self.screen_height = 512
        self.bird_width = 34
        self.bird_height = 24
        self.pipe_width = 52
        self.pipe_gap = 100

        self.action_space = spaces.Discrete(2)  # Flap or not
        self.observation_space = spaces.Box(
            low=np.array([0, 0, -1, 0, 0], dtype=np.float32),
            high=np.array([1, 1, 1, 1, 1], dtype=np.float32)
        )

        self.screen = None
        self.reset()

    def reset(self, seed=None):
        super().reset(seed=seed)
        self.bird_pos = [self.screen_width * 0.2, self.screen_height / 2]
        self.bird_velocity = 0
        self.pipe_pos = [self.screen_width, random.randint(100, 400)]
        self.score = 0
        return self._get_obs(), {}

    def step(self, action):
        if action:
            self.bird_velocity = -9

        self.bird_velocity += 1
        self.bird_pos[1] += self.bird_velocity

        self.pipe_pos[0] -= 5
        if self.pipe_pos[0] < -self.pipe_width:
            self.pipe_pos = [self.screen_width, random.randint(100, 400)]
            self.score += 1

        done = self._check_collision()
        reward = self._calculate_reward(done)
        return self._get_obs(), reward, done, False, {}

    def _get_obs(self):
        return np.array([
            self.bird_pos[0] / self.screen_width,
            self.bird_pos[1] / self.screen_height,
            np.clip(self.bird_velocity / 10.0, -1, 1),
            self.pipe_pos[0] / self.screen_width,
            self.pipe_pos[1] / self.screen_height,
        ], dtype=np.float32)

    def _check_collision(self):
        if self.bird_pos[1] < 0 or self.bird_pos[1] > self.screen_height - self.bird_height:
            return True

        if (self.pipe_pos[0] < self.bird_pos[0] < self.pipe_pos[0] + self.pipe_width and
            (self.bird_pos[1] < self.pipe_pos[1] - self.pipe_gap / 2 or
             self.bird_pos[1] + self.bird_height > self.pipe_pos[1] + self.pipe_gap / 2)):
            return True

        return False

    def _calculate_reward(self, done):
        if done:
            return -1.0

        reward = 0.1
        mid_screen_y = self.screen_height / 2
        distance_to_center = abs(self.bird_pos[1] - mid_screen_y)
        reward += 0.1 * (1 - distance_to_center / mid_screen_y)

        pipe_center_y = self.pipe_pos[1]
        distance_to_pipe_gap = abs(self.bird_pos[1] - pipe_center_y)
        reward += 0.2 * (1 - distance_to_pipe_gap / (self.pipe_gap / 2))

        if self.pipe_pos[0] + self.pipe_width < self.bird_pos[0]:
            reward += 1.0

        return reward

    def render(self):
        if self.screen is None:
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_width, self.screen_height), pygame.NOFRAME)

        self.screen.fill((135, 206, 235))
        bird_rect = pygame.Rect(self.bird_pos[0], self.bird_pos[1], self.bird_width, self.bird_height)
        pygame.draw.rect(self.screen, (255, 255, 0), bird_rect)

        upper_pipe_rect = pygame.Rect(self.pipe_pos[0], 0, self.pipe_width, self.pipe_pos[1] - self.pipe_gap / 2)
        lower_pipe_rect = pygame.Rect(self.pipe_pos[0], self.pipe_pos[1] + self.pipe_gap / 2, self.pipe_width, self.screen_height)
        pygame.draw.rect(self.screen, (0, 255, 0), upper_pipe_rect)
        pygame.draw.rect(self.screen, (0, 255, 0), lower_pipe_rect)

        return pygame.surfarray.array3d(self.screen).swapaxes(0, 1)

    def close(self):
        if self.screen:
            pygame.quit()
            self.screen = None

# Set up logging for TensorBoard
log_dir = "./logs"
os.makedirs(log_dir, exist_ok=True)

# Define the algorithms to test
algorithms = {
    "DQN": DQN,
    "PPO": PPO,
    "A2C": A2C
}

# Specify the number of steps to log to TensorBoard
log_freq = 5000  # Save to TensorBoard every 1000 steps

# Loop through different algorithms and train them
for algo_name, algo_class in algorithms.items():
    print(f"Training {algo_name}...")
    
    
    # Ensure a fresh environment for each algorithm
    env = FlappyBirdEnv()  
    check_env(env)  # Re-check the environment to avoid misconfiguration
    
    model = algo_class("MlpPolicy", env, verbose=1, tensorboard_log=log_dir)
    
    # Add the custom callback to log every `log_freq` steps
    callback = CustomTensorBoardCallback(save_freq=log_freq)
    
    model.learn(total_timesteps=100000, callback=callback)
    
    # Close the environment explicitly
    env.close()
    env.reset()
    print(f"Training for {algo_name} completed. You can monitor using tensorboard --logdir=./logs")


def play_game_in_notebook(env, model, episodes=1, render_delay=0.1):
    """Play the game using the trained model and display frames in a Jupyter Notebook."""
    for _ in range(episodes):
        obs, _ = env.reset()
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, _, done, _, _ = env.step(action)

            frame = env.render()
            if isinstance(frame, np.ndarray):
                plt.imshow(frame)
                plt.axis('off')
                display(plt.gcf())
                plt.close()
                pygame.time.delay(int(render_delay * 1000))
                clear_output(wait=True)

    env.close()


Training DQN...
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ./logs/DQN_1
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 40.5     |
|    ep_rew_mean      | -13.9    |
|    exploration_rate | 0.846    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 2554     |
|    time_elapsed     | 0        |
|    total_timesteps  | 162      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.0837   |
|    n_updates        | 15       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 42.8     |
|    ep_rew_mean      | -11.1    |
|    exploration_rate | 0.675    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 2982     |
|    time_elapsed     | 0        |
|    total_timesteps  | 34

FileNotFoundError: [Errno 2] No such file or directory: 'flappy_bird_dqn.zip'

In [7]:
%load_ext tensorboard
%tensorboard --logdir ./logs --host localhost --port 6006

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Reusing TensorBoard on port 6006 (pid 7213), started 0:00:36 ago. (Use '!kill 7213' to kill it.)

In [None]:
## DQN, PPO, A2C
model = PPO("MlpPolicy", env, verbose=1, tensorboard_log=log_dir)

model.learn(total_timesteps=10000)
model.save(f"models/flappy_bird_{algo_name.lower()}")

In [None]:
# Load and play with trained models
for algo_name, algo_class in algorithms.items():
    print(f"Playing with {algo_name} model...")
    trained_model = algo_class.load(f"flappy_bird_{algo_name.lower()}")
    play_game_in_notebook(FlappyBirdEnv(), trained_model, episodes=3, render_delay=0.1)