<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Deep-Q-learning-to-Play-Catch" data-toc-modified-id="Deep-Q-learning-to-Play-Catch-1">Deep Q-learning to Play Catch</a></span></li><li><span><a href="#Learning-Outcomes" data-toc-modified-id="Learning-Outcomes-2">Learning Outcomes</a></span></li><li><span><a href="#Code" data-toc-modified-id="Code-3">Code</a></span></li><li><span><a href="#Demo-A-Single-Episode-" data-toc-modified-id="Demo-A-Single-Episode--4">Demo A Single Episode </a></span></li><li><span><a href="#Inspect-predictions-from-the-trained-model-" data-toc-modified-id="Inspect-predictions-from-the-trained-model--5">Inspect predictions from the trained model </a></span></li><li><span><a href="#Hints" data-toc-modified-id="Hints-6">Hints</a></span></li></ul></div>

<center><h2>Deep Q-learning to Play Catch</h2></center>

<center><h2>Learning Outcomes</h2></center>

__By the end of this session, you should be able to__:

- Understand how Deep Q-learning can learn to play Catch, a simplified version of Pong.
- Implement the core logic of Experience Replay, including Q-learning.

Code
-----

In [60]:
reset -fs

In [61]:
import numpy as np

In [62]:
# Define variables
actions = ["move left", "stay", "move right"]
n_actions = len(actions)
grid_size = 3     # Start with minimum viable demo (MVD) 
basket_size = 1   # Start with minimum viable demo (MVD) 

In [63]:
class Catch():
    """Catch is a simplfied version of Pong.
    Catch tries to capture a single pixel “fruit” using a three pixel “basket”. 
    The fruit moves down one pixel per step.
    Reward of +1 if it catches the fruit and -1 if it misses.
    Input: The network sees the entire "pixels" grid.
    Outputs: 3 actions (move left, stay, move right).
    """
    def __init__(self, grid_size, basket_size, actions):
        self.basket_size = basket_size 
        self.grid_size = grid_size
        self.actions = actions
        self.n_actions=len(actions)
        self.empty_canvas()
        self.reset_state() # Pick random starting location
        self.update_canvas()
            
    def empty_canvas(self):
        "Reset to canvas empty, aka all zeros"
        self.canvas = np.zeros((self.grid_size,)*2)

    def act(self, action=1): # Default action is to stay
        self.update_state(action)
        reward = self.get_reward()
        game_over_state = self.is_over()
        return self.observe(), reward

    def is_over(self):
        "Fruit is at bottom."
        if self.state[0] >= self.grid_size: # Check fruit row index is at bottom
            return True
        else:
            return False
        
    def get_reward(self):
        "Let's see if fruit is in basket or missed."
        fruit_row, fruit_col, basket = self.state  #[0] # This line is tricky
        if self.is_over():
            if abs(fruit_col - basket) <= 1:
                return 1 # Fruit in basket 🙂
            else:
                return -1 # Fruit missed basket 😦
        else:
            return 0 # Carry on 😐

    def observe(self):
        "Convert internal matrix representation into a vector for the input to the MLP DL model."
        return self.canvas.reshape((1, -1))

    def reset_state(self):
        "Pick a new starting place for fruit and basket."
        n = np.random.randint(low=0, high=self.grid_size)
        m = np.random.randint(low=0, high=self.grid_size-basket_size-1)
        self.state = np.asarray([0,  # Row index of fruit 
                                 n,  # Col index of fruit
                                 m]) # Col index of left side of basket (row is always bottom)
        
    def update_state(self, action_encoded):
        "Given an action, move basket and advance fruit."
        # Convert encoded action into change in basket index
        if action_encoded == 0:   # Left
            action_idx = -1
        elif action_encoded == 1: # Stay
            action_idx = 0
        else:
            action_idx = 1        # Right

        fruit_row_idx, fruit_col_idx, basket_idx = self.state
        new_basket_idx = min(max(1, basket_idx+action_idx), self.grid_size-self.basket_size) # Basket moves
        fruit_row_idx += 1  # Fruit falls down 1 step
        self.state = np.asarray([fruit_row_idx, fruit_col_idx, new_basket_idx])
        if not self.is_over():
            self.update_canvas()
        else:
            self.get_reward()
            
    def update_canvas(self):
        "Read state of fruit and basket, put on canvas."
        self.empty_canvas()
        # Draw fruit
        self.canvas[self.state[0], self.state[1]] = 1  
        # Draw basket
        self.canvas[-1, self.state[2]:self.state[2]+self.basket_size] = np.ones(self.basket_size) #.reshape((1, -1))  
        

Demo A Single Episode 
----

Watch a single game episode to understand the game mechanics.

Comment out during training.

In [64]:
# c = Catch(grid_size=grid_size, basket_size=basket_size, actions=actions)

# while not c.is_over():
#     reply = input("Press return to make a random move. Press 'q' then return to quit: ") 
#     if reply == "q": 
#         break
#     print(c.canvas)                                # Show "screen"
#     action = np.random.randint(0, 3)               # Randomly select
#     canvas_snapshot, reward = c.act(action=action) # Make move and see what happens
#     print(f"current reward: {reward}")

In [65]:
class ExperienceReplay():
    "Store the agent's experiences inorder to collect enough example to get a reward signal."
    
    def __init__(self, max_memory=100, discount=.9):
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount

    def remember(self, states, game_over):
        self.memory.append([states, game_over])
        
        # If memory is too large, then evict to reduce memory size
        if len(self.memory) > self.max_memory:
            # Evict oldest
            del self.memory[0]

In [66]:
"""Write the get_batch method for ExperienceReplay class.

Each line has been started for you.

No tests  😦

"""

class ExperienceReplay(ExperienceReplay): # New class (with same name) inherits everything from old class (with same name)
    
    def get_batch(self, model, batch_size=10):
        
        # TODO: Finish each line with code and comments
        len_memory = len(self.memory)                              # Given to you
        n_actions = None                                           # TODO: Read from neural network model
        env_dim =  None                                            # TODO: Read from neural network model
        inputs = np.zeros((min(len_memory, batch_size), env_dim))  # Given to you
        targets = np.zeros((inputs.shape[0], n_actions))           # Given to you
        for i, idx in enumerate(np.random.randint(0, len_memory, size=inputs.shape[0])): # Given to you
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0] # Given to you
            game_over = self.memory[idx][1]                        # Given to you
            inputs[i:i+1] = state_t                                # Given to you
            targets[i] = model.predict(state_t)[0]                 # Given to you; There should be no target values for actions not taken.
            q_sa = None                                            # TODO: Find best model prediction for state
            if game_over:                                          # Given to you
                targets[i, action_t] = reward_t                    # Given to you
            else:                                                  # Given to you
                targets[i, action_t] = None                        # TODO: Update with Q-learning
                

        
        return inputs, targets

In [67]:
# Define a Deep Learning model in Keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

model = Sequential()

# Input and first hidden layer
model.add(Dense(units=(grid_size*grid_size+n_actions)//2,  # Rough rule of thumb for hidden layer size: mean of input and output 
                input_shape=(grid_size*grid_size,), # Define by "pixel" space
                activation='relu') # ReLU a common modern choice
         ) 

# Output layer
model.add(Dense(units=n_actions,      # Defined by action space
                activation='softmax') # Standard for categorial output
         ) 


model.compile(optimizer='adam',       # Hot rodded version of SGD
              loss="categorical_crossentropy") # Standard for categorial output

model.summary()

Model: "sequential_6"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_12 (Dense)             (None, 6)                 60        
_________________________________________________________________
dense_13 (Dense)             (None, 3)                 21        
Total params: 81
Trainable params: 81
Non-trainable params: 0
_________________________________________________________________


In [68]:
# Run Training

# Define environment
c = Catch(grid_size=grid_size, basket_size=basket_size, actions=actions)

# Initialize experience replay object
exp_replay = ExperienceReplay(max_memory=500)

# Exploration rate
epsilon = .1  

# Training variables
n_episodes = 101  
#       1 is a good choice for number of episodes to seef if there is silly error
#      11 is a good choice for number of episodes to see if model is learning
#   3_001 is a good choice for number of episodes for complete learning
win_count = 0
history = []
loss = float('inf')
    
for e in range(n_episodes): 

    if (e == 0) or (e % 10 == 0):
        print(f"Epoch: {e:03d}/{n_episodes:,} | Loss value: {loss:>6.3f} | Win count: {win_count:>3}")
        
    # The next new episode.
    c.reset_state()
 
    while not c.is_over():
        
        # Get initial input (as vector).
        current_screen = c.observe() 
        
        # Get next action - You guessed it eplison-greedy.
        if np.random.rand() <= epsilon:
            action = np.random.randint(0, n_actions, size=1)
        else:
            q = model.predict(current_screen)
            action = np.argmax(q[0])

        # Apply action, get rewards and new state.
        future_screen, reward = c.act(action)
        if reward == 1:
            win_count += 1

        # Store experience.
        exp_replay.remember([current_screen, action, reward, future_screen], c.is_over())

        # Get collected data to train model.
        inputs, targets = exp_replay.get_batch(model, batch_size=50)

        # Train model on experiences.
        loss = model.train_on_batch(inputs, targets)
        
    history.append(win_count)


Epoch: 000/101 | Loss value:    inf | Win count:   0
Epoch: 010/101 | Loss value:  1.286 | Win count:  10
Epoch: 020/101 | Loss value:  1.271 | Win count:  20
Epoch: 030/101 | Loss value:  1.294 | Win count:  30


KeyboardInterrupt: 

Inspect predictions from the trained model 
-----

In [None]:
from typing import List

def pprint_predicted_action(actions: List[str], action_index: List[float]) -> None:
    "Pretty print predicted action give the softmax output vector of a model"
    print(f"\nModel's predicted action: {actions[np.argmax(action_index)].title()}")

def pprint_canvas(canvas):
    "Pretty print canvas"
    print("Current board state:")
    print(canvas.astype(int))

In [None]:
# Make new game
c = Catch(grid_size=grid_size, basket_size=basket_size, actions=actions)
pprint_canvas(c.canvas)

# Given a board sate, what move does the model predict?
state = c.observe()
pprint_predicted_action(actions, action_index=model.predict(state)[0])

In [None]:
c.basket_size

__Example of trained model on a larger grid__


```

[[0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1 1 1]]

Model's predicted action: Move Left
```

Hints
-----

- The learning can be slow and unstable! Because:
    - A simple learning mechanism 
    - A small model
    - A naive implementation of experience replay
- The goal of the lab is for you to gain experience implementing experience replay, not create an optimal system.

<br>
<br> 
<br>

----