In [25]:
# Imports
import numpy as np
import pandas as pd
import os
from enum import Enum, auto
from collections import namedtuple
from itertools import product
import mdptoolbox

In [26]:
# Paths
TRAINING_DATASET_PATH = 'data/warehousetraining.txt'
TEST_DATASET_PATH = 'data/warehouseorder.txt'

In [27]:
# Constants
WAREHOUSE_SIZE = 4
WAREHOUSE_LAYOUT = (2,2)

In [4]:
Action = namedtuple('Action', ['operation_type','item_type'])

In [5]:
class OperationType(Enum):
    STORE = auto()
    RESTORE = auto()
    
    @staticmethod
    def from_str(value):
        if value.upper() == 'STORE':
            return OperationType.STORE
        elif value.upper() == 'RESTORE':
            return OperationType.RESTORE
        else:
            raise NotImplementedError

In [6]:
class ItemType(Enum):
    WHITE = auto()
    BLUE = auto()
    RED = auto()
    
    @staticmethod
    def from_str(value):
        if value.upper() == 'WHITE':
            return ItemType.WHITE
        elif value.upper() == 'BLUE':
            return ItemType.BLUE
        elif value.upper() == 'RED':
            return ItemType.RED
        else:
            raise NotImplementedError
    
    @staticmethod
    def list():
        return list(ItemType)

In [7]:
class WarehouseDataSet:
    
    def __init__(self, path=TRAINING_DATASET_PATH):
        self.path = path
        self.dataset = pd.read_csv(path, sep='\t', names=['OperationType', 'ItemType'])
        self.rel_freq = self._create_rel_freq_dict(self.dataset)
        self.size = self.dataset.shape[0]
        
    def _create_rel_freq_dict(self, dataset: pd.DataFrame):
        rel_freq_series = (dataset.value_counts() / dataset.shape[0])
        tmp_dict = rel_freq_series.to_dict()
        rel_freq_dict = {}
        for (operation, item), rel_freq in tmp_dict.items():
            operation_type = OperationType.from_str(operation)
            item_type = ItemType.from_str(item) 
            a = Action(operation_type, item_type)
            rel_freq_dict[a] = rel_freq
            #rel_freq_dict[(operation_type, item_type)] = rel_freq
        return rel_freq_dict

    def get_relative_frequency_for(self, action: (OperationType, ItemType)):
        return self.rel_freq.get(action, 0)

    def get_relative_frequencies(self):
        return self.rel_freq.copy()

In [8]:
train_dataset = WarehouseDataSet(TRAINING_DATASET_PATH)

In [9]:
class Warehouse:
     
    @staticmethod
    def is_applicable_action(position, s, s_prime):
        action = s[1]
        cell_content_s = s[0][position]
        cell_content_s_prime = s_prime[0][position]
        is_applicable = False
        if action.operation_type == OperationType.STORE:
            is_applicable = cell_content_s is None and cell_content_s_prime == action.item_type
        elif action.operation_type == OperationType.RESTORE:
            is_applicable = cell_content_s == action.item_type and cell_content_s_prime is None
        else:
            raise ValueError()
        return is_applicable
    
    @staticmethod
    def calculate_distance(position, layout):
        # Since the robot can not go diagonally
        # it makes sense to use the manhattan distance + 1 for the outside position
        manhattan_distance = sum(np.unravel_index(position, layout))
        return 1 + manhattan_distance


In [10]:
def create_transition_probability_matrix(dataset, state_space):
    number_of_states = len(state_space)
    tpm = np.zeros((WAREHOUSE_SIZE, number_of_states, number_of_states), dtype=np.float16)
    for position in range(WAREHOUSE_SIZE): 
        for x, s in enumerate(state_space):
            for y, s_prime in enumerate(state_space):
                if Warehouse.is_applicable_action(position, s, s_prime):
                    tpm[position, x, y] = dataset.get_relative_frequency_for(s[1])
            # handle dead states
            if tpm[position, x].sum() == 0:
                tpm[position, x, x] = 1.0
        tpm[position] = tpm[position] / tpm[position].sum(axis=1)[:, None]    
    return tpm

In [11]:
class ActionSpace:

    def __init__(self, operation_enum, item_type_enum):
        self.operation_enum = operation_enum
        self.item_type_enum = item_type_enum
    
    def get(self):
        action_space = []
        for o_type, i_type in product(list(self.operation_enum), list(self.item_type_enum)):
            action_space.append(Action(o_type, i_type))
        return action_space

In [12]:
class StateSpace:
    
    def __init__(self, operation_enum, item_type_enum, number_of_cells, action_space):
        self.number_of_cells = number_of_cells
        self.operation_enum = operation_enum
        self.item_type_enum = item_type_enum
        self.possible_cell_states = [None] + list(item_type_enum)
        self.warehouse_states = product(self.possible_cell_states, repeat=self.number_of_cells)
        self.action_space = action_space
        
    def get(self):
        return list(product(self.warehouse_states, self.action_space))
        

In [13]:
def create_state_space():
    from itertools import product
    states_space = []
    possible_cell_states = [None, ItemType.RED, ItemType.WHITE, ItemType.BLUE]
    warehouse_state_space = product(possible_cell_states, repeat=4)
    possible_actions = []
    t = product(list(OperationType), list(ItemType))
    for operation_type, item_type in t:
        possible_actions.append(Action(operation_type, item_type))
    return list(product(warehouse_state_space, possible_actions))

In [18]:
class RewardStructure:
    
    def __init__(self, reward_for_dead_ends, position_rewards, weights):
        self.reward_for_dead_ends = reward_for_dead_ends
        self.position_reward_mapping = {}
        for pos, reward in position_rewards:
            self.position_reward_mapping[pos] = reward
        self.weights = weights

    def calculate_reward(self, position, s):
        cell_content = s[0][position]
        action = s[1]
        reward = self.reward_for_dead_ends
        weight = self.get_weight_factor_for_action(action)
        if action.operation_type == OperationType.STORE:
            if cell_content is None:
                reward = self.position_reward_mapping.get(position) * weight
        elif action.operation_type == OperationType.RESTORE:
            if cell_content == action.item_type:
                reward = self.position_reward_mapping.get(position) * weight
        else:
            raise ValueError
        return reward
        
    def create_reward_matrix(self, dataset, state_space):
        number_of_states = len(state_space)
        rm = np.zeros((WAREHOUSE_SIZE, number_of_states), dtype=np.float16)
        for position in range(WAREHOUSE_SIZE):
            for x, s in enumerate(state_space):
                rm[position, x] = self.calculate_reward(position, s) 
        return rm.T
            
    def get_weight_factor_for_action(self, action):
        return self.weights.get(action, 1)


### Creating the transition probabilty matrix

In [None]:
action_space = ActionSpace(OperationType, ItemType).get()

In [None]:
state_space = StateSpace(OperationType, ItemType, WAREHOUSE_SIZE, action_space).get()

In [None]:
transition_probability_matrix = create_transition_probability_matrix(train_dataset, state_space)

### Creating the reward matrix

In [20]:
position_rewards = []
for pos in range(WAREHOUSE_SIZE):
    reward = - Warehouse.calculate_distance(pos, WAREHOUSE_LAYOUT)
    position_rewards.append((pos, reward))

In [22]:
weights = {}
for action, rel_freq in train_dataset.get_relative_frequencies().items(): 
    weights[action] = 1 + rel_freq

In [23]:
rw = RewardStructure(-10, position_rewards, weights)

In [24]:
reward_matrix = rw.create_reward_matrix(train_dataset, state_space)

### MDP

In [29]:
DISCOUNT_FACTOR = 0.99
ITERATIONS = 100
policy_iteration_result = mdptoolbox.mdp.PolicyIteration(
    transition_probability_matrix,
    reward_matrix,
    DISCOUNT_FACTOR, 
    max_iter=ITERATIONS
)
value_iteration_result = mdptoolbox.mdp.ValueIteration(
    transition_probability_matrix,
    reward_matrix, 
    DISCOUNT_FACTOR, 
    max_iter=ITERATIONS
)

#policy_iteration_result.run()
#value_iteration_result.run()

print('Policy-Iteration Algorithm:')
print(policy_iteration_result.policy)
print(policy_iteration_result.V)
print(policy_iteration_result.iter)

print('Value-Iteration Algorithm:')
print(value_iteration_result.policy)
print(value_iteration_result.V)
print(value_iteration_result.iter)

Policy-Iteration Algorithm:
[0 0 0 ... 0 0 0]
[0. 0. 0. ... 0. 0. 0.]
0
Value-Iteration Algorithm:
None
[0. 0. 0. ... 0. 0. 0.]
0
