## TLDR:
- A Markov decision process is defined by:
  - The states (and whether or not it's an end state)
  - The actions transitioning between those states
  - The probability of the transitions given an action and the states
  - The reward of the transitions given the action and the states
- A policy navigates a Markov decision process
- A policy can be trained using value iteration:
  - Monte Carlo method:
    - Model-based
    - Model-free
  - Temporal Difference learning:
    - SARSA
    - Q-Learning
    - TD(0)

## Markov Decision Process

In [30]:
import random
import numpy as np
from tqdm import tqdm

class FrozenLakeEnvironment:
  dirs = [(-1, 0), (0, -1), (1, 0), (0, 1)]

  def __init__(self, n, is_slippery=False):
    self.n = n
    self.is_slippery = is_slippery
    self.s = 0

    self.__init_map()

  def __str__(self):
    rv = ""
    for i in range(self.n):
      for j in range(self.n):
        s = self.__encode(i, j)
        rv += ('S' if s == self.s else self.map[s]) + ' '
      rv = rv[:-1] + '\n'
    return rv[:-1]

  def __init_map(self):
    self.map = ['F'] * (self.n * self.n)
    self.map[0] = 'S'
    self.map[-1] = 'G'

    holes = self.n - 1
    for _ in range(holes):
      i = 0
      while self.map[i] != 'F':
        i = random.randrange(0, self.n * self.n)
      self.map[i] = 'H'

    self.map[0] = 'F'

  def __encode(self, i, j):
    return i * self.n + j
  
  def __decode(self, s):
    return s // self.n, s % self.n

  def can_move(self, s, a):
    i, j = self.__decode(s)
    di, dj = FrozenLakeEnvironment.dirs[a]

    return i + di >= 0 and i + di < self.n and j + dj >= 0 and j + dj < self.n

  def get_successor(self, s, a):
    if not self.can_move(s, a):
      return s
    
    i, j = self.__decode(s)
    di, dj = FrozenLakeEnvironment.dirs[a]

    return self.__encode(i + di, j + dj)

  def get_observation_space(self):
    return list(range(self.n * self.n))

  def get_action_space(self, s=None):
    if s == None:
      return list(range(len(FrozenLakeEnvironment.dirs)))
    
    rv = []
    for i in range(len(FrozenLakeEnvironment.dirs)):
      if self.can_move(s, i):
        rv.append(i)
    return rv

  def get_transition_probability(self, s_0, a, s_f):
    space = self.get_action_space(s_0)
    probs = [0] * len(space)
    if self.is_slippery:
      for i in range(len(space)):
        probs[i] = (1 / 5) * (1 / len(space))
      probs[space.index(a)] += 4 / 5
    else:
      probs[space.index(a)] = 1

    rv = 0
    for a_t, prob in zip(space, probs):
      s_t = self.__get_successor(s_0, a_t)
      if s_t == s_f:
        rv += prob

    return rv

  def get_reward(self, s_0, a, s_f):
    if self.map[s_f] == 'H':
      return -1
    if self.map[s_f] == 'F':
      return 0
    if self.map[s_f] == 'G':
      return 1
    
  def is_end(self, s):
    return self.map[s] == 'H' or self.map[s] == 'G'
  
  def observe(self):
    return self.s
  
  def act(self, a):
    if self.is_slippery:
      if random.randint(1, 5) == 1:
        a = random.choice(self.get_action_space())
    
    self.s = self.get_successor(self.s, a)
  
  def reset(self):
    self.s = 0

env = FrozenLakeEnvironment(5, True)
print("*** Environment ***")
print(env)

*** Environment ***
S F F F F
F F F F F
F H F F F
F F H H F
H F F F G


The definition of a Markov decision process:
1. A starting state `s_start`
2. A space of all possible actions `actions(s)` from state `s`
3. Transition probabilities `T(s, a, s')` representing the probability of reaching `s'` from `s` after just one action `a`
4. Rewards `Reward(s, a, s')` representing the reward of transitioning between `s` to `s'` via action `a`
5. `IsEnd(s)` to determine if the state `s` is an end state
6. A discount factor `gamma` to discount rewards from states further away

Markov decision process vocabulary:
- Policy: a function that chooses an action for any given state
  - `pi(s) = a`
- Utility of a path: the discounted sum of rewards on a path
  - `u(s0, s1, ..., sk) = r1 * gamma ** 0 + r2 * gamma ** 1 + ... + rk * gamma ** (k - 1)`
- Q-value: the expected utility of all paths starting from `s` after taking action `a` following policy `pi`
  - `Qpi(s, a) = sum(T(s, a, s') * (Reward(s, a, s') + gamma * Qpi(s, pi(s)))) for all s'`
- Value: the expected utility of all paths starting from `s` following policy `pi`
  - `V(s) = sum(T(s, a, s') * (Reward(s, a, s') + gamma * V(s))) for all s'`

Markov decision process training vocabulary:
- Optimal Q-value: the maximum Q-value attained by the policy
- Optimal value: the maximum value attained by the policy
- Optimal policy: the policy that achieves the optimal values
- Initialization: all values are initialized to a null value
- Iteration: values are updated per iteration:
  - Updates are defined by the training method
  - The goal is to reach the optimal Q-value/value

## Model-free Monte Carlo

In [31]:
class ModelFreeMonteCarlo:
  def __init__(self, env, gamma=0.95):
    self.env = env
    self.gamma = gamma

    action_len = len(env.get_action_space())
    observation_len = len(env.get_observation_space())

    self.q_table = [[-1 for i in range(action_len)] for j in range(observation_len)]
    self.updates = [[0 for i in range(action_len)] for j in range(observation_len)]

  def __str__(self):
    rv = ""
    for i in range(self.env.n):
      for j in range(self.env.n):
        rv += ['U', 'L', 'D', 'R'][self.greedy_policy(i * self.env.n + j)] + ' '
      rv = rv[:-1] + '\n'
    return rv[:-1]

  def random_policy(self, s):
    return random.choice(self.env.get_action_space(s))

  def greedy_policy(self, s):
    return np.argmax(self.q_table[s])

  def epsilon_greedy_policy(self, s, epsilon):
    if random.random() > epsilon:
      return self.greedy_policy(s)
    else:
      return self.random_policy(s)

  def training_loop(self, training_episodes=10000, epsilon_low=0.05, epsilon_high=1.0, epsilon_rate=0.0005):
    def epsilon(episode):
      return epsilon_low + (epsilon_high - epsilon_low) * np.exp(-episode * epsilon_rate)
    
    for episode in tqdm(range(training_episodes)):
      sar_arr = []
      self.env.reset()
      while not self.env.is_end(self.env.observe()):
        s_0 = self.env.observe()
        a = self.epsilon_greedy_policy(s_0, epsilon(episode))
        if not self.env.can_move(s_0, a):
          a = self.random_policy(s_0)

        self.env.act(a)
        s_f = self.env.observe()
        sar_arr.append((s_0, a, self.env.get_reward(s_0, a, s_f)))
      
      sau_arr = []
      u = 0
      for s, a, r in sar_arr[::-1]:
        u = u * self.gamma + r
        sau_arr.append((s, a, u))

      visited = set()
      for s, a, u in sau_arr[::-1]:
        if not (s, a) in visited:
          visited.add((s, a))

          eta = 1 / (1 + self.updates[s][a])
          self.q_table[s][a] = (1 - eta) * self.q_table[s][a] + eta * u
          self.updates[s][a] += 1
  
  def evaluation_loop(self, evaluation_episodes=100, gamma=0.95):
    u_sum = 0
    for episode in tqdm(range(evaluation_episodes)):
      r_arr = []
      self.env.reset()
      while not self.env.is_end(self.env.observe()):
        s_0 = self.env.observe()
        a = self.greedy_policy(s_0)
        if not self.env.can_move(s_0, a):
          a = self.random_policy(s_0)

        self.env.act(a)
        s_f = self.env.observe()
        r_arr.append(self.env.get_reward(s_0, a, s_f))
      
      u = 0
      for r in r_arr[::-1]:
        u = u * gamma + r
      u_sum += u
    
    return u_sum / evaluation_episodes

learner = ModelFreeMonteCarlo(env)
print("*** Model-free Monte Carlo ***")
print()

learner.training_loop()
print("*** Policy ***")
print(learner)
print()

evaluation = learner.evaluation_loop()
print("*** Evaluation ***")
print(evaluation)

*** Model-free Monte Carlo ***



100%|██████████| 10000/10000 [00:01<00:00, 7614.30it/s]


*** Policy ***
R R R R D
U U R R D
U U U R D
U D U U D
U R R R U



100%|██████████| 100/100 [00:00<00:00, 11109.27it/s]

*** Evaluation ***
0.5387692817714264





Model-free Monte Carlo characteristics:
- Estimator of Q-values
  - Compared to *model-based* Monte Carlo, which estimates transition probability and reward, which are then used to estimate the Q-values
- Finds the "average" Q-value (utility `u`) for any given `s` and `a`
  - `Q(s, a) = (1 - eta) * Q(s, a) + eta * u`
  - where `eta = 1 / (1 + # of samples for (s, a))`

About the epsilon-greedy policy:
- The epsilon-greedy policy balances exploration and exploitation
  - Exploration searches for new behavior
  - Exploitation follows the optimal behavior
- Can be used in most, if not all, training methods (used in all training methods covered in this notebook)

## SARSA

In [32]:
class SARSA:
  def __init__(self, env, gamma=0.95):
    self.env = env
    self.gamma = gamma

    action_len = len(env.get_action_space())
    observation_len = len(env.get_observation_space())

    self.q_table = [[-1 for i in range(action_len)] for j in range(observation_len)]

  def __str__(self):
    rv = ""
    for i in range(self.env.n):
      for j in range(self.env.n):
        rv += ['U', 'L', 'D', 'R'][self.greedy_policy(i * self.env.n + j)] + ' '
      rv = rv[:-1] + '\n'
    return rv[:-1]

  def random_policy(self, s):
    return random.choice(self.env.get_action_space(s))

  def greedy_policy(self, s):
    return np.argmax(self.q_table[s])

  def epsilon_greedy_policy(self, s, epsilon):
    if random.random() > epsilon:
      return self.greedy_policy(s)
    else:
      return self.random_policy(s)

  def training_loop(self, training_episodes=10000, eta=0.7, epsilon_low=0.05, epsilon_high=1.0, epsilon_rate=0.0005):
    def epsilon(episode):
      return epsilon_low + (epsilon_high - epsilon_low) * np.exp(-episode * epsilon_rate)

    for episode in tqdm(range(training_episodes)):
      self.env.reset()
      while not self.env.is_end(self.env.observe()):
        s_0 = self.env.observe()
        a_0 = self.epsilon_greedy_policy(s_0, epsilon(episode))
        if not self.env.can_move(s_0, a_0):
          a_0 = self.random_policy(s_0)

        self.env.act(a_0)
        s_f = self.env.observe()
        a_f = self.greedy_policy(s_f)
        r = self.env.get_reward(s_0, a_0, s_f)

        self.q_table[s_0][a_0] = (1 - eta) * self.q_table[s_0][a_0] + eta * (r + self.gamma * self.q_table[s_f][a_f])
  
  def evaluation_loop(self, evaluation_episodes=100, gamma=0.95):
    u_sum = 0
    for episode in tqdm(range(evaluation_episodes)):
      r_arr = []
      self.env.reset()
      while not self.env.is_end(self.env.observe()):
        s_0 = self.env.observe()
        a = self.greedy_policy(s_0)
        if not self.env.can_move(s_0, a):
          a = self.random_policy(s_0)

        self.env.act(a)
        s_f = self.env.observe()
        r_arr.append(self.env.get_reward(s_0, a, s_f))
      
      u = 0
      for r in r_arr[::-1]:
        u = u * gamma + r
      u_sum += u
    
    return u_sum / evaluation_episodes

learner = SARSA(env)
print("*** SARSA ***")
print()

learner.training_loop()
print("*** Policy ***")
print(learner)
print()

evaluation = learner.evaluation_loop()
print("*** Evaluation ***")
print(evaluation)

*** SARSA ***



100%|██████████| 10000/10000 [00:10<00:00, 924.96it/s]


*** Policy ***
R R D L L
R R L U U
U U U R U
U L U U U
U D L L U



100%|██████████| 100/100 [00:00<00:00, 2380.54it/s]

*** Evaluation ***
-0.2577885476058239





SARSA characteristics:
- Estimator of Q-values
- Explicitly chooses an `a'` using a policy (hence, "on-policy")
  - `Q(s, a) = (1 - eta) * Q(s, a) + eta * (r + gamma * Q(s', a'))`
  - Similar to branching only to `a'`, as chosen by the policy: `s`, `a` -> `s'` -> `a'`
- With respect to the "cliff-walking" metaphor, SARSA takes the longer, safer path avoiding the cliff, as its Q-values are dependent on the actual actions chosen by the policy

## Q-Learning

In [33]:
class QLearning:
  def __init__(self, env, gamma=0.95):
    self.env = env
    self.gamma = gamma

    action_len = len(env.get_action_space())
    observation_len = len(env.get_observation_space())

    self.q_table = [[-1 for i in range(action_len)] for j in range(observation_len)]
  
  def __str__(self):
    rv = ""
    for i in range(self.env.n):
      for j in range(self.env.n):
        rv += ['U', 'L', 'D', 'R'][self.greedy_policy(i * self.env.n + j)] + ' '
      rv = rv[:-1] + '\n'
    return rv[:-1]

  def random_policy(self, s):
    return random.choice(self.env.get_action_space(s))

  def greedy_policy(self, s):
    return np.argmax(self.q_table[s])

  def epsilon_greedy_policy(self, s, epsilon):
    if random.random() > epsilon:
      return self.greedy_policy(s)
    else:
      return self.random_policy(s)

  def training_loop(self, training_episodes=10000, eta=0.7, epsilon_low=0.05, epsilon_high=1.0, epsilon_rate=0.0005):
    def epsilon(episode):
      return epsilon_low + (epsilon_high - epsilon_low) * np.exp(-episode * epsilon_rate)
    
    for episode in tqdm(range(training_episodes)):
      self.env.reset()
      while not self.env.is_end(self.env.observe()):
        s_0 = self.env.observe()
        a_0 = self.epsilon_greedy_policy(s_0, epsilon(episode))
        if not self.env.can_move(s_0, a_0):
          a_0 = self.random_policy(s_0)

        self.env.act(a_0)
        s_f = self.env.observe()
        r = self.env.get_reward(s_0, a_0, s_f)

        self.q_table[s_0][a_0] = (1 - eta) * self.q_table[s_0][a_0] + eta * (r + self.gamma * np.argmax(self.q_table[s_f]))
  
  def evaluation_loop(self, evaluation_episodes=100, gamma=0.95):
    u_sum = 0
    for episode in tqdm(range(evaluation_episodes)):
      r_arr = []
      self.env.reset()
      while not self.env.is_end(self.env.observe()):
        s_0 = self.env.observe()
        a = self.greedy_policy(s_0)
        if not self.env.can_move(s_0, a):
          a = self.random_policy(s_0)

        self.env.act(a)
        s_f = self.env.observe()
        r_arr.append(self.env.get_reward(s_0, a, s_f))
      
      u = 0
      for r in r_arr[::-1]:
        u = u * gamma + r
      u_sum += u
    
    return u_sum / evaluation_episodes

learner = QLearning(env)
print("*** Q-learning ***")
print()

learner.training_loop()
print("*** Policy ***")
print(learner)
print()

evaluation = learner.evaluation_loop()
print("*** Evaluation ***")
print(evaluation)

*** Q-learning ***



100%|██████████| 10000/10000 [00:21<00:00, 458.50it/s]


*** Policy ***
D L L L L
D L U U U
D U D U L
R L U U U
U R R R U



100%|██████████| 100/100 [00:00<00:00, 8326.00it/s]

*** Evaluation ***
-0.3837843224088213





Q-learning characteristics:
- Estimator of Q-values
- Does not choose an `a'` using any policy (hence, "off-policy")
  - Simply takes the maximum of all successors' Q-values
  - `Q(s, a) = (1 - eta) * Q(s, a) + eta * (r + gamma * max(Q(s', {a'1, a'2, ... a'n})))`
  - Similar to branching to all `a'` and choosing the best one: `s`, `a` -> `s'` -> `a'1`, `a'2`, `...`, `a'n`
- With respect to the "cliff-walking" metaphor, Q-learning takes the most opportunistic path, since its Q-values are based on the best case scenario from each state and thus, has a higher risk of walking off the cliff