In [1]:
import numpy as np
import random

In [2]:
class GridWorld:
    def __init__(self):
        self.grid_width = 4
        self.grid_height = 3
        self.start_state = (2, 0)
        self.goal_states = [(0, 3), (1, 3)]
        self.obstacles = [(1, 1)]
        self.action_space = ('UP', 'DOWN', 'RIGHT', "LEFT")
        self.policy = {(0,0):"DOWN", (0,1):"Right", (0,2):"RIGHT",(1,0):"DOWN", (1,2):"UP", (2,0):"Right",(2,1):"RIGHT",(2,2):"RIGHT", (2,3): "UP"}
        self.current_state = self.start_state


    def reset(self):
        self.current_state = self.start_state
        return self.current_state  # Return the state tuple

    def is_terminal(self):
        return self.current_state in self.goal_states

    def stochastic_action(self, action):
        action = action.upper()
        x , y = self.current_state
        if(y == 2):
            pass
            random_number = random.random()
            if( random_number <= 0.7):
                return "UP"
            else:
                return action      
        return action
    
    def next_state(self, action):
        action = self.stochastic_action(action)
        x , y = self.current_state
        # Update state based on action
        if action == "UP":
            new_state = (x - 1, y)
        elif action == "DOWN":
            new_state = (x + 1, y)
        elif action == "RIGHT":
            new_state = (x, y + 1)
        elif action == "LEFT":
            new_state = (x, y - 1)
        
        if self.is_valid_state(new_state):
            return new_state
        return self.current_state
    
    def is_valid_state(self, state):
        x, y = state
        if(state in self.obstacles):
            return False
        if (0 <= x < self.grid_height and 0 <= y < self.grid_width):
            return True
        return False

    def step(self, action):
        action = action.upper()
        # Update state based on action
        if action in self.action_space:
            new_state = self.next_state(action)
        else:
            raise ValueError("Invalid action")

        #Info
        info = {"Previous state":self.current_state, "Action":action, "Next state": new_state, "Reward": 0, "Done": False}
        
        self.current_state = new_state # Update State

        # Check for valid state and update
        reward = 0
        if self.is_terminal():
            reward = 1 if self.current_state == (0, 3) else -1  # Goal 1 reward or failure penalty

        done = self.is_terminal()
        #Update info
        info["Reward"] = reward
        info["Done"] = done

        # Return observation, reward, done, and optionally info dictionary
        return self.current_state, reward, done, info  # Empty info for now
    
    # Quiz 1:

    def reward(self, next_state):
        if(next_state in self.goal_states):
            if(next_state == (0,3)):
                return 1
            else:
                return -1
        return 0

    def next(self, state, action):
        action = action.upper()
        x , y = state
        if action == "UP":
            new_state = (x - 1, y)
        elif action == "DOWN":
            new_state = (x + 1, y)
        elif action == "RIGHT":
            new_state = (x, y + 1)
        elif action == "LEFT":
            new_state = (x, y - 1)

        if self.is_valid_state(new_state):
            return new_state
        return state

        
    def possible_actions(self, state, policy):
        x , y = state
        next_states = {}

        next_state = self.next(state, self.policy[state]) # Find the "next_state" for the "action" required by the policy at "state" 
        next_states[next_state] = 1

        if(y == 2):
            next_states[next_state] *= 0.3
            next_state = self.next(state,"UP")
            next_states[next_state] = next_states.get(next_state, 0) + 0.7

        return next_states
    

    def possible_actions_random(self, state):
        x , y = state
        next_states = {}

        for action in self.action_space:
            next_state = self.next(state, action)
            next_states[next_state] = next_states.get(next_state, 0) + 0.25

        return next_states

    def policy_evaluation(self, policy ,tol = 0.05, gamma = 0.9 ):
        value_function = {(0,0):0, (0,1):0, (0,2):0, (0,3):0, (1,0):0, (1,2):0, (1,3):0, (2,0):0, (2,1):0, (2,2):0, (2,3):0}
        value_function_updated = dict(value_function)
        
        while(1):
            delta = 0
            value_function = dict(value_function_updated) # Update Value Functions
            for state in value_function:
                if(state in self.goal_states):
                    value_function_updated[state] = 0
                    continue
                temp = 0
                next_states = self.possible_actions(state, policy) # Uncomment for policy given by self.policy
                # next_states = self.possible_actions_random(state) # Uncomment for random policy -> ("UP": 0.25, "DOWN": 0.25, "RIGHT": 0.25, "LEFT":0.25) # Note: Value functions for this policy matches Assignment 1
                for next_state in next_states:
                    temp += next_states[next_state]*(self.reward(next_state) + gamma*value_function[next_state])
                value_function_updated[state] = temp
            
            # Calculate delta 
            difference = {key: abs(value_function[key] - value_function_updated.get(key, 0)) for key in value_function} # Difference between prev and current V(s)
            delta = max(max(difference.values()), 0) # Find max value in the difference and return the max(max,0)
            if(delta < tol): # if change < tolerance -> END
                break
                
        return value_function_updated
    
    def possible_actions_action(self, state, action):
        x , y = state
        next_states = {}

        next_state = self.next(state, action) # Find the "next_state" for the "action" required by the policy at "state" 
        next_states[next_state] = 1

        if(y == 2):
            next_states[next_state] *= 0.3
            next_state = self.next(state,"UP")
            next_states[next_state] = next_states.get(next_state, 0) + 0.7

        return next_states
    
    def actionValue(self, value_function, state, action, gamma = 0.9):
        next_states = self.possible_actions_action(state,action) # Returns {(next_state1): its probability, (next_state2): its probability}
        # Action value = probability_of_next_state1*(reward + gamma*(V(next_state1))) + ... 
        action_value = 0
        for next_state, probability in next_states.items():
            action_value += probability*(self.reward(next_state) + gamma*value_function[next_state])

        return action_value
    
    def actionValue_functions(self, policy, value_func, gamma = 0.9):
        action_value = {"UP":0 , "DOWN":0 , "RIGHT":0 , "LEFT":0}
        action_value_functions = {} 
        for state in policy:
            for action in action_value:
                action_value[action] = self.actionValue(value_func, state, action, gamma)
                
            action_value_functions[state] = dict(action_value)

        return action_value_functions
    
    def policy_improvement(self, policy, tol = 0.05, gamma = 0.9):

        policy_stable = False
        while(not policy_stable):
            # step 1: Policy Evaluation
            value_func = self.policy_evaluation(policy, tol = tol, gamma = gamma)

            # Step 2: Policy Improvement
            policy_stable = True
            action_value = {"UP":0 , "DOWN":0 , "RIGHT":0 , "LEFT":0}
            action_value_functions = {} 
            for state in policy:
                for action in action_value:
                    action_value[action] = self.actionValue(value_func, state, action, gamma = gamma)
                    
                best_action = max(action_value, key=action_value.get)
                if(best_action != policy[state]):
                    policy_stable = False
                    policy[state] = best_action
        return policy
        
def Display(action_value_function):
    print("State  : [ UP   | DOWN  | RIGHT  | LEFT ]")
    for state,dic in action_value_function.items():
        up = dic["UP"]
        down = dic["DOWN"]
        right = dic["RIGHT"]
        left = dic["LEFT"]
        print(f"{state} : [{up} , {down}, {right}, {left}] ")
    

# Best Policy

In [6]:
# Test Best Policy:
gw = GridWorld()
best_policy = gw.policy_improvement(gw.policy, tol = 0.000001, gamma = 0.9)
display(best_policy)

{(0, 0): 'RIGHT',
 (0, 1): 'RIGHT',
 (0, 2): 'RIGHT',
 (1, 0): 'UP',
 (1, 2): 'UP',
 (2, 0): 'UP',
 (2, 1): 'RIGHT',
 (2, 2): 'UP',
 (2, 3): 'LEFT'}

In [7]:
# State Value Function of Best Policy:
state_value_function = gw.policy_evaluation(best_policy, tol=0.000001, gamma=0.9) # (self, policy ,tol = 0.05, gamma = 0.9 ):
state_value_function = {key: round(value, 4) for key, value in state_value_function.items()} # Round
state_value_function = [ [state_value_function.get((x,y), 0) for y in range(gw.grid_width )] for x in range(gw.grid_height)] # Place in 2D array
state_value_function = np.array(state_value_function)
print(state_value_function)

[[0.6568 0.7297 0.8108 0.    ]
 [0.5911 0.     0.7297 0.    ]
 [0.532  0.5911 0.6568 0.5911]]


In [8]:
# Action Value Function of Best Policy:
action_value_function =  gw.actionValue_functions(best_policy, state_value_function, gamma=0.9) #def actionValue_functions(self, policy, value_func, gamma = 0.9):
action_value_function[(0,3)] = {"UP": 0 , "DOWN": 0 , "RIGHT": 0, "LEFT": 0}
action_value_function[(1,3)] = {"UP": 0 , "DOWN": 0 , "RIGHT": 0, "LEFT": 0}
for key in action_value_function: # key = (0,0)
    for action in action_value_function[key]: # action = "UP"
        dic = action_value_function[key] 
        dic[action]= round(dic[action], 4)

Display(action_value_function)

State  : [ UP   | DOWN  | RIGHT  | LEFT ]
(0, 0) : [0.5911 , 0.532, 0.6567, 0.5911] 
(0, 1) : [0.6567 , 0.6567, 0.7297, 0.5911] 
(0, 2) : [0.7297 , 0.7078, 0.8108, 0.7078] 
(1, 0) : [0.5911 , 0.4788, 0.532, 0.532] 
(1, 2) : [0.7297 , 0.6881, 0.2108, 0.7078] 
(2, 0) : [0.532 , 0.4788, 0.532, 0.4788] 
(2, 1) : [0.532 , 0.532, 0.5911, 0.4788] 
(2, 2) : [0.6567 , 0.637, 0.6193, 0.6193] 
(2, 3) : [-1.0 , 0.532, 0.532, 0.5911] 
(0, 3) : [0 , 0, 0, 0] 
(1, 3) : [0 , 0, 0, 0] 


In [15]:
# Start to Finish using Best policy
gw.reset()
gw.current_state = (2,0) # Change Start state
done = gw.is_terminal()
while not done:
    action = best_policy[gw.current_state]
    new_state, reward, done, info = gw.step(action)
    print(info.values())

dict_values([(2, 0), 'UP', (1, 0), 0, False])
dict_values([(1, 0), 'UP', (0, 0), 0, False])
dict_values([(0, 0), 'RIGHT', (0, 1), 0, False])
dict_values([(0, 1), 'RIGHT', (0, 2), 0, False])
dict_values([(0, 2), 'RIGHT', (0, 2), 0, False])
dict_values([(0, 2), 'RIGHT', (0, 2), 0, False])
dict_values([(0, 2), 'RIGHT', (0, 3), 1, True])


# Function Tests

In [20]:
gw = GridWorld() 
# policy = {(0,0):"DOWN", (0,1):"Right", (0,2):"RIGHT",(1,0):"DOWN", (1,2):"UP", (2,0):"Right",(2,1):"RIGHT",(2,2):"RIGHT", (2,3): "UP"}
gw.possible_actions((0,2), gw.policy) # Returns possible next states and their probabilities

{(0, 3): 0.3, (0, 2): 0.7}

In [19]:
# Test Policy Evaluation: Uncomment one of the two options for policy
gw = GridWorld()
dic = gw.policy_evaluation(gw.policy, tol = 0.000001, gamma = 0.9)

rounded_dict = {key: round(value, 4) for key, value in dic.items()} # Round

array2d = [ [rounded_dict.get((x,y), 0) for y in range(gw.grid_width )] for x in range(gw.grid_height)] # Place in 2D array
array2d = np.array(array2d)

print(array2d)
print(rounded_dict)

[[ 0.1245  0.7297  0.8108  0.    ]
 [ 0.1383  0.      0.7297  0.    ]
 [ 0.1537  0.1708  0.1897 -1.    ]]
{(0, 0): 0.1245, (0, 1): 0.7297, (0, 2): 0.8108, (0, 3): 0, (1, 0): 0.1383, (1, 2): 0.7297, (1, 3): 0, (2, 0): 0.1537, (2, 1): 0.1708, (2, 2): 0.1897, (2, 3): -1.0}


In [18]:
# Test Action Value: actionValue(self, value_function, state, action, gamma = 0.9):
print(array2d)
action_value = gw.actionValue(rounded_dict, (2,3), "LEFT", gamma=0.9)
print(action_value)

# Test actionValue_functions: actionValue_functions(self, policy, value_func, gamma = 0.9):
gw = GridWorld()
dic = gw.policy_evaluation(gw.policy, tol = 0.000001, gamma = 0.9)
value_function = {key: round(value, 4) for key, value in dic.items()} # Round

dic = gw.actionValue_functions(gw.policy, value_function, gamma = 0.9 )
print(dic)

# Test Policy Improvement
best_policy = gw.policy_improvement(gw.policy, tol = 0.000001, gamma = 0.9)
print(best_policy)



[[ 0.1245  0.7297  0.8108  0.    ]
 [ 0.1383  0.      0.7297  0.    ]
 [ 0.1537  0.1708  0.1897 -1.    ]]
0.17073000000000002
{(0, 0): {'UP': 0.11205, 'DOWN': 0.12447000000000001, 'RIGHT': 0.65673, 'LEFT': 0.11205}, (0, 1): {'UP': 0.65673, 'DOWN': 0.65673, 'RIGHT': 0.72972, 'LEFT': 0.11205}, (0, 2): {'UP': 0.72972, 'DOWN': 0.7078230000000001, 'RIGHT': 0.8108040000000001, 'LEFT': 0.7078230000000001}, (1, 0): {'UP': 0.11205, 'DOWN': 0.13833, 'RIGHT': 0.12447000000000001, 'LEFT': 0.12447000000000001}, (1, 2): {'UP': 0.72972, 'DOWN': 0.562023, 'RIGHT': 0.21080400000000005, 'LEFT': 0.7078230000000001}, (2, 0): {'UP': 0.12447000000000001, 'DOWN': 0.13833, 'RIGHT': 0.15372000000000002, 'LEFT': 0.13833}, (2, 1): {'UP': 0.15372000000000002, 'DOWN': 0.15372000000000002, 'RIGHT': 0.17073000000000002, 'LEFT': 0.13833}, (2, 2): {'UP': 0.65673, 'DOWN': 0.51093, 'RIGHT': 0.18971099999999996, 'LEFT': 0.505827}, (2, 3): {'UP': -1.0, 'DOWN': -0.9, 'RIGHT': -0.9, 'LEFT': 0.17073000000000002}}
{(0, 0): 'R