In [1]:
# Base Data Science snippet
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import time
from tqdm import tqdm_notebook
from scipy.spatial.distance import cdist
import imageio
from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection

plt.style.use("seaborn-dark")

import sys
sys.path.append("../")
from rl.agents.q_agent import QAgent

In [2]:
class Environment(object):
    
    def __init__(self, n_stops=8,method="angle_difference", **kwargs):
        
        print(f"Initialized Delivery Environment with {n_stops} random stops")
        print(f"Target metric for optimization is {method}")
              
        #Initializaiton 
        self.n_stops = n_stops
        self.action_space = self.n_stops
        self.observation_space = self.n_stops
        self.stops = []
        self.method = method
        
        
        #Generate Stops 
        self._generate_stops()
        self._generate_q_values()
        self.render
        self.reset()
        
        
    def _generate_stops(self):
        
        xy = np.loadtxt('test.csv', delimiter=",")
        self.x = xy[:,0]
        self.y = xy[:,1]
    
        
    def render(self,return_img = False):
        
        fig = plt.figure(figsize=(7,7))
        ax = fig.add_subplot(111)
        ax.set_title("Stops")

        # Show stops
        ax.scatter(self.x,self.y,c = "red",s = 50)

        # Show START
        if len(self.stops)>0:
            xy = self._get_xy(initial = True)
            xytext = xy[0]+0.1,xy[1]-0.05
            ax.annotate("START",xy=xy,xytext=xytext,weight = "bold")

        # Show itinerary
        if len(self.stops) > 1:
            ax.plot(self.x[self.stops],self.y[self.stops],c = "blue",linewidth=1,linestyle="--")
            
            # Annotate END
            xy = self._get_xy(initial = False)
            xytext = xy[0]+0.1,xy[1]-0.05
            ax.annotate("END",xy=xy,xytext=xytext,weight = "bold")

        plt.xticks([])
        plt.yticks([])
        
        if return_img:
            # From https://ndres.me/post/matplotlib-animated-gifs-easily/
            fig.canvas.draw_idle()
            image = np.frombuffer(fig.canvas.tostring_rgb(), dtype='uint8')
            image  = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
            plt.close()
            return image
        else:
            plt.show()
    
    def reset(self):

        # Stops placeholder
        self.stops = [7]

        # Random first stop
        return self.stops[-1]

    def step(self,destination):

        # Get current state
        state = self._get_state()
        new_state = destination
        
        # Get reward for such a move
        reward = self._get_reward(state,new_state)

        # Append new_state to stops
        self.stops.append(destination)
        done = len(self.stops) == self.n_stops

        return new_state,reward,done

    def _get_state(self):
        return self.stops[-1]


    def _get_xy(self,initial = False):
        state = self.stops[0] if initial else self._get_state()
        x = self.x[state]
        y = self.y[state]
        return x,y

    def _get_reward(self,state,new_state):
        current_point = (self.x[state], self.y[state])
        next_point = (self.env.x[next_state], self.env.y[next_state])

        # Find the previous point
        prev_point_index = state - 1 if state > 0 else self.env.n_stops - 1
        prev_point = (self.env.x[prev_point_index], self.env.y[prev_point_index])

        # Calculate the angle of incidence
        angle_of_incidence = angle_between_points(prev_point, current_point)

        # Calculate the angle of reflection
        angle_of_reflection = angle_between_points(current_point, next_point)

        # Calculate the difference between angle of incidence and angle of reflection
        angle_difference = abs(angle_of_reflection - angle_of_incidence)

        # Return the negative difference as the reward (minimizing the difference)
        return -angle_difference

def run_episode(env,agent,verbose = 1):

    s = env.reset()
    agent.reset_memory()
    max_step = env.n_stops 

    episode_reward = 0
    
    # Remember the states
    agent.remember_state(s)
    # Take the action, and get the reward from environment
    s_next,r,done = env.step(0)

    # Tweak the reward
    r = r
    if verbose: print(s_next,r,done)

    # Update our knowledge ein the Q-table
    agent.train(s,0,r,s_next)

    # Update the caches
    episode_reward += r
    s = s_next

    i = 0
    while i < max_step-1:
        
        # Remember the states
        agent.remember_state(s)
        # Choose an action
        a = agent.act(s)
        # Take the action, and get the reward from environment
        s_next,r,done = env.step(a)

        # Tweak the reward
        r = r

        if verbose: print(s_next,r,done)

        # Update our knowledge ein the Q-table
        agent.train(s,a,r,s_next)

        # Update the caches
        episode_reward += r
        s = s_next

        # If the episode is terminated
        i += 1
        if done:
            break
            
    return env,agent,episode_reward


class DeliveryQAgent(QAgent):

    def __init__(self, env, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.env = env
        self.reset_memory()

    def act(self, s):
        # Get Q Vector
        q = np.copy(self.Q[s, :])

        # Avoid already visited states
        q[self.states_memory] = -np.inf

        # Filter out invalid actions (path from non-zero y to non-zero y and from x-axis to x-axis)
        valid_actions = [a for a in range(self.actions_size) if (self.env.y[a] == 0 and self.env.y[s] != 0) or (self.env.y[a] != 0 and self.env.y[s] == 0)]

        if np.random.rand() > self.epsilon:
            # Choose the action with the highest Q-value among valid actions
            valid_q = q[valid_actions]
            a = valid_actions[np.argmax(valid_q)]
        else:
            # Choose a random valid action
            a = np.random.choice(valid_actions)

        return a


    def remember_state(self,s):
        self.states_memory.append(s)

    def reset_memory(self):
        self.states_memory = []

    
def run_n_episodes(env,agent,name="training.gif", name2 ="optimal8.gif",n_episodes=1000,render_each=10,fps=10):

    # Store the rewards
    rewards = []
    imgs = []

    # Experience replay
    for i in tqdm_notebook(range(n_episodes)):

        # Run the episode
        env,agent,episode_reward = run_episode(env,agent,verbose = 0)
        rewards.append(episode_reward)
        
        if i % render_each == 0:
            img = env.render(return_img = True)
            imgs.append(img)

    
    # Show rewards
    plt.figure(figsize = (15,3))
    plt.title("Rewards over training")
    plt.plot(rewards)
    plt.show()

    # Save imgs as gif
    imageio.mimsave(name,imgs,fps = fps)
    return env,agent, rewards

In [3]:
def run_corr_episode(env,agent,verbose = 1):

    s = env.reset()
    agent.reset_memory()
    arr = [0, 1, 2, 3, 4, 5, 6]

    episode_reward = 0

    for i in arr:
                  
        # Remember the states
        agent.remember_state(s)
        
        # Take the action, and get the reward from environment
        s_next,r,done = env.step(i)

        # Tweak the reward
        r = r

        if verbose: print(s_next,r,done)

        # Update our knowledge ein the Q-table
        agent.train(s,i,r,s_next)

        # Update the caches
        episode_reward += r
        s = s_next

        # If the episode is terminated
        if done:
            break
            
    return env,agent,episode_reward


In [4]:
#Correct Reward Path For Light
env = Environment(n_stops = 8,method = "angle_difference" )
agent = DeliveryQAgent(env,env.observation_space,env.action_space)
env, agent, reward = run_corr_episode(env,agent)
print("Iteration Reward = ", reward)
env.render()

Initialized Delivery Environment with 8 random stops
Target metric for optimization is angle_difference


AttributeError: 'Environment' object has no attribute 'env'

In [None]:
def grid_search_hyperparameters(env, hyperparameter_grid, n_episodes=1000, render_each=10, fps=10):
    best_hyperparameters = None
    best_mean_reward = float("-inf")
    rewards_by_hyperparameters = {}

    # Loop through all possible combinations of hyperparameters
    for hyperparameters in hyperparameter_grid:
        print(f"Training with hyperparameters: {hyperparameters}")

        # Create the agent with the current set of hyperparameters
        agent = DeliveryQAgent(env,states_size=env.observation_space, actions_size=env.action_space, 
                               epsilon=hyperparameters['epsilon'],
                               epsilon_decay=hyperparameters['epsilon_decay'],
                               gamma=hyperparameters['gamma'],
                               lr=hyperparameters['lr'])

        # Train the agent and get rewards
        _, _, rewards = run_n_episodes(env, agent, n_episodes=n_episodes, render_each=render_each, fps=fps)
        rewards_by_hyperparameters[str(hyperparameters)] = rewards

        # Calculate the mean reward over the last episodes
        mean_reward = np.mean(rewards[-100:])
        print(f"Mean reward over the last 100 episodes: {mean_reward}")

        # Check if it's the best set of hyperparameters so far
        if mean_reward > best_mean_reward:
            best_mean_reward = mean_reward
            best_hyperparameters = hyperparameters

    print("Grid search complete.")
    print("Best Hyperparameters:")
    print(best_hyperparameters)

    return best_hyperparameters, rewards_by_hyperparameters


if __name__ == "__main__":
    # Define the hyperparameter grid with the range of values to search through
    hyperparameter_grid = [
        {"epsilon": 0.2, "epsilon_decay": 0.8, "gamma": 0.7, "lr": 0.1},
        {"epsilon": 0.4, "epsilon_decay": 0.9, "gamma": 0.8, "lr": 0.3},
        {"epsilon": 0.6, "epsilon_decay": 0.95, "gamma": 0.9, "lr": 0.5},
        {"epsilon": 0.8, "epsilon_decay": 0.95, "gamma": 0.9, "lr": 0.7}
    ]

    # Create the environment
    env = Environment(n_stops=8)

    # Perform grid search hyperparameter tuning
    best_hyperparameters, rewards_by_hyperparameters = grid_search_hyperparameters(env, hyperparameter_grid)

    # Train the final agent with the best hyperparameters
    agent = DeliveryQAgent(env,states_size=env.observation_space, actions_size=env.action_space, 
                           epsilon=best_hyperparameters['epsilon'],
                           epsilon_decay=best_hyperparameters['epsilon_decay'],
                           gamma=best_hyperparameters['gamma'],
                           lr=best_hyperparameters['lr'])
    p, q, r = run_n_episodes(env, agent, n_episodes=1000, render_each=10, fps=10)


In [None]:
result = r[-100:]==reward
count_true = np.count_nonzero(result)
print("Iterations matching optimal reward = ", count_true)

In [None]:
from IPython.display import Image
Image(filename="training.gif")