In [1]:
import gym
import numpy as np
import pandas as pd
from gym import spaces
import tensorflow as tf
from tensorflow import keras
from collections import deque
import time
import random

In [187]:
# Observations have the form

# (agent's hand, played cards, number of cards held by each enemy, stack quantity, stack number, active player, last player)

# The agent's hand and the played cards are each encoded as (n° of 0's, n° of 1's, ..., n° of 9's)

def observation_space(n_of_players):
    """Build the MultiDiscrete observation space for ``n_of_players`` players.

    The size vector is laid out, in order, as:

    * 20 entries of size 5 (two blocks of ten per-number counts),
    * ``n_of_players - 1`` entries of size ``40 // n_of_players + 1``
      (one per opponent),
    * one entry of size 5 followed by one entry of size 10,
    * two entries of size ``n_of_players``.

    Parameters
    ----------
    n_of_players : int
        Number of players in the game.

    Returns
    -------
    gym.spaces.MultiDiscrete
        Space built from the size vector above with dtype ``'uint8'``.
    """
    sizes = [5] * 20

    # Floor division keeps every size an integer. The original `40 / n + 1`
    # produced a float that was only truncated implicitly by the uint8 cast.
    sizes += [40 // n_of_players + 1 for _ in range(n_of_players - 1)]

    sizes += [5, 10]
    sizes += [n_of_players] * 2

    return spaces.MultiDiscrete(np.array(sizes), dtype='uint8')

def action_space():
    """Return the MultiDiscrete action space with component sizes (11, 5)."""
    component_sizes = (11, 5)
    return spaces.MultiDiscrete(component_sizes, dtype='uint8')

def deal(n_of_players):
    """Deal a 40-card deck among ``n_of_players`` players.

    Cards are strings ``'<number><suit>'`` with numbers 0-9 and suits
    ``'O'``, ``'B'``, ``'C'``, ``'E'``. Every player receives
    ``int(40 / n_of_players)`` cards drawn with ``random.sample``.

    Returns
    -------
    tuple
        ``(players_data, active_player)`` where ``players_data`` is a list of
        hands (lists of card strings) and ``active_player`` is the index of the
        player holding ``'0O'``, or ``None`` if nobody was dealt that card.
    """
    # Suit-major deck: all ten numbers of 'O', then 'B', 'C' and 'E'.
    deck = [str(number) + suit
            for suit in ['O', 'B', 'C', 'E']
            for number in range(10)]

    remaining = list(range(40))
    players_data = []
    for _ in range(n_of_players):
        drawn = random.sample(remaining, int(40 / n_of_players))
        players_data.append([deck[index] for index in drawn])
        # Same set-difference rebuild as before so seeded deals stay identical.
        remaining = list(set(remaining) - set(drawn))

    active_player = None
    for seat, hand in enumerate(players_data):
        if '0O' in hand:
            active_player = seat

    return players_data, active_player


# Inputs to encode_players_hand are:
# game_data : list with every player's hand
# player : index of the player whose hand is encoded



def encode_players_hand(game_data, player):
    """Count the cards of each number (0-9) in ``game_data[player]``.

    The number of a card is read from its first character.

    Returns
    -------
    list of int
        Ten counters; position ``k`` holds how many cards with number ``k``
        the player has.
    """
    counts = [0] * 10
    for number in (int(card[0]) for card in game_data[player]):
        counts[number] += 1
    return counts
    


def encode_players_data(players_data,history,stack,agent,active_player=None,last_player=None):
    """Flatten the game state into one list from the point of view of ``agent``.

    Parameters
    ----------
    players_data : list of hands, or ``(hands, active_player)`` tuple
        Each hand is a list of card strings whose first character is the
        card's number. The two-element tuple form (a list of hands plus a
        non-sequence second element) is unpacked, and its second element is
        used as the active player when ``active_player`` is not given.
    history : iterable of card strings
        Cards already played.
    stack : list of card strings
        Cards currently on the table; an empty list means no stack.
    agent : int
        Index of the player whose hand is encoded.
    active_player, last_player : int or None
        Player to move and player who moved last.

    Returns
    -------
    list
        ``agent hand counts (10) + history counts (10) + opponents' hand sizes
        + [stack size, stack number] + [active_player, last_player]``.
        With an empty stack the stack part is ``[0, None]``.
    """
    # Accept a (hands, active_player) pair directly. Previously such a tuple
    # was indexed as if it were the list of hands and raised ValueError.
    is_dealt_pair = (
        isinstance(players_data, tuple)
        and len(players_data) == 2
        and not isinstance(players_data[1], (list, tuple))
    )
    if is_dealt_pair:
        hands, dealt_active = players_data
        if active_player is None:
            active_player = dealt_active
    else:
        hands = players_data

    # Encode the requested agent's hand (the original always used hand 0).
    agent_cards = [0] * 10
    for card in hands[agent]:
        agent_cards[int(card[0])] += 1

    encoded_history = [0] * 10
    for card in history:
        encoded_history[int(card[0])] += 1

    # One entry per opponent; the player count is the number of hands, not the
    # number of cards in a hand.
    enemy_handsize = [len(hand) for seat, hand in enumerate(hands) if seat != agent]

    if stack:
        encoded_stack = [len(stack), int(stack[0][0])]
    else:
        encoded_stack = [0, None]

    # Always defined now; it used to exist only when active_player was None.
    play_status = [active_player, last_player]

    return agent_cards + encoded_history + enemy_handsize + encoded_stack + play_status


In [147]:
# We'll assume (and construct) the class such that the player
# supposed to play is the active player.

def possible_plays(state):
    """List the plays available to the active player for an encoded ``state``.

    ``state[0:10]`` is read as the per-number card counts of the player to
    move, and positions 23-26 as stack quantity, stack number, active player
    and last player. A play is ``[quantity, number]``; ``''`` is the extra
    entry that is present in every case except the opening move.

    * No last player: only plays of number 0 are offered (no ``''`` entry).
    * Active player is also the last player: ``''`` plus every quantity of
      every number held.
    * Otherwise: ``''`` plus every quantity of every held number from the
      stack number upwards, or just ``''`` when the stack number is 9.
    """
    stack_size = state[23]  # read for parity with the state layout; not used below
    stack_number = state[24]
    mover = state[25]
    previous_mover = state[26]

    def plays_for(numbers):
        # Every quantity from 1 up to the count held, for each given number.
        return [[quantity, number]
                for number in numbers
                for quantity in range(1, state[number] + 1)]

    if previous_mover is None:
        return plays_for(range(1))

    if mover == previous_mover:
        return [''] + plays_for(range(10))

    if stack_number != 9:
        return [''] + plays_for(range(stack_number, 10))

    return ['']

    
def game_start(n_players, seed=42):
    """Seed the RNG, deal a game and return the encoded state for agent 0.

    The result of ``deal`` and then its ``[0][0]`` element are printed before
    encoding. The value returned by ``deal`` is passed to
    ``encode_players_data`` unchanged, with an empty history and empty stack.
    """
    random.seed(seed)

    deal_result = deal(n_players)
    print(deal_result)

    first_entry = deal_result[0][0]
    print(first_entry)

    return encode_players_data(deal_result, [], [], 0)

In [181]:
def deal(n_of_players):
    """Shuffle out a 40-card deck to ``n_of_players`` players.

    The deck holds numbers 0-9 in each of the suits ``'O'``, ``'B'``, ``'C'``
    and ``'E'``, written as ``'<number><suit>'``. Each player gets
    ``int(40 / n_of_players)`` cards chosen with ``random.sample``.

    Returns
    -------
    tuple
        ``(hands, starter)``: the list of hands and the index of the player
        holding ``'0O'`` (``None`` when that card was not dealt).
    """
    deck = []
    for suit in ['O', 'B', 'C', 'E']:
        deck.extend(str(number) + suit for number in range(10))

    undealt = list(range(40))
    hands = []
    seat = 0
    while seat < n_of_players:
        picks = random.sample(undealt, int(40 / n_of_players))
        hands.append([deck[pick] for pick in picks])
        # Rebuilt through a set difference, exactly as before, so that the
        # sequence of samples for a given seed does not change.
        undealt = list(set(undealt) - set(picks))
        seat += 1

    holders = [index for index in range(n_of_players) if '0O' in hands[index]]
    starter = holders[-1] if holders else None

    return hands, starter



def encode_players_hand(game_data, player):
    """Return the per-number card counts of one player's hand.

    ``game_data[player]`` is a list of card strings; the first character of
    each card is its number. The result is a list of ten integers where entry
    ``k`` is the number of cards with number ``k``.
    """
    tally = [0] * 10
    hand = game_data[player]
    for card in hand:
        number = int(card[0])
        tally[number] = tally[number] + 1
    return tally



def start_game(n_players, seed=1234):
    """Seed the RNG, deal the cards and return the initial game state.

    Returns
    -------
    tuple
        ``(hands_data, history, starting_player, last_player, stack_quantity,
        stack_value)`` where ``hands_data`` holds the per-number counts of every
        player's hand, ``history`` is ten zeros, ``starting_player`` comes from
        ``deal`` and the last three entries are ``None``.
    """
    random.seed(seed)

    dealt_hands, starting_player = deal(n_players)
    hands_data = [encode_players_hand(dealt_hands, seat) for seat in range(n_players)]

    empty_history = [0] * 10
    return hands_data, empty_history, starting_player, None, None, None


def pos_plays(hands_data,stack_quantity,stack_value, last_player,player):
    """List the plays available to ``player``.

    Parameters
    ----------
    hands_data : list of list of int
        Per-number card counts for every player.
    stack_quantity, stack_value : int or None
        Quantity and number of the cards on the table; ``None`` when there is
        no stack.
    last_player : int or None
        Player who made the previous play; ``None`` at the start of the game.
    player : int
        Index of the player to move.

    Returns
    -------
    list
        Plays as ``[amount, number]`` pairs. When a stack is on the table the
        list starts with ``''`` (pass the turn).
    """
    pass_turn = ['']

    # Look the hand up once. The stack branch used `h` without ever binding it
    # and raised UnboundLocalError.
    h = hands_data[player]

    if stack_quantity is None and last_player is None:
        # Opening move: only cards of number 0 may be played.
        return [[amount, 0] for amount in range(1, h[0] + 1)]

    if stack_quantity is None:
        # No stack to beat: any amount of any number held. Iterate over the
        # number positions; the original looped over the counts themselves and
        # used them as indices.
        return [[amount, card]
                for card in range(len(h))
                for amount in range(1, h[card] + 1)]

    if stack_value == 9:
        return pass_turn

    # NOTE(review): a number qualifies only when at least `stack_quantity`
    # cards of it are held, yet every amount from 1 upwards is then offered.
    # Kept as written -- confirm against the game rules.
    av_plays = [[amount, card]
                for card in range(stack_value, len(h))
                if h[card] >= stack_quantity
                for amount in range(1, h[card] + 1)]

    return pass_turn + av_plays

        
def play_selection(hands_data, history, active_player, last_player, stack_quantity, stack_value, play):
    """Apply the play with index ``play`` and return the updated game state.

    ``play`` indexes the list returned by ``pos_plays`` for the active player.
    A ``[quantity, number]`` play removes the cards from the active player's
    hand, adds them to ``history``, becomes the new stack and makes the active
    player the last player. The pass entry ``''`` changes nothing except whose
    turn it is.

    ``hands_data`` and ``history`` are modified in place and also returned.

    Returns
    -------
    tuple
        ``(hands_data, history, active_player, last_player, stack_quantity,
        stack_value)`` after the play, with ``active_player`` advanced to the
        next seat.

    Raises
    ------
    AssertionError
        If ``play`` is not a valid index into the available plays.
    """
    # Named `options` so the module-level possible_plays function is not shadowed.
    options = pos_plays(hands_data,stack_quantity,stack_value,last_player,active_player)

    assert play in range(len(options)), "play must index one of the available plays"

    next_player = (active_player + 1) % len(hands_data)
    chosen = options[play]

    if chosen == '':
        # Passing used to crash on `''[0]`; the stack and last player stay as they are.
        return hands_data, history, next_player, last_player, stack_quantity, stack_value

    stack_quantity, stack_value = chosen

    hands_data[active_player][stack_value] -= stack_quantity
    history[stack_value] += stack_quantity

    return hands_data, history, next_player, active_player, stack_quantity, stack_value




# Example
# Deal a 4-player game (start_game's default seed) and show the initial state.
hands_data, history ,starting_player, last_player, stack_quantity, stack_value = start_game(4)
print(hands_data, stack_quantity, stack_value, last_player, starting_player, history)
# The result of this call is neither stored nor printed.
pos_plays(hands_data,stack_quantity,stack_value,last_player,starting_player)
# Apply play index 0 for the starting player and show the resulting state.
hands_data, history, active_player,last_player, stack_quantity, stack_value = play_selection(hands_data, history, starting_player, last_player, stack_quantity, stack_value, 0)
print(hands_data, stack_quantity, stack_value, last_player, active_player, history)


[[1, 0, 2, 0, 0, 2, 2, 2, 1, 0], [1, 2, 1, 1, 0, 0, 1, 2, 0, 2], [2, 0, 1, 1, 2, 0, 1, 0, 1, 2], [0, 2, 0, 2, 2, 2, 0, 0, 2, 0]] None None None 0 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[[0, 0, 2, 0, 0, 2, 2, 2, 1, 0], [1, 2, 1, 1, 0, 0, 1, 2, 0, 2], [2, 0, 1, 1, 2, 0, 1, 0, 1, 2], [0, 2, 0, 2, 2, 2, 0, 0, 2, 0]] 1 0 0 1 [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [188]:
class culo(gym.Env):
    """Gym environment driving the card game through ``start_game`` / ``play_selection``.

    The game state (hands, history, active player, last player and stack) is
    stored on the instance instead of being read from notebook globals.

    NOTE(review): observations are returned as the raw state tuple, not as
    vectors belonging to ``observation_space``, and actions are indices into
    the list of available plays rather than members of ``action_space``.
    The reward is a placeholder. All three need a design decision.
    """

    def __init__(self,n_p):
        """Create an environment for ``n_p`` players; call ``reset`` before ``step``."""

        super().__init__()

        self.n_p = n_p
        # action_space() takes no arguments; passing n_p raised TypeError.
        self.action_space = action_space()
        self.observation_space = observation_space(self.n_p)

        self.placements = []
        self.hands = None
        self.history = None
        self.active_player = None
        self.last_player = None
        self.stack_quantity = None
        self.stack_value = None

    def _snapshot(self):
        """Return the current state in the order used by ``play_selection``."""
        return (self.hands, self.history, self.active_player,
                self.last_player, self.stack_quantity, self.stack_value)

    def reset(self, seed=None):
        """Deal a new game and return the initial state.

        ``seed`` is forwarded to ``start_game``, which passes it to
        ``random.seed``.
        """

        self.placements = []
        (self.hands, self.history, self.active_player,
         self.last_player, self.stack_quantity, self.stack_value) = start_game(self.n_p, seed)

        return self._snapshot()

    def step(self,action):
        """Apply the play with index ``action`` for the active player.

        Returns ``(new_state, reward, done)``. ``done`` is True when the player
        who just moved has no cards left; ``reward`` is 1 in that case and 0
        otherwise (placeholder -- TODO define the real reward).
        """

        if self.hands is None:
            raise RuntimeError("reset() must be called before step()")

        mover = self.active_player

        (self.hands, self.history, self.active_player,
         self.last_player, self.stack_quantity, self.stack_value) = play_selection(
            self.hands, self.history, self.active_player,
            self.last_player, self.stack_quantity, self.stack_value, action)

        done = sum(self.hands[mover]) == 0
        if done:
            self.placements.append(mover)
        reward = 1 if done else 0

        return self._snapshot(), reward, done

    def render(self):
        """Print the current hands, stack, players and history."""

        print(self.hands, self.stack_quantity, self.stack_value,
              self.last_player, self.active_player, self.history)
            

In [182]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
from keras.callbacks import TensorBoard
from keras.optimizers import Adam
from collections import deque
import time


from keras.callbacks import TensorBoard

#...
# Own Tensorboard class
class ModifiedTensorBoard(TensorBoard):
    """TensorBoard callback that writes every ``.fit()`` call to one log file.

    The callback keeps its own ``step`` counter and writer, and turns the
    per-fit hooks into no-ops so a new log is not started on every call.

    NOTE(review): ``tf.summary.FileWriter`` and ``self._write_logs`` look like
    TensorFlow 1.x-era APIs -- confirm they exist in the installed version.
    """

    # Overriding init to set initial step and writer (we want one log file for all .fit() calls)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.FileWriter(self.log_dir)

    # Overriding this method to stop creating default log writer
    def set_model(self, model):
        pass

    # Overridden, saves logs with our step number
    # (otherwise every .fit() will start writing from 0th step)
    def on_epoch_end(self, epoch, logs=None):
        # `logs` defaults to None, and `**None` raises TypeError.
        self.update_stats(**(logs or {}))

    # Overridden
    # We train for one batch only, no need to save anything at epoch end
    def on_batch_end(self, batch, logs=None):
        pass

    # Overridden, so won't close writer
    def on_train_end(self, _):
        pass

    # Custom method for saving own metrics
    # Writes the given metrics at the callback's current step
    def update_stats(self, **stats):
        self._write_logs(stats, self.step)


# Value passed as `maxlen` when DQNAgent creates its replay memory.
REPLAY_MEMORY_SIZE = 50000
# Model identifier; DQNAgent uses it as the prefix of the TensorBoard log directory name.
MODEL_NAME = "256x2"

class DQNAgent:
    def __init__(self):
        """Build the main and target models, the replay memory and the logger."""

        # main model (this gets trained every step)
        self.model = self.create_model()

        # target model (this is what we .predict against every step)
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        # `deque` is the class imported from collections, not an attribute of
        # the agent; `self.deque` raised AttributeError.
        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        # The keyword is `log_dir` (ModifiedTensorBoard reads self.log_dir),
        # not `logs_dir`.
        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")

        self.target_update_counter = 0
        
    def create_model(self):
        """Build and compile the Q-network.

        Two blocks of Conv2D(256, 3x3) -> ReLU -> MaxPooling2D -> Dropout(0.2)
        are followed by Flatten, Dense(64) and a linear output layer with
        ``env.ACTION_SPACE_SIZE`` units. The model is compiled with MSE loss,
        Adam (lr=0.001) and the accuracy metric.

        NOTE(review): ``env`` is a global that is not defined in the cells
        visible here -- confirm it exists before this is called.
        """
        layer_stack = [
            Conv2D(256, (3, 3), input_shape=env.OBSERVATION_SPACE_VALUES),
            Activation("relu"),
            MaxPooling2D(2, 2),
            Dropout(0.2),

            Conv2D(256, (3, 3)),
            Activation("relu"),
            MaxPooling2D(2, 2),
            Dropout(0.2),

            Flatten(),
            Dense(64),

            Dense(env.ACTION_SPACE_SIZE, activation="linear"),
        ]

        network = Sequential()
        for layer in layer_stack:
            network.add(layer)

        network.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=['accuracy'])

        return network
    
    def update_replay_memory(self,transition):
        self.replay_memory.append(transition)
    
    def get_qs(self, state, step):
        return self.model_predict(np.array(state).reshape(-1,*state.shape)/255)[0]