First we import some necessary modules.

In [2]:
import os
from random import randrange

import pickle
import numpy as np
from sklearn.neural_network import MLPRegressor

To train an agent with reinforcement learning, we need an environment in which the agent can act and which gives the agent feedback on its actions.

For this we are going to use the PyGame Learning Environment (PLE): https://github.com/ntasfi/PyGame-Learning-Environment.

PLE depends on pygame, which has to be installed first: https://www.pygame.org/wiki/GettingStarted#Pygame%20Installation

Follow the install instructions for pygame and install the PLE afterwards. If you have done everything correctly, the following imports should work.

In [3]:
from ple import PLE # the main wrapper for the environment
from ple.games.flappybird import FlappyBird # a single game environment, you can try others

Now we can initialize a game and environment instance. After executing the following code, a black window should appear.

In [4]:
game = FlappyBird()
env = PLE(game, fps=30, display_screen=True, force_fps=False)
env.init()

The game waits for our agent to pick and execute an action.

In [5]:
while not env.game_over():
    reward = env.act(119) # do a "Flap"
    #reward = env.act(None) # do nothing
env.reset_game()

To read out the current game state, we need to pass the environment a preprocessor function that translates the state dictionary provided by the environment into a NumPy array.

In [6]:
def preprocessor(game_state):
    # the game state for FlappyBird is a dictionary with 8 entries:
    #    - player_y                      : the y-position of the bird
    #    - player_vel                    : the velocity of the bird (pointed downwards)
    #    - next_pipe_dist_to_player      : distance to the next pipe
    #    - next_pipe_top_y               : top position of the next pipe
    #    - next_pipe_bottom_y            : bottom position of the next pipe
    #    - next_next_pipe_dist_to_player : distance to the next next pipe
    #    - next_next_pipe_top_y          : top position of the next next pipe
    #    - next_next_pipe_bottom_y       : bottom position of the next next pipe
    
    # For our purposes it is better to normalize the input.
    # EXERCISE: Use the values below to normalize the game state values.
    # Screen-Height: 512
    # Screen-Width: 288
    # Distance to next next pipe: 1.65 * Screen_Width
    total_height = 512
    total_width = 288
    max_velocity = 10   
    
    preprocessed_state = np.empty((1, 8), dtype=float)
    preprocessed_state[0][0] = float(game_state["player_y"])
    preprocessed_state[0][1] = float(game_state["player_vel"])
    preprocessed_state[0][2] = float(game_state["next_pipe_dist_to_player"])
    preprocessed_state[0][3] = float(game_state["next_pipe_top_y"])
    preprocessed_state[0][4] = float(game_state["next_pipe_bottom_y"])
    preprocessed_state[0][5] = float(game_state["next_next_pipe_dist_to_player"])
    preprocessed_state[0][6] = float(game_state["next_next_pipe_top_y"])
    preprocessed_state[0][7] = float(game_state["next_next_pipe_bottom_y"])

    return preprocessed_state
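
One possible way to approach the normalization exercise is sketched below. It is not a reference solution. Vertical values are divided by the screen height, horizontal distances by the screen width (or 1.65 times the width for the next next pipe), and the velocity by `max_velocity`. The name `normalized_preprocessor` is hypothetical, the sample state is only for illustration, and the assumption that the velocity stays within ±10 is not verified here.

```python
import numpy as np

TOTAL_HEIGHT = 512.0   # screen height, from the comments in the preprocessor
TOTAL_WIDTH = 288.0    # screen width, from the comments in the preprocessor
MAX_VELOCITY = 10.0    # assumed bound on the absolute bird velocity


def normalized_preprocessor(game_state):
    """Scale the FlappyBird state dictionary into a (1, 8) array (hypothetical helper)."""
    return np.array([[
        game_state["player_y"] / TOTAL_HEIGHT,
        game_state["player_vel"] / MAX_VELOCITY,
        game_state["next_pipe_dist_to_player"] / TOTAL_WIDTH,
        game_state["next_pipe_top_y"] / TOTAL_HEIGHT,
        game_state["next_pipe_bottom_y"] / TOTAL_HEIGHT,
        game_state["next_next_pipe_dist_to_player"] / (1.65 * TOTAL_WIDTH),
        game_state["next_next_pipe_top_y"] / TOTAL_HEIGHT,
        game_state["next_next_pipe_bottom_y"] / TOTAL_HEIGHT,
    ]], dtype=float)


# Sample state for illustration only.
example_state = {
    "player_y": 256, "player_vel": -8,
    "next_pipe_dist_to_player": 283,
    "next_pipe_top_y": 71, "next_pipe_bottom_y": 171,
    "next_next_pipe_dist_to_player": 427,
    "next_next_pipe_top_y": 90, "next_next_pipe_bottom_y": 190,
}
normalized = normalized_preprocessor(example_state)
```

With this scaling, all entries of the sample state fall into the range from -1 to 1, which tends to suit neural network inputs better than raw pixel coordinates.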

Now we can initialize the environment with this preprocessor and read out the game state.

In [7]:
env = PLE(game, fps=30, display_screen=True, force_fps=False,
         state_preprocessor=preprocessor)
env.init()

In [8]:
env.getGameState()

array([[ 256.,   -8.,  283.,   71.,  171.,  427.,   90.,  190.]])

Now we need to define an agent that can interact with the environment and learn from it.

We can either use a Q-table approach, storing the Q-values in a state lookup table, or use an estimator such as a neural network. We are going to use the latter.

EXERCISE: Give an explanation why a table approach might be insufficient. What does an estimator do better?
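
As a rough starting point for this exercise, the following sketch counts how many entries a Q-table would need. The resolution of 50 bins per state value is a hypothetical choice made only for this illustration.

```python
n_features = 8   # entries in the FlappyBird game state
n_bins = 50      # assumed (hypothetical) number of discrete bins per feature
n_actions = 2    # flap or do nothing

# One Q-value per combination of discretized state and action.
table_entries = (n_bins ** n_features) * n_actions
print(f"{table_entries:.2e} Q-values")
```

Even at this coarse resolution the table would hold tens of trillions of values, most of which would never be visited during training.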

In [9]:
class RLAgent():
    
    # the constructor for our agent
    def __init__(self, actions):
        self.actions = actions # [119, None] for FlappyBird
        self.q_estimator = self.getQEstimator()
        self.discount_factor = 0.95
        self.number_of_updates = 0
        
    def getQEstimator(self):
        # EXERCISE: Initialize a MLPRegressor from sklearn.neural_network 
        # (see http://scikit-learn.org/stable/modules/generated/sklearn.neural_network.MLPRegressor.html).
        # In the constructor various variables can be set, most importantly the number and size of the hidden layers.
        # E.g.: hidden_layer_sizes=(100,100) would create two hidden layers with 100 neurons each.
        # Hint: To correctly initialize the estimator, call its fit-function after construction
        # with "dummy"-matrices in the shape of the designated input and output.
        
        estimator = None
        
        
        return estimator
    
    def pickAction(self, state, select_greedily=False):
        # EXERCISE: During training we want a trade-off between exploration and exploitation.
        # To achieve this, implement a function that picks a random action with a certain probability.
        # That probability should be very high at the beginning of training and decrease with the number of update steps.
        # You can use self.number_of_updates to determine how likely a random choice should be.
        # If the selection is not random, then the action with the maximum Q-value should be chosen (see np.argmax).
        
        # Predict the Q-values with our agent's estimator.
        q_values = np.zeros((1, len(self.actions))) # <= dummy implementation
        
        random_action_index = randrange(0, len(self.actions))
        
        # Return the selected action and the predicted Q-values.
        return self.actions[random_action_index], q_values
        
    def update(self, state, action, successor_state, reward, is_terminal, q_values):
        
        # Mapping the action to the corresponding index
        action_index = self.actions.index(action)
        
        # Initialize target vector with old Q-values.
        target = np.zeros((1, len(self.actions)))
        target[:] = q_values[:]
        
        # EXERCISE: 
        # To update the Q-value estimation, we need to implement the Q-Learning algorithm (see the lecture slides).
        # We already have the state "s", a selected action "a", the reward "r" and the new state "s'".
        # Implement the update step according to the Q-Learning algorithm.
        # A few hints:
        # 1. If a terminal state has been reached, then the target value is only the received reward (Q(s,a) <- reward).
        # 2. Only update the Q-value for the chosen action and don't forget to keep the other values unchanged.
        
        
        
        
        self.number_of_updates += 1
        
       
    def save(self, iteration):
        filepath = "save_" + str(iteration) + ".pkl"
        with open(filepath, "wb") as pkl:
            pickle.dump(self.q_estimator, pkl)
            
    def load(self, iteration):
        filepath = "save_" + str(iteration) + ".pkl"
        with open(filepath, "rb") as pkl:
            self.q_estimator = pickle.load(pkl)
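
The first two exercises in the class could be approached along the following lines. This is a sketch under stated assumptions, written as standalone functions rather than methods. The helper names, the linear exploration schedule and its constants (`start`, `end`, `decay_steps`) are hypothetical, and the layer sizes are simply the example values from the exercise comment, not tuned settings.

```python
from random import random, randrange

import numpy as np
from sklearn.neural_network import MLPRegressor

ACTIONS = [119, None]  # action set for FlappyBird, as noted in the constructor


def make_q_estimator(n_inputs=8, n_actions=2):
    # Layer sizes are the example values from the exercise text, not tuned.
    estimator = MLPRegressor(hidden_layer_sizes=(100, 100))
    # One fit on dummy matrices fixes the input and output dimensions.
    estimator.fit(np.zeros((1, n_inputs)), np.zeros((1, n_actions)))
    return estimator


def exploration_rate(number_of_updates, start=1.0, end=0.05, decay_steps=10000):
    # Hypothetical linear schedule: from `start` down to `end` over `decay_steps` updates.
    fraction = min(number_of_updates / decay_steps, 1.0)
    return start + fraction * (end - start)


def pick_action(estimator, state, number_of_updates, select_greedily=False):
    q_values = estimator.predict(state)  # shape (1, number of actions)
    if not select_greedily and random() < exploration_rate(number_of_updates):
        index = randrange(len(ACTIONS))        # explore: random action
    else:
        index = int(np.argmax(q_values[0]))    # exploit: highest predicted Q-value
    return ACTIONS[index], q_values


estimator = make_q_estimator()
action, q_values = pick_action(estimator, np.zeros((1, 8)), number_of_updates=0)
```

For the later per-step updates, `partial_fit` continues from the current weights, whereas calling `fit` again restarts training unless `warm_start=True` is set.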

Now we can train our agent by playing a number of episodes and updating the Q-value estimation step by step.
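
The target for a single update step can be sketched as follows. It uses the standard Q-learning rule, where the target for the chosen action is the reward plus the discounted maximum Q-value of the successor state, or the reward alone in a terminal state. The helper name `q_learning_target` and the sample numbers are hypothetical, and the lecture slides mentioned in the exercise may use different notation.

```python
import numpy as np


def q_learning_target(q_values, successor_q_values, action_index, reward,
                      is_terminal, discount_factor=0.95):
    # Start from a copy of the old Q-values so the other actions stay unchanged.
    target = np.array(q_values, dtype=float)
    if is_terminal:
        # No future reward can follow a terminal state.
        target[0, action_index] = reward
    else:
        target[0, action_index] = reward + discount_factor * np.max(successor_q_values)
    return target


q_now = np.array([[0.2, 0.5]])    # predicted Q-values for the current state
q_next = np.array([[1.0, 0.4]])   # predicted Q-values for the successor state

target = q_learning_target(q_now, q_next, action_index=0, reward=1.0, is_terminal=False)
terminal_target = q_learning_target(q_now, q_next, action_index=0, reward=-5.0, is_terminal=True)
```

In the agent, the successor Q-values would come from the estimator's prediction for the successor state, and the resulting target could be passed to the estimator together with the current state.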

The following code block executes a defined number of episodes and trains the agent using the methods defined above.
You can and should add a few things:

* Save the agent's Q-estimator during training in regular intervals (e.g. after every x-th update).
* Print out interesting metrics, like (average) reward per episode or episode length.
* Add a few evaluation steps during or after training that always pick actions greedily.
* Tune the various parameters for the MLP (net layout, activation-function, ...) or the environment (reward-values, frame_skip, ...).
* Once you have a well-performing agent, try another game environment.
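
The first two suggestions in the list could be supported by a small bookkeeping helper like the one below. `EpisodeStats`, `should_save` and the `save_interval` default are hypothetical names and values for this sketch. The rewards in the usage loop are made-up numbers, not measured results.

```python
class EpisodeStats:
    """Collects the total reward and length of each finished episode."""

    def __init__(self):
        self.episode_rewards = []
        self.episode_lengths = []
        self._reward = 0.0
        self._length = 0

    def record_step(self, reward, is_terminal):
        self._reward += reward
        self._length += 1
        if is_terminal:
            # Close the episode and start counting from zero again.
            self.episode_rewards.append(self._reward)
            self.episode_lengths.append(self._length)
            self._reward, self._length = 0.0, 0

    def average_reward(self, last_n=10):
        recent = self.episode_rewards[-last_n:]
        return sum(recent) / len(recent) if recent else 0.0


def should_save(number_of_updates, save_interval=1000):
    # True on every `save_interval`-th update, but not before the first update.
    return number_of_updates > 0 and number_of_updates % save_interval == 0


stats = EpisodeStats()
for reward, done in [(0.0, False), (1.0, False), (-5.0, True), (0.0, False), (-5.0, True)]:
    stats.record_step(reward, done)
print(stats.average_reward(), stats.episode_lengths)
```

In the training loop, `record_step` would be called once per update with the received reward and the terminal flag, and `should_save(agent.number_of_updates)` could decide when to call `agent.save`.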

Training can take a while before the agent's performance visibly improves, but after a few hours at most you should be able to make out some progress.

In [11]:
training_episodes = 100
episode_counter = 0

# This contains the rewards given to the agent based on different actions in game.
# Tick is rewarded on each time step, while the others are awarded based on the game state.
# E.g.: passing a pipe results in a positive reward while touching a pipe will result in a loss.
# You can adjust the rewards to achieve a better result. (The FlappyBird game has no "win" or "negative" event.)
reward_values = {"positive" : 1.0,
                 "negative" : -1.0,
                 "tick": 0.0,
                 "loss" : -5.0,
                 "win" : 5.0}

# The number of frames that an action is executed on.
# For a value of 1, a new action is chosen for every frame.
# For a value of 4, a new action is chosen every 4th frame and then executed for 4 frames.
# A higher number leads to a slower reaction of our agent, 
# but increases the influence one selection has and also allows for a faster computation.
frame_skip = 1

# If you want to train without a window opening:
# 1. Set display_screen=False
# 2. Uncomment the following line of code:
# os.environ["SDL_VIDEODRIVER"] = "dummy"

# If you want to evaluate in "normal" speed while watching the agent on the screen:
# 1. Set display_screen=True
# 2. Set force_fps=False
env = PLE(game, fps=30, display_screen=True, force_fps=True, 
         state_preprocessor=preprocessor,
         frame_skip=frame_skip,
         reward_values=reward_values)
env.init()

agent = RLAgent(env.getActionSet())

while episode_counter < training_episodes:
    
    state = env.getGameState()
    
    # Pick an action based on the state and execute it in the environment to receive the reward.
    action, q_values = agent.pickAction(state, select_greedily=False)
    reward = env.act(action) 
    
    successor_state = env.getGameState()
    is_terminal = env.game_over()
    
    # Update the agent's Q-value estimation using the current observation.
    agent.update(state, action, successor_state, reward, is_terminal, q_values)
    
    if is_terminal:
        env.reset_game()
        episode_counter += 1
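
An evaluation run with greedy action selection could look like the sketch below. The function `evaluate` is hypothetical. It relies only on the environment and agent methods already used in the training loop. `StubEnv` and `StubAgent` are stand-ins so the sketch runs without pygame. In the notebook you would pass `env` and `agent` instead.

```python
def evaluate(env, agent, episodes=5):
    """Play `episodes` games greedily and return the total reward of each one."""
    totals = []
    for _ in range(episodes):
        env.reset_game()
        total = 0.0
        while not env.game_over():
            state = env.getGameState()
            # No exploration and no update during evaluation.
            action, _ = agent.pickAction(state, select_greedily=True)
            total += env.act(action)
        totals.append(total)
    return totals


class StubEnv:
    """Stand-in for PLE: every episode lasts three steps with reward 1.0 each."""

    def __init__(self):
        self.steps = 0

    def reset_game(self):
        self.steps = 0

    def game_over(self):
        return self.steps >= 3

    def getGameState(self):
        return [[0.0] * 8]

    def act(self, action):
        self.steps += 1
        return 1.0


class StubAgent:
    """Stand-in agent that always does nothing."""

    def pickAction(self, state, select_greedily=False):
        return None, [[0.0, 0.0]]


totals = evaluate(StubEnv(), StubAgent(), episodes=2)
```

Comparing the average of these totals at regular intervals gives a measure of progress that is not distorted by random exploration.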