In [87]:
import gym
import numpy as np
import sys
import matplotlib.pyplot as plt
from collections import defaultdict
from torch.utils.tensorboard import SummaryWriter

from abc import ABC, abstractmethod
import time
from tqdm import tqdm
from collections import deque

In [104]:
np.random.seed(42)

In [123]:
np.random.normal(-0.1, 1)

-1.0080240755212109

In [124]:
class MaxBiasEnv(gym.Env):
    def __init__(self) -> None:
        super().__init__()

        self.action_space = gym.spaces.Discrete(2)
        self.observation_space = gym.spaces.Discrete(3)

        self.reset()

    def step(self, action):
        if self.state == 0:
            if action == 0:  # Going Right
                self.state = 2
                return self.state, 0, True, {}

            elif action == 1:
                self.state = 1
                return self.state, 0, False, {}

        elif self.state == 1: # Going Left
            self.state = 2
            return self.state, np.random.normal(-0.1, 1), True, {}

        else:
            return self.state, 0, True, {}

    def reset(self):
        self.state = 0
        
        return self.state


In [125]:
env = MaxBiasEnv() # gym.make("FrozenLake-v1")
env.seed(42)

In [149]:
class Agent(ABC):
    def __init__(self, n_states, n_actions, lr, gamma, epsilon=0.1) -> None:
        self.n_states = n_states
        self.n_actions = n_actions
        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon

        self.steps = 0

    def __str__(self):
        return f"{self.__class__.__name__}"

    @abstractmethod
    def init_Q(self):  ## set everything equal to zero for all state n action pairs
        pass

    @abstractmethod
    def choose_action(self, state):  ## choose action using epsilon greedy algorithm
        pass

    @abstractmethod
    def learn(self, state, action, reward, state_):
        pass


In [150]:
class QLearningAgent(Agent):
    def __init__(self, n_states, n_actions, lr, gamma, epsilon) -> None:
        super(QLearningAgent, self).__init__(n_states, n_actions, lr, gamma, epsilon)

        self.Q = {}  ##Q table is initialized to empty dictionary
        self.init_Q()  ##specific function to initialize the Q table

        self.steps = 0

    def init_Q(self):  ## set everything equal to zero for all state n action pairs
        for state in range(self.n_states):
            self.Q[state] = np.zeros(self.n_actions)

    def choose_action(self, state):  ## choose action using epsilon greedy algorithm
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.n_actions)
        else:
            action = np.argmax(self.Q[state])
        return action

    def learn(self, state, action, reward, state_):
        self.steps += 1

        self.Q[state][action] = self.Q[state][action] + self.lr * (
            (reward + self.gamma * np.amax(self.Q[state_])) - self.Q[state][action]
        )


In [151]:
class DoubleQLearningAgent(Agent):
    def __init__(self, n_states, n_actions, lr, gamma, epsilon) -> None:
        super(DoubleQLearningAgent, self).__init__(
            n_states, n_actions, lr, gamma, epsilon
        )

        self.Q1 = {}  ##Q table is initialized to empty dictionary
        self.Q2 = {}  ##Q table is initialized to empty dictionary

        self.init_Q()  ##specific function to initialize the Q table
        self.steps = 0

    def init_Q(self):  ## set everything equal to zero for all state n action pairs
        for state in range(self.n_states):
            self.Q1[state] = np.zeros(self.n_actions)
            self.Q2[state] = np.zeros(self.n_actions)

    def choose_action(self, state):  ## choose action using epsilon greedy algorithm
        if np.random.random() < self.epsilon:
            action = np.random.choice(self.n_actions)
        else:
            action = np.argmax(self.Q1[state] + self.Q2[state])
        return action

    def learn(self, state, action, reward, state_):
        self.steps += 1

        if np.random.random() < 0.5:
            actor_action = np.argmax(self.Q1[state_])
            self.Q1[state][action] += self.lr * (
                reward
                + self.gamma * self.Q2[state_][actor_action]
                - self.Q1[state][action]
            )

        else:
            actor_action = np.argmax(self.Q2[state_])
            self.Q2[state][action] += self.lr * (
                reward
                + self.gamma * self.Q1[state_][actor_action]
                - self.Q2[state][action]
            )

        self.decrement_epsilon()


In [152]:
class FAFODoubleQLearningAgent(DoubleQLearningAgent):
    def __init__(self, n_states, n_actions, lr, gamma, epsilon) -> None:
        super(FAFODoubleQLearningAgent, self).__init__(
            n_states, n_actions, lr, gamma, epsilon
        )

    def learn(self, state, action, reward, state_):

        self.steps += 1

        if np.random.random() < 0.5:
            actor_action = np.argmax(self.Q2[state_])
            self.Q1[state][action] += self.lr * (
                reward
                + self.gamma * self.Q1[state_][actor_action]
                - self.Q1[state][action]
            )

        else:
            actor_action = np.argmax(self.Q1[state_])
            self.Q2[state][action] += self.lr * (
                reward
                + self.gamma * self.Q2[state_][actor_action]
                - self.Q2[state][action]
            )

        self.decrement_epsilon()


In [183]:
config = {
    "n_states": env.observation_space.n,
    "n_actions": env.action_space.n,
    "lr": 0.1,
    "gamma": 1,
    "epsilon": 0.1,
}



In [201]:
def eval_agent(agent, steps = 100):
    num_left = 0
    num_total = 0
    for t in range(steps):
        env.seed(t)
        state, done = env.reset(), False
        
        while not done:
            action = agent.choose_action(state)

            num_left += (action == 1)
            num_total += 1

            state_, reward, done, info = env.step(action)
            state = state_

    return num_left /  num_total


In [216]:
# agent = QLearningAgent(**config) # 7.567%
# agent = DoubleQLearningAgent(**config) # 5.638%
agent = FAFODoubleQLearningAgent(**config) # 23.53%

NUM_EPISODES = 300 # int(5e4)
writer = SummaryWriter(comment=f"{agent}-{int(time.time())}", flush_secs=5)

for episode_num in tqdm(range(NUM_EPISODES)):
    state, done = env.reset(), False
    score = 0

    num_left = 0
    num_total = 0

    while not done:
        action = agent.choose_action(state)

        num_left += (action == 1)
        num_total += 1

        state_, reward, done, info = env.step(action)
        agent.learn(state, action, reward, state_)
        score += reward
        state = state_

        writer.add_scalar("epsilon", agent.epsilon, global_step=agent.steps)

    pct_left = eval_agent(agent) * 100

    writer.add_scalar("reward", score, global_step=episode_num)
    writer.add_scalar("pct_left", pct_left, global_step=episode_num)
    


100%|██████████| 300/300 [00:00<00:00, 510.01it/s]
