In [None]:
# Initial imports and environment setup
import numpy as np
import sys
import seaborn as sns
import random
import time

import matplotlib.pyplot as plt
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from collections import defaultdict
pio.renderers.default = 'notebook_connected'

import os
sns.set()

os.environ["SDL_VIDEODRIVER"] = "dummy"
from IPython.display import clear_output

if 'notebook' in os.getcwd():
    os.chdir('..')

import gym
import gym.envs.toy_text
import copy 

# Temporal Differencing (TD)

An approach that combines the benefits of Dynamic Programming (DP) and Monte Carlo (MC): it bootstraps from existing value estimates as DP does, and it learns from sampled experience as MC does. Below is the implementation of TD(0), the one-step variant whose update target looks only one step ahead, i.e. it bootstraps from the estimated value of the immediately following state.

$$ V'(s) = V(s) + \alpha[R + \lambda V(s') - V(s)]$$
$$ Q'(s,a) = Q(s,a) + \alpha[R + \lambda Q(s',a') - Q(s,a)]$$

where:
- $V(s)$: Current value of a state
- $V'(s)$: New value of a state
- $s$: current state
- $s'$: next state after an action is taken
- $\alpha$: learning rate
- $a$: action
- $\lambda$: discount factor

# Cart Pole Environment

In [None]:
# Build the CartPole-v1 environment that the cells below share, and show its spaces.
env = gym.make("CartPole-v1")
print('Action Space:', env.action_space)
print('Observation Space:', env.observation_space)

In [None]:
def discretize_observation(x, n_bins=100, bound=10_000, low=None, high=None):
    """Map a continuous observation onto a tuple of integer bin indices.

    Each component is clipped to ``[-bound, bound]``, rescaled to the unit
    interval using the (equally clipped) lower/upper bounds, multiplied by
    ``n_bins`` and truncated to an ``int``. The resulting tuple is hashable and
    can therefore be used as a key of a tabular Q-function.

    Args:
        x: Observation, one value per dimension.
        n_bins (int): Scale applied to the normalized value before truncation.
            A component equal to its upper bound maps to ``n_bins`` itself.
        bound (float): Symmetric clipping limit; keeps infinite or very large
            bounds finite so the normalization stays well defined.
        low: Per-dimension lower bounds. Defaults to ``env.observation_space.low``
            of the notebook-level ``env``.
        high: Per-dimension upper bounds. Defaults to ``env.observation_space.high``
            of the notebook-level ``env``.

    Returns:
        tuple[int, ...]: One bin index per observation dimension.
    """
    # Fall back to the notebook-level environment only when no bounds are given,
    # so the function can be reused for other spaces.
    if low is None:
        low = env.observation_space.low
    if high is None:
        high = env.observation_space.high

    x = np.clip(x, -bound, bound)
    x_max = np.clip(high, -bound, bound)
    x_min = np.clip(low, -bound, bound)

    # Normalize every dimension to [0, 1] relative to its clipped bounds.
    x = (x - x_min) / (x_max - x_min)

    return tuple([int(xi * n_bins) for xi in x])

In [None]:
# Sanity check: discretize the initial observation returned by env.reset().
discretize_observation(env.reset())

# State-Action-Reward-State-Action (SARSA)

This is an on-policy control algorithm. It samples one transition at a time and updates the Q-values after every step, with the size of each update controlled by the learning rate.

In [None]:
# SARSA Learning agent class

class SARSAAgent:
    """Tabular SARSA agent that acts epsilon-greedily on its own Q-table."""

    def __init__(self, alpha, epsilon, gamma, get_possible_actions):
        """Store the hyperparameters and start from an empty Q-table.

        Args:
            alpha (float): Step size applied to every TD error.
            epsilon (float): Probability of choosing a uniformly random action.
            gamma (float): Discount factor applied to the next state-action value.
            get_possible_actions (callable): Called with a state; returns the
                actions available in that state.
        """
        self.alpha = alpha
        self.epsilon = epsilon
        self.gamma = gamma
        self.get_possible_actions = get_possible_actions
        # Two-level table state -> action -> value; unseen pairs read as 0.
        self._Q = defaultdict(lambda: defaultdict(int))

    def get_Q(self, state, action):
        """Return the stored value of ``(state, action)`` (0 if never written)."""
        return self._Q[state][action]

    def set_Q(self, state, action, value):
        """Overwrite the stored value of ``(state, action)``."""
        self._Q[state][action] = value

    def update(self, state, action, reward, next_state, next_action, done):
        """Apply one SARSA update for the sample (S, A, R, S', A').

        On a terminal transition (``done``) the target is the reward alone;
        otherwise the discounted value of ``(next_state, next_action)`` is added.
        """
        if done:
            target = reward
        else:
            target = reward + self.gamma * self.get_Q(next_state, next_action)

        current = self.get_Q(state, action)
        self.set_Q(state, action, current + self.alpha * (target - current))

    def max_action(self, state):
        """Return an action with the highest Q-value in ``state``.

        Ties are broken uniformly at random via ``np.random.choice``.
        """
        top_value = float("-inf")
        tied = []
        for candidate in self.get_possible_actions(state):
            value = self.get_Q(state, candidate)
            if value == top_value:
                tied.append(candidate)
            elif value > top_value:
                tied, top_value = [candidate], value
        return np.random.choice(np.array(tied))

    def get_action(self, state):
        """Pick an action epsilon-greedily; ``None`` when no action is available."""
        actions = self.get_possible_actions(state)
        if not len(actions):
            return None

        explore = np.random.random() < self.epsilon
        return np.random.choice(actions) if explore else self.max_action(state)

In [None]:
# training algorithm
def train_sarsa_agent(env, agent, episode_cnt=10000, tmax=10000, anneal_eps=True):
    """Train ``agent`` with SARSA updates and return the return of every episode.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step(action)`` returns ``(observation, reward, done, info)``.
        agent: Agent exposing ``get_action(state)``,
            ``update(state, action, reward, next_state, next_action, done)``
            and a mutable ``epsilon`` attribute.
        episode_cnt (int): Number of episodes to run.
        tmax (int): Maximum number of steps per episode.
        anneal_eps (bool): When true, ``agent.epsilon`` is multiplied by 0.99
            after every episode to reduce exploration over training.

    Returns:
        np.ndarray: Undiscounted sum of rewards per episode, exactly
        ``episode_cnt`` entries.
    """
    episode_rewards = []
    for _episode in range(episode_cnt):
        G = 0
        state = discretize_observation(env.reset())
        action = agent.get_action(state)
        for _step in range(tmax):
            next_state, reward, done, _info = env.step(action)
            next_state = discretize_observation(next_state)

            # SARSA is on-policy: the next action is chosen first and its value
            # is used in the update target.
            next_action = agent.get_action(next_state)
            agent.update(state, action, reward, next_state, next_action, done)
            G += reward
            if done:
                break
            state = next_state
            action = next_action

        # Bookkeeping happens after the step loop so that an episode cut off by
        # ``tmax`` is recorded (and epsilon annealed) just like a finished one.
        episode_rewards.append(G)
        if anneal_eps:
            agent.epsilon = agent.epsilon * 0.99
    return np.array(episode_rewards)

In [None]:
# plot rewards
def plot_rewards(env_name, rewards, label, n = 20):
    """Plot per-episode rewards together with a trailing mean.

    Args:
        env_name (str): Environment name shown in the title.
        rewards: Sequence of per-episode rewards (list or array).
        label (str): Legend label of the raw curve; the smoothed curve is
            labelled ``label + '_mean'``.
        n (int): Window length. The title reports the mean of the last ``n``
            rewards; the smoothed curve at index ``i`` is the mean of the ``n``
            rewards preceding ``i`` and is NaN for the first ``n`` episodes.
    """
    # Accept plain lists as well as arrays; the slicing below needs ``.mean()``.
    rewards = np.asarray(rewards, dtype=float)

    plt.title("env={}, Mean reward = {:.1f}".format(env_name,
                                                    np.mean(rewards[-n:])))
    rewards_mean = np.full(len(rewards), np.nan)
    for i in range(n, len(rewards)):
        rewards_mean[i] = rewards[i - n:i].mean()
    plt.plot(rewards, label=label)
    plt.plot(rewards_mean, label=label + '_mean')
    plt.xlabel("Episode")
    plt.ylabel("Total reward")
    # A bare plt.grid() toggles the grid, which switches it off when the active
    # style (e.g. seaborn's) already enables it; request it explicitly instead.
    plt.grid(True)
    plt.legend()
    plt.show()

In [None]:
# create a SARSA agent; every state offers the same actions 0 .. env.action_space.n - 1
sarsa_agent = SARSAAgent(
            alpha=0.2,
            epsilon=0.2,
            gamma=0.995,
            get_possible_actions=lambda s : range(env.action_space.n)
        )

# train the agent for 1000 episodes and keep the array of per-episode rewards
rewards = train_sarsa_agent(env, sarsa_agent, episode_cnt = 1000)

In [None]:
# Number of distinct discretized states that have an entry in the SARSA Q-table.
len(sarsa_agent._Q)

In [None]:
# plot rewards
plot_rewards("Cart Pole V1", rewards, 'SARSA')

In [None]:
# Roll out one rendered episode with the trained SARSA agent acting greedily.
tmax = 10000
G = 0

env_run = copy.deepcopy(env)  # evaluate on a copy of the training environment
state = env_run.reset()
sarsa_agent.epsilon = 0.0  # get_action explores only when random() < epsilon

for t in range(tmax):
    state = discretize_observation(state)
    action = sarsa_agent.get_action(state)

    env_run.render(mode = 'human')
    next_state, reward, done, _ = env_run.step(action)
    G += reward
    if done:
        break
    # Advance to the new raw observation. Without this the loop keeps
    # re-discretizing the already discretized tuple and never sees the
    # environment's actual state.
    state = next_state

    time.sleep(0.1)  # slow the loop down so the rendering can be followed

print(f'lasted for {t} steps')
env_run.close()

# Q-Learning

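This is an off-policy control algorithm. Like SARSA, it samples one transition at a time and updates the Q-values after every step with a configurable learning rate, but its update target uses the greedy (maximum) Q-value of the next state rather than the value of the action the behaviour policy actually takes next.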

In [None]:
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy."""

    def __init__(self, alpha, epsilon, gamma, get_possible_actions):
        """Store the hyperparameters and start from an empty Q-table.

        Args:
            alpha (float): Step size applied to every TD error.
            epsilon (float): Probability of choosing a uniformly random action.
            gamma (float): Discount factor applied to the next-state value.
            get_possible_actions (callable): Called with a state; returns the
                actions available in that state.
        """
        self.alpha = alpha
        self.epsilon = epsilon
        self.gamma = gamma
        self.get_possible_actions = get_possible_actions
        # Two-level table state -> action -> value; unseen pairs read as 0.
        self._Q = defaultdict(lambda: defaultdict(int))

    def get_Q(self, state, action):
        """Return the stored value of ``(state, action)`` (0 if never written)."""
        return self._Q[state][action]

    def set_Q(self, state, action, value):
        """Overwrite the stored value of ``(state, action)``."""
        self._Q[state][action] = value

    def update(self, state, action, reward, next_state, done):
        """Apply one Q-learning update for the sample (S, A, R, S').

        On a terminal transition (``done``) the target is the reward alone;
        otherwise it adds the discounted value of the action returned by
        ``max_action(next_state)``.
        """
        if done:
            target = reward
        else:
            greedy_next = self.max_action(next_state)
            target = reward + self.gamma * self.get_Q(next_state, greedy_next)

        current = self.get_Q(state, action)
        self.set_Q(state, action, current + self.alpha * (target - current))

    def max_action(self, state):
        """Return an action maximizing Q(state, a); ties are broken at random."""
        leaders = []
        leader_value = float("-inf")
        for option in self.get_possible_actions(state):
            option_value = self.get_Q(state, option)
            if option_value == leader_value:
                leaders.append(option)
            elif option_value > leader_value:
                leader_value = option_value
                leaders = [option]
        return np.random.choice(np.array(leaders))

    def get_action(self, state):
        """Pick an action epsilon-greedily; ``None`` when no action is available."""
        actions = self.get_possible_actions(state)
        if not len(actions):
            return None

        if np.random.random() < self.epsilon:
            # Explore: uniformly random action.
            return np.random.choice(actions)
        # Exploit: greedy action under the current Q-table.
        return self.max_action(state)

In [None]:
# training algorithm
def train_q_agent(env, agent, episode_cnt=10000, tmax=10000, anneal_eps=0.99):
    """Train ``agent`` with Q-learning updates and return the return of every episode.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step(action)`` returns ``(observation, reward, done, info)``.
        agent: Agent exposing ``get_action(state)``,
            ``update(state, action, reward, next_state, done)`` and a mutable
            ``epsilon`` attribute.
        episode_cnt (int): Number of episodes to run.
        tmax (int): Maximum number of steps per episode.
        anneal_eps (float): Factor ``agent.epsilon`` is multiplied by after
            every episode to reduce exploration over training.

    Returns:
        np.ndarray: Undiscounted sum of rewards per episode, exactly
        ``episode_cnt`` entries.
    """
    episode_rewards = []
    for _episode in range(episode_cnt):
        G = 0
        state = discretize_observation(env.reset())
        for _step in range(tmax):
            action = agent.get_action(state)
            next_state, reward, done, _info = env.step(action)
            next_state = discretize_observation(next_state)
            agent.update(state, action, reward, next_state, done)
            G += reward
            if done:
                break
            state = next_state

        # Bookkeeping happens after the step loop so that an episode cut off by
        # ``tmax`` is recorded (and epsilon annealed) just like a finished one.
        episode_rewards.append(G)
        agent.epsilon = agent.epsilon * anneal_eps
    return np.array(episode_rewards)

In [None]:
# create a Q Learning agent; every state offers the same actions 0 .. env.action_space.n - 1
q_agent = QLearningAgent(alpha=0.2, epsilon=0.2, gamma=0.995, 
                       get_possible_actions=lambda s : range(env.action_space.n))

# train the agent for 1000 episodes and keep the array of per-episode rewards
rewards = train_q_agent(env, q_agent, episode_cnt = 1000)

In [None]:
# Number of distinct discretized states that have an entry in the Q-learning Q-table.
len(q_agent._Q)

In [None]:
# Plot rewards
plot_rewards("Cart Pole V1",rewards, 'Q-Learning')

In [None]:
# Roll out one rendered episode with the trained Q-learning agent acting greedily.
tmax = 10000
G = 0

env_run = copy.deepcopy(env)  # evaluate on a copy of the training environment
state = env_run.reset()
q_agent.epsilon = 0.0  # get_action explores only when random() < epsilon
for t in range(tmax):
    env_run.render(mode = 'human')
    
    state = discretize_observation(state)
    action = q_agent.get_action(state)

    next_state, reward, done, _ = env_run.step(action)
    G += reward
    if done:
        break
    state = next_state  # raw observation; discretized at the top of the next iteration

    time.sleep(0.1)  # slow the loop down so the rendering can be followed

# t is the zero-based index of the last executed step; G is the summed reward.
print(f'lasted for {t} steps with a score of {G}')
env_run.close()

## Hyperparameter tuning

In [None]:
import optuna
from optuna.visualization import *

In [None]:
env_run = copy.deepcopy(env)

In [None]:
def objective(trial):
    """Optuna objective: train a Q-learning agent and score one greedy episode.

    Samples ``alpha``, ``epsilon`` and ``anneal_eps`` from ``trial``, trains a
    fresh ``QLearningAgent`` for 300 episodes and returns the total reward of a
    single evaluation episode played without exploration.

    Each call works on its own copy of the notebook-level ``env``. The study is
    optimized with ``n_jobs=-1``, so several trials can run at the same time;
    stepping one shared environment from concurrent trials would interleave
    their episodes and corrupt both training and evaluation.

    Args:
        trial (optuna.trial.Trial): Trial used to sample the hyperparameters.

    Returns:
        float: Undiscounted return of the evaluation episode (to be maximized).
    """
    # alpha and epsilon are sampled on a linear scale, anneal_eps on a log scale.
    alpha = trial.suggest_float("alpha", 1e-6, 5e-1)
    epsilon = trial.suggest_float("epsilon", 1e-6, 9e-1)
    anneal_eps = trial.suggest_float("anneal_eps", 9e-1, 1.0, log=True)

    # Private environment for this trial (see docstring).
    trial_env = copy.deepcopy(env)
    n_actions = trial_env.action_space.n

    # create a Q Learning agent
    q_agent = QLearningAgent(alpha=alpha, epsilon=epsilon, gamma=0.995,
                             get_possible_actions=lambda s: range(n_actions))

    # train agent; the per-episode training rewards are not needed here
    train_q_agent(trial_env, q_agent, episode_cnt=300, anneal_eps=anneal_eps)

    # Evaluate Agent
    tmax = 10000
    G = 0

    state = trial_env.reset()
    q_agent.epsilon = -1.0  # random() < epsilon is never true: purely greedy
    for _step in range(tmax):
        state = discretize_observation(state)
        action = q_agent.get_action(state)

        next_state, reward, done, _info = trial_env.step(action)
        G += reward
        if done:
            break
        state = next_state
    return G

In [None]:
# Study that searches for the hyperparameters giving the highest objective value.
study = optuna.create_study(direction = 'maximize')

In [None]:
# Run 1000 trials of `objective`; n_jobs=-1 lets Optuna choose the number of parallel jobs.
study.optimize(objective, n_trials = 1000, n_jobs=-1)

In [None]:
# Parallel-coordinate plot of the study's trials.
fig = plot_parallel_coordinate(study)
fig.show()

In [None]:
# Hyperparameter-importance plot for the study.
fig = plot_param_importances(study)
fig.show()

In [None]:
# Contour plot of the study, enlarged to 1600 x 2400 px; the cell's last
# expression is the figure, so the notebook displays it.
fig = plot_contour(study)
fig.update_layout(height = 2400, width = 1600)

In [None]:
# Best trial of the study and the hyperparameters that were sampled for it.
trial = study.best_trial

print('Best trial value:', trial.value)
for key, value in trial.params.items():
    print(f'{key} = {value},')

In [None]:
# create a Q Learning agent from the best trial's alpha and epsilon (gamma stays fixed at 0.995)
q_agent = QLearningAgent(alpha = trial.params['alpha'], epsilon = trial.params['epsilon'], gamma=0.995, 
                       get_possible_actions=lambda s : range(env.action_space.n),
                       )

# train for 10000 episodes with the best trial's epsilon decay factor and keep the per-episode rewards
rewards = train_q_agent(env, q_agent, episode_cnt = 10000, anneal_eps = trial.params['anneal_eps'])

# Plot rewards
plot_rewards("Cart Pole V1",rewards, 'Q-Learning')

In [None]:
# Roll out one rendered episode with the re-trained Q-learning agent acting greedily.
tmax = 10000
G = 0

env_run = copy.deepcopy(env)  # evaluate on a copy of the training environment
state = env_run.reset()
q_agent.epsilon = -1.0  # random() < epsilon is never true, so the agent never explores
for t in range(tmax):
    env_run.render(mode = 'human')
    
    state = discretize_observation(state)
    action = q_agent.get_action(state)

    next_state, reward, done, _ = env_run.step(action)
    G += reward
    if done:
        break
    state = next_state  # raw observation; discretized at the top of the next iteration

    time.sleep(0.1)  # slow the loop down so the rendering can be followed

# t is the zero-based index of the last executed step; G is the summed reward.
print(f'lasted for {t} steps with a score of {G}')
env_run.close()