In [2]:
import sys
sys.path.append('c:/users/sadia/documents/python scripts/envs/gym/lib/site-packages')
import random
import math
import numpy as np
import gym
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

In [3]:
class Random:
    """Baseline agent that ignores observations entirely.

    Every action is whatever the environment's own action space hands back
    from ``sample()``; nothing is learned or stored.
    """

    def __init__(self):
        super().__init__()

    def action(self, env):
        """Return one sampled action from ``env.action_space``."""
        action_space = env.action_space
        return action_space.sample()

In [4]:
class Qlearner:
    """Tabular Q-learning agent with epsilon-greedy exploration.

    ``parameters`` must contain the keys 'alpha', 'gamma' and 'epsilon'; each
    is stored on the instance under the same name. In the update rule alpha
    blends the old estimate with the new target, gamma scales the best value
    of the following observation, and epsilon is the exploration probability.
    """

    def __init__(self, parameters):
        for name in ('alpha', 'gamma', 'epsilon'):
            setattr(self, name, parameters[name])
        super().__init__()

    def initialize_frozen_lake_q_table(self, env):
        """Create an all-zero table: one row per observation, one column per action."""
        row_count = env.observation_space.n
        column_count = env.action_space.n
        self.q_table = np.zeros((row_count, column_count))

    def frozen_lake_training_action(self, env, observation):
        """Pick an epsilon-greedy action and remember ``observation`` for the next update."""
        self.previous_observation = observation
        roll = random.uniform(0, 1)
        if roll < self.epsilon:
            # explore: let the environment pick
            return env.action_space.sample()
        # exploit: best known action for this observation
        return np.argmax(self.q_table[observation])

    def frozen_lake_evaluation_action(self, observation):
        """Return the greedy action for ``observation`` (no exploration)."""
        action_values = self.q_table[observation]
        return np.argmax(action_values)

    def frozen_lake_update(self, observation, action, reward):
        """Update the entry for (previous observation, ``action``).

        The target is ``reward`` plus gamma times the best value available
        from ``observation``, i.e. the observation reached after the action.
        """
        source = self.previous_observation
        current_estimate = self.q_table[source, action]
        best_follow_up = np.max(self.q_table[observation])
        target = reward + self.gamma * best_follow_up
        self.q_table[source, action] = (1 - self.alpha) * current_estimate + self.alpha * target

In [5]:
class TDlearner():
    """Tabular learner that samples actions in proportion to their scores.

    ``parameters`` must contain the keys 'alpha' (blend factor between the old
    score and the new target), 'gamma' (weight of the follow-up score) and
    'epsilon' (probability of taking an environment-sampled action while
    training).

    Scores live in ``self.q_policy`` (one row per observation, one column per
    action). Both action selection and the bootstrap value used in the update
    are drawn at random, weighted by the scores in the relevant row.
    """

    def __init__(self, parameters):
        self.alpha = parameters['alpha']
        self.gamma = parameters['gamma']
        self.epsilon = parameters['epsilon']
        super().__init__()

    def initialize_frozen_lake_q_policy(self, env):
        """Create the score table, initialised to ones (uniform selection)."""
        self.q_policy = np.ones([env.observation_space.n, env.action_space.n])
        # NOTE: despite the name, this is the number of *actions*; the
        # attribute name is kept so existing users of it keep working.
        self.obs_range = env.action_space.n

    def _action_probabilities(self, observation):
        """Normalise the score row of ``observation`` into probabilities.

        Falls back to a uniform distribution when the row total is zero,
        negative or non-finite, because dividing by such a total would hand
        ``np.random.choice`` NaN/invalid probabilities and raise ValueError.
        Rows with a positive total but individual negative scores are not
        repaired here and are still rejected by ``np.random.choice``.
        """
        scores = self.q_policy[observation]
        total = scores.sum()
        if not (np.isfinite(total) and total > 0):
            return np.full(len(scores), 1.0 / len(scores))
        return scores / total

    def _sample_action(self, observation):
        """Draw an action index weighted by the current scores of ``observation``."""
        probabilities = self._action_probabilities(observation)
        return np.random.choice(np.arange(self.obs_range), p=probabilities)

    def frozen_lake_training_action(self, env, observation):
        """Pick a training action and remember ``observation`` for the next update.

        With probability epsilon the environment samples the action (explore);
        otherwise the action is drawn from the score-weighted distribution.
        """
        self.previous_observation = observation
        if random.uniform(0, 1) < self.epsilon:
            return env.action_space.sample() # explore
        # exploit using a probability weighted selection from the existing policy
        return self._sample_action(observation)

    def frozen_lake_evaluation_action(self, observation):
        """Draw an action from the score-weighted distribution (no epsilon step)."""
        return self._sample_action(observation)

    def frozen_lake_update(self, observation, action, reward):
        """Update the score of (previous observation, ``action``).

        The target is ``reward`` plus gamma times one score sampled from the
        row of ``observation`` (the observation reached after the action),
        where each score is picked with probability proportional to itself.

        NOTE(review): this method is not told whether the episode ended, so it
        also bootstraps from the row of a final observation -- confirm that is
        intended.
        """
        source = self.previous_observation
        old_value = self.q_policy[source, action]
        next_scores = self.q_policy[observation]
        next_action_score = np.random.choice(
            next_scores, p=self._action_probabilities(observation)
        )
        new_value = (1 - self.alpha) * old_value + self.alpha * (reward + self.gamma * next_action_score)
        self.q_policy[source, action] = new_value


In [6]:
class Driver:
    """Trains, evaluates, plots and demonstrates an agent in an environment.

    ``params`` must contain:
      * 'epochs' -- number of train/evaluate rounds performed by ``run``
      * 'env'    -- environment providing ``reset()``, ``render()`` and
                    ``step(action)`` returning ``(observation, reward, done, info)``
      * 'agent'  -- agent object matching the ``run_frozen_lake_*`` method used

    Per-episode reward totals accumulate in ``training_rewards`` and
    ``evaluation_rewards``.
    """

    def __init__(self, params):
        self.epochs = params['epochs']
        self.env = params['env']
        self.agent = params['agent']
        self.training_rewards = []
        self.evaluation_rewards = []

    def run_frozen_lake_random(self):
        """Run the full loop with an agent exposing ``action(env)`` and no learning step."""
        choose_action = lambda _observation: self.agent.action(self.env)
        skip_update = lambda _observation, _action, _reward: None

        self.run(choose_action, skip_update, choose_action)

    def run_frozen_lake_qlearner(self):
        """Initialise the agent's Q-table, then run the full loop."""
        self.agent.initialize_frozen_lake_q_table(self.env)
        self._run_frozen_lake_learner()

    def run_frozen_lake_tdlearner(self):
        """Initialise the agent's score table, then run the full loop."""
        self.agent.initialize_frozen_lake_q_policy(self.env)
        self._run_frozen_lake_learner()

    def _run_frozen_lake_learner(self):
        """Wire the agent's ``frozen_lake_*`` methods into ``run``."""
        agent, env = self.agent, self.env
        self.run(
            lambda observation: agent.frozen_lake_training_action(env, observation),
            agent.frozen_lake_update,
            agent.frozen_lake_evaluation_action,
        )

    # main engine: training and evaluation loop, plot then demonstrate
    def run(self, training_action, update, evaluation_action):
        """Alternate one training and one evaluation episode per epoch, then plot and demo.

        ``training_action`` / ``evaluation_action`` map an observation to an
        action; ``update`` is called as ``update(observation, action, reward)``
        after every training step.
        """
        for i in range(self.epochs):
            if ((i + 1) % 1000 == 0):
                print("progress: {}%".format(100 * (i + 1) // self.epochs))
            self.train_once(training_action, update)
            self.evaluate_once(evaluation_action)

        self.plot()

        try:
            self.demonstrate(evaluation_action)
        except NotImplementedError:
            print("Cannot demonstrate: render method on env not implemented.")

    def _run_episode(self, choose_action, update=None):
        """Play one episode from ``env.reset()`` until ``done``; return the summed reward.

        When ``update`` is given it is called after each step with the new
        observation, the action taken and the reward received.
        """
        observation = self.env.reset()
        done = False
        episode_reward = 0
        while not done:
            action = choose_action(observation)
            observation, reward, done, _info = self.env.step(action)
            episode_reward += reward
            if update is not None:
                update(observation, action, reward)
        return episode_reward

    # a single instance of training of the agent in the environment
    def train_once(self, training_action, update):
        """Run one training episode and append its total reward to ``training_rewards``."""
        self.training_rewards.append(self._run_episode(training_action, update))

    # a single instance of evaluation of the agent at its current level of training
    def evaluate_once(self, evaluation_action):
        """Run one episode without updates and append its total reward to ``evaluation_rewards``."""
        self.evaluation_rewards.append(self._run_episode(evaluation_action))

    # plot training and evaluation reward levels at each epoch
    def plot(self):
        """Show training rewards (top) and evaluation rewards (bottom) per episode."""
        # plt.subplots(2, 1) replaces plt.subplot('211') / plt.subplot('212'):
        # string position specs were deprecated in Matplotlib 3.3 and are
        # rejected by later releases.
        fig, (training_axes, evaluation_axes) = plt.subplots(2, 1)
        self._plot_rewards(training_axes, self.training_rewards, 'Training reward over time')
        self._plot_rewards(evaluation_axes, self.evaluation_rewards, 'Evaluation reward over time')
        # keep the top x-label from overlapping the bottom title
        fig.tight_layout()
        plt.show()

    @staticmethod
    def _plot_rewards(axes, rewards, title):
        """Draw one reward series on ``axes`` with the shared labels."""
        axes.plot(rewards, linewidth=1)
        axes.set_title(title)
        axes.set_ylabel('reward')
        axes.set_xlabel('iterations')

    # use the environments render method and print some additional info
    # to the console. permit user input for repeated demonstrations in a loop
    def demonstrate(self, evaluation_action):
        """Render episodes step by step until the user stops asking for more.

        Propagates whatever ``env.render()`` raises; ``run`` catches
        NotImplementedError from it.
        """
        user_input = 'Y'
        # accept 'y' / ' Y ' as well as 'Y' so a lowercase answer repeats the demo
        while user_input.strip().upper() == 'Y':
            observation = self.env.reset()
            done = False
            episode_reward = 0
            reward = 0
            step = 0
            while not done:
                print(f"Step: {step} | Cumulative Reward: {episode_reward}")
                step += 1
                print("RENDERING...")
                self.env.render()
                action = evaluation_action(observation)
                print('observation: ', observation)
                print('action: ', action)
                # reward shown here is the one received on the previous step
                print('reward: ', reward)
                observation, reward, done, _info = self.env.step(action)
                episode_reward += reward

            user_input = input('Enter Y for another demo: ')

In [None]:
def frozen_lake_random():
    """Run the Random baseline agent on 'FrozenLake-v0' for 1000 epochs."""
    baseline = Random()
    settings = {
        'epochs': 1000,
        'env': gym.make('FrozenLake-v0'),
        'agent': baseline,
    }
    Driver(settings).run_frozen_lake_random()

def frozen_lake_qlearner():
    """Train and evaluate a Qlearner on 'FrozenLake-v0' for 10000 epochs."""
    hyperparameters = {
        'alpha': 0.1,
        'gamma': 0.6,
        'epsilon': 0.3,
    }
    learner = Qlearner(hyperparameters)
    settings = {
        'epochs': 10000,
        'env': gym.make('FrozenLake-v0'),
        'agent': learner,
    }
    Driver(settings).run_frozen_lake_qlearner()

def frozen_lake_tdlearner():
    """Train and evaluate a TDlearner on 'FrozenLake-v0' for 10000 epochs."""
    hyperparameters = {
        'alpha': 0.1,
        'gamma': 0.6,
        'epsilon': 0.3,
    }
    learner = TDlearner(hyperparameters)
    settings = {
        'epochs': 10000,
        'env': gym.make('FrozenLake-v0'),
        'agent': learner,
    }
    Driver(settings).run_frozen_lake_tdlearner()
# Entry point: only the random baseline runs by default. Uncomment one of the
# learner calls below to train and evaluate that agent as well/instead.
if __name__ == '__main__':
    frozen_lake_random()
    # frozen_lake_qlearner()
    # frozen_lake_tdlearner()

progress: 100%
