In [1]:
from build.Board import Board, Result
from build.player.Player import Player
from build.player.qlearner.QLearner import QLearner

import os
import dill
import numpy as np

In [35]:
class QPlayer(Player):
    """
    A Player specification used to handle various learning algorithms.

    To specify a learning algorithm a QLearner is defined.
    Different QLearners are used to handle the learning process and to choose the best next move.
    QPlayer and QLearner interact according to the player-role pattern.
    """

    def __init__(self, representation_char, q_learner: QLearner, state_reduction: bool = False):
        """
        The constructor for the Q-Player
        :param representation_char: As inherited from the Player class, the QPlayer is given a single char
        representing the user on the board.
        Longer strings would also get accepted but would yield a worse representation.
        :param q_learner: The QLearner used to handle the learning process as well as the choosing of the best option
        :param state_reduction: indicates whether or not a reduction of possible states should get applied

        Terms:
        state => array representing the Board.field
        action => array representing the chosen field on the board
        result => entity of the Result class used to assess a previously chosen move

        For more specific examples see the description of the given Q-Learner
        """

        super().__init__(representation_char)
        self.q_learner = q_learner
        self.state_reduction = state_reduction
        # Always create the cache (it stays empty and unused without state reduction),
        # so that enabling `state_reduction` after construction or calling
        # `save_fields_to_cache` directly cannot fail with an AttributeError.
        self.state_cache = dict()

        # (state, action, reward) of the previous step; consumed by update_q_lerner().
        self.prev_state = None
        self.state = None
        self.prev_action = None
        self.action = None
        self.prev_reward = 0

    #@debug
    #@log(aiReadable=True)
    def make_move(self, board: Board) -> int:
        """
        Makes a move on the board
        :param board: current state of the board
        :return: the action selected by the Q-Learner for the state derived from the board
        """

        # Remember state and action: give_result() needs both for the next Q-update.
        self.state = self.find_state_to_use(board)
        self.action = self.q_learner.select_move(self.state)

        return self.action

    def find_state_to_use(self, board: Board):
        """
        Determines the state that is handed to the Q-Learner for the given board
        :param board: current state of the board
        :return: a copy of the board field if state reduction is disabled. Otherwise the cached equivalent
        of the field, or the first equivalent field already known to the Q-Learner, or - if neither
        exists - the copy of the board field itself.
        """

        state = board.get_field_copy()
        if not self.state_reduction:
            return state

        cached_equivalent = self.state_cache.get(state.tobytes())
        if cached_equivalent is not None:
            return cached_equivalent

        equivalent_fields = board.get_equivalent_fields()
        for field in equivalent_fields:
            if self.q_learner.is_known_state(field):
                # Map every equivalent field onto the known one so later lookups hit the cache.
                self.save_fields_to_cache(field, equivalent_fields)
                return field

        # None of the equivalent fields is known yet: use the field as it is.
        return state

    def save_fields_to_cache(self, found_field, equivalent_fields):
        """
        Registers found_field as the cached representative of all given equivalent fields
        :param found_field: the field that should be returned for each of the equivalent fields
        :param equivalent_fields: the fields whose byte representation (tobytes()) is used as cache key
        """

        for field in equivalent_fields:
            self.state_cache[field.tobytes()] = found_field

    def give_result(self, result: Result) -> None:
        """
        Set own reward for given result and updates the q table
        :param result: the result of the previously made move
        """
        # Default reward for every result that is not handled explicitly below.
        reward = -0.1
        if Result.INVALID_MOVE == result:
            reward = -1000

        elif Result.GAME_LOST == result:
            reward = -100
            self.stats.incr_lost()

        elif Result.GAME_WON == result:
            reward = 200
            self.stats.incr_won()

        elif Result.GAME_DRAW == result:
            reward = -1
            self.stats.incr_draw()

        # Learn from the previous step first: its successor state is only known now.
        self.update_q_lerner()

        # The current step becomes the "previous" one.
        self.prev_action = self.action
        self.prev_state = np.array(self.state)
        self.prev_reward = reward

        if result in (Result.GAME_WON, Result.GAME_LOST, Result.GAME_DRAW):
            # The game is over, so no further call will deliver the final reward:
            # update immediately and reset prev_action to avoid an update across games.
            self.update_q_lerner()
            self.prev_action = None

    def update_q_lerner(self) -> None:
        """
        Triggers the learning process of the Q-Player

        Does nothing as long as no previous action has been recorded.
        """
        if self.prev_action is not None:
            self.q_learner.update(self.prev_state, self.state, self.prev_action, self.prev_reward)

In [38]:
class QUtils:
    """Static helper functions to print, load, save and merge q_tables stored as dicts."""

    @staticmethod
    def pretty_print_q_table(dict_q_table) -> None:
        """
        prints a human readable representation of the given q_table

        For every entry the key is printed, followed by its values rounded to two decimals
        with the first two axes swapped.
        :param dict_q_table: dict whose values are array-likes with at least two dimensions
        """

        for key, values in dict_q_table.items():
            # np.round replaces the np.round_ alias, which was removed in NumPy 2.0.
            values = np.swapaxes(np.round(values, 2), 0, 1)

            pretty_key = str(key).replace("None", "' '").replace('[[', ' [').replace(']]', ']')
            pretty_value = str(values).replace('[[', ' [').replace(']]', ']')
            print(f"{pretty_key}\n\n{pretty_value}\n\n-------------------------\n")

    @staticmethod
    def get_dict_from_file(filepath) -> dict:
        """
        uses the given filepath to read a dict from the specified file
        :param filepath: the file to read in
        :return: the dict read from the file, or None if the file does not exist or is empty
        """

        if os.path.isfile(filepath) and os.path.getsize(filepath) > 0:
            with open(filepath, "rb") as file:
                # SECURITY: dill.load can execute arbitrary code while unpickling;
                # only use this on files from a trusted source.
                return dill.load(file)

        # Missing or empty file: nothing to load.
        return None

    @staticmethod
    def save_dict_to_file(filepath, dict) -> None:
        """
        save a dict to a specified file
        :param filepath: the file to write to (an existing file gets overwritten)
        :param dict: the savable dict
        """
        # NOTE: the parameter name shadows the builtin `dict`; it is kept so that
        # existing keyword callers keep working.

        with open(filepath, 'wb') as file:
            dill.dump(dict, file)

    @staticmethod
    def merge_dicts(d0: dict, d1: dict) -> dict:
        """
        merges 2 given dicts

        Neither input is modified. If a key is present in both dicts the value of d0 is kept.
        :param d0: first dict
        :param d1: second dict
        :return: a new dict containing the content of the given dicts
        """

        merged = d0.copy()
        for key, value in d1.items():
            # setdefault only inserts keys that are missing, so entries of d0 win.
            merged.setdefault(key, value)

        return merged

    # @staticmethod
    # def get_rotated_states(states):
    #     """
    #     Takes in an array of states and returns the rotated states including the original ones
    #     :param states: an array of states used in QPlayer
    #     :return: an array of states
    #     """
    #
    #     rotated_states = []
    #     for state in states:
    #
    #         states.append(state)
    #         state = np.rot90(state)
    #         states.append(state)
    #         state = np.rot90(state)
    #         states.append(state)
    #         state = np.rot90(state)
    #         states.append(state)
    #
    #     return rotated_states
    #
    # @staticmethod
    # def get_parallelized_states(states, parallelization: StateReduction):
    #     """
    #     Takes in an array of states and returns the parallelized states including the original ones
    #     :param states: an array of states that should get parallelized
    #     :param parallelization: the kind of parallelization that should get applied
    #     :return: an array of states
    #     """
    #
    #     if parallelization == StateReduction.NONE:
    #         return states
    #
    #     if parallelization == StateReduction.VERTICAL:
    #
    #         parallelized_states = []
    #         for state in states:
    #             parallelized_states.append(state)
    #             parallelized_states.append(np.fliplr)
    #             return parallelized_states
    #
    #     if parallelization == StateReduction.HORIZONTAL:
    #
    #         parallelized_states = []
    #         for state in states:
    #             parallelized_states.append(state)
    #             parallelized_states.append(np.flipud)
    #             return parallelized_states
    #
    #     if parallelization == StateReduction.BOTH:
    #
    #         parallelized_states = []
    #         for state in states:
    #             parallelized_states.append(state)
    #             parallelized_states.append(np.fliplr)
    #             parallelized_states.append(np.flipud)
    #             return parallelized_states