<a href="https://colab.research.google.com/github/wikistat/AI-Frameworks/blob/master/IntroductionDeepReinforcementLearning/Deep_Q_Learning_GridWorld.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# [IA Frameworks](https://github.com/wikistat/AI-Frameworks) - Introduction to Deep Reinforcement Learning 

<center>
<a href="http://www.insa-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo-insa.jpg" style="float:left; max-width: 120px; display: inline" alt="INSA"/></a> 
<a href="http://wikistat.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/wikistat.jpg" width=400, style="max-width: 150px; display: inline"  alt="Wikistat"/></a>
<a href="http://www.math.univ-toulouse.fr/" ><img src="http://www.math.univ-toulouse.fr/~besse/Wikistat/Images/logo_imt.jpg" width=400,  style="float:right;  display: inline" alt="IMT"/> </a>
    
</center>

# Part 1c : Deep Q-Network on Gridworld
The objectives of this notebook are the following:

* Implement DQN with replay memory and a target network to solve Gridworld (a Pacman-like game).
* [OPTIONAL] Implement D3QN on the same game.
* [OPTIONAL] Implement these algorithms to solve the real Pacman game.


# Files & Data (Google Colab)

If you're running this notebook on Google Colab, you do not have access to the `solutions` folder you get by cloning the repository locally.

The following lines build the folders and files you need for this lab.

**WARNING 1** Do not run these lines locally.<br>
**WARNING 2** The magic command `%load` does not work on Google Colab; you will have to copy-paste the solution into the notebook.

In [None]:
! mkdir -p solutions
! wget -P solutions https://github.com/wikistat/AI-Frameworks/raw/master/IntroductionDeepReinforcementLearning/solutions/Qnetwork_class.py
! wget -P solutions https://github.com/wikistat/AI-Frameworks/raw/master/IntroductionDeepReinforcementLearning/solutions/DQN_gridworld_class.py
! wget -P solutions https://github.com/wikistat/AI-Frameworks/raw/master/IntroductionDeepReinforcementLearning/solutions/play_game_with_mainq.py
! wget -P solutions https://github.com/wikistat/AI-Frameworks/raw/master/IntroductionDeepReinforcementLearning/solutions/dueling.py
! wget -P solutions https://github.com/wikistat/AI-Frameworks/raw/master/IntroductionDeepReinforcementLearning/solutions/double_dqn.py
! wget -P . https://github.com/wikistat/AI-Frameworks/raw/master/IntroductionDeepReinforcementLearning/gridworld.py
! wget -P . https://github.com/wikistat/AI-Frameworks/raw/master/IntroductionDeepReinforcementLearning/experience_replay.py   

# Import libraries

In [None]:
import numpy as np
from datetime import datetime
import collections

# Tensorflow
import tensorflow.keras.models as km
import tensorflow.keras.layers as kl
import tensorflow.keras.backend as K

# To plot figures and animations
import matplotlib.animation as animation
import matplotlib.pyplot as plt
from IPython.display import HTML

The following functions build a video from a list of images. <br>
They will be used to display a video of the games you play.

In [None]:
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=False, interval=400):
    plt.close()  # or else nbagg sometimes plots in the previous cell
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    return animation.FuncAnimation(fig, update_scene, fargs=(frames, patch), frames=len(frames), repeat=repeat, interval=interval)

# Grid World environment

The GridWorld environment will be used throughout this notebook. <br>
This environment was developed by Arthur Juliani, and the original code can be found on his [github](https://github.com/awjuliani/DeepRL-Agents/blob/master/gridworld.py).

The environment is a 5x5 grid of blocks containing blue, green and red squares. In the environment:

* the **agent** controls a *blue* square
* the goal is to navigate to the *green* squares (**reward** +1)
* while avoiding the *red* squares (**reward** -1).

This game can be seen as a simplified Pacman game. The *blue* square is Pacman, the *green* squares are the pellets and give positive rewards, and the *red* squares are the ghosts and give negative rewards.<br>
*Green* and *red* squares do not move.


### Observation

The observation is the image itself, an array of shape 84x84x3.

### Actions

Num | Action
--- | ---
0 | Go Up
1 | Go Down
2 | Go Left
3 | Go Right


### Reward
The reward is +1 for every green square taken and -1 for every red square taken.

### End of episode
The environment defines no termination condition: an episode never ends on its own.

The following code initialises an environment and displays the first state.

In [None]:
from gridworld import gameEnv
env = gameEnv(partial=False, size=5)

**Exercise** Let's play a game manually to understand how it works. <br>
Fill `actions_list` with actions that move the blue square so that it catches green squares and scores points.

The last line of code, `HTML(plot_animation(frames).to_html5_video())`, displays a video of the game built from the list of states produced during the game.

In [None]:
frames = []
cum_reward=0
actions_list=[]
for step in actions_list:
    state, reward , end = env.step(step)
    frames.append(state)
    cum_reward+=reward
print(cum_reward)
HTML(plot_animation(frames).to_html5_video())

# Deep Q-Learning on *Gridworld*

The objective of this section is to implement a **Deep Q-learning** agent that is able to play the Gridworld environment (correctly).

For that, 3 Python classes are required:

* `ExperienceReplay`: A class that implements the **Experience Replay Buffer**. (Already implemented)
* `Qnetwork`: A class that defines the function used to approximate the Q-values.
* `DQN`: A class that trains the `Qnetwork`.


All the instructions for this section are in the notebook below.

However, you can choose to either
* work with the scripts DQN_gridworld.py and DQN_gridworld_test.py that can be found in the `IntroductionDeepReinforcementLearning` folder, or
* work with the code in the cells of this notebook.


## Experience Replay Buffer


The **Experience Replay Buffer** is where all the agent's experiences are stored and from which *batches* are drawn to train the *Q network*.

You can either use the provided `ExperienceReplay` class or the one you defined in the previous notebook.

In [None]:
from experience_replay import ExperienceReplay
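
To make the role of the buffer concrete, here is a minimal sketch of a replay buffer. `SimpleReplayBuffer` is a hypothetical stand-in written for this illustration; it is not the `ExperienceReplay` class imported above, whose internals may differ. It only mirrors the two calls used later in this notebook, `add` and `sample`, and the default buffer size is an arbitrary assumption.

```python
import random
from collections import deque


class SimpleReplayBuffer:
    """Hypothetical stand-in for ExperienceReplay, for illustration only."""

    def __init__(self, buffer_size=50000):
        # A deque with maxlen silently drops the oldest experiences when full.
        self.buffer = deque(maxlen=buffer_size)

    def add(self, experiences):
        # `experiences` is a list of [state, action, reward, next_state, done].
        self.buffer.extend(experiences)

    def sample(self, size):
        # Uniform sampling without replacement, capped at the buffer length.
        return random.sample(list(self.buffer), min(size, len(self.buffer)))


# Integers stand in for the image states to keep the example small.
buffer = SimpleReplayBuffer(buffer_size=3)
buffer.add([[0, 1, 0.0, 1, False], [1, 2, 1.0, 2, False]])
buffer.add([[2, 0, -1.0, 3, False], [3, 3, 0.0, 4, True]])
batch = buffer.sample(2)
print(len(buffer.buffer), len(batch))
```

Sampling uniformly from past experiences breaks the correlation between consecutive steps of an episode, which is the main reason for training on the buffer rather than on the latest episode.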

## Q network

As in the previous notebook, we will use **Deep Q-learning** to approximate the *Q-values* rather than storing a value for every (s,a) pair.

As the input of the function (the observation) is the *state* itself (an image), we will use a **convolutional neural network** to approximate the Q-values.

Later, we will generate targets from experiences and train this CNN.

The `Qnetwork` class below defines the architecture of this *convolutional neural network*.

**Exercise** 

The architecture of the *CNN* has been set for you, as designing it can require several iterations and take some time. <br>
It is composed of 4 convolutional layers and two dense layers. <br>

However, the shape of the input as well as the number of neurons and the activation function of the last layer are not filled in.<br>
Fill in the gaps so that this network can be used to approximate *Q-values*.

In [None]:
class Qnetwork:
    def __init__(self):
        self.inputs = kl.Input(shape=??, name="main_input")

        self.model = kl.Conv2D(
            filters=32,
            kernel_size=[8, 8],
            strides=[4, 4],
            activation="relu",
            padding="valid",
            name="conv1")(self.inputs)
        self.model = kl.Conv2D(
            filters=64,
            kernel_size=[4, 4],
            strides=[2, 2],
            activation="relu",
            padding="valid",
            name="conv2")(self.model)
        self.model = kl.Conv2D(
            filters=64,
            kernel_size=[3, 3],
            strides=[1, 1],
            activation="relu",
            padding="valid",
            name="conv3")(self.model)
        self.model = kl.Conv2D(
            filters=512,
            kernel_size=[7, 7],
            strides=[1, 1],
            activation="relu",
            padding="valid",
            name="conv4")(self.model)

        self.model = kl.Flatten()(self.model)
        self.model = kl.Dense(256, activation="relu")(self.model)
        self.model = kl.Dense(??, activation=??)(self.model)
        self.model = km.Model(self.inputs, self.model)
        self.model.compile("adam", "mse")
        self.model.optimizer.lr = 0.0001

In [None]:
# %load solutions/Qnetwork_class.py

Instantiate a model in order to display its summary.

In [None]:
main_qn = Qnetwork()
main_qn.model.summary()
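
The spatial sizes reported by the summary can be checked by hand. The sketch below applies the usual output-size formula for a convolution with `padding="valid"` to the kernel sizes and strides of `conv1` to `conv4`, assuming the 84x84 observation described earlier. The helper `conv_output_size` is a name introduced for this illustration only.

```python
def conv_output_size(input_size, kernel_size, stride):
    # Output length of a convolution with padding="valid".
    return (input_size - kernel_size) // stride + 1


# (kernel_size, stride) of conv1..conv4 in the Qnetwork class.
layers = [(8, 4), (4, 2), (3, 1), (7, 1)]

size = 84  # height and width of a GridWorld observation
sizes = []
for kernel_size, stride in layers:
    size = conv_output_size(size, kernel_size, stride)
    sizes.append(size)

print(sizes)  # [20, 9, 7, 1]
```

The last convolution therefore reduces each image to a 1x1 map with 512 channels, so the `Flatten` layer feeds a vector of 512 values to the dense layers.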

### DQN Class

**WARNING**: The structure of this exercise is very similar to the one with CartPole. However, the pseudo code is not the same. <br>
More episodes are played randomly before starting to train the model.<br>
Training is applied every X episodes, and each time on several batches.<br>
And of course we use a target network to generate the targets.<br>
Hence, read these instructions carefully.

The `DQN` class contains the implementation of the **Deep Q-Learning** algorithm. The code is incomplete and you will have to fill it in!

**GENERAL INSTRUCTIONS**:

* Read the `__init__` of the `DQN` class.
    * Various variables are set along with their definitions; make sure you understand all of them.
    * The *game environment*, the *experience replay buffer* and both the *main Q-network* and the *target Q-network* are initialised.
* Read the `train` method. It contains the main code corresponding to the **pseudo code** below. YOU DO NOT HAVE TO MODIFY IT! But make sure you understand it.
* The `train` method uses methods that are not implemented.
    * You will have to complete the code of 4 functions (read the instructions of each exercise below).
    * After the cell containing the `DQN` class code below, there are **test cells** for each of these exercises. <br>
    Each test cell should be executed after the corresponding exercise. It checks that the function you implemented takes inputs and produces outputs in the expected format. <br> DO NOT MODIFY these cells. They will pass if your code is good. <br> **Warning** The test cells do not guarantee that your code is correct. They only test that inputs and outputs are in the right format.


#### Pseudo code 
Set the *main Q-network* and the *target Q-network* to be equal and initialise *prob_random*. <br>
While you have reached neither the expected *goal* reward nor the maximum number of episodes (*max_num_episodes*):
* Play a complete episode (**Exercise 2 & 3**)
* Store this episode in the buffer.
* If you have played more than *min_pre_train_episodes* episodes randomly:
    * Decrease the probability of playing randomly
    * Every *train_frequency* episodes:
        * Run *num_epochs* training steps, each on a batch of targets (**Exercise 4**)
        * Update the weights of the *target Q-network* so that they equal those of the *main Q-network* (**Exercise 1**)
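
The "decrease the probability of playing randomly" step is a linear annealing. The sketch below reproduces it in isolation, using the default values set in the `DQN` class (`prob_random_start = 0.6`, `prob_random_end = 0.1`, `annealing_steps = 1000`). The `history` list and the 1500-episode horizon are assumptions added for illustration.

```python
prob_random_start = 0.6
prob_random_end = 0.1
annealing_steps = 1000.0

# Amount subtracted from prob_random after each post-warm-up episode.
prob_random_drop = (prob_random_start - prob_random_end) / annealing_steps

prob_random = prob_random_start
history = []
for episode in range(1500):
    # Same rule as in train(): only decrease while above the floor.
    if prob_random > prob_random_end:
        prob_random -= prob_random_drop
    history.append(prob_random)

print(round(history[0], 4), round(history[499], 4), round(history[-1], 4))
```

After about 1000 training episodes the probability stays close to `prob_random_end`. Because of floating-point rounding it may settle one step below the floor rather than exactly on it.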

    
**Exercise 1**:  Implement `update_target_graph`<br>
&nbsp;&nbsp;&nbsp;&nbsp;This function updates the weights of `target_qn` with those of `main_qn`.<br>
&nbsp;&nbsp;&nbsp;&nbsp;**Tip** Use the `set_weights` and `get_weights` methods of the [keras api](https://keras.io/api/layers/)
    
**Exercise 2**:  Implement `choose_action`<br>
&nbsp;&nbsp;&nbsp;&nbsp; This method chooses an action in *exploration* or *exploitation* mode according to the following rules:<br>
&nbsp;&nbsp;&nbsp;&nbsp; -> if fewer than `min_pre_train_episodes` episodes have been played, choose the action randomly<br>
&nbsp;&nbsp;&nbsp;&nbsp; -> otherwise, play randomly with probability `prob_random`, else exploit the model.
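
These rules can be sketched on a plain array of Q-values, independently of the network. The function `epsilon_greedy` and the `q_values` array are hypothetical names used for illustration; in the `DQN` class the Q-values would presumably come from a prediction of the main Q-network on the current state.

```python
import numpy as np

rng = np.random.default_rng(0)


def epsilon_greedy(q_values, prob_random, num_episode, min_pre_train_episodes=100):
    # q_values: hypothetical 1-D array holding one Q-value per action.
    n_actions = len(q_values)
    if num_episode < min_pre_train_episodes or rng.random() < prob_random:
        return int(rng.integers(n_actions))  # explore: uniform random action
    return int(np.argmax(q_values))  # exploit: action with the highest Q-value


q_values = np.array([0.1, 0.7, -0.2, 0.3])
greedy = [epsilon_greedy(q_values, prob_random=0.0, num_episode=101) for _ in range(20)]
warm_up = [epsilon_greedy(q_values, prob_random=0.0, num_episode=99) for _ in range(200)]
print(set(greedy), sorted(set(warm_up)))
```

Note the conversion to a Python `int`: NumPy returns its own integer types, whereas the test cell for `run_one_episode` checks that `type(action) is int`.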


**Exercise 3**:  Implement `run_one_episode` <br>
&nbsp;&nbsp;&nbsp;&nbsp; This method:<br>
&nbsp;&nbsp;&nbsp;&nbsp; -> initialises a new environment<br>
&nbsp;&nbsp;&nbsp;&nbsp; -> plays a game until it is done OR until `max_num_step` is reached<br>
&nbsp;&nbsp;&nbsp;&nbsp; -> stores all the experiences and returns them.
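
The shape of such an episode loop can be sketched on a toy problem. `ToyEnv` is a hypothetical 1-D environment written for this illustration; it only imitates the `reset()` and `step()` calls used with Gridworld earlier in the notebook, and a random policy stands in for `choose_action`.

```python
import random


class ToyEnv:
    """Hypothetical 1-D environment exposing reset() and step(), for illustration."""

    def reset(self):
        self.position = 0
        return self.position

    def step(self, action):
        # Action 1 moves right, anything else moves left; position 3 ends the game.
        self.position += 1 if action == 1 else -1
        done = self.position == 3
        reward = 1.0 if done else 0.0
        return self.position, reward, done


def run_one_episode(env, max_num_step=50):
    experiences_episode = []
    state = env.reset()
    done = False
    num_step = 0
    while not done and num_step < max_num_step:
        action = random.randint(0, 1)  # random policy, stands in for choose_action
        next_state, reward, done = env.step(action)
        experiences_episode.append([state, action, reward, next_state, done])
        state = next_state  # the next state becomes the current state
        num_step += 1
    return experiences_episode


episode = run_one_episode(ToyEnv())
print(len(episode))
```

The cap on the number of steps matters for Gridworld, since the environment never signals the end of an episode by itself.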

**Exercise 4**:  Implement `generate_target_q`<br>
This method is used within the `train_one_step` method (which is already implemented). The `train_one_step` method:<br>
&nbsp;&nbsp;&nbsp;&nbsp; -> Generates a batch of data for training using the `experience_replay` <br>
&nbsp;&nbsp;&nbsp;&nbsp; -> Generates the targets from this batch using `generate_target_q` <br>
&nbsp;&nbsp;&nbsp;&nbsp; -> Trains the model using these targets. <br>
<br> 
The `generate_target_q` method is not implemented, so you have to do it!<br>
You have to generate the targets according to the formula below. <br>

 
    
#### Separate target network
Here is how the target is supposed to be computed in order to train our Deep Q-network.

$$target = R(s,a,s')+\gamma \max\limits_{a'}Q_k(s',a';\theta) $$


We build a deep network to learn the values of Q, but its target values change as the network learns. As shown above, the target values for Q depend on Q itself: we are chasing a non-stationary target. So we use a different network to generate the target, and this network is only updated from time to time:


$$target = R(s,a,s')+\gamma \max\limits_{a'}Q_k(s',a';\theta_{target}) $$
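
As a rough numerical illustration of this formula, the sketch below computes targets with NumPy on made-up values. All arrays are hypothetical: `next_q_target` stands in for the predictions of the target network on the next states, and `q_main` for the predictions of the main network on the current states. The `(1 - done)` mask, which drops the future term when the game is over, is standard DQN practice and is an assumption here, since the formula itself does not mention it.

```python
import numpy as np

gamma = 0.99  # discount factor (self.y in the DQN class)

# Hypothetical values for a batch of 3 transitions and 4 actions.
action = np.array([1, 3, 0])
reward = np.array([1.0, 0.0, -1.0])
done = np.array([0, 0, 1])  # 1 when the episode ended on this transition
next_q_target = np.array([  # Q(s', a'; theta_target), one row per transition
    [0.5, 2.0, 1.0, 0.0],
    [0.1, 0.2, 0.3, 0.4],
    [9.0, 9.0, 9.0, 9.0],
])

# No future reward is added once the game is over, hence the (1 - done) mask.
target = reward + gamma * next_q_target.max(axis=1) * (1 - done)

# The network outputs one value per action, so only the entry of the action
# actually taken is replaced; the other entries keep the current prediction.
q_main = np.zeros((3, 4))  # stand-in for Q(s, .; theta)
target_q = q_main.copy()
target_q[np.arange(len(action)), action] = target

print(target)
print(target_q.shape)
```

Keeping the current predictions for the other actions means their error is zero, so with an MSE loss only the action that was actually played contributes to the gradient.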


In [None]:
class DQN:
    def __init__(self):
        self.batch_size = 64  # How many experiences to use for each training step
        self.train_frequency = 5  # How often you update the network
        self.num_epochs = 20  # How many epochs to train when updating the network
        self.y = 0.99  # Discount factor
        self.prob_random_start = 0.6  # Starting chance of random action
        self.prob_random_end = 0.1  # Ending chance of random action
        self.annealing_steps = 1000.  # Steps of training to reduce from prob_random_start -> prob_random_end
        self.max_num_episodes = 10000  # Max number of episodes you are allowed to play to train the model
        self.min_pre_train_episodes = 100  # Number of episodes played with random actions before training starts.
        self.max_num_step = 50  # Maximum allowed episode length
        self.goal = 15 # Number of rewards we want to achieve while playing a game.

        # Set env
        self.env = gameEnv(partial=False, size=5)

        # Reset everything from keras session
        K.clear_session()

        # Setup our Q-networks
        self.main_qn = Qnetwork()
        self.target_qn = Qnetwork()

        # Setup our experience replay
        self.experience_replay = ExperienceReplay()

    def update_target_graph(self):
        # TODO
        return

    def choose_action(self, state, prob_random, num_episode):
        # TODO
        return action

    def run_one_episode(self, num_episode, prob_random):
        # TODO
        return experiences_episode

    def generate_target_q(self, train_state, train_action, train_reward, train_next_state, train_done):
        # TODO
        return target_q

    def train_one_step(self):
        # Train batch is [[state,action,reward,next_state,done],...]
        train_batch = self.experience_replay.sample(self.batch_size)

        # Separate the batch into one numpy array per component
        train_state = np.array([x[0] for x in train_batch])
        train_action = np.array([x[1] for x in train_batch])
        train_reward = np.array([x[2] for x in train_batch])
        train_next_state = np.array([x[3] for x in train_batch])
        train_done = np.array([x[4] for x in train_batch])

        # Generate target Q
        target_q = self.generate_target_q(
            train_state=train_state,
            train_action=train_action,
            train_reward=train_reward,
            train_next_state=train_next_state,
            train_done=train_done
        )

        # Train the main model
        loss = self.main_qn.model.train_on_batch(train_state, target_q)
        return loss

    def train(self):

        # Make the networks equal
        self.update_target_graph()

        # We'll begin by acting completely randomly. As we gain experience and improve,
        # we will begin reducing the probability of acting randomly, and instead
        # take the actions that our Q network suggests
        prob_random = self.prob_random_start
        prob_random_drop = (self.prob_random_start - self.prob_random_end) / self.annealing_steps

        # Init variable
        num_steps = []  # Tracks number of steps per episode
        rewards = []  # Tracks rewards per episode
        print_every = 50  # How often to print status
        losses = [0]  # Tracking training losses
        num_episode = 0

        while True:
            # Run one episode
            experiences_episode = self.run_one_episode(num_episode, prob_random)

            # Save the episode in the replay buffer
            self.experience_replay.add(experiences_episode)

            # If we have played enough episodes, start the training
            if num_episode > self.min_pre_train_episodes:

                # Drop the probability of a random action if we didn't reach the prob_random_end value
                if prob_random > self.prob_random_end:
                    prob_random -= prob_random_drop

                # Every train_frequency episodes, train the model
                if num_episode % self.train_frequency == 0:
                    for num_epoch in range(self.num_epochs):
                        loss = self.train_one_step()
                        losses.append(loss)

                    # Update the target model with values from the main model
                    self.update_target_graph()

            # Increment the episode
            num_episode += 1
            num_steps.append(len(experiences_episode))
            rewards.append(sum([e[2] for e in experiences_episode]))

            # Print Info
            if num_episode % print_every == 0:
                # datetime object containing current date and time
                now = datetime.now()
                dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
                mean_loss = np.mean(losses[-(print_every * self.num_epochs):])
                print("{} - Num episode: {} Mean reward: {:0.4f} Prob random: {:0.4f}, Loss: {:0.04f}".format(
                    dt_string, num_episode, np.mean(rewards[-print_every:]), prob_random, mean_loss))

            # Stop Condition
            if np.mean(rewards[-print_every:]) >= self.goal:
                now = datetime.now()
                dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
                mean_loss = np.mean(losses[-(print_every * self.num_epochs):])
                print("{} - Num episode: {} Mean reward: {:0.4f} Prob random: {:0.4f}, Loss: {:0.04f}".format(
                    dt_string, num_episode, np.mean(rewards[-print_every:]), prob_random, mean_loss))
                print("Training complete because we reached goal rewards.")
                break
            if num_episode > self.max_num_episodes:
                print("Training Stop because we reached max num of episodes")
                break

**Test `update_target_graph`**

This test ensures that
* the layer weights of the *main Q-network* and the *target Q-network* are **different** at initialisation
* the layer weights of the *main Q-network* and the *target Q-network* are **equal** after executing the `update_target_graph` method

In [None]:
# Test 1 Update weight copy weight
dqn = DQN()
for target_layer_weight, main_layer_weight in zip(dqn.target_qn.model.get_weights(), dqn.main_qn.model.get_weights()):
    if len(target_layer_weight.shape)>1:
        assert not(np.all(target_layer_weight == main_layer_weight))

dqn.update_target_graph()
for target_layer_weight, main_layer_weight in zip(dqn.target_qn.model.get_weights(), dqn.main_qn.model.get_weights()):
    if len(target_layer_weight.shape)>1:
        assert np.all(target_layer_weight == main_layer_weight)

**Test `choose_action`**

This test can't be considered a real test. <br>
Indeed, if the actions are chosen randomly, we can't expect a fixed result.

However, if your function is implemented correctly, these tests should pass most of the time:

* If `num_episode`=99 (below `min_pre_train_episodes`=100) -> play randomly even if `prob_random` = 0 
    * Over 100 plays, each action should appear several times.
* If `prob_random` = 1 -> play randomly even if `num_episode`=101 (above `min_pre_train_episodes`=100) 
    * Over 100 plays, each action should appear several times.
* If `prob_random` = 0 and `num_episode`=101 (above `min_pre_train_episodes`=100) -> play in exploit mode 
    * The same action is chosen every time.
* If `prob_random` = 0.5 and `num_episode`=101 (above `min_pre_train_episodes`=100) -> play in exploration and exploit mode at random. 
    * All actions should be seen, but the action chosen in exploit mode is always the same and should be chosen most often.

In [None]:
dqn = DQN()
state = dqn.env.reset()
# Random action if less than min_pre_train_episode has been played
actions = [dqn.choose_action(state=state,num_episode=99, prob_random=0) for _ in range(100)]
count_action = collections.Counter(actions)
print(count_action)
assert count_action[0]>15
assert count_action[1]>15
assert count_action[2]>15
assert count_action[3]>15

# Random action if we play more than min_pre_train_episode and prob_random is 1
actions = [dqn.choose_action(state=state,num_episode=101, prob_random=1) for _ in range(100)]
count_action = collections.Counter(actions)
print(count_action)
assert count_action[0]>15
assert count_action[1]>15
assert count_action[2]>15
assert count_action[3]>15

# Best action according to model if we play more than min_pre_train_episode and prob_random is 0
actions = [dqn.choose_action(state=state,num_episode=101, prob_random=0) for _ in range(100)]
count_action = collections.Counter(actions)
print(count_action)
assert(len(set(actions)))==1
main_action = list(set(actions))[0]

actions = [dqn.choose_action(state=state,num_episode=101, prob_random=0.5) for _ in range(100)]
count_action = collections.Counter(actions)
assert(len(set(actions)))==4
print(count_action)
assert sorted(count_action.items(), key=lambda x : x[1])[-1][0]==main_action

**Test `run_one_episode`**

The `run_one_episode` method plays a complete episode.

* The method returns a list of experiences. Each experience is a list that contains:
    * A *state*: an image of shape (84,84,3)
    * An *action*: an integer
    * A *reward*: a float
    * The *next_state*: an image of shape (84,84,3)
    * A boolean that indicates whether the game is over after this action.
* The list can't contain more experiences than `max_num_step`

In [None]:
dqn = DQN()
experiences_episode = dqn.run_one_episode(num_episode=200,
                          prob_random=1)

for experience in experiences_episode:
    state, action, reward, next_state, done = experience

    assert state.shape == (84, 84, 3)
    assert type(action) is int
    assert type(reward) is float
    assert next_state.shape == (84, 84, 3)
    assert type(done) is bool
assert len(experiences_episode)<=dqn.max_num_step

**Test `generate_target_q`**

This method generates the targets for the Q-values.

In this test we set `batch_size` to 2. Hence the function takes as input:
* train_state : An array of shape (2,84,84,3)
* train_action : An array of shape (2,)
* train_reward : An array of shape (2,)
* train_next_state : An array of shape (2,84,84,3)
* train_done : An array of shape (2,)

And returns as output an array of shape (2,4), which holds one target per action for each input of the batch.


In [None]:
dqn = DQN()
dqn.batch_size=2
state = np.expand_dims(dqn.env.reset(), axis=0)
target_q = dqn.generate_target_q(
    train_state = np.vstack([state,state]),
    train_action = np.array([0,0]),
    train_reward = np.array([1.0,2.0]),
    train_next_state = np.vstack([state,state]),
    train_done = np.array([1, 1])
)

assert target_q.shape == (2,4)

Here is the solution

In [None]:
# %load solutions/DQN_gridworld_class.py

Train the model

In [None]:
dqn = DQN()
dqn.train()

**Exercise** Once your model has learned how to play and reaches the expected goal, play a game exploiting the main Q-network trained with Deep Q-learning, and display a video of this game to check how it performs!

In [None]:
# %load solutions/play_game_with_mainq.py

# [OPTIONAL] D3QN

DQN has since been improved with two different tricks (among others):

* Dueling
* Double DQN

Implement these two solutions.

#### Dueling

The dueling architecture works as follows:


In order to explain the reasoning behind the architecture changes that Dueling DQN makes, we first need to explain a few additional reinforcement learning terms. The Q-values that we have been discussing so far correspond to how good it is to take a certain action given a certain state. This can be written as Q(s,a). This value of an action given a state can actually be decomposed into two more fundamental notions of value. The first is the value function V(s), which says simply how good it is to be in any given state. The second is the advantage function A(s,a), which tells how much better taking a certain action would be compared to the others. We can then think of Q as being the combination of V and A. More formally:
$$Q(s,a) = V(s) + A(s,a)$$
The goal of Dueling DQN is to have a network that separately computes the advantage and value functions, and combines them back into a single Q-function only at the final layer. It may seem somewhat pointless to do this at first glance. Why decompose a function that we will just put back together? The key to realizing the benefit is to appreciate that our reinforcement learning agent may not need to care about both value and advantage at any given time. For example: imagine sitting outside in a park watching the sunset. It is beautiful, and highly rewarding to be sitting there. No action needs to be taken, and it doesn’t really make sense to think of the value of sitting there as being conditioned on anything beyond the environmental state you are in. We can achieve more robust estimates of state value by decoupling it from the necessity of being attached to specific actions.
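
The recombination at the final layer can be sketched with NumPy on made-up stream outputs. The arrays `value` and `advantage` are hypothetical. The first combination is the plain sum of the formula above; the second subtracts the mean advantage, a variant commonly used in practice so that the two streams are identifiable, and it is an addition to what the text describes.

```python
import numpy as np

# Hypothetical outputs of the two streams for a batch of 2 states and 4 actions.
value = np.array([[1.0], [-0.5]])               # state value, shape (2, 1)
advantage = np.array([[0.2, -0.1, 0.4, 0.3],    # advantage of each action, shape (2, 4)
                      [1.0, 0.0, 0.0, -1.0]])

# Plain sum: the state value is broadcast over the 4 actions.
q_sum = value + advantage

# Mean-centred variant: the advantages of each state are shifted to sum to zero.
q_centred = value + (advantage - advantage.mean(axis=1, keepdims=True))

print(q_sum.shape, q_centred.mean(axis=1))
```

Both versions rank the actions of a given state identically. With the centred version, the mean of the Q-values over the actions equals the state value.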

**Exercise**: Update the Q-network architecture so that it fits the dueling architecture.

In [None]:
# %load solutions/dueling.py

In [None]:
main_qn = Qnetwork()
main_qn.model.summary()

#### Double DQN


Taking the max over estimated Q-values when computing the target often leads regular DQN to overestimate the Q-values of the potential actions to take in a given state. While this would be fine if all actions were always overestimated equally, there is reason to believe this isn’t the case. You can easily imagine that if certain suboptimal actions were regularly given higher Q-values than optimal actions, the agent would have a hard time ever learning the ideal policy. In order to correct for this, the authors of the DDQN paper propose a simple trick: instead of taking the max over Q-values when computing the target Q-value for our training step, we use our primary network to choose an action, and our target network to generate the target Q-value for that action. By decoupling the action choice from the target Q-value generation, we are able to substantially reduce the overestimation, and train faster and more reliably. Below is the new DDQN equation for updating the target value.


$$target = R(s,a,s')+\gamma Q_k(s',\arg\max\limits_{a'}Q_k(s',a';\theta);\theta_{target}) $$
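
The difference with the regular target can be seen on a small numerical sketch. The arrays below are made-up values: `next_q_main` and `next_q_target` stand in for the predictions of the main and target networks on the next states. As for the regular target, the `(1 - done)` mask is an assumption that follows standard practice.

```python
import numpy as np

gamma = 0.99
reward = np.array([1.0, 0.0])
done = np.array([0, 1])

# Hypothetical predictions for the next states s' (2 transitions, 4 actions).
next_q_main = np.array([[0.1, 0.9, 0.3, 0.2],     # Q(s', .; theta)
                        [0.5, 0.4, 0.3, 0.2]])
next_q_target = np.array([[1.0, 0.6, 2.0, 0.0],   # Q(s', .; theta_target)
                          [0.7, 0.1, 0.2, 0.3]])

# 1. The main network selects the action...
best_action = next_q_main.argmax(axis=1)
# 2. ...and the target network evaluates that action.
double_q = next_q_target[np.arange(len(best_action)), best_action]

target_double = reward + gamma * double_q * (1 - done)
target_regular = reward + gamma * next_q_target.max(axis=1) * (1 - done)
print(target_double, target_regular)
```

For the first transition, the target network's largest value (2.0) belongs to an action the main network does not consider best, so the double target is lower than the regular one.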

**Exercise**: Update the `DQN` class, and in particular its target-generation function, so that the targets are generated with the formula above.

In [None]:
#%load solutions/double_dqn.py

In [None]:
dqn = DQN()
dqn.train()

# [OPTIONAL] Deep Q Learning on Pacman!

### Creating the MsPacman environment

In [None]:
import gym
env = gym.make("MsPacman-v0")
obs = env.reset()
obs.shape

In [None]:
env.action_space

### Preprocessing

Preprocessing the images is optional but greatly speeds up training.

In [None]:
mspacman_color = 210 + 164 + 74

def preprocess_observation(obs):
    img = obs[1:176:2, ::2] # crop and downsize
    img = img.sum(axis=2) # to greyscale
    img[img==mspacman_color] = 0 # Improve contrast
    img = (img // 3 - 128).astype(np.int8) # normalize from -128 to 127
    return img.reshape(88, 80, 1)/128

img = preprocess_observation(obs)
img.shape

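Note: the `preprocess_observation()` function is slightly different from the one in the book: instead of representing pixels as 64-bit floats from -1.0 to 1.0, it converts them to signed bytes (from -128 to 127). Stored as signed bytes, the replay memory takes up roughly 8 times less RAM (about 6.5 GB instead of 52 GB), and the reduced precision has no visible impact on training. Be aware that, as written, the final division by 128 turns the array back into floats; to actually benefit from the saving, you would store the `int8` image in the replay memory and apply the division only when building a training batch.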
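
The factor of 8 is simply the ratio between 8 bytes per pixel (64-bit floats) and 1 byte per pixel (signed bytes). The arithmetic below is a sketch under an assumed replay memory of 500,000 transitions, each storing a state and a next state; this size is not stated in this notebook and was chosen because it is consistent with the figures quoted above.

```python
# Assumed replay memory: 500,000 transitions, each storing a state and a next state.
n_frames = 2 * 500_000
pixels_per_frame = 88 * 80  # shape of a preprocessed observation

gib = 2 ** 30
int8_gib = n_frames * pixels_per_frame * 1 / gib      # 1 byte per pixel
float64_gib = n_frames * pixels_per_frame * 8 / gib   # 8 bytes per pixel

print(f"int8: {int8_gib:.2f} GiB, float64: {float64_gib:.2f} GiB")
```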

In [None]:
plt.figure(figsize=(11, 7))
plt.subplot(121)
plt.title("Original observation (160×210 RGB)")
plt.imshow(obs)
plt.axis("off")
plt.subplot(122)
plt.title("Preprocessed observation (88×80 greyscale)")
plt.imshow(img.reshape(88, 80), interpolation="nearest", cmap="gray")
plt.axis("off")
plt.show()