# Deep Q-Learning - Space Invaders
Train an agent to play Space Invaders using [Deep Reinforcement Learning](https://deepmind.google/discover/blog/deep-reinforcement-learning/).

# Outline
- [ 1 - Import Packages <img align="Right" src="./images/space_invaders.png" width = 60% >](#1)
- [ 2 - Hyperparameters](#2)
- [ 3 - Space Invaders](#3)
  - [ 3.1 Action Space](#3.1)
  - [ 3.2 Observation Space](#3.2)
  - [ 3.3 Rewards](#3.3)
  - [ 3.4 Episode Termination](#3.4)
- [ 4 - Load Space Invaders using Selenium](#4)
- [ 5 - Game State](#5)
- [ 6 - Deep Q-Learning](#6)
  - [ 6.1 Target Network](#6.1)
  - [ 6.2 Experience Replay](#6.2)
- [ 7 - Deep Q-Learning Algorithm with Experience Replay](#7)
- [ 8 - Update the Network Weights](#8)

<a name="1"></a>
## 1 - Import Packages

We'll make use of the following packages:
- `selenium` provides the Selenium Python bindings, a simple API for writing functional/acceptance tests with Selenium WebDriver.
- `PIL` is the import name of Pillow, the Python Imaging Library fork, which adds image processing capabilities.
- `OpenCV` (Open Source Computer Vision Library, imported as `cv2`) is an open source computer vision and machine learning software library.
- `numpy` is the fundamental package for scientific computing in Python.
- `TensorFlow` is an end-to-end open source platform for machine learning.
- `TensorFlow.Keras` is the high-level API of the TensorFlow platform. It provides an approachable, highly-productive interface for solving machine learning (ML) problems, with a focus on modern deep learning.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from collections import namedtuple
import numpy as np
import cv2
import tensorflow as tf

from config import Config
from classes.game import Game
from classes.game_state import Game_State

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam

<a name="2"></a>
## 2 - Hyperparameters

Agent hyperparameters

In [None]:
BUFFER_SIZE = 100_000     # size of memory buffer
GAMMA = 0.995             # discount factor
ALPHA = 1e-3              # learning rate
NUM_STEPS_FOR_UPDATE = 4  # perform a learning update every C time steps

<a name="3"></a>
## 3 - Space Invaders
In this notebook we will use the web version of Space Invaders hosted at [free80sarcade](https://www.free80sarcade.com/spaceinvaders.php).

Our goal is to develop an AI agent that learns to play the popular game Space Invaders from scratch. To do so, we implement a Deep Reinforcement Learning algorithm using Keras on top of TensorFlow.

<a name="3.1"></a>
### 3.1 Action Space

The agent has four discrete actions available:

* Do nothing.
* Move right.
* Move left.
* Fire weapon.

Each action has a corresponding numerical value:

```python
Do nothing = 0
Move right = 1
Move left = 2
Fire weapon = 3
```

<a name="3.2"></a>
### 3.2 Observation Space

<img src="./images/observation_space.png" width="268" height="158" ><br />

Screen coordinate: x = 0, y = 58, cx = 268, cy = 158

BW Image size: 268 * 158 = 42344
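
As a rough illustration of the numbers above, the sketch below crops the observation region out of a full grayscale frame and flattens it. It assumes that `x`, `y` give the top-left corner and `cx`, `cy` the width and height of the region, which is one reading of the coordinates and is not confirmed by the game classes. The full frame size and the helper `crop_observation` are hypothetical.

```python
import numpy as np

# Assumed meaning of the coordinates above: top-left corner (X, Y), width CX, height CY.
X, Y, CX, CY = 0, 58, 268, 158

def crop_observation(frame: np.ndarray) -> np.ndarray:
    """Return the play-field region of a 2-D grayscale frame (rows = y, columns = x)."""
    return frame[Y:Y + CY, X:X + CX]

# Hypothetical full frame, tall enough to contain the region.
frame = np.zeros((240, 268), dtype=np.uint8)

observation = crop_observation(frame)
flat_state = observation.reshape(-1)  # one value per pixel

print(observation.shape, flat_state.size)
```

The flattened size equals `268 * 158`, the BW image size quoted above.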

<a name="3.3"></a>
### 3.3 Rewards

After every step, a reward is granted. The total reward of an episode is the sum of the rewards for all the steps within that episode.

For each step, the reward:
- is increased by the number of points scored since the previous step (current score minus previous score).
- is decreased by 0.03 each time the weapon is fired.
- is decreased by 100 when a life is lost.
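
The reward rules above can be sketched as a small function. This is an illustrative version only: the function name `step_reward` and its arguments are hypothetical, and it assumes the penalty of 100 applies once per life lost within a step.

```python
FIRE_PENALTY = 0.03   # penalty per shot, from the rules above
LIFE_PENALTY = 100.0  # penalty per lost life, from the rules above

def step_reward(prev_score, score, fired, prev_lives, lives):
    """Compute the reward for a single step (hypothetical helper)."""
    # Points gained since the previous step.
    reward = float(score - prev_score)
    # Small cost for firing, which discourages shooting blindly.
    if fired:
        reward -= FIRE_PENALTY
    # Large cost for every life lost during this step (assumption: per life).
    lives_lost = max(prev_lives - lives, 0)
    reward -= LIFE_PENALTY * lives_lost
    return reward

# The agent fired and scored 30 points without losing a life.
r_hit = step_reward(prev_score=100, score=130, fired=True, prev_lives=3, lives=3)
# The agent did nothing and lost a life.
r_death = step_reward(prev_score=130, score=130, fired=False, prev_lives=3, lives=2)
print(r_hit, r_death)
```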

<a name="3.4"></a>
### 3.4 Episode Termination

An episode ends (i.e., the environment enters a terminal state) if:

* The number of lives reaches 0.


<a name="4"></a>
## 4 - Load Space Invaders using Selenium

We start by loading the `Space Invaders` emulation from the [free80sarcade](https://www.free80sarcade.com/spaceinvaders.php) website by using the `Selenium` webdriver.

In [None]:
config = Config()

# Create an instance of the Service object
service = Service(executable_path=config.chrome_driver)
# Start Chrome using the service keyword
driver = webdriver.Chrome(service=service)
# Load the site
driver.get(config.game_url)

wait = WebDriverWait(driver, 10)

# Cookie consent
wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "button[class='fc-button fc-cta-consent fc-primary-button']"))).click()

# Find canvas and create Game
canvas = wait.until(EC.element_to_be_clickable((By.ID, 'videoCanvas')))

# Create the game
game = Game(driver, canvas)


<a name="5"></a>
## 5 - Game State



To build our neural network later on we need to know the size of the state vector and the number of valid actions.

In [None]:
state_size = Game_State.shape
num_actions = Game_State.num_actions

print('State Shape:', state_size)
print('Number of actions:', num_actions)

In [None]:
game.start()

game_state = Game_State(game.get_image_bw())

cv2.imshow('Gray', game_state.state())
cv2.waitKey()

print(f"Score: {game_state.score()}")
print(f"Lives: {game_state.lives()}")

<a name="6"></a>
## 6 - Deep Q-Learning

<a name="6.1"></a>
### 6.1 Target Network

We can train the $Q$-Network by adjusting its weights at each iteration to minimize the mean-squared error in the Bellman equation, where the target values are given by:

$$
y = R + \gamma \max_{a'}Q(s',a';w)
$$

where $w$ are the weights of the $Q$-Network. This means that we are adjusting the weights $w$ at each iteration to minimize the following error:

$$
\overbrace{\underbrace{R + \gamma \max_{a'}Q(s',a'; w)}_{\rm {y~target}} - Q(s,a;w)}^{\rm {Error}}
$$

This poses a problem: the $y$ target depends on the same weights $w$ that we are updating, so it changes on every iteration. Having a constantly moving target can lead to oscillations and instabilities. To avoid this, we can create a separate neural network for generating the $y$ targets.

We call this separate neural network the **target $\hat Q$-Network** and it will have the same architecture as the original $Q$-Network. By using the target $\hat Q$-Network, the above error becomes:

$$
\overbrace{\underbrace{R + \gamma \max_{a'}\hat{Q}(s',a'; w^-)}_{\rm {y~target}} - Q(s,a;w)}^{\rm {Error}}
$$

where $w^-$ and $w$ are the weights of the target $\hat Q$-Network and $Q$-Network, respectively.

In practice, we will use the following algorithm: every $C$ time steps we will use the $\hat Q$-Network to generate the $y$ targets and then update the weights of the target $\hat Q$-Network using the weights of the $Q$-Network. We will update the weights $w^-$ of the target $\hat Q$-Network using a **soft update**, that is, according to the following rule:

$$
w^-\leftarrow \tau w + (1 - \tau) w^-
$$

where $\tau\ll 1$. By using the soft update, we are ensuring that the target values, $y$, change slowly, which greatly improves the stability of our learning algorithm.
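
A minimal sketch of the soft update rule, written on plain NumPy arrays so that it runs on its own. The value of `TAU` and the helper `soft_update` are hypothetical; the text only requires $\tau\ll 1$.

```python
import numpy as np

TAU = 1e-3  # hypothetical value; any tau << 1 follows the rule above

def soft_update(weights, target_weights, tau=TAU):
    """Return tau * w + (1 - tau) * w_minus for each pair of weight arrays."""
    return [tau * w + (1.0 - tau) * w_t
            for w, w_t in zip(weights, target_weights)]

# Toy weights: the Q-Network holds ones, the target network holds zeros.
w = [np.ones((2, 2)), np.ones(2)]
w_minus = [np.zeros((2, 2)), np.zeros(2)]

w_minus = soft_update(w, w_minus)
print(w_minus[0])  # every entry moved a fraction TAU of the way towards w
```

With the Keras models in this notebook, the two lists could come from `q_network.get_weights()` and `target_q_network.get_weights()`, and the result could be written back with `target_q_network.set_weights(...)`.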

In [None]:
# Create the Q-Network
q_network = Sequential([
    Input(shape=state_size),
    Dense(units=64, activation='relu'),
    Dense(units=64, activation='relu'),
    Dense(units=num_actions, activation='linear')
    ])

# Create the target Q^-Network
target_q_network = Sequential([
    Input(shape=state_size),
    Dense(units=64, activation='relu'),
    Dense(units=64, activation='relu'),
    Dense(units=num_actions, activation='linear')
    ])

optimizer = Adam(learning_rate=ALPHA)


<a name="6.2"></a>
### 6.2 Experience Replay

When an agent interacts with the environment, the states, actions, and rewards the agent experiences are sequential by nature. If the agent tries to learn from these consecutive experiences, it can run into problems due to the strong correlations between them. To avoid this, we employ a technique known as **Experience Replay** to generate uncorrelated experiences for training our agent. Experience replay consists of storing the agent's experiences (i.e., the states, actions, and rewards the agent receives) in a memory buffer and then sampling a random mini-batch of experiences from the buffer to do the learning. The experience tuples $(S_t, A_t, R_t, S_{t+1})$ will be added to the memory buffer at each time step as the agent interacts with the environment.

For convenience, we will store the experiences as named tuples.

In [None]:
# Store experiences as named tuples
experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

By using experience replay we avoid problematic correlations, oscillations and instabilities. In addition, experience replay also allows the agent to potentially use the same experience in multiple weight updates, which increases data efficiency.
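
The memory buffer and the random sampling described in this section can be sketched with the standard library alone. This is an illustration under assumptions: `MINIBATCH_SIZE`, `memory_buffer` and `sample_minibatch` are hypothetical names, and integers stand in for real game states.

```python
import random
from collections import deque, namedtuple

# Same fields as the named tuple used in this notebook.
Experience = namedtuple("Experience", ["state", "action", "reward", "next_state", "done"])

BUFFER_SIZE = 100_000  # same value as in the hyperparameters section
MINIBATCH_SIZE = 64    # hypothetical mini-batch size

# A deque with maxlen discards the oldest experience once the buffer is full.
memory_buffer = deque(maxlen=BUFFER_SIZE)

def sample_minibatch(buffer, batch_size=MINIBATCH_SIZE):
    """Draw batch_size distinct experiences uniformly at random."""
    return random.sample(list(buffer), k=batch_size)

# Fill the buffer with dummy transitions.
for t in range(1_000):
    memory_buffer.append(Experience(state=t, action=t % 4, reward=0.0,
                                    next_state=t + 1, done=False))

minibatch = sample_minibatch(memory_buffer)
print(len(minibatch))
```

Because the sample is drawn uniformly from the whole buffer, consecutive time steps rarely end up in the same mini-batch, which is what breaks the correlation between experiences.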

<a name="7"></a>
## 7 - Deep Q-Learning Algorithm with Experience Replay

Now we will compute the loss between the $y$ targets and the $Q(s,a)$ values.
The `compute_loss` function will set the $y$ targets equal to:

$$
\begin{equation}
    y_j =
    \begin{cases}
      R_j & \text{if episode terminates at step  } j+1\\
      R_j + \gamma \max_{a'}\hat{Q}(s_{j+1},a') & \text{otherwise}\\
    \end{cases}
\end{equation}
$$

Here are a couple of things to note:

* The `compute_loss` function takes in a mini-batch of experience tuples. This mini-batch of experience tuples is unpacked to extract the `states`, `actions`, `rewards`, `next_states`, and `done_vals`. You should keep in mind that these variables are *TensorFlow Tensors* whose size will depend on the mini-batch size. For example, if the mini-batch size is `64` then both `rewards` and `done_vals` will be TensorFlow Tensors with `64` elements.

* Using `if/else` statements to set the $y$ targets will not work when the variables are tensors with many elements. However, notice that we can use the `done_vals` to implement the above in a single line of code. To do this, recall that the `done` variable is a Boolean variable that takes the value `True` when an episode terminates at step $j+1$ and it is `False` otherwise. Taking into account that a Boolean value of `True` has the numerical value of `1` and a Boolean value of `False` has the numerical value of `0`, we can use the factor `(1 - done_vals)` to implement the above in a single line of code.

Lastly, we compute the loss by calculating the Mean-Squared Error (`MSE`) between the `y_targets` and the `q_values`.
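
To make the `(1 - done_vals)` trick concrete, here is a small worked example on NumPy arrays. The rewards, `done` flags and target network outputs are made-up numbers for a mini-batch of three experiences with four actions.

```python
import numpy as np

gamma = 0.995  # discount factor, as in the hyperparameters

# Made-up mini-batch of three experiences.
rewards = np.array([10.0, -100.0, 0.0], dtype=np.float32)
done_vals = np.array([0.0, 1.0, 0.0], dtype=np.float32)  # second episode ended

# Made-up target network outputs for the next states, one row per experience.
target_q_next = np.array([[1.0, 2.0, 0.5, 0.0],
                          [3.0, 4.0, 1.0, 2.0],
                          [0.0, -1.0, -2.0, -3.0]], dtype=np.float32)

# Largest predicted value over the actions: [2., 4., 0.]
max_qsa = target_q_next.max(axis=-1)

# The factor (1 - done_vals) zeroes the bootstrap term for terminal steps.
y_targets = rewards + gamma * max_qsa * (1 - done_vals)
print(y_targets)  # approximately [11.99, -100., 0.]
```

The second target equals the bare reward because its `done` flag is `1`, while the first target adds the discounted value `0.995 * 2`.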

In [None]:
def compute_loss(experiences, gamma, q_network, target_q_network):
    """
    Calculates the loss.

    Args:
      experiences: (tuple) tuple of ["state", "action", "reward", "next_state", "done"] namedtuples
      gamma: (float) The discount factor.
      q_network: (tf.keras.Sequential) Keras model for predicting the q_values
      target_q_network: (tf.keras.Sequential) Keras model for predicting the targets

    Returns:
      loss: (TensorFlow Tensor(shape=(), dtype=float32)) the Mean-Squared Error between
            the y targets and the Q(s,a) values.
    """

    # Unpack the mini-batch of experience tuples
    states, actions, rewards, next_states, done_vals = experiences

    # Compute max Q^(s,a)
    max_qsa = tf.reduce_max(target_q_network(next_states), axis=-1)

    # Set y = R if episode terminates, otherwise set y = R + γ max Q^(s,a).
    y_targets = rewards + (gamma * max_qsa * (1 - done_vals))

    # Get the q_values and reshape to match y_targets
    q_values = q_network(states)
    q_values = tf.gather_nd(q_values, tf.stack([tf.range(q_values.shape[0]),
                                                tf.cast(actions, tf.int32)], axis=1))

    # Compute the loss
    loss = MSE(y_targets, q_values)

    return loss

In [None]:
from classes.number_parser import Number_Parser
import cv2
import random as rng

# 9,9
# Lives 21, 218
# Score 37, 42 - 45 - 53 - 61 - 69 - 77

#matrix = np.asarray(bw_image.getdata(0)).reshape((bw_image.size[1], bw_image.size[0]))

# Grab the current frame and convert it to grayscale, as needed by findContours
image = game.get_image_png()
gray = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2GRAY)

score_digits = [None] * 6
score_digits[0] = gray[42:42 + 9, 37:37 + 9]
score_digits[1] = gray[42:42 + 9, 45:45 + 9]
score_digits[2] = gray[42:42 + 9, 53:53 + 9]
score_digits[3] = gray[42:42 + 9, 61:61 + 9]
score_digits[4] = gray[42:42 + 9, 69:69 + 9]
score_digits[5] = gray[42:42 + 9, 77:77 + 9]

cv2.imshow('Gray', score_digits[0])
cv2.waitKey()

#print(model.predict(cv2.bitwise_not(score_digits[0])))
#print(model.predict(cv2.bitwise_not(cv2.resize(score_digits[1], (200, 200)))))
#print(model.predict(cv2.bitwise_not(score_digits[2])))
#print(model.predict(cv2.bitwise_not(score_digits[3])))

number_parser = Number_Parser()

print(number_parser.get_number(score_digits))
#print(number_parser.get_digit(score_digits[1]))

"""
number_contours = pickle.load(open('number_contours.dump', 'rb'))
number_contours[0] = score_digits[0]
number_contours[1] = score_digits[1]
number_contours[2] = score_digits[2]
"""


"""
digit_contours, _ = cv2.findContours(score_digits[0], cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
number_contours[0] = digit_contours[0]

digit_contours, _ = cv2.findContours(score_digits[1], cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
number_contours[1] = digit_contours[0]

digit_contours, _ = cv2.findContours(score_digits[2], cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
number_contours[2] = digit_contours[0]
"""

#pickle.dump(number_contours, open('number_contours.dump', 'wb'))

#ret = cv2.matchShapes(score_digits[0], score_digits[3], 1, 0.0)
#print(ret)

#cv2.imshow('Gray', score_digits[2])
#cv2.waitKey()

# Find contours
contours, _ = cv2.findContours(gray, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE)

contours_poly = [None]*len(contours)
boundRect = [None]*len(contours)
centers = [None]*len(contours)
radius = [None]*len(contours)
for i, c in enumerate(contours):
    contours_poly[i] = cv2.approxPolyDP(c, 3, True)
    boundRect[i] = cv2.boundingRect(contours_poly[i])
    centers[i], radius[i] = cv2.minEnclosingCircle(contours_poly[i])

drawing = np.zeros((gray.shape[0], gray.shape[1], 3), dtype=np.uint8)

for i in range(len(contours)):
    color = (rng.randint(0, 255), rng.randint(0, 255), rng.randint(0, 255))  # randint is inclusive
    cv2.drawContours(drawing, contours_poly, i, color)
    #cv2.rectangle(drawing, (int(boundRect[i][0]), int(boundRect[i][1])), \
    #   (int(boundRect[i][0]+boundRect[i][2]), int(boundRect[i][1]+boundRect[i][3])), color, 2)
    #cv2.circle(drawing, (int(centers[i][0]), int(centers[i][1])), int(radius[i]), color, 2)

cv2.imshow('Gray', drawing)
cv2.waitKey()



In [None]:
from classes.number_parser import Number_Parser
import cv2
import time

max_num_timesteps = 100

number_parser = Number_Parser()
game.start()

time.sleep(5)

for t in range(max_num_timesteps):
    image = game.get_image_png()

    # Convert the frame to grayscale before slicing out the score and lives digits
    gray = cv2.cvtColor(np.array(image), cv2.COLOR_BGR2GRAY)

    score_digits = [None] * 6
    score_digits[0] = gray[42:42 + 9, 37:37 + 9]
    score_digits[1] = gray[42:42 + 9, 45:45 + 9]
    score_digits[2] = gray[42:42 + 9, 53:53 + 9]
    score_digits[3] = gray[42:42 + 9, 61:61 + 9]
    score_digits[4] = gray[42:42 + 9, 69:69 + 9]
    score_digits[5] = gray[42:42 + 9, 77:77 + 9]

    lives_digit = gray[218:218 + 9, 21:21 + 9]

    print(f"Score: {number_parser.get_number(score_digits)}")
    print(f"Lives: {number_parser.get_digit(lives_digit)}")
    print("----------")
    time.sleep(1)

In [None]:
game.fire()

In [None]:
game.quit()