<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Deep-Q-learning-to-Play-Catch" data-toc-modified-id="Deep-Q-learning-to-Play-Catch-1">Deep Q-learning to Play Catch</a></span></li><li><span><a href="#Learning-Outcomes" data-toc-modified-id="Learning-Outcomes-2">Learning Outcomes</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#By-the-end-of-this-session,-you-should-be-able-to:" data-toc-modified-id="By-the-end-of-this-session,-you-should-be-able-to:-2.0.1">By the end of this session, you should be able to:</a></span></li></ul></li></ul></li></ul></div>

<center><h2>Deep Q-learning to Play Catch</h2></center>

<center><h2>Learning Outcomes</h2></center>

#### By the end of this session, you should be able to:

- Implement the core logic of Experience Replay, including Q-learning.
- Explain how Deep Q-learning can learn to play Catch, a simplified version of Pong.

See the other notebooks for a general orientation to experience replay, Keras, and the game of Catch.

In [None]:
reset -fs

In [None]:
import numpy as np

import warnings
warnings.filterwarnings('always')

In [None]:
# Import Keras (easy way or hard way)
try:
    import keras
except ImportError:
    import pip
    import sys
    import subprocess
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'keras'])
    
    import keras

In [None]:
class Catch():
    """Catch is a simplified version of Pong.

    Catch tries to capture a single pixel “fruit” using a three pixel “basket”.
    The fruit moves down one pixel per step.
    Reward of +1 if it catches the fruit and -1 if it misses.
    Input: The network sees the entire "pixels" grid.
    Outputs: 3 actions (move left, stay, move right).

    Internal state is a length-3 array:
    [fruit row index, fruit column index, column index of the basket's left edge].
    The basket is always drawn on the bottom row of the canvas.

    Parameters
    ----------
    grid_size : side length of the square canvas.
    basket_size : width of the basket in pixels.
    num_actions : accepted for interface compatibility; not used by the class.
    """
    def __init__(self, grid_size=10, basket_size=3, num_actions=3):
        self.basket_size = basket_size
        self.grid_size = grid_size
        self.reset_state()  # Pick random starting location and draw it

    def empty_canvas(self):
        "Reset to canvas empty, aka all zeros"
        self.canvas = np.zeros((self.grid_size,)*2)

    def act(self, action=1):  # Default action is to stay
        """Apply one action (0=left, 1=stay, 2=right).

        Returns the tuple (observation, reward) after the move.
        """
        self.update_state(action)
        return self.observe(), self.get_reward()

    def is_over(self):
        "True once the fruit's row index has moved past the last row of the grid."
        return bool(self.state[0] >= self.grid_size)

    def get_reward(self):
        "Return +1 if the fruit landed in the basket, -1 if it missed, 0 while the game is still running."
        if not self.is_over():
            return 0  # Carry on 😐
        _, fruit_col, basket_left = self.state
        # The basket occupies columns basket_left .. basket_left + basket_size - 1,
        # exactly the cells that update_canvas draws.
        if basket_left <= fruit_col < basket_left + self.basket_size:
            return 1  # Fruit in basket 🙂
        return -1  # Fruit missed basket ☹

    def observe(self):
        "Convert internal matrix representation into a vector for the input to the MLP DL model."
        return self.canvas.reshape((1, -1))

    def reset_state(self):
        "Pick a new starting place for fruit and basket, then redraw the canvas."
        fruit_col = np.random.randint(low=0, high=self.grid_size)
        # `high` is exclusive: the largest left edge that keeps the basket on the grid
        # is grid_size - basket_size.
        basket_left = np.random.randint(low=0, high=self.grid_size - self.basket_size + 1)
        self.state = np.asarray([0,            # Row index of fruit
                                 fruit_col,    # Col index of fruit
                                 basket_left]) # Col index of left side of basket (row is always bottom)
        # Redraw so observe() reflects the new episode instead of the previous frame.
        self.update_canvas()

    def update_state(self, action_encoded):
        "Given an action, move basket and advance fruit."
        # Convert encoded action into change in basket index
        if action_encoded == 0:    # Left
            step = -1
        elif action_encoded == 1:  # Stay
            step = 0
        else:                      # Right
            step = 1

        fruit_row, fruit_col, basket_left = self.state
        # Keep the whole basket on the grid: left edge ranges over 0 .. grid_size - basket_size
        rightmost_left = self.grid_size - self.basket_size
        new_basket_left = min(max(0, basket_left + step), rightmost_left)
        # Fruit falls down 1 step
        self.state = np.asarray([fruit_row + 1, fruit_col, new_basket_left])
        # Once the game is over the fruit row is off the grid, so the last frame is kept.
        if not self.is_over():
            self.update_canvas()

    def update_canvas(self):
        "Read state of fruit and basket, put on canvas."
        self.empty_canvas()
        fruit_row, fruit_col, basket_left = self.state
        self.canvas[fruit_row, fruit_col] = 1  # Draw fruit
        self.canvas[-1, basket_left:basket_left + self.basket_size] = 1  # Draw basket


In [None]:
# Step through a small game by hand to get a feel for the game mechanics
c = Catch(grid_size=6)

while True:
    if c.is_over():
        break
    reply = input("Press return to make a random move. Press 'q' then return to quit: ")
    if reply == "q":
        break
    # Display the "screen" as it looks before the move
    print(c.canvas)
    # Pick one of the three actions uniformly at random, then apply it
    action = np.random.randint(0, 3)
    canvas_snapshot, reward = c.act(action=action)
    print("reward", reward)

In [None]:
class ExperienceReplay():
    "Store the agent's experiences in order to collect enough examples to get a reward signal."
    def __init__(self, max_memory=100, discount=.9):
        self.memory = []                # Oldest experience first
        self.max_memory = max_memory    # Maximum number of experiences kept
        self.discount = discount        # Discount factor for future rewards

    def remember(self, states, game_over):
        "Append one experience and evict the oldest entry once the buffer exceeds max_memory."
        experience = [states, game_over]
        self.memory.append(experience)
        buffer_is_overfull = len(self.memory) > self.max_memory
        if buffer_is_overfull:
            self.memory.pop(0)

In [None]:
"""Write the get_batch method for ExperienceReplay class.

Each line has been started for you.

No tests 

20 points:
-----
1 point for num_actions
1 point for env_dim
9 points for q_sa
9 points for targets[i, action_t]
"""

class ExperienceReplay(ExperienceReplay): # New class (with same name) inherits everything from old class (with same name)

    def get_batch(self, model, batch_size=10):
        """Sample stored experiences and build Q-learning training targets.

        Each memory entry is [[state_t, action_t, reward_t, state_tp1], game_over],
        as stored by remember().

        Parameters
        ----------
        model : network exposing `input_shape`, `output_shape` and `predict`
            (a Keras model in this notebook).
        batch_size : maximum number of experiences to sample (with replacement).

        Returns
        -------
        inputs : array of shape (n, env_dim) holding the sampled state_t rows.
        targets : array of shape (n, num_actions) holding the regression targets,
            where n = min(len(memory), batch_size).
        """
        len_memory = len(self.memory)
        num_actions = model.output_shape[-1]  # One output unit per action
        env_dim = model.input_shape[-1]       # Length of the flattened screen
        n_samples = min(len_memory, batch_size)
        inputs = np.zeros((n_samples, env_dim))
        targets = np.zeros((n_samples, num_actions))
        if n_samples == 0:
            # Nothing to sample from (empty memory or batch_size of 0)
            return inputs, targets

        for i, idx in enumerate(np.random.randint(0, len_memory, size=n_samples)):
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
            game_over = self.memory[idx][1]
            inputs[i:i+1] = state_t
            # Start from the model's own estimates so the actions that were not
            # taken contribute zero error; only the taken action is corrected.
            targets[i] = model.predict(state_t)[0]
            if game_over:
                # Terminal transition: no future value to bootstrap from
                targets[i, action_t] = reward_t
            else:
                # Q-learning update: reward plus discounted best predicted value of the next state
                q_sa = np.max(model.predict(state_tp1)[0])
                targets[i, action_t] = reward_t + self.discount * q_sa

        return inputs, targets

In [None]:
# Keras model
from keras.models import Sequential
from keras.layers import Dense

grid_size = 10
num_actions = 3  # [move_left, stay, move_right]

model = Sequential()

# Input and first hidden layer
# Rough rule of thumb for the hidden width: mean of the input and output sizes
model.add(Dense(units=(grid_size*grid_size + num_actions) // 2,
                input_shape=(grid_size*grid_size,),
                activation='relu'))

# Output layer: one Q-value estimate per action.
# Q-values are unbounded (and can be negative), so the output must stay linear
# rather than being squashed into a probability distribution.
model.add(Dense(units=num_actions,
                activation='linear'))

# Q-learning is a regression onto the target Q-values, hence mean squared error
model.compile(optimizer='adam',
              loss="mse")

model.summary()

In [None]:
# Run Training

# Define environment
c = Catch(grid_size=grid_size)

# Initialize experience replay object
exp_replay = ExperienceReplay(max_memory=500)

# Exploration rate
epsilon = .1

# Training variables
n_episodes = 11 # 3_001 is a good number for complete learning
win_count = 0
history = []  # Cumulative win count after each episode
loss = float('inf')

for e in range(n_episodes):

    # Progress report every 10th episode (including the first)
    if e % 10 == 0:
        print(f"Epoch {e:03d}/{n_episodes:,} | Loss {loss:.3f} | Win count {win_count}")

    # The next new episode.
    c.reset_state()
    # Redraw the canvas for the new episode so the first observe() below
    # does not return the previous episode's last frame.
    c.update_canvas()

    while not c.is_over():

        # Get current input (as vector).
        current_screen = c.observe()

        # Get next action - you guessed it, epsilon-greedy.
        if np.random.rand() <= epsilon:
            # Explore: scalar action, same kind of value as the greedy branch produces
            action = np.random.randint(0, num_actions)
        else:
            # Exploit: pick the action with the highest predicted value
            q = model.predict(current_screen)
            action = np.argmax(q[0])

        # Apply action, get rewards and new state.
        future_screen, reward = c.act(action)
        if reward == 1:
            win_count += 1

        # Store experience.
        exp_replay.remember([current_screen, action, reward, future_screen], c.is_over())

        # Get collected data to train model.
        inputs, targets = exp_replay.get_batch(model, batch_size=50)

        # Train model on experiences.
        loss = model.train_on_batch(inputs, targets)

    history.append(win_count)


In [None]:
# Inspect trained Keras model

# Start a fresh game and show its board
c = Catch(grid_size=10)
print(c.canvas)

# Given a board state, what value does the model predict for each move?
state = c.observe()
predicted_values = model.predict(state)
predicted_values[0]

```
# Example
[[0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 1. 1. 0. 0.]]
array([-0.18835554,  0.08628452,  0.3199321 ], dtype=float32)

# Predicts move right!
```