In [1]:
import gym 
import numpy as np
from gym.envs.toy_text.frozen_lake import FrozenLakeEnv
from typing import List

In [2]:
print(FrozenLakeEnv.__doc__)


    Winter is here. You and your friends were tossing around a frisbee at the park
    when you made a wild throw that left the frisbee out in the middle of the lake.
    The water is mostly frozen, but there are a few holes where the ice has melted.
    If you step into one of those holes, you'll fall into the freezing water.
    At this time, there's an international frisbee shortage, so it's absolutely imperative that
    you navigate across the lake and retrieve the disc.
    However, the ice is slippery, so you won't always move in the direction you intend.
    The surface is described using a grid like the following

        SFFF
        FHFH
        FFFH
        HFFG

    S : starting point, safe
    F : frozen surface, safe
    H : hole, fall to your doom
    G : goal, where the frisbee is located

    The episode ends when you reach the goal or fall in a hole.
    You receive a reward of 1 if you reach the goal, and zero otherwise.

    


In [3]:
env = gym.make('FrozenLake-v0')

### Helper Utilities 

In [4]:
# Typing
MDP = dict
State = int
Action = int 
Value = float
Policy = np.array
Optimal_Values = np.array

def show_state_space(markov_decision_process: MDP):
    return np.array(list(markov_decision_process.keys())).reshape(4, 4)

### Markov Decision Process (MDP)

In [5]:
env.env.P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

In [6]:
# Our State Space 
print(show_state_space(env.env.P))

[[ 0  1  2  3]
 [ 4  5  6  7]
 [ 8  9 10 11]
 [12 13 14 15]]


In [7]:
# Our Action Space: "Left","Down","Right","Up"
env.env.P[3].keys()

dict_keys([0, 1, 2, 3])

In [8]:
state = 14
action = 3
transition_probability, next_state, reward, done = env.env.P[14][3][0]
print(f"Transition Probability to Goal State: {transition_probability} \nGoal State: {next_state}\nReward Function: {reward}\nEpisode Over: {done}")

Transition Probability to Goal State: 0.3333333333333333 
Goal State: 15
Reward Function: 1.0
Episode Over: True


### Value Iteration 

### Value Iteration 

*Policy* $\pi(a | s) $: Specifies which action to take in a given state $s$. 

*Value Function:* Input state $s$ and outputs the value of being in that state $s$ under a particular policy $\pi$: $V^{\pi}: S \rightarrow \mathbb{R}$

<p>

$$V(s) \leftarrow \mathbb{E}_{s' \sim \mathcal{P}_{s}^{\pi(s)}} \bigl[ \mathcal{R}_s^{\pi(s)} + \gamma V(s') \bigr]$$
</p>

Note the recursive update.


Hence, we can derive an iterative algorithm for computing the value function: 

**Step 1**: Initialize $t=0$, $V^*_0(s)$. 

**Step 2**: 
- Repeat Until Convergence. 
    - For $s \in S$:
        - **Update the Value Function**: $$V_{*}(s) \leftarrow \mathbb{E}_{s' \sim \mathcal{P}_{s}^{\pi(s)}} \bigl[ \mathcal{R}_s^{\pi(s)} + \gamma V_{*}(s') \bigr]$$
        - **Update the Policy**: $$\pi_{*}(s) \leftarrow \arg \max_a \mathbb{E}_{s' \sim \mathcal{P}_{s}^{a}} \bigl[ \mathcal{R}_s^{a} + \gamma V(s') \bigr]$$



In [9]:
def calculate_value_of_state_action(
    markov_decision_process: MDP,
    state: State,
    action: Action,
    state_values: np.array,
    discount: float
) -> Value:
    """
    Calculates expected value associated with being in a state and taking action
    """

    pass 

In [10]:
assert calculate_value_of_state_action(env.env.P, 14, 3, np.zeros(16), 0.99) == 1/3

In [15]:
def calculate_value_of_state(
    markov_decision_process: MDP,
    state: State,
    state_values: np.array,
    discount: float,
) -> List[Value]:
    
    """
    Calculates a list of Q[s, a] values for given state
    """
    
    pass 

In [16]:
def value_iteration(
    markov_decision_process: MDP,
    discount: float = .99,
    threshold: float = 0.0000001,
) -> (Optimal_Values, Policy):
    """
    Calculates the Value Function for the Markov Decision Process
    """
    
    pass 

In [17]:
correct_values = np.array(
    [0.542, 0.499, 0.471, 0.457, 0.558, 0., 0.358, 0., 0.592, 0.643, 0.615, 0., 0., 0.742, 0.863, 0.]
)

optimal_values, optimal_policy = value_iteration(env.env.P)
assert np.sum(np.abs(optimal_values - correct_values)) <= 0.01

### Policy Iteration 
- **Step 1**: Choose an arbitrary policy $\pi(a|s)$ 
- **Step 2**: Repeat until Convergence: 
    - **Policy Evaluation**: Compute the value function for $\pi$
        - **Approach 1**: Iterate the value function until convergence until $\pi$
        - **Approach 2**: Solve a linear program to compute:
        $$V_\pi(s) \leftarrow \mathbb{E}_{s' \sim \mathcal{P}_{s}^{\pi(s)}} \bigl[ \mathcal{R}_s^{\pi(s)} + \gamma V_\pi(s') \bigr]$$
    - **Policy Improvement**: 
        $$\pi_{new}(s) \leftarrow \arg \max_a \mathbb{E}_{s' \sim \mathcal{P}_{s}^{a}} \bigl[ \mathcal{R}_s^{a} + \gamma V(s') \bigr]$$

In [18]:
def policy_evaluation(
    markov_decision_process: MDP, 
    policy: Policy, 
    discount: float,
    threshold: float = 0.0000001
) -> List[Value]:
    
    """Calculates the Value Function for a Specific Policy 
    """
    
    pass 

In [19]:
assert np.sum(np.abs(policy_evaluation(env.env.P, optimal_policy, 0.99) - optimal_values)) <= 0.001

**Exercise: Implement Policy Evaluation by Solving a Linear Program.**

In [25]:
def policy_improvement(
        markov_decision_process: MDP,
        policy: Policy,
        values: List[Value],
        discount: float,
) -> Policy:
    
    """Calculate the Policy Improvement Step 
    """
    
    pass 

In [5]:
def policy_iteration(markov_decision_process: MDP,
                     discount: float = 0.99,
                     threshold: float = 0.0001) -> Policy:
    
    """Implements Policy Iteration 
    """
    
    pass 

In [26]:
assert all(policy_iteration(env.env.P) == optimal_policy)