In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib import patches
from matplotlib import colors

In [41]:
# Action table: integer action id -> display name, arrow label, (row, col) step
# applied to a grid location, and an associated angle in radians.
actions = {
    action_id: {"name": name, "label": label, "delta": step, 'angle': float(angle)}
    for action_id, (name, label, step, angle) in enumerate([
        ("Up", "↑", (-1, 0), np.pi),
        ("Right", "→", (0, 1), 3 * np.pi / 2),
        ("Down", "↓", (1, 0), 0.0),
        ("Left", "←", (0, -1), np.pi / 2),
    ])
}

# Colormaps keyed by channel index (as a string); each fades from white to one
# saturated hue and is applied to the matching env channel in GridPlotter.plot_env.
color_dict = {
    channel_key: colors.LinearSegmentedColormap.from_list("", ["white", hue])
    for channel_key, hue in (('0', "xkcd:ultramarine"), ('1', "xkcd:magenta"))
}

In [21]:
class GridPlotter:
    """Matplotlib helpers for drawing an agent's environment, prey and paths.

    The plotter keeps a reference to the agent and reads its state lazily
    (``env``, ``prey_locations``, ``pk_hw``, ``n_steps`` and, when present,
    ``width`` / ``height``) each time a plot method is called.
    """

    def __init__(self, agent):
        """Store the agent whose state will be visualised."""
        self.agent = agent

    def plot_env(self, ax=None):
        """Draw the environment channels as a blended colour image.

        Parameters
        ----------
        ax : matplotlib Axes, optional
            Axes to draw on. A new figure sized from the environment
            dimensions is created when omitted.

        Returns
        -------
        The Axes that was drawn on.
        """
        env = self.agent.env
        # The agent may not define width/height; fall back to the env array's
        # first two dimensions so plotting does not fail with AttributeError.
        height = getattr(self.agent, "height", env.shape[0])
        width = getattr(self.agent, "width", env.shape[1])
        margin = self.agent.pk_hw

        if ax is None:
            _, ax = plt.subplots(figsize=(0.1 * width, 0.1 * height))

        # Average the RGBA renderings of channel 0 and channel 1.
        blended = (color_dict['0'](env[:, :, 0]) + color_dict['1'](env[:, :, 1])) / 2
        ax.imshow(blended, interpolation='gaussian', zorder=0)
        # NOTE(review): imshow accepts (H, W), (H, W, 3) or (H, W, 4) arrays only,
        # and ignores cmap for the last two; confirm env's channel count here.
        ax.imshow(1 - env, cmap=cm.binary, alpha=0.25, zorder=1)

        ax.set_aspect('equal')
        ax.set_xticks([])
        ax.set_yticks([])
        for spine in ax.spines.values():
            spine.set_visible(False)

        # Crop a border of roughly pk_hw cells; the y-limits are given in
        # decreasing order so row 0 stays at the top.
        ax.set_xlim([margin - 1, width - margin])
        ax.set_ylim([height - margin, margin - 1])
        return ax

    def plot_prey(self, ax):
        """Add one black circle per entry of ``agent.prey_locations``.

        Locations are (row, col); matplotlib wants (x, y) = (col, row).
        ``Circle`` takes its *centre*, so no half-cell offset is applied
        (unlike ``Rectangle``, which takes a corner).
        """
        for prey_location in self.agent.prey_locations:
            centre = (prey_location[1], prey_location[0])
            ax.add_patch(patches.Circle(centre, 1, color='black', zorder=2))
        return ax

    def plot_rewards(self, ax):
        """Alias of :meth:`plot_prey`, kept for callers that use this name."""
        return self.plot_prey(ax)

    def plot_agent(self, ax, agent_location, color):
        """Draw a unit square covering the cell at ``agent_location`` (row, col)."""
        # Rectangle is anchored at a corner, hence the half-cell shift from the
        # cell centre used by imshow.
        corner = (agent_location[1] - 0.5, agent_location[0] - 0.5)
        ax.add_patch(patches.Rectangle(corner, 1, 1, color=color, zorder=3))
        return ax

    def plot_episode(self, trial_data, ax=None):
        """Draw every visited location of one trial, coloured by step index.

        Parameters
        ----------
        trial_data : mapping
            Must contain ``"path"``: a sequence of (row, col) locations.
        ax : matplotlib Axes, optional
            Axes to draw on. When omitted, the environment and prey are drawn
            first on a new Axes.

        Returns
        -------
        The Axes that was drawn on.
        """
        agent_path = trial_data["path"]

        if ax is None:
            ax = self.plot_env()
            ax = self.plot_prey(ax)

        ax.set_title(f'{len(agent_path)} Steps')

        for step_index, location in enumerate(agent_path):
            # True division: a float in [0, 1) walks across the colormap,
            # whereas floor division collapsed every step to the same colour.
            trial_frac = step_index / self.agent.n_steps
            ax = self.plot_agent(ax, agent_location=location, color=cm.viridis(trial_frac))
        return ax

    def plot_training_progress(self, trial_lengths, ax=None, window=100):
        """Scatter episode lengths and overlay a trailing-mean curve.

        Parameters
        ----------
        trial_lengths : sequence of numbers
            Length of each episode, in order.
        ax : matplotlib Axes, optional
            Axes to draw on; a 6x4 inch figure is created when omitted.
        window : int, optional
            Number of preceding episodes included in the trailing mean (the
            current episode is always included as well). Defaults to 100.

        Returns
        -------
        The Axes that was drawn on.
        """
        if ax is None:
            _, ax = plt.subplots(figsize=(6, 4))

        n_trials = len(trial_lengths)
        episodes = np.arange(n_trials)
        smoothed_episode_lengths = [
            np.mean(trial_lengths[max(0, i - window):i + 1]) for i in range(n_trials)
        ]

        ax.scatter(episodes, trial_lengths, linewidth=0, alpha=0.5, c='C0', label="Episode length")
        ax.plot(episodes, smoothed_episode_lengths, color='k', linestyle="--", linewidth=0.5, label="Smoothed")
        ax.set_xlabel("Episode")
        ax.set_ylabel("Length")
        ax.legend()
        ax.set_title("Training Progress")
        return ax

In [None]:
class QLearnerAgent:
    """Temporal-difference agent with a linear action-value function.

    Q-values are computed as ``sense_features(location) @ theta``, where
    ``theta`` has one column per action. The environment array and the prey
    locations are supplied on every ``policy`` / ``training_policy`` call and
    cached on the instance as ``env`` and ``prey_locations``.

    ``act`` additionally reads ``self.grid`` (cells equal to 1 can be entered).
    That attribute is not assigned anywhere in this class, so it must be set
    on the instance by the caller before stepping.
    """

    # Divisor applied to the Manhattan distance feature.
    # NOTE(review): 22 is carried over unchanged from the original code;
    # presumably the grid side length - confirm.
    DISTANCE_SCALE = 22

    def __init__(self, pk, pk_hw, channels, actions, location, sensor_noise_scale, n_steps, n_features, cost_per_step, cost_per_collision, alpha, epsilon, gamma):
        """Store hyper-parameters and create a zero-initialised weight matrix.

        Parameters
        ----------
        pk, pk_hw, channels, sensor_noise_scale :
            Stored as given (``channels`` as an array). ``pk_hw`` is read by
            the plotter; the others are not used by the methods of this class.
        actions : mapping
            Action id -> dict containing at least a ``'delta'`` (row, col) step.
        location : sequence of two ints
            Starting (row, col) of the agent.
        n_steps : int
            Stored; used by the plotter to scale path colours.
        n_features : int
            Length of the feature vector; must be at least 6 because
            ``sense_features`` writes indices 0-5.
        cost_per_step, cost_per_collision : float
            Rewards for a normal move and for a blocked move.
        alpha, epsilon, gamma : float
            Learning rate, exploration probability and discount factor.
        """
        self.pk = pk
        self.pk_hw = pk_hw
        self.channels = np.array(channels)
        self.actions = actions
        self.location = np.array(location)
        self.sensor_noise_scale = sensor_noise_scale
        self.n_steps = n_steps

        self.n_features = n_features
        self.n_actions = len(actions)
        self.cost_per_step = cost_per_step
        self.cost_per_collision = cost_per_collision
        self.alpha = alpha
        self.epsilon = epsilon
        self.gamma = gamma

        self.theta = np.zeros((n_features, self.n_actions))
        self.plotter = GridPlotter(self)

    def reset(self):
        """Reset the agent's heading to 0."""
        self.agent_direction = 0

    def sense_features(self, location):
        """Build the feature vector for ``location`` from the cached env.

        Features 0-3 are the channel-0 values of the cells at row-1, row+1,
        col-1 and col+1; feature 4 is the scaled distance to the nearest prey
        and feature 5 the angle to it. Remaining entries stay zero.
        """
        row, col = location[0], location[1]
        distance, angle = self.closest_prey_features(location)

        features = np.zeros(self.n_features)
        features[0] = self.env[row - 1, col, 0]
        features[1] = self.env[row + 1, col, 0]
        features[2] = self.env[row, col - 1, 0]
        features[3] = self.env[row, col + 1, 0]
        features[4] = distance
        features[5] = angle
        return features

    def closest_prey_features(self, location):
        """Return ``(distance, angle)`` from the nearest prey to ``location``.

        The nearest prey is chosen by Euclidean distance. ``distance`` is the
        Manhattan distance divided by ``DISTANCE_SCALE``; ``angle`` is
        ``arctan2(d_col, d_row)`` of ``location - prey``.

        Raises
        ------
        ValueError
            If ``prey_locations`` is empty (raised by ``min``).
        """
        here = np.asarray(location)
        nearest_prey = min(
            self.prey_locations,
            key=lambda prey: np.linalg.norm(here - np.asarray(prey)),
        )
        # Use the full (row, col) of the prey; indexing it with [0] would
        # subtract only the prey's row from both coordinates.
        delta = here - np.asarray(nearest_prey)
        # Absolute values so opposite-signed offsets cannot cancel out.
        distance = float(np.abs(delta).sum() / self.DISTANCE_SCALE)
        angle = float(np.arctan2(delta[1], delta[0]))
        return distance, angle

    def act(self, action):
        """Apply ``action`` from the current location without committing it.

        Returns
        -------
        (next_location, reward)
            If the target cell of ``self.grid`` equals 1 the move succeeds:
            the reward is ``cost_per_step``, plus 100.0 when the cell is a
            prey location. Otherwise the agent stays put and the reward is
            ``cost_per_collision``.

        Raises
        ------
        AttributeError
            If no ``grid`` has been assigned to the agent.
        """
        grid = getattr(self, "grid", None)
        if grid is None:
            raise AttributeError(
                "QLearnerAgent.grid is not set; assign an array whose open cells equal 1 before calling act()"
            )

        step = np.array(self.actions[action]['delta'])
        next_location = tuple(np.array(self.location) + step)

        if grid[next_location] == 1:
            reward = self.cost_per_step
            if next_location in self.prey_locations:
                reward = 100.0 + self.cost_per_step
        else:
            next_location = self.location
            reward = float(self.cost_per_collision)

        return next_location, reward

    def policy(self, env, prey_locations):
        """Run one learning step; same as ``training_policy`` but returns None."""
        self.training_policy(env, prey_locations)

    def training_policy(self, env, prey_locations):
        """Run one epsilon-greedy step, update ``theta`` and move the agent.

        Parameters
        ----------
        env : array
            Environment array cached as ``self.env`` for feature sensing.
        prey_locations : sequence of (row, col)
            Cached as ``self.prey_locations``.

        Returns
        -------
        (next_location, reward) as produced by ``act``.
        """
        self.env = env
        self.prey_locations = prey_locations

        action = int(self.epsilon_greedy_policy(self.location, self.epsilon))
        next_location, reward = self.act(action)
        # The follow-up action is only used as the bootstrap target in learn();
        # the next call samples a fresh action.
        next_action = int(self.epsilon_greedy_policy(next_location, self.epsilon))

        self.learn(self.env, self.location, next_location, action, next_action, reward, self.alpha)
        self.location = next_location
        return next_location, reward

    def learn(self, env, location, next_location, action, next_action, reward, alpha):
        """Apply one TD update to the weight column of ``action``.

        ``env`` is accepted to keep the existing signature but is not read:
        features always come from ``self.env`` via ``sense_features``.

        Returns
        -------
        float
            The TD error ``reward + gamma * Q(next, next_action) - Q(loc, action)``.
        """
        features = self.sense_features(location)
        Q = float(np.dot(features, self.theta)[action])
        Q_next = float(self.Q_value(next_location)[next_action])
        TD_error = float(reward) + float(self.gamma) * Q_next - Q
        self.theta[:, action] += float(alpha) * TD_error * features
        return TD_error

    def Q_value(self, location):
        """Return the vector of Q-values (one per action) at ``location``."""
        return np.dot(self.sense_features(location), self.theta)

    def epsilon_greedy_policy(self, location, epsilon):
        """Pick a uniformly random action with probability ``epsilon``, else the greedy one."""
        if np.random.rand() < epsilon:
            return int(np.random.randint(self.n_actions))
        return int(np.argmax(self.Q_value(location)))

    def update_parameter(self, parameter, decay_rate, parameter_min):
        """Decay ``parameter`` multiplicatively, never going below ``parameter_min``."""
        return max(parameter * decay_rate, parameter_min)

    def produce_training_plots(self, training, first_5_last_5, training_trials, trial_lengths):
        """Draw the training curve and/or the first and last five episodes.

        Parameters
        ----------
        training : bool
            When true, plot episode lengths via the plotter.
        first_5_last_5 : bool
            When true, draw a 2x5 grid: top row the first episodes, bottom
            row the last ones (latest in the right-most panel).
        training_trials : sequence
            Per-trial data accepted by ``GridPlotter.plot_episode``.
        trial_lengths : sequence of numbers
            Episode lengths for the training curve.
        """
        if training:
            self.plotter.plot_training_progress(trial_lengths=trial_lengths)
        if first_5_last_5:
            fig, axs = plt.subplots(2, 5, figsize=(10, 4))
            n_trials = len(training_trials)
            # With fewer than five trials, only fill the panels that have data
            # and blank the rest instead of raising IndexError.
            n_shown = min(5, n_trials)
            for i in range(n_shown, 5):
                axs[0, i].axis('off')
                axs[1, 4 - i].axis('off')
            for i in range(n_shown):
                first_ax = axs[0, i]
                last_ax = axs[1, 4 - i]
                for panel in (first_ax, last_ax):
                    self.plotter.plot_env(ax=panel)
                    self.plotter.plot_prey(panel)

                self.plotter.plot_episode(training_trials[i], ax=first_ax)
                self.plotter.plot_episode(training_trials[n_trials - i - 1], ax=last_ax)