# Implementing Value Iteration to Find Optimal $\pi'$ for a Karel Task $T_2$
# Documentation
Several classes are used to implement value iteration and find the optimal policy $\pi'$.
## bcolors
This class is only used to color the printed map.
## Direction
An enum structure specifying the four directions the avatar can face.
## Action
An enum structure specifying the possible actions.
## ENV 
The class where all environment-related details are stored.
## MDP
This class contains all MDP-related values, implements value iteration, and executes the resulting policy.

The last two will be expanded on before their respective cells.

Importing the necessary libraries

In [40]:
import itertools
import random
import time
from ast import literal_eval as make_tuple
from enum import Enum
from time import sleep

import numpy as np
from IPython.display import clear_output

# seed random number generator
random.seed(73)

In [41]:
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

class Direction(Enum):
    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3
    
class Action(Enum):
    move = 0 
    turnLeft = 1 
    turnRight = 2 
    pickMarker = 3
    finish = 4
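
Because the `Direction` values are ordered clockwise (`UP`, `RIGHT`, `DOWN`, `LEFT`), a turn can also be written as modular arithmetic on the enum value. The following is a minimal sketch of that idea, not the approach used in the notebook; the helper names `turn_right` and `turn_left` are hypothetical.

```python
from enum import Enum


class Direction(Enum):
    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3


def turn_right(d):
    """Rotate clockwise: UP -> RIGHT -> DOWN -> LEFT -> UP."""
    return Direction((d.value + 1) % 4)


def turn_left(d):
    """Rotate counter-clockwise: UP -> LEFT -> DOWN -> RIGHT -> UP."""
    return Direction((d.value - 1) % 4)


print(turn_right(Direction.LEFT).name)
print(turn_left(Direction.UP).name)
```

This relies on Python's `%` operator returning a non-negative result for a negative left operand, so `(0 - 1) % 4` evaluates to `3`.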

# Environment Class (ENV)
This class contains all environment-related details in a parameterized manner. 
## ENV Attributes
- `W` (int): width of the map
- `H` (int): height of the map
- `walls` (list): a list containing the `(row, column)` tuple positions of all walls
- `markers` (list): a list containing the `(row, column)` tuple positions of all markers
- `avatar_looks` (list): contains the avatar's symbol for each orientation, indexed in the same order as the `Direction` enum class.
- `map` (list of lists): contains the current Karel configuration based on the avatar's state.
## ENV Methods
- `make_env(self, State)`: takes the state of the avatar, generates the environment accordingly, and stores it in `self.map`.
- `show(self, State)`: prints the current map given the avatar's state.
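
To make the state layout concrete, here is a minimal, self-contained sketch of how a state tuple `(row, col, dir, m1, m2)` can be turned into a character grid. It is an illustration rather than the notebook's `ENV` class: the function name `render` is hypothetical, colors are omitted, and the wall and marker coordinates are copied from the `ENV` defaults.

```python
def render(state, W=6, H=6,
           markers=((2, 1), (5, 0)),
           walls=((0, 5), (1, 3), (3, 2), (4, 3))):
    """Return the grid as a list of strings for state (row, col, dir, m1, m2)."""
    looks = ['^', '>', 'v', '<']  # indexed by direction: UP, RIGHT, DOWN, LEFT
    grid = [['.' for _ in range(W)] for _ in range(H)]  # H rows, W columns
    for r, c in walls:
        grid[r][c] = '#'
    for k, (r, c) in enumerate(markers):
        if state[3 + k] == 1:  # marker k is still on the grid
            grid[r][c] = 'm'
    grid[state[0]][state[1]] = looks[state[2]]
    return [' '.join(row) for row in grid]


# avatar at row 5, column 2, facing RIGHT; first marker present, second picked
rows = render((5, 2, 1, 1, 0))
print('\n'.join(rows))
```

The marker flags start at index 3 of the state, which is why the lookup uses `state[3 + k]`.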

In [42]:
class ENV():
    def __init__(self, W=6, H=6, markers=((2, 1), (5, 0)), walls=((0, 5), (1, 3), (3, 2), (4, 3))):
        self.W = W
        self.H = H
        # copy into lists so that instances never share the default arguments
        self.walls = list(walls) if walls is not None else []
        self.markers = list(markers)
        # indexed by Direction value: UP, RIGHT, DOWN, LEFT
        self.avatar_looks = ['^', '>', 'v', '<']
    def make_env(self, State):
        # State is (row, col, direction, marker_1_present, marker_2_present)
        self.map = [['.' for i in range(self.W)] for j in range(self.H)]
        for wall in self.walls:
            self.map[wall[0]][wall[1]] = '#'
        for i in range(len(self.markers)):
            # draw a marker only if it has not been picked yet
            if State[3+i] == 1:
                self.map[self.markers[i][0]][self.markers[i][1]] = 'm'
        self.map[State[0]][State[1]] = self.avatar_looks[State[2]]
    def show(self, State):
        self.make_env(State)
        # rows run over the height, columns over the width
        for r in range(self.H):
            for c in range(self.W):
                if self.map[r][c] in self.avatar_looks:
                    print(bcolors.OKBLUE + self.map[r][c] + bcolors.ENDC," ", end='')
                elif self.map[r][c] == 'm':
                    print(bcolors.WARNING + 'm' + bcolors.ENDC," ", end='')
                elif self.map[r][c] == '#':
                    print(bcolors.FAIL + '#' + bcolors.ENDC," ", end='') 
                else:
                    print(self.map[r][c], " ", end='')
            print('\n')

# Markov Decision Process Class (MDP)
This class contains all MDP-related dynamics.
## MDP Attributes
- `initial_state`: a 5-tuple containing the initial state of task $T_2$; the default is `(3, 1, Direction.LEFT.value, 1, 1)`.
- `final_state`: a 5-tuple representing the post-grid configuration; the default is `(5, 2, Direction.RIGHT.value, 1, 0)`.
- `env`: an `ENV()` class instance
- `S`: a list of 5-tuples containing all possible states, $(i, j, dir, m_1, m_2)$ such that:  

>>> $0 \leq i \leq HEIGHT - 1$  

>>> $0 \leq j \leq WIDTH - 1$  

>>> $dir \in \{UP, RIGHT, DOWN, LEFT\}$

>>> $m_1, m_2 \in \{0, 1\}$, where $1$ means the corresponding marker is still on the grid

>>> it also includes the terminal state, namely, `terminal`

- `A`: a dictionary keyed by state, which contains all possible actions for each state. This is $A(s)$
- `V`: a dictionary keyed by state, which represents the value function. This is $V(s)$
- `p_dict`: a dictionary mapping each pair $(s, a)$ to its precomputed outcome $(r, s')$
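
As a quick check on the size of the state space, the enumeration can be sketched on its own. This assumes the default $6 \times 6$ grid with two markers; the variable names are illustrative and the `terminal` state is not counted.

```python
import itertools

H, W = 6, 6
directions = range(4)  # UP, RIGHT, DOWN, LEFT
marker_flags = list(itertools.product([0, 1], repeat=2))  # (m1, m2) combinations

states = [(i, j, d, m1, m2)
          for i in range(H)
          for j in range(W)
          for d in directions
          for (m1, m2) in marker_flags]

# 6 rows * 6 columns * 4 directions * 4 marker combinations
print(len(states))
```

The count of 576 matches the `State space size` printed when the `MDP` instance is created.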
## MDP Methods
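- `generateState(self)`: generates all possible states and returns them; the constructor assigns the result to the attribute `S`.
- `generateActions(self)`: generates $A(s)$ for every $s \in S$ and returns it; the `terminal` state has no actions.
- `generateV(self)`: $\forall_{s\in S} V(s) \gets 0$
- `generate_p(self)`: evaluates `p(s, a)` once for every state-action pair and returns the results as a dictionary, stored in `p_dict`.
- `reward(self, s, a)`: takes $s$ and $a$, returns $100$ if $s$ is the desired post-grid state and $a=finish$. Returns $-1$ otherwise.
- `p(self, s, a)`: represents the environment dynamics; takes $s$ and $a$ and returns the immediate reward $r$ and the next state $s'$, in that order. Moving into a wall or off the grid, calling `pickMarker` where there is no marker, and calling `finish` all lead to `terminal`.
- `ValueIteration(self, theta, V, S, A, gamma=0.97)`: implements the value iteration algorithm in the textbook (pg: 105) and returns the resulting policy $\pi'$ as a dictionary that maps each state to the index of its greedy action in $A(s)$.
- `executePolicy(self, pi)`: given $\pi$, starts from `initial_state` and follows the policy until `terminal` is reached, using `self.env` to visualize each step. Returns the sequence of actions taken.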
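
With deterministic dynamics, the value iteration backup reduces to $V(s) \gets \max_a \left[ r(s, a) + \gamma V(s') \right]$. The following is a minimal sketch of that loop on a toy problem, not the notebook's implementation: a four-cell corridor where `step` moves right and `finish` ends the episode, with the same reward scheme ($100$ for finishing in the goal cell, $-1$ otherwise). The names `value_iteration` and `step` and the corridor itself are assumptions made for illustration.

```python
def value_iteration(states, actions, step, gamma=0.9, theta=1e-3):
    """In-place value iteration for deterministic dynamics step(s, a) -> (r, s')."""
    V = {s: 0.0 for s in states}
    V['terminal'] = 0.0
    while True:
        delta = 0.0
        for s in states:
            # Bellman optimality backup over all actions
            best = max(r + gamma * V[s2]
                       for r, s2 in (step(s, a) for a in actions))
            delta = max(delta, abs(best - V[s]))
            V[s] = best
        if delta < theta:
            break
    # greedy policy with respect to the converged values
    pi = {s: max(actions,
                 key=lambda a: step(s, a)[0] + gamma * V[step(s, a)[1]])
          for s in states}
    return V, pi


def step(s, a):
    """Toy corridor with cells 0..3; cell 3 is the goal."""
    if a == 'finish':
        return (100 if s == 3 else -1), 'terminal'
    # 'step' moves one cell to the right; walking off the end terminates
    return -1, (s + 1 if s < 3 else 'terminal')


V, pi = value_iteration([0, 1, 2, 3], ['step', 'finish'], step)
print(V)
print(pi)
```

In this toy problem the values grow toward the goal, and the greedy policy steps right until it reaches cell 3 and then finishes.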

In [43]:
class MDP():
    def __init__(self, initial_state=(3, 1, Direction.LEFT.value, 1, 1), final_state=(5, 2, Direction.RIGHT.value, 1, 0)):
        # a state is (row, col, direction, marker_1_present, marker_2_present)
        self.env = ENV()
        self.S = self.generateState()
        self.A = self.generateActions()
        self.V = self.generateV()
        self.initial_state = initial_state
        self.final_state = final_state
        # generate_p() calls reward(), so final_state must be set before this line
        self.p_dict = self.generate_p()

    
    def generateState(self):
        DIRECTIONS = [Direction.UP.value, Direction.RIGHT.value, Direction.DOWN.value, Direction.LEFT.value] 
        # Generate all possible states (row, col, direction, marker_1, marker_2)
        S = []
        # every combination of the two marker flags: (0, 0), (0, 1), (1, 0), (1, 1)
        perms = list(itertools.product([0, 1], repeat=2))
        for i in range(self.env.H):      # rows
            for j in range(self.env.W):  # columns
                for dir in DIRECTIONS:
                    for perm in perms:
                        S.append((i, j, dir, perm[0], perm[1]))
        print("State space size=", len(S))
        S.append('terminal')
        return S

    def generateActions(self):
        # generate all possible actions given every state
        A = {}
        for s in self.S:
            A[s] = []
            if s == 'terminal':
                continue
            # these actions are possible from all states
            A[s].append(Action.turnLeft.value)
            A[s].append(Action.turnRight.value)
            A[s].append(Action.finish.value)
            A[s].append(Action.pickMarker.value)
            A[s].append(Action.move.value)
        return A
        
    def generateV(self):
        # generate V(s)
        V = {}
        for s in self.S:
            V[s] = 0
        # set terminal state value to 0
        V['terminal'] = 0
        return V

    def generate_p(self):
        p_dict = {}
        for s in self.S:
            for a in self.A[s]: 
                r, s_ = self.p(s, a)
                p_dict[(s, a)] = (r, s_)
                
        return p_dict
        
    def reward(self, s, a):
        if s == self.final_state and a == Action.finish.value:
            return 100
        return -1
    
    def p(self, s, a):
        r = self.reward(s, a)
        # get s'
        i, j, dir, m1_status, m2_status = s
        new_i = i
        new_j = j
        new_dir = dir
        new_ms_status = [m1_status, m2_status]
        terminal = False
        marker_picked = False
        if a == Action.move.value:
            if dir == Direction.UP.value:
                new_i -= 1
            elif dir == Direction.RIGHT.value:
                new_j += 1
            elif dir == Direction.DOWN.value:
                new_i +=1
            elif dir == Direction.LEFT.value:
                new_j -=1
        elif a == Action.turnRight.value:
            if dir == Direction.UP.value:
                new_dir = Direction.RIGHT.value
            elif dir == Direction.RIGHT.value:
                new_dir = Direction.DOWN.value
            elif dir == Direction.DOWN.value:
                new_dir = Direction.LEFT.value
            elif dir == Direction.LEFT.value:
                new_dir = Direction.UP.value
        elif a == Action.turnLeft.value:
            if dir == Direction.UP.value:
                new_dir = Direction.LEFT.value
            elif dir == Direction.RIGHT.value:
                new_dir = Direction.UP.value
            elif dir == Direction.DOWN.value:
                new_dir = Direction.RIGHT.value
            elif dir == Direction.LEFT.value:
                new_dir = Direction.DOWN.value
        elif a == Action.pickMarker.value:
            status = [m1_status, m2_status]
            # check whether we are at the position of a marker that is still present
            for m_i, m_pos in enumerate(self.env.markers):
                if (i, j) == m_pos and status[m_i] == 1:
                    new_ms_status[m_i] = 0
                    marker_picked = True
            if not marker_picked:
                terminal = True
        elif a == Action.finish.value:
            terminal = True
        
        out_of_bounds = new_i < 0 or new_i > self.env.H-1 or new_j <0 or new_j > self.env.W - 1
        on_a_wall = False
        for w in self.env.walls:
            if (new_i, new_j) == w:
                on_a_wall = True
        if terminal or out_of_bounds or on_a_wall:
            s_ = 'terminal'
        else:
            s_ = (new_i, new_j, new_dir, new_ms_status[0], new_ms_status[1])


        return r, s_

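    def ValueIteration(self, theta, V, S, A, gamma=0.97):
        # sweep the state space until the largest value change falls below theta
        Delta = 1e9
        num_iterations = 0
        while(Delta >= theta):
            Delta = 0
            for s in S:
                v = V[s]
                max_a = -1e9
                # Bellman optimality backup with deterministic dynamics:
                # V(s) <- max_a [ r(s, a) + gamma * V(s') ]
                # 'terminal' has no actions, so its value stays at 0
                for a in A[s]:
                    r, s_ = self.p_dict[(s, a)]
                    curr_v = r + gamma*V[s_]
                    if curr_v > max_a:
                        V[s] = curr_v
                        max_a = curr_v
                Delta = max(Delta, np.abs(v-V[s]))
            num_iterations += 1
        # deterministic policy pi, greedy with respect to the converged V
        pi = {}
        for s in S:
            max_a = -1e9
            a_max_i = 0
            # evaluate every possible action from s and keep the best one
            for i, a in enumerate(A[s]):
                r, s_ = self.p_dict[(s, a)]
                curr_v = r + gamma*V[s_]
                if curr_v > max_a:
                    a_max_i = i
                    max_a = curr_v
            
            # pi stores the index of the best action within A[s], not the action value
            pi[s] = a_max_i
        print("Number of iterations=", num_iterations)
        return pi    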
    
    def executePolicy(self, pi):
        s = self.initial_state
        action_sequence = []
        while(s != 'terminal'):
            time.sleep(1)
            clear_output(wait=True)
            self.env.show(s)
            # get action
            a = self.A[s][pi[s]]
            r, s_ = self.p_dict[(s, a)]
            s = s_
            action_sequence.append(a)
        return action_sequence
        

In [44]:
# generate MDP() instance, run value iteration
mdp = MDP()
start = time.time()
pi = mdp.ValueIteration(1e-3, mdp.V, mdp.S, mdp.A, gamma=0.9)
end = time.time()
print("Total runtime to learn the optimal policy=", end-start, "(s)")


State space size= 576
Number of iterations= 11
Total runtime to learn the optimal policy= 0.1520981788635254 (s)


In [45]:
# policy execution
action_sequence = mdp.executePolicy(pi)

.  .  .  .  .  #  

.  .  .  #  .  .  

.  m  .  .  .  .  

.  .  #  .  .  .  

.  .  .  #  .  .  

.  .  >  .  .  .  



In [46]:
# printing action commands
actions = ['move', 'turnLeft', 'turnRight', 'pickMarker', 'finish']
print("Actions sequence from optimal policy:")
for a in action_sequence:
    print(actions[a])

Actions sequence from optimal policy:
move
turnLeft
move
move
turnLeft
pickMarker
move
move
finish
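
The discounted return of this sequence can be worked out by hand: the first eight actions each earn $-1$ and the final `finish` earns $100$. The sketch below computes it, assuming the discount factor $\gamma = 0.9$ passed to `ValueIteration` above; the variable names are illustrative.

```python
gamma = 0.9
# eight ordinary steps at -1 each, then finish in the desired post-grid state
rewards = [-1] * 8 + [100]

# G = sum over k of gamma^k * r_k
G = sum(gamma ** k * r for k, r in enumerate(rewards))
print(round(G, 4))  # approximately 37.3514
```

If value iteration has converged, the value stored for the initial state should be close to this number.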
