# Gridworld

[Exercises](https://github.com/SUTD-MARL/reinforcement-learning-exercises/tree/master/DP)

In [5]:
import numpy as np
import os, sys
import pprint

# Put the "reinforcement-learning-exercises" directory (looked up next to the
# current working directory, i.e. "../reinforcement-learning-exercises") on the
# module search path at position 1. Presumably this is what makes the
# `lib.envs` import below resolve -- confirm against the local checkout layout.
sys.path.insert(
    1,
    os.path.realpath(
        os.path.join(os.path.pardir, "reinforcement-learning-exercises")
    )
)

from lib.envs.gridworld import GridworldEnv
# Shared pretty-printer (2-space indent) used to dump nested structures below.
pp = pprint.PrettyPrinter(indent=2)

In [6]:
# Build the gridworld with its default constructor arguments; every cell below
# reads its dynamics through `env.P`, `env.nS` and `env.nA`.
env = GridworldEnv()

In [8]:
# Dump the transition table: env.P[state][action] is a list of
# (prob, next_state, reward, done) tuples.
pp.pprint(env.P)

{ 0: { 0: [(1.0, 0, 0.0, True)],
       1: [(1.0, 0, 0.0, True)],
       2: [(1.0, 0, 0.0, True)],
       3: [(1.0, 0, 0.0, True)]},
  1: { 0: [(1.0, 1, -1.0, False)],
       1: [(1.0, 2, -1.0, False)],
       2: [(1.0, 5, -1.0, False)],
       3: [(1.0, 0, -1.0, True)]},
  2: { 0: [(1.0, 2, -1.0, False)],
       1: [(1.0, 3, -1.0, False)],
       2: [(1.0, 6, -1.0, False)],
       3: [(1.0, 1, -1.0, False)]},
  3: { 0: [(1.0, 3, -1.0, False)],
       1: [(1.0, 3, -1.0, False)],
       2: [(1.0, 7, -1.0, False)],
       3: [(1.0, 2, -1.0, False)]},
  4: { 0: [(1.0, 0, -1.0, True)],
       1: [(1.0, 5, -1.0, False)],
       2: [(1.0, 8, -1.0, False)],
       3: [(1.0, 4, -1.0, False)]},
  5: { 0: [(1.0, 1, -1.0, False)],
       1: [(1.0, 6, -1.0, False)],
       2: [(1.0, 9, -1.0, False)],
       3: [(1.0, 4, -1.0, False)]},
  6: { 0: [(1.0, 2, -1.0, False)],
       1: [(1.0, 7, -1.0, False)],
       2: [(1.0, 10, -1.0, False)],
       3: [(1.0, 5, -1.0, False)]},
  7: { 0: [(1.0, 3, -1

## [Policy evaluation](https://github.com/SUTD-MARL/reinforcement-learning-exercises/blob/master/DP/Policy%20Evaluation.ipynb)

In [13]:
def q_fn(env, value, state, discount_factor):
    """Return the one-step lookahead action values for ``state``.

    For every action available in ``env.P[state]`` the expected backup is

        q(state, a) = sum_i prob_i * (reward_i + discount_factor * value[next_state_i])

    taken over all transitions of that action, so the result is also correct
    when an action has several possible outcomes with different rewards.

    Args:
        env: Environment exposing ``env.P[state][action]`` as a list of
            ``(prob, next_state, reward, done)`` transition tuples.
        value: Current value estimates, indexable by state.
        state: State whose actions are evaluated.
        discount_factor: Gamma discount factor applied to successor values.

    Returns:
        A list with one action value per action, in the iteration order of
        ``env.P[state]``. Callers index it by action number, which assumes
        the actions are keyed 0..n-1 in order.
    """
    return [
        sum(
            # Weight the whole return (reward + discounted successor value)
            # by the transition probability, not just the successor value.
            prob * (reward + discount_factor * value[next_state])
            for prob, next_state, reward, _done in transitions
        )
        for transitions in env.P[state].values()
    ]

In [16]:
def policy_eval(policy, env, discount_factor=1.0,
                theta=0.00001):
    """Evaluate a policy by iterative (synchronous) policy evaluation.

    Each sweep recomputes every state's value from the previous sweep's
    values, using the one-step lookahead from ``q_fn`` weighted by the
    policy's action probabilities.

    Args:
        policy: Array of shape [env.nS, env.nA]; ``policy[s, a]`` is the
            probability of taking action ``a`` in state ``s``.
        env: Environment exposing ``env.nS`` and ``env.P[s][a]`` as a list of
            ``(prob, next_state, reward, done)`` transition tuples.
        discount_factor: Gamma discount factor.
        theta: Sweeps stop once the Euclidean norm of the change between two
            successive value vectors is no greater than ``theta``.

    Returns:
        A NumPy vector of length env.nS with the estimated state values.
    """
    V = np.zeros(env.nS)
    new_V = np.zeros(env.nS)
    # Start at infinity so that at least one sweep runs for any theta; a
    # finite sentinel would silently return the all-zero initial guess
    # whenever theta is at least as large as the sentinel.
    distance = np.inf

    while distance > theta:
        for s in range(env.nS):
            q = q_fn(env, V, s, discount_factor)
            # Expected action value under the policy's distribution for s.
            new_V[s] = sum(policy[s, a] * q[a] for a in env.P[s])

        distance = np.linalg.norm(V - new_V)
        # Copy so the next sweep reads a frozen snapshot while new_V is
        # overwritten in place.
        V = new_V.copy()

    return V

In [17]:
# Uniform random policy: in every state each action has probability 1 / env.nA.
random_policy = np.ones([env.nS, env.nA]) / env.nA
# State values of that policy, using policy_eval's default gamma and theta.
v = policy_eval(random_policy, env)

In [18]:
# Show the state values as a 4x4 array (assumes the default grid has 16 states
# numbered row by row -- confirm against GridworldEnv).
np.reshape(v, (4,4))

array([[  0.        , -13.9999661 , -19.99994976, -21.99994378],
       [-13.9999661 , -17.99995574, -19.9999501 , -19.99994976],
       [-19.99994976, -19.9999501 , -17.99995574, -13.9999661 ],
       [-21.99994378, -19.99994976, -13.9999661 ,   0.        ]])

## [Policy iteration](https://github.com/SUTD-MARL/reinforcement-learning-exercises/blob/master/DP/Policy%20Iteration.ipynb)

In [19]:
def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0, theta=0.00001):
    """
    Policy Iteration. Alternately evaluates the current policy and makes it
    greedy with respect to the resulting values until the policy is stable.

    Args:
        env: The OpenAI environment. Must expose ``env.nS``, ``env.nA`` and
            ``env.P[s][a]`` as a list of (prob, next_state, reward, done) tuples.
        policy_eval_fn: Policy evaluation function. It is called as
            ``policy_eval_fn(policy, env, discount_factor=..., theta=...)``
            and must return the state-value vector of ``policy``.
        discount_factor: gamma discount factor.
        theta: Convergence tolerance forwarded to ``policy_eval_fn``. It does
            not influence when the policy itself is considered stable.

    Returns:
        A tuple (policy, V).
        policy is the final policy, a matrix of shape [S, A] where each state s
        contains a valid (one-hot) probability distribution over actions.
        V is the value function of that policy as returned by ``policy_eval_fn``.

    """
    # Start with a uniform random policy.
    policy = np.ones([env.nS, env.nA]) / env.nA

    while True:
        V = policy_eval_fn(policy, env, discount_factor=discount_factor, theta=theta)

        # Greedy improvement: put all probability on the best action per state
        # (the first one wins on ties, as np.argmax returns the first maximum).
        new_policy = np.zeros([env.nS, env.nA])
        for s in range(env.nS):
            q = q_fn(env, V, s, discount_factor)
            new_policy[s, int(np.argmax(q))] = 1.0

        # Stop only when the greedy policy equals the one just evaluated.
        # V then already belongs to the returned policy, so no further
        # evaluation pass is needed.
        if np.array_equal(policy, new_policy):
            return policy, V

        policy = new_policy

In [20]:
# Run policy iteration with the default evaluator, discount factor and theta.
policy, value = policy_improvement(env)

In [21]:
# One row per state, one column per action; a 1 marks the chosen action.
pp.pprint(policy)

array([[1., 0., 0., 0.],
       [0., 0., 0., 1.],
       [0., 0., 0., 1.],
       [0., 0., 1., 0.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [0., 0., 1., 0.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [0., 1., 0., 0.],
       [0., 0., 1., 0.],
       [1., 0., 0., 0.],
       [0., 1., 0., 0.],
       [0., 1., 0., 0.],
       [1., 0., 0., 0.]])


Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):

In [25]:
# Collapse each state's action distribution to its most probable action index
# and show the result as a 4x4 array (assumes 16 states numbered row by row).
np.reshape(policy.argmax(axis=1), (4,4))

array([[0, 3, 3, 2],
       [0, 0, 0, 2],
       [0, 0, 1, 2],
       [0, 1, 1, 0]])

In [23]:
# State values of the policy found above, shown as a 4x4 array.
np.reshape(value, (4,4))

array([[ 0., -1., -2., -3.],
       [-1., -2., -3., -2.],
       [-2., -3., -2., -1.],
       [-3., -2., -1.,  0.]])

## [Value iteration](https://github.com/SUTD-MARL/reinforcement-learning-exercises/blob/master/DP/Value%20Iteration.ipynb)

In [26]:
def value_iteration(env, theta=0.0001, discount_factor=1.0):
    """
    Value Iteration Algorithm.

    Repeatedly applies the synchronous backup
    ``V[s] <- max_a q(s, a)`` (with q computed by ``q_fn`` from the previous
    sweep's values) and finally extracts the greedy policy from the resulting
    value function.

    Args:
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
            env.nS is a number of states in the environment.
            env.nA is a number of actions in the environment.
        theta: Sweeps stop once the Euclidean norm of the change between two
            successive value vectors is no greater than theta (which also bounds
            the change of every individual state by theta).
        discount_factor: Gamma discount factor.

    Returns:
        A tuple (policy, V). policy is a one-hot matrix of shape [env.nS, env.nA]
        that is greedy with respect to the returned V; V is the value vector
        reached when the sweeps stopped.
    """
    V = np.zeros(env.nS)
    new_V = np.zeros(env.nS)
    # Start at infinity so at least one sweep runs regardless of theta.
    distance = np.inf

    while distance > theta:
        for s in range(env.nS):
            new_V[s] = max(q_fn(env, V, s, discount_factor))

        distance = np.linalg.norm(V - new_V)
        # Copy so the next sweep reads a frozen snapshot of this one.
        V = new_V.copy()

    # Extract the greedy policy once, from the final values, so the returned
    # policy always matches the returned V and is a valid distribution.
    # Ties go to the lowest action index (np.argmax returns the first maximum).
    policy = np.zeros([env.nS, env.nA])
    for s in range(env.nS):
        q = q_fn(env, V, s, discount_factor)
        policy[s, int(np.argmax(q))] = 1.0

    return policy, V

In [27]:
# Run value iteration with its default theta and discount factor; this rebinds
# `policy` and `value` from the policy-iteration cells above.
policy, value = value_iteration(env)

In [28]:
# Most probable action index per state, shown as a 4x4 array
# (assumes 16 states numbered row by row).
np.reshape(policy.argmax(axis=1), (4,4))

array([[0, 3, 3, 2],
       [0, 0, 0, 2],
       [0, 0, 1, 2],
       [0, 1, 1, 0]])

In [29]:
# Value function returned by value iteration, shown as a 4x4 array.
np.reshape(value, (4,4))

array([[ 0., -1., -2., -3.],
       [-1., -2., -3., -2.],
       [-2., -3., -2., -1.],
       [-3., -2., -1.,  0.]])