In [1]:
import numpy as np

In [2]:
"""
Used for Q4. Represents the grid world of fig. 3.2 and 3.5.
"""
class gridWorld_1():
    def __init__(self, grid_size=5, actions=['N', 'S', 'E', 'W'], gamma=0.9):
        self.grid_size = grid_size
        self.actions = actions
        self.grid_values = np.zeros((grid_size, grid_size))
        self.prev_values = np.zeros((grid_size, grid_size))
        self.gamma = gamma
        
    def get_new_state_reward(self, state, action):
        """
        input state tuple and action char
        
        Returns state, reward
        """
        x, y = state
        if (x==0 and y==0) and (action=='N' or action=='W'):
            return (x, y), -1
        
        elif (x==0 and y==self.grid_size-1) and (action=='N' or action=='E'):
            return (x, y), -1
        
        elif (x==self.grid_size-1 and y==self.grid_size-1) and (action=='S' or action=='E'):
            return (x, y), -1
        
        elif (x==self.grid_size-1 and y==0) and (action=='S' or action=='W'):
            return (x, y), -1
        
        elif x==0 and action=='N':
            return (x, y), -1
        
        elif x==self.grid_size-1 and action=='S':
            return (x, y), -1
        
        elif y==0 and action=='W':
            return (x, y), -1
        
        elif y==self.grid_size-1 and action=='E':
            return (x, y), -1
        
        elif x==0 and y==1:
            return (4, 1), 10
        
        elif x==0 and y==3:
            return (2, 3), 5
        
        elif action=='N':
            return (x-1, y), 0
        elif action=='S':
            return (x+1, y), 0
        elif action=='E':
            return (x, y+1), 0
        elif action=='W':
            return (x, y-1), 0
        
        else:
            print("Unknown state or action:", state, action)
            
    def get_new_value_estimate(self, i, j):
#         temp = 0
        prev_max = -np.inf
        for action in self.actions:
            a, b = self.get_new_state_reward((i, j), action)
            c = self.gamma * self.grid_values[a[0], a[1]] + b
            if prev_max < c:
                prev_max = c
            
        return prev_max
            
    def value_iteration(self):
        for i in range(self.grid_size):
            for j in range(self.grid_size):
                self.grid_values[i, j] = self.get_new_value_estimate(i, j)
                
    def get_grid(self):
        for i in range(10000):
            self.value_iteration()
#             print("Iteration:", i)
        print(np.round(self.grid_values, decimals=1), "\n")
            

In [3]:
"""
Q2. 

We attempt to solve the system of 25 linear equations (1 equation per state)
using Ax = b, where A is a matrix of coefficients of the equations, x is a
a vector of all state values and b is a vector of constants obtained from the 
system of equations.

This can be done using the numpy library.
"""
import numpy as np
from numpy import linalg

A = np.zeros((25, 25), dtype=np.float32)
b = np.zeros((25,), dtype=np.float32)
with open('variable_coeffs.csv', 'r') as f:
    lines = f.readlines()
    
for i, line in enumerate(lines):
    A[i] = np.asarray(list(map(float, line.split(","))))
    
with open('constants.csv', 'r') as f:
    lines = f.readlines()
    
for i, line in enumerate(lines):
    b[i] = np.asarray(list(map(float, line.split(","))))
    
# print(A)
# print(b)

x = linalg.solve(A, b)
x = np.reshape(x, (5, 5))
print(np.round(x, decimals=1))

[[ 3.3  8.8  4.4  5.3  1.5]
 [ 1.5  3.   2.3  1.9  0.5]
 [ 0.1  0.7  0.7  0.4 -0.4]
 [-1.  -0.4 -0.4 -0.6 -1.2]
 [-1.9 -1.3 -1.2 -1.4 -2. ]]


In [4]:
"""
Q4. 

We solve for optimal state values by picking actions corresponding max return of rewards and solving 
Belman equations using Value Iteration. I iterate for 10,000 steps considering gamma^10,000 ~ 0 hence 
rewards will have converged as well. 
"""
A = gridWorld_1()
A.get_grid()

[[22.  24.4 22.  19.4 17.5]
 [19.8 22.  19.8 17.8 16. ]
 [17.8 19.8 17.8 16.  14.4]
 [16.  17.8 16.  14.4 13. ]
 [14.4 16.  14.4 13.  11.7]] 



In [71]:
 """
 Used in Q6. Represents the grid world of example 4.1
 """
import random

class gridWorld_2():
    def __init__(self, grid_size=4, actions=['N', 'S', 'E', 'W'], gamma=1):
        self.grid_size = grid_size
        self.actions = actions
        self.grid_values = np.zeros((grid_size, grid_size))
        self.prev_values = np.zeros((grid_size, grid_size))
        self.gamma = gamma
        self.init_policy = {(0, 0):'S', (0, 1):'S', (0, 2):'S', (0, 3):'S', (0, 4):'S',
                            (1, 0):'S', (1, 1):'S', (1, 2):'S', (1, 3):'S', (1, 4):'S',
                            (2, 0):'S', (2, 1):'S', (2, 2):'S', (2, 3):'S', (3, 4):'S',
                            (3, 0):'S', (3, 1):'S', (3, 2):'S', (3, 3):'S', (4, 4):'S',}
        
    def get_new_state_reward(self, state, action):
        """
        input state tuple and action char
        
        Returns state, reward
        """
        x, y = state
        if (x==0 and y==0):
            # Terminal State!
            return (x, y), 0
        
        elif (x==0 and y==self.grid_size-1) and (action=='N' or action=='E'):
            return (x, y), -1
        
        elif (x==self.grid_size-1 and y==self.grid_size-1):
            # Terminal State!
            return (x, y), 0
        
        elif (x==self.grid_size-1 and y==0) and (action=='S' or action=='W'):
            return (x, y), -1
        
        elif x==0 and action=='N':
            return (x, y), -1
        
        elif x==self.grid_size-1 and action=='S':
            return (x, y), -1
        
        elif y==0 and action=='W':
            return (x, y), -1
        
        elif y==self.grid_size-1 and action=='E':
            return (x, y), -1
        
        elif action=='N':
            return (x-1, y), -1
        elif action=='S':
            return (x+1, y), -1
        elif action=='E':
            return (x, y+1), -1
        elif action=='W':
            return (x, y-1), -1
        
        else:
            print("Unknown state or action:", state, action)    
        
    def get_optimal_value_estimate(self, i, j):
        """
        We take advantage of the fact that my MDP, P(s', r | s, a) for various s and a
        in the current question is deterministic, ie either 0 or 1. It is convinient to 
        consider only the states that are reachable by action ('up', 'down', 'left', 'right')
        in the estimation of a given state and simply ignore all other states (as they are 
        not reachable, P(s', r | s, a) for those states is actually 0).
        """
        prev_max = -np.inf
        for action in self.actions:
            a, b = self.get_new_state_reward((i, j), action)
            c = self.gamma * self.grid_values[a[0], a[1]] + b
            if prev_max < c:
                prev_max = c
            
        return prev_max
            
    def policy_improvement(self, policy, epsilon=1e-5):
        """
        @policy:
            A dict() object containing state keys and optimal action values.
        @state = (i, j)
        
        Internally updates self.grid_values to optimal and returns 
        optimal_state_values (copy of self.grid_values) and optimal_policy
        
        We take advantage of the fact that the MDP here is deterministic,
        meaning given a state and a chosen action, P(s', r | s, a) = 1 or 0. This
        is because it mentioned in the question that the agent will go in the direction
        as per the action with probability 1. Hence for the action chosen by the policy 
        we recieve a reward from the `get_new_state_reward()` method with probability 1.
        
        We fix contest between equally good policies by ignoring a policy giving equal 
        rewards as another.
        """
        stable_policy = True
        for i in range(self.grid_size):
            for j in range(self.grid_size):
                if (i, j)!=(0, 0) and (i, j)!=(self.grid_size-1, self.grid_size-1):
                    
                    old_action = policy[(i, j)]
                    new_action = None
                    prev_max = -np.inf
                    for action in self.actions:

                        a, b = self.get_new_state_reward((i, j), action)
                        c = self.gamma * self.grid_values[a[0], a[1]] + b

                        # This should fix contest between two equally good 
                        # policies for a given state...
                        if prev_max < c:
                            prev_max = c
                            new_action = action
                            
#                     print("prev_max", prev_max)
#                     print("old, new", old_action, new_action)
#                     print(self.grid_values, "\n")

                    policy[(i, j)] = new_action
                    if new_action!=old_action:
                        stable_policy = False
        
        print("Improved policy:")
        print(self.grid_values)
        print(policy)
        
        if not stable_policy:
            print("Policy not stable yet...", "\n")
            self.policy_evaluation(policy)
            return self.policy_improvement(policy)
            
        else:
            print("Policy stable!", "\n")
            return policy
                
        
    def policy_evaluation(self, policy, epsilon=1e-6, random_policy=False):
        """
        @policy: 
            A dict() object containing state keys and optimal action values.
            
        We again take advantage of the fact that the MDP here is deterministic,
        meaning given a state and a chosen action, P(s', r | s, a) = 1 or 0. This
        is because it mentioned in the question that the agent will go in the direction
        as per the action with probability 1. Hence for the action chosen by the policy 
        we recieve a reward from the `get_new_state_reward()` method with probability 1.
        
        Problem: Current evaluation is indefinite because iteratively the policy keeps decreasing 
        the value and it never converges...
        
        We solve this by keeping the initial policy as equiprobable.
        
        """
        flag = [False]*self.grid_size*self.grid_size
        flag[0] = True
        flag[15] = True
        counter = 0
        while True:
            for i in range(self.grid_size):
                for j in range(self.grid_size):
                
                    init_state_value = np.copy(self.grid_values[i, j])
                    if random_policy:
                        for action in self.actions:
                            a, b = self.get_new_state_reward((i, j), action)
                            self.grid_values[i, j] = 0.25 * (b + self.gamma * self.grid_values[a[0], a[1]])
                    else:
                        a, b = self.get_new_state_reward((i, j), policy[(i, j)])
                        self.grid_values[i, j] = b + self.gamma * self.grid_values[a[0], a[1]]

                    delta = abs(self.grid_values[i, j] - init_state_value)

                    counter +=1
#                     if counter%100==0:
#                         print(counter)
#                         print(self.grid_values)
#                         print(policy)
#                         print(flag)
#                      input()
                    
                    if delta <= epsilon:
                        if flag[i*self.grid_size + j]==False:
                            print("Converged!", i*self.grid_size + j + 1)
                        flag[i*self.grid_size + j] = True
                        
                    if all(i==True for i in flag):
                        print("Reached break point!")
                        return
    
    def value_iteration(self, epsilon=1e-5):
        flag = [False]*self.grid_size*self.grid_size
        flag[0] = True
        flag[15] = True
        counter = 0
        while not all(i==True for i in flag):
            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    if flag[i*self.grid_size + j]==False:
                        
                        init_state_value = np.copy(self.grid_values[i, j])
                        self.grid_values[i, j] = self.get_optimal_value_estimate(i, j)
                        if abs(init_state_value - self.grid_values[i, j] <= epsilon):
                            
                            print("State", str(1 + i*4 + j), "has converged...")
                            flag[i*self.grid_size + j] = True
                            
            print("Iteration:", counter+1)
            print("State Values:")
            print(self.grid_values)
            print("\n")

            counter+=1
                
    def solve_by_policy_iteration(self):
        init_policy = {(0, 0):None, (0, 1):None, (0, 2):None, (0, 3):None,
                       (1, 0):None, (1, 1):None, (1, 2):None, (1, 3):None,
                       (2, 0):None, (2, 1):None, (2, 2):None, (2, 3):None,
                       (3, 0):None, (3, 1):None, (3, 2):None, (3, 3):None}
        

        self.policy_evaluation(init_policy, random_policy=True)
        policy = self.policy_improvement(init_policy)
        return policy
        
#         print(np.round(self.grid_values, decimals=1), "\n")
        
    def solve_by_value_iteration(self):
        self.value_iteration()
        print("Optimal State Values")
        print(np.round(self.grid_values, decimals=1), "\n")

In [61]:
A = gridWorld_2()
optimal_policy = A.solve_by_policy_iteration()
print("Optimal Policy:", optimal_policy)
print("Optimal State values:")
print(A.grid_values)

Converged! 2
Converged! 3
Converged! 4
Converged! 7
Converged! 8
Converged! 11
Converged! 12
Converged! 15
Converged! 5
Converged! 6
Converged! 9
Converged! 10
Converged! 13
Converged! 14
Reached break point!
Improved policy:
[[ 0.         -0.25       -0.3125     -0.328125  ]
 [-0.33333333 -0.33333333 -0.33333333 -0.33333333]
 [-0.33333333 -0.33333333 -0.33333333 -0.33333333]
 [-0.33333333 -0.33333333 -0.33333333  0.        ]]
{(0, 0): None, (0, 1): 'W', (0, 2): 'W', (0, 3): 'W', (1, 0): 'N', (1, 1): 'N', (1, 2): 'N', (1, 3): 'N', (2, 0): 'N', (2, 1): 'W', (2, 2): 'S', (2, 3): 'S', (3, 0): 'N', (3, 1): 'E', (3, 2): 'E', (3, 3): None}
Policy not stable yet... 

Converged! 2
Converged! 3
Converged! 4
Converged! 5
Converged! 6
Converged! 7
Converged! 8
Converged! 9
Converged! 10
Converged! 12
Converged! 13
Converged! 15
Converged! 11
Converged! 14
Reached break point!
Improved policy:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -4.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]
{(0, 0): None, (0, 1): 'W', 

In [72]:
B = gridWorld_2()
B.solve_by_value_iteration()
# print("Optimal Policy:", optimal_policy)
# print("Optimal State values:")
# print(B.grid_values)

Iteration: 1
State Values:
[[ 0. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1. -1.]
 [-1. -1. -1.  0.]]


State 2 has converged...
State 6 has converged...
State 14 has converged...
State 18 has converged...
Iteration: 2
State Values:
[[ 0. -1. -2. -2.]
 [-1. -2. -2. -2.]
 [-2. -2. -2. -1.]
 [-2. -2. -1.  0.]]


State 3 has converged...
State 7 has converged...
State 9 has converged...
State 11 has converged...
State 13 has converged...
State 17 has converged...
Iteration: 3
State Values:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]


State 4 has converged...
State 8 has converged...
State 12 has converged...
State 16 has converged...
Iteration: 4
State Values:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]


Optimal State Values
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]] 



In [64]:
"""
Q7.

Referenced how to make heatmap online (https://github.com/ShangtongZhang/reinforcement-learning-an-introduction/blob/4d8928bb7538d81b818267c983f3fd004ffc9068/chapter04/car_rental.py#L124)
"""
import numpy as np
import scipy
from scipy.stats import poisson as poisson
matplotlib.use('Agg')
class JacksCarRental():
    def __init__(self, init_1=10, init_2=10, rent_lambda_1=3, rent_lambda_2=4, 
                 return_lambda_1=3, return_lambda_2=2, gamma=0.9):
        self.loc_1 = init_1
        self.loc_2 = init_2
        self.rent_lambda_1 = rent_lambda_1
        self.rent_lambda_2 = rent_lambda_2
        self.return_lambda_1 = return_lambda_1
        self.return_lambda_2 = return_lambda_2
        self.gamma = gamma
        
        self.poisson_cache = dict()
                
        # Current Policy moves from Loc 1 to Loc 2.
        # For a max/min of +/- 5
        
        self.actions = [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4 ,5]
                
        # Define state as (i, j) where i = # of cars @ loc 1
        # and j = # of cars @ loc 2.
        
        self.state_values = np.zeros((21, 21))
        self.policy = np.zeros((21, 21), dtype=int)
    
    def get_poisson_value(self, n, lam):
        """
        Return sample drawn from Poisson distribution.
        """
        key = (n, lam)
        if key not in self.poisson_cache:
            self.poisson_cache[key] = poisson.pmf(n, lam)
            
        return self.poisson_cache[key]
        
    
    def get_expected_state_reward(self, state, action):
        """
        input the location (1 or 2) and action ([-5, 5])
        
        Returns state, reward
        """
        reward = 0.0
        reward = reward - abs(action)*2
        
        a = min(state[0] - action, 20)
        b = min(state[1] + action, 20)

        """
        Get expected profits for current state, supposing we have restocked from yesterday and
        rental requests can be in the range of 1 - 20, with other requests being declined.
        """
        for rent_req_1 in range(11):
            for rent_req_2 in range(11):
                
                prob_rent = self.get_poisson_value(rent_req_1, self.rent_lambda_1) * self.get_poisson_value(rent_req_2, self.rent_lambda_2)
                
                # Check valid requests
                cars_loc_1 = a
                cars_loc_2 = b
                rentable_loc_1 = min(cars_loc_1, rent_req_1)
                rentable_loc_2 = min(cars_loc_2, rent_req_2)

                # profit for rental
                profit = 10 * (rentable_loc_1 + rentable_loc_2)
                cars_loc_1 -= rentable_loc_1
                cars_loc_2 -= rentable_loc_2

                """
                Get cars returned for tomorrow:
                """
                
                returned_cars_first_loc = self.return_lambda_1
                returned_cars_second_loc = self.return_lambda_2
                num_of_cars_first_loc = min(cars_loc_1 + returned_cars_first_loc, 20)
                num_of_cars_second_loc = min(cars_loc_2 + returned_cars_second_loc, 20)
                reward += prob_rent * (profit + self.gamma * self.state_values[num_of_cars_first_loc, num_of_cars_second_loc])
                
                """
                for returned_loc_1 in range(11):
                    for returned_loc_2 in range(11):

                        prob_return = self.get_poisson_value(returned_loc_1, self.return_lambda_1) * self.get_poisson_value(returned_loc_2, self.return_lambda_2)

                        new_cars_loc_1 = min(cars_loc_1 + returned_loc_1, 20)
                        new_cars_loc_2 = min(cars_loc_2 + returned_loc_2, 20)

                        reward += prob_rent*prob_return*(profit + self.gamma*self.state_values[new_cars_loc_1, new_cars_loc_2])
                """     
        return reward
                
            
    def policy_improvement(self, epsilon=1e-5):
        """
        @policy:
            A numpy 2d array.
        @state = (i, j)
        
        Internally updates self.grid_values to optimal and returns 
        optimal_state_values (copy of self.grid_values) and optimal_policy
        
        We take advantage of the fact that the MDP here is deterministic,
        meaning given a state and a chosen action, P(s', r | s, a) = 1 or 0. This
        is because it mentioned in the question that the agent will go in the direction
        as per the action with probability 1. Hence for the action chosen by the policy 
        we recieve a reward from the `get_new_state_reward()` method with probability 1.
        """
        stable_policy = True
        for i in range(21):
            for j in range(21):
                    
                old_action = self.policy[i, j]
                new_action = None
                prev_max = -np.inf
                action_rets = list()
                for action in self.actions:
                    if (0<=action<=i) or (-j<=action<=0):

                        c = self.get_expected_state_reward((i, j), action)
                        
                        if c > prev_max:
                            prev_max = c
                            new_action = action

                self.policy[i, j] = new_action
                
                if new_action!=old_action:
                    stable_policy = False
                    
        if stable_policy:
            print("Optimal Policy:")
            print(self.policy)
            return self.policy, True
        
        else:
            print("Not stable policy...")
            return self.policy, False
                
        
    def policy_evaluation(self, epsilon=1e-4):
        """
        @policy: 
            A dict() object containing state keys and optimal action values.
            
        We again take advantage of the fact that the MDP here is deterministic,
        meaning given a state and a chosen action, P(s', r | s, a) = 1 or 0. This
        is because it mentioned in the question that the agent will go in the direction
        as per the action with probability 1. Hence for the action chosen by the policy 
        we recieve a reward from the `get_new_state_reward()` method with probability 1.
        """
        """
        
        Problem: Current evaluation is indefinite because iteratively the policy keeps decreasing 
        the value and it never converges....
        
        """
        flag = [False]*21*21
        delta = 0 
        counter = 0
        while True:
            old_values = np.copy(self.state_values)
            for i in range(21):
                for j in range(21):
                        
                    c = self.get_expected_state_reward((i, j), self.policy[i, j])
                    self.state_values[i, j] = c
#                     print(c)

            max_value = np.max(abs(old_values - self.state_values))
#             print("delta:", max_value)
            if max_value<=epsilon:
                break
            
    def policy_iteration(self):
        counter = 0
        _, axes = plt.subplots(2, 3, figsize=(40, 20))
        plt.subplots_adjust(wspace=0.1, hspace=0.2)
        axes = axes.flatten()
        
        while True:
            fig = sns.heatmap(np.flipud(self.policy), cmap="YlGnBu", ax=axes[counter])
            fig.set_ylabel('# cars at first location', fontsize=30)
            fig.set_yticks(list(reversed(range(20 + 1))))
            fig.set_xlabel('# cars at second location', fontsize=30)
            fig.set_title('policy {}'.format(counter), fontsize=30)
            
            print("Here we go...", "\n")
            self.policy_evaluation()
            a, b = self.policy_improvement()
            
            if b:
                fig = sns.heatmap(np.flipud(self.state_values), cmap="YlGnBu", ax=axes[-1])
                fig.set_ylabel('# cars at first location', fontsize=30)
                fig.set_yticks(list(reversed(range(20 + 1))))
                fig.set_xlabel('# cars at second location', fontsize=30)
                fig.set_title('optimal value', fontsize=30)
                break
                
            counter += 1
            
        plt.show()
        plt.savefig('./plots/fig.png')
        plt.close()

In [None]:
A = JacksCarRental()
A.policy_iteration()

Here we go... 

