imports

In [2]:
import numpy as np
import random


In [83]:
class random_mdp():
  def __init__(self):
    """
    """
    self.floor_plan = [(i,j) for i in range(4) for j in range(3)]
    self.V = [0. for i in range(12)]
    self.possible_actions = ['L','R','U','D']
    self.gamma = 1

  def run(self):
    print('floorplan layout: ') 
    for i in range(4):
      print(self.floor_plan[i*3+0],self.floor_plan[i*3+1],self.floor_plan[i*3+2])

    counter = 0
    converge = True
    while converge:
      counter += 1
      old_V = self.V[:]
      self.perform_iteration() # perform an iteration
      if old_V == self.V: # check for convergence
        print('\nIterations for convergence: ',counter)
        converge = False

    print('\nFinal V values: ')
    for i in range(4):
      print("{:0.2f} {:0.2f} {:0.2f}".format(self.V[i*3+0],self.V[i*3+1],self.V[i*3+2]))

  def perform_iteration(self):
    for state in self.floor_plan: # iterate through all states
      possible_actions = []
      index_current_state = self.floor_plan.index(state) # get the index of the current state
      for action in self.possible_actions: # loop through each possible action
        reward, next_state = self.get_next_state(action,state) # get the reward for moving and the next state
        index_next_state = self.floor_plan.index(next_state) # get the index of the next state
        v = reward + (self.gamma * self.V[index_next_state] *0.25) # calculate value usin bellmanns
        possible_actions.append(v) # save the value
      new_V = sum(possible_actions) / len(possible_actions) # get the mean of all possible actions
      self.V[index_current_state] = new_V
      
  
  def get_next_state(self, action, state):
    old_state = list(state)
    #print(old_state)
    next_state = [3,2]
    reward = 0
    if old_state != [3,2]:
      # check what action was taken and move to the correct state
      if action == 'U': 
          next_state = [old_state[0] -1, old_state[1]]
      if action == 'D':
          next_state = [old_state[0] +1, old_state[1]]
          if (old_state == [2,2]):
            next_state = old_state
      if action == 'L':
          next_state = [old_state[0], old_state[1] -1]
          if (old_state == [1,1]) or (old_state == [1,2]) or (old_state == [3,1]): # If there is no door, dont move
            next_state = old_state
      if action == 'R':
          next_state = [old_state[0], old_state[1] +1]
          if (old_state == [1,0]) or (old_state == [1,1]) or (old_state == [3,0]):
            next_state = old_state

      # correct illegal actions
      if next_state[0] == -1 or next_state[0] == 4 or next_state[1] == -1 or next_state[1] == 3: # If going out of bounds, dont move
        next_state = old_state

      # give rewards
      reward = -1
      if (next_state == [2,0]) and (old_state != next_state): # Adding additional negative, unless already in room
        reward += -7
      if (next_state == [1,1]) and (old_state != next_state):
        reward += -3
      if (next_state == [2,2]) and (old_state != next_state):
        reward += -2

    return reward, tuple(next_state)
     

mdp = random_mdp()
mdp.run()


floorplan layout: 
(0, 0) (0, 1) (0, 2)
(1, 0) (1, 1) (1, 2)
(2, 0) (2, 1) (2, 2)
(3, 0) (3, 1) (3, 2)

Iterations for convergence:  25

Final V values: 
-1.54 -2.17 -1.44
-3.38 -1.61 -1.93
-1.82 -4.41 -1.60
-3.52 -1.46 0.00


In [80]:
class optimal_mdp():
  def __init__(self):
    """
    """
    self.floor_plan = [(i,j) for i in range(4) for j in range(3)]
    self.V = [0. for i in range(12)]
    self.possible_actions = ['L','R','U','D']
    self.gamma = 1

  def run(self):
    print('floorplan layout: ') 
    for i in range(4):
      print(self.floor_plan[i*3+0],self.floor_plan[i*3+1],self.floor_plan[i*3+2])

    counter = 0
    converge = True
    while converge:
      counter += 1
      old_V = self.V[:]
      self.get_optimal_value_function() # perform an iteration
      if old_V == self.V: # check for convergence
        print('\nIterations for convergence to find optimal value function: ',counter)
        converge = False

    print('\nOptimal V values: ')
    for i in range(4):
      print("{:0.2f} {:0.2f} {:0.2f}".format(self.V[i*3+0],self.V[i*3+1],self.V[i*3+2]))
    
    optimal_policy = self.get_optimal_policy()
    print('\nOptimal Policy: ')
    for i in range(4):
      print(optimal_policy[i*3+0],optimal_policy[i*3+1],optimal_policy[i*3+2])

  def get_optimal_policy(self):
    policies = []
    for state in self.floor_plan:
      possible_actions = []
      index_current_state = self.floor_plan.index(state) # get the index of the current state
      for action in self.possible_actions: # loop through each possible action
        reward, next_state = self.get_next_state(action,state) # get the reward for moving and the next state
        index_next_state = self.floor_plan.index(next_state) # get the index of the next state
        v = reward + (self.gamma * self.V[index_next_state]) # calculate value usin bellmanns
        possible_actions.append(v) # save the value
      new_V = max(possible_actions)
      action_index = possible_actions.index(new_V)
      policies.append(self.possible_actions[action_index])
    return policies


  def get_optimal_value_function(self):
    for state in self.floor_plan: # iterate through all states
      possible_actions = []
      index_current_state = self.floor_plan.index(state) # get the index of the current state
      for action in self.possible_actions: # loop through each possible action
        reward, next_state = self.get_next_state(action,state) # get the reward for moving and the next state
        index_next_state = self.floor_plan.index(next_state) # get the index of the next state
        v = reward + (self.gamma * self.V[index_next_state] ) # calculate value usin bellmanns
        possible_actions.append(v) # save the value
      new_V = max(possible_actions) # Bellman Optimality Equation
      self.V[index_current_state] = new_V
      
  
  def get_next_state(self, action, state):
    old_state = list(state)
    #print(old_state)
    next_state = [3,2]
    reward = 0
    if old_state != [3,2]:
      # check what action was taken and move to the correct state
      if action == 'U': 
          next_state = [old_state[0] -1, old_state[1]]
      if action == 'D':
          next_state = [old_state[0] +1, old_state[1]]
          if (old_state == [2,2]):
            next_state = old_state
      if action == 'L':
          next_state = [old_state[0], old_state[1] -1]
          if (old_state == [1,1]) or (old_state == [1,2]) or (old_state == [3,1]): # If there is no door, dont move
            next_state = old_state
      if action == 'R':
          next_state = [old_state[0], old_state[1] +1]
          if (old_state == [1,0]) or (old_state == [1,1]) or (old_state == [3,0]):
            next_state = old_state

      # correct illegal actions
      if next_state[0] == -1 or next_state[0] == 4 or next_state[1] == -1 or next_state[1] == 3: # If going out of bounds, dont move
        next_state = old_state

      # give rewards
      reward = -1
      if (next_state == [2,0]) and (old_state != next_state): # Adding additional negative, unless already in room
        reward += -7
      if (next_state == [1,1]) and (old_state != next_state):
        reward += -3
      if (next_state == [2,2]) and (old_state != next_state):
        reward += -2

    #print(action, self.current_state)
    return reward, tuple(next_state)
     

mdp = optimal_mdp()
mdp.run()


floorplan layout: 
(0, 0) (0, 1) (0, 2)
(1, 0) (1, 1) (1, 2)
(2, 0) (2, 1) (2, 2)
(3, 0) (3, 1) (3, 2)

Iterations for convergence to find optimal value function:  12

Optimal V values: 
-8.00 -7.00 -7.00
-9.00 -3.00 -6.00
-3.00 -2.00 -3.00
-11.00 -1.00 0.00

Optimal Policy: 
R D D
U D D
R D L
U R L
