In [1]:
%load_ext lab_black

In [2]:
from typing import List, Dict, Tuple
from abc import ABC, abstractmethod

In [3]:
import matplotlib

matplotlib.use("Agg")
import matplotlib.pyplot as plt

In [4]:
from collections import defaultdict
import numpy as np
import pandas as pd
import gym

In [5]:
class CartPoleQAgent(ABC):
    """Abstract interface for Q-value based agents on four-component states.

    Concrete agents must provide Q-value prediction, a Q-value update step and
    an action-selection policy.
    """

    @abstractmethod
    def predict_q(self, state: Tuple[float, float, float, float]) -> np.ndarray:
        """Return the Q-values the agent currently predicts for ``state``."""

    @abstractmethod
    def update_q(
        self,
        reward: float,
        action: int,
        from_state: Tuple[float, float, float, float],
        to_state: Tuple[float, float, float, float],
    ) -> None:
        """Learn from one transition.

        Args:
            reward: Reward received for the transition.
            action: Action that was taken in ``from_state``.
            from_state: State before the action.
            to_state: State after the action.
        """

    @abstractmethod
    def get_action(self, state: np.ndarray) -> int:
        """Return the action the agent chooses in ``state``."""

In [6]:
def empty_array():
    """Return a new length-2 array of zeros.

    Used as the ``defaultdict`` factory for the agent's Q-table, so every
    call must produce a fresh, independent array.
    """
    n_entries = 2
    return np.zeros(n_entries)

In [7]:
class DiscreteDictAgent(CartPoleQAgent):
    """Tabular Q-learning agent that discretises states by rounding.

    Every state is rounded to ``decimals`` decimal places and the resulting
    tuple is used as a key into a dictionary holding one Q-value per action.
    Actions are selected epsilon-greedily.
    """

    def __init__(
        self,
        decimals: int,
        learning_rate: float,
        exploration_rate: float,
        discount_factor: float = 1.0,
    ):
        """Create an agent with an empty Q-table.

        Args:
            decimals: Number of decimal places states are rounded to before
                being used as Q-table keys.
            learning_rate: Step size applied to the temporal-difference error.
            exploration_rate: Probability of picking a uniformly random action
                in ``get_action``.
            discount_factor: Weight of the best predicted value of the next
                state in the update target. The default of 1.0 reproduces the
                undiscounted update.
        """
        self._decimals = decimals
        self._action_space = [0, 1]

        # maps from rounded state to q_value per action; unseen states are
        # created on first access with all-zero values
        self._q_dict: Dict[Tuple[float, ...], np.ndarray] = defaultdict(empty_array)
        self._learning_rate = learning_rate
        self._exploration_rate = exploration_rate
        self._discount_factor = discount_factor

    def predict_q(self, state: Tuple[float, float, float, float]) -> np.ndarray:
        """Return the Q-value array stored for the rounded ``state``.

        The returned array is the live Q-table entry, not a copy. Looking up
        an unseen state inserts a zero-initialised entry for it.
        """
        return self._q_dict[self._round_value(state)]

    def update_q(
        self,
        reward: float,
        action: int,
        from_state: Tuple[float, float, float, float],
        to_state: Tuple[float, float, float, float],
    ) -> None:
        """Apply one Q-learning update for the observed transition.

        Args:
            reward: Reward received for the transition.
            action: Index of the action taken in ``from_state``.
            from_state: State before the action.
            to_state: State after the action.
        """
        from_key = self._round_value(from_state)
        to_key = self._round_value(to_state)

        # NOTE(review): the target always bootstraps from ``to_state``; this
        # method has no way of knowing whether the episode ended there.
        # Read the next-state value before mutating, since both keys may
        # refer to the same Q-table entry.
        best_predicted_outcome = np.max(self._q_dict[to_key])
        q_values = self._q_dict[from_key]
        target = reward + self._discount_factor * best_predicted_outcome
        q_values[action] += self._learning_rate * (target - q_values[action])

    def get_action(self, state: Tuple[float, float, float, float]) -> int:
        """Pick an action epsilon-greedily.

        With probability ``exploration_rate`` a uniformly random action is
        returned, otherwise the action with the highest stored Q-value.
        """
        if np.random.random() <= self._exploration_rate:
            return int(np.random.choice(self._action_space))
        return self._best_action(state)

    def _best_action(self, state: Tuple[float, float, float, float]) -> int:
        """Return the greedy action, or a random one while the Q-table is empty."""
        if not self._q_dict:
            return int(np.random.choice(self._action_space))
        # predict_q rounds the state itself; argmax resolves ties in favour
        # of the lowest action index.
        return int(np.argmax(self.predict_q(state)))

    def _round_value(self, value) -> Tuple[float, ...]:
        """Round ``value`` element-wise and return it as a hashable tuple."""
        return tuple(np.round(value, self._decimals))

In [16]:
# Agent: states are rounded to one decimal place before the Q-table lookup.
agent = DiscreteDictAgent(
    decimals=1,
    learning_rate=0.55,
    exploration_rate=0.05,
)

# Environment the agent is trained on.
env = gym.make("CartPole-v0")

In [17]:
# Train the agent and record how many timesteps each episode lasted.
N_EPISODES = 20000
MAX_STEPS_PER_EPISODE = 1000  # upper bound for the inner loop
TIME_LIMIT_STEPS = 200  # CartPole-v0 cuts every episode off after 200 steps
FAILURE_REWARD = -20.0  # replaces the reward of a step that ends an episode early

episode_score_ls = []
for i_episode in range(N_EPISODES):
    old_state = env.reset()
    for t in range(MAX_STEPS_PER_EPISODE):
        # NOTE(review): rendering every step of every episode is slow;
        # consider removing this call for long training runs.
        env.render()
        action = agent.get_action(old_state)

        new_state, reward, done, info = env.step(action)
        # ``t`` is zero-based, so the episode has lasted ``t + 1`` steps.
        # Only penalise episodes that ended before the time limit; comparing
        # ``t`` itself against the limit would also penalise an episode that
        # survived all 200 steps.
        if done and t + 1 < TIME_LIMIT_STEPS:
            reward = FAILURE_REWARD

        agent.update_q(reward, action, old_state, new_state)
        old_state = new_state
        if done:
            episode_score_ls.append(t + 1)
            print("Episode finished after {} timesteps".format(t + 1))
            break

Episode finished after 13 timesteps
Episode finished after 28 timesteps
Episode finished after 16 timesteps
Episode finished after 15 timesteps
Episode finished after 15 timesteps
Episode finished after 31 timesteps
Episode finished after 24 timesteps
Episode finished after 13 timesteps
Episode finished after 21 timesteps
Episode finished after 25 timesteps
Episode finished after 18 timesteps
Episode finished after 18 timesteps
Episode finished after 10 timesteps
Episode finished after 22 timesteps
Episode finished after 13 timesteps
Episode finished after 19 timesteps
Episode finished after 12 timesteps
Episode finished after 13 timesteps
Episode finished after 20 timesteps
Episode finished after 12 timesteps
Episode finished after 37 timesteps
Episode finished after 20 timesteps
Episode finished after 19 timesteps
Episode finished after 27 timesteps
Episode finished after 24 timesteps
Episode finished after 12 timesteps
Episode finished after 23 timesteps
Episode finished after 19 ti

In [18]:
# Shut the environment down now that the training loop has finished.
env.close()

In [19]:
# Number of distinct rounded states currently stored in the agent's Q-table.
len(agent._q_dict)

16315

In [20]:
# One row per finished episode; "raw" holds the episode length in timesteps.
episode_df = pd.DataFrame({"raw": episode_score_ls})

In [21]:
# Centred 10-episode moving average of the raw episode lengths; rows without
# a complete window (at both ends of the frame) are left as NaN.
raw_rolling = episode_df["raw"].rolling(window=10, center=True)
episode_df["smooth"] = raw_rolling.mean()

In [22]:
# Preview the first rows; "smooth" is empty here because the centred window
# is incomplete at the start of the frame.
episode_df.head()

Unnamed: 0,raw,smooth
0,13,
1,28,
2,16,
3,15,
4,15,


In [23]:
# Plot raw and smoothed episode lengths and write the figure to disk.
afig, ax = plt.subplots(figsize=(12, 6))
for column in ("raw", "smooth"):
    ax.plot(episode_df[column])
ax.set_xlabel("Episode")
ax.set_ylabel("Survived iteration")
ax.legend(["raw", "smooth"])
ax.grid(linestyle=":")
afig.savefig("episode.png")