In [147]:
import numpy as np
import random
import math

class State:
    def  __init__(self, x, y, h):
        self.x = x
        self.y = y
        self.h = h
    
    def set_state(self, state):
        self.x = state.x
        self.y = state.y
        self.h = state.h
        
    def equals(self, new_state):
        if self.x == new_state.x and self.y == new_state.y and self.h == new_state.h:
            return True
        else:
            return False
        
    def print_state(self):
        print("State x,y,h is: " + str(self.x) + "," + str(self.y)+  "," + str(self.h))
    
class Matrix:
    def __init__(self, length, width, p_error, x ,y, h):
        self.length = length
        self.width = width
        self.p_error = p_error
        self.grid = []
        self.state = State(x,y,h)
        self.vertical_positive = [11,0,1]
        self.vertical_negative = [5,6,7]
        self.horizontal_positive = [2,3,4]
        self.horizontal_negative = [10,9,8]
        for i in range(self.width):
            temp = [0] * length
            self.grid.append(temp)
        self.reward_dict = {}
    
    def get_reward_map(self):
        return self.reward_dict
            
    def set_state(self, new_state):
        self.state = new_state
    
    def get_grid(self):
        self.grid[self.state.y][self.state.x] = 1
        return np.array(self.grid)

    def pre_rotation(self):
        v = random.random()
        #print("The pre rotation probability: " + str(v))
        if v < self.p_error:
            return -1
        elif v >= self.p_error and v < 2*self.p_error:
            return 1
        else:
            return 0
    
    def test_move(self, action, error):
        h = (self.state.h + error) % 12
        multiplier = 1
        if h in self.horizontal_negative or h in self.vertical_negative:
            multiplier = -1
        new_x = self.state.x
        new_y = self.state.y
        #print(new_x)
        #print(new_y)
        #print(h)
        #print(error)
        if h in self.horizontal_negative or h in self.horizontal_positive:
            temp_x = self.state.x + (multiplier*action[0])
            if 0 <= temp_x <= self.length - 1:
                new_x += (multiplier*action[0])
        else:
            temp_y = self.state.y + (multiplier*action[0])
            if 0 <= temp_y <= self.width - 1:
                new_y += (multiplier*action[0])
        new_h = (h + action[1]) % 12
        #print(new_x)
        #print(new_y)
        #print(new_h)
        #print("------")
        return State(new_x, new_y, new_h)

    def probability_new_state(self, input_action, new_state):
        if input_action[0] == 0:
            if self.state.equals(new_state):
                return 1
            else:
                return 0
        else:
            probability = 0
            errors = [-1, 0, 1]
            for e in errors:
                result_state = self.test_move(input_action, e)
                if new_state.equals(result_state):
                    #if new_x == result_state[0] and new_y == result_state[1] and new_h == result_state[2]:
                    if e == 1 or e == -1:
                        probability += self.p_error
                    else:
                        probability += (1 - 2*self.p_error)
            return probability
        
    def move(self, action):
        h = (self.state.h + self.pre_rotation()) % 12
        multiplier = 1
        if h in self.horizontal_negative or h in self.vertical_negative:
            multiplier = -1
        new_x = self.state.x
        new_y = self.state.y
        if h in self.horizontal_negative or h in self.horizontal_positive:
            temp_x = self.state.x + (multiplier*action[0])
            if 0 <= temp_x <= self.length - 1:
                new_x += (multiplier*action[0])
        else:
            temp_y = self.state.y + (multiplier*action[0])
            if 0 <= temp_y <= self.width - 1:
                new_y += (multiplier*action[0])
        new_h = (h + action[1]) % 12
        return State(new_x, new_y, new_h)
        
    def get_next_state(self, p_error, initial_state, action):
        self.set_state(initial_state)
        new_states = []
        x,y,h = initial_state.x, initial_state.y, initial_state.h
        new_positions = [(x,y), (x-1,y), (x+1,y), (x,y-1), (x,y+1)]
        new_headings = [(h-2)%12, (h-1)%12, h, (h+1)%12, (h+2)%12]
        for p in new_positions:
            for o in new_headings:
                new_states.append(State(p[0], p[1], o))
        probabilities = []
        for s in new_states:
            prob = self.probability_new_state(action, s)
            probabilities.append(prob)
        #print("Sum of probabilities is: " + str(sum(probabilities)))
        #print(probabilities)
        v = random.random()
        #print(v)
        cumulative = 0
        for i in range(len(probabilities)):
            cumulative += probabilities[i]
            if v <= cumulative and probabilities[i] != 0:
                return new_states[i]
    
    def get_next_state_probabilities(self, p_error, initial_state, action):
        self.p_error = p_error
        return_value = []
        self.set_state(initial_state)
        new_states = []
        x,y,h = initial_state.x, initial_state.y, initial_state.h
        #pre-rotate left
        #ns=State(x,y,np.mod(h-1,12))
        
        positions = [(x,y), (x-1,y), (x+1,y), (x,y-1), (x,y+1)]
        new_positions = []
        for p in positions:
            if 0 <= p[0] <= self.length - 1 and 0 <= p[1] <= self.width - 1:
                new_positions.append((p[0],p[1]))
        new_headings = [(h-2)%12, (h-1)%12, h, (h+1)%12, (h+2)%12]
        for p in new_positions:
            for o in new_headings:
                new_states.append(State(p[0], p[1], o))
        probabilities = []
        for s in new_states:
            prob = self.probability_new_state(action, s)
            return_value.append((s, prob, self.get_reward(s)))
        return return_value
        
            
    def get_reward(self, input_state):
        x,y = input_state.x, input_state.y
        return self.reward_dict[(x,y)]

m = Matrix(8,8,0.0,0,0,0)

In [148]:
def do():
    m = Matrix(3, 4, 0.25, 1, 2, 0)
    #print(m.get_grid())
    '''
    action = [1,1]
    new_state = State(0,0,9)
    p = m.probability_new_state(action, new_state)
    print(p)
    '''
    new_state = m.get_next_state(0.25, State(1,2,0), [1,1])
    x,y,h = new_state.x, new_state.y, new_state.h
    return x,y,h

def main():
    m = Matrix(3,4,0.25,1,2,0)
    '''
    d = {}
    for i in range(100000):
        h = do()
        if h in d:
            d[h] += 1
        else:
            d[h] = 1
        if i % 500 == 0:
            print(i)
    print(d)
    '''
    print(m.get_reward_map(x))


In [149]:
#This part deals with Question 2 and setting the reward map.
def make_reward_map():
    d = {}
    for i in range(8):
        for j in range(8):
            if i == 7 or j == 7:
                d[(i,j)] = -100
            elif i == 0 or j == 0:
                d[(i,j)] = -100
            elif i == 5 and j == 6:
                d[(i,j)] = 1
            else:
                d[(i,j)] = 0
    d[(3,6)] = -10
    d[(3,5)] = -10
    d[(3,4)] = -10
    return d
m.reward_dict = make_reward_map()
print(m.get_reward(State(0,0,3)))
print(m.get_reward(State(6,5,3)))
print(m.get_reward(State(4,6,3)))
print(m.get_reward(State(5,3,11)))



-100
0
0
0


In [150]:
#Question 3
#Algorithm for Part 3 initial question. We are using Manhattan distance as the metric. The algorithm just 
#returns the action which corresponds to the lowest Manhattan distance out of 6 possible actions. Unless the agent
#is in the goal state, it must move.

goal_state = State(5,6,None)

def manhattan_distance(current_state):
    return abs(current_state.x - goal_state.x) + abs(current_state.y - goal_state.y)

def initial_policy(current_state):
    if current_state.x == goal_state.x and current_state.y == goal_state.y:
        return [0,0]
    actions = ([1,1],[1,0],[1,-1],[-1,1],[-1,0],[-1,-1])
    distances = []
    for a in actions:
        m.set_state(current_state)
        new_state = m.test_move(a, 0)
        distances.append(manhattan_distance(new_state))
    index = distances.index(min(distances))
    return actions[index]
    '''
    t = []
    for i in range(len(distances)):
        if distances[i] == min_d:
            t.append(actions[i])
    t = sorted(t, key=lambda x: t[1])
    return t[-1]
    '''
    

states = []
p0_action_state_dict = {}
for i in range(m.length):
    for j in range(m.width):
        for h in range(12):
            s = State(i,j,h)
            states.append(s)
            
print(len(states))

for s in states:
    a = initial_policy(s)
    p0_action_state_dict[(s.x,s.y,s.h)] = a

def plot_trajectory(initial_state, p_error, policy_dict):
    s = initial_state
    m.set_state(initial_state)
    states = []
    steps = 100
    while steps > 0:
        if s.x == goal_state.x and s.y == goal_state.y:
            print("Foudn goal!")
            break
        else:
            action = policy_dict[(s.x,s.y,s.h)]
            s = m.move(action)
            m.set_state(s)
            states.append(s)
            steps -= 1
    return states

states = plot_trajectory(State(3,3,0), 0, p0_action_state_dict)
for s in states:
    s.print_state()
    
#def policy_evaluation(input_state, discount_factor):
    
    
    

768
Foudn goal!
State x,y,h is: 3,4,1
State x,y,h is: 3,5,2
State x,y,h is: 4,5,3
State x,y,h is: 5,5,4
State x,y,h is: 6,5,5
State x,y,h is: 6,6,6
State x,y,h is: 6,5,7
State x,y,h is: 6,6,8
State x,y,h is: 5,6,9


In [151]:
#Value iteration Question 4
import copy

def value_distance(V1, V2):
    diff = 0
    for k in V1:
        diff += abs(V1[k] - V2[k])
    return diff
    

V = {}
states = []
for i in range(m.length):
    for j in range(m.width):
        for h in range(12):
            V[(i,j,h)] = 0
            states.append(State(i,j,h))

def value_iteration(gamma, p_error):
    m.p_error = p_error
    step = 0
    while step < 1000:
        V_prev = copy.deepcopy(V)
        for s in states:
            actions = ([1,1],[1,0],[1,-1],[-1,1],[-1,0],[-1,-1],[0,0])
            action_score = []
            for a in actions:
                score = np.sum([p*(r + gamma*V_prev[(n_s.x, n_s.y, n_s.h)]) for n_s, p, r in m.get_next_state_probabilities(p_error, s, a)])
                action_score.append(score)
            max_score=max(action_score)
            V[(s.x, s.y, s.h)] = max_score
        print(value_distance(V_prev, V))
        if abs(value_distance(V_prev, V)) < 0.001:
            return V, V_prev
        step += 1
        print(step)

#V, V_prev = value_iteration(0.9, 0.25)

                               
            

In [133]:
print(V[0,1,0])
print(V_prev[0,1,0])

-203.84303927793366
-203.84304048929553


In [152]:
def policy_from_value(V, gamma, p_error):
    #empty policy
    #for every state
        #for every action
            #score from optimal value func
    policy={}
    states = []
    for i in range(m.length):
        for j in range(m.width):
            for h in range(12):
                states.append(State(i,j,h))
    
    for s in states:
        actions = ([1,1],[1,0],[1,-1],[-1,1],[-1,0],[-1,-1],[0,0])
        action_score = []
        for a in actions:
            score = np.sum([p*(r + gamma*V[(n_s.x, n_s.y, n_s.h)]) for n_s, p, r in m.get_next_state_probabilities(p_error, s, a)])
            action_score.append(score)
        max_index = action_score.index(max(action_score))
        best_action = actions[max_index]
        policy[(s.x,s.y,s.h)] = best_action
    return policy

#trajectory = plot_trajectory(State(1,6,6), 0, policy_from_value(V, 0.9, 0))       
#for t in trajectory:
#    t.print_state()

In [153]:
def policy_evaluation(policy, gamma, p_error):
    
    V = {}
    states = []
    step = 0
    for i in range(m.length):
        for j in range(m.width):
            for h in range(12):
                V[(i,j,h)] = 0
                states.append(State(i,j,h))
    while step < 1000:
        V_prev = copy.deepcopy(V)
        for s in states:
            action = policy[(s.x,s.y,s.h)]
            V[(s.x,s.y,s.h)] = np.sum([p*(r + gamma*V_prev[(n_s.x, n_s.y, n_s.h)]) for n_s, p, r in m.get_next_state_probabilities(p_error, s, action)])
        #print(value_distance(V_prev, V))
        if abs(value_distance(V_prev, V)) < 0.001:
            return V, V_prev
        step += 1
        #print(step)
            
       

In [154]:
#V, V_prev = policy_evaluation(p0_action_state_dict, 0.9, 0.25)
#print(V[0,1,0])
#print(V_prev[0,1,0])

In [165]:
def policy_difference(P1, P2):
    for k in P1:
        a1 = P1[k]
        a2 = P2[k]
        if a1[0] == a2[0] and a1[1] == a2[1]:
            continue
        else:
            return False
    return True

def policy_iteration(gamma, p_error):
    policy = copy.deepcopy(p0_action_state_dict)
    step = 0
    while True:
        V,_ = policy_evaluation(policy, gamma, p_error)
        #print(len(list(V.keys())))
        #print("Done with V")
        policy_new = policy_from_value(V, gamma, p_error)
        #print("Finsihed getting new policy")
        if policy_difference(policy, policy_new):
            return policy
        else:
            policy = copy.deepcopy(policy_new)
            step += 1
            print(step)

            
        

In [167]:
policy = policy_iteration(0.9, 0.25)
m.p_error = 0.25
trajectory = plot_trajectory(State(1,6,6), 0.25, policy)       
for t in trajectory:
    t.print_state()

1
2
3
4
5
6
7
Foudn goal!
State x,y,h is: 1,5,7
State x,y,h is: 1,6,6
State x,y,h is: 1,5,7
State x,y,h is: 2,5,7
State x,y,h is: 1,5,8
State x,y,h is: 2,5,8
State x,y,h is: 1,5,8
State x,y,h is: 2,5,7
State x,y,h is: 2,4,7
State x,y,h is: 2,3,8
State x,y,h is: 3,3,9
State x,y,h is: 4,3,8
State x,y,h is: 5,3,7
State x,y,h is: 5,4,6
State x,y,h is: 5,5,6
State x,y,h is: 5,6,8
