In [None]:
from typing import List, Dict

import numpy as np
from mdptoolbox import mdp
from itertools import product, repeat
from functools import reduce
from operator import iconcat
from scipy.sparse import csr_matrix

from copy import deepcopy

In [None]:
# data
# Possible contents of a single storage field.
field_content = ['empty', 'white', 'blue', 'red']
# Kinds of request and the item colours a request can refer to.
task = ['store', 'restore']
item = ['white', 'blue', 'red']
# Every [task, item] request: all 'store' requests first, then all 'restore' requests.
task_and_item = [[requested_task, requested_item]
                 for requested_task in task
                 for requested_item in item]
# An action is the index of the storage field the move is applied to.
actions = list(range(4))
# Reward paid for a move on each field index (looked up by action).
rewards_dict = dict(zip(actions, (4, 2, 2, 0)))


In [None]:
def probabilities_from_data() -> Dict:
    """
    Estimates the relative frequency of each item colour from 'probdata.txt'.

    The second whitespace-separated token of every non-blank line is taken as the
    colour of that line (presumably the first token is the task; verify against the
    data file).

    :return: dict mapping 'white', 'blue' and 'red' to their share of all data lines
    :raises ValueError: if the file contains no data lines
    :raises IndexError: if a non-blank line has fewer than two tokens
    """
    colors = []
    # The context manager closes the file even when a malformed line raises.
    with open('probdata.txt', 'r') as f:
        for line in f:
            tokens = line.split()
            if not tokens:
                continue  # tolerate blank lines, e.g. a trailing empty line
            colors.append(tokens[1])

    total_length = len(colors)
    if total_length == 0:
        raise ValueError('probdata.txt contains no data lines')

    # Count occurrences explicitly: on a plain list `colors == 'white'` is just False,
    # so `colors[colors == 'white']` would pick colors[0] instead of filtering.
    return {color: colors.count(color) / total_length
            for color in ('white', 'blue', 'red')}

# Colour frequencies estimated from 'probdata.txt'; transition_prob reads this
# module-level dict to weight the item of the next request.
probs = probabilities_from_data()
print(probs)

In [None]:
def all_repeat() -> List:
    """
    Enumerates the complete state space.

    A state is a flat list of six entries: the content of each of the four storage
    fields (every combination of field_content, with repetition) followed by the
    task and the item of the current request. States are grouped by request in the
    order of task_and_item; within a group the field layouts follow the order
    produced by itertools.product.

    :return: list of all states, each one a fresh list
    """
    layouts = [list(layout) for layout in product(field_content, repeat=4)]

    all_states = []
    # Iterate over task_and_item itself rather than a hard-coded count of six, so the
    # state space stays complete if the list of requests ever changes.
    for request in task_and_item:
        all_states.extend(layout + request for layout in layouts)

    return all_states

In [None]:
# Materialise the state space once; a state's position in `states` is the row/column
# index used for it when the transition matrices are built.
states = all_repeat()
num_states = len(states)

# Sanity check: no state may occur twice (lists are unhashable, hence the tuples).
np.testing.assert_equal(len({tuple(state) for state in states}), num_states)

In [None]:
def field_content_equals(from_state: list, to_state: list) -> bool:
    """
    Tells whether two states hold the same items in their storage fields.

    Only the first four entries of each state are compared; anything after them
    is ignored.

    :param from_state: first state
    :param to_state: second state
    :return: True if the first four entries of both states are equal
    """
    fields_before, fields_after = from_state[:4], to_state[:4]
    return fields_before == fields_after

In [None]:
def transition_prob(action: int, from_state: list, to_state: list) -> float:
    """
    Probability of moving from from_state to to_state when `action` is taken.

    A state is [field0, field1, field2, field3, task, item]; the action is the index
    of the field the move is applied to.

    * Infeasible move (storing into a non-empty field, or restoring from a field that
      does not hold the requested item): the state loops back onto itself with
      probability 1.
    * Feasible move: the targeted field is updated and the following request gets
      probability probs[next item] / 2, i.e. the item weight split evenly over the
      two tasks. The task of to_state itself is not inspected.

    :param action: index of the targeted field
    :param from_state: state before the move (never modified)
    :param to_state: candidate successor state
    :return: transition probability; 0 for an unknown task
    """
    cur_task = from_state[4]
    cur_item = from_state[-1]
    next_item = to_state[-1]

    if cur_task == 'store':
        feasible = from_state[action] == 'empty'
        new_content = cur_item
    elif cur_task == 'restore':
        feasible = from_state[action] == cur_item
        new_content = 'empty'
    else:
        return 0

    if not feasible:
        return 1 if from_state == to_state else 0

    # The state is a flat list of strings, so a shallow copy is enough; this function
    # is called once per state pair, which makes a deepcopy here needlessly costly.
    after_move = list(from_state)
    after_move[action] = new_content
    return probs[next_item] / 2 if field_content_equals(after_move, to_state) else 0

In [None]:
# Restoring 'red' from a warehouse that holds no red item cannot be carried out, so the
# state has to loop back onto itself with probability 1.
test_prob = transition_prob(0, ['empty', 'empty', 'empty', 'empty', 'restore', 'red'],
                            ['empty', 'empty', 'empty', 'empty', 'restore', 'red'])

np.testing.assert_equal(test_prob, 1)

In [None]:
def reward(action: int, last_prob: float) -> float:
    """
    Reward for taking `action`, given a transition probability observed for it.

    A probability of exactly 1 is what transition_prob returns for the self-loop of a
    move that cannot be carried out; such a move earns nothing. Any other value is
    paid according to rewards_dict.

    :param action: index of the targeted field
    :param last_prob: a positive transition probability of the (state, action) pair
    :return: 0 if last_prob equals 1, otherwise rewards_dict[action]
    """
    move_failed = last_prob == 1
    return 0 if move_failed else rewards_dict[action]

In [None]:
def transition_and_reward_matrix():
    """
    Builds the MDP inputs: one sparse transition matrix per action and the reward table.

    transition_prob can only be positive for a successor that agrees with the source
    state on every field except the one the action targets (either the state itself,
    or the state with that single field updated). Successor candidates are therefore
    looked up by their untouched fields instead of testing every pair of states; the
    probabilities themselves still come from transition_prob.

    :return: tuple (transitions, rewards): transitions is a list with one
             (num_states x num_states) csr_matrix per action, rewards is an array of
             shape (num_states, number of actions)
    :raises ValueError: if a state has no successor with positive probability for
                        some action (its matrix row would be all zeros)
    """
    def untouched_fields(state, action):
        # Contents of the four fields, leaving out the one the action targets.
        return tuple(state[:action]) + tuple(state[action + 1:4])

    transitions = []
    rewards = []

    for action in actions:
        # Indices are appended in ascending order, so every group stays sorted and the
        # matrix entries are produced in the same order as an exhaustive scan would.
        candidates = {}
        for idx, state in enumerate(states):
            candidates.setdefault(untouched_fields(state, action), []).append(idx)

        row = []
        col = []
        data = []

        reward_vector = []

        for id_from, from_state in enumerate(states):
            # Track the probability per row; reading data[-1] would silently pick up
            # the previous state's value if this row had no positive entry.
            last_prob = None

            for id_to in candidates[untouched_fields(from_state, action)]:
                p = transition_prob(action, from_state, states[id_to])

                if p > 0:
                    row.append(id_from)
                    col.append(id_to)
                    data.append(p)
                    last_prob = p

            if last_prob is None:
                raise ValueError(
                    'state %r has no successor with positive probability for action %r'
                    % (from_state, action))

            reward_vector.append(reward(action, last_prob))

        transitions.append(csr_matrix((data, (row, col)), shape=(num_states, num_states)))
        rewards.append(reward_vector)

    return transitions, np.array(rewards).T

In [109]:
# P: list with one sparse transition matrix per action; R: reward table with one row per
# state and one column per action.
P, R = transition_and_reward_matrix()

KeyboardInterrupt: 

In [None]:
# Display the list of per-action transition matrices.
P

In [None]:
# Every row of every transition matrix must be a probability distribution, i.e. sum to 1.
# One row of `ones` per matrix in P; the sums are taken on the sparse matrices directly so
# no dense (num_states x num_states) array has to be materialised.
ones = np.zeros((len(P), num_states))
for i, m in enumerate(P):
    ones[i] = np.asarray(m.sum(axis=1)).ravel()

np.testing.assert_array_almost_equal(ones, np.ones_like(ones))

In [None]:
# The reward table must have one row per state and one column for each of the 4 actions.
np.testing.assert_array_equal(R.shape, (num_states, 4))

In [None]:
# Solve the MDP with policy iteration. 0.9 is passed as the third positional argument,
# presumably the discount factor (confirm against the installed pymdptoolbox version).
pi = mdp.PolicyIteration(P, R, 0.9)
pi.run()

In [None]:
# The solved policy is indexed by state index and yields the action to take there
# (see next_comb_mdp); the printed length is expected to match num_states.
result_policy = pi.policy
print(len(result_policy))

In [None]:
def next_comb_mdp(cur_comb: List, state_index: int, policy: tuple) -> List:
    """
    Applies the policy's action for the current state and returns the new field contents.

    A move that cannot be carried out (storing into an occupied field, or restoring
    from a field that does not hold the requested item) leaves the fields unchanged.
    This mirrors transition_prob, where such a move keeps the state as it is, and
    prevents a stored item from being overwritten or dropped.

    :param cur_comb: current state [field0, field1, field2, field3, task, item]
    :param state_index: index of cur_comb in the state list the policy refers to
    :param policy: action (field index) per state index
    :return: new list with the four field contents after the move; cur_comb is not modified
    """
    action = policy[state_index]
    result = cur_comb[:4]
    cur_task = cur_comb[4]
    cur_item = cur_comb[-1]

    if cur_task == 'store' and result[action] == 'empty':
        result[action] = cur_item
    elif cur_task == 'restore' and result[action] == cur_item:
        result[action] = 'empty'

    return result

In [None]:
def reward_mdp(warehouse_input: List[tuple], policy: tuple) -> float:
    """
    Replays a sequence of requests with the given policy and sums the collected reward.

    The warehouse starts with four empty fields. For each request the current state is
    looked up in `states`, the policy's action for it is taken and rewards_dict[action]
    is added. A move that cannot be carried out (storing into an occupied field, or
    restoring from a field that does not hold the requested item) earns nothing and
    leaves the warehouse unchanged, matching transition_prob and reward, which model
    such a move as a zero-reward self-loop.

    :param warehouse_input: sequence of (task, item) pairs
    :param policy: action (field index) per state index
    :return: total reward over all requests
    :raises ValueError: if a request leads to a state that is not part of `states`
    """
    cur_fields = ['empty', 'empty', 'empty', 'empty']
    total_reward = 0

    for cur_task, cur_item in warehouse_input:
        cur_comb = cur_fields + [cur_task, cur_item]

        state_index = states.index(cur_comb)
        action = policy[state_index]

        target = cur_fields[action]
        feasible = ((cur_task == 'store' and target == 'empty')
                    or (cur_task == 'restore' and target == cur_item))
        if not feasible:
            # Nothing happens: no reward, fields stay as they are.
            continue

        cur_fields = next_comb_mdp(cur_comb, state_index, policy)
        total_reward += rewards_dict[action]

    return total_reward

In [None]:
def next_comb_greedy(cur_comb: List) -> tuple:
    """
    Greedy baseline: serves the request with the first field (lowest index) that allows it.

    * 'store': the item goes into the first empty field.
    * 'restore': the item is taken from the first field holding it.

    If no field qualifies, or the task is unknown, the fields are returned unchanged
    together with action 0 as a placeholder.

    :param cur_comb: current state [field0, field1, field2, field3, task, item]
    :return: tuple (new list of the four field contents, index of the field used);
             cur_comb is not modified
    """
    result = cur_comb[:4]
    cur_task = cur_comb[4]
    cur_item = cur_comb[-1]

    if cur_task == 'store':
        wanted, replacement = 'empty', cur_item
    elif cur_task == 'restore':
        wanted, replacement = cur_item, 'empty'
    else:
        return result, 0

    if wanted in result:
        action = result.index(wanted)  # first matching field
        result[action] = replacement
        return result, action

    return result, 0

In [None]:
def reward_greedy(warehouse_input: List[tuple]) -> float:
    """
    Replays a sequence of requests with the greedy baseline and sums the collected reward.

    The warehouse starts with four empty fields. A request that cannot be served earns
    nothing: next_comb_greedy reports it by returning the fields unchanged with
    placeholder action 0, which must not be paid as if field 0 had been used.

    :param warehouse_input: sequence of (task, item) pairs
    :return: total reward over all requests
    """
    cur_fields = ['empty', 'empty', 'empty', 'empty']
    total_reward = 0

    for cur_task, cur_item in warehouse_input:
        next_fields, action = next_comb_greedy(cur_fields + [cur_task, cur_item])

        # A served request always changes one field (an item replaces 'empty' or the
        # other way round), so unchanged fields mean nothing could be done.
        if next_fields != cur_fields:
            total_reward += rewards_dict[action]

        cur_fields = next_fields

    return total_reward

In [None]:
def data_to_warehouse_input() -> List[tuple]:
    """
    Reads the request sequence from 'trainingdata.txt'.

    Every non-blank line is split on whitespace and turned into a tuple of its tokens.
    Blank lines are skipped, since an empty tuple cannot be unpacked into
    (task, item) by the functions consuming this list.

    :return: list with one tuple of tokens per non-blank line, in file order
    """
    result = []
    # The context manager closes the file once reading is done.
    with open('trainingdata.txt', 'r') as f:
        for line in f:
            tokens = line.split()
            if tokens:
                result.append(tuple(tokens))

    return result

# Request sequence read from 'trainingdata.txt', used to evaluate both strategies.
# Note that this prints the complete sequence.
w_input = data_to_warehouse_input()
print(w_input)

In [None]:
# Total reward on the same request sequence: solved MDP policy first, greedy baseline second.
print(reward_mdp(w_input, result_policy))
print(reward_greedy(w_input))