# DSA - Deep Learning [4] - Reinforcement learning

In [None]:
# Install necessary libraries
# !pip install flappy-bird-gymnasium pygame
# !apt-get install -y xvfb python3-opengl ffmpeg
# !pip install pyvirtualdisplay
# !pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu

In [None]:
# Import necessary libraries
import os
import torch
import random
import numpy as np
import pygame
import imageio
from IPython.display import display, Image
from PIL import Image as PILImage  # Importing PIL for image manipulation
from flappy_bird_gymnasium.envs.flappy_bird_env import FlappyBirdEnv

# Select SDL's "dummy" video and audio drivers (rendering and audio in Colab);
# both variables are written in a single update call.
os.environ.update(SDL_VIDEODRIVER="dummy", SDL_AUDIODRIVER="dummy")


pygame 2.6.1 (SDL 2.28.4, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


## Create Environment

In [None]:
#Flappy Bird Environment as a Class Object
class CustomFlappyBirdEnv(FlappyBirdEnv):
  """Flappy Bird environment that records rendered frames so a game can be replayed as a GIF.

  The constructor initialises pygame itself, loads the sprites and sounds from
  the installed ``flappy_bird_gymnasium`` package, and sets a handful of game
  attributes. ``render()`` stores a copy of the display surface in
  ``self.frames`` and ``create_gif()`` writes those frames to disk.
  """

  def __init__(self):
    super().__init__()

    #Frames captured by render(); created here so render()/create_gif() do not
    #fail with AttributeError when called before the first reset()
    self.frames = []

    #Initialising Flappy Bird Game
    pygame.init()
    if not pygame.display.get_init():
      pygame.display.init()
    if not pygame.mixer.get_init():
      pygame.mixer.init()

    self._surface = pygame.Surface((288,512))

    #A video mode has to be set before _load_image() can call convert_alpha()
    self._display = pygame.display.set_mode((288,512))

    self._fps_clock = pygame.time.Clock()

    self._images = {}

    #Calling Game Functions from Imported Game
    self._images["background"] = self._load_image("background-day.png")
    #NOTE(review): confirm that the parent class expects the pipes in this
    #order (unflipped first, vertically flipped second)
    self._images["pipe"] = [
        self._load_image("pipe-green.png"),  # Top pipe
        pygame.transform.flip(self._load_image("pipe-green.png"), False, True)  # Bottom pipe (flipped)
    ]
    self._images["base"] = self._load_image("base.png")
    self._images["player"] = [
        self._load_image("yellowbird-upflap.png"),
        self._load_image("yellowbird-midflap.png"),
        self._load_image("yellowbird-downflap.png"),
    ]
    self._images["numbers"] = {
        i: self._load_image(f"{i}.png") for i in range(10)  # Load images for digits 0-9
    }

    self._audio = {
        "wing": self._load_audio("wing.wav"),
        "point": self._load_audio("point.wav"),
        "hit": self._load_audio("hit.wav"),
        "die": self._load_audio("die.wav")
    }

    #Setting the attributes of the environment (Flappy Bird Environment)
    #NOTE(review): these underscore attributes presumably mirror state used by
    #FlappyBirdEnv - confirm against the installed package version
    self._score = 0
    self._player_index = 0
    self._base_shift = self._images["base"].get_width() - self._surface.get_width()
    self._pipes = []
    self._player_y = 256
    self._player_velocity_y = 0
    self._gravity = 1
    self._pipe_gap = 100

  def _asset_path(self, *parts):
    """Return the path of a file below the installed package's ``assets`` folder.

    The folder is located relative to the imported ``flappy_bird_gymnasium``
    package instead of a hard-coded ``python3.10/dist-packages`` path, so it
    works regardless of Python version or install location.
    """
    #Local import: the notebook only imports the env submodule at file level
    import flappy_bird_gymnasium
    package_dir = os.path.dirname(os.path.abspath(flappy_bird_gymnasium.__file__))
    return os.path.join(package_dir, "assets", *parts)

  #Run the image, audio and game
  def _load_image(self, filename):
    """Load a sprite from ``assets/sprites`` and convert it for per-pixel alpha blitting."""
    return pygame.image.load(self._asset_path("sprites", filename)).convert_alpha()

  def _load_audio(self, filename):
    """Load a sound effect from ``assets/audio`` as a ``pygame.mixer.Sound``."""
    return pygame.mixer.Sound(self._asset_path("audio", filename))

  #To save each frame of the game being played
  def render(self):
    """Render through the parent class, then record the current display surface.

    Returns:
      Whatever the parent ``render()`` returns.
    """
    result = super().render()

    #[Limitation of Google Colab] The game window cannot be shown, so every frame
    #is stored and later turned into a GIF to visualise what the agent did.
    frame = pygame.surfarray.array3d(pygame.display.get_surface())
    self.frames.append(frame)

    self._fps_clock.tick(self.metadata["render_fps"])
    return result

  #To create a GIF
  def create_gif(self, gif_name="flappy_bird_game.gif"):
    """Write the recorded frames to a GIF file and display it in the notebook.

    Args:
      gif_name: Base file name; the GIF is written to the same name with
        ``.gif`` replaced by ``_flipped.gif``.

    Raises:
      ValueError: If no frame has been recorded since the last reset.
    """
    if not self.frames:
      raise ValueError("No frames recorded - call render() during an episode before create_gif().")

    #surfarray arrays are indexed [x][y] while image writers expect [row][col];
    #swapping the first two axes gives an upright, non-mirrored picture
    #(rotating by 270 degrees instead leaves the image mirrored left-right).
    upright_frames = [
        np.ascontiguousarray(np.transpose(frame, (1, 0, 2))) for frame in self.frames
    ]

    flipped_gif_name = gif_name.replace(".gif", "_flipped.gif")
    #NOTE(review): the unit of `duration` (seconds vs milliseconds) depends on
    #the installed imageio version - confirm the playback speed
    imageio.mimsave(flipped_gif_name, upright_frames, duration=1/ self.metadata["render_fps"])
    display(Image(flipped_gif_name))

  #To reset game
  def reset(self, seed=None, options=None):
    """Discard the recorded frames and reset the underlying environment.

    Args:
      seed: Optional seed forwarded to the parent ``reset``.
      options: Optional options forwarded to the parent ``reset``.

    Returns:
      Whatever the parent ``reset()`` returns.
    """
    self.frames = []
    return super().reset(seed=seed, options=options)


#TODO: complete the remaining code next lesson (state as of the DSA lesson on 21/11/2024)

## Define the DQN Model and Training Loop

In [None]:
class DQN(torch.nn.Module):
  """Fully connected Q-network: a state vector in, one Q-value per action out.

  Args:
    state_size: Number of input features.
    action_size: Number of actions (size of the output layer).
    hidden_size: Width of the two hidden layers.
  """

  def __init__(self, state_size, action_size, hidden_size=128):
    super().__init__()
    self.fc1 = torch.nn.Linear(state_size, hidden_size)
    #fc2 must output exactly what fc3 takes as input; a 129-wide output here
    #made every forward pass fail with a shape mismatch
    self.fc2 = torch.nn.Linear(hidden_size, hidden_size)
    self.fc3 = torch.nn.Linear(hidden_size, action_size)

  #Forward propagation: input layer -> two ReLU hidden layers -> linear output layer
  def forward(self, x):
    """Return the Q-values for ``x``; no activation is applied to the output layer."""
    x = torch.relu(self.fc1(x))
    x = torch.relu(self.fc2(x))
    return self.fc3(x)

In [None]:
def preprocess_state(state):
  """Turn an environment state into a flat float32 vector.

  Args:
    state: Either the observation itself or a tuple whose first element is
      the observation.

  Returns:
    A new 1-D ``np.float32`` array. The values are divided by 255 when the
    largest one exceeds 1.0, otherwise they are left unchanged.
  """
  #A tuple carries the observation in its first slot
  raw = state[0] if isinstance(state, tuple) else state
  features = np.asarray(raw, dtype=np.float32)

  #Values already within range are only flattened
  if not features.max() > 1.0:
    return features.flatten()

  #Normalizing data for efficient processing
  return (features / 255.0).flatten()

In [None]:
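#Arguments: env (the environment), num_episodes (number of training episodes), model_save_path (file the final model is saved to), checkpoint_interval (episodes between checkpoints)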
def train_dqn(env, num_episodes, model_save_path="flappy_bird_dqn_final.pth", checkpoint_interval=100):
  """Train a DQN agent on ``env`` using epsilon-greedy exploration and a replay buffer.

  Args:
    env: Environment exposing ``reset()``, ``step(action)`` (returning a
      5-tuple) and ``action_space.n``.
    num_episodes: Number of episodes to play.
    model_save_path: File the final weights (``state_dict``) are written to.
    checkpoint_interval: A checkpoint is written every this many episodes;
      ``0`` or ``None`` disables intermediate checkpoints.

  Returns:
    Tuple ``(model, rewards_per_episode, steps_per_episode)``.
  """
  state = preprocess_state(env.reset())
  state_size = state.shape[0]
  action_size = env.action_space.n

  #Optimization of maximum reward
  model = DQN(state_size, action_size)
  optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
  loss_fn = torch.nn.MSELoss()

  replay_buffer = []
  max_buffer_size = 1000
  batch_size = 32

  gamma = 0.90
  epsilon = 1.0
  epsilon_min = 0.1
  epsilon_decay = 0.995

  rewards_per_episode = []
  steps_per_episode = []

  #Checkpoints are named "<root>_ep<N><ext>"; splitting the extension avoids
  #overwriting the final model when the path does not contain ".pth"
  checkpoint_root, checkpoint_ext = os.path.splitext(model_save_path)

  for episode in range(num_episodes):
    state = preprocess_state(env.reset())
    done = False
    total_reward = 0
    steps = 0

    while not done:
      if random.random() < epsilon:
        #Explore: random action
        action = random.choice(range(action_size))
      else:
        #Exploit: action with the highest predicted Q-value
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
          action = torch.argmax(model(state_tensor)).item()

      next_state, reward, terminated, truncated, _ = env.step(action)
      next_state = preprocess_state(next_state)
      #The episode ends on termination as well as on truncation
      done = terminated or truncated
      total_reward += reward
      steps += 1

      #Only a real termination switches off bootstrapping in the target below
      replay_buffer.append((state, action, reward, next_state, terminated))
      if len(replay_buffer) > max_buffer_size:
        replay_buffer.pop(0)

      if len(replay_buffer) >= batch_size:
        batch = random.sample(replay_buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        #Going through one numpy array per field avoids the slow conversion of
        #a tuple of arrays element by element
        states_tensor = torch.as_tensor(np.stack(states), dtype=torch.float32)
        actions_tensor = torch.as_tensor(np.asarray(actions, dtype=np.int64)).unsqueeze(1)
        rewards_tensor = torch.as_tensor(np.asarray(rewards, dtype=np.float32)).unsqueeze(1)
        next_states_tensor = torch.as_tensor(np.stack(next_states), dtype=torch.float32)
        dones_tensor = torch.as_tensor(np.asarray(dones, dtype=np.float32)).unsqueeze(1)

        q_values = model(states_tensor).gather(1, actions_tensor)

        with torch.no_grad():
          next_q_values = model(next_states_tensor).max(1, keepdim=True)[0]
          targets = rewards_tensor + gamma * next_q_values * (1 - dones_tensor)

        loss = loss_fn(q_values,targets)

        #Backwards propagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

      state = next_state

    #Everything below runs once per episode, after the episode has finished
    if epsilon > epsilon_min:
      epsilon *= epsilon_decay

    rewards_per_episode.append(total_reward)
    steps_per_episode.append(steps)

    #Display Metrics
    print(
        f"Total Rewards = {total_reward:.2f}, "
        f"Steps = {steps}, "
        f"Epsilon = {epsilon:.4f}"
    )

    #Save model
    if checkpoint_interval and (episode + 1) % checkpoint_interval == 0:
      checkpoint_path = f"{checkpoint_root}_ep{episode + 1}{checkpoint_ext}"
      torch.save(model.state_dict(), checkpoint_path)
      print(f"Checkpoint saved at episode {episode + 1}: {checkpoint_path}")

  #The final model is always saved, whether or not a checkpoint was written
  torch.save(model.state_dict(), model_save_path)
  print(f"Final model saved to {model_save_path}")

  #Guard against np.mean of an empty list when num_episodes is 0
  average_reward = np.mean(rewards_per_episode) if rewards_per_episode else 0.0
  average_steps = np.mean(steps_per_episode) if steps_per_episode else 0.0
  print(f"Training completed. Average Reward: {average_reward:.2f}, Average Steps: {average_steps:.2f}")

  return model, rewards_per_episode, steps_per_episode

In [None]:
def load_model(env, model_path="flappy_bird_dqn_final.pth"):
  """Rebuild a DQN sized for ``env`` and load saved weights into it.

  Args:
    env: Environment used to derive the network's input and output sizes;
      it is reset once to obtain a sample state.
    model_path: File containing a ``state_dict`` written by ``torch.save``.

  Returns:
    The DQN with the loaded weights, switched to evaluation mode.
  """
  state = preprocess_state(env.reset())
  state_size = state.shape[0]
  action_size = env.action_space.n

  model = DQN(state_size, action_size)
  #map_location lets a checkpoint saved on another device load on a CPU-only
  #runtime; weights_only restricts unpickling to tensors and plain containers,
  #so a downloaded checkpoint cannot execute arbitrary code
  state_dict = torch.load(model_path, map_location="cpu", weights_only=True)
  model.load_state_dict(state_dict)
  model.eval()

  return model

In [None]:
def test_rl_agent_playing(env, model, step_limit=500):
  """Let the trained model play one episode greedily and export the recording as a GIF.

  Args:
    env: Environment providing ``reset()``, ``step()``, ``render()`` and
      ``create_gif()``.
    model: Network returning one Q-value per action for a batch of states.
    step_limit: Maximum number of steps to play.
  """
  state = preprocess_state(env.reset())

  for step in range (step_limit):
    print(f"Step {step}: State Shape: {state.shape}, State Type: {type(state)}")

    state_tensor = torch.FloatTensor(state).unsqueeze(0)

    #Only the forward pass needs gradients switched off
    with torch.no_grad():
      action = torch.argmax(model(state_tensor)).item()

    next_state, reward, terminated, truncated, _ = env.step(action)
    #The episode is over on termination as well as on truncation
    done = terminated or truncated
    next_state = preprocess_state(next_state)

    print(f"Step {step + 1}: Reward: {reward}, Done: {done}")

    env.render()

    if done:
      break

    state = next_state

  #Written once after the episode; inside the loop it was produced on every
  #step and skipped on the final one because of the break
  env.create_gif("flappy_bird_rl_agent.gif")

In [None]:
# Create the environment; the constructor initialises pygame and loads the game's images and sounds
env = CustomFlappyBirdEnv()

In [None]:
# train_dqn returns (model, rewards_per_episode, steps_per_episode); unpack it so
# that `model` is the network itself rather than the whole tuple
model, rewards_per_episode, steps_per_episode = train_dqn(env, num_episodes=500, model_save_path="flappy_bird_dqn_final.pth", checkpoint_interval=1000)

TypeError: float() argument must be a string or a real number, not 'builtin_function_or_method'

In [None]:
# Load saved weights into a fresh network and let the agent play one recorded episode.
# NOTE(review): the training call in this notebook (500 episodes, checkpoint_interval=1000)
# does not produce an "_ep4000" checkpoint - presumably this file is supplied
# separately; confirm it exists before running this cell.
model = load_model(env, "flappy_bird_dqn_final_ep4000.pth")

test_rl_agent_playing(env, model)