<a href="https://colab.research.google.com/github/d61h6k4/notebooks/blob/master/CS234/Assignment1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install --upgrade gym

Requirement already up-to-date: gym in /usr/local/lib/python3.6/dist-packages (0.15.3)


### MDP Value Iteration and Policy Iteration

In [0]:
import numpy as np
import gym
import time

In [0]:
np.set_printoptions(precision=3)

gym.envs.registration.register(
    id='Deterministic-4x4-FrozenLake-v0',
    entry_point='gym.envs.toy_text.frozen_lake:FrozenLakeEnv',
    kwargs={'map_name': '4x4',
            'is_slippery': False})

gym.envs.registration.register(
    id='Deterministic-8x8-FrozenLake-v0',
    entry_point='gym.envs.toy_text.frozen_lake:FrozenLakeEnv',
    kwargs={'map_name': '8x8',
            'is_slippery': False})

gym.envs.registration.register(
    id='Stochastic-4x4-FrozenLake-v0',
    entry_point='gym.envs.toy_text.frozen_lake:FrozenLakeEnv',
    kwargs={'map_name': '4x4',
            'is_slippery': True})

For policy_evaluation, policy_improvement, policy_iteration and value_iteration,
the parameters P, nS, nA, gamma are defined as follows:

	P: nested dictionary
		From gym.core.Environment
		For each pair of states in [1, nS] and actions in [1, nA], P[state][action] is a
		tuple of the form (probability, nextstate, reward, terminal) where
			- probability: float
				the probability of transitioning from "state" to "nextstate" with "action"
			- nextstate: int
				denotes the state we transition to (in range [0, nS - 1])
			- reward: int
				either 0 or 1, the reward for transitioning from "state" to
				"nextstate" with "action"
			- terminal: bool
			  True when "nextstate" is a terminal state (hole or goal), False otherwise
	nS: int
		number of states in the environment
	nA: int
		number of actions in the environment
	gamma: float
		Discount factor. Number in range [0, 1)

In [0]:
def policy_evaluation(P, nS, nA, policy, gamma=0.9, tol=1e-3):
	"""Evaluate the value function from a given policy.

	Parameters
	----------
	P, nS, nA, gamma:
		defined at beginning of file
	policy: np.array[nS]
		The policy to evaluate. Maps states to actions.
	tol: float
		Terminate policy evaluation when
			max |value_function(s) - prev_value_function(s)| < tol
	Returns
	-------
	value_function: np.ndarray[nS]
		The value function of the given policy, where value_function[s] is
		the value of state s
	"""

	value_function = np.zeros(nS)

	############################
	# YOUR IMPLEMENTATION HERE #
	prev_value_function = np.ones(nS)
	value_function = np.zeros(nS)
 
	while np.max(np.abs(value_function - prev_value_function)) > tol:
		prev_value_function = np.copy(value_function)
		for s in range(nS):
			probability, nextstate, reward, terminal = P[s][policy[s]][0]
			value_function[s] = reward + gamma * probability * prev_value_function[nextstate]

	############################
	return value_function


In [0]:
def policy_improvement(P, nS, nA, value_from_policy, policy, gamma=0.9):
	"""Given the value function from policy improve the policy.

	Parameters
	----------
	P, nS, nA, gamma:
		defined at beginning of file
	value_from_policy: np.ndarray
		The value calculated from the policy
	policy: np.array
		The previous policy.

	Returns
	-------
	new_policy: np.ndarray[nS]
		An array of integers. Each integer is the optimal action to take
		in that state according to the environment dynamics and the
		given value function.
	"""

	new_policy = np.zeros(nS, dtype='int')

	############################
	# YOUR IMPLEMENTATION HERE #
	state_action_value = np.zeros((nS, nA))
	for s in range(nS):
		for a in range(nA):
			probability, nextstate, reward, terminal = P[s][a][0]
			state_action_value[s][a] = reward + gamma * probability * value_from_policy[nextstate]
	new_policy = np.argmax(state_action_value, axis=1)
	############################
	return new_policy

In [0]:
def policy_iteration(P, nS, nA, gamma=0.9, tol=10e-3):
	"""Runs policy iteration.

	You should call the policy_evaluation() and policy_improvement() methods to
	implement this method.

	Parameters
	----------
	P, nS, nA, gamma:
		defined at beginning of file
	tol: float
		tol parameter used in policy_evaluation()
	Returns:
	----------
	value_function: np.ndarray[nS]
	policy: np.ndarray[nS]
	"""
	value_function = np.zeros(nS)
	policy = np.zeros(nS, dtype=int)
    ############################
    # YOUR IMPLEMENTATION HERE #
	policy = np.zeros(nS)
	prev_policy = np.ones(nS)
	while np.max(np.abs(policy - prev_policy)) > tol:
		prev_policy = np.copy(policy)
		value_function = policy_evaluation(P, nS, nA, policy, gamma, tol)
		policy = policy_improvement(P, nS, nA, value_function, policy, gamma)
	############################
	return value_function, policy

In [0]:
def value_iteration(P, nS, nA, gamma=0.9, tol=1e-3):
	"""
	Learn value function and policy by using value iteration method for a given
	gamma and environment.

	Parameters:
	----------
	P, nS, nA, gamma:
		defined at beginning of file
	tol: float
		Terminate value iteration when
			max |value_function(s) - prev_value_function(s)| < tol
	Returns:
	----------
	value_function: np.ndarray[nS]
	policy: np.ndarray[nS]
	"""

	value_function = np.zeros(nS)
	policy = np.zeros(nS, dtype=int)
	############################
	# YOUR IMPLEMENTATION HERE #
	prev_value_function = np.ones(nS)
	while np.max(np.abs(value_function - prev_value_function)) > tol:
		prev_value_function = np.copy(value_function)
		state_action_value = np.zeros((nS, nA))
		for s in range(nS):
			for a in range(nA):
				probability, nextstate, reward, terminal = P[s][a][0]
				state_action_value[s][a] = reward + gamma * probability * value_function[nextstate]
		value_function = np.max(state_action_value, axis=1)
	policy = np.argmax(state_action_value, axis=1)

	############################
	return value_function, policy

In [0]:
def render_single(env, policy, max_steps=100):
  """
    This function does not need to be modified
    Renders policy once on environment. Watch your agent play!

    Parameters
    ----------
    env: gym.core.Environment
      Environment to play on. Must have nS, nA, and P as
      attributes.
    Policy: np.array of shape [env.nS]
      The action to take at a given state
  """

  episode_reward = 0
  ob = env.reset()
  for t in range(max_steps):
    env.render()
    time.sleep(0.25)
    a = policy[ob]
    ob, rew, done, _ = env.step(a)
    episode_reward += rew
    if done:
      break
  env.render();
  if not done:
    print("The agent didn't reach a terminal state in {} steps.".format(max_steps))
  else:
  	print("Episode reward: %f" % episode_reward)

In [11]:

# comment/uncomment these lines to switch between deterministic/stochastic environments
# env = gym.make("Deterministic-4x4-FrozenLake-v0")
env = gym.make("Stochastic-4x4-FrozenLake-v0")

print("\n" + "-"*25 + "\nBeginning Policy Iteration\n" + "-"*25)

V_pi, p_pi = policy_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
print(f"Policy iteration: {p_pi}")
render_single(env, p_pi, 1000)

print("\n" + "-"*25 + "\nBeginning Value Iteration\n" + "-"*25)

V_vi, p_vi = value_iteration(env.P, env.nS, env.nA, gamma=0.9, tol=1e-3)
print(f"Value iteration: {p_vi}")
render_single(env, p_vi, 1000)


-------------------------
Beginning Policy Iteration
-------------------------
Policy iteration: [2 3 2 1 2 0 2 0 3 2 2 0 0 3 3 0]

[41mS[0mFFF
FHFH
FFFH
HFFG
  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Up)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Right)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Right)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Right)
SFFF
F[41mH[0mFH
FFFH
HFFG
Episode reward: 0.000000

-------------------------
Beginning Value Iteration
-------------------------
Value iteration: [2 3 2 1 2 0 2 0 3 2 2 0 0 3 3 0]

[41mS[0mFFF
FHFH
FFFH
HFFG
  (Right)
S[41mF[0mFF
FHFH
FFFH
HFFG
  (Up)
SF[41mF[0mF
FHFH
FFFH
HFFG
  (Right)
SFFF
FH[41mF[0mH
FFFH
HFFG
  (Right)
SFFF
FHF[41mH[0m
FFFH
HFFG
Episode reward: 0.000000
