<h1 style="text-align: center;">Simple Reinforcement Learning in Python</h1>

<img src="imgs/game_intro.png" width="400">

In [None]:
%matplotlib inline

import numpy as np
from numpy.random import randint as rand
import matplotlib.pyplot as plt
from matplotlib.offsetbox import OffsetImage, AnnotationBbox
from matplotlib.gridspec import GridSpec
import copy

import tqdm.notebook
from IPython.display import clear_output
from time import sleep

In [None]:
# Create a Maze

class Maze:
    """Random grid maze holding a single reward target.

    ``z`` is an integer array in which 1 marks a wall and 0 a free cell.
    Cells are addressed as ``(row, column)``: ``target_x`` is the target's
    row and ``target_y`` its column.
    """

    def __init__(self, width=81, height=51, complexity=.75, density=.75):
        """Generate a maze and place the target on a random free cell.

        Args:
            width: Requested number of columns (forced to an odd size).
            height: Requested number of rows (forced to an odd size).
            complexity: Scales how many growth steps each wall seed gets.
            density: Scales how many wall seeds are dropped.
        """
        # build_maze returns booleans; store them as integers (1 = wall)
        self.z = np.array(self.build_maze(width, height, complexity, density), dtype='int')
        self.shape = self.z.shape

        # Place Target/Reward
        self.target_image = plt.imread('imgs/bread.png')
        self.target_x, self.target_y = self.random_availale_ij()

    def __repr__(self):
        return f"Maze {self.shape} with target at {(self.target_x, self.target_y)}\n\n{self.z}"

    # From Wikipedia
    def build_maze(self, width, height, complexity, density):
        """Return a boolean wall map (True = wall) of odd height and width.

        Wall seeds are dropped on even coordinates and grown two cells at a
        time towards still-empty even cells, filling the cell in between.

        Args:
            width: Requested number of columns.
            height: Requested number of rows.
            complexity: Fraction scaled by the maze perimeter to give the
                number of growth attempts per seed.
            density: Fraction scaled by the maze area to give the number
                of seeds.
        """
        # Only odd shapes
        shape = ((height // 2) * 2 + 1, (width // 2) * 2 + 1)
        # Adjust complexity and density relative to maze size
        steps_per_seed = int(complexity * (5 * (shape[0] + shape[1])))
        n_seeds = int(density * ((shape[0] // 2) * (shape[1] // 2)))
        # Build actual maze
        Z = np.zeros(shape, dtype=bool)
        # Fill borders
        Z[0, :] = Z[-1, :] = 1
        Z[:, 0] = Z[:, -1] = 1
        # Make aisles
        for _ in range(n_seeds):
            # Pick a random even position. randint excludes its upper bound,
            # so "+ 1" is needed for the last even column/row to be reachable.
            x = np.random.randint(0, shape[1] // 2 + 1) * 2
            y = np.random.randint(0, shape[0] // 2 + 1) * 2
            Z[y, x] = 1
            for _ in range(steps_per_seed):
                neighbours = []
                if x > 1:
                    neighbours.append((y, x - 2))
                if x < shape[1] - 2:
                    neighbours.append((y, x + 2))
                if y > 1:
                    neighbours.append((y - 2, x))
                if y < shape[0] - 2:
                    neighbours.append((y + 2, x))
                if neighbours:
                    # Uniform over ALL neighbours (exclusive upper bound);
                    # "len - 1" here would never select the last entry.
                    y_, x_ = neighbours[np.random.randint(len(neighbours))]
                    if Z[y_, x_] == 0:
                        Z[y_, x_] = 1
                        # also fill the cell between the old and new position
                        Z[y_ + (y - y_) // 2, x_ + (x - x_) // 2] = 1
                        x, y = x_, y_
        return Z

    def random_availale_ij(self):
        """Return ``(row, column)`` of a uniformly chosen free (0) cell.

        Raises:
            ValueError: If the maze contains no free cell.
        """
        free_cells = np.argwhere(self.z == 0)
        if len(free_cells) == 0:
            raise ValueError("maze has no free cell")

        i, j = free_cells[np.random.randint(len(free_cells))]
        return int(i), int(j)

    def render_target(self, ax):
        """Draw the target icon on ``ax`` at the target cell and return ``ax``."""
        target_ico = OffsetImage(self.target_image, zoom=5/ax.figure.dpi, dpi_cor=False)
        # imshow puts columns on the horizontal axis, hence (column, row)
        target_ab = AnnotationBbox(target_ico, (self.target_y, self.target_x), frameon=False)
        ax.add_artist(target_ab)
        return ax

    def render(self, ax=None):
        """Draw the maze and its target; a new 6x6 figure is made if ``ax`` is None.

        Returns:
            The axes that were drawn on.
        """
        if ax is None:
            plt.figure(figsize=(6, 6))
            ax = plt.gca()

        ax.imshow(self.z, origin='upper', vmin=0, vmax=3)
        ax = self.render_target(ax)

        ax.axis('off')

        return ax
        

In [None]:
class Agent():
    """Tabular Q-learning agent living in a maze.

    The position is stored as ``x`` (row) and ``y`` (column). Actions are
    encoded as 0=UP, 1=DOWN, 2=RIGHT, 3=LEFT, and ``q`` holds one value per
    (row, column, action).
    """

    # (d_row, d_col) displacement for each action id: U, D, R, L
    _MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, 1), 3: (0, -1)}

    def __init__(self, maze, q=None, agent_image=None):
        """Spawn the agent on a random free cell that is not the target.

        Args:
            maze: Maze to walk in; must provide ``z``, ``shape``,
                ``target_x``, ``target_y`` and ``random_availale_ij()``.
            q: Existing Q-table to keep learning from (used as-is, not
                copied). A zero table is created when omitted.
            agent_image: Already-loaded icon to reuse. When omitted the
                icon is read from 'imgs/emoji.png'.
        """
        self.maze = maze

        self.x, self.y = self.maze.random_availale_ij()
        while (self.x, self.y) == (self.maze.target_x, self.maze.target_y):
            self.x, self.y = self.maze.random_availale_ij()

        self.x_prev, self.y_prev = (None, None)

        if q is None:
            self.q = np.zeros([maze.shape[0], maze.shape[1], 4])
        else:
            self.q = q

        self.a = None  # last action taken
        self.r = 0     # last reward received

        self.on_target = False

        # agent icon (read from disk only when the caller did not supply one)
        if agent_image is None:
            agent_image = plt.imread('imgs/emoji.png')
        self.agent_image = agent_image

    def __repr__(self):
        return f"Agent now at {(self.x, self.y)} took action {self.a} from {(self.x_prev, self.y_prev)} and got rewarded {self.r} | Policy table {self.q.shape}"

    def available_actions(self):
        """Return the action ids whose destination cell is not a wall."""
        return [
            action
            for action, (di, dj) in self._MOVES.items()
            if self.maze.z[self.x + di, self.y + dj] == 0
        ]

    def pick_action(self, eps):
        """Choose the next action epsilon-greedily and store it in ``self.a``.

        Args:
            eps: Probability of picking a random available action instead
                of the available action with the highest Q-value.

        Raises:
            ValueError: If every neighbouring cell is a wall.
        """
        actions = self.available_actions()
        if not actions:
            raise ValueError(f"no action available from {(self.x, self.y)}")

        strategy = np.random.random()

        if strategy < eps:
            # Random exploration
            self.a = np.random.choice(actions)
        else:
            # Table exploit: best Q among the available actions only
            q_available = self.q[self.x, self.y, actions]
            self.a = actions[np.argmax(q_available)]

    def execute_action(self):
        """Apply ``self.a`` to the position, remembering the previous cell."""
        d_row, d_col = self._MOVES[self.a]

        self.x_prev = self.x
        self.y_prev = self.y

        self.x += d_row
        self.y += d_col

    def move(self, eps):
        """Pick and execute one action; flag ``on_target`` if the target is reached."""
        self.pick_action(eps)
        self.execute_action()

        if (self.x, self.y) == (self.maze.target_x, self.maze.target_y):
            self.on_target = True

    def reward(self):
        """Set ``self.r`` to 10 when standing on the target, otherwise 0."""
        self.r = 10 if self.on_target else 0

    def update_q(self, alpha, gamma):
        """Update the Q-value of the action just taken.

        Args:
            alpha: Learning rate blending the old value with the new estimate.
            gamma: Discount applied to the best Q-value of the new cell.
        """
        # maximum Q visible from the new position
        q_max = np.max(self.q[self.x, self.y, :])

        self.q[self.x_prev, self.y_prev, self.a] = (1 - alpha)*self.q[self.x_prev, self.y_prev, self.a] + alpha*(self.r + gamma*q_max)

    def respawn(self):
        """Return a fresh agent in the same maze sharing this Q-table and icon."""
        # Hand over the loaded icon so it is not re-read from disk each time
        return Agent(self.maze, self.q, self.agent_image)

    def render_q(self, axs=None):
        """
        Visualization of the policy table for each action

        All but the last of ``axs`` show one action each; the last shows the
        best Q-value per cell. The maze is subtracted so walls stand out.
        Five axes are created when ``axs`` is None. Returns ``axs``.
        """
        vmax = np.max(self.q)
        dirs = {0: "UP", 1: "DOWN", 2: "RIGHT", 3: "LEFT"}  # U, D, R, L

        if axs is None:
            _, axs = plt.subplots(1, 5, figsize=(12, 3))

        # Individual actions Q-values
        for i, action_ax in enumerate(axs[:-1]):
            action_ax.imshow(self.q[:, :, i] - self.maze.z, origin='upper', vmin=-1, vmax=vmax)  # also see maze walls
            action_ax.set_title(f"{dirs[i]}")
            action_ax.axis('off')

        # Best Q-Value for any given state
        axs[-1].imshow(np.max(self.q, axis=-1) - self.maze.z, origin='upper', vmin=-1, vmax=vmax)
        axs[-1].axis('off')

        return axs

    def render(self, ax=None):
        """Draw the agent icon at its cell; a new 6x6 figure is made if ``ax`` is None.

        Returns:
            The axes that were drawn on.
        """
        if ax is None:
            plt.figure(figsize=(6, 6))
            ax = plt.gca()

        agent_ico = OffsetImage(self.agent_image, zoom=4/ax.figure.dpi, dpi_cor=False)
        # imshow puts columns on the horizontal axis, hence (column, row)
        agent_ab = AnnotationBbox(agent_ico, (self.y, self.x), frameon=False)
        ax.add_artist(agent_ab)

        return ax
    

In [None]:
def plot_dashboard(agent, maze, epoch=0):
    """Draw the training dashboard on a new 15x9 figure and return its 8 axes.

    Panels: the maze with the agent, one Q-table image per action plus the
    best-Q image, the four Q-values of the agent's current cell, and the
    agent icon. The figure title shows ``epoch``.
    """
    fig = plt.figure(figsize=(15, 9))
    fig.suptitle(f"Epoch {epoch}")

    # Blueprint: (rows, columns) of a 3x5 grid taken by each panel, ax1..ax8
    grid = GridSpec(3, 5)
    layout = [
        (slice(0, 2), slice(0, 2)),    # ax1: maze
        (slice(0, 2), slice(2, 4)),    # ax2: best Q per cell
        (-1, slice(None, 2)),          # ax3: Q-values at the agent's cell
        (-1, 2),                       # ax4: LEFT
        (-1, 3),                       # ax5: RIGHT
        (slice(0, 1), 4),              # ax6: UP
        (slice(1, 2), 4),              # ax7: DOWN
        (-1, 4),                       # ax8: avatar
    ]
    ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8 = [
        fig.add_subplot(grid[rows, cols]) for rows, cols in layout
    ]

    # Maze with the agent on top
    ax1 = maze.render(ax1)
    ax1 = agent.render(ax1)

    # Brains, passed in the agent's action order: U, D, R, L, then the total
    agent.render_q([ax6, ax7, ax5, ax4, ax2])

    # Q at the current state, colour-scaled by the largest Q in the table.
    # Rolling by one turns the stored order U,D,R,L into L,U,D,R.
    q_scale = np.max(agent.q)
    q_here = np.roll(agent.q[agent.x, agent.y, :], 1)[np.newaxis, :]
    ax3.imshow(q_here, vmin=-1, vmax=q_scale)
    # -- ticks: one label per action, in the rolled order
    ax3.set_yticks([])
    ax3.set_xticks(range(4))
    ax3.set_xticklabels(["L", "U", "D", "R"], fontsize=16)
    # -- hide the axes frame
    for side in ('top', 'right', 'bottom', 'left'):
        ax3.spines[side].set_visible(False)

    # Avatar icon
    ax8.imshow(agent.agent_image)
    ax8.axis("off")

    return ax1, ax2, ax3, ax4, ax5, ax6, ax7, ax8
  
def update_eps(i, min_eps, max_eps, eps_tau):
    """Return the exploration rate at episode ``i`` under exponential decay.

    The rate starts at ``max_eps`` for ``i = 0`` and decays towards
    ``min_eps`` with time constant ``eps_tau`` (in episodes).
    """
    decay = np.exp(-i/eps_tau)
    return min_eps + (max_eps - min_eps) * decay

In [None]:
# ---------------- HYPERPARAMETERS ----------------
# Requested maze size (Maze rounds each side to an odd number of cells)
maze_w, maze_h = 32, 32

# Learning
alpha = 0.1       # learning rate; keep low in stochastic environments
gamma = 0.9       # discount applied to the best Q-value of the next state
epochs = 6000     # number of training epochs
max_steps = 3000  # step budget of the agent within one epoch
#save_epochs = np.logspace(0, np.log10(epochs), 10, dtype=int)  # epoch at which to save agent brain

# Exploration vs. exploitation
max_eps, min_eps = 1.0, 0.1
eps_tau = epochs/4  # decay constant: the excess over min_eps falls to 1/e after eps_tau epochs

# ---------------- INIT ----------------
#qBank = []

# One random maze, kept fixed for the whole training
world = Maze(width=maze_w, height=maze_h)
ax = world.render()

# Agent starting with an all-zero Q-table
agent = Agent(world)

# ---------------- TRAIN ----------------
last_epoch = epochs - 1

for epoch in tqdm.notebook.tqdm(range(epochs)):

    # Exploration rate for this epoch
    eps = update_eps(epoch, min_eps, max_eps, eps_tau)

    # Only the final epoch is animated step by step
    watching = epoch == last_epoch

    for s in range(max_steps):

        agent.move(eps)
        agent.reward()
        agent.update_q(alpha, gamma)

        if watching:
            axs = plot_dashboard(agent, world, epoch)
            plt.show()

            sleep(0.1)
            # keep the last frame on screen once the target is reached
            if not agent.on_target:
                clear_output(wait=True)

        # Epoch ends as soon as the target is reached
        if agent.on_target:
            break

    # Save agent in database
    #if epoch in save_epochs:
        #qBank.append(copy.deepcopy(agent.q))

    # Respawn with the Q-table learnt so far (the final agent is kept as is)
    if not watching:
        agent = agent.respawn()

In [None]:
# -- Show off: replay the learnt policy from 10 random starting cells

for i in range(10):

    # Fresh agent sharing the trained Q-table. Nothing below writes to the
    # table, so no deep copy of the agent (maze and images included) is needed.
    clone = Agent(world, agent.q)

    # A purely greedy agent may oscillate forever between states with tied
    # Q-values, so each run is capped at the training step budget.
    for step in range(max_steps):

        clone.move(0)  # move in complete table-exploit
        axs = plot_dashboard(clone, world)
        plt.show()

        sleep(0.1)
        clear_output(wait=True)

        if clone.on_target:
            break