<a href="https://colab.research.google.com/github/Mariihmp/RL_Notebooks/blob/main/gridWorld_R1L.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
import numpy as np
import time

up = 0
down = 1
left = 2
right = 3

# no. of states and no. of variables
grid_size = 5
noS = grid_size * grid_size
noA = 4

S = range(noS)

reward = -1  # for every step

terminal_state = lambda s: s == 0 or s == noS - 1  # state terminal
wall = []

P = dict()

for s in S:
    P[s] = dict()

    if (terminal_state(s)):
        P[s][up] = (s, 1.0, 0.0)
        P[s][down] = (s, 1.0, 0.0)
        P[s][right] = (s, 1.0, 0.0)
        P[s][left] = (s, 1.0, 0.0)
    else:
        # Up
        next_s_up = s - grid_size
        if next_s_up < 0:  # top boundary
            P[s][up] = (s, 1.0, -1000.0)
        elif next_s_up in wall:
            P[s][up] = (s, 1.0, -1000.0)
        else:
            P[s][up] = (next_s_up, 1.0, reward)

        # Down
        next_s_down = s + grid_size
        if next_s_down >= noS:  # bottom boundary
            P[s][down] = (s, 1.0, -1000.0)
        elif next_s_down in wall:
            P[s][down] = (s, 1.0, -1000.0)
        else:
            P[s][down] = (next_s_down, 1.0, reward)

        # Right
        next_s_right = s + 1
        if (s + 1) % grid_size == 0:  #  right boundary
            P[s][right] = (s, 1.0, -1000.0)
        elif next_s_right in wall:
            P[s][right] = (s, 1.0, -1000.0)
        else:
            P[s][right] = (next_s_right, 1.0, reward)

        # Left
        next_s_left = s - 1
        if s % grid_size == 0:  # left boundary
            P[s][left] = (s, 1.0, -1000.0)
        elif next_s_left in wall:
            P[s][left] = (s, 1.0, -1000.0)
        else:
            P[s][left] = (next_s_left, 1.0, reward)


print('No. of states in grid: ', noS)
print('No. of action options in each state:', noA)
Action_Index = dict()
Action_Index[0] = 'up'
Action_Index[1] = 'down'
Action_Index[2] = 'left'
Action_Index[3] = 'right'
Action_Index[5] = 'terminal states (stay)'
Action_Index[7] = 'wall'

print('Index for actions:')
for k, v in Action_Index.items():
    print(k, ":", v)


No. of states in grid:  25
No. of action options in each state: 4
Index for actions:
0 : up
1 : down
2 : left
3 : right
5 : terminal states (stay)
7 : wall


###The value fuction Vpi for a given policy pi (policy evaluation)







In [17]:
def policy_evaluation(P, policy, threshold, discount):
    value = np.zeros((noS,))
    while True:
        new_value = np.zeros((noS,))

        change = 0
        for s in S:
            v = 0

            for a, action_prob in enumerate(policy[s]):
                next_state, probability, reward = P[s][a]
                temp = probability * action_prob * (reward + discount * value[next_state])
                v += temp

            change = max(change, np.abs(v - value[s]))
            new_value[s] = v

        if change < threshold:
            break

        value = new_value

    return value


In [18]:
#Testing policy evaluation

random_policy = np.ones([noS, 4]) / 4

threshold = 0.0001
discount = 1.0
value = np.zeros((noS,))

random_policy_value = policy_evaluation(P, random_policy, threshold, discount)
for w_state in wall:
    random_policy_value[w_state] = 13
print('Value Function for policy:')
print(random_policy_value.reshape(grid_size, grid_size))

Value Function for policy:
[[    0.         -4018.9981729  -6094.93052749 -7365.66336612
  -8366.66316978]
 [-4018.9981729  -4959.06419637 -5897.13035853 -6632.3967717
  -7365.66336612]
 [-6094.93052749 -5897.13035853 -5898.13027261 -5897.13035853
  -6094.93052749]
 [-7365.66336612 -6632.3967717  -5897.13035853 -4959.06419637
  -4018.9981729 ]
 [-8366.66316978 -7365.66336612 -6094.93052749 -4018.9981729
      0.        ]]


In [19]:
#Policy Iteration

def policy_iteration(P, discount, threshold):
    value = np.zeros((noS,))
    policy = np.ones([noS, 4]) / 4

    while True:
        # Policy evaluation
        value = policy_evaluation(P, policy, threshold, discount)

        new_policy = np.zeros([noS, 4])

        # Policy Improvement
        policy_stable = True
        for s in S:
            if not terminal_state(s) and s not in wall:
                old_action_probs = policy[s]
                action_values = np.zeros(noA)
                for a in range(noA):  # Iterating over all the actions
                    next_state, probability, reward = P[s][a]
                    action_values[a] += probability * (reward + discount * value[next_state])
                max_total = np.amax(action_values)  # taking the max reward value
                best_a = np.argmax(action_values)

                new_policy[s][best_a] = 1

                if (np.array_equal(old_action_probs, new_policy[s]) != True):
                    policy_stable = False
            elif terminal_state(s): # terminal states
                new_policy[s][up] = 0.25
                new_policy[s][down] = 0.25
                new_policy[s][left] = 0.25
                new_policy[s][right] = 0.25
            elif s in wall: #wall states
                new_policy[s][up] = 0.25
                new_policy[s][down] = 0.25
                new_policy[s][left] = 0.25
                new_policy[s][right] = 0.25


        if policy_stable:
            for w_state in wall:
                value[w_state] = 13
            return new_policy, value
        else:
            policy = new_policy

# start = time.process_time()
best_policy, corr_value = policy_iteration(P, discount, threshold)
# end = time.process_time()

show_best_policy = np.zeros(noS,)
for s, p_s in enumerate(best_policy):
    if terminal_state(s):
        show_best_policy[s] = 5
    elif s in wall:
        show_best_policy[s] = 7
    else:
        show_best_policy[s] = np.argmax(p_s)


print('Best policy with Policy Iteration ')
print(show_best_policy.reshape(grid_size, grid_size))
print('Corresponding Value Function is')
print(corr_value.reshape(grid_size, grid_size))
print('Time taken')
# print(end - start)


Best policy with Policy Iteration 
[[5. 2. 2. 2. 1.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 3. 3. 3. 5.]]
Corresponding Value Function is
[[ 0. -1. -2. -3. -4.]
 [-1. -2. -3. -4. -3.]
 [-2. -3. -4. -3. -2.]
 [-3. -4. -3. -2. -1.]
 [-4. -3. -2. -1.  0.]]
Time taken


###value iteration---> optimal value fuction

In [20]:
def value_iteration(P, discount, threshold):
    value = np.zeros((noS,))

    while True:
        new_policy = np.zeros([noS, 4])
        change = 0
        for s in S:
            if not terminal_state(s) and s not in wall:
                v = value[s]
                action_values = np.zeros(noA)
                for a in range(noA):  # Iterating over all the actions
                    next_state, probability, reward = P[s][a]
                    action_values[a] += probability * (reward + discount * value[next_state])
                max_total = np.amax(action_values)  # max reward value
                best_a = np.argmax(action_values)

                value[s] = max_total
                new_policy[s][best_a] = 1

                change = max(change, np.abs(v - value[s]))
            elif terminal_state(s):
                new_policy[s][up] = 0.25
                new_policy[s][down] = 0.25
                new_policy[s][left] = 0.25
                new_policy[s][right] = 0.25
            elif s in wall:
                new_policy[s][up] = 0.25
                new_policy[s][down] = 0.25
                new_policy[s][left] = 0.25
                new_policy[s][right] = 0.25


        if change < threshold:
            break

    for w_state in wall:
        value[w_state] = 13
    return new_policy, value

start = time.process_time()
best_policy_vi, corr_value_vi = value_iteration(P, discount, threshold)
end = time.process_time()

show_best_policy_vi = np.zeros(noS,)
for s, p_s in enumerate(best_policy_vi):
    if terminal_state(s):
        show_best_policy_vi[s] = 5
    elif s in wall:
        show_best_policy_vi[s] = 7
    else:
        show_best_policy_vi[s] = np.argmax(p_s)



In [21]:
print('Best policy with Value Iteration is')
print(show_best_policy_vi.reshape(grid_size, grid_size))
print('Corresponding Value Function is')
print(corr_value_vi.reshape(grid_size, grid_size))
print('Time taken')
print(end - start)

Best policy with Value Iteration is
[[5. 2. 2. 2. 1.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 1. 1.]
 [0. 0. 1. 1. 1.]
 [0. 3. 3. 3. 5.]]
Corresponding Value Function is
[[ 0. -1. -2. -3. -4.]
 [-1. -2. -3. -4. -3.]
 [-2. -3. -4. -3. -2.]
 [-3. -4. -3. -2. -1.]
 [-4. -3. -2. -1.  0.]]
Time taken
0.00267201400000161
