# Deep Reinforcement Learning

Author : Nicholas Setijabudiharsa

TSE - Econometrics and Statistics : Data science for social science

2022

## 1 Reinforcement learning

### 1.1 Definitions and Framework

To better understand the idea of reinforcement learning, let's take the game of chess as an example. Theoretically, one can write a brute-force algorithm that analyzes every possible state of the game, takes into account all the states that can follow the current one, and picks the action with the highest probability of winning at the end. However, this method is very inefficient. In 2021, [John Tromp](https://github.com/tromp/ChessPositionRanking) estimated that there are about $4.8 \times 10^{44}$ legal chess positions. For perspective, here is that number written out in full:

$$480000000000000000000000000000000000000000000$$

Enumerating that many positions is clearly infeasible in practice. This is where reinforcement learning comes in handy.
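
To get a feel for the scale, the sketch below estimates how long it would take just to visit every legal position once. The evaluation speed of one billion positions per second is a made-up assumption, and a real brute-force search would have to explore the game tree, which is larger still.

```python
# Rough cost of visiting every legal chess position once.
N_POSITIONS = 4.8e44          # Tromp's estimate quoted above
POSITIONS_PER_SECOND = 1e9    # assumed evaluation speed (hypothetical)
SECONDS_PER_YEAR = 365 * 24 * 3600


def years_to_enumerate(n_positions, rate):
    """Years needed to visit n_positions at `rate` positions per second."""
    return n_positions / rate / SECONDS_PER_YEAR


years = years_to_enumerate(N_POSITIONS, POSITIONS_PER_SECOND)
print(f"{years:.2e} years")
```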



Reinforcement learning is the process of improving the action $a$ an agent takes in a given state $s$ by optimizing its policy $\pi(s, a)$, where
$$
\pi(s, a) = \Pr(a_t = a \mid s_t = s)
$$
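
For a small problem, a policy can be stored as a table of probabilities. The sketch below assumes two states and three actions with made-up probabilities; `sample_action` is a hypothetical helper that draws an action according to $\pi(s, a)$.

```python
import numpy as np

# Rows are states, columns are actions; entry [s, a] is pi(s, a).
# The numbers are made up for illustration.
policy = np.array([
    [0.7, 0.2, 0.1],   # state 0
    [0.1, 0.1, 0.8],   # state 1
])


def sample_action(policy, state, rng):
    """Draw an action a with probability pi(state, a)."""
    return int(rng.choice(policy.shape[1], p=policy[state]))


rng = np.random.default_rng(0)
actions = [sample_action(policy, 1, rng) for _ in range(1000)]
```

Each row sums to one because, for a fixed state, the policy is a probability distribution over actions.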

This allows us to define the value of a state under a given policy as the expected discounted sum of future rewards, or in mathematical terms

$$
V_\pi(s) = \mathbb{E}_\pi\left[\sum_{t \ge 0} \gamma^t r_t \mid s_0 = s\right]
$$

where $\gamma \in [0, 1]$ is the discount rate and $r_t$ is the reward received at step $t$.
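
As a minimal sketch of this definition, the discounted sum can be computed for one recorded episode, and the expectation can be approximated by averaging over several episodes that start in the same state. The function names and the reward sequences are illustrative assumptions.

```python
def discounted_return(rewards, gamma):
    """Sum of gamma**t * r_t over the rewards of one episode."""
    total = 0.0
    for t, r in enumerate(rewards):
        total += (gamma ** t) * r
    return total


def estimate_value(episodes, gamma):
    """Monte Carlo estimate of V(s): average return over episodes starting in s."""
    returns = [discounted_return(rewards, gamma) for rewards in episodes]
    return sum(returns) / len(returns)


g = discounted_return([1.0, 1.0, 1.0], gamma=0.9)        # 1 + 0.9 + 0.81
v = estimate_value([[1.0, 1.0, 1.0], [0.0, 0.0]], gamma=0.9)
```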


With this framework, we can see that reinforcement learning is a **maximization problem**: we **optimize** the agent's policy to **maximize** expected future rewards.

We also generally assume that the model is a Markov decision process: the next state depends only on the current state and action, and it is not reached with certainty. Instead, given a current state $s_t$, the next state $s_{t+1}$ follows a certain probability distribution. Any game that involves rolling a die is an example.
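
The die example can be sketched as follows, assuming a hypothetical board of 20 squares on which a piece advances by the value of a fair die and stops on the last square. Given $s_t$, the function returns the full distribution of $s_{t+1}$.

```python
import numpy as np

N_SQUARES = 20  # hypothetical board length


def transition_probs(state):
    """Distribution of s_{t+1} given s_t = state when moving by a fair die roll."""
    probs = np.zeros(N_SQUARES)
    for roll in range(1, 7):
        nxt = min(state + roll, N_SQUARES - 1)  # the last square absorbs overshoot
        probs[nxt] += 1 / 6
    return probs


def step(state, rng):
    """Sample the next state from the transition distribution."""
    return int(rng.choice(N_SQUARES, p=transition_probs(state)))


rng = np.random.default_rng(0)
next_state = step(0, rng)
```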

An environment in which the agent receives a reward after every action it takes is said to have **dense** rewards. On the other hand, an environment such as a simple game of chess, where a reward only arrives at the end of the game (whether we win or lose), is said to have **sparse** rewards. Naturally, the denser the rewards, the more informative each sample is, and therefore the faster learning tends to be.
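
The difference can be illustrated on a toy task, assumed here to be an agent walking along integer positions toward a goal. Both reward functions are hypothetical: the sparse one only signals success at the goal, while the dense one gives feedback on every step.

```python
GOAL = 5  # hypothetical goal position on a line of integer positions


def sparse_reward(position):
    """Reward only when the goal is reached."""
    return 1.0 if position == GOAL else 0.0


def dense_reward(prev_position, position):
    """Reward every step: positive when the agent moves closer to the goal."""
    return float(abs(GOAL - prev_position) - abs(GOAL - position))


path = [0, 1, 2, 3, 4, 5]
sparse = [sparse_reward(p) for p in path[1:]]
dense = [dense_reward(a, b) for a, b in zip(path, path[1:])]
```

With the sparse signal, the agent learns nothing about the first four moves until it happens to reach the goal; with the dense signal, every move carries information.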

## 2 Deep Reinforcement Learning [draft only](https://www.youtube.com/watch?v=wDVteayWWvU&ab_channel=SteveBrunton)

### 2.1 Deep policy network

A deep policy network takes a state as input and returns the policy $\pi_\theta(s, a)$, where $\theta$ denotes the weights of the neural network.
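
A minimal sketch of such a network, written in plain NumPy with one hidden layer and a softmax output. The layer sizes, the random initialization, and the input scaling are assumptions for illustration; the six inputs and three actions mirror the Breakout environment defined later in this notebook.

```python
import numpy as np


def init_params(n_state, n_hidden, n_actions, rng):
    """theta: weights and biases of a one-hidden-layer network."""
    return {
        "W1": rng.normal(0, 0.1, (n_state, n_hidden)),
        "b1": np.zeros(n_hidden),
        "W2": rng.normal(0, 0.1, (n_hidden, n_actions)),
        "b2": np.zeros(n_actions),
    }


def policy_network(theta, state):
    """Return pi_theta(state, .) as a probability vector over actions."""
    hidden = np.tanh(state @ theta["W1"] + theta["b1"])
    logits = hidden @ theta["W2"] + theta["b2"]
    logits = logits - logits.max()   # subtract the max for numerical stability
    exp = np.exp(logits)
    return exp / exp.sum()


rng = np.random.default_rng(0)
theta = init_params(n_state=6, n_hidden=16, n_actions=3, rng=rng)
# paddle_x, paddle_y, ball_x, ball_y, ball_v_x, ball_v_y, scaled by the screen size
state = np.array([350, 700, 345, 700, 5, -3], dtype=float) / 800.0
probs = policy_network(theta, state)
```

Training then amounts to adjusting `theta` so that actions leading to higher returns receive higher probability.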

### 2.2 Deep Q Network

## 3 Implementation

### Preliminary

In [1]:
!pip install pygame stable_baselines3 tensorflow==1.15.0 gym==0.21.0 Box2D stable-baselines3[extra]
# SOURCE : https://www.101computing.net/breakout-tutorial-using-pygame-getting-started/

#Import the pygame library and initialise the game engine
import pygame
from random import randint
from pygame.locals import (
    K_LEFT,
    K_RIGHT
)
import sys
import math
import random
import numpy as np
import gym
from gym.utils import seeding
from gym.spaces import Discrete, Box, Dict


In [3]:
BLACK = (0, 0, 0)
 
class Ball(pygame.sprite.Sprite):
    #This class represents a ball. It derives from the "Sprite" class in Pygame.
    
    def __init__(self, color, width, height):
        # Call the parent class (Sprite) constructor
        super().__init__()
        
        # Pass in the color of the ball, its width and height.
        # Set the background color and set it to be transparent
        self.image = pygame.Surface([width, height])
        self.image.fill(BLACK)
        self.image.set_colorkey(BLACK)
 
        # Draw the ball (a rectangle!)
        pygame.draw.rect(self.image, color, [0, 0, width, height])
        
        self.velocity = [randint(4,8),randint(-8,8)]
        
        # Fetch the rectangle object that has the dimensions of the image.
        self.rect = self.image.get_rect()
        
    def update(self):
        self.rect.x += self.velocity[0]
        self.rect.y += self.velocity[1]
          
    def bounce(self):
        self.velocity[0] = -self.velocity[0]
        self.velocity[1] = randint(-8,8)

class Brick(pygame.sprite.Sprite):
    #This class represents a brick. It derives from the "Sprite" class in Pygame.

    def __init__(self, color, width, height):
        # Call the parent class (Sprite) constructor
        super().__init__()

        # Pass in the color of the brick, and its x and y position, width and height.
        # Set the background color and set it to be transparent
        self.image = pygame.Surface([width, height])
        self.image.fill(BLACK)
        self.image.set_colorkey(BLACK)

        # Draw the brick (a rectangle!)
        pygame.draw.rect(self.image, color, [0, 0, width, height])

        # Fetch the rectangle object that has the dimensions of the image.
        self.rect = self.image.get_rect()


class Paddle(pygame.sprite.Sprite):
    #This class represents a paddle. It derives from the "Sprite" class in Pygame.

    def __init__(self, color, width, height):
        # Call the parent class (Sprite) constructor
        super().__init__()

        # Pass in the color of the paddle, its width and height.
        # Set the background color and set it to be transparent
        self.image = pygame.Surface([width, height])
        self.image.fill(BLACK)
        self.image.set_colorkey(BLACK)

        # Draw the paddle (a rectangle!)
        pygame.draw.rect(self.image, color, [0, 0, width, height])

        # Fetch the rectangle object that has the dimensions of the image.
        self.rect = self.image.get_rect()


    def moveLeft(self, pixels):
        self.rect.x -= pixels
        # Check that you are not going too far (off the screen)
        if self.rect.x < 0:
            self.rect.x = 0

    def moveRight(self, pixels):
        self.rect.x += pixels
        # Check that you are not going too far (off the screen)
        if self.rect.x > 700:
            self.rect.x = 700

class Game:
    def __init__( self, mode='human',  lives=3, framerate = 60):
        pygame.init()
        # Define some colors
        self.WHITE = (255,255,255)
        self.DARKBLUE = (36,90,190)
        self.LIGHTBLUE = (0,176,240)
        self.RED = (255,0,0)
        self.ORANGE = (255,100,0)
        self.YELLOW = (255,255,0)
        self.GREEN = (0, 255, 0)
        self.BRIGHT_BLUE = (0, 255, 255)
        self.COLORS = [
            self.RED,
            self.ORANGE,
            self.YELLOW,
            self.GREEN,
            self.BRIGHT_BLUE
        ]
        self.PADDLE_WIDTH = 100
        self.PADDLE_HEIGHT = 10
        self.PADDLE_INITIAL_POSITION_X = 350
        self.PADDLE_INITIAL_POSITION_Y = 700
        self.BALL_DIMENSION = 10
        self.BALL_INITIAL_POSITION_X = 345
        self.BALL_INITIAL_POSITION_Y = 700

        self.BRICK_WIDTH = 50
        self.BRICK_HEIGHT = 10
        self.BRICK_N_COLUMN = 10
        self.BRICK_N_ROW = 20
        self.BRICK_MARGIN = 60
        self.BRICK_MARGIN_COLUMN = 20
        self.BRICK_MARGIN_ROW = 5

        self.score = 0
        self.lives = lives
        self.mode = mode
        if self.mode == 'human':
            self.screen_mode = pygame.SHOWN
        else:
            self.screen_mode = pygame.HIDDEN

        self.framerate = framerate

        # Open a new window (hidden unless mode == 'human')
        self.size = (800, 800)
        self.screen = pygame.display.set_mode(self.size, flags=self.screen_mode)
        pygame.display.set_caption("Breakout Game")

        #This will be a list that will contain all the sprites we intend to use in our game.
        self.all_sprites_list = pygame.sprite.Group()

        #Create the Paddle
        self.paddle = Paddle(self.LIGHTBLUE, self.PADDLE_WIDTH, self.PADDLE_HEIGHT)
        self.paddle.rect.x = self.PADDLE_INITIAL_POSITION_X
        self.paddle.rect.y = self.PADDLE_INITIAL_POSITION_Y
        self.all_sprites_list.add(self.paddle)

        #Create the ball sprite
        self.ball = Ball(self.WHITE,self.BALL_DIMENSION, self.BALL_DIMENSION)
        self.ball.rect.x = self.BALL_INITIAL_POSITION_X
        self.ball.rect.y = self.BALL_INITIAL_POSITION_Y
        self.all_sprites_list.add(self.ball)

        self.all_bricks = pygame.sprite.Group()
        for j in range(self.BRICK_N_ROW):
            for i in range(self.BRICK_N_COLUMN):
                brick = Brick(self.COLORS[j % len(self.COLORS)],self.BRICK_WIDTH,self.BRICK_HEIGHT)
                brick.rect.x = self.BRICK_MARGIN + i* (self.BRICK_WIDTH + self.BRICK_MARGIN_COLUMN)
                brick.rect.y = self.BRICK_MARGIN + j* (self.BRICK_HEIGHT + self.BRICK_MARGIN_ROW)
                self.all_sprites_list.add(brick)
                self.all_bricks.add(brick)

        # The clock will be used to control how fast the screen updates
        self.clock = pygame.time.Clock()

    def get_action(self, pressed_keys):
        right = pressed_keys[K_RIGHT]
        left = pressed_keys[K_LEFT]
        n_pressed = sum([right, left])
        if n_pressed != 1:
            action = 0
        elif n_pressed == 1 and right:
            action = 1
        elif n_pressed == 1 and left:
            action = 2
        else:
            action = 0
        return action

    def step_frame(self, action):
        if action == 1:
            self.paddle.moveRight(5)
        if action == 2:
            self.paddle.moveLeft(5)
        # To make sure it doesnt do endless loop
        if self.ball.velocity[1] == 0:
            self.ball.velocity[1] = 1
        # --- Game logic should go here
        self.all_sprites_list.update()
        #Check if the ball is bouncing against any of the 4 walls:
        if self.ball.rect.x>=800:
            self.ball.velocity[0] = -self.ball.velocity[0]
        if self.ball.rect.x<=0:
            self.ball.velocity[0] = -self.ball.velocity[0]
        if self.ball.rect.y>790:
            self.ball.velocity[1] = -self.ball.velocity[1]
            self.lives -= 1
            self.ball.rect.x = self.BALL_INITIAL_POSITION_X
            self.ball.rect.y = self.BALL_INITIAL_POSITION_Y
            self.ball.velocity = [randint(4,8),randint(-8,8)]

            if self.lives == 0:
                #Display Game Over Message for 3 seconds
                font = pygame.font.Font(None, 74)
                text = font.render("GAME OVER", 1, self.WHITE)
                self.screen.blit(text, (250,300))
                pygame.display.flip()
                pygame.time.wait(3000)
                #Stop the Game
                self.carryOn=False
        if self.ball.rect.y<40:
            self.ball.velocity[1] = -self.ball.velocity[1]
        #Detect collisions between the ball and the paddles
        if pygame.sprite.collide_mask(self.ball, self.paddle):
            self.ball.rect.x -= self.ball.velocity[0]
            self.ball.rect.y -= self.ball.velocity[1]
            self.ball.bounce()
        #Check if there is the ball collides with any of bricks
        brick_collision_list = pygame.sprite.spritecollide(self.ball, self.all_bricks,False)
        for brick in brick_collision_list:
            self.ball.bounce()
            self.score += 1
            brick.kill()
            if len(self.all_bricks)==0:
                #Display Level Complete Message for 3 seconds
                font = pygame.font.Font(None, 74)
                text = font.render("LEVEL COMPLETE", 1, self.WHITE)
                self.screen.blit(text, (200,300))
                pygame.display.flip()
                pygame.time.wait(3000)
                #Stop the Game
                self.carryOn=False
        self.update_screen()

        
    def turn_on_screen(self):
        self.screen = pygame.display.set_mode(
            self.size, 
            flags=pygame.SHOWN
        )
        self.include_info=True
    
    def update_screen(self):
        # --- Drawing code should go here
        # First, clear the screen to dark blue.
        self.screen.fill(self.DARKBLUE)
        pygame.draw.line(self.screen, self.WHITE, [0, 38], [800, 38], 2)
        #Display the score and the number of lives at the top of the screen
        font = pygame.font.Font(None, 34)
        text = font.render("Score: " + str(self.score), 1, self.WHITE)
        self.screen.blit(text, (20,10))
        text = font.render("Lives: " + str(self.lives), 1, self.WHITE)
        self.screen.blit(text, (650,10))
        #Now let's draw all the sprites in one go. (For now we only have 2 sprites!)
        self.all_sprites_list.draw(self.screen)

    def render_screen(self):
        pygame.display.flip()

    def play(self):
        # The loop will carry on until the user exits the game (e.g. clicks the close button).
        self.carryOn = True
        # -------- Main Program Loop -----------
        while self.carryOn:
            # --- Main event loop
            for event in pygame.event.get(): # User did something
                if event.type == pygame.QUIT: # If user clicked close
                    self.carryOn = False # Flag that we are done so we exit this loop
            #Moving the paddle when the use uses the arrow keys
            pressed_keys = pygame.key.get_pressed()
            action = self.get_action(pressed_keys)
            self.step_frame(action)
            self.render_screen()
            # --- Limit to self.framerate frames per second (60 by default)
            self.clock.tick(self.framerate)


### Gym

In [6]:
class BreakoutEnv(gym.Env):
    """ Custom PyGame OpenAI Gym Environment 
    The user has the following discrete actions:
     - 0: Don't move
     - 1: Right
     - 2: Left
     
    The environment will provide the score as the rewards
    """
    metadata = {'render.modes': ['human', 'rgb_array']}
    def __init__(
        self, 
        mode = 'agent',
        lives = 3,
        framerate = 60,
        output_size=64
    ):
        self.mode = mode
        self.lives_start = lives
        self.output_size = output_size
        self.framerate = framerate
        self.game = self.init_game()
        self.iteration = 0
        self.iteration_max = 15 * 60 * self.game.framerate  # 15 minutes
        self.init_obs = self.get_state()

        self.action_space = Discrete(3)
        self.observation_space = Dict({
            "paddle_x" : Discrete(800),
            "paddle_y" : Discrete(800),
            "ball_x" : Discrete(800),
            "ball_y" : Discrete(800),
            "ball_v_x" :  Discrete(20),
            "ball_v_y" : Discrete(20)
        })

        # reward_range is a (min, max) pair; the reward is the running score
        self.reward_range = (0, float('inf'))

    def init_game(self):
        game = Game(
            mode=self.mode,
            lives=self.lives_start,
            framerate=self.framerate
        )
        return game


    def step(self, action):
        """Run one timestep of the environment's dynamics. When end of
        episode is reached, you are responsible for calling `reset()`
        to reset this environment's state.
        Accepts an action and returns a tuple (observation, reward, done, info).
        Args:
            action (object): an action provided by the agent
        Returns:
            observation (object): agent's observation of the current environment
            reward (float) : amount of reward returned after previous action
            done (bool): whether the episode has ended, in which case further step() calls will return undefined results
            info (dict): contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)
        """

        # Step frame

        print(action)

        self.game.step_frame(action)
        self.iteration += 1

        # Gather observation
        observation = self.get_state()

        print(observation)
        # Check stop conditions
        if self.game.lives == 0:
            done = True
        elif self.iteration > self.iteration_max:
            done = True
        else:
            done = False

        # Gather metadata/info
        info = {
            'iteration': self.iteration
        }
        reward = self.game.score
        return (observation, reward, done, info)

    def reset(self):
        """Resets the environment to an initial state and returns an initial
        observation.
        Note that this function should not reset the environment's random
        number generator(s); random variables in the environment's state should
        be sampled independently between multiple calls to `reset()`. In other
        words, each call of `reset()` should yield an environment suitable for
        a new episode, independent of previous episodes.
        Returns:
            observation (object): the initial observation.
        """
        self.game = self.init_game()
        self.iteration = 0
        observation = self.get_state()
        return observation

    def render(self, mode='human', render_lidar=False):
        """Renders the environment.
        The set of supported modes varies per environment. (And some
        environments do not support rendering at all.) By convention,
        if mode is:
        - human: render to the current display or terminal and
            return nothing. Usually for human consumption.
        - rgb_array: Return an numpy.ndarray with shape (x, y, 3),
            representing RGB values for an x-by-y pixel image, suitable
            for turning into a video.
        - ansi: Return a string (str) or StringIO.StringIO containing a
            terminal-style text representation. The text can include newlines
            and ANSI escape sequences (e.g. for colors).
        Note:
            Make sure that your class's metadata 'render.modes' key includes
                the list of supported modes. It's recommended to call super()
                in implementations to use the functionality of this method.
        Args:
            mode (str): the mode to render with
        Example:
        class MyEnv(Env):
            metadata = {'render.modes': ['human', 'rgb_array']}
            def render(self, mode='human'):
                if mode == 'rgb_array':
                    return np.array(...) # return RGB frame suitable for video
                elif mode == 'human':
                    ... # pop up a window and render
                else:
                    super(MyEnv, self).render(mode=mode) # just raise an exception
        """
        if mode == 'human':
            self.game.turn_on_screen()
            self.game.update_screen()
            self.game.render_screen()
        if mode == 'rgb_array':
            return self.get_rgb_array()
    
    def close(self):
        """Override close in your subclass to perform any necessary cleanup.
        Environments will automatically close() themselves when
        garbage collected or when the program exits.
        """
        pass

    def seed(self, seed=None):
        """Sets the seed for this env's random number generator(s).
        Note:
            Some environments use multiple pseudorandom number generators.
            We want to capture all such seeds used in order to ensure that
            there aren't accidental correlations between multiple generators.
        Returns:
            list<bigint>: Returns the list of seeds used in this env's random
              number generators. The first value in the list should be the
              "main" seed, or the value which a reproducer should pass to
              'seed'. Often, the main seed equals the provided 'seed', but
              this won't be true if seed=None, for example.
        """
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def get_state(self):
        return {
            "paddle_x" : self.game.paddle.rect.x,
            "paddle_y" : self.game.paddle.rect.y,
            "ball_x" : self.game.ball.rect.x,
            "ball_y" : self.game.ball.rect.y,
            "ball_v_x" :self.game.ball.velocity[0],
            "ball_v_y" :self.game.ball.velocity[1]
        }
          
      
        
    def get_rgb_state(self):
        rgb_array = self.get_rgb_array()
        rgb_array = self.down_sample_rgb_array(rgb_array, self.output_size)
        rgb_array = rgb_array[:, :, 0]
        rgb_array = np.reshape(rgb_array, (self.output_size, self.output_size, 1))
        rgb_array = rgb_array.astype(np.uint8)
        return rgb_array

    def get_rgb_array(self):
        surf = pygame.display.get_surface()
        array = pygame.surfarray.array3d(surf).astype(np.float16)
        array = np.rot90(array)
        array = np.flip(array)
        array = np.fliplr(array)
        array = array.astype(np.uint8)
        return array

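    def down_sample_rgb_array(self, array, output_size):
        # Game stores the window size as `size`; it has no `screen_size` attribute
        bin_size = self.game.size[0] // output_size
        # Assumption: crop so both axes are exact multiples of bin_size before max-pooling
        side = output_size * bin_size
        array_ds = array[:side, :side].reshape((output_size, bin_size, output_size, bin_size, 3)).max(3).max(1)
        return array_ds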
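
`step()` above returns the running score `self.game.score` as the reward, so a brick destroyed early keeps being rewarded on every later step. A common alternative is to reward only the score increment of the current step. The helper below is a hypothetical sketch of that idea and is not part of the environment above.

```python
class ScoreDeltaReward:
    """Turns a running score into per-step rewards (hypothetical helper)."""

    def __init__(self):
        self.last_score = 0

    def reset(self):
        """Call at the start of every episode."""
        self.last_score = 0

    def __call__(self, score):
        """Return the score gained since the previous call."""
        reward = score - self.last_score
        self.last_score = score
        return reward


scores = [0, 0, 1, 1, 2]          # running score after each step (made up)
to_reward = ScoreDeltaReward()
rewards = [to_reward(s) for s in scores]
```

With increments, the rewards of an episode sum to the final score, whereas summing the running score itself counts each brick many times.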


In [8]:
#breakout = Game()
#breakout.play()

from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy

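# NOTE: the custom environment is created here but immediately replaced,
# so the run below trains on LunarLander-v2, not on BreakoutEnv.
env = BreakoutEnv()
env = gym.make("LunarLander-v2")

# Instantiate the agent
model = DQN("MlpPolicy", env, verbose=1)
# Train the agent (verbose=1 prints the training log shown below)
model.learn(total_timesteps=int(2e5))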

Using cpu device
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    exploration rate | 0.982    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 3436     |
|    time_elapsed     | 0        |
|    total timesteps  | 387      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration rate | 0.967    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 2472     |
|    time_elapsed     | 0        |
|    total timesteps  | 701      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration rate | 0.948    |
| time/               |          |
|    episodes         | 12       |
|    fps              | 1902     |
|    time_elapsed     | 0        |
|    total timesteps  | 1087     |
----------------------------------
---

<stable_baselines3.dqn.dqn.DQN at 0x7fd70010ae90>

In [9]:
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=10)
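
The `exploration rate` column in the training log above is the $\epsilon$ of the $\epsilon$-greedy exploration. The sketch below re-implements a linear schedule, assuming the default exploration settings of stable-baselines3's `DQN` (`exploration_initial_eps=1.0`, `exploration_final_eps=0.05`, `exploration_fraction=0.1`) and the `total_timesteps=2e5` used above. `linear_epsilon` is a hypothetical helper, not a library function.

```python
def linear_epsilon(timestep, total_timesteps=200_000,
                   initial_eps=1.0, final_eps=0.05, fraction=0.1):
    """Linearly anneal epsilon over the first `fraction` of training."""
    progress = timestep / total_timesteps
    if progress >= fraction:
        return final_eps
    return initial_eps + progress * (final_eps - initial_eps) / fraction


# Timesteps taken from the three log entries above
for t in (387, 701, 1087):
    print(t, round(linear_epsilon(t), 3))
```

Under these assumptions the schedule gives the logged values (0.982, 0.967, 0.948) and reaches its floor of 0.05 after 20,000 steps.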


In [12]:
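# Enjoy trained agent
obs = env.reset()
for i in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, rewards, dones, info = env.step(action)
    # Rendering needs a display; on a headless machine it raises NoSuchDisplayException
    env.render()
    # Start a new episode once the current one has ended
    if dones:
        obs = env.reset()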

NoSuchDisplayException: ignored