# Reinforcement Learning

## Row Gridword Resolution

In [153]:
class GridWorldRow():
    def __init__(self, size=5):
        self.size = size
        self.nS = size # state number
        self.nA = 2 # action number (right, left)
        self.MAX_X = size-1 # max state, we start to index from 0
        P = {} # set up the environment {p:{a:[(p(s'|s,a), s', r(s,a,s'), done)]}

        for s in range(self.nS):

            dynamic_s = {}

            for a in range(self.nA):

                s_prime_list = []
                p = 1 if s != 0 and s != self.nS-1 else 0

                if a == 0:
                    s_prime = max(0, s-1)
                else:
                    s_prime = min(self.MAX_X, s+1)
                
                if s_prime == 0:
                    reward = -100
                    done = True
                elif s_prime == self.MAX_X:
                    reward = 10
                    done = True
                else:
                    reward = 0
                    done = False
                
                s_prime_list.append((p, s_prime, reward, done))
                dynamic_s.update({a:s_prime_list})

            P.update({s: dynamic_s})

        self.P = P # assign the environment

    def __str__(self):
        return f"{self.P}"

## Basic Algorithms

In [154]:
import numpy as np

In [155]:
env = GridWorldRow()
print(env)

{0: {0: [(0, 0, -100, True)], 1: [(0, 1, 0, False)]}, 1: {0: [(1, 0, -100, True)], 1: [(1, 2, 0, False)]}, 2: {0: [(1, 1, 0, False)], 1: [(1, 3, 0, False)]}, 3: {0: [(1, 2, 0, False)], 1: [(1, 4, 10, True)]}, 4: {0: [(0, 3, 0, False)], 1: [(0, 4, 10, True)]}}


### Policy Evalutation

In [156]:
pi = np.ones([env.nS, env.nA]) * 0.5
V = np.zeros([env.nS, 1])

gamma = 0.99 #replacement factor the return
theta = 1e-5 #similarity threshold to stop update

In [157]:
def compute_q_value_for_s_a(env, V, s, a, gamma):
    q = 0
    for (p_sPrime, sPrime, r_ss_a, done) in env.P[s][a]:
        q += p_sPrime * (r_ss_a + gamma * V[sPrime])
    return q

In [158]:
i = 0
while True:
    i += 1
    delta = 0

    for s in range(env.nS):
        V_new = 0

        for a in range(env.nA):
            prob_a = pi[s][a]
            q_s_a = compute_q_value_for_s_a(env, V, s, a, gamma)
            V_new += prob_a * q_s_a
        
        delta = max(delta, np.abs(V_new - V[s]))
        V[s] = V_new

    if delta < theta:
        print(f"Done after {i} iterations")
        break

Done after 22 iterations


In [159]:
V

array([[  0.        ],
       [-71.62196676],
       [-43.6807471 ],
       [-16.62196981],
       [  0.        ]])

### Policy Improvement

In [160]:
for s in range(env.nS):
    q_s = np.zeros([env.nA, 1])

    for a in range(env.nA):
        q_s[a] = compute_q_value_for_s_a(env, V, s, a, gamma)

    best_a = np.argmax(q_s)
    pi[s] = np.eye(env.nA)[best_a]

In [161]:
pi

array([[1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.]])

### Policy Iteration

In [162]:
def evaluate_policy(env, pi, V, gamma, theta):
    V_updated = np.copy(V)
    improved = True

    while True:
        delta = 0

        for s in range(env.nS):
            V_new = 0

            for a in range(env.nA):
                prob_a = pi[s][a]
                q_s_a = compute_q_value_for_s_a(env, V, s, a, gamma)
                V_new += prob_a * q_s_a
            
            delta = max(delta, np.abs(V_new - V_updated[s]))
            V_updated[s] = V_new

        if delta < theta:
            break
    
    if (np.array_equal(V, V_updated)):
        improved = False

    return V_updated, improved

def improve_policy(env, pi, V, gamma):
    for s in range(env.nS):
        q_s = np.zeros([env.nA, 1])

        for a in range(env.nA):
             q_s[a] = compute_q_value_for_s_a(env, V, s, a, gamma)

        best_a = np.argmax(q_s)
        pi[s] = np.eye(env.nA)[best_a]

    return pi

In [163]:
def policy_iteration(env, pi, V, gamma, theta):
    i = 0
    while True:
        i += 1
        V, improved = evaluate_policy(env, pi, V, gamma, theta)
        pi = improve_policy(env, pi, V, gamma)

        if improved == False:
            print(f"Done after {i} step")
            break
    return pi, V

In [164]:
pi = np.ones([env.nS, env.nA]) * 0.5
V = np.zeros([env.nS, 1])

gamma = 0.99 #replacement factor the return
theta = 1e-5 #similarity threshold to stop update

In [165]:
pi, V = policy_iteration(env, pi, V, gamma, theta)
pi

Done after 5 step


array([[1., 0.],
       [0., 1.],
       [0., 1.],
       [0., 1.],
       [1., 0.]])