## Imports

In [None]:
# Fetch the PyGame Learning Environment sources. The clone is skipped when the
# checkout directory already exists, so re-running this cell no longer fails
# with "destination path ... already exists and is not an empty directory".
!test -d PyGame-Learning-Environment || git clone https://github.com/ntasfi/PyGame-Learning-Environment
# %pip (instead of !pip) installs into the environment of the running kernel.
%pip install gym_ple
%pip install -e ./PyGame-Learning-Environment
%pip install pyvirtualdisplay
# System packages used for off-screen rendering and video encoding.
!sudo apt-get install -y xvfb ffmpeg freeglut3-dev

fatal: destination path 'PyGame-Learning-Environment' already exists and is not an empty directory.
Obtaining file:///content/PyGame-Learning-Environment
  Preparing metadata (setup.py) ... [?25l[?25hdone
Installing collected packages: ple
  Attempting uninstall: ple
    Found existing installation: ple 0.0.1
    Uninstalling ple-0.0.1:
      Successfully uninstalled ple-0.0.1
  Running setup.py develop for ple
Successfully installed ple-0.0.1
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
freeglut3-dev is already the newest version (2.8.1-6).
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.12).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [None]:
import sys
sys.path.append('/content/PyGame-Learning-Environment')
from ple.games.flappybird import FlappyBird
from ple import PLE
from gym import spaces
import IPython
import numpy as np
import gym
import os
import pyvirtualdisplay
import base64
import warnings
import imageio
from abc import ABC, abstractmethod
import matplotlib.pyplot as plt
import pickle
warnings.filterwarnings("ignore")


## Utils

In [None]:
def embed_mp4(filename):
  """Embeds an mp4 file in the notebook.

  args:
  - filename: path of the mp4 file to embed
  returns:
    an IPython.display.HTML object whose <video> tag carries the file
    contents inline as a base64 data URI
  raises:
    OSError if the file cannot be opened or read
  """
  # Read through a context manager so the file handle is closed right away
  # instead of being left for the garbage collector.
  with open(filename, 'rb') as video_file:
    video = video_file.read()
  b64 = base64.b64encode(video)
  tag = '''
  <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
  Your browser does not support the video tag.
  </video>'''.format(b64.decode())

  return IPython.display.HTML(tag)
# Start an invisible 1400x900 virtual display and keep the started handle.
# NOTE(review): the name `display` presumably shadows IPython's rich-display
# helper of the same name in this notebook's namespace - confirm nothing below
# relies on that helper.
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

## Standard Classes - Given in the homework

In [None]:
# Select SDL's "dummy" video driver to disable the python game window popup.
# Game.__init__ sets the same variable again before creating the game.
os.environ["SDL_VIDEODRIVER"] = "dummy"

class Game(gym.Env):
    """Gym-style wrapper around the PLE FlappyBird game.

    Observations are the values of the PLE game-state dictionary, cast to int
    and passed through the user-supplied `preprocess` callable. Rewards are
    the PLE rewards (negative values clamped to -1) passed through the
    user-supplied `reward_shaping` callable.
    """

    def __init__(self, display_screen=False,
                 force_fps=True,
                 custom_obs=False,
                 pipe_gap=80,
                 custome_observation_map=None,
                 preprocess=lambda x: x,
                 reward_shaping=lambda x, y, z: x):
        """
        - display_screen: forwarded to PLE
        - force_fps: forwarded to PLE
        - custom_obs: when True, `custome_observation_map` describes the
          observation; otherwise the module-level OBSERVATION_MAP is used
        - pipe_gap: forwarded to FlappyBird
        - custome_observation_map: mapping from observation index to feature
          name; None (the default) means an empty mapping
        - preprocess: callable applied to the raw integer observation vector
        - reward_shaping: callable (reward, terminal, ple_env) -> reward
        """
        os.environ["SDL_VIDEODRIVER"] = "dummy"
        game = FlappyBird(pipe_gap=pipe_gap)  # define and initiate the environment
        self.env = PLE(game, fps=30, display_screen=display_screen,
                       force_fps=force_fps)
        self.env.init()
        # list of actions in the environment
        self.actions = self.env.getActionSet()
        # one discrete action per entry of the PLE action set
        self.action_space = spaces.Discrete(len(self.actions))
        self.custom_obs = custom_obs
        if custom_obs:
            # A fresh dict per instance: the former `{}` default was a single
            # object shared by every Game built without an explicit map, and it
            # is handed out by reference through `observation_map`.
            self._observation_map = ({} if custome_observation_map is None
                                     else custome_observation_map)
        else:
            # NOTE(review): OBSERVATION_MAP is not defined in the visible part
            # of this notebook; custom_obs=False raises NameError unless it is
            # defined elsewhere - confirm.
            self._observation_map = OBSERVATION_MAP
        self.preprocess = preprocess
        self.reward_shaping = reward_shaping
        self.score = 0

    @property
    def observation_space(self):
        """Box with one dimension per entry of the observation map.

        The bounds are fixed at [0, 512] with an integer dtype, independent of
        what `preprocess` returns.
        """
        return spaces.Box(low=0, high=512, shape=(len(self._observation_map),), dtype=int)

    def _get_rgb(self):
        """Return the PLE screen buffer with its first two axes swapped."""
        return self.env.getScreenRGB().transpose(1, 0, 2)

    def _to_observation(self, state):
        """Turn a PLE game-state dict into the preprocessed observation."""
        return self.preprocess(np.array(list(state.values())).astype(int))

    @property
    def observation_map(self):
        """Mapping that describes the entries of the observation vector."""
        return self._observation_map

    def step(self, action):
        """Take the action chosen and update the reward.

        - action: index into `self.actions`
        Returns:
          (observation, reward, terminal, info) where info is
          {'score': <number of positive rewards since the last reset>}
        """
        reward = self.env.act(self.actions[action])
        # every negative PLE reward is reported as exactly -1
        if reward < 0:
            reward = -1

        # each positive reward counts as one point of score
        if reward > 0:
            self.score += 1
        state = self.env.getGameState()
        terminal = self.env.game_over()
        reward = self.reward_shaping(reward, terminal, self.env)
        info = {'score': self.score}
        return self._to_observation(state), reward, terminal, info

    def getGameState(self):
        '''
        PLEenv return gamestate as a dictionary. Returns a modified form
        of the gamestate only with the required information to define the state:
        the string "<player_vel> <next_pipe_dist_to_player> <v_dist>", where
        v_dist is next_pipe_bottom_y - player_y.
        '''
        state = self.env.getGameState()
        h_dist = state['next_pipe_dist_to_player']
        v_dist = state['next_pipe_bottom_y'] - state['player_y']
        vel = state['player_vel']

        return ' '.join([str(vel), str(h_dist), str(v_dist)])

    def reset(self):
        """Resets the game to start a new game and returns the first observation"""
        self.env.reset_game()
        state = self.env.getGameState()
        self.score = 0
        return self._to_observation(state)

    def render(self, mode='human'):
        """Render the game: returns the current RGB frame (`mode` is not used)"""
        return self._get_rgb()

    def seed(self, seed):
        """Install a RandomState built from `seed` on the PLE env and its game,
        then initialise the PLE env again."""
        rng = np.random.RandomState(seed)
        self.env.rng = rng
        self.env.game.rng = self.env.rng

        self.env.init()

## Build Env

In [None]:
def preprocess(obs, one_pipe_only=None):
  """
  Reduce a raw observation to normalized features.

  args:
  - obs: raw observation, laid out as
    [player_y, player_y_dot, next_x, next_top, next_bottom,
     next_next_x, next_next_top, next_next_bottom]
  - one_pipe_only: True keeps only the features of the nearest pipe, False
    keeps all five; None (default) falls back to the module-level
    `disregard_next_next` flag, which must then be defined
  returns:
    list [player_y_dot, next_x, next_y_dist, next_next_x, next_next_y_dist],
    truncated to its first three entries when only one pipe is considered
  """
  obs = np.asarray(obs)

  # Step 1: Normalize
  # min and max values assumed for each raw observation feature
  max_values = np.array([512, 20, 512, 512, 512, 512, 512, 512])
  min_values = np.array([0, -20, 0, 0, 0, 0, 0, 0])

  # min-max normalization
  normalized_obs = (obs - min_values) / (max_values - min_values)

  # Step 2: Compute key features
  # y distance between the player and the centre of the next / next-next pipe
  # gap; adding 512 and dividing by 1024 maps an offset in [-512, 512] to [0, 1]
  next_y_dist = ((obs[3] + obs[4]) / 2 - obs[0] + 512) / (512 * 2)
  next_next_y_dist = ((obs[6] + obs[7]) / 2 - obs[0] + 512) / (512 * 2)

  # build the reduced features:
  # [player_y_dot, next_x, next_y_dist, next_next_x, next_next_y_dist]
  features = [normalized_obs[1], normalized_obs[2], next_y_dist, normalized_obs[5], next_next_y_dist]

  # Step 3: optionally look only one pipe ahead
  if one_pipe_only is None:
    # reading a module-level name needs no `global` statement
    one_pipe_only = disregard_next_next
  if one_pipe_only:
    features = features[:3]

  return features

In [None]:
def build_my_env():
  """Create the Game environment with the reduced observation mapping.

  Reads the module-level `disregard_next_next` flag: when it is truthy only
  the first three features are kept in the mapping.
  """
  # feature names in observation order
  feature_names = ["player_y_dot",
                   'next_x',
                   'next_y_dist',
                   'next_next_x',
                   'next_next_y_dist']

  # looking only one pipe ahead drops the two "next_next" features
  kept_names = feature_names[:3] if disregard_next_next else feature_names

  # index -> feature name
  index_to_feature = dict(enumerate(kept_names))

  return Game(custom_obs=True,
              force_fps=True,
              custome_observation_map=index_to_feature,
              preprocess=preprocess)

In [None]:
# Module-level switch read by preprocess() and build_my_env(): when True only
# the first three features are kept, i.e. the second pipe ahead is ignored.
disregard_next_next = True
# Build the environment with the observation mapping selected by the flag above.
env = build_my_env()

## Agent class

In [None]:
def discretize_observation(observation, bins):
  '''
  Map a normalized observation vector to a single discrete state index.

  args:
  - observation: observation vector of any length, with values normalized [0,1]
  - bins: number of bins per element of the vector - one entry per element
    of observation
  returns:
    the index of the state (int)
  '''
  bin_counts = np.array(bins)

  # values are normalized, so scaling by the bin count and truncating gives
  # the bin number of each feature
  per_feature_bin = (observation * bin_counts).astype(int)

  # keep every bin number inside [0, bin count - 1]
  last_bin = bin_counts - 1
  per_feature_bin = np.minimum(np.maximum(per_feature_bin, 0), last_bin)

  # flatten the per-feature bin numbers into one state index
  return np.ravel_multi_index(per_feature_bin, bins)

In [None]:
class Load_agent():
  """Agent that acts from a tabular policy loaded from a .npy file."""

  def __init__(self, action_space, observation_space, filepath, bins=None, epsilon=0.0):
    """
    - action_space: The action space of the environment
    - observation_space: The observation space of the environment
    - filepath: path of the saved policy array, loaded with np.load
    - bins: the discretization strategy to use (bins per observation
      feature); None (default) means [10] * 5
    - epsilon: probability of taking a random action when select_action is
      called with deterministic=False (default 0.0, i.e. always greedy)

    A missing or unreadable file is reported with a message and leaves
    `self.policy` set to None.
    """
    self.action_space = action_space
    # build the default per instance instead of sharing one mutable list
    self.bins = [10] * 5 if bins is None else bins
    self.observation_space = observation_space
    # select_action reads this attribute; it was never assigned before
    self.epsilon = epsilon
    # stays None when loading fails, so the attribute always exists
    self.policy = None

    # load file path
    if os.path.exists(filepath):
      try:
        self.policy = np.load(filepath)
        print(f"Policy loaded successfully from {filepath}")
      except Exception as e:
        print(f"Error loading policy: {e}")
    else:
      print(f"File not found: {filepath}")

  def select_action(self, state, deterministic=False):
    '''
    Select an epsilon-greedy action.
    - state: discretized state
    - deterministic: Flag indicating whether to use a deterministic policy (default is False)
    Returns:
      the index of the selected action
    Raises:
      RuntimeError if no policy could be loaded
    '''
    if self.policy is None:
      raise RuntimeError("No policy loaded: check the filepath given to Load_agent")

    # eps-greedy exploration: with probability epsilon take a random action.
    # No random number is drawn when deterministic is set.
    if not deterministic and np.random.rand() < self.epsilon:
      return self.action_space.sample()

    # greedy action for this state
    return np.argmax(self.policy[state])

  def run_policy(self, env, video_filename = None):
    """
    Play one episode greedily and optionally record it.
    - env: environment offering reset(), step(action) and render()
    - video_filename: when given, one rendered frame per step is written
      to this file at 24 fps
    Returns:
      (total_reward, score) where score is info['score'] of the last step
    """
    # setup
    obs = env.reset()
    s = discretize_observation(obs, self.bins)
    done = False
    total_reward = 0
    # video
    video = None
    if video_filename is not None:
      video = imageio.get_writer(video_filename, fps=24)
    try:
      # run on frames
      while not done:
        # action
        a_prime = self.select_action(s, deterministic=True)
        obs_prime, reward, done, info = env.step(a_prime)
        # reward and move on to the new state
        total_reward += reward
        s = discretize_observation(obs_prime, self.bins)
        # generate video
        if video is not None:
          video.append_data(env.render())
    finally:
      # close the writer even when the episode raises, so the file is released
      if video is not None:
        video.close()
    # get score
    score = info['score']
    return total_reward, score

## Load policy and run

In [None]:
# Number of bins per feature used to discretize the 3-feature observation.
# NOTE(review): presumably has to match the discretization the saved policy
# was produced with - confirm.
bins = [10, 40, 40]
# Build the agent from the policy array stored at /content/policy.npy.
agent = Load_agent(env.action_space, env.observation_space, '/content/policy.npy', bins = bins)

Policy loaded successfully from /content/policy.npy


In [52]:
# where the recorded episode is written
filename = '/content/run_vid.mp4'

# nine episodes without recording
for i in range(9):
  reward, score = agent.run_policy(env)
  print(f'Run: {i}, Reward:{reward}', f', Score:{score}')

# one more episode, recorded to `filename`; it is numbered right after the
# last index of the loop above
reward, score = agent.run_policy(env, video_filename = filename)
print(f'Run: {i+1}, Reward:{reward}', f', Score:{score}')

Run: 0, Reward:62.0 , Score:63
Run: 1, Reward:52.0 , Score:53
Run: 2, Reward:96.0 , Score:97
Run: 3, Reward:52.0 , Score:53
Run: 4, Reward:21.0 , Score:22
Run: 5, Reward:18.0 , Score:19
Run: 6, Reward:15.0 , Score:16
Run: 7, Reward:6.0 , Score:7
Run: 8, Reward:24.0 , Score:25
Run: 9, Reward:12.0 , Score:13


In [53]:
# Show the episode recorded by the previous cell inline in the notebook.
embed_mp4('/content/run_vid.mp4')