In [4]:
# The grid is defined with lengh and width. Total number of states and actions are calculated. 

import numpy as np
import matplotlib.pyplot as plt

L = 8 # grid length constant
W = 8 # grid width constant

S = [] # initialize the state space

# iterate all possible states
for x in range(W):
    for y in range(L):
        for dirc in range(12):
            S.append([x,y,dirc])
            
A = [] # initialize the action space

# define possible individual actions
SIT = 0
FWD = 1
BWD = -1
NO_TURN = 0
L_TURN = -1
R_TURN = 1

# list all possible combined actions
for step in [SIT, FWD, BWD]:
    if step != SIT:
        for turn in [NO_TURN,L_TURN, R_TURN]:
            A.append([step, turn])
    else:
        A.append([SIT, NO_TURN])

In [2]:
# This function returns possible next states and their corresponding probabilities based on the current state,  
# next action and the (constant) pre-rotate error probability. 

def next_state_distr(s, a, p_e):
    x = s[0]
    y = s[1]
    dirc = s[2]
    step = a[0]
    turn = a[1]
    S_next_distr = {}
    s_index = 0
    
    if step != 0: 
        for turn_error in [NO_TURN,L_TURN, R_TURN]: # handle pre-rotate error
            x_new = x
            y_new = y
            s_p_new = []
            p_sa = 0
            if turn_error == 0:
                dirc_new = dirc
                p_sa = 1-2*p_e
            else:
                dirc_new = dirc + turn_error
                p_sa = p_e

            dirc_new = dirc_new % 12 # translational movement according to the direction
            if dirc_new in [11, 0, 1]:
                y_new = y + step
            elif dirc_new in [2, 3, 4]:
                x_new = x + step
            elif dirc_new in [5, 6, 7]:
                y_new = y - step
            else: 
                x_new = x - step
            
            dirc_new = (dirc_new + turn) % 12
            if x_new < 0 or x_new >= W or y_new < 0 or y_new >= L: # handle attempts to move off grid
                s_p_new = [x, y, dirc_new, p_sa]
            else:
                s_p_new = [x_new, y_new, dirc_new, p_sa] 
            
            S_next_distr[s_index] = s_p_new
            s_index += 1

    else:
        S_next_distr[s_index] = [x, y, dirc, 1]
    return S_next_distr

In [3]:
# This function returns the probability of the next state based on the current state, 
# next action and the (constant) pre-rotate error probability. 

def next_state_p(s, a, s_prime, p_e):
    p_sa = 0
    S_next_distr = next_state_distr(s, a, p_e)
    
    # iterate through all possible states in case there are repeated cases
    for s_index in S_next_distr:
        s_new = S_next_distr[s_index][0:3]
        p_new = S_next_distr[s_index][3]
        if s_new == s_prime:
            p_sa += p_new 
    return p_sa

In [4]:
# This function returns a state that is generated based on the current state, the action 
# and the (constant) pre-rotate probability.

import random 

def next_state(s, a, p_e):
    S_next_distr = next_state_distr(s,a,p_e)
    seed = random.random()
    prob = 0
    
    # campare random number with the aggregated probablity of potential next states
    for s_index in S_next_distr:
        prob += S_next_distr[s_index][3]
        if seed < prob:
            state = S_next_distr[s_index][0:3]
            return state
    return None

In [5]:
# This function returns the reward given the current state.

import numpy

def reward(s):
    x = s[0]
    y = s[1]
    
    # define rewards all over the map 
    R = numpy.zeros((W, L))
    # define lane markers
    for i in [4, 5, 6]:
        R[3][i] = -10
    # define walls
    for i in range(W):
        for j in range(L):
            if i == 0 or i == 7 or j == 0 or j == 7:
                R[i][j] = -100
    # define goal sqaure
    R[5][6] = 1
    
    return R[x][y]

In [6]:
print('1.1 For a grid of %d x %d, there are %d states.' % (L, W, len(S)))

1.1 For a grid of 8 x 8, there are 768 states.


In [14]:
print('1.2 The robot can choose from %d actions.' % len(A))
for n in A:
    print(action_to_words(n))

1.2 The robot can choose from 7 actions.
['SIT', 'NO_TURN']
['FWD', 'NO_TURN']
['FWD', 'L_TURN']
['FWD', 'R_TURN']
['BWD', 'NO_TURN']
['BWD', 'L_TURN']
['BWD', 'R_TURN']


In [8]:
# This function returns the word for an action, based on the input of an action array.
# It will be used in testings later. 
def action_to_words(a):
    a_word = []
    if a[0] == SIT:
        a_word.append('SIT')
    elif a[0] == FWD:
        a_word.append('FWD')
    else:
        a_word.append('BWD')
    
    if a[1] == NO_TURN:
        a_word.append('NO_TURN')
    elif a[1] == L_TURN:
        a_word.append('L_TURN')
    else:
        a_word.append('R_TURN')
    
    return a_word

In [9]:
# Four cases are tested for 1(c). 

print('1.3 Test cases: ')
states = [[1,1,0], [2,3,5], [7,7,6], [5,6,2]]
next_states = [[1,1,0], [2,2,4], [7,7,8], [4,6,3]]
actions = [[SIT, NO_TURN], [FWD, L_TURN], [BWD, R_TURN], [BWD, NO_TURN]]
p_errors = [0, 0.1, 0.2, 0.25]

for i in range(len(p_errors)):
    s = states[i]
    a = actions[i]
    s_prime = next_states[i]
    p_e = p_errors[i]
    p_sa = next_state_p(s, a, s_prime, p_e)
    
    print('Case %s: current state:%s, action:%s, next state:%s, p_e:%s' \
          % (i+1, s, action_to_words(a), s_prime, p_e))
    print('p_sa = %s' % p_sa)

1.3 Test cases: 
Case 1: current state:[1, 1, 0], action:['SIT', 'NO_TURN'], next state:[1, 1, 0], p_e:0
p_sa = 1
Case 2: current state:[2, 3, 5], action:['FWD', 'L_TURN'], next state:[2, 2, 4], p_e:0.1
p_sa = 0.8
Case 3: current state:[7, 7, 6], action:['BWD', 'R_TURN'], next state:[7, 7, 8], p_e:0.2
p_sa = 0.2
Case 4: current state:[5, 6, 2], action:['BWD', 'NO_TURN'], next state:[4, 6, 3], p_e:0.25
p_sa = 0.25


In [10]:
# Four cases are tested for 1(d). Distribution of next states generated are displayed. 
# Test cases are defined in 1(c).

S_SIZE = 100
RUN = 5
print('1.4 Counts of next states generated with %d run and the sample size of %d:' % (RUN, S_SIZE))


for i in range(len(p_errors)):
    s = states[i]
    a = actions[i]
    p_e = p_errors[i]
    print('Case %s: current state:%s, action:%s, p_e:%s' % (i+1, s, action_to_words(a), p_e))
    for r in range(RUN):
        next_state_counter = {}
        for n in range(S_SIZE):
            s_prime = str(next_state(s, a, p_e))

            if s_prime in next_state_counter:
                next_state_counter[s_prime] += 1
            else:
                next_state_counter[s_prime] = 1
        print('Run %s: p_prime and counts: %s' % (r+1, next_state_counter))

1.4 Counts of next states generated with 5 run and the sample size of 100:
Case 1: current state:[1, 1, 0], action:['SIT', 'NO_TURN'], p_e:0
Run 1: p_prime and counts: {'[1, 1, 0]': 100}
Run 2: p_prime and counts: {'[1, 1, 0]': 100}
Run 3: p_prime and counts: {'[1, 1, 0]': 100}
Run 4: p_prime and counts: {'[1, 1, 0]': 100}
Run 5: p_prime and counts: {'[1, 1, 0]': 100}
Case 2: current state:[2, 3, 5], action:['FWD', 'L_TURN'], p_e:0.1
Run 1: p_prime and counts: {'[2, 2, 4]': 74, '[2, 2, 5]': 14, '[3, 3, 3]': 12}
Run 2: p_prime and counts: {'[2, 2, 4]': 88, '[3, 3, 3]': 5, '[2, 2, 5]': 7}
Run 3: p_prime and counts: {'[2, 2, 4]': 80, '[2, 2, 5]': 10, '[3, 3, 3]': 10}
Run 4: p_prime and counts: {'[2, 2, 5]': 13, '[2, 2, 4]': 75, '[3, 3, 3]': 12}
Run 5: p_prime and counts: {'[2, 2, 4]': 83, '[2, 2, 5]': 7, '[3, 3, 3]': 10}
Case 3: current state:[7, 7, 6], action:['BWD', 'R_TURN'], p_e:0.2
Run 1: p_prime and counts: {'[7, 7, 8]': 16, '[7, 7, 7]': 70, '[7, 7, 6]': 14}
Run 2: p_prime and count

In [11]:
# Map for rewards is displayed, assuming the origin is at the bottom left 
# and x axis to the right. 

print('2. Reward Map')
for j in range(L):    
    for i in range(W):
        r = int(reward([i,7-j,0]))
        if i == 7 :
            print(r)
        else:
            print(r, end = ' ' * (6 - len(str(r))))

2. Reward Map
-100  -100  -100  -100  -100  -100  -100  -100
-100  0     0     -10   0     1     0     -100
-100  0     0     -10   0     0     0     -100
-100  0     0     -10   0     0     0     -100
-100  0     0     0     0     0     0     -100
-100  0     0     0     0     0     0     -100
-100  0     0     0     0     0     0     -100
-100  -100  -100  -100  -100  -100  -100  -100


In [1]:
# Problem 4(a)
def value_iteration(state, pe, discount, theta):
    #initialization of Policy and Value.
    V = {}
    policy = {}
    for s in state:
        V[s] = 0.0
        policy[s] = (0, 0)

    while True:
        delta = 0       
        for s in state:
            action_value = 0.0
            action = [0, 0]
            for a in A:
                value = 0.0
                state_list = next_state_dist(s, a, pe)
                for st in range(len(state_list)):
                    test_state = state_list[st][0:3]
                    value += next_state_prob(s, a, test_state, pe)*(reward(test_state) + discount * V[test_state])
                if value == max(action_value, value):
                    action_value = value
                    action = a
            policy[s] = action
            delta = max(delta, abs(action_value - V[s]))
            V[s] = action_value         
        
        if delta < theta:
            break

    return policy, V

In [2]:
# Function used to generate the visual grid world
import matplotlib.pyplot as plt
def generate_trajectory(policy, s, pe=0.0, show=True):
    """
    Generate a trajectory using policy from initial state s to the goal and
    visualize the trajectory on the grid world.
    
    Args:
        Policy: a directionary including all states and their related action.
        Initial state s = (x, y, h).
        p_e: the error probability Pe to pre-rotate when chosing to move. 0 < Pe <= 0.5
    
    Returns:
        Trajectory including passing states and actions on each state
    """
    
    # Confirm the feasibility of p_e
    if not (pe >= 0.0 and pe <= 0.5):
        raise ValueError('The error probability must be between 0 and 0.5')
        
    # Generate the trajectory

    traj = []
    s_now = s
    while True:
        traj.append([s_now, policy[s_now]])
        if (policy[s_now] == [0, 0]):
            break
        if (s_now[0] == 3) and (s_now[1] == 4):
            break
        P_state = next_state_dist(s_now, policy[s_now], pe)
        statetest = 0
        stateindex = 0
        for i in range(len(P_state)):
            if P_state[i][3] > statetest:
                statetest = P_state[i][3]
                stateindex = i
        s_next = P_state[stateindex]
        s_now = s_next[0:3]

    
    # Grid world initilization
    fig = plt.figure(figsize = (L,W))
    ax = fig.add_subplot(1,1,1)
    plt.xlim((0, L))
    plt.ylim((0, W))
    plt.grid(True, color = 'k')

    # Plot red markers at edges
    edge1 = plt.Rectangle((0,0), 1, 8, color = 'r')
    edge2 = plt.Rectangle((1,0), 7, 1, color = 'r')
    edge3 = plt.Rectangle((7,1), 1, 7, color = 'r')
    edge4 = plt.Rectangle((1,7), 6, 1, color = 'r')
    ax.add_patch(edge1)
    ax.add_patch(edge2)
    ax.add_patch(edge3)
    ax.add_patch(edge4)

    # Plot yellow markers
    yellow1 = plt.Rectangle((3,4), 1, 3, color = 'gold', alpha = 0.8)
    ax.add_patch(yellow1)

    # Plot green goal
    goal = plt.Rectangle((5,6), 1, 1, color = 'limegreen', alpha = 0.8)
    ax.add_patch(goal)

    # Plot the start state
    plt.plot(s[0]+0.5, s[1]+0.5, 'c*', markersize = '40')
    ax.arrow(s[0]+0.5, s[1]+0.5, 0.4*np.sin(30*s[2]*np.pi/180),0.4*np.cos(30*s[2]*np.pi/180), \
             head_width = 0.1, head_length = 0.2, fc = 'k', ec = 'k')
    
    # Plot all passing states
    for i in range(1, len(traj)):
        x1 = traj[i-1][0][0]
        y1 = traj[i-1][0][1]
        x2 = traj[i][0][0]
        y2 = traj[i][0][1]
        h = traj[i][0][2]
        plt.plot([x1+0.5, x2+0.5], [y1+0.5, y2+0.5], 'k--')
        plt.plot(x2+0.5, y2+0.5, 'bo', markersize = '10')
        ax.arrow(x2+0.5, y2+0.5, 0.4*np.sin(30*h*np.pi/180),0.4*np.cos(30*h*np.pi/180), \
                 head_width = 0.1, head_length = 0.2, fc = 'k', ec = 'k')
    
    if show:
        plt.show()
        
    return traj

In [3]:
# Problem 4(b)
import time
start_time = time.time()
policy_optimal, V_optimal = value_iteration(S, 0.0, 0.9, 0.01)
end_time = time.time()
s0 = (1, 6, 6)
traj_sample = generate_trajectory(policy_optimal, s0, pe=0.0, show=True)

V_value_ite = 0
for i in range(0, len(traj_sample)):
    V_value_ite += V_optimal[traj_sample[i][0]]
    print(V_optimal[traj_sample[i][0]])

print("4(b). Trajectory ([state, action]) from %s to the goal is: " % str(s0), traj_sample, ", with optimal value:" ,V_value_ite)

NameError: name 'S' is not defined