In [1]:
import numpy as np
import random
from tabulate import tabulate
from tqdm import tqdm
import ast

In [2]:
def generate_grid_world(length, width,path_lenght,holes_number,Random_State):
    
    random.seed(Random_State)
    #store all cells in a list
    Grid_Cells = []
    for row in range(length):
        for col in range(width):
            Grid_Cells.append([row,col])


    #specify the number of holes in the gridworld
    
    #specify the start point as a random cell
    start = [random.randint(0, length), random.randint(0, width)]

    #create a path from start point
    """instead of defining start and goal points,
      we define just a start point and a random path with a random lenght to
       another point and name it as goal point"""
    
    def random_path(Start, Path_Lenght,length, width):
        
        Path = []
        Path.append(Start)
        for i in range(Path_Lenght):
            
            #there are two moves that take us on a random cell named Goal [1,0], [0,1]
            
            move = random.choice([[1,0], [0,1]])
            
            #update the start cell/point by the above move
            Start = [x + y for x, y in zip(Start, move)]
            
            #if the movement take us out of our gridworld, we reverse the change in the start point
            if Start[0] < 0 or Start[1] < 0 or Start[0] > length-1 or Start[1] > width-1:

                Start = [x - y for x, y in zip(Start, move)]

            else:
                
                #create a path history
                Path.append(Start)

        Goal = Start

        return Goal,Path
    

    GoalPath = random_path(start, path_lenght,length, width)

    goal = GoalPath[0]
    path = GoalPath[1]

    #now we must eliminate the path cells from the Grid_Cells to choose hole cells from remaining cells

    FreeCells = [x for x in Grid_Cells if x not in path]

    Holes = random.sample(FreeCells, holes_number)

    #Also, we can visualize our gridworld in a simple way

    def mark_holes(holes):
        marked_data = [["Hole" if [row, col] in holes else [row, col] for col in range(width)] for row in range(length)]
        return marked_data
    
    marked_matrix = mark_holes(Holes)

    print(tabulate(marked_matrix, tablefmt="grid"))

    
    return length, width, start, goal, Holes, path,Grid_Cells

In [3]:
environment = generate_grid_world(5, 4,4,4,39)
environment

+--------+--------+--------+--------+
| Hole   | [0, 1] | [0, 2] | [0, 3] |
+--------+--------+--------+--------+
| [1, 0] | [1, 1] | [1, 2] | [1, 3] |
+--------+--------+--------+--------+
| Hole   | [2, 1] | [2, 2] | [2, 3] |
+--------+--------+--------+--------+
| Hole   | [3, 1] | Hole   | [3, 3] |
+--------+--------+--------+--------+
| [4, 0] | [4, 1] | [4, 2] | [4, 3] |
+--------+--------+--------+--------+


(5,
 4,
 [1, 2],
 [4, 3],
 [[2, 0], [3, 2], [3, 0], [0, 0]],
 [[1, 2], [1, 3], [2, 3], [3, 3], [4, 3]],
 [[0, 0],
  [0, 1],
  [0, 2],
  [0, 3],
  [1, 0],
  [1, 1],
  [1, 2],
  [1, 3],
  [2, 0],
  [2, 1],
  [2, 2],
  [2, 3],
  [3, 0],
  [3, 1],
  [3, 2],
  [3, 3],
  [4, 0],
  [4, 1],
  [4, 2],
  [4, 3]])

In [4]:
def probability_distribution(grid_size,randomness):
    #random.seed(40)
    
    #by this function we generate probabilities which their sum is equal to 1
    def generate_probabilities(n):

        numbers = [random.random() for _ in range(n)]
        total_sum = sum(numbers)
        scaled_numbers = [num / total_sum for num in numbers]
        
        return scaled_numbers
    
    cells_prob = {}
    if randomness == 'stochastic':
        for cell in range(grid_size):
            
            #we set the number of probs to 4 due to 4 possible action for each cell (go to its neighbors)
            probs = generate_probabilities(4)

            cells_prob[cell] = probs
    elif randomness == 'equal probable':

        for cell in range(grid_size):

            cells_prob[cell] = [0.25,0.25,0.25,0.25]
    
    elif randomness == 'deterministic':
        for cell in range(grid_size):

            cells_prob[cell] = [0.03,0.06,0.01,0.9] #[0,0,0,1] ##[0.15,.15,0.1,0.6]


    #Note that we consider the correspondence between probabilities and actions as below:
    #probs = [p1, p2, p3, p4] ---> [[1,0],[-1,0],[0,1],[0,-1]]

    return cells_prob

def neighbor_cells(cell):

    grid_cells = environment[6]
    Actions = [[1,0],[-1,0],[0,1],[0,-1]]

    Neighbors = []
    Actions_Neighbors = []
    for action in Actions:

        neighbor = [x + y for x, y in zip(cell, action)]
        #if neighbor not in environment[4]:
        Neighbors.append(neighbor)
        Actions_Neighbors.append(action)

    return Neighbors, Actions_Neighbors

#Note
"""As we want to use monte carlo method for estimating the state values
   it has been assumed that we have not any knowledge about the environment.
   Therefore, we should consider the transitions into the holes cells
   (against the case of policy iteration)"""

def arbitrary_policy(randomness):
    #random.seed(randomness)
    
    policy = {}
    policy_action = {}
    for state in environment[6]:

        if state not in environment[4]:

            neighbors = neighbor_cells(state)[0]
            Actions_Neighbors = neighbor_cells(state)[1]

            allowed_positions = []

            for neighbor in neighbors:
                
                if neighbor in environment[6] and neighbor not in environment[4]:
                    
                    allowed_positions.append(neighbor)
        
            next_state = random.choice(allowed_positions)

            row = next_state[0] - state[0]
            col = next_state[1] - state[1]
            PolicyAction = [row, col]

            policy['{}'.format(state)] = next_state
            policy_action['{}'.format(state)] = PolicyAction


    return policy, policy_action

def state_reward(policy,state):

    policy_state = policy[0]
    
    next_state = policy_state[str(state)]

    if next_state in environment[4]:

        r = -3
    
    elif next_state == environment[3]:

        r = 100
    
    elif next_state not in environment[6]:

        r = -2
    
    else:

        r = -1
    
    return r


state_indice_dict = {}
counter = 0
for state in environment[6]:

    state = str(state)
    state_indice_dict[state] = counter
    counter = counter + 1


def generate_trajectory(policy,randomness,environment_stochasticity):

    policy_action = policy[1]
    probs = probability_distribution(environment[0]*environment[1],environment_stochasticity)
    start = environment[2]
    terminate = start
    trajectory = [start]
    c = 0
    while terminate != environment[3]:
        random.seed(randomness+c)
        Actions = [[1,0],[-1,0],[0,1],[0,-1]]
        action = policy_action[str(terminate)]
        Actions.remove(action)
        sorted_actions = Actions + [action]
        state_indice = state_indice_dict[str(terminate)]
        actions_prob = probs[state_indice]
        actions_prob.sort()

        selected_action = random.choices(sorted_actions, actions_prob)[0]
        
        next_state = [x + y for x, y in zip(terminate, selected_action)]
        
        #if the agent goes out of the gridworld, it stays in its current state
        if next_state not in environment[6]:
            next_state = terminate
        
        #if it drops into the holes, it goes to the start points
        elif next_state in environment[4]:
            next_state = start  

        terminate = next_state
        trajectory.append(terminate)
        c = c+1

    return trajectory

## Tabular TD(0) for estimating $v_{\pi}$

In [5]:
def TD_zero(num_trials, policy, alpha, gamma,environment_stochasticity):

    policy_state = policy[0]
    policy_action = policy[1]

    grid_size = environment[0]*environment[1]
    
    V = {}
    for state in environment[6]:
    
        if state not in environment[4] and state != environment[3]:

            V[str(state)] = 0
    
    for trial in tqdm(range(num_trials)):

        trajectory = generate_trajectory(policy,trial,environment_stochasticity)
        
        #state start from start point to terminal point
        for state in trajectory[:-1]:
            
            #s_prime in the algorithm pseudocode
            next_state = policy_state[str(state)]

            if next_state == environment[3]:
                
                V[str(next_state)] = 0

            reward = state_reward(policy,state)

            V[str(state)] = V[str(state)] +\
                alpha * (reward + gamma * V[str(next_state)] - V[str(state)])
        
    
    return V

In [25]:
policy_0 = arbitrary_policy(41)

TD_zero(10000, policy_0, 0.01, 0.9, 'deterministic')

  0%|          | 0/10000 [00:00<?, ?it/s]

100%|██████████| 10000/10000 [00:13<00:00, 724.49it/s]


{'[0, 1]': -9.999999999999108,
 '[0, 2]': -9.999999999999108,
 '[0, 3]': -9.999999999999108,
 '[1, 0]': 54.95260578515885,
 '[1, 1]': 62.17099999999748,
 '[1, 2]': -9.999999999999108,
 '[1, 3]': -9.999999999999108,
 '[2, 1]': 70.18999999999758,
 '[2, 2]': -9.999999999999108,
 '[2, 3]': -9.999999999999108,
 '[3, 1]': 79.09999999999809,
 '[3, 3]': -9.999999999999108,
 '[4, 0]': 79.00171819404777,
 '[4, 1]': 88.99999999999866,
 '[4, 2]': 99.99999999999929,
 '[4, 3]': 0}

In [29]:
TD_zero(10000, policy_0, 0.01, 0.9, 'stochastic')

100%|██████████| 10000/10000 [00:38<00:00, 261.33it/s]


{'[0, 1]': -9.999999999999108,
 '[0, 2]': -9.999999999999108,
 '[0, 3]': -9.999999999999108,
 '[1, 0]': 54.953899999997375,
 '[1, 1]': 62.17099999999748,
 '[1, 2]': -9.999999999999108,
 '[1, 3]': -9.999999999999108,
 '[2, 1]': 70.18999999999758,
 '[2, 2]': -9.999999999999108,
 '[2, 3]': -9.999999999999108,
 '[3, 1]': 79.09999999999809,
 '[3, 3]': -9.999999999999108,
 '[4, 0]': 79.09999999999809,
 '[4, 1]': 88.99999999999866,
 '[4, 2]': 99.99999999999929,
 '[4, 3]': 0}

## Sarsa (On-policy TD control) for estimating $Q \approx q_{*}$

In [28]:
def state_reward_policy_free(state, Final_action):

    next_state = [x + y for x, y in zip(state, Final_action)]

    if next_state in environment[4]:
        r = -3
    elif next_state == environment[3]:
        r = 100
    elif next_state not in environment[6]:
        r = -2
    else:
        r = -1 
    return r

def argmax_policy(q_values):

    policy = {}
    for state in list(q_values.keys()):

        value_action_state = reverse_dictionary(q_values[state])
        Max_val = max(list(value_action_state.keys()))
        best_action = value_action_state[Max_val]
        policy[state] = ast.literal_eval(best_action)
    
    return policy

def reverse_dictionary(dict):
    reverse_dict = {}
    for key in list(dict.keys()):
        val = dict[key]
        reverse_dict[val] = key
    return reverse_dict


def sarsa(num_trials, alpha, gamma,environment_stochasticity, epsilon):

    grid_size = environment[0]*environment[1]

    probs = probability_distribution(grid_size,environment_stochasticity)
    
    Q = {}
    for state in environment[6]:

        if state not in environment[4]:
            
            Q[str(state)] = {}

            for action in ["[1, 0]","[-1, 0]","[0, 1]","[0, -1]"]:

                #next_state = [x + y for x, y in zip(state, ast.literal_eval(action))]

                #if (next_state in environment[6]) and next_state not in environment[4]:
                    
                Q[str(state)][action] = random.uniform(1e-9, 1e-8)

    def state_action_nextstate(current_state):

        #probs = probability_distribution(grid_size,42)

        if type(current_state) == str:

            state = ast.literal_eval(current_state)
        else:
            state = current_state
        #Choose action using policy derived from Q===================================
        value_action_state = reverse_dictionary(Q[str(state)])
        Max_val = max(list(value_action_state.keys()))
        best_action = value_action_state[Max_val]
        best_action = ast.literal_eval(best_action)

        #============================================================================
        #Epsilon Greedy
        if random.uniform(0, 1) > epsilon:

            selected_action = best_action
        
        else:
            Actions = [[1,0],[-1,0],[0,1],[0,-1]]
            Actions.remove(best_action)
            epsilon_action = random.choice(Actions)

            selected_action = epsilon_action
        #============================================================================
        
        Actions = [[1,0],[-1,0],[0,1],[0,-1]]
        Actions.remove(selected_action)
        sorted_actions = Actions + [selected_action]
        state_indice = state_indice_dict[str(state)]
        actions_prob = probs[state_indice]
        actions_prob.sort()
        #due to stochasticity of the environment
        Final_action = random.choices(sorted_actions, actions_prob)[0]
        #print(type(state), type(Final_action))
        
        next_state = [x + y for x, y in zip(state, Final_action)]

        if next_state not in environment[6] or next_state in environment[4]:

            next_state = current_state

        return Final_action, next_state
    

    
    for trial in tqdm(range(num_trials)):

        next_state = environment[2]

        while next_state != environment[3]:
            
            state = next_state
            
            action_nextstate = state_action_nextstate(state)

            action = action_nextstate[0]
            next_state = action_nextstate[1]

            next_action = state_action_nextstate(next_state)[0]


            if next_state == environment[3]:

                for action in [[1,0],[-1,0],[0,1],[0,-1]]:
                
                    Q[str(next_state)][str(action)] = 0

            reward = state_reward_policy_free(state, action)

            Q[str(state)][str(action)] = Q[str(state)][str(action)] +\
                alpha * (reward + gamma * Q[str(next_state)][str(next_action)] - Q[str(state)][str(action)])
        
    
    return Q

In [29]:
sarsa(10000, 0.1, 0.9, 'stochastic', 0.1)

  0%|          | 0/10000 [00:00<?, ?it/s]

100%|██████████| 10000/10000 [00:14<00:00, 684.19it/s]


{'[0, 1]': {'[1, 0]': -11.535359788507034,
  '[-1, 0]': -12.863424219391383,
  '[0, 1]': -11.477634282403372,
  '[0, -1]': -13.986793694189751},
 '[0, 2]': {'[1, 0]': -11.018717283503106,
  '[-1, 0]': -12.082059842743401,
  '[0, 1]': -10.609945966467095,
  '[0, -1]': -12.110869134712946},
 '[0, 3]': {'[1, 0]': -9.572918978267332,
  '[-1, 0]': -11.83531649715452,
  '[0, 1]': -11.723790980244095,
  '[0, -1]': -11.022489123404304},
 '[1, 0]': {'[1, 0]': -15.962348086147738,
  '[-1, 0]': -15.784316278787013,
  '[0, 1]': -11.82799537119439,
  '[0, -1]': -15.589713903856037},
 '[1, 1]': {'[1, 0]': -10.980892663653316,
  '[-1, 0]': -11.922267790513352,
  '[0, 1]': -10.56625470492576,
  '[0, -1]': -13.786075148993659},
 '[1, 2]': {'[1, 0]': -9.26367684058797,
  '[-1, 0]': -10.932132172150302,
  '[0, 1]': -9.99729352471521,
  '[0, -1]': -12.113140414102501},
 '[1, 3]': {'[1, 0]': -7.023973236908226,
  '[-1, 0]': -10.66604123628411,
  '[0, 1]': -10.542717736712769,
  '[0, -1]': -10.7300071892042

## Q-learning (off-policy TD control) for estimating ${\pi} \approx {\pi}_{*}$

In [25]:
def Q_learning(num_trials, alpha, gamma,environment_stochasticity, epsilon):

    grid_size = environment[0]*environment[1]

    probs = probability_distribution(grid_size,environment_stochasticity)

    Q = {}
    for state in environment[6]:

        if state not in environment[4]:
            
            Q[str(state)] = {}

            for action in ["[1, 0]","[-1, 0]","[0, 1]","[0, -1]"]:

                Q[str(state)][action] = random.uniform(1e-9, 1e-8)


    def state_action_nextstate(current_state):

        #probs = probability_distribution(grid_size,42)

        if type(current_state) == str:

            state = ast.literal_eval(current_state)
        else:
            state = current_state
        #Choose action using policy derived from Q===================================
        value_action_state = reverse_dictionary(Q[str(state)])
        Max_val = max(list(value_action_state.keys()))
        best_action = value_action_state[Max_val]
        best_action = ast.literal_eval(best_action)

        #============================================================================
        #Epsilon Greedy
        if random.uniform(0, 1) > epsilon:

            selected_action = best_action
        
        else:
            Actions = [[1,0],[-1,0],[0,1],[0,-1]]
            Actions.remove(best_action)
            epsilon_action = random.choice(Actions)

            selected_action = epsilon_action
        #============================================================================
        
        Actions = [[1,0],[-1,0],[0,1],[0,-1]]
        Actions.remove(selected_action)
        sorted_actions = Actions + [selected_action]
        state_indice = state_indice_dict[str(state)]
        actions_prob = probs[state_indice]
        actions_prob.sort()
        #due to stochasticity of the environment
        Final_action = random.choices(sorted_actions, actions_prob)[0]
        #print(type(state), type(Final_action))
        
        next_state = [x + y for x, y in zip(state, Final_action)]

        if next_state not in environment[6] or next_state in environment[4]:

            next_state = current_state
        
        value_action_state = reverse_dictionary(Q[str(next_state)])
        #max Q(s',s)
        Max_q_val = max(list(value_action_state.keys()))
        best_action = value_action_state[Max_q_val]
        best_action = ast.literal_eval(best_action)

        return Final_action, next_state, Max_q_val
    
    policy = {}
    path = [environment[2]]
    
    for trial in tqdm(range(num_trials)):

        next_state = environment[2] #sorry for bad names

        while next_state != environment[3]:
            
            state = next_state
            
            action_nextstate = state_action_nextstate(state)

            action = action_nextstate[0]
            next_state = action_nextstate[1]

            next_action = state_action_nextstate(next_state)[0]


            if next_state == environment[3]:

                for action in [[1,0],[-1,0],[0,1],[0,-1]]:
                
                    Q[str(next_state)][str(action)] = 0

            reward = state_reward_policy_free(state, action)

            Max_q_val = action_nextstate[2]

            Q[str(state)][str(action)] = Q[str(state)][str(action)] +\
                alpha * (reward + gamma * Max_q_val - Q[str(state)][str(action)])
                
            
            if trial  == num_trials - 1: #the last trial

                policy[str(state)] = [action, next_state]
                path.append(next_state)

        
    
    return Q, policy, path

In [21]:
Qlearning = Q_learning(100000, 0.1, 0.9, 'deterministic', 0.1)

  0%|          | 0/100000 [00:00<?, ?it/s]

100%|██████████| 100000/100000 [00:46<00:00, 2168.44it/s]


In [22]:
# Q-values

Qlearning[0]

{'[0, 1]': {'[1, 0]': -4.095099996888001,
  '[-1, 0]': -5.685518126911136,
  '[0, 1]': -4.095099996888001,
  '[0, -1]': -6.685589984172322},
 '[0, 2]': {'[1, 0]': -3.4389999965422264,
  '[-1, 0]': -5.095099996888001,
  '[0, 1]': -3.4389999965422264,
  '[0, -1]': -4.685589997199196},
 '[0, 3]': {'[1, 0]': -2.7099999961580314,
  '[-1, 0]': -4.438999996542224,
  '[0, 1]': -4.438999996542224,
  '[0, -1]': -4.095099996888001},
 '[1, 0]': {'[1, 0]': -6.667726150911664,
  '[-1, 0]': -6.685074344977916,
  '[0, 1]': -4.095099996888001,
  '[0, -1]': -5.68558750553322},
 '[1, 1]': {'[1, 0]': -3.4389999965422264,
  '[-1, 0]': -4.685589997199196,
  '[0, 1]': -3.4389999965422264,
  '[0, -1]': -4.685589997199196},
 '[1, 2]': {'[1, 0]': -2.7099999961580314,
  '[-1, 0]': -4.095099996888001,
  '[0, 1]': -2.7099999961580314,
  '[0, -1]': -4.095099996888001},
 '[1, 3]': {'[1, 0]': -1.899999995731148,
  '[-1, 0]': -3.4389999965422264,
  '[0, 1]': -3.7099999961580314,
  '[0, -1]': -3.4389999965422264},
 '[2

In [23]:
#optimal policy
Qlearning[1]

{'[1, 2]': [[0, 1], [1, 3]],
 '[1, 3]': [[1, 0], [2, 3]],
 '[2, 3]': [[1, 0], [3, 3]],
 '[0, 3]': [[1, 0], [1, 3]],
 '[3, 3]': [[0, -1], [4, 3]]}

In [24]:
#path from start point to the terminal point
Qlearning[2]

[[1, 3], [2, 3], [1, 3], [0, 3], [1, 3], [2, 3], [3, 3], [4, 3]]