### Sketch of hierarchical VI

In [11]:
from typing import Any, Tuple, Union, TypeVar
from tasks import get, put, root
from rllib.taxicab import TaxiCab, TaxiCabRenderer, Location, Passenger, Taxi  
import numpy as np
layout_str = """
A--B
----
----
C--D 
"""
taxicab = TaxiCab(layout_str)

east,west, north, south, pickup, putdown = taxicab.actions()


Action = TypeVar("Action")
State = TypeVar("State")
StateDist = list[Tuple[State, float]]
StateRewardDist = list[Tuple[Tuple[State, float], float]]

class MDP:
    state_list : list[State]
    def actions(self, state: State) -> list[Action]: pass
    def next_state_reward_dist(self, state: State, action: Action) -> StateRewardDist: pass #1
    def is_terminal(self, state: State) -> bool: pass

class SubTask(MDP):
    mdp: MDP
    #def child_subtasks(self, state: State) -> list[Union["SubTask", Action]]: pass
    def continuation_prob(self, state: State) -> float: pass #2, just for whole mdp?
    def exit_reward(self, state: State) -> float: pass #3 eq 7
    def exit_distribution(self, state: State) -> StateDist: pass #eq 8 #4

#import taxicab state
from rllib.taxicab import TaxiCabState
def taxi_state(taxi_x,taxi_y,waiting_passenger_x,waiting_passenger_y,passenger_x = None, passenger_y = None):
    taxi = Taxi(Location(taxi_x,taxi_y),None)
    waiting_passenger = Passenger(Location(waiting_passenger_x,waiting_passenger_y),None)
    return TaxiCabState(taxi, (waiting_passenger,))
def taxi_put_state(taxi_x,taxi_y,dest_x,dest_y):
    passenger = Passenger(None,Location(dest_x,dest_y))
    taxi = Taxi(Location(taxi_x,taxi_y),passenger)
    return TaxiCabState(taxi, ())


#all the code below will be polished up later

##these are needed bc next_state_sample can only work as p(s') if p(s') = 1
def pickup_transition(state,next_state):
    if state.taxi.passenger is None and state.waiting_passengers[0].location == state.taxi.location:
        if next_state.taxi.location == state.taxi.location and next_state.taxi.passenger is not None:
            return True
    return False
def putdown_transition(state,next_state):
    if state.taxi.passenger is not None and state.taxi.passenger.destination == state.taxi.location:
        if next_state.taxi.location == state.taxi.passenger.destination and next_state.taxi.passenger is None and len(next_state.waiting_passengers) == 0:
            return True
    return False

class TaxiMDP(MDP):
    def __init__(self,taxicab):
        state_list = taxicab.list_all_possible_states()
        self.tcab = taxicab
    def actions(self, state):
        return self.tcab.actions
    def next_state_reward_dist(self,state,action): #state,action,probability
    #only for prim actions
        state_reward_dist = []
        possibilities = 0
        if action == pickup:
            #this method rewards failed pickups if there's already a passenger
            for next_state in taxicab.list_all_possible_states():
                reward = 15 if taxicab.next_state_sample(state,action).taxi.passenger is not None else -10
                #probability starts at 1 and then is normalized at the end                
                probability = 1 if pickup_transition(state,next_state) else 0
                if probability > 0:
                    possibilities += 1
                    state_reward_dist.append(((next_state,reward),probability))
                #no need to include and loop through if prob is 0
        elif action == putdown:
            for next_state in taxicab.list_all_possible_states():
                reward = 15 if taxicab.next_state_sample(state,action).taxi.passenger is None else -10
                #p is 1 since if successful since dropoff results in no new waiting passengers
                probability = 1 if putdown_transition(state,next_state) else 0
                if probability > 0:
                    possibilities += 1
                    state_reward_dist.append(((next_state,reward),probability))
        else:
            for next_state in taxicab.list_all_possible_states():
                reward = -1 #always -1 for nav
                #this is for primitive actions, not nav itself
                probability = 1 if next_state == taxicab.next_state_sample(state,action) else 0
                if probability > 0:
                    possibilities += 1
                    #simpler computaion
                    state_reward_dist.append(((next_state,reward),probability))
        #normalize probabilities
        if possibilities > 0:
            for i in range(len(state_reward_dist)):
                state_reward_dist[i] = (state_reward_dist[i][0],state_reward_dist[i][1]/possibilities)
        return state_reward_dist
    def is_terminal(self, state):
        if state.taxi.passenger is None and len(state.waiting_passengers) == 0:
            return True
        return False
    

def get_terminal(s):
    if s.taxi.passenger is not None:
        return True
    return False
def put_terminal(s):
    if s.taxi.passenger is None:
        return True
    return False
root_terminal = put_terminal
#only loop thorugh exits w non zero probability
class Root(SubTask,MDP):
    def __init__(self,MDP):
        self.mdp = MDP
        self.child_subtasks = [Get(self.mdp),Put(self.mdp)] #lowercase is the original
        self.continuation_prob = None
        self.exit_reward = None 
    def is_terminal(self,s):
        if s.taxi.passenger is None and len(s.waiting_passengers) == 0:
            return True
        return False 
    def exit_distribution(self, s: State):
        distribution = []
        possibilities = 0
        for state in taxicab.list_all_possible_states():
            if self.is_terminal(state) and len(s.waiting_passengers) == 0 and s.passenger is None and s.taxi.location == state.taxi.location: #
                distribution.append((state,1))
                possibilities += 1
            else:
                pass
        if possibilities > 0:
            for i in range(len(distribution)):
                distribution[i] = (distribution[i][0],distribution[i][1]/possibilities)
        return distribution
class Get(SubTask,MDP):
    def __init__(self,MDP):
        self.mdp = MDP
        self.child_subtasks = [Nav(self.mdp),pickup]
        self.continuation_prob = None
        self.exit_reward = None 
    def is_terminal(self,s):
        if s.taxi.passenger is not None:
            return True
        return False
    def exit_distribution(self, s: State):
        distribution = []
        possibilities = 0
        for state in taxicab.list_all_possible_states():
            if self.is_terminal(state)  and len(s.waiting_passengers) == 1:
                if s.waiting_passengers[0].location == state.taxi.location:
                    if len(state.waiting_passengers) == 0:
                        if state.taxi.passenger is not None: #
                            distribution.append((state,1))
                            possibilities += 1
            else:
                pass
        if possibilities > 0:
            for i in range(len(distribution)):
                distribution[i] = (distribution[i][0],distribution[i][1]/possibilities)
        return distribution
        
class Put(SubTask,MDP):
    def __init__(self,MDP):
        self.mdp = MDP
        self.child_subtasks = [Nav(self.mdp),putdown]
        self.continuation_prob = None
        self.exit_reward = None
    def is_terminal(self,s):
        if s.taxi.passenger is None and len(s.waiting_passengers) == 0:
            return True
        return False 
    def exit_distribution(self, s):
        distribution = []
        possibilities = 0
        for state in taxicab.list_all_possible_states():
            if self.is_terminal(state) and len(state.waiting_passengers) == 0 and state.taxi.passenger is None and s.taxi.location == state.taxi.location: #
                distribution.append((state,1))
                possibilities += 1
            else:
                pass
        if possibilities > 0:
            for i in range(len(distribution)):
                distribution[i] = (distribution[i][0],distribution[i][1]/possibilities)
        return distribution
class Nav(SubTask,MDP):
    def __init__(self,MDP):
        self.mdp = MDP
        self.child_subtasks = [east,west,north,south]

        self.continuation_prob = None
        self.exit_reward = None

    def is_terminal(self,s): #return true if at pass dest or waiting pass location 
        if s.taxi.passenger is not None:
            if s.taxi.location == s.taxi.passenger.destination:
                return True
            else:
                return False
        elif len(s.waiting_passengers) == 0:
            return False #hack for now
        elif s.taxi.location == s.waiting_passengers[0].location:
            return True
        return False 
    def exit_distribution(self, s: State) -> list[Tuple[Any, float]]:
        distribution = []
        possibilities = 0
        for state in taxicab.list_all_possible_states():
            if s.taxi.passenger is not None and state.taxi.passenger is not None:
                if s.taxi.passenger.destination != state.taxi.passenger.destination:
                    continue
            if self.is_terminal(state) and s.waiting_passengers == state.waiting_passengers and (s.taxi.passenger is None) == (state.taxi.passenger is None):
                #need to also not include if starting state dest != end state dest
                distribution.append((state,1))
                possibilities += 1
            else:
                pass
                #distribution.append((state,0))
        if possibilities > 0:
            for i in range(len(distribution)):
                distribution[i] = (distribution[i][0],distribution[i][1]/possibilities)
        return distribution
    def name(subtask: SubTask) -> str:
        if isinstance(subtask, Nav):
            return "Nav"
        elif isinstance(subtask, Get):
            return "Get"
        elif isinstance(subtask, Put):
            return "Put"
        elif isinstance(subtask, Root):
            return "Root"
        elif 'dropoff=True' in str(subtask):
            return "Dropoff"
        elif 'pickup=True' in str(subtask):
            return "Pickup"
        else:
            return "nav_action"

In [12]:
#upload this
#annotate/double check results
#
#IRL 
#Start with inferring s/a very simple -- imitation learning / behavioral cloning
#IRL w/o hierarchy
#Want dataset MDP(grid config, taxicab params(noisy))
#task decomposition
#combinations of ^
#samples trajectories -> D= {<s,a,stack,....st,stack_t,a_t>, ...}
#pi_theta predict a given s = argmax_pi_theta SUM_i_j log pi_tehta(a^i_t } s^i_t)

In [13]:
len(taxicab.list_all_possible_states())
#check that get and put initizalize same nav
#hash value of subtasks
#make object hashable

132

In [17]:
horizon_cache = {}
def horizon_value(subtask: SubTask, s: State,horizon):


    if horizon == 0:
        return 0
    if subtask.is_terminal(s) == True:
        return 0
    
    try:
        return horizon_cache[(subtask,s,horizon)]
    except KeyError:
        pass

    continue_prob = 1  # subtask.continuation_prob(s) #for now, same as just if terminal. bc first if will exit out this can equal 1 for now
    max_qval = float("-inf")
    max_action = None #just for debugging it
    for a in subtask.child_subtasks:
        # Get next-state/reward distribution for semi-MDP
        if a in taxicab.actions():  # if isinstance(a, Action):
            ns_r_prob = subtask.mdp.next_state_reward_dist(s, a)
        else: #will polish later but this if/else is same functionality as designating each a as action class # elif isinstance(a, SubTask):
            ns_r_prob = []
            for ns, prob in a.exit_distribution(s):
                if prob == 0:
                    continue
                #if (a,s,horizon) in horizon_cache:
                #    v = horizon_cache[(a,s,horizon)]   
                #else:
                v = horizon_value(a, s, horizon)
                ns_r_prob.append(((ns, v),(prob)))
        # Calculate expected value of action Q(subtask, s, a)

        qval = 0
        for ns_r, prob in ns_r_prob:
            if prob == 0:
                continue
            ns, r = ns_r
            
            if (subtask,ns,horizon-1) in horizon_cache:
                v = horizon_cache[(subtask,ns,horizon-1)]
            else:
                v = horizon_value(subtask, ns,horizon-1)
            qval += prob * (r + continue_prob * v)
            if qval > max_qval:
                max_action = a
            max_qval = max(max_qval, qval)
    horizon_cache[(subtask,s,horizon)] = max_qval
    return max_qval

In [19]:
def val_true(subtask,s):
    for i in range(1,10):
        print('iteration' ,i)
        print('v1')
        v1 = horizon_value(subtask,s,i-1)
        print('v2')
        v2 = horizon_value(subtask,s,i)
        print('v1:',v1)
        print('v2:',v2)
        if np.abs(v1-v2) < 0.01:
            print("Ended at iteration",i)
            return v2
#init state has taxi in bottom left corner with waiting passenger in top right (max distance pickup)
val_true(Root(TaxiMDP(taxicab)), taxi_state(0,0,taxicab.width-1,taxicab.height-1))

iteration 1
v1
v2
v1: 0
v2: -0.25
iteration 2
v1
v2
v1: -0.25
v2: 16.0
iteration 3
v1
v2
v1: 16.0
v2: 25.5
iteration 4
v1
v2
v1: 25.5
v2: 23.75
iteration 5
v1
v2
v1: 23.75
v2: 22.5
iteration 6
v1
v2
v1: 22.5
v2: 21.25
iteration 7
v1
v2
v1: 21.25
v2: 21.0
iteration 8
v1
v2
v1: 21.0
v2: 21.0
Ended at iteration 8


21.0

In [20]:
#if put and get share nav this should be shorter
len(horizon_cache)

451