In [1]:
from collections import defaultdict

import numpy as np
from tqdm import trange

import mutorere

pygame 2.1.2 (SDL 2.0.16, Python 3.10.1)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
env = mutorere.env()

In [97]:
def bin_arr_to_int(arr):
    return arr.dot(1 << np.arange(arr.size)[::-1])


def int_to_bin_arr(n, width=None):
    return np.frombuffer(np.binary_repr(n, width=width).encode(), dtype='S1').astype(np.int8)


def reverse_bit(num):
    result = 0
    while num:
        result = (result << 1) + (num & 1)
        num >>= 1
    return result


def prepare_output(rank, n_kewai, putahi=0):
    kewai = int_to_bin_arr(rank, width=n_kewai) * 2 - 1
    if putahi == 0:
        return np.append(kewai, 0)
    else:
        return np.append(np.insert(kewai, 0, 0), putahi)


def normalize_state(state):
    state = state.T[0] - state.T[1]
    putahi = state[-1]
    if putahi == 0:
        kewai = state[:-1]
        kewai = np.clip(kewai, 0, 1)
        shifts = [(bin_arr_to_int(np.roll(kewai, shift)), shift) for shift in range(kewai.size)]
        (max_rank, max_shift), (min_rank, min_shift) = max(shifts), min(shifts)
        if (reversed_min := reverse_bit(min_rank)) > max_rank:
            return prepare_output(reversed_min, 8), min_shift, True
        else:
            return prepare_output(max_rank, 8), max_shift, False
    else:
        kewai = state[:-1]
        shift = -np.where(kewai == 0)[0].item() % kewai.size
        kewai = np.roll(kewai, shift)[1:]
        kewai = np.clip(kewai, 0, 1)
        straight_rank = bin_arr_to_int(kewai)
        reversed_rank = bin_arr_to_int(kewai[::-1])
        if straight_rank >= reversed_rank:
            return prepare_output(straight_rank, 7, putahi), shift, False
        else:
            return prepare_output(reversed_rank, 7, putahi), shift, True

def apply_shift_mirror(arr, shift, mirror, putahi):
    putahi_action = arr[-1]
    arr = np.roll(arr[:-1], shift)
    if mirror:
        if putahi == 0:
            arr = arr[::-1]
        else:
            arr = np.insert(arr[1:][::-1], 0, 0)
    return np.append(arr, putahi_action)

In [115]:
def get_q_row_factory(n_kewai=8):
    def q_row_factory():
        return np.zeros(n_kewai + 1)
    return q_row_factory


class MuTorereAgent:
    def __init__(self, n_kewai=8, eps_start=1., eps_decay=0.999, eps_min=0.05, gamma=0.9):
        self.q_table = defaultdict(get_q_row_factory(n_kewai=n_kewai))
        self.n_kewai = n_kewai
        self.eps = eps_start
        self.eps_decay = eps_decay
        self.eps_min = eps_min
        self.gamma = gamma
        self.last_observation = None
        self.last_action = None

    def random_policy(self, observation, action_mask):
        self.q_table[observation] = np.ma.masked_array(self.q_table[observation], mask=1-action_mask)
        return np.random.choice(np.arange(action_mask.size), p=action_mask/action_mask.sum())

    def q_policy(self, observation, action_mask):
        self.q_table[observation] = np.ma.masked_array(self.q_table[observation], mask=1-action_mask)
        return np.argmax(self.q_table[observation])

    def epsilon_policy(self, observation, action_mask):
        if np.random.random() < self.eps:
            return self.random_policy(observation, action_mask)
        else:
            return self.q_policy(observation, action_mask)

    def decay_epsilon(self):
        self.eps = max(self.eps * self.eps_decay, self.eps_min)

    def reset(self):
        self.last_observation = None
        self.last_action = None

    def play_turn(self, env, eval=False):
        observation_dict, reward, done, info = env.last()
        observation, action_mask = observation_dict['observation'], observation_dict['action_mask']
        observation, shift, mirror = normalize_state(observation)
        action_mask = apply_shift_mirror(action_mask, shift, mirror, observation[-1])

        if not eval:
            if self.last_observation is not None:
                if done:
                    self.q_table[np.array2string(self.last_observation)][self.last_action] = reward
                    env.step(None)
                    return
                else:
                    self.q_table[np.array2string(self.last_observation)][self.last_action] = self.gamma * np.max(self.q_table[np.array2string(observation)])

        if eval:
            self.eps = self.eps_min
            denorm_action = self.epsilon_policy(np.array2string(observation), action_mask)
        else:
            self.last_action = denorm_action = self.epsilon_policy(np.array2string(observation), action_mask)
        if denorm_action != self.n_kewai:
            if mirror:
                denorm_action = self.n_kewai - denorm_action
            denorm_action = (denorm_action - shift) % self.n_kewai
        env.step(denorm_action)
        if not eval:
            self.last_observation = observation
            self.decay_epsilon()

In [116]:
N_GAMES = 100
agents = {name: MuTorereAgent() for name in env.possible_agents}

for game_idx in trange(N_GAMES):
    env.reset()
    for agent in agents.values():
        agent.reset()

    for agent_name in env.agent_iter():
        agents[agent_name].play_turn(env)

100%|██████████| 100/100 [00:11<00:00,  8.94it/s]


In [119]:
from time import sleep

env.reset()
env.render()

for agent_name in env.agent_iter():
    agents[agent_name].play_turn(env, eval=True)
    sleep(1)
    env.render()

ValueError: when an agent is done, the only valid action is None

In [120]:
env.close()