In [1]:
from gym import Env
from gym.spaces import Discrete, Tuple

import numpy as np
import random

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

from py4j.java_gateway import JavaGateway


In [2]:
# Conecta no servidor de treinamento

gateway = JavaGateway()
minitruco_java = gateway.entry_point


In [3]:
###### Env customizado do gym que encapsupla o servidor

class MinitrucoEnv(Env):
    def __init__(self):
        # Ações que podemos tomar: jogar a carta 0, a carta 1 ou a carta 2
        self.action_space = Discrete(3)

        # Estados possíveis
        # - posições: 1=inferior, 2=direita, 3=superior, 4=esquerda
        # - equipes: 1=posições 1 e 3; 2=posições 2 e 4
        # - cartas: podem valer -1 (null), 0 (fechada) ou um valor de 1
        #           a 14, conforme o valor relativo delas (cartas normais de 1
        #           a 10, manilhas de 11 a 14)
        # - rodadas: 1 a 3
        # - resultado da rodada: a equipe que venceu (1 ou 2), 3 para empate ou -1 para rodada não conlcuída
        # - booleanos (ex.: podeFechada) são 0 ou 1
        # TBD posJogadorPedindoAumento (acho que não zera depois do aumento)
        # TBD tento mineiro (talvez só varie as recompensas, mas é preciso especificar)
        # TBD baralho limpo (provavelmente só vamos excluir o range 1-4)
        self.observation_space = Tuple((
            self._Discrete(4, start=1),    # posJogador
            self._Discrete(2),             # baralhoSujo
            self._Discrete(2),             # podeFechada
            self._Discrete(3, start=1),    # numRodadaAtual
            self._Discrete(5, start=-1),   # resultadoRodada1
            self._Discrete(5, start=-1),   # resultadoRodada2
            self._Discrete(12, start=1),   # valorMao
            self._Discrete(13, start=0),   # valorProximaAposta
            self._Discrete(5, start=0),    # posJogadorPedindoAumento
            self._Discrete(4, start=1),    # posJogadorQueAbriuRodada
            self._Discrete(24, start=0),   # pontosEquipe1
            self._Discrete(24, start=0),   # pontosEquipe2
            self._Discrete(16, start=-1),  # cartaJogadaRodada1Pos1
            self._Discrete(16, start=-1),  # cartaJogadaRodada1Pos2
            self._Discrete(16, start=-1),  # cartaJogadaRodada1Pos3
            self._Discrete(16, start=-1),  # cartaJogadaRodada1Pos4
            self._Discrete(16, start=-1),  # cartaJogadaRodada2Pos1
            self._Discrete(16, start=-1),  # cartaJogadaRodada2Pos2
            self._Discrete(16, start=-1),  # cartaJogadaRodada2Pos3
            self._Discrete(16, start=-1),  # cartaJogadaRodada2Pos4
            self._Discrete(16, start=-1),  # cartaJogadaRodada3Pos1
            self._Discrete(16, start=-1),  # cartaJogadaRodada3Pos2
            self._Discrete(16, start=-1),  # cartaJogadaRodada3Pos3
            self._Discrete(16, start=-1),  # cartaJogadaRodada3Pos4
            self._Discrete(16, start=-1),  # carta1Jogador
            self._Discrete(16, start=-1),  # carta2Jogador
            self._Discrete(16, start=-1)   # carta3Jogador
        ))

        self.episodio = None
        self.state = None

    # A versão do gym que eu tou usando não suporta intervalos
    # discretos de início arbitrário, então vamos deslocar o
    # intervalo [a, b] para [0, b - a], guardando o "a"...
    
    def _Discrete(self, size, start=0):
        if hasattr(self, '_starts'):
            self._starts.append(start)
        else:
            self._starts = [start]
        return Discrete(size - start)
    
    # ...e ao ler o estado do jogo, aplicamos o mesmo deslocamento
    
    def _get_obs(self):
        estado_str = self.episodio.estado()
        estado_tupla = tuple(map(int, self.episodio.estado().split()))
        return tuple([
            x - self._starts[i] for i, x in enumerate(estado_tupla)
        ])
        
    def step(self, action):
        last_observation = self.state
        cartaJogada = last_observation[24 + action]
        if cartaJogada == 0: # Seria -1, mas estamos deslocados
            # Jogada inválida
            return last_observation, 0, False, {}

        self.episodio.executa(action)
        estado_str = self.episodio.estado()
        if estado_str == "EQUIPE 1 VENCEU":
            return None, 1.0, True, {}
        if estado_str == "EQUIPE 2 VENCEU":
            return None, -1.0, True, {}

        self.state = self._get_obs()

        pontosEquipe1 = self.state[10]
        pontosEquipe2 = self.state[11]

        reward = 0.0
        terminated = False

        return self.state, reward, terminated, {}

    def render(self):
        pass

    def reset(self, seed=None, options=None):
        if self.episodio is not None:
            self.episodio.finaliza()

#         super().reset(seed=seed) # required by check_env

        self.episodio = minitruco_java.novoEpisodio()
        self.state = self._get_obs()

        return self.state


In [4]:
from gym import Wrapper, spaces

class FlattenObservationEnv(Wrapper):
    def __init__(self, env):
        super(FlattenObservationEnv, self).__init__(env)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(np.prod(env.observation_space.shape),), dtype=np.float32)

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        return obs.flatten(), reward, done, info

    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)
        return obs.flatten()

env = FlattenObservationEnv(MinitrucoEnv())

TypeError: 'NoneType' object cannot be interpreted as an integer

In [None]:
env.observation_space

In [None]:
env.observation_space.sample()

In [None]:
# Roda alguns episódios com política "jogue uma carta aleatoriamente"

import time

episodes = 10
for episode in range(1, episodes+1):
    state = env.reset()
    terminated = False
    score = 0
    start = time.time()

    while not terminated:
        #env.render()
        action = env.action_space.sample()
        n_state, reward, terminated, info = env.step(action)
        score+=reward
    print('Episode:{} Score:{} Time:{}s'.format(episode, score, time.time() - start))


In [None]:
states = env.observation_space.shape
actions = env.action_space.n

def build_model(states, actions):
    model = Sequential()    
    model.add(Dense(24, activation='relu', input_shape=states))
    model.add(Dense(24, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

model = build_model(states, actions)

model.summary()

In [None]:
env.observation_spac