## Cart Pole

The following tutorial was used to implement the random search policy:

http://kvfrans.com/simple-algoritms-for-solving-cartpole/

### Random Search

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math

In [None]:
def run_episode(env, parameters):
    """Play one episode with a fixed linear policy and return its total reward.

    At every step the action is 0 when the dot product of ``parameters`` and
    the current observation is negative, and 1 otherwise. The episode is
    capped at 200 steps and stops early as soon as ``env.step`` reports
    ``done``.

    Args:
        env: environment exposing ``reset()`` and ``step(action)``; ``step``
            must return ``(observation, reward, done, info)``.
        parameters: weight vector multiplied with each observation.

    Returns:
        Sum of the rewards returned by ``env.step`` during the episode.
    """
    state = env.reset()
    episode_return = 0
    steps_left = 200
    while steps_left > 0:
        steps_left -= 1
        # The sign of the weighted observation selects the action.
        if np.matmul(parameters, state) < 0:
            move = 0
        else:
            move = 1
        state, step_reward, finished, _ = env.step(move)
        episode_return += step_reward
        if finished:
            # The environment ended the episode before the step cap.
            break
    return episode_return

In [None]:
def train(submit):
    """Random search for a linear CartPole policy.

    Draws a fresh weight vector uniformly from [-1, 1)^4 for each episode,
    keeps the best-scoring one, and stops as soon as an episode reaches a
    reward of 200 or after 10000 episodes.

    Args:
        submit: when true, replay the best weights for 100 episodes after the
            search, calling ``env.render()`` before each replayed episode.

    Returns:
        Number of episodes that were run during the search.
    """
    # load the CartPole environment
    env = gym.make('CartPole-v0')

    # The environment is closed on every exit path; previously it was only
    # closed when ``submit`` was true, so repeated calls leaked environments.
    try:
        # how many episodes were run before the search stopped
        counter = 0
        # best weights found so far
        bestparams = None
        # best reward achieved from an episode
        bestreward = 0

        # run at most 10000 episodes
        for _ in range(10000):
            counter += 1
            # generate random weights in [-1, 1)
            parameters = np.random.rand(4) * 2 - 1
            # total reward of one episode played with these weights
            reward = run_episode(env, parameters)
            # keep the weights if they beat the best episode so far
            if reward > bestreward:
                bestreward = reward
                bestparams = parameters
                # considered solved if the agent lasts 200 timesteps
                if reward == 200:
                    break

        # bestparams stays None only if no episode earned a positive reward.
        if submit and bestparams is not None:
            for _ in range(100):
                env.render()
                run_episode(env, bestparams)
    finally:
        env.close()

    return counter

In [None]:
# Run one random-search pass without the replay/render phase; train() returns
# the number of episodes it ran.
train(submit=False)

In [None]:
# Repeat the search 1000 times and plot how many episodes each run needed.
results = [train(submit=False) for _ in range(1000)]

plt.hist(results, bins=50, density=1, facecolor='g', alpha=0.75)
plt.title('Histogram of Random Search')
plt.xlabel('Episodes required to reach 200')
plt.ylabel('Frequency')
plt.show()

# Average number of episodes per run.
print(np.sum(results) / 1000.0)


### Hill Climbing

In [None]:
def run_episode(env, parameters):
    """Play one episode with a fixed linear policy and return its total reward.

    The action is 0 when the dot product of ``parameters`` and the current
    observation is negative, and 1 otherwise. The episode runs for at most
    200 steps and ends early when ``env.step`` reports ``done``.

    Args:
        env: environment exposing ``reset()`` and ``step(action)``; ``step``
            must return ``(observation, reward, done, info)``.
        parameters: weight vector multiplied with each observation.

    Returns:
        Sum of the rewards returned by ``env.step`` during the episode.
    """
    observation = env.reset()
    totalreward = 0
    for _ in range(200):
        # env.render()
        action = 0 if np.matmul(parameters, observation) < 0 else 1
        observation, reward, done, _info = env.step(action)
        totalreward += reward
        if done:
            break
    return totalreward

In [None]:
def train(submit):
    """Hill climbing for a linear CartPole policy.

    Starts from random weights in [-1, 1)^4 and, for up to 2000 episodes,
    evaluates a copy perturbed by uniform noise scaled by ``noise_scaling``.
    A perturbed vector replaces the current weights whenever its episode
    reward beats the best reward seen so far; the search stops early once an
    episode reaches a reward of 200.

    Args:
        submit: when true, replay the final weights for 100 episodes after
            the search, calling ``env.render()`` before each replayed episode.

    Returns:
        Number of episodes that were run during the search.
    """
    env = gym.make('CartPole-v0')

    # Close the environment on every exit path, not only when submitting.
    try:
        noise_scaling = 0.1
        parameters = np.random.rand(4) * 2 - 1
        bestreward = 0
        counter = 0

        for _ in range(2000):
            counter += 1
            # Candidate = current weights plus small uniform noise.
            newparams = parameters + (np.random.rand(4) * 2 - 1) * noise_scaling

            reward = run_episode(env, newparams)

            if reward > bestreward:
                bestreward = reward
                parameters = newparams
                if reward == 200:
                    break

        if submit:
            for _ in range(100):
                env.render()
                # ``parameters`` holds the best weights found; the previous
                # code referenced an undefined ``bestparams`` here.
                run_episode(env, parameters)
    finally:
        env.close()
    return counter

In [None]:
# Repeat hill climbing 1000 times and plot how many episodes each run needed.
results = [train(submit=False) for _ in range(1000)]

plt.hist(results, bins=50, density=1, facecolor='g', alpha=0.75)
plt.title('Histogram of Hill Climbing')
plt.xlabel('Episodes required to reach 200')
plt.ylabel('Frequency')
plt.show()

# Average number of episodes per run.
print(np.sum(results) / 1000.0)

## Q-Learning

The following tutorial has been used to implement Q-Learning:
https://medium.com/@flomay/using-q-learning-to-solve-the-cartpole-balancing-problem-c0a7f47d3f9d

In [None]:
class CartPoleQAgent():
    """Tabular Q-learning agent for the gym ``CartPole-v0`` environment.

    Continuous observations are mapped onto a small grid of buckets so that
    Q-values can be stored in a dense NumPy table indexed by
    ``(bucket_0, bucket_1, bucket_2, bucket_3, action)``.
    """

    def __init__(self, buckets=(3, 3, 6, 6),
                 num_episodes=500, min_lr=0.1,
                 min_epsilon=0.1, discount=1.0, decay=25):
        """Initialise the environment and hyperparameters.

        Args:
            buckets: number of discretization buckets per observation
                component.
            num_episodes: number of episodes run by ``train()``.
            min_lr: lower bound for the decaying learning rate.
            min_epsilon: lower bound for the decaying exploration rate.
            discount: discount factor used in the Q-update.
            decay: divisor of the episode index in the decay schedule.
        """
        # Stored as a tuple so it can be concatenated with the action axis.
        self.buckets = tuple(buckets)
        self.num_episodes = num_episodes
        self.min_lr = min_lr
        self.min_epsilon = min_epsilon
        self.discount = discount
        self.decay = decay

        self.env = gym.make('CartPole-v0')

        # Initialise Q_table
        self.Q_table = np.zeros(self.buckets + (self.env.action_space.n,))

        # set upper and lower bounds for [position, velocity, angle, angular velocity]
        self.upper_bounds = [self.env.observation_space.high[0], 0.5, self.env.observation_space.high[2], math.radians(50) / 1.]
        self.lower_bounds = [self.env.observation_space.low[0], -0.5, self.env.observation_space.low[2], -math.radians(50) / 1.]

        # used to store timesteps for each episode
        self.steps = np.zeros(self.num_episodes)

        # Start from the episode-0 schedule values so that choose_action(),
        # update_q() and run() are usable before train() has been called.
        self.epsilon = self.get_epsilon(0)
        self.learning_rate = self.get_learning_rate(0)

    def discretize_state(self, obs):
        """Map a continuous observation onto bucket indices.

        Each component is scaled linearly between its lower and upper bound,
        rounded to the nearest bucket and clamped into the valid index range,
        so similar observations share one Q-table cell.

        Returns:
            Tuple of bucket indices, one per observation component.
        """
        discretized = []
        for value, low, high, n_buckets in zip(
                obs, self.lower_bounds, self.upper_bounds, self.buckets):
            # Fraction of the way from the lower to the upper bound.
            scaling = (value - low) / (high - low)
            index = int(round((n_buckets - 1) * scaling))
            # Values outside the bounds fall into the outermost buckets.
            discretized.append(min(n_buckets - 1, max(0, index)))
        return tuple(discretized)

    def choose_action(self, state):
        """Pick an action epsilon-greedily for a discretized ``state``.

        With probability ``self.epsilon`` a random action is sampled from the
        action space; otherwise the action with the largest Q-value is used.
        """
        if (np.random.random() < self.epsilon):
            return self.env.action_space.sample()
        else:
            return np.argmax(self.Q_table[state])

    def get_action(self, state, e):
        """Return per-action weights for a raw (non-discretized) ``state``.

        The Q-values of the discretized state are passed through
        ``normalize()`` using the exploration rate of episode ``e``.
        """
        obs = self.discretize_state(state)
        action_vector = self.Q_table[obs]
        epsilon = self.get_epsilon(e)
        action_vector = self.normalize(action_vector, epsilon)
        return action_vector

    def normalize(self, action_vector, epsilon):
        """Blend normalized Q-values with a uniform exploration share.

        Each entry becomes ``(1 - epsilon) * q / sum(q) + epsilon / n``, where
        ``n`` is the number of actions. When the Q-values sum to zero (for
        example an untouched, all-zero table row) the greedy share is split
        uniformly instead of dividing by zero.

        Returns:
            A new NumPy array; ``action_vector`` is not modified. The result
            is a probability vector provided the Q-values are non-negative.
        """
        action_vector = np.asarray(action_vector, dtype=float)
        n_actions = len(action_vector)
        total = action_vector.sum()
        if total == 0:
            greedy_share = np.full(n_actions, (1 - epsilon) / n_actions)
        else:
            greedy_share = (1 - epsilon) * action_vector / total
        return greedy_share + epsilon / n_actions

    def update_q(self, state, action, reward, new_state):
        """Apply one Q-learning update for ``(state, action)``.

        The stored value moves towards
        ``reward + discount * max(Q[new_state])`` by a fraction given by
        ``self.learning_rate``.
        """
        self.Q_table[state][action] += (self.learning_rate * (reward + self.discount * np.max(self.Q_table[new_state]) - self.Q_table[state][action]))

    def _decayed(self, t, floor):
        """Return the shared decay schedule for episode ``t``, clamped to [floor, 1]."""
        return max(floor, min(1., 1. - math.log10((t + 1) / self.decay)))

    def get_epsilon(self, t):
        """Return the exploration rate for episode ``t``.

        The value decays with the episode index but never drops below
        ``min_epsilon``.
        """
        return self._decayed(t, self.min_epsilon)

    def get_learning_rate(self, t):
        """Return the learning rate for episode ``t``.

        The value decays with the episode index but never drops below
        ``min_lr``.
        """
        return self._decayed(t, self.min_lr)

    def train(self):
        """Train with epsilon-greedy Q-learning for ``num_episodes`` episodes.

        The number of steps survived in each episode is recorded in
        ``self.steps``.
        """
        # Looping for each episode
        for e in range(self.num_episodes):
            # Initializes the state
            current_state = self.discretize_state(self.env.reset())

            self.learning_rate = self.get_learning_rate(e)
            self.epsilon = self.get_epsilon(e)
            done = False
            # Reset the counter so repeated train() calls do not accumulate.
            self.steps[e] = 0

            # Looping for each step; the loop ends once the environment
            # reports done=True, i.e. a terminal state was reached.
            while not done:
                self.steps[e] += 1
                # Choose A from S
                action = self.choose_action(current_state)
                # Take action
                obs, reward, done, _ = self.env.step(action)
                new_state = self.discretize_state(obs)
                # Update Q(S,A)
                self.update_q(current_state, action, reward, new_state)
                current_state = new_state
        print('Finished training!')

    def plot_learning(self):
        """Plot steps survived per episode and print the number of 200-step episodes."""
        # x/y are passed by keyword; recent seaborn releases reject them
        # as positional arguments.
        sns.lineplot(x=range(len(self.steps)), y=self.steps)
        plt.xlabel("Episode")
        plt.ylabel("Steps")
        plt.title("CartPole-Problem")
        plt.show()
        # how many times the agent was able to balance the pole for 200 timesteps.
        t = int(np.sum(self.steps == 200))
        print(t, "episodes were successfully completed.")

    def run(self):
        """Run one rendered episode and return the number of steps taken.

        The environment is wrapped in ``gym.wrappers.Monitor`` (the
        ``'cartpole'`` argument is presumably its output directory - confirm
        against the installed gym version). Actions are chosen with
        ``choose_action()``, so they remain epsilon-greedy with the current
        ``self.epsilon``.
        """
        self.env = gym.wrappers.Monitor(self.env, 'cartpole')
        t = 0
        done = False
        current_state = self.discretize_state(self.env.reset())
        while not done:
            self.env.render()
            t = t + 1
            action = self.choose_action(current_state)
            obs, reward, done, _ = self.env.step(action)
            new_state = self.discretize_state(obs)
            current_state = new_state

        return t

In [None]:
"""
Driver code
"""
def load_q_learning():
    """Create a CartPoleQAgent, train it, plot its learning curve and return it."""
    trained_agent = CartPoleQAgent()
    trained_agent.train()
    trained_agent.plot_learning()
    return trained_agent

In [None]:
# Train a Q-learning agent (this also plots its learning curve) and keep the
# trained instance for later use.
agent = load_q_learning()