<a href="https://colab.research.google.com/github/StefanFischer/SAKI-Project4/blob/main/smart_warehouse2x2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Smart Warehouse Implementation

The purpose of this exercise is to develop an algorithm that optimizes the route that a robot takes for pick-up and storage of items in a warehouse.

There are the following constraints :
*   Size of warehouse is 2x2 or 2x3
*   There is separate start/stop position outside the storage space
*   The first position the robot can move is (0,0)
*   Robots can move to adjacent fields (but not diagonally)
*   There are three types of items, identified by color (white, blue, red)

The problem can be formulated as a Markov Decision Process consisting of:
*   Transition Probability Matrix (TPM)
*   Reward Matrix
*   States
*   Actions

The goal is to create a policy, which returns for each state the action which is optimal to take.

In [43]:
pip install pymdptoolbox



In [44]:
import numpy as np
import mdptoolbox
import matplotlib.pyplot as plt
import csv
from scipy.sparse import csr_matrix

**Configuration of the MDP problem**

The states consist of the storage state of each shelf, which can be filled with one of the three items or be empty.
Furthermore, there is the input given what item should be stored or restored.

Besides this also the rewards/penalties (negative rewards) are defined here:
*   distance penalties: the distance the robot has to cover to store or restore an item
*   unable to store/restore item: penelize robot if he can not store/restore a item
*   refusing item penalty: if there is a place left in storage but robot rejects item, penalize it

Here, the discount is also configured, which defines if the agent should take future return into account(value: ]0,1[ )


In [45]:

warehouse_size = 4
storage_state = ['0','r','b','w'] #0 = empty, r = red, b = blue, w = white
requirements = ['sr', 'sb', 'sw', 'rr', 'rb', 'rw'] #sb/sr/sw = store_blue/red/white, rb/rr/rw = restore blue/red/white

reward_con_in = -50
reward_con_out = -50
discount = 0.999
penalty_refusing = -10000
prints = False


reward_pro = list()
if warehouse_size == 4:
    reward_pro = [-1, -2, -2, -3]
elif warehouse_size == 6:
    reward_pro = [-1, -2, -2, -3, -3, -4]
elif warehouse_size == 9:
    reward_pro = [-1, -2, -2, -2, -3, -3, -3, -4, -4, -5]


path_to_trainings_file = "./Exercise 4 - Reinforcement Learning Data - warehousetraining.txt"


**Create all possible states**

In [46]:

def get_states(warehouse_size, storage_state, requirements):
    states = []

    if warehouse_size == 4:
        for x1 in storage_state:
            for x2 in storage_state:
                for x3 in storage_state:
                    for x4 in storage_state:
                        for r in requirements:
                            st = (x1, x2, x3, x4, r)
                            states.append(st)

    if warehouse_size == 6:
        for x1 in storage_state:
            for x2 in storage_state:
                for x3 in storage_state:
                    for x4 in storage_state:
                        for x5 in storage_state:
                            for x6 in storage_state:
                                for r in requirements:
                                    st = (x1, x2, x3, x4, x5, x6, r)
                                    states.append(st)

    if warehouse_size == 9:
        for x1 in storage_state:
            for x2 in storage_state:
                for x3 in storage_state:
                    for x4 in storage_state:
                        for x5 in storage_state:
                            for x6 in storage_state:
                                for x7 in storage_state:
                                    for x8 in storage_state:
                                        for x9 in storage_state:
                                            for r in requirements:
                                                st = (x1, x2, x3, x4, x5, x6, x7, x8, x9, r)
                                                states.append(st)
    return states

states = get_states(warehouse_size, storage_state, requirements)



**Compute the probability of each item getting stored/restored**

In [47]:

def compute_item_probability(path_to_file):
    file = open(path_to_file, 'r')
    count = 0
    red_count = 0
    blue_count = 0
    white_count = 0

    # Assumption: all stored items are also restored later in time
    for line in file:
        count += 1
        if "restore" in line:
            if "red" in line:
                red_count += 1
            if "white" in line:
                white_count += 1
            if "blue" in line:
                blue_count += 1

    # Closing files
    file.close()

    return red_count / count, blue_count / count, white_count / count

probRed, probBlue, probWhite = compute_item_probability(path_to_trainings_file)
print(probRed, probBlue, probWhite)


0.24686157912124215 0.12528906508093823 0.1278493557978196


**Init the penalty matrix for rejecting items while storage is free**

In [48]:
# penalties for the agent if he refuses to store a item
penalties_mat = np.zeros((len(states), warehouse_size))

**Computation of the TPM and filling the penalty matrix**

In [49]:

def get_tpm(warehouse_size, storage_state_size, requirements_size, probRed, probBlue, probWhite, penalties_mat):
    trans_prob = []

    probRed = round(probRed, 4)
    probWhite = round(probWhite, 4)
    probBlue = round(probBlue, 4)

    probs = {'sr': probRed, 'sw': probWhite, 'sb': probBlue,
             'rr': probRed, 'rw': probWhite, 'rb': probBlue}

    # actions to take: as action the specific shelf is used for storing or restoring
    # iterate over all different shelfes in warehouse
    for idx_tr in range(warehouse_size):
        trans = np.zeros((storage_state_size ** warehouse_size * requirements_size, storage_state_size ** warehouse_size * requirements_size), dtype=np.float16)
        for idx_st, state in enumerate(states):
            for idx_ne, next_state in enumerate(states):
                # store
                if state[-1][0] == 's':
                    # store possible
                    if state[idx_tr] == '0':
                        trans_state = list(state)
                        # apply action
                        trans_state[idx_tr] = state[-1][1]
                        if tuple(trans_state[:-1]) == next_state[:-1]:
                            trans[idx_st][idx_ne] = probs[next_state[-1]]
                        continue
                    # store impossible
                    if state[:-1] == next_state[:-1]:
                        trans[idx_st][idx_ne] = probs[next_state[-1]]

                        if '0' in state[:-1]:
                            penalties_mat[idx_st, idx_tr] = penalty_refusing
                        continue
                # restore
                if state[-1][0] == 'r':
                    if state[-1][1] == 'r':
                        # restore possible
                        if state[idx_tr] == 'r':
                            trans_state = list(state)
                            # apply action
                            trans_state[idx_tr] = '0'
                            if tuple(trans_state[:-1]) == next_state[:-1]:
                                trans[idx_st][idx_ne] = probs[next_state[-1]]
                            continue
                        # restore impossible: stay in state
                        if state[:-1] == next_state[:-1]:
                            trans[idx_st][idx_ne] = probs[next_state[-1]]
                            continue
                    if state[-1][1] == 'w':
                        # restore possible
                        if state[idx_tr] == 'w':
                            trans_state = list(state)
                            # apply action
                            trans_state[idx_tr] = '0'
                            if tuple(trans_state[:-1]) == next_state[:-1]:
                                trans[idx_st][idx_ne] = probs[next_state[-1]]
                            continue
                        # restore impossible: stay in state
                        if state[:-1] == next_state[:-1]:
                            trans[idx_st][idx_ne] = probs[next_state[-1]]
                            continue
                    if state[-1][1] == 'b':
                        # restore possible
                        if state[idx_tr] == 'b':
                            trans_state = list(state)
                            # apply action
                            trans_state[idx_tr] = '0'
                            if tuple(trans_state[:-1]) == next_state[:-1]:
                                trans[idx_st][idx_ne] = probs[next_state[-1]]
                            continue
                        # restore impossible: stay in state
                        if state[:-1] == next_state[:-1]:
                            trans[idx_st][idx_ne] = probs[next_state[-1]]
                            continue
        dense_matrix = csr_matrix(trans)
        trans_prob.append(dense_matrix)

    return trans_prob, penalties_mat

trans_prob, penalties_mat = get_tpm(warehouse_size, len(storage_state), len(requirements), probRed, probBlue, probWhite, penalties_mat)#

**Filling the reward matrix and add penalty matrix on top of it**

In [50]:

def store_possible(state, action):
    return True if state[action] == '0' else False


def restore_possible(state, action):
    return True if state[action] == state[-1][1] else False


def get_rewards(warehouse_size, requirements_size, storage_state_size, reward_pro, reward_con_in, reward_con_out):
    reward_mat = np.zeros((storage_state_size ** warehouse_size * requirements_size, warehouse_size))

    for idx_st, state in enumerate(states):
        # store
        if state[-1][0] == 's':
            for action in range(warehouse_size):
                reward_mat[idx_st][action] = reward_pro[action] if store_possible(state, action) else reward_con_in
        # restore
        else:
            for action in range(warehouse_size):
                reward_mat[idx_st][action] = reward_pro[action] if restore_possible(state, action) else reward_con_out
    return reward_mat


reward_mat = get_rewards(warehouse_size, len(requirements), len(storage_state), reward_pro, reward_con_in, reward_con_out)
print(reward_mat)

# add penalty for refusing to store
reward_mat = reward_mat + penalties_mat

[[ -1.  -2.  -2.  -3.]
 [ -1.  -2.  -2.  -3.]
 [ -1.  -2.  -2.  -3.]
 ...
 [-50. -50. -50. -50.]
 [-50. -50. -50. -50.]
 [ -1.  -2.  -2.  -3.]]


**Solving the MDP**

using the py-mdptoolbox solve the MDP with value-iteration

In [51]:
# add penalty for refusing to store
reward_mat = reward_mat + penalties_mat

# Define the MDP with toolbox
mdpresultValue = mdptoolbox.mdp.ValueIteration(trans_prob,reward_mat,discount=discount)

# Run the MDP
mdpresultValue.run()


**Define Environment**

In [52]:
class Environment:
    def __init__(self, warehouse_size):
        self.warehouse_size = warehouse_size
        self.fields = []
        for i in range(warehouse_size):
            self.fields.append(0)

**Define greedy robot**

In [53]:
class Greedy:
    def __init__(self, warehouse_size, reward_mat):
        self.env = Environment(warehouse_size)
        self.reward = 0
        self.refused = 0
        self.colours = {'blue': 1, 'white': 2, 'red': 3}
        self.rewards = reward_pro[:warehouse_size]

    def reformat_env_printing(self, fields):
        printout = []
        for f in fields:
          if f == 0:
            printout.append(0)
          elif f == 1:
            printout.append('b')
          elif f == 2:
            printout.append('w')
          elif f == 3:
            printout.append('r')
        return printout

    def next_action(self, action, colour):
        if action is 'store':
            position = np.argmin(self.env.fields)

            # is action valid?
            if self.env.fields[position] != 0:
                self.reward += reward_con_in
                self.refused += 1
                if prints: print('Greedy: Could not {} in {}'.format(action + ' ' + colour, self.reformat_env_printing(self.env.fields)))
                # keep state as no action was done
            else:
                self.reward += self.rewards[position]
                self.env.fields[position] = self.colours[colour]
                if prints: print('Greedy: {} in {}'.format(action + ' ' + colour, self.reformat_env_printing(self.env.fields)))

        if action is 'restore':
            if self.colours[colour] not in self.env.fields:
                self.reward += reward_con_out
                self.refused += 1
                # keep state as no action was done
                if prints: print('Greedy: Could not {} in {}'.format(action + ' ' + colour, self.reformat_env_printing(self.env.fields)))
                return

            position = self.env.fields.index(self.colours[colour])
            # is action valid?
            if self.env.fields[position] != self.colours[colour]:
                self.reward += reward_con_out
                self.refused += 1
                if prints: print('Greedy: Could not {} in {}'.format(action + ' ' + colour, self.reformat_env_printing(self.env.fields)))
                # keep state as no action was done
            else:
                self.reward += self.rewards[position]
                self.env.fields[position] = 0
                if prints: print('Greedy: {} in {}'.format(action + ' ' + colour, self.reformat_env_printing(self.env.fields)))

**Define smart robot**

In [54]:
class Smart:
    def __init__(self, warehouse_size, policy, states, reward_mat):
        self.policy = policy
        self.states = states
        self.env = Environment(warehouse_size)
        self.reward = 0
        self.refused = 0
        self.reward_mat = reward_mat

    def next_action(self, action, colour):
        act = action[0] + colour[0]
        current_state = tuple([str(field) for field in self.env.fields]) + (act,)
        idx_st = self.states.index(current_state)
        pol = self.policy[idx_st]
        # store
        if act[0] == 's':
            if self.env.fields[pol] != 0:
                if prints: print('Smart: Could not {} in {}'.format(action + ' ' + colour, self.env.fields))
                self.refused += 1
                self.reward += self.reward_mat[idx_st, pol]
                return

            self.env.fields[pol] = act[1]
            self.reward += self.reward_mat[idx_st, pol]
            if prints: print('Smart: {} in {}'.format(action + ' ' + colour, self.env.fields))
        # restore
        if act[0] == 'r':
            if self.env.fields[pol] != act[1]:
                if prints: print('Smart: Could not {} in {}'.format(action + ' ' + colour, self.env.fields))
                self.refused += 1
                self.reward += self.reward_mat[idx_st, pol]
                return

            self.env.fields[pol] = 0
            self.reward += self.reward_mat[idx_st, pol]
            if prints: print('Smart: {} in {}'.format(action + ' ' + colour, self.env.fields))



**Define Simulation**

In [55]:
class Simulation:
    def __init__(self, warehouse_size, policy, states, reward_mat, testfile):
        self.greedy = Greedy(warehouse_size, reward_mat)
        self.smart = Smart(warehouse_size, policy, states, reward_mat)
        a = {'store\tred\n': ('store', 'red'),
             'store\twhite\n': ('store', 'white'),
             'store\tblue\n': ('store', 'blue'),
             'restore\tred\n': ('restore', 'red'),
             'restore\twhite\n': ('restore', 'white'),
             'restore\tblue\n': ('restore', 'blue')}
        self.order = []
        with open(testfile, 'r') as f:
            for line in f:
                if line in a:
                    self.order.append(a[line])
        self.index = 0

    def run(self):
        moves = len(self.order)

        for i in range(0, moves, 1):
            self.update()
            if prints: print("Update")

    def update(self):
        action, colour = self.order[self.index]
        self.index += 1
        self.greedy.next_action(action, colour)
        self.smart.next_action(action, colour)

    def print_reward(self):
        print('Greedy Robot: ' + str(self.greedy.reward))
        print('Greedy Refused: ' + str(self.greedy.refused))
        print('Smart Robot: ' + str(self.smart.reward))
        print('Smart Refused: ' + str(self.smart.refused))

        performance_diff = self.greedy.reward - self.smart.reward
        performance_diff_per_step = performance_diff / self.index
        print('Full Performance difference: ' + str(performance_diff))
        print('Performance difference per step: ' + str(round(performance_diff_per_step, 5)))


**Run simulation and evaluate Result**

In [56]:
sim = Simulation(warehouse_size, mdpresultValue.policy, states, reward_mat, 'Exercise 4 - Reinforcement Learning Data - warehousetraining.txt')
sim.run()

print("Evaluation - WarehouseTraining")
sim.print_reward()

Evaluation - WarehouseTraining
Greedy Robot: -128186
Greedy Refused: 2200
Smart Robot: -128228.0
Smart Refused: 2200
Full Performance difference: 42.0
Performance difference per step: 0.00347


In [57]:
sim = Simulation(warehouse_size, mdpresultValue.policy, states, reward_mat, 'Exercise 4 - Reinforcement Learning Data - warehouseorder.txt')
sim.run()


print("Evaluation - WarehouseOrder")
sim.print_reward()

Evaluation - WarehouseOrder
Greedy Robot: -684
Greedy Refused: 12
Smart Robot: -686.0
Smart Refused: 12
Full Performance difference: 2.0
Performance difference per step: 0.03333


In [58]:
with open('Value-Iteration-Policy', 'w') as f:
    write = csv.writer(f)
    write.writerow(mdpresultValue.policy)