In [75]:
from typing import Dict, Tuple, Optional, Union

import gym
import numpy as np
import pygame

from flappy_bird_gym.envs.game_logic import FlappyBirdLogic
from flappy_bird_gym.envs.game_logic import PIPE_WIDTH, PIPE_HEIGHT
from flappy_bird_gym.envs.game_logic import PLAYER_WIDTH, PLAYER_HEIGHT
from flappy_bird_gym.envs.renderer import FlappyBirdRenderer


class FlappyBirdEnvSimple(gym.Env):
    """ Flappy Bird Gym environment that yields simple observations.

    The observations yielded by this environment are simple numerical
    information about the game's state. Specifically, the observations are:

        * absolute y position of the player;
        * Difference between the player's y position and the next hole's y
          position.

    The reward received by the agent in each step is equal to the score obtained
    by the agent in that step. A score point is obtained every time the bird
    passes a pipe.

    Args:
        screen_size (Tuple[int, int]): The screen's width and height.
        normalize_obs (bool): If `True`, the observations will be normalized
            before being returned.
        pipe_gap (int): Space between a lower and an upper pipe.
        bird_color (str): Color of the flappy bird. The currently available
            colors are "yellow", "blue" and "red".
        pipe_color (str): Color of the pipes. The currently available colors are
            "green" and "red".
        background (Optional[str]): Type of background image. The currently
            available types are "day" and "night". If `None`, no background will
            be drawn.
    """

    metadata = {'render.modes': ['human']}

    def __init__(self,
                 screen_size: Tuple[int, int] = (288, 512),
                 normalize_obs: bool = True,
                 pipe_gap: int = 100,
                 bird_color: str = "yellow",
                 pipe_color: str = "green",
                 background: Optional[str] = "day") -> None:
        self.action_space = gym.spaces.Discrete(2)
        self.observation_space = gym.spaces.Box(-np.inf, np.inf,
                                                shape=(2,),
                                                dtype=np.float32)
        self._screen_size = screen_size
        self._normalize_obs = normalize_obs
        self._pipe_gap = pipe_gap

        self._game = None
        self._renderer = None

        self._bird_color = bird_color
        self._pipe_color = pipe_color
        self._bg_type = background

    def _get_observation(self):
        up_pipe = low_pipe = None
        h_dist = 0
        for up_pipe, low_pipe in zip(self._game.upper_pipes,
                                     self._game.lower_pipes):
            h_dist = (low_pipe["x"] + PIPE_WIDTH / 2
                      - (self._game.player_x - PLAYER_WIDTH / 2))
            h_dist += 3  # extra distance to compensate for the buggy hit-box
            if h_dist >= 0:
                break

        upper_pipe_y = up_pipe["y"] + PIPE_HEIGHT
        lower_pipe_y = low_pipe["y"]
        player_y = self._game.player_y

        v_dist = (upper_pipe_y + lower_pipe_y) / 2 - (player_y
                                                      + PLAYER_HEIGHT/2)

        if self._normalize_obs:
            # h_dist /= self._screen_size[0]
            v_dist /= self._screen_size[1]

        return np.array([
            self._game.player_y / self._screen_size[1],
            v_dist,
        ])

    def step(self,
             action: Union[FlappyBirdLogic.Actions, int],
    ) -> Tuple[np.ndarray, float, bool, Dict]:
        """ Given an action, updates the game state.

        Args:
            action (Union[FlappyBirdLogic.Actions, int]): The action taken by
                the agent. Zero (0) means "do nothing" and one (1) means "flap".

        Returns:
            A tuple containing, respectively:

                * an observation (horizontal distance to the next pipe;
                  difference between the player's y position and the next hole's
                  y position);
                * a reward (always 1);
                * a status report (`True` if the game is over and `False`
                  otherwise);
                * an info dictionary.
        """
        alive = self._game.update_state(action)
        obs = self._get_observation()

        reward = 1

        done = not alive
        info = {"score": self._game.score}

        return obs, reward, done, info

    def reset(self):
        """ Resets the environment (starts a new game). """
        self._game = FlappyBirdLogic(screen_size=self._screen_size,
                                     pipe_gap_size=self._pipe_gap)
        if self._renderer is not None:
            self._renderer.game = self._game

        return self._get_observation()

    def render(self, mode='human') -> None:
        """ Renders the next frame. """
        if self._renderer is None:
            self._renderer = FlappyBirdRenderer(screen_size=self._screen_size,
                                                bird_color=self._bird_color,
                                                pipe_color=self._pipe_color,
                                                background=self._bg_type,
                                                audio_on=False)
            self._renderer.game = self._game
            self._renderer.make_display()

        self._renderer.draw_surface(show_score=True)
        self._renderer.update_display()

    def get_image(self):
      # Convert the Pygame surface to a NumPy array
      image_array = pygame.surfarray.array3d(self._renderer.surface)
      return image_array

    def close(self):
        """ Closes the environment. """
        if self._renderer is not None:
            pygame.display.quit()
            self._renderer = None
        super().close()

In [76]:
import time
env = FlappyBirdEnvSimple()

## Lets see how a random policy does

In [77]:
obs = env.reset()
i = 0
imgs = []
states = []
import os
# os.environ["SDL_VIDEODRIVER"] = "dummy"
while True:
    # Next action:
    # (feed the observation to your agent here)
    action = env.action_space.sample() # for a random action

    # Processing:
    next_obs, reward, done, info = env.step(action)
    # the observation is the normalized horizontal and vertical distance to the next pipe
    
    # Rendering the game:
    # (remove these two lines during training)
    env.render()
    imgs.append(env.get_image().transpose((1, 0, 2)))
    states.append(next_obs)
    time.sleep(1 / 30)  # FPS
    
    # Checking if the player is still alive
    if done or i == 200:
        break
    i += 1

env.close()

In [78]:
%matplotlib inline
from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import HTML

# np array with shape (frames, height, width, channels)
fig = plt.figure()
im = plt.imshow(imgs[0])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(imgs[0])

def animate(i):
    print(states[i])
    im.set_data(imgs[i])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=len(imgs),
                               interval=50)
HTML(anim.to_html5_video())

[0.4609375 0.0390625]
[0.44726562 0.05273438]
[0.4296875 0.0703125]
[0.4140625 0.0859375]
[0.40039062 0.09960938]
[0.3828125 0.1171875]
[0.36523438 0.13476562]
[0.34960938 0.15039062]
[0.33203125 0.16796875]
[0.31445312 0.18554688]
[0.29882812 0.20117188]
[0.28125 0.21875]
[0.26367188 0.23632812]
[0.24804688 0.25195312]
[0.234375 0.265625]
[0.22265625 0.27734375]
[0.20507812 0.29492188]
[0.18945312 0.31054688]
[0.17578125 0.32421875]
[0.15820312 0.34179688]
[0.14257812 0.35742188]
[0.125 0.375]
[0.109375 0.390625]
[0.09570312 0.40429688]
[0.08398438 0.41601562]
[0.06640625 0.43359375]
[0.04882812 0.45117188]
[0.03125 0.46875]
[0.01367188 0.48632812]
[-0.00390625  0.50390625]
[-0.01953125  0.51953125]
[-0.03710938  0.53710938]
[-0.0546875  0.5546875]
[-0.0703125  0.5703125]
[-0.08789062  0.58789062]
[-0.10351562  0.60351562]
[-0.1171875  0.6171875]
[-0.12890625  0.62890625]
[-0.13867188  0.63867188]
[-0.14648438  0.64648438]
[-0.15234375  0.65234375]
[-0.15625  0.65625]
[-0.15820312  0.

## Okay, now lets compute our transition function by sampling a bunch of times. 

In [34]:
# we don't know the transition function so we're just going to approximate it by doing a bunch of rollouts
# and recording the state transitions
discretization = 20
transition_matrix = np.zeros((discretization,discretization,2,discretization,discretization))
reward = 1
def snap_to_grid(h, v):
  '''Assume that h, v goes from [0 to 2] and [-1, 1] respectively '''
  
  return int(h * discretization / 2), int((v + 1) * discretization / 2)

In [62]:
# sample a bunch of rollouts and fill the transition matrices each time
num_rollouts = 5
for i in range(num_rollouts):
  env = FlappyBirdEnvSimple()
  obs = env.reset()
  while True:
      # Next action:
      # (feed the observation to your agent here)
      action = np.random.randint(2) # for a random action

      # Processing:
      next_obs, reward, done, info = env.step(action)
      # the observation is the normalized horizontal and vertical distance to the next pipe

      # record the transition
      x, y = snap_to_grid(obs[0], obs[1])
      next_x, next_y = snap_to_grid(next_obs[0], next_obs[1])
      transition_matrix[x, y, action, next_x, next_y] += 1
      if done:
          break
# now we need to normalize the transition matrices
# I'm writing out the entire loop for clarity but we really should vectorize this
for x in range(discretization):
  for y in range(discretization):
    for a in range(2):
      for next_x in range(discretization):
        for next_y in range(discretization):
          normalization = np.sum(transition_matrix[x, y, a, :, :])
          if normalization > 0:
            transition_matrix[x, y, a, next_x, next_y] /= normalization
    

In [66]:
# okay, now we can run value iteration 
discount = 0.99
epsilon = 0.001
value = np.zeros((discretization,discretization))
policy = np.zeros((discretization,discretization))
for i in range(100):
  new_value = np.zeros((discretization,discretization))
  for x in range(discretization):
    for y in range(discretization):
      best_action = 0
      best_value = -np.inf
      for a in range(2):
        value = 0
        for next_x in range(discretization):
          for next_y in range(discretization):
            value += transition_matrix[x, y, a, next_x, next_y] * (reward + discount * new_value[next_x, next_y])
        if value > best_value:
          best_value = value
          best_action = a
      new_value[x, y] = best_value
      policy[x, y] = best_action
  error = np.max(np.abs(new_value - value))
  value = new_value
print(error)

3.2172737863415004


In [64]:
# okay, now lets play the policy!
env = FlappyBirdEnvSimple()
obs = env.reset()
imgs = []
while True:
    # Next action:
    # (feed the observation to your agent here)
    x, y = snap_to_grid(obs[0], obs[1])
    action = policy[x, y]

    # Processing:
    next_obs, reward, done, info = env.step(action)

    # Rendering the game:
    env.render()
    imgs.append(env.get_image().transpose((1, 0, 2)))
    time.sleep(1 / 30)  # FPS

    # Checking if the player is still alive
    if done:
        break
    obs = next_obs

In [55]:
# lets make a movie!
%matplotlib inline
from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import HTML

# np array with shape (frames, height, width, channels)
fig = plt.figure()
im = plt.imshow(imgs[0])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(imgs[0])

def animate(i):
    im.set_data(imgs[i])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=len(imgs),
                               interval=50)
HTML(anim.to_html5_video())