We have implemented treasure hunt with reinforcement dynamic programming approach in this notebook.

https://www.analyticsvidhya.com/blog/2018/09/reinforcement-learning-model-based-planning-dynamic-programming/

https://towardsdatascience.com/planning-by-dynamic-programming-reinforcement-learning-ed4924bbaa4c

In [1]:
#import required libraries
import numpy as np
import matplotlib.pyplot as plt
from time import sleep
from IPython.display import clear_output
from random import randint, random

In [2]:
# Setting up an enviornment where A is position of agent and position 0 and 15 is a treasure.
# Reward value is -1 for each move except for move that reach treasure which has 100 reward value.
def show_enviornment(A):
  cells = list(range(16))
  env = ".........................\n"
  for index in range(0,16,4):
     line = cells[index:index+4]
     for cell in line:
        if cell == A:
            env += "| o-< "
        elif cell == cells[0] or cell == cells[15]:
            env += "| $   "
        else:
            env += "|     "
     env += "|\n"
     env += ".........................\n"
  print(env)          

In [3]:
# Setting the starting position of Agent to be 8

show_enviornment(8)

.........................
| $   |     |     |     |
.........................
|     |     |     |     |
.........................
| o-< |     |     |     |
.........................
|     |     |     | $   |
.........................



In [4]:
# Setting probabilities for each of the actions left, right, top and down
# Probability of each action is set to 0.25

def set_prob():
    
    return {index: {'U': 0.0, 'D': 0.0, 'L': 0.0,'R': 0.0} if index == 0 or index == 15 #0 for goal
        else {'U': 0.25, 'D': 0.25, 'L': 0.25,'R': 0.25} for index in range(16)} 

In [5]:
set_prob()

{0: {'U': 0.0, 'D': 0.0, 'L': 0.0, 'R': 0.0},
 1: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 2: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 3: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 4: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 5: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 6: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 7: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 8: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 9: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 10: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 11: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 12: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 13: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 14: {'U': 0.25, 'D': 0.25, 'L': 0.25, 'R': 0.25},
 15: {'U': 0.0, 'D': 0.0, 'L': 0.0, 'R': 0.0}}

In [6]:
#Setting positions in enviornment
# For example for position 5, L -> 4 R-> 6 U -> 1 D - > 9
def setup_position():
    positions = {}
    cell_range = list(range(16))
    for index in cell_range :
        if index == 0 or index == 15 :
            positions[index] = {'U': 0, 'D': 0, 'L': 0,'R': 0}
        elif index % 4 == 0:
            positions[index] = {'U' : index-4 if index-4 in cell_range else index,
                                'D' : index+4 if index+4 in cell_range else index,
                                'L' : index,
                                'R' : index+1 if index+1 in cell_range else index}
        elif index % 4 == 3:
            positions[index] = {'U' : index-4 if index-4 in cell_range else index,
                                'D' : index+4 if index+4 in cell_range else index,
                                'L' : index-1 if index-1 in cell_range else index,
                                'R' : index}
        else :
            positions[index] = {'U' : index-4 if index-4 in cell_range else index,
                                'D' : index+4 if index+4 in cell_range else index,
                                'L' : index-1 if index-1 in cell_range else index,
                                'R' : index+1 if index+1 in cell_range else index} 
    return positions
 

In [7]:
setup_position()

{0: {'U': 0, 'D': 0, 'L': 0, 'R': 0},
 1: {'U': 1, 'D': 5, 'L': 0, 'R': 2},
 2: {'U': 2, 'D': 6, 'L': 1, 'R': 3},
 3: {'U': 3, 'D': 7, 'L': 2, 'R': 3},
 4: {'U': 0, 'D': 8, 'L': 4, 'R': 5},
 5: {'U': 1, 'D': 9, 'L': 4, 'R': 6},
 6: {'U': 2, 'D': 10, 'L': 5, 'R': 7},
 7: {'U': 3, 'D': 11, 'L': 6, 'R': 7},
 8: {'U': 4, 'D': 12, 'L': 8, 'R': 9},
 9: {'U': 5, 'D': 13, 'L': 8, 'R': 10},
 10: {'U': 6, 'D': 14, 'L': 9, 'R': 11},
 11: {'U': 7, 'D': 15, 'L': 10, 'R': 11},
 12: {'U': 8, 'D': 12, 'L': 12, 'R': 13},
 13: {'U': 9, 'D': 13, 'L': 12, 'R': 14},
 14: {'U': 10, 'D': 14, 'L': 13, 'R': 15},
 15: {'U': 0, 'D': 0, 'L': 0, 'R': 0}}

In [8]:
#Setting probability of each position

def set_position_prob():
    states = list(range(16))
    positions = setup_position()
    
    probability_position = {}
    
    for state in states:
        for action in ["U", "R", "D", "L"]:
          for prime in states:
            probability_position[(prime, -1, state, action)] = 0 if prime != positions[state][action] else 1
            
    return probability_position

In [9]:
set_position_prob()

{(0, -1, 0, 'U'): 1,
 (1, -1, 0, 'U'): 0,
 (2, -1, 0, 'U'): 0,
 (3, -1, 0, 'U'): 0,
 (4, -1, 0, 'U'): 0,
 (5, -1, 0, 'U'): 0,
 (6, -1, 0, 'U'): 0,
 (7, -1, 0, 'U'): 0,
 (8, -1, 0, 'U'): 0,
 (9, -1, 0, 'U'): 0,
 (10, -1, 0, 'U'): 0,
 (11, -1, 0, 'U'): 0,
 (12, -1, 0, 'U'): 0,
 (13, -1, 0, 'U'): 0,
 (14, -1, 0, 'U'): 0,
 (15, -1, 0, 'U'): 0,
 (0, -1, 0, 'R'): 1,
 (1, -1, 0, 'R'): 0,
 (2, -1, 0, 'R'): 0,
 (3, -1, 0, 'R'): 0,
 (4, -1, 0, 'R'): 0,
 (5, -1, 0, 'R'): 0,
 (6, -1, 0, 'R'): 0,
 (7, -1, 0, 'R'): 0,
 (8, -1, 0, 'R'): 0,
 (9, -1, 0, 'R'): 0,
 (10, -1, 0, 'R'): 0,
 (11, -1, 0, 'R'): 0,
 (12, -1, 0, 'R'): 0,
 (13, -1, 0, 'R'): 0,
 (14, -1, 0, 'R'): 0,
 (15, -1, 0, 'R'): 0,
 (0, -1, 0, 'D'): 1,
 (1, -1, 0, 'D'): 0,
 (2, -1, 0, 'D'): 0,
 (3, -1, 0, 'D'): 0,
 (4, -1, 0, 'D'): 0,
 (5, -1, 0, 'D'): 0,
 (6, -1, 0, 'D'): 0,
 (7, -1, 0, 'D'): 0,
 (8, -1, 0, 'D'): 0,
 (9, -1, 0, 'D'): 0,
 (10, -1, 0, 'D'): 0,
 (11, -1, 0, 'D'): 0,
 (12, -1, 0, 'D'): 0,
 (13, -1, 0, 'D'): 0,
 (14, -1, 0, 'D'):

In [10]:
# This function sets all the steps to reach tresure

def policy_eval(policy, start_position=None, verbose=False):
    l = list(range(16))
    positions = setup_position()
    position_of_agent = randint(1, 14) if start_position is None else start_position
        
    step_number = 1
    action_taken = None
    
    if verbose:
        print("Move: {} Position: {} Action: {}".format(step_number, position_of_agent, action_taken))
        show_enviornment(position_of_agent)
        print("\n")
        sleep(2)
    
    while not (position_of_agent == 0 or position_of_agent == 15):
        if verbose:
            clear_output(wait=True)
            print("Move: {} Position: {} Action: {}".format(step_number, position_of_agent, action_taken))
            show_enviornment(position_of_agent)
            print("\n")
            sleep(1)
        
        current_policy = policy[position_of_agent]
        next_move = random()
        lower_bound = 0
        for action, chance in current_policy.items():
            if chance == 0:
                continue
            if lower_bound <= next_move < lower_bound + chance:
                position_of_agent = positions[position_of_agent][action]
                action_taken = action
                break 
            lower_bound = lower_bound + chance
                
        step_number += 1   
    
    if verbose:
        clear_output(wait=True)
        print("Move: {} Position: {} Action: {}".format(step_number, position_of_agent, action_taken))
        show_enviornment(position_of_agent)
        print("Treasure found")
    
    return step_number

In [11]:
data = []

for i in range(1000):
    clear_output(wait=True)
    print("{}%\n".format((i + 1) / 10))
    data.append(policy_eval(set_prob()))
    
print("Average number of steps taken by agent : {}".format(sum(data)/len(data)))

100.0%

Average number of steps taken by agent : 20.087


In [12]:
# If we run this it give all the steps taken by agent in order to reach goal
policy_eval(set_prob(), verbose=True)

Move: 5 Position: 15 Action: R
.........................
| $   |     |     |     |
.........................
|     |     |     |     |
.........................
|     |     |     |     |
.........................
|     |     |     | o-< |
.........................

Treasure found


5

We found the path agent can opt for to reach the treasure. However, this is not an optimal solution.
So let's improve our algorithm to improve performance.

In [13]:
def policy_improvement(V_s):
    positions = setup_position()
    policy = {}
        
    for state in range(16):
        
        state_values = {a: V_s[positions[state][a]] for a in ['U', 'D', 'R', 'L']}
        
        if state == 0 or state == 15:
            policy[state] = {'U': 0.0, 'R': 0.0, 'D': 0.0, 'L': 0.0}
        else:
            max_actions = [k for k, v in state_values.items() if v == max(state_values.values())]
            policy[state] = {a: 1 / len(max_actions) if a in max_actions else 0.0 
                             for a in ['U', 'D', 'R', 'L']}
    return policy

In [14]:
def policy_iteration(policy, max_iterations=0.01, discount_rate=0.5):
    V_s = {i: 0 for i in range(16)} 
    position_prob = set_position_prob() 

    delta = 100 
    while not delta < max_iterations: 
        delta = 0 
        for state in range(16):
            v = V_s[state] 
            
            total = 0 
            for action in ["U", "R", "D", "L"]:
                action_total = 0
                for state_prime in range(16):
                  action_total += position_prob[(state_prime, -1, state, action)] * (-1 + discount_rate * V_s[state_prime])
                #print(state,action)
                total += policy[state][action] * action_total   
                
            V_s[state] = round(total, 1) 
            delta = max(delta, abs(v - V_s[state])) 
    return V_s

In [15]:
# Checking the probability of each step in action and calling the function twice to see exactly how the computation of probabilities changes
policy = set_prob()
V_s = policy_iteration(policy)
policy = policy_improvement(V_s)
print(V_s)

V_s = policy_iteration(policy)
policy = policy_improvement(V_s)
print(V_s)

{0: 0.0, 1: -1.7, 2: -1.9, 3: -1.9, 4: -1.7, 5: -1.9, 6: -1.9, 7: -1.9, 8: -1.9, 9: -1.9, 10: -1.9, 11: -1.7, 12: -1.9, 13: -1.9, 14: -1.7, 15: 0.0}
{0: 0.0, 1: -1.0, 2: -1.5, 3: -1.8, 4: -1.0, 5: -1.5, 6: -1.8, 7: -1.5, 8: -1.5, 9: -1.8, 10: -1.5, 11: -1.0, 12: -1.8, 13: -1.5, 14: -1.0, 15: 0.0}


In [16]:
policy = set_prob()
#policy = create_random_policy()
policy_iteration(policy)

{0: 0.0,
 1: -1.7,
 2: -1.9,
 3: -1.9,
 4: -1.7,
 5: -1.9,
 6: -1.9,
 7: -1.9,
 8: -1.9,
 9: -1.9,
 10: -1.9,
 11: -1.7,
 12: -1.9,
 13: -1.9,
 14: -1.7,
 15: 0.0}

In [17]:
# Now we will see how many steps it would take

data = []

for i in range(1000):
    clear_output(wait=True)
    print("{}%\n".format((i + 1) / 10))
    data.append(policy_eval(policy))
    
print("Average number of steps taken by agent : {}".format(sum(data)/len(data)))

100.0%

Average number of steps taken by agent : 18.7


In [18]:
policy_eval(policy, verbose=True)

Move: 9 Position: 0 Action: L
.........................
| o-< |     |     |     |
.........................
|     |     |     |     |
.........................
|     |     |     |     |
.........................
|     |     |     | $   |
.........................

Treasure found


9

In [19]:
#Improves coding efficiency
def value_iteration(V_s, theta=0.01, discount_rate=0.5):
    position_prob = set_position_prob()

    delta = 100
    while not delta < theta:
        delta = 0
        for state in range(1, 15):
            v = V_s[state]
            
            totals = {}
            for action in ["U", "D", "R", "L"]:
                total = 0
                for state_prime in range(16):
                    total += position_prob[(state_prime, -1, state, action)] * (-1 + discount_rate * V_s[state_prime])
                totals[action] = total
            
            V_s[state] = round(max(totals.values()), 4)
            delta = max(delta, abs(v - V_s[state]))
    return V_s

In [20]:
V_s = {i: 0 for i in range(16)}
V_s = value_iteration(V_s)
policy = policy_improvement(V_s)

print(V_s)

{0: 0, 1: -1.0, 2: -1.5, 3: -1.75, 4: -1.0, 5: -1.5, 6: -1.75, 7: -1.5, 8: -1.5, 9: -1.75, 10: -1.5, 11: -1.0, 12: -1.75, 13: -1.5, 14: -1.0, 15: 0}


In [21]:
data = []

for i in range(1000):
    clear_output(wait=True)
    print("{}%\n".format((i + 1) / 10))
    data.append(policy_eval(policy))
    
print("Average number of steps taken by agent : {}".format(sum(data)/len(data)))

100.0%

Average number of steps taken by agent : 3.017


In [22]:
policy_eval(policy, verbose=True)

Move: 4 Position: 0 Action: U
.........................
| o-< |     |     |     |
.........................
|     |     |     |     |
.........................
|     |     |     |     |
.........................
|     |     |     | $   |
.........................

Treasure found


4