In [4]:
import numpy as np
import gym
import time
from lake_envs import *
import random 

np.set_printoptions(precision=3)

In [5]:
"""
For policy_evaluation, policy_improvement, policy_iteration and value_iteration,
the parameters P, nS, nA, gamma are defined as follows:

	P: nested dictionary
		From gym.core.Environment
		For each pair of states in [1, nS] and actions in [1, nA], P[state][action] is a
		tuple of the form (probability, nextstate, reward, terminal) where
			- probability: float
				the probability of transitioning from "state" to "nextstate" with "action"
			- nextstate: int
				denotes the state we transition to (in range [0, nS - 1])
			- reward: int
				either 0 or 1, the reward for transitioning from "state" to
				"nextstate" with "action"
			- terminal: bool
			  True when "nextstate" is a terminal state (hole or goal), False otherwise
	nS: int
		number of states in the environment
	nA: int
		number of actions in the environment
	gamma: float
		Discount factor. Number in range [0, 1)
"""

'\nFor policy_evaluation, policy_improvement, policy_iteration and value_iteration,\nthe parameters P, nS, nA, gamma are defined as follows:\n\n\tP: nested dictionary\n\t\tFrom gym.core.Environment\n\t\tFor each pair of states in [1, nS] and actions in [1, nA], P[state][action] is a\n\t\ttuple of the form (probability, nextstate, reward, terminal) where\n\t\t\t- probability: float\n\t\t\t\tthe probability of transitioning from "state" to "nextstate" with "action"\n\t\t\t- nextstate: int\n\t\t\t\tdenotes the state we transition to (in range [0, nS - 1])\n\t\t\t- reward: int\n\t\t\t\teither 0 or 1, the reward for transitioning from "state" to\n\t\t\t\t"nextstate" with "action"\n\t\t\t- terminal: bool\n\t\t\t  True when "nextstate" is a terminal state (hole or goal), False otherwise\n\tnS: int\n\t\tnumber of states in the environment\n\tnA: int\n\t\tnumber of actions in the environment\n\tgamma: float\n\t\tDiscount factor. Number in range [0, 1)\n'

In [6]:
def render_single(env, policy, max_steps=100):
  """
    This function does not need to be modified
    Renders policy once on environment. Watch your agent play!

    Parameters
    ----------
    env: gym.core.Environment
      Environment to play on. Must have nS, nA, and P as
      attributes.
    Policy: np.array of shape [env.nS]
      The action to take at a given state
  """

  episode_reward = 0
  ob = env.reset()
  for t in range(max_steps):
    env.render()
    time.sleep(0.25)
    a = policy[ob]
    ob, rew, done, _ = env.step(a)
    episode_reward += rew
    if done:
      break
  env.render();
  if not done:
    print("The agent didn't reach a terminal state in {} steps.".format(max_steps))
  else:
  	print("Episode reward: %f" % episode_reward)


## Policy Evaluation

$V^{\pi}_{k}(s) \leftarrow \underset{a}{\sum}\pi(a|s) \underset{s'}{\sum} P(s'|s,\pi (s))(R(s,\pi(s),s')+ \gamma V^{\pi}_{k-1}(s'))$

## Policy Improvement

$\pi_{k+1}(s) \leftarrow \underset{a}{argmax}\ q^{\pi}(s,a)$

Where $ q^{\pi}(s,a) = \underset{s'}{\sum} P(s'|s,a)(R(s,\pi(s),s')+ \gamma V^{\pi}(s'))$

## Policy Iteration

1. Initialize Policy $\pi$

2. Evaluate: Until $V^{\pi}_{k}(s) - V^{\pi}_{k+1}(s) < $ tolerance: For all  $s$ in $\mathbb S$ , evalute $\pi$ as $V^{\pi}_{k}(s)$
3. Improve: If $\pi_{k}(s) \ \ != \pi_{k+1}(s) <$ perform policy improvement using $V^{\pi}_{k}(s)$

## Value Iteration

1. Initialize $V(s)$ for all s in $\mathbb S $
2. Until $V_{k}(s) - V_{k+1}(s) < $ tolerance: For all  $s$ in $\mathbb S$ , $V_{k}(s)$ as

$V_{k}(s) = \underset{a}{max} \underset{s}{\sum} P(s'|s,a)(R(s,a,s')+ \gamma V(s'))$
3. Perform policy improvement using $V_{k}(s)$ such that $ \pi \approx \pi *$

In [7]:
def isover(V,V_new,tol):
	if np.all(np.abs(V - V_new) < tol) :    #np.sum(np.sqrt(np.square(V_new-V))) < tol
		return 1
	return 0

def policy_evaluation(P, nS, nA, policy, gamma=0.9,  tol=1e-3):
	"""Evaluate the value function from a given policy.
	Parameters
	----------
	P: dictionary
		It is from gym.core.Environment
		P[state][action] is tuples with (probability, nextstate, reward, terminal)
	nS: int
		number of states
	nA: int
		number of actions
	gamma: float
		Discount factor. Number in range [0, 1)
	policy: np.array
		The policy to evaluate. Maps states to actions.
	max_iteration: int
		The maximum number of iterations to run before stopping. Feel free to change it.
	tol: float
		Determines when value function has converged.
	Returns
	-------
	value function: np.ndarray
		The value function from the given policy.
	"""
	############################
	# YOUR IMPLEMENTATION HERE #
	############################
	V=np.zeros(nS)
	V_new=V.copy()
	i=0
	for i in range(max_iteration):
		V=V_new.copy()
		V_new = np.zeros(nS, dtype=float)
		for state in range(nS):
			for probability, nextstate, reward, terminal in P[state][policy[state]]:
				V_new[state] += probability * (reward + gamma * V[nextstate])
		if isover(V,V_new,tol) :
			break
	return V_new

In [8]:
def policy_improvement(P, nS, nA, value_from_policy, policy, gamma=0.9):
	"""Given the value function from policy improve the policy.
	Parameters
	----------
	P: dictionary
		It is from gym.core.Environment
		P[state][action] is tuples with (probability, nextstate, reward, terminal)
	nS: int
		number of states
	nA: int
		number of actions
	gamma: float
		Discount factor. Number in range [0, 1)
	value_from_policy: np.ndarray
		The value calculated from the policy
	policy: np.array
		The previous policy.
	Returns
	-------
	new policy: np.ndarray
		An array of integers. Each integer is the optimal action to take
		in that state according to the environment dynamics and the
		given value function.
	"""
	############################
	# YOUR IMPLEMENTATION HERE #
	############################
	P_new = np.zeros(nS, dtype=int)
	for state in range(nS):
		B=np.zeros(nA,dtype=float)
		q=-99
		for action in range(nA):
			for probability, nextstate, reward, terminal in P[state][action]:
				B[action] += probability * (reward + gamma * value_from_policy[nextstate])
			if(B[action]>q):
				q=B[action]
				P_new[state]=action
			elif(q == B[action]):
				if random.random() < 0.5:
					P_new[state]=action
	return P_new

In [9]:
def policy_iteration(P, nS, nA, gamma=0.9, max_iteration=20, tol=1e-3):
	"""Runs policy iteration.
	You should use the policy_evaluation and policy_improvement methods to
	implement this method.
	Parameters
	----------
	P: dictionary
		It is from gym.core.Environment
		P[state][action] is tuples with (probability, nextstate, reward, terminal)
	nS: int
		number of states
	nA: int
		number of actions
	gamma: float
		Discount factor. Number in range [0, 1)
	max_iteration: int
		The maximum number of iterations to run before stopping. Feel free to change it.
	tol: float
		Determines when value function has converged.
	Returns:
	----------
	value function: np.ndarray
	policy: np.ndarray
	"""
	V = np.zeros(nS,dtype=float)
	policy = np.zeros(nS, dtype=int)
	for s in range(nS):
		policy[s]=s%nA
	for i in range(max_iteration):
		V_new=policy_evaluation(P, nS, nA, policy, gamma)
		policy_new=policy_improvement(P, nS, nA, V_new, policy, gamma)
		if isover(V,V_new,tol) :
			break
		V=V_new.copy()
		policy=policy_new.copy()
	############################
	# YOUR IMPLEMENTATION HERE #
	############################
	return V, policy

In [10]:
def value_iteration(P, nS, nA, gamma=0.9, max_iteration=20, tol=1e-3):
	"""
	Learn value function and policy by using value iteration method for a given
	gamma and environment.
	Parameters:
	----------
	P: dictionary
		It is from gym.core.Environment
		P[state][action] is tuples with (probability, nextstate, reward, terminal)
	nS: int
		number of states
	nA: int
		number of actions
	gamma: float
		Discount factor. Number in range [0, 1)
	max_iteration: int
		The maximum number of iterations to run before stopping. Feel free to change it.
	tol: float
		Determines when value function has converged.
	Returns:
	----------
	value function: np.ndarray
	policy: np.ndarray
	"""
	V = np.zeros(nS,dtype=float)
	policy = np.zeros(nS, dtype=int)
	for i in range(max_iteration):
		V_next=np.zeros(nS,dtype=float)
		for s in range(nS):
			for a in range(nA):
				q=0
				for probability, nextstate, reward, terminal in P[s][a]:
					q += probability * (reward + gamma * V[nextstate])
				if V_next[s] < q:
					V_next[s] = q
		if isover(V,V_next,tol):
			break
		V = V_next.copy()
	policy=policy_improvement(P, nS, nA, V_next, policy, gamma)
	############################
	# YOUR IMPLEMENTATION HERE #
	############################
	return V_next, policy

In [11]:
# comment/uncomment these lines to switch between deterministic/stochastic environments
env = gym.make("Deterministic-4x4-FrozenLake-v0")
#env = gym.make("Stochastic-4x4-FrozenLake-v0")

print("\n" + "-"*25 + "\nBeginning Policy Iteration\n" + "-"*25)
V_pi, p_pi = policy_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
render_single(env, p_pi, 100)

print("\n" + "-"*25 + "\nBeginning Value Iteration\n" + "-"*25)

V_vi, p_vi = value_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
render_single(env, p_vi, 100)


-------------------------
Beginning Policy Iteration
-------------------------

[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS