In [1]:
import itertools
import numpy as np


def generate_states():
    storage_state = ["x","r","b","w"] #0 = empty, r = red, b = blue, w = white
    actions = ["sb", "sr", "sw", "rb", "rr", "rw"] #sb/sr/sw = store_blue/red/white || rb/rr/rw = restore blue/red/white
    #storage_states = [p for p in itertools.product(storage_state, repeat=4)]
    storage_states = []
    for action in actions:
        for x1 in storage_state:
            for x2 in storage_state:
                for x3 in storage_state:
                    for x4 in storage_state:
                        st = (x1, x2, x3, x4, action)
                        storage_states.append(st)

    print(len(storage_states))
    print(storage_states)
    return storage_states

In [2]:
#Matrix = [[0 for x in range(len(storage_states))] for y in range(len(storage_states))]
#print(len(Matrix[0]))
#print(len(Matrix[1535]))

In [3]:
import csv
import pandas as pd
def load_training_df():
    data = pd.read_csv("SAKI_Exercise_3_warehousetraining2x2.txt", delimiter='\t', names=["action", "color"])
    data.head()
    df = data.copy()
    return df

In [4]:
def calc_container_distribution(dataframe):
    count_of_trainingset = len(dataframe)
    distribution_df = dataframe.copy()
    print(count_of_trainingset)
    distribution_df = distribution_df.groupby(['action', 'color']).size().reset_index(name='count')
    distribution_df['count'] = distribution_df['count'].div(count_of_trainingset)
    return distribution_df

In [6]:
def get_color(state):
    return state[4][1].lower()


def has_warehouse_free_slots(state):
    for s in state:
        if s == "x":
            return True
    return False

def does_warehouse_store_color(state, color_to_restore):
    for s in state:
        if s == color_to_restore:
            return True
    return False


def is_warehouse_empty(state):
    for s in state:
        if s != 'x':
            return False
    return True

def distribute_transition_probability_matrix(tpm):
    for index, row_vector in enumerate(tpm):
        sum = np.sum(row_vector)
        if sum == 0:
            # no transition was possible for this state (e. g. restore from empty warehouse)
            # it should stay in the state then since the sum of each row has to equal one
            tpm[index, index] = 1
            continue
    # give every possible transition the same possibility by dividing every element by the sum
    # of the row. E. g. [0, 0, 1, 1, 0, 0] will be converted to [0, 0, 0.5, 0.5, 0, 0].
    # This will also result in the sum of the row equal to one.
    tpm = tpm / tpm.sum(axis=1)[:, None]
    return tpm

def is_store_action_possible(state_1, state_2):
    if not has_warehouse_free_slots(state_1):
        return False

    color_to_store = get_color(state_1)

    changed_slots = 0

    for i in range(len(state_1)):
        if i == 4:
            break

        if state_1[i] != "x" and state_1[i] != state_2[i]:
            # a slot was overwritten, that is not possible
            return False

        if state_1[i] == "x" and state_2[i] != color_to_store and state_2[i] != "x":
            # an empty slot was overwritten, but with something else we expected, not possible
            return False

        if state_1[i] == "x" and state_2[i] == color_to_store:
            changed_slots += 1

    if changed_slots != 1:
        # more or less than one empty slot were stored, not possible
        return False

    return True


def is_restore_action_possible(state_1, state_2):

    color_to_restore = get_color(state_1)
    if does_warehouse_store_color(state_1, color_to_restore) == False or is_warehouse_empty(state_1):
        # Warehouse does not store color, so we obviously can't restore it
        return False

    changed_slots = 0
    for i in range(len(state_1)):
        if i == 4:
            break

        if state_1[i] != color_to_restore and state_1[i] != state_2[i]:
            # something changed which had nothing to do with the restore color command, not possible
            return False

        if state_1[i] == color_to_restore and state_2[i] != "x" and state_2[i] != color_to_restore:
            # a color was replaced instead of just getting it, not possible
            return False

        if state_1[i] == color_to_restore and state_2[i] == 'x':
            changed_slots += 1

    if changed_slots != 1:
        # more or less than one empty slot were restored, not possible
        return False
    
    return True

def is_transition_possible(curr_state, next_state):
    curr_action = curr_state[4]
    if curr_action == "sb" or curr_action == "sr" or curr_action == "sw":
        return is_store_action_possible(curr_state, next_state)
    else:
        return is_restore_action_possible(curr_state, next_state)
    
def get_transition_probability_matrix(states):
    tp_matrix = np.zeros((len(states), len(states)), dtype=float, order='C')
    for x, curr_state in enumerate(states, start=0):
        for y, next_state in enumerate(states, start=0):
            if is_transition_possible(curr_state, next_state):
                tp_matrix[x, y] = 1
    return distribute_transition_probability_matrix(tp_matrix)

In [7]:
def get_costs(index):
    costs_map = {
        0: 1,
        1: 2,
        2: 2,
        3: 3
    }
    return costs_map.get(index, 0)


def get_reward(state_1, state_2):
    # The reward here is defined by the distance that the robot has to do
    # We define a standard reward of 10, the distances are the cost.
    # Since we have 4 states, the first index has the cost of 1, the second
    # and third index a cost of 2 and the forth index a cost of 3, since the
    # robot has to travel the longest there. That means, we just have to track
    # which field changes in the states.
    for i in range(len(state_1)):
        if state_1[i] != state_2[i]:
            return 10 - get_costs(i)
    return 0


def get_reward_matrix(states, tpm):
    rm = np.zeros((len(states), len(states)))
    for i, state_1 in enumerate(states, start=0):
        for j, state_2 in enumerate(states, start=0):
            if tpm[i, j] == 0:
                # there is no probability, so there is no reward
                rm[i, j] = 0
            else:
                rm[i, j] = get_reward(state_1, state_2)
    return rm

In [8]:
states = generate_states()
trainingsfile_df = load_training_df()
distribution_df = calc_container_distribution(trainingsfile_df)
tpm = get_transition_probability_matrix(states)
reward_matrix = get_reward_matrix(states, tpm)

1536
[('x', 'x', 'x', 'x', 'sb'), ('x', 'x', 'x', 'r', 'sb'), ('x', 'x', 'x', 'b', 'sb'), ('x', 'x', 'x', 'w', 'sb'), ('x', 'x', 'r', 'x', 'sb'), ('x', 'x', 'r', 'r', 'sb'), ('x', 'x', 'r', 'b', 'sb'), ('x', 'x', 'r', 'w', 'sb'), ('x', 'x', 'b', 'x', 'sb'), ('x', 'x', 'b', 'r', 'sb'), ('x', 'x', 'b', 'b', 'sb'), ('x', 'x', 'b', 'w', 'sb'), ('x', 'x', 'w', 'x', 'sb'), ('x', 'x', 'w', 'r', 'sb'), ('x', 'x', 'w', 'b', 'sb'), ('x', 'x', 'w', 'w', 'sb'), ('x', 'r', 'x', 'x', 'sb'), ('x', 'r', 'x', 'r', 'sb'), ('x', 'r', 'x', 'b', 'sb'), ('x', 'r', 'x', 'w', 'sb'), ('x', 'r', 'r', 'x', 'sb'), ('x', 'r', 'r', 'r', 'sb'), ('x', 'r', 'r', 'b', 'sb'), ('x', 'r', 'r', 'w', 'sb'), ('x', 'r', 'b', 'x', 'sb'), ('x', 'r', 'b', 'r', 'sb'), ('x', 'r', 'b', 'b', 'sb'), ('x', 'r', 'b', 'w', 'sb'), ('x', 'r', 'w', 'x', 'sb'), ('x', 'r', 'w', 'r', 'sb'), ('x', 'r', 'w', 'b', 'sb'), ('x', 'r', 'w', 'w', 'sb'), ('x', 'b', 'x', 'x', 'sb'), ('x', 'b', 'x', 'r', 'sb'), ('x', 'b', 'x', 'b', 'sb'), ('x', 'b', 'x'

In [12]:
print(reward_matrix)

[[0. 0. 7. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]


In [18]:
import mdptoolbox

print(tpm.shape)

mdpResultPolicy = mdptoolbox.mdp.PolicyIteration(tpm, reward_matrix, 0.3, max_iter=len(states) * 5)
mdpResultValue = mdptoolbox.mdp.ValueIteration(tpm, reward_matrix, 0.3, max_iter=len(states) * 5)

# Run the MDP
mdpResultPolicy.run()
mdpResultValue.run()

print('PolicyIteration:')
print(mdpResultPolicy.policy)
print(mdpResultPolicy.V)
print(mdpResultPolicy.iter)

print('ValueIteration:')
print(mdpResultValue.policy)
print(mdpResultValue.V)
print(mdpResultValue.iter)

(1536, 1536)


InvalidError: 'PyMDPToolbox - The transition probability array must have the shape (A, S, S)  with S : number of states greater than 0 and A : number of actions greater than 0. i.e. R.shape = (A, S, S)'