In [1]:
import copy
import gzip
import numpy as np
import datetime
import itertools
import mdptoolbox
import gc
import pandas as pd
from enum import IntEnum, Enum

In [2]:
class ItemColorType(IntEnum):
    # Colour of an item handled by the warehouse.
    # The integer values are used directly when building command ids
    # (Command.get_id) and when comparing an item colour with a slot.
    WHITE = 0
    BLUE = 1
    RED = 2

class SlotOccupation(IntEnum):
    # Content of a single warehouse slot: the colour of the stored item, or EMPTY.
    # WHITE/BLUE/RED carry the same integer values as ItemColorType, which is what
    # lets StorageSpace.perform_command convert a colour with SlotOccupation(color)
    # and compare a slot directly against an item colour.
    WHITE = 0
    BLUE = 1
    RED = 2
    EMPTY = 3
    
class CommandType(Enum):
    # Kind of command: STORE puts an item into a slot, RESTORE takes one out
    # (see StorageSpace.perform_command).
    STORE = 0
    RESTORE = 1

In [3]:
class Command:
    """A single command to execute, e.g. "store red" or "restore blue"."""

    def __init__(self, command_type:CommandType, color:ItemColorType):
        self.command_type, self.color = command_type, color

    def get_id(self):
        """Return an integer that is distinct for every (type, colour) pair.

        Commands of the same type occupy a contiguous range of
        ``len(ItemColorType)`` ids, ordered by colour value.
        """
        colors_per_type = len(ItemColorType)
        type_offset = colors_per_type * self.command_type.value
        return type_offset + self.color.value

class SlotCoordinate:
    """Coordinate of one slot in the warehouse.

    StorageSpace.perform_command treats both components as 1-based and uses
    ``x`` to select the row and ``y`` to select the position inside that row.
    """

    def __init__(self, x:int, y:int):
        self.x, self.y = x, y
        

In [4]:
class StorageSpace:
    """Occupation of every slot of the warehouse, kept as one flat sequence.

    ``state`` holds one SlotOccupation value per slot. The slot at the 1-based
    coordinate ``pos`` is stored at index ``(pos.x - 1) * width + (pos.y - 1)``.
    """

    def __init__(self, height:int, width: int):
        self.state = [SlotOccupation.EMPTY] * (height * width) # all slots start empty
        self.width = width

    def perform_command(self, pos:SlotCoordinate, command:Command):
        """Apply ``command`` to the slot at ``pos`` on a copy of this storage.

        ``self`` is never modified; the command is applied to a deep copy.

        Returns:
            (next_state, error): the copied storage after the command, and a
            bool that is True when the command was not applicable (storing
            into a non-empty slot, or restoring a colour the slot does not
            hold). The slot is overwritten in the copy even when ``error`` is
            True, so callers are expected to check the flag.
        """
        next_state = copy.deepcopy(self)
        idx = (pos.x - 1) * next_state.width + (pos.y - 1)

        if command.command_type == CommandType.STORE:
            # bool(...) keeps the flag a plain bool even when state is a numpy array
            error = bool(next_state.state[idx] != SlotOccupation.EMPTY)
            next_state.state[idx] = SlotOccupation(command.color)
        else:
            error = bool(next_state.state[idx] != command.color)
            next_state.state[idx] = SlotOccupation.EMPTY
        return next_state, error

    def get_id(self):
        """Return a unique integer for the current occupation.

        The slots are read as the digits of a number in base
        ``len(SlotOccupation)``, with slot 0 as the least significant digit.
        Plain Python integers are used so the result cannot wrap around the
        way fixed-width numpy integers do.
        """
        base = len(SlotOccupation)
        state_id = 0
        for idx, occupation in enumerate(self.state):
            state_id += base ** idx * int(occupation)
        return state_id

    def combinations(self):
        """Return every possible occupation of the storage as a list of tuples."""
        return list(itertools.product(SlotOccupation, repeat=len(self.state)))

       

In [5]:
class State:
    """One MDP state: the storage occupation plus the command waiting to be executed."""

    def __init__(self, warehouse, storage_space:StorageSpace, command:Command):
        self.warehouse = warehouse
        self.storage_space = storage_space
        self.command = command

    def get_next_state(self, pos:SlotCoordinate, command:Command):
        """Execute the pending command on slot ``pos`` and queue ``command`` next.

        Returns:
            (state, error): the successor state and a flag that is truthy when
            the pending command could not be applied at ``pos`` or when the
            successor fails ``is_state_valide``.
        """
        storage_after, error = self.storage_space.perform_command(pos, self.command)
        successor = State(self.warehouse, storage_after, command)
        if not error and not successor.is_state_valide():
            error = True
        return successor, error

    def get_next_possible_states(self, pos:SlotCoordinate):
        """Return all error-free successors reachable by acting on slot ``pos``.

        One candidate is generated per command known to the warehouse.
        """
        candidates = (self.get_next_state(pos, cmd) for cmd in self.warehouse.commands)
        return [successor for successor, failed in candidates if not failed]

    def get_id(self):
        """Return a unique integer built from the storage id and the command id."""
        storage_id = self.storage_space.get_id()
        command_id = self.command.get_id()
        # each command owns its own block of storage ids
        command_offset = self.warehouse.count_storage_space_states() * command_id
        return storage_id + command_offset

    def is_state_valide(self):
        """Tell whether the pending command can be executed somewhere in the storage.

        A STORE needs at least one empty slot; a RESTORE needs at least one
        slot holding the requested colour.
        """
        slots = self.storage_space.state
        kind = self.command.command_type
        if kind == CommandType.STORE:
            return any(slot == SlotOccupation.EMPTY for slot in slots)
        if kind == CommandType.RESTORE:
            # an entirely empty storage cannot contain the colour either
            return self.command.color in slots
        return True

In [6]:
class WareHouse:
    """Warehouse model: its grid size, commands, slot coordinates and valid states.

    Construction enumerates every valid state and fills
    ``state_row_dictionary`` (state id -> row index into ``states``).
    """

    def __init__(self, shape:tuple):
        self.rows, self.columns = shape[0], shape[1]
        self.commands = self.get_all_possible_commands()
        self.storage_slot_coordinates = self.get_all_storage_slot_coordinate()
        self.states = self.create_warehouse_states()

    def get_number_slot_in_storage_space(self):
        """Return the number of slots in the storage grid."""
        return self.rows * self.columns

    def get_all_possible_commands(self):
        """Return one Command per (command type, colour) pair."""
        return [Command(kind, color) for kind in CommandType for color in ItemColorType]

    def get_all_storage_slot_coordinate(self):
        """Return the 1-based coordinate of every slot, row by row."""
        return [
            SlotCoordinate(row, column)
            for row in range(1, self.rows + 1)
            for column in range(1, self.columns + 1)
        ]

    def create_warehouse_states(self):
        """Enumerate every valid (storage occupation, command) state.

        Also (re)builds ``self.state_row_dictionary``, mapping each valid
        state's id to its position in the returned list.
        """
        self.state_row_dictionary = {}
        valid_states = []
        # pairs like (STORE, RED), (STORE, BLUE), (RESTORE, BLUE), ...
        command_pairs = list(itertools.product(CommandType, ItemColorType))
        # one tuple of slot occupations per possible storage content
        for occupancy in self.create_empty_storage_space().combinations():
            for command_type, color in command_pairs:
                candidate = State(
                    self,
                    self.create_storage_space(np.array(occupancy)),
                    Command(command_type, color),
                )
                if not candidate.is_state_valide():
                    continue
                # the next free row is the current length of the result list
                self.state_row_dictionary[candidate.get_id()] = len(valid_states)
                valid_states.append(candidate)
        return valid_states

    def create_empty_storage_space(self):
        """Return a storage of this warehouse's size with every slot empty."""
        return StorageSpace(self.rows, self.columns)

    def create_storage_space(self, state:[SlotOccupation]):
        """Return a storage of this warehouse's size whose slots are ``state``."""
        storage_space = self.create_empty_storage_space()
        storage_space.state = state
        return storage_space

    def count_storage_space_states(self):
        """Return the number of possible storage occupations, valid or not."""
        slot_count = self.get_number_slot_in_storage_space()
        return np.power(len(SlotOccupation), slot_count)

    def count_warehouse_states(self):
        """Return the number of valid warehouse states."""
        return len(self.states)


In [7]:
class Rewards:
    """Reward matrix of the MDP: one row per state, one column per slot."""

    def __init__(self, number_of_state:int, pos:[SlotCoordinate]):
        self.matrix = np.zeros((number_of_state, len(pos)))
        self.pos = pos

    def get_reward(self, current_state:State, pos:SlotCoordinate):
        """Return the reward for acting on slot ``pos`` in ``current_state``.

        The reward is -1 when no valid successor exists for that slot;
        otherwise it is the rounded value of ``get_distance``.
        """
        if not current_state.get_next_possible_states(pos):
            return -1
        return round(self.get_distance(current_state, pos))

    def get_distance(self, current_state:State, pos:SlotCoordinate):
        """Return ``rows * columns - pos.x * pos.y``; larger values are better."""
        max_value = current_state.warehouse.rows * current_state.warehouse.columns
        return (max_value - pos.x*pos.y)

    def compute_reward_matrix(self, list_states:[State], state_row_dict):
        """Fill and return the reward matrix.

        Args:
            list_states: the states to evaluate.
            state_row_dict: mapping from state id to row index in the matrix.
        """
        for state in list_states:
            # the row depends only on the state, so look it up once per state
            row_index = state_row_dict[state.get_id()]
            for column, slot in enumerate(self.pos):
                self.matrix[row_index, column] = self.get_reward(state, slot)
        return self.matrix

In [8]:
class TransitionMatrix:
    """State-to-state transition probabilities for the action "use slot ``pos``"."""

    def __init__(self, number_of_state:int, pos:SlotCoordinate):
        self.matrix = np.zeros((number_of_state, number_of_state))
        self.pos = pos

    def compute_transition_matrix(self, list_states:[State], state_row_dict):
        """Fill the matrix row by row.

        A state without any valid successor for this slot gets a self-loop
        with probability 1; otherwise the probability mass is split equally
        between its successors.

        Args:
            list_states: the states to process.
            state_row_dict: mapping from state id to row/column index.
        """
        for state in list_states:
            row_index = state_row_dict[state.get_id()]
            next_states = state.get_next_possible_states(self.pos)
            if not next_states:
                self.matrix[row_index, row_index] = 1
                continue
            for successor, prob in self.get_probability_equal(state, next_states):
                column = state_row_dict.get(successor.get_id())
                # successors unknown to the dictionary are skipped
                if column is not None:
                    self.matrix[row_index, column] = prob

    def get_probability_equal(self, current_state:State, next_possible_states:[State]):
        """Return ``(state, probability)`` pairs with equal probabilities.

        The probability is ``1 / len(next_possible_states)`` rounded to four
        decimals, so the pairs do not necessarily sum to exactly 1;
        ``check_fullfillment`` compensates for that.
        """
        prob = round(1.0/len(next_possible_states), 4)
        return [(next_state, prob) for next_state in next_possible_states]

    def check_fullfillment(self):
        """Make every row sum to 1, or report that this is not possible.

        A row whose sum deviates from 1 by at most 0.1 (in either direction)
        is repaired by adding the difference to its first positive entry.

        Returns:
            False as soon as a row deviates by more than 0.1 (rows after it
            are left untouched), True otherwise.
        """
        for row_index, row in enumerate(self.matrix):
            row_sum = np.sum(row)
            if row_sum == 1:
                continue
            delta = 1 - row_sum
            # reject rows that are too far off on either side, not only too low
            if abs(delta) > 0.1:
                return False
            positive_columns = np.flatnonzero(row > 0)
            if positive_columns.size:
                self.matrix[row_index, positive_columns[0]] += delta
        return True

In [9]:
class Evaluation:
    """Replays a command sequence and sums the travel cost of the chosen slots."""

    def __init__(self, warehouse: WareHouse, inputs, policy: tuple):
        self.inputs = inputs
        self.warehouse = warehouse
        self.policy = policy

    def _policy_slot(self, state):
        # Slot coordinate the policy selects for ``state``.
        row = self.warehouse.state_row_dictionary[state.get_id()]
        return self.warehouse.storage_slot_coordinates[self.policy[row]]

    def calculate_costs(self):
        """Return the total cost of following ``self.policy`` over ``self.inputs``.

        Starts from an empty storage with the first input as pending command.
        The error flag of each transition is ignored.
        """
        state = State(self.warehouse, self.warehouse.create_empty_storage_space(), self.inputs[0])
        total = 0

        for upcoming in self.inputs[1:]:
            slot = self._policy_slot(state)
            state, _ = state.get_next_state(slot, upcoming)
            # way to the slot and back again
            total += self.cost(slot)

        # the last pending command still has to be served
        total += self.cost(self._policy_slot(state))
        return total

    def calculate_greedy(self):
        """Return the total cost of always taking the first slot that works.

        Slots are tried in the order of ``warehouse.storage_slot_coordinates``;
        when no slot yields an error-free transition the state is left
        unchanged and no cost is added for that command.
        """
        slots = self.warehouse.storage_slot_coordinates
        state = State(self.warehouse, self.warehouse.create_empty_storage_space(), self.inputs[0])
        total = 0
        for upcoming in self.inputs[1:]:
            for slot in slots:
                candidate, failed = state.get_next_state(slot, upcoming)
                if failed:
                    continue
                state = candidate
                total += self.cost(slot)
                break
        # serve the last pending command; the first input is passed as the follow-up
        for slot in slots:
            _, failed = state.get_next_state(slot, self.inputs[0])
            if not failed:
                total += self.cost(slot)
                break
        return total

    def cost(self, pos: SlotCoordinate):
        """Return the round-trip cost of reaching slot ``pos``."""
        one_way = pos.y - 1 + pos.x
        return one_way * 2

In [10]:
### Construct and save the reward and transition matrices
def reward_trans(warehouse):
    """Build the reward and transition matrices of ``warehouse`` and save them.

    Writes ``reward_matrix_<rows>x<columns>.npy.gz`` and
    ``transition_matrix_<rows>x<columns>.npy.gz`` to the working directory.
    The transition array has one (states x states) matrix per slot and is
    stored as float16.
    """
    n_states = warehouse.count_warehouse_states()
    slots = warehouse.storage_slot_coordinates
    suffix = str(warehouse.rows)+"x"+str(warehouse.columns)+".npy" + ".gz"

    reward = Rewards(n_states, slots)
    reward_matrix = reward.compute_reward_matrix(warehouse.states, warehouse.state_row_dictionary)

    transition_matrix = np.zeros((len(slots), n_states, n_states), dtype=np.float16)
    for i, slot_pos in enumerate(slots):
        trans = TransitionMatrix(n_states, slot_pos)
        trans.compute_transition_matrix(warehouse.states, warehouse.state_row_dictionary)

        # NOTE(review): the boolean result of check_fullfillment is not checked here
        trans.check_fullfillment()
        # Assignment casts to float16 on its own; the former astype(np.float)
        # fails on NumPy >= 1.24 (alias removed) and made an extra full copy.
        transition_matrix[i] = trans.matrix
        del trans  # release the dense per-slot matrix before building the next one

    with gzip.GzipFile("reward_matrix_" + suffix, "w") as f:
        np.save(file=f, arr=reward_matrix)

    with gzip.GzipFile("transition_matrix_" + suffix, "w") as f:
        np.save(file=f, arr=transition_matrix)
    print("terminated")

In [16]:
### Apply the MDP solvers (value iteration and policy iteration) and store the resulting policies
def apply_algo(warehouse, discount=0.3, max_iter=100):
    """Solve the saved MDP with value iteration and policy iteration.

    Reads the reward and transition matrices written by ``reward_trans`` and
    writes ``policies_<rows>x<columns>.npy.gz`` containing the two policies,
    value iteration first, policy iteration second.

    Args:
        warehouse: object providing ``rows`` and ``columns`` for the file names.
        discount: discount factor passed to both solvers.
        max_iter: iteration limit passed to both solvers.
    """
    suffix = str(warehouse.rows)+"x"+str(warehouse.columns)+".npy" + ".gz"

    # np.load reads the whole array, so each file can be closed right away
    with gzip.GzipFile("reward_matrix_" + suffix, "r") as f:
        rmatrix = np.load(f)
    with gzip.GzipFile("transition_matrix_" + suffix, "r") as f:
        trans = np.load(f)

    policies = []
    for solver_cls in (mdptoolbox.mdp.ValueIteration, mdptoolbox.mdp.PolicyIteration):
        alg = solver_cls(trans, rmatrix, discount, max_iter=max_iter)
        alg.run()
        policies.append(alg.policy)
        del alg

    with gzip.GzipFile("policies_" + suffix, "w") as f:
        np.save(file=f, arr=np.array(policies))
    print("terminated")

In [17]:
def read_file(filename:str):
    """Read a tab-separated command file and return a list of Command objects.

    Each non-blank line must contain a command type name and a colour name
    separated by a tab (case-insensitive), e.g. ``store<TAB>red``. Blank
    lines are skipped.

    Raises:
        KeyError: if a name is not a member of CommandType / ItemColorType.
        IndexError: if a non-blank line has fewer than two fields.
    """
    results = []
    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                # tolerate empty lines, e.g. a trailing newline at end of file
                continue
            fields = line.split("\t")
            command_type = CommandType[fields[0].strip().upper()]
            color = ItemColorType[fields[1].strip().upper()]
            results.append(Command(command_type, color))
    return results

# Evaluation on 2x3 warehouse

In [13]:
# Build the 2x3 warehouse; construction enumerates all valid states up front.
warehouse = WareHouse((2, 3))

In [14]:
# Compute the reward and transition matrices and write them to gzip-compressed .npy files.
reward_trans(warehouse)

terminated


In [18]:
# Solve the saved MDP with value iteration and policy iteration and store both policies.
apply_algo(warehouse)

terminated


In [19]:
import gzip
import numpy as np

# Replay the recorded orders with each stored policy and with the greedy baseline.
testing_data = read_file("warehouseorder.txt")
names = ["ValueIteration", "PolicyIteration"]

# np.load reads the whole array, so the file can be closed before evaluating
with gzip.GzipFile("policies_"+str(warehouse.rows)+"x"+str(warehouse.columns)+".npy" + ".gz", "r") as f:
    policies = np.load(f)

for name, policy in zip(names, policies):
    print("Algorithm: ", name)
    evaluation = Evaluation(warehouse, testing_data, policy)
    distance_parcoured = evaluation.calculate_costs()
    print('total_traveled_distance = ', distance_parcoured)

# The greedy baseline never consults the policy, so it is computed only once.
greedy_distance_parcoured = Evaluation(warehouse, testing_data, policies[0]).calculate_greedy()
print("Algorithm: Greedy")
print('greedy_traveled_distance = ', greedy_distance_parcoured)
print("===========================")
print("  ")

Algorithm:  ValueIteration
total_traveled_distance =  248
Algorithm:  PolicyIteration
total_traveled_distance =  248
Algorithm: Greedy
greedy_traveled_distance =  260
  
