In [None]:
#@title Dependencies
!pip install numpy
!pip install gym
!pip install matplotlib

In [None]:
#@title Setup Environments

import numpy as np
import gym
import sys
from six import StringIO, b
from gym.envs.toy_text import frozen_lake, discrete
from gym.envs.registration import register
import time

np.set_printoptions(precision=3, linewidth=np.inf)


def categorical_sample(prob_n, np_random):
    """
    Draw a single class index from a categorical distribution.

    prob_n is a sequence of class probabilities and np_random is a random
    source exposing ``rand()``. The result is the first index whose
    cumulative probability is strictly greater than the uniform draw.
    """
    cumulative = np.cumsum(np.asarray(prob_n))
    draw = np_random.rand()
    return np.argmax(cumulative > draw)


class DiscreteEnv(gym.Env):

    """
    Tabular environment driven entirely by an explicit transition table.

    Has the following members
    - nS: number of states
    - nA: number of actions
    - P: transitions (*)
    - isd: initial state distribution (**)

    (*) dictionary dict of dicts of lists, where
      P[s][a] == [(probability, nextstate, reward, done), ...]
    (**) list or array of length nS

    The current state is kept in ``self.s`` and the most recent action in
    ``self.lastaction`` (None until the first step; used for rendering).
    """
    def __init__(self, nS, nA, P, isd):
        # Imported locally: the notebook's import cell never binds `spaces`,
        # so referencing it as a bare global raised NameError here.
        from gym import spaces

        self.P = P
        self.isd = isd
        self.lastaction = None  # for rendering
        self.nS = nS
        self.nA = nA

        self.action_space = spaces.Discrete(self.nA)
        self.observation_space = spaces.Discrete(self.nS)

        # Seed first: _reset() samples the start state from self.np_random.
        self._seed()
        self._reset()

    def _seed(self, seed=None):
        """Create the random source used for sampling and return [seed]."""
        # Imported locally for the same reason as `spaces` in __init__.
        from gym.utils import seeding

        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _reset(self):
        """Sample a start state from isd, clear lastaction, return the state."""
        self.s = categorical_sample(self.isd, self.np_random)
        self.lastaction = None
        return self.s

    def _step(self, a):
        """
        Sample one outcome of action `a` from P[self.s][a] and move there.

        Returns (nextstate, reward, done, {"prob": probability of the
        sampled outcome}).
        """
        transitions = self.P[self.s][a]
        i = categorical_sample([t[0] for t in transitions], self.np_random)
        p, s, r, d = transitions[i]
        self.s = s
        self.lastaction = a
        return (s, r, d, {"prob": p})

# Action indices for the four movement directions
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3

# Named grid layouts, one string per row
# (S = start, F = frozen surface, H = hole, G = goal)
MAPS = {}
MAPS["4x4"] = "SFFF FHFH FFFH HFFG".split()
MAPS["8x8"] = (
    "SFFFFFFF "
    "FFFFFFFF "
    "FFFHFFFF "
    "FHFFFHFF "
    "FFFHFFFF "
    "FFHFFFHF "
    "FHFFHFHF "
    "FFFHFFFG"
).split()

class FrozenLakeEnv(DiscreteEnv):
    """
    Winter is here. You and your friends were tossing around a frisbee at the park
    when you made a wild throw that left the frisbee out in the middle of the lake.
    The water is mostly frozen, but there are a few holes where the ice has melted.
    If you step into one of those holes, you'll fall into the freezing water.
    At this time, there's an international frisbee shortage, so it's absolutely imperative that
    you navigate across the lake and retrieve the disc.
    However, the ice is slippery, so you won't always move in the direction you intend.
    The surface is described using a grid like the following

        SFFF
        FHFH
        FFFH
        HFFG

    S : starting point, safe
    F : frozen surface, safe
    H : hole, fall to your doom
    G : goal, where the frisbee is located

    The episode ends when you reach the goal or fall in a hole.
    You receive a reward of 1 if you reach the goal, and zero otherwise.

    When is_slippery is True the intended direction is taken with
    probability 0.8 and each of the two perpendicular directions with
    probability 0.1; otherwise every move is deterministic. Moves that
    would leave the grid keep the agent on the edge tile.
    """

    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, desc=None, map_name="4x4",is_slippery=True):
        """
        Build the transition table for a lake layout.

        desc: optional grid as a sequence of equal-length row strings; when
            None the layout is looked up in MAPS by map_name.
        map_name: key into MAPS, used only when desc is None.
        is_slippery: whether moves are stochastic (see class docstring).

        Raises ValueError when both desc and map_name are None.
        """
        if desc is None and map_name is None:
            raise ValueError('Must provide either desc or map_name')
        elif desc is None:
            desc = MAPS[map_name]
        self.desc = desc = np.asarray(desc,dtype='c')
        self.nrow, self.ncol = nrow, ncol = desc.shape

        nA = 4 # number of actions
        nS = nrow * ncol # number of states

        # Uniform distribution over every tile marked 'S'
        isd = np.array(desc == b'S').astype('float64').ravel()
        isd /= isd.sum()

        P = {s : {a : [] for a in range(nA)} for s in range(nS)}

        def to_s(row, col):
            # Row-major flattening of grid coordinates into a state index
            return row*ncol + col

        def inc(row, col, a):
            # Apply one move, clamping at the grid border
            if a==0: # left
                col = max(col-1,0)
            elif a==1: # down
                row = min(row+1,nrow-1)
            elif a==2: # right
                col = min(col+1,ncol-1)
            elif a==3: # up
                row = max(row-1,0)
            return (row, col)

        def outcome(row, col, direction, probability):
            # One (probability, nextstate, reward, done) tuple for moving
            # in `direction` from (row, col)
            newrow, newcol = inc(row, col, direction)
            newletter = desc[newrow, newcol]
            done = bytes(newletter) in b'GH'
            rew = float(newletter == b'G')
            return (probability, to_s(newrow, newcol), rew, done)

        for row in range(nrow):
            for col in range(ncol):
                s = to_s(row, col)
                is_terminal_tile = bytes(desc[row, col]) in b'GH'
                for a in range(4):
                    li = P[s][a]
                    if is_terminal_tile:
                        # Goal and holes are absorbing with zero reward
                        li.append((1.0, s, 0, True))
                    elif is_slippery:
                        # `direction` (not `b`) so the six.b import is not shadowed
                        for direction in [(a-1)%4, a, (a+1)%4]:
                            li.append(outcome(row, col, direction, 0.8 if direction==a else 0.1))
                    else:
                        li.append(outcome(row, col, a, 1.0))

        super(FrozenLakeEnv, self).__init__(nS, nA, P, isd)

    def _render(self, mode='human', close=False):
        """
        Write the grid with the agent's tile highlighted in red.

        Output goes to a StringIO when mode is 'ansi', otherwise to
        sys.stdout; the stream written to is returned. Does nothing and
        returns None when close is True.
        """
        if close:
            return

        # Imported locally: the notebook's import cell never binds `utils`,
        # so referencing it as a bare global raised NameError here.
        from gym import utils

        outfile = StringIO() if mode == 'ansi' else sys.stdout

        row, col = self.s // self.ncol, self.s % self.ncol
        desc = self.desc.tolist()
        desc = [[c.decode('utf-8') for c in line] for line in desc]
        desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
        if self.lastaction is not None:
            outfile.write("  ({})\n".format(["Left","Down","Right","Up"][self.lastaction]))
        else:
            outfile.write("\n")
        outfile.write("\n".join(''.join(line) for line in desc)+"\n")

        return outfile


# coding: utf-8
"""Defines some frozen lake maps."""

# Register every lake variant under its own gym id. All of them use the
# same entry point and differ only in the kwargs passed to it.
for _env_id, _env_kwargs in (
    ('Deterministic-4x4-FrozenLake-v0', {'map_name': '4x4', 'is_slippery': False}),
    ('Deterministic-8x8-FrozenLake-v0', {'map_name': '8x8', 'is_slippery': False}),
    ('Stochastic-4x4-FrozenLake-v0', {'map_name': '4x4', 'is_slippery': True}),
):
    register(
        id=_env_id,
        entry_point='gym.envs.toy_text.frozen_lake:FrozenLakeEnv',
        kwargs=_env_kwargs,
    )

def render_single(env, policy, max_steps=100):
    """
        Allows you to watch your agent navigate the lake
        Parameters
        ----------
        env: gym.core.Environment
        Environment to play on. Must provide reset(), step(action) and
        render().
        policy: np.array of shape [env.nS]
        The action to take at a given state
        max_steps: int
        Maximum number of actions to take before giving up

        Prints a one-line summary of the episode and returns nothing.
        Every frame is rendered (with a 0.25 s pause between steps) when
        the global print_episode_run is truthy.
    """

    episode_reward = 0
    steps_taken = 0
    # Bound up front so the summary below also works when max_steps <= 0
    # and the loop body never runs.
    done = False
    ob = env.reset()
    for steps_taken in range(1, max_steps + 1):
        if print_episode_run:
            env.render()
            time.sleep(0.25)
        a = policy[ob]
        ob, rew, done, _ = env.step(a)
        episode_reward += rew
        if done:
            if print_episode_run:
                # Show the terminal frame as well
                env.render()
            break
    if not done:
        print("The agent didn't reach a terminal state in {} steps.".format(max_steps))
    else:
        print("Agent reached terminal state in {steps_taken} steps gathering an episodic reward of: {over_reward}".format(over_reward = episode_reward, steps_taken = steps_taken))

##<font size = 5pt>General Parameters</font>

* env.P - `nested dictionary` :
Use like this:
P[state][action] = [(probability, nextstate, reward, terminal), ...] \
<font size = 2pt> If the environment is stochastic, each P[state][action] can hold several 4-tuples whose probabilities sum to 1</font>

  * probability `float` : the probability of transitioning from "state" to "nextstate" with "action" : $P(s^{\prime} | s, a)$

  * nextstate `int` : the state we transition to : $s^{\prime}$ in range $[0,$ env.nS $ - 1]$

  * reward `int` : the reward for transitioning from "state" to "nextstate" with "action" :
$ R(s, a, s^{\prime})=   \left\{
\begin{array}{ll}
     1 & \text{if transitioned to goal $(s^{\prime} = $ 'G')} \\
     0 & \text{otherwise} \\
\end{array}
\right.  $

  * terminal `boolean` : marks whether "nextstate" is terminal : terminal =  $   \left\{
\begin{array}{ll}
     True & \text{if nextstate is the goal or a hole $(s^{\prime} \in \{$'G', 'H'$\})$}\\
     False & \text{otherwise}\\
\end{array}
\right.  $

* env.nS `int` : the number of states in the environment : $|\mathcal{S}|$

* env.nA `int` : the number of actions in the environment : $|\mathcal{A}|$


##<font size = 5pt>Policy Evaluation</font>

<font size = 4pt>Parameters</font>

* env `gym.env` : gym environment <font size = 2pt>(must have P, nS, nA)</font>

* policy `np.array[nS]` : the policy to evaluate : $\pi$

* gamma `float` : Discount factor : $\gamma$ in range $[0,1)$

* tol `float` : value-function convergence tolerance : stop when $\max_{s \in \mathcal{S}} |V^{\pi}_{k}(s) - V^{\pi}_{k - 1}(s)| < tol$

* max_iterations `int` : Max number of iterations allowed before stopping (in case convergence takes a while)

<font size = 4pt>Returns</font>
* value_function `np.ndarray[nS]` : The value function of the given policy : $V^{\pi}(s)$

In [None]:
def policy_evaluation(env, policy, gamma=0.9, tol=1e-3, max_iterations=int(1e3)):
    """
    Evaluate a deterministic policy by iterating its Bellman backup.

    Parameters
    ----------
    env: object exposing nS (number of states) and
        P[state][action] -> iterable of (probability, nextstate, reward, terminal)
    policy: sequence indexed by state giving the action taken there
    gamma: discount factor
    tol: stop once the largest per-state change in one sweep is below this
    max_iterations: upper bound on the number of sweeps

    Returns
    -------
    np.ndarray of length env.nS with the estimated value of every state.

    Values are updated in place during a sweep, so later states in the same
    sweep already see the refreshed values of earlier ones. When the global
    `debug` is truthy, the number of sweeps performed is printed.
    """
    # Initialize V(s) = 0 for all s
    value_function = np.zeros(env.nS)

    sweeps = 0          # number of completed sweeps (stays 0 if max_iterations <= 0)
    converged = False
    for sweeps in range(1, max_iterations + 1):
        delta = 0
        for state in range(env.nS):
            # Expected one-step return of following the policy from `state`
            new_value = 0
            for probability, nextstate, reward, _ in env.P[state][policy[state]]:
                new_value += probability * (reward + gamma * value_function[nextstate])
            delta = max(delta, abs(value_function[state] - new_value))
            value_function[state] = new_value
        if delta < tol:
            converged = True
            break

    if debug:
        if converged:
            print("Policy Evaluation converged in {} iterations".format(sweeps))
        else:
            # Do not claim convergence when the iteration cap ended the loop
            print("Policy Evaluation stopped after {} iterations without converging".format(sweeps))
    return value_function

##<font size = 5pt>Policy Improvement</font>

<font size = 4pt>Parameters</font>

* env `gym.env` : gym environment <font size = 2pt>(must have P, nS, nA)</font>

* value_from_policy `np.ndarray[nS]` : value function corresponding to the given policy : $V^{\pi}(s)$

* policy `np.array[nS]` : the current policy to improve : $\pi$

* gamma `float` : Discount factor : $\gamma$ in range $[0,1)$

<font size = 4pt>Returns</font>
* stable `boolean` : flags whether the policy was left unchanged : stable = $   \left\{
\begin{array}{ll}
     True & \text{if policy never changed}\\
     False & \text{otherwise}\\
\end{array}
\right.  $

* new_policy: `np.ndarray[nS]` : The improved policy, greedy with respect to $V^{\pi}$ : $\pi^{\prime}$

In [None]:
def policy_improvement(env, value_from_policy, policy, gamma=0.9):
    """
    Make a policy greedy with respect to a given state-value function.

    Parameters
    ----------
    env: object exposing nS, nA and
        P[state][action] -> iterable of (probability, nextstate, reward, terminal)
    value_from_policy: sequence indexed by state holding V(s)
    policy: sequence indexed by state holding the current action per state
    gamma: discount factor

    Returns
    -------
    (stable, new_policy): stable is True when no state's action changed;
    new_policy is a new array, so the `policy` argument is left untouched.

    Ties between equally good actions go to the lowest action index.
    """
    # Work on a copy so the caller's array is not modified behind its back
    new_policy = np.array(policy)
    stable = True

    for state in range(env.nS):
        # Start from -inf rather than a fixed sentinel such as -1, so the
        # best action is still found when every Q(s, a) is very negative.
        best_action = None
        best_value = float("-inf")
        for action in range(env.nA):
            # Q(s, a): expected one-step return of `action` from `state`
            state_action_value = 0
            for probability, nextstate, reward, _ in env.P[state][action]:
                state_action_value += probability * (reward + gamma * value_from_policy[nextstate])
            if state_action_value > best_value:
                best_value = state_action_value
                best_action = action

        # best_action stays None only if no action compared greater than
        # -inf (e.g. env.nA == 0); keep the previous action in that case.
        if best_action is not None and best_action != policy[state]:
            new_policy[state] = best_action
            stable = False

    return stable, new_policy

##<font size = 5pt>Policy Iteration</font>

<font size = 4pt>Parameters</font>

* env `gym.env` : gym environment <font size = 2pt>(must have P, nS, nA)</font>

* gamma `float` : Discount factor : $\gamma$ in range $[0,1)$

* tol `float` : policy convergence tolerance for policy_evaluation()

* max_iterations `int` : Max number of iterations allowed in policy_evaluation() before stopping

<font size = 4pt>Returns</font>
* value_function `np.ndarray[nS]` : The value function of the optimal policy :  $V^*(s)$

* policy: `np.ndarray[nS]` : The optimal policy : $\pi^*(s)$

In [None]:
def policy_iteration(env, gamma=0.9, tol=10e-3, max_iterations=int(1e3)):
    """
    Alternate policy_evaluation and policy_improvement until the
    improvement step reports that the policy is stable.

    env must expose nS; gamma, tol and max_iterations are forwarded to
    policy_evaluation (note the default tol here is 10e-3, i.e. 0.01).
    Returns (value_function, policy) from the final round. When the global
    `debug` is truthy, the number of rounds is printed.
    """
    n_states = env.nS
    # Start from V(s) = 0 and the all-zeros policy
    value_function = np.zeros(n_states)
    policy = np.zeros(n_states, dtype=int)

    iter_count = 0
    while True:
        value_function = policy_evaluation(env, policy, gamma, tol, max_iterations)
        stable, policy = policy_improvement(env, value_function, policy, gamma)
        iter_count += 1
        if stable:
            break

    if debug:
        print('\nPolicy Iteration converged in {} iterations'.format(iter_count))
    return value_function, policy

##<font size = 5pt>Value Iteration</font>

<font size = 4pt>Parameters</font>

* env `gym.env` : gym environment <font size = 2pt>(must have P, nS, nA)</font>

* gamma `float` : Discount factor : $\gamma$ in range $[0,1)$

* tol `float` : value-function convergence tolerance : stop when the largest change in $V(s)$ over one sweep falls below tol

* max_iterations `int` : Max number of value-iteration sweeps allowed before stopping

<font size = 4pt>Returns</font>
* value_function `np.ndarray[nS]` : The value function of the optimal policy :  $V^*(s)$

* policy: `np.ndarray[nS]` : The optimal policy : $\pi^*(s)$

In [None]:
def value_iteration(env, gamma=0.9, tol=1e-3, max_iterations=int(1e3)):
    """
    Compute a value function and greedy policy by value iteration.

    Parameters
    ----------
    env: object exposing nS, nA and
        P[state][action] -> iterable of (probability, nextstate, reward, terminal)
    gamma: discount factor
    tol: stop once the largest per-state change in one sweep is below this
    max_iterations: upper bound on the number of sweeps

    Returns
    -------
    (value_function, policy): arrays of length env.nS. policy[s] is the
    lowest-index action attaining the maximum Q(s, a) in the last sweep.

    Values are updated in place during a sweep. When the global `debug` is
    truthy, the number of sweeps performed is printed.
    """
    # Initialize V(s) = 0 and pi(s) = 0 for all s
    value_function = np.zeros(env.nS)
    policy = np.zeros(env.nS, dtype=int)

    sweeps = 0          # number of completed sweeps (stays 0 if max_iterations <= 0)
    converged = False
    for sweeps in range(1, max_iterations + 1):
        delta = 0
        for state in range(env.nS):
            prev_value = value_function[state]

            # Find max_a Q(s, a) before touching V(s). Comparing against the
            # V(s) being overwritten would stop V from ever decreasing and
            # would let later actions see a half-updated V(s) on self-loops.
            best_action = None
            best_value = float("-inf")
            for action in range(env.nA):
                state_action_value = 0
                for probability, nextstate, reward, _ in env.P[state][action]:
                    state_action_value += probability * (reward + gamma * value_function[nextstate])
                if state_action_value > best_value:
                    best_value = state_action_value
                    best_action = action

            # best_action stays None only if no action compared greater
            # than -inf (e.g. env.nA == 0); leave the state untouched then.
            if best_action is not None:
                value_function[state] = best_value
                policy[state] = best_action
            delta = max(delta, abs(prev_value - value_function[state]))
        if delta < tol:
            converged = True
            break

    if debug:
        if converged:
            print('Value Iteration converged in {} iterations.'.format(sweeps))
        else:
            # Do not claim convergence when the iteration cap ended the loop
            print('Value Iteration stopped after {} iterations without converging.'.format(sweeps))
    return value_function, policy

# Settings

In [None]:
#@title Settings <font size = 2pt>(run cell after change)</font>

debug = True #@param [True, False] {type:'boolean'}
print_episode_run = True #@param [True, False] {type:'boolean'}
frozen_lake_id =  'Deterministic-4x4-FrozenLake-v0' #@param ['Deterministic-4x4-FrozenLake-v0', 'Deterministic-8x8-FrozenLake-v0', 'Stochastic-4x4-FrozenLake-v0']
# Side length of the square grid; only the 8x8 environment id selects 8
dim = 8 if frozen_lake_id == 'Deterministic-8x8-FrozenLake-v0' else 4

# Test Algorithms

In [None]:
# Build the selected environment, solve it with both algorithms, and replay
# one episode with each resulting policy.
env = gym.make(frozen_lake_id)

_rule = "-"*25

# Policy iteration
print("\n" + _rule + "\nBeginning Policy Iteration\n" + _rule)
V_pi, p_pi = policy_iteration(env, gamma=0.9, tol=1e-3)
if debug:
    print(
        '\nOptimal Value Function:\n', V_pi.reshape(dim, dim),
        '\n\nOptimal Policy:\n', p_pi.reshape(dim, dim),
    )
render_single(env, p_pi, 100)

# Value iteration
print("\n" + _rule + "\nBeginning Value Iteration\n" + _rule)
V_vi, p_vi = value_iteration(env, gamma=0.9, tol=1e-3)
if debug:
    print(
        '\nOptimal Value Function:\n', V_vi.reshape(dim, dim),
        '\n\nOptimal Policy:\n', p_vi.reshape(dim, dim),
    )
render_single(env, p_vi, 100)

env.close()


-------------------------
Beginning Policy Iteration
-------------------------
Policy Evaluation converged in 0 iterations
Policy Evaluation converged in 1 iterations
Policy Evaluation converged in 2 iterations
Policy Evaluation converged in 3 iterations
Policy Evaluation converged in 4 iterations
Policy Evaluation converged in 5 iterations
Policy Evaluation converged in 6 iterations

Policy Iteration converged in 7 iterations

Optimal Value Function:
 [[0.59  0.656 0.729 0.656]
 [0.656 0.    0.81  0.   ]
 [0.729 0.81  0.9   0.   ]
 [0.    0.9   1.    0.   ]] 

Optimal Policy:
 [[1 2 1 0]
 [1 0 1 0]
 [2 1 1 0]
 [0 2 2 0]]

[41mS[0mFFF
FHFH
FFFH
HFFG
  (Down)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Right)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
HF[41mF[0mG
  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m
Agent reached terminal state in 6 steps gathering an episodic reward of: 1.0

-------------------------
Begi