In [1]:
import numpy as np

class WarehouseWorld:
    """Small warehouse environment in which items are stored and restored.

    The warehouse is a ``size_0 x size_1`` integer grid (``self.map``) whose
    cells hold 0 (empty) or an item code 1..3 (white, red, blue).  At every
    step there is a current task, described by ``goal_move`` (0 = store,
    1 = restore) and ``goal_item`` (1..3).  An action is the flat index of the
    cell on which the task should be carried out.

    The complete state (grid content, goal item, goal move) can be encoded as
    a single integer with ``state_to_int`` and decoded with ``int_to_state``,
    which allows the transition and reward matrices of the process to be
    enumerated by ``build_transtion_and_reward_matrix``.
    """

    def __init__(self, size_0=2, size_1=2, prob_1=1, prob_2=1, prob_3=1):
        """
            size_0: first dimension of the warehouse
            size_1: second dimension of the warehouse
            prob_1: frequency of item 1
            prob_2: frequency of item 2
            prob_3: frequency of item 3
        """
        # The dimensions were previously assigned crosswise, which made
        # int_to_state() rebuild the map with a transposed shape compared to
        # the one created here.
        self.size_0 = size_0
        self.size_1 = size_1
        self.prob_1 = prob_1
        self.prob_2 = prob_2
        self.prob_3 = prob_3

        # Warehouse state
        #     0 = no item
        #     1 = white item
        #     2 = red item
        #     3 = blue item
        #     example: warehouse is full with white items
        #      [1 1]
        #      [1 1]
        self.map = np.zeros((self.size_0, self.size_1), dtype=int)

        # Goal state
        #     goal_move: 0 = store, 1 = restore
        #     goal_item: 1..3 (white, red, blue)
        #     goal_distribution: probabilities of the items [white, red, blue]
        #     The item of a restore task is drawn depending on the warehouse
        #     content and the goal distribution.
        self.goal_move = None
        self.goal_item = None
        self.goal_distribution = np.array([self.prob_1, self.prob_2, self.prob_3]) / (
                self.prob_1 + self.prob_2 + self.prob_3)
        self.create_new_task()

        # Reward
        #     Punishment of -1 for each field away from the entry at the top
        #     left (perfect spot); the entry itself has no punishment.
        #     No diagonal moves are possible.
        #     A move that does not solve the task is punished additionally
        #     with -(size_0 + size_1).
        #     => to optimize the reward the robot has to find a solution with
        #        a short distance but also fulfill the task
        self.reward = 0

        # helpers to decode/encode the state to an int number
        self.warehouse_decode = None
        self.goal_item_decode = None
        self.goal_move_decode = None
        self.generate_helpers()

    def _restore_distribution(self):
        """Return the probabilities [white, red, blue] of the next restore item.

        Only item types that are currently stored can be requested; their
        goal probabilities are renormalised so that they sum to one.
        """
        count = np.bincount(self.map.flatten(), minlength=4)
        present = count[1:] > 0
        weights = np.multiply(present, self.goal_distribution)
        total = np.sum(weights)
        if total == 0:
            # Every stored item type has goal probability zero (only reachable
            # when states are enumerated via int_to_state): fall back to a
            # uniform choice among the stored types instead of producing NaN.
            weights = present.astype(float)
            total = np.sum(weights)
        return weights / total

    @staticmethod
    def _sample_item(distribution):
        """Draw an item code (1..3) according to ``distribution``.

        Items with non-positive probability are skipped, so floating point
        rounding of the cumulative sum can never select an impossible item.
        """
        draw = np.random.random()
        chosen = len(distribution)
        cumulative = 0.0
        for item, prob in enumerate(distribution, start=1):
            if prob <= 0:
                continue
            chosen = item
            cumulative += prob
            if draw < cumulative:
                break
        return chosen

    def create_new_task(self):
        """Randomly draw the next task (``goal_move`` and ``goal_item``)."""
        if np.all(self.map):
            # warehouse is full -> only restoring is possible
            self.goal_move = 1
        elif not np.any(self.map):
            # warehouse is empty -> only storing is possible
            self.goal_move = 0
        else:
            # warehouse is not empty nor full -> both equally likely
            self.goal_move = np.random.binomial(1, 0.5)

        if self.goal_move == 0:
            distribution = self.goal_distribution
        else:
            distribution = self._restore_distribution()
        self.goal_item = self._sample_item(distribution)

    def move(self, action):
        """Try to carry out the current task on one cell.

            action: flat index of the place to store/restore the item

        Returns ``(reward, completed)``.  When the task is completed a new
        task is drawn.  The reward is minus the row index minus the column
        index of the chosen cell, with an extra ``-(size_0 + size_1)`` when
        the task was not completed.
        """
        index = np.unravel_index(action, self.map.shape)
        completed = False

        if self.goal_move == 0 and self.map[index] == 0:
            # store: the chosen cell must be empty
            self.map[index] = self.goal_item
            completed = True
        elif self.goal_move == 1 and self.map[index] == self.goal_item:
            # restore: the chosen cell must hold the requested item
            self.map[index] = 0
            completed = True

        reward = - index[0] - index[1]
        if completed:
            self.create_new_task()
        else:
            reward = reward - self.size_0 - self.size_1

        return reward, completed

    def __str__(self):
        """Return the accumulated reward, the current goal and the map as text."""
        goal = 'Restore' if self.goal_move == 1 else 'Store'

        if self.goal_item == 1:
            goal_item = 'White(1)'
        elif self.goal_item == 2:
            goal_item = 'Red(2)'
        else:
            goal_item = 'Blue(3)'

        # __str__ has to return a string; printing here and returning None
        # made str(env) / print(env) raise a TypeError.
        return "Reward: {}\nGoal: {} {}\n{}".format(self.reward, goal, goal_item, self.map)

    def generate_helpers(self):
        """Build the lookup tables used to encode/decode states as integers."""
        # every combination of cell contents, one row per warehouse state
        cells = self.size_0 * self.size_1
        items = [[0, 1, 2, 3] for _ in range(cells)]
        self.warehouse_decode = np.array(np.meshgrid(*items)).T.reshape(-1, cells)

        # different goal states
        self.goal_item_decode = np.array([1, 2, 3])
        self.goal_move_decode = np.array([0, 1])

    def state_to_int(self):
        """Encode the current map, goal item and goal move as one integer."""
        warehouse = self.map.astype(int).flatten()
        warehouse = np.argwhere((self.warehouse_decode == warehouse).all(axis=1))
        goal_item = np.argwhere(self.goal_item_decode == self.goal_item)
        goal_move = np.argwhere(self.goal_move_decode == self.goal_move)

        # mixed radix number: warehouse index, then goal item, then goal move
        result = warehouse[0, 0]
        factor = self.warehouse_decode.shape[0]

        result += goal_item[0, 0] * factor
        factor *= self.goal_item_decode.shape[0]

        result += goal_move[0, 0] * factor
        return result

    def int_to_state(self, state_number):
        """Set map, goal item and goal move from an integer of ``state_to_int``."""
        n_warehouses = self.warehouse_decode.shape[0]
        n_items = self.goal_item_decode.shape[0]

        # pure integer arithmetic; the former in-place float division turned
        # the intermediate indices into floats
        goal_move, remainder = divmod(int(state_number), n_warehouses * n_items)
        goal_item, warehouse = divmod(remainder, n_warehouses)

        self.map = np.copy(self.warehouse_decode[warehouse].reshape((self.size_0, self.size_1)))
        self.goal_item = int(self.goal_item_decode[goal_item])
        self.goal_move = int(self.goal_move_decode[goal_move])

    def give_next_state_probabilities(self, action):
        """Apply ``action`` and enumerate the possible successor states.

            action: flat index of the place to store/restore the item

        Returns ``(next_states, reward)`` where ``next_states`` is a list of
        ``(probability, state_number)`` tuples.  The environment is modified:
        afterwards it holds the map after the move and the last enumerated
        goal.
        """
        reward, completed = self.move(action)

        if not completed:
            # nothing changed, the task stays the same
            return [(1, self.state_to_int())], reward

        if not np.any(self.map):
            # warehouse is empty -> the next task is a store
            branches = [(0, self.goal_distribution)]
        elif np.all(self.map):
            # warehouse is full -> the next task is a restore
            branches = [(1, self._restore_distribution())]
        else:
            # both possible, each with probability 1/2
            branches = [(0, self.goal_distribution / 2),
                        (1, self._restore_distribution() / 2)]

        next_states = []
        for goal_move, distribution in branches:
            self.goal_move = goal_move
            for item in (1, 2, 3):
                self.goal_item = item
                next_states.append((distribution[item - 1], self.state_to_int()))

        return next_states, reward

    def build_transtion_and_reward_matrix(self):
        """Enumerate all states and actions of the environment.

        Returns ``(transitions, rewards)`` as float16 arrays of shape
        ``(actions, states, states)`` and ``(states, actions)``.
        """
        state_space = self.warehouse_decode.shape[0] * self.goal_item_decode.shape[0] * self.goal_move_decode.shape[0]
        actionspace = self.size_0 * self.size_1

        # allocate directly as float16 instead of creating a float64 array
        # of the same (potentially huge) shape first and converting it
        transitions = np.zeros((actionspace, state_space, state_space), dtype=np.float16)
        rewards = np.zeros((state_space, actionspace), dtype=np.float16)
        for a in range(actionspace):
            for s in range(state_space):
                self.int_to_state(s)
                next_states, reward = self.give_next_state_probabilities(a)
                rewards[s, a] = reward
                for probability, next_s in next_states:
                    transitions[a, s, next_s] = probability

        return transitions, rewards

    def test_policies(self, pol_1, pol_2, numb_moves):
        """Run two policies for ``numb_moves`` moves each and compare rewards.

        Both runs start from state 0.  The tasks drawn while running
        ``pol_1`` are recorded and replayed for ``pol_2`` so that both
        policies face the same task sequence.

        Returns ``(reward_1, reward_2)``, the accumulated rewards.
        """
        self.int_to_state(0)
        self.reward = 0
        goal_items = []
        goal_moves = []

        # pol_1: record every newly drawn task
        for _ in range(numb_moves):
            action = pol_1[self.state_to_int()]
            reward, completed = self.move(action)

            if completed:
                goal_items.append(self.goal_item)
                goal_moves.append(self.goal_move)
            self.reward += reward
        reward_1 = self.reward

        self.int_to_state(0)
        self.reward = 0

        # pol_2: replay the recorded tasks
        replayed = 0
        for _ in range(numb_moves):
            action = pol_2[self.state_to_int()]
            reward, completed = self.move(action)

            # If pol_2 completes more tasks than were recorded, keep the task
            # that move() has just drawn instead of failing on an empty list.
            if completed and replayed < len(goal_items):
                self.goal_item = goal_items[replayed]
                self.goal_move = goal_moves[replayed]
                replayed += 1
            self.reward += reward
        reward_2 = self.reward

        return reward_1, reward_2

In [None]:
# Create a 1 x 5 warehouse in which red items (2) are by far the most frequent,
# then enumerate its transition and reward matrices.
env = WarehouseWorld(
    size_0=1,
    size_1=5,
    prob_1=5,
    prob_2=90,
    prob_3=5,
)
transitions, rewards = env.build_transtion_and_reward_matrix()

# one shape per line: transitions first, rewards second
print(transitions.shape, rewards.shape, sep="\n")

In [13]:
import mdptoolbox as mdp

# Calculate a better policy with modified policy iteration (discount 0.98)
model = mdp.mdp.PolicyIterationModified(transitions, rewards, discount=0.98)
model.setVerbose()  # print the value variation of every iteration
model.run()

  	Iteration		V-variation
    1		  6.059814453125
    2		  9.435956682858148
    3		  3.630806276081387
    4		  2.6502466817330514
    5		  2.1160581534854828
    6		  1.6908835960944941
    7		  1.3517934904775188
    8		  1.0808908765747276
    9		  0.86402970626051
    10		  0.690864656256494
    11		  0.5522449926616417
    12		  0.44156449498619565
    13		  0.35296449336534863
    14		  0.2822229766049702
    15		  0.22559428273805082
    16		  0.18038006873977253
    17		  0.1441860370455359
    18		  0.11528764933604663
    19		  0.0921544651615065
    20		  0.07368431973119982
    21		  0.05889895839266046
    22		  0.047093982974693915
    23		  0.03764409763189036
    24		  0.030099120624896614
    25		  0.0240593673748819
    26		  0.019237127536811727
    27		  0.01537692666273216
    28		  0.012294890459209284
    29		  0.009827722843326114
    30		  0.007857911202805212
    31		  0.006281079146170043
    32		  0.0050221256493045985
    33		  0.004014335173998518
    34	

In [14]:
# Check that the computed policy differs from the greedy one, i.e. from the
# policy that picks the action with the highest immediate reward in each state.
policy = np.array(model.policy)
greedy_policy = rewards.argmax(axis=1)

print("Policy", policy.shape)
print("Greedy_Policy", greedy_policy.shape)
print("Same Policy:", np.array_equal(policy, greedy_policy))

Policy (6144,)
Greedy_Policy (6144,)
Same Policy: False


In [15]:
# Evaluate the new policy against the greedy policy.
# Rewards are negative distances, so they are negated before averaging.
numb_tasks = 10_000
reward_greedy, reward_policy = env.test_policies(greedy_policy, policy, numb_tasks)

print("Average additional distance per task")
print("policy:", -reward_policy / numb_tasks)
print("greedy:", -reward_greedy / numb_tasks)

Average additional distance per task
policy: 0.8945
greedy: 0.9575
