In [1]:
import numpy as np
import gym

In [2]:
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", render_mode="human", is_slippery=False)
env.action_space.seed(42)

[42]

# Monte Carlo

Monte Carlo is a model-free (or model-less) reinforcement learning technique. Model-free means that it does not estimate the environment's transition probabilities; it relies only on the __rewards__ collected from sampled episodes.

Let's do a quick recap of Bellman's equations:

1) Discounted return over an episode: $G_t = r_t + \gamma r_{t+1} + \gamma^{2} r_{t+2} + \dots + \gamma^{N-1} r_{t+N-1}$
2) State value: $V_*(s) = \max_a Q_*(s,a) = E_{\pi_*}[G_t | s] = E_{\pi_*}[r + \gamma V_*(s') | s]$
3) State-action value: $Q_*(s,a) = E_{\pi_*}[G_t | s, a] = E_{\pi_*}[r + \gamma V_*(s') | s, a]$

As one can see, both the state value and the state-action value can be written in terms of discounted returns. Since these expectations can be estimated by averaging the returns of sampled episodes, without estimating transition probabilities, this method is considered model-free.
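As a small illustration of equation 1, the return at every time step can be computed in a single backward pass over an episode's rewards. This is a minimal sketch; the helper name `discounted_returns` and the reward values are made up for the example and are not part of the notebook.

```python
def discounted_returns(rewards, gamma):
    """Return [G_0, G_1, ..., G_{N-1}] for one episode's list of rewards."""
    returns = [0.0] * len(rewards)
    g = 0.0
    # Walk the episode backwards so that G_t = r_t + gamma * G_{t+1}.
    for t in reversed(range(len(rewards))):
        g = rewards[t] + gamma * g
        returns[t] = g
    return returns


# Hypothetical three-step episode.
example_returns = discounted_returns([1.0, 0.0, 2.0], gamma=0.5)
print(example_returns)  # [1.5, 1.0, 2.0]
```

Working backwards avoids recomputing the powers of $\gamma$ for every time step.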

* $\epsilon$-greedy policy

An $\epsilon$-greedy policy balances exploration and exploitation. Exploration means trying an action that is not the best one currently known; exploitation means following the best known action.

This is useful when we cannot visit every single state (it might be impossible!) but still want the agent to try new states occasionally, to see whether it can find a better path than the best one known so far.

An $\epsilon$-greedy policy is defined as follows:

$$\pi(a|s) = \begin{cases}
    1 - \epsilon + \epsilon/4, \ a = \pi_*(s) \\
    \epsilon/4, \ a \neq \pi_*(s)
\end{cases}$$

The first case of the piecewise function says that the optimal action $a$ under policy $\pi$ has probability $1 - \epsilon + \epsilon/4$, so the optimal action is more likely to be taken than any non-optimal one (the 4 is the number of actions in FrozenLake). In practice, the agent draws a random value in $[0,1]$ and, for an $\epsilon$ in $(0, 1]$, explores with a uniformly random action when the value falls below $\epsilon$ and exploits otherwise.
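The probabilities in the piecewise definition can be written out explicitly for one state. The sketch below assumes a row of Q-values for a single state; `epsilon_greedy_probs` and the example numbers are illustrative names and values, not taken from the notebook.

```python
import numpy as np


def epsilon_greedy_probs(q_row, epsilon):
    """Action probabilities of an epsilon-greedy policy for one state."""
    n_actions = len(q_row)
    # Every action gets the exploration share epsilon / n_actions ...
    probs = np.full(n_actions, epsilon / n_actions)
    # ... and the greedy action additionally gets 1 - epsilon.
    probs[np.argmax(q_row)] += 1.0 - epsilon
    return probs


# Hypothetical Q-values for one state with four actions.
probs = epsilon_greedy_probs(np.array([0.0, 2.0, 1.0, -1.0]), epsilon=0.2)
print(probs)

# Sampling an action from these probabilities.
rng = np.random.default_rng(0)
action = rng.choice(len(probs), p=probs)
```

Sampling from these probabilities is equivalent to the two-branch "draw a random number, then explore or exploit" formulation, because the uniform exploration branch can also pick the greedy action.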

* Learning rate

The $\epsilon$-greedy policy raises a new problem: episodes that pass through non-optimal actions also take part in the Q-value update. To avoid overwriting the estimate with 100% of each new experience, a learning rate $\alpha$ is introduced; it amplifies or dampens how much each episode's return changes the estimate. This means:

$$Q(s,a) \leftarrow Q(s,a) + \alpha\,\big(G_t - Q(s,a)\big)$$

where $\alpha$ weights each experience, that is, each return $G_t$ observed after taking action $a$ in state $s$ (which may happen many times over different episodes).
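To see how $\alpha$ weights each experience, here is a small numeric sketch of the usual constant learning rate update, $Q \leftarrow Q + \alpha (G - Q)$. The function name and the returns fed to it are hypothetical and chosen only to make the arithmetic easy to follow.

```python
def constant_alpha_update(q, g, alpha):
    """Move the estimate q a fraction alpha of the way towards the return g."""
    return q + alpha * (g - q)


q = 0.0
# Pretend the same (state, action) pair produced a return of 10 three times.
for g in [10.0, 10.0, 10.0]:
    q = constant_alpha_update(q, g, alpha=0.5)
    print(q)  # prints 5.0, then 7.5, then 8.75
```

With $\alpha = 0.5$ each new return closes half of the remaining gap, so a single unusual episode cannot overwrite what was learned before.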

* Importance sampling

A fixed learning rate is a reasonable way to weight new experiences, but if $\alpha$ is too large the estimate keeps jumping around the expected value instead of settling on it. Another approach is a "dynamic" $\alpha$ that takes the variability around the true value into account.

Taking "variability around the true value into account" means that in early episodes the estimates of Q may be far from the true value (which is fine), but in later episodes they should be closer, so the effective $\alpha$ should shrink accordingly to help convergence to $E[Q(s,a)]$.

This is done with importance sampling, using a weight $W = \prod_{i=0}^{N}{\frac{\pi_*(a_i|s_i)}{b(a_i|s_i)}}$, where $b$ is the behaviour policy that generated the episode and $N$ is the number of steps on which the optimal policy has been followed, together with a normalizing factor $C(s,a) = \sum_{k=1}^{K}{W_k}$, where $K$ is the number of times this action has been performed in this state and $W_k$ is the weight at the $k$-th such visit. The ratio $W / C(s,a)$ then plays the role of the learning rate.
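The weight $W$ can be sketched for the case used in this notebook, assuming the target policy is greedy with respect to the Q-table and the behaviour policy $b$ is $\epsilon$-greedy on the same table. The helper `importance_weight` and the tiny two-action Q-table are hypothetical.

```python
import numpy as np


def importance_weight(state_actions, q_table, epsilon):
    """Product of pi(a|s) / b(a|s) over the (state, action) pairs given.

    pi is greedy with respect to q_table; b is epsilon-greedy on the same table.
    """
    n_actions = q_table.shape[1]
    w = 1.0
    for state, action in state_actions:
        if action != np.argmax(q_table[state]):
            # pi(a|s) = 0 for a non-greedy action, so the product vanishes.
            return 0.0
        # pi(a|s) = 1 and b(a|s) = 1 - epsilon + epsilon / n_actions.
        w *= 1.0 / (1.0 - epsilon + epsilon / n_actions)
    return w


# Made-up table: action 1 is greedy in state 0, action 0 is greedy in state 1.
q_table = np.array([[0.0, 1.0], [2.0, 0.0]])
w_greedy = importance_weight([(0, 1), (1, 0)], q_table, epsilon=0.5)
w_off = importance_weight([(0, 0), (1, 0)], q_table, epsilon=0.5)
print(w_greedy, w_off)
```

A trajectory that the greedy policy would never produce gets weight zero, which is why the off-policy update can stop as soon as a non-greedy action is found.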

## On policy methods

"On policy" means this method uses the same policy $\pi$ both to generate the episodes (behaviour) and as the policy that is improved towards the optimal values.

This method's algorithm goes as follows:

1) Initialize $Q(s, a)$ for $\forall s \in S$ and $\forall a \in A$, $\epsilon > 0$, $\alpha > 0$ and the rewards distribution
2) Generate an episode following $\pi$
3) Update the Q-values from $t+N-1$ back to $t$ using $G_t = r_t + \gamma G_{t+1}$ and $Q(s,a) \leftarrow Q(s,a) + \alpha(G_t - Q(s,a))$ for the episode in the current iteration
4) Repeat from step 2 until the maximum number of episodes is reached
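Step 3 can be sketched in isolation on a hand-written episode. Note that the notebook's own implementation averages all stored returns instead of applying $\alpha$; the variant below uses the constant learning rate form, and `update_from_episode` and the toy transitions are assumptions made for illustration.

```python
import numpy as np


def update_from_episode(q_table, episode, gamma, alpha):
    """Every-visit Monte Carlo update with a constant learning rate.

    episode is a list of (state, action, reward) tuples in time order.
    """
    g = 0.0
    for state, action, reward in reversed(episode):
        g = reward + gamma * g  # G_t = r_t + gamma * G_{t+1}
        q_table[state, action] += alpha * (g - q_table[state, action])
    return q_table


q_table = np.zeros((16, 4))
toy_episode = [(0, 2, -1.0), (1, 1, 100.0)]  # made-up transitions
update_from_episode(q_table, toy_episode, gamma=0.5, alpha=0.5)
print(q_table[1, 1], q_table[0, 2])  # 50.0 24.5
```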

In [3]:
action_values = np.zeros((16,4))
action_values

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])

In [153]:
def policy(action_values, state, epsilon):
    if np.random.random() > epsilon:
        print(f"exploting for state {state}...")
        return np.argmax(action_values[state])
    else:
        print(f"exploring for state {state}...")
        return np.random.choice(range(4),p=[0.25]*4)
    
def on_policy_method(env, action_values, epsilon=0.99, epsilon_dis = 0.995, gamma=0.7, alpha=0.4, max_episodes=200, max_steps=50):
    # Note: alpha is accepted but unused; Q-values are the plain average of observed returns.
    n_states, n_actions = action_values.shape
    # One list of observed returns per (state, action) pair.
    action_values_acc = [[[] for _ in range(n_actions)] for _ in range(n_states)]
    for i in range(max_episodes):
        # FrozenLake always starts in state 0; reset() returns (observation, info).
        state, _ = env.reset()
        episode = []
        j = 0
        G = 0
        print(f"episode: {i}")
        while j < max_steps:
            action = policy(action_values, state, epsilon)
            next_state, reward, terminated, truncated, info = env.step(action)
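            # Reward shaping (replaces the environment's sparse 0/1 reward):
            if not terminated and next_state == state:
                # Bumped into a wall and stayed in place.
                reward = -10
            elif not terminated:
                # Ordinary step: small cost to favour short paths.
                reward = -1
            elif terminated and reward == 0:
                # Fell into a hole.
                reward = -20
            elif terminated:
                # Reached the goal.
                reward = 100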
            print((state, action, reward))
            episode.append((state, action, reward))
            state = next_state
            if terminated:
                break
            j += 1

        print("q_values", action_values)
        print("episode", episode[::-1])
        # Walk the episode backwards so that G accumulates the discounted return.
        for (state, action, reward) in episode[::-1]:
            G = reward + gamma * G
            action_values_acc[state][action].append(G)
            # Every-visit Monte Carlo: Q(s, a) is the mean of all returns seen so far.
            action_values[state][action] = np.mean(action_values_acc[state][action])

        epsilon *= epsilon_dis
        print("new epsilon", epsilon)

on_policy_method(env, action_values)

episode: 0
exploring for state 0...
(0, 1, -1)
exploring for state 4...
(4, 1, -1)
exploring for state 8...
(8, 0, -10)
exploring for state 8...
(8, 1, -20)
q_values [[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
episode [(8, 1, -20), (8, 0, -10), (4, 1, -1), (0, 1, -1)]
new epsilon 0.98505
episode: 1
exploring for state 0...
(0, 3, -10)
exploring for state 0...
(0, 1, -1)
exploring for state 4...
(4, 3, -1)
exploting for state 0...
(0, 0, -10)
exploring for state 0...
(0, 1, -1)
exploring for state 4...
(4, 3, -1)
exploring for state 0...
(0, 2, -1)
exploring for state 1...
(1, 2, -1)
exploring for state 2...
(2, 0, -1)
exploring for state 1...
(1, 1, -20)
q_values [[  0.   -13.46   0.     0.  ]
 [  0.     0.     0.     0.  ]
 [  0.     0.     0.     0.  ]
 [  0.     0.     0.     0.  ]
 [  0. 

In [162]:
def show_optimal_policy(action_values):
    # FrozenLake action order: 0 = Left, 1 = Down, 2 = Right, 3 = Up.
    return "LDRU"[np.argmax(action_values)]
    
[f"{i%4}:{show_optimal_policy(state_policy)}" for i, state_policy in enumerate(action_values)]

['0:D',
 '1:R',
 '2:D',
 '3:L',
 '0:D',
 '1:L',
 '2:D',
 '3:L',
 '0:R',
 '1:D',
 '2:D',
 '3:L',
 '0:L',
 '1:R',
 '2:R',
 '3:L']
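The flat list above is easier to read when laid out like the 4x4 map. The following is a minimal sketch of that idea; `policy_grid`, `ARROWS` and the demo Q-table are hypothetical and only illustrate the reshaping, they do not reproduce the learned values.

```python
import numpy as np

# FrozenLake action order: 0 = Left, 1 = Down, 2 = Right, 3 = Up.
ARROWS = np.array(["L", "D", "R", "U"])


def policy_grid(q_table, n_cols=4):
    """Greedy action per state, arranged row by row like the map."""
    greedy = np.argmax(q_table, axis=1)
    return ARROWS[greedy].reshape(-1, n_cols)


# Made-up Q-table: "down" is best in state 0, "right" in state 15.
demo_q = np.zeros((16, 4))
demo_q[0, 1] = 1.0
demo_q[15, 2] = 1.0
grid = policy_grid(demo_q)
print(grid)
```

States whose Q-values are all equal (for example states that were never visited) show up as "L", because `np.argmax` returns the first index on ties.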

## Off policy methods

"Off policy" means this method uses a target policy $\pi$ for the optimal (greedy) actions and a separate behaviour policy $b$ for generating the episodes.

This method's algorithm goes as follows:

1) Initialize $Q(s, a)$ and $C(s,a) = 0$ for $\forall s \in S$ and $\forall a \in A$, $\epsilon > 0$ and the rewards distribution
2) Generate an episode following $b$ and set $W = 1$
3) Update the Q-values from $t+N-1$ back to $t$: compute $G_t = r_t + \gamma G_{t+1}$, add $W$ to $C(s,a)$, and set $Q(s,a) \leftarrow Q(s,a) + \frac{W}{C(s,a)}(G_t - Q(s,a))$
4) If the action taken in the current state does not follow $\pi$, stop: do not update the Q-value at $t-1$ or $W$, and go back to step 2
5) If the action taken followed $\pi$, update $W \leftarrow W\cdot\frac{1}{b(a|s)} = W\cdot\frac{1}{(1-\epsilon)+\epsilon/4}$ and continue with $t-1$
6) Repeat from step 2 until the maximum number of episodes is reached
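The backward pass in the steps above can be sketched on hand-written episodes, separately from the environment loop. This is an illustration under the same assumptions as the text (greedy target policy, $\epsilon$-greedy behaviour policy); `weighted_is_update` and the toy tables are hypothetical.

```python
import numpy as np


def weighted_is_update(q_table, c_table, episode, gamma, epsilon):
    """Weighted importance-sampling update for one episode, last step first."""
    n_actions = q_table.shape[1]
    g, w = 0.0, 1.0
    for state, action, reward in reversed(episode):
        g = reward + gamma * g
        c_table[state, action] += w
        step = w / c_table[state, action]
        q_table[state, action] += step * (g - q_table[state, action])
        if action != np.argmax(q_table[state]):
            break  # the greedy target policy would not have taken this action
        w *= 1.0 / (1.0 - epsilon + epsilon / n_actions)
    return q_table, c_table


# Episode whose actions all end up greedy: every step is updated.
q_a, c_a = np.zeros((3, 2)), np.zeros((3, 2))
weighted_is_update(q_a, c_a, [(0, 1, 0.0), (1, 1, 0.0), (2, 1, 10.0)],
                   gamma=1.0, epsilon=0.5)

# Episode whose last action turns out non-greedy: the pass stops right there.
q_b, c_b = np.zeros((2, 2)), np.zeros((2, 2))
weighted_is_update(q_b, c_b, [(0, 0, 0.0), (1, 1, -5.0)],
                   gamma=1.0, epsilon=0.5)
print(q_a[:, 1], q_b)
```

In the second episode the negative return makes action 1 non-greedy in state 1, so the earlier step in state 0 is left untouched.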

In [6]:
def policy(action_values, state, epsilon):
    if np.random.random() > epsilon:
        print(f"exploting for state {state}...")
        return np.argmax(action_values[state])
    else:
        print(f"exploring for state {state}...")
        return np.random.choice(range(4),p=[0.25]*4)
    
def off_policy_method(env, action_values, epsilon=0.99, epsilon_dis = 0.995, gamma=0.7, alpha=0.4, max_episodes=200, max_steps=50):
    # Note: alpha is accepted but unused; the step size is W / C(s, a).
    C = np.zeros((16,4))  # cumulative importance weights per (state, action)
    for i in range(max_episodes):
        # FrozenLake always starts in state 0; reset() returns (observation, info).
        state, _ = env.reset()
        episode = []
        j = 0
        G = 0
        W = 1
        print(f"episode: {i}")
        while j < max_steps:
            action = policy(action_values, state, epsilon) # policy b
            next_state, reward, terminated, truncated, info = env.step(action)
            # Reward shaping (replaces the environment's sparse 0/1 reward):
            if not terminated and next_state == state:
                reward = -10   # bumped into a wall
            elif not terminated:
                reward = -1    # ordinary step
            elif terminated and reward == 0:
                reward = -20   # fell into a hole
            elif terminated:
                reward = 100   # reached the goal
            print((state, action, reward))
            episode.append((state, action, reward))
            state = next_state
            if terminated:
                break
            j += 1

        print("q_values", action_values)
        print("episode", episode[::-1])
        for (state, action, reward) in episode[::-1]:
            G = reward + gamma * G
            C[state][action] += W
            # Weighted importance-sampling update with step size W / C(s, a).
            action_values[state][action] = action_values[state][action] + (W/C[state][action])*(G-action_values[state][action])
            # Stop once the behaviour action differs from the greedy (target) action.
            if action != np.argmax(action_values[state]):
                break
            # pi(a|s) = 1 for the greedy action, b(a|s) = 1 - epsilon + epsilon/4.
            W *= 1/(1-epsilon+epsilon/4)

        epsilon *= epsilon_dis
        print("new epsilon", epsilon)
    env.close()

off_policy_method(env, action_values)

episode: 0
exploring for state 0...
(0, 3, -10)
exploring for state 0...
(0, 0, -10)
exploring for state 0...
(0, 3, -10)
exploring for state 0...
(0, 2, -1)
exploring for state 1...
(1, 3, -10)
exploring for state 1...
(1, 0, -1)
exploring for state 0...
(0, 0, -10)
exploring for state 0...
(0, 3, -10)
exploring for state 0...
(0, 2, -1)
exploring for state 1...
(1, 2, -1)
exploring for state 2...
(2, 1, -1)
exploring for state 6...
(6, 3, -1)
exploring for state 2...
(2, 2, -1)
exploring for state 3...
(3, 1, -20)
q_values [[ -0.17627   0.       14.0339   -0.17627]
 [  8.82373 -20.       21.477     5.0339 ]
 [ 14.0339   32.11     14.0339   12.477  ]
 [ 21.477   -20.        0.        0.     ]
 [  0.        0.      -20.        0.     ]
 [  0.        0.        0.        0.     ]
 [-20.       47.3     -20.       21.477  ]
 [  0.        0.        0.        0.     ]
 [ 12.477   -20.       32.11      0.     ]
 [ 21.477    47.3       0.      -20.     ]
 [ 32.11     69.      -20.       32.11 

In [7]:
def show_optimal_policy(action_values):
    # FrozenLake action order: 0 = Left, 1 = Down, 2 = Right, 3 = Up.
    return "LDRU"[np.argmax(action_values)]
    
[f"{i%4}:{show_optimal_policy(state_policy)}" for i, state_policy in enumerate(action_values)]

['0:D',
 '1:R',
 '2:D',
 '3:L',
 '0:D',
 '1:L',
 '2:D',
 '3:L',
 '0:R',
 '1:D',
 '2:D',
 '3:L',
 '0:L',
 '1:R',
 '2:R',
 '3:L']
