# Modelisation joueur  


Imporations des dépendances

In [1]:
import torch
import torch.nn as nn
from torch.optim import AdamW
import math
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from torchvision import transforms
from tqdm import tqdm, trange
import matplotlib.pyplot as plt
import numpy as np
import pickle



## Modele
  
### Bloc Transformer

In [2]:
class Attention(nn.Module):
    def __init__(self, x_to_dim, x_from_dim, hidden_dim,):
        super(Attention, self).__init__()

        self.sqrt_hidden_dim = math.sqrt(hidden_dim)

        self.wq = nn.Parameter(torch.randn(hidden_dim, x_to_dim))
        
        self.wk = nn.Parameter(torch.randn(hidden_dim, x_from_dim))
        self.wv = nn.Parameter(torch.randn(x_to_dim, x_from_dim))

    def forward(self, x_to, x_from):
        # x_to = [batch size, x_to_len, x_to_dim]
        # x_from = [batch size, x_from_len, x_from_dim]

        # les lettres dans les einsum :
        # b : le batch
        # x, y, z: la taille de l'ensemble (x_from ou x_to)
        # i,j : les éléments des vecteurs de x_from, x_to / les éléments de ces vecteurs une fois projetés (via Wq, Wk ou Wv)

        q = torch.einsum('ik,bxk->bxi',self.wq,x_to) # un tenseur de dimension (batch size, x_to_len, Dq)

        k = torch.einsum('ij,bxj->bxi',self.wk,x_from) # un tenseur de dimension (batch size, x_from_len, Dk=Dq)
        v = torch.einsum('ij,bxj->bxi',self.wv,x_from) # un tenseur de dimension (batch size, x_from_len, Dv)

        e = torch.softmax(torch.einsum('bxi,byi->bxy', q, k)/self.sqrt_hidden_dim, dim=1) # un tenseur de dimension (batch size, x_to_len, x_from_len)

        attention = torch.einsum('bxy,byi->bxi', e, v) # un tenseur de dimension (batch size, x_to_len, Dv)

        return attention

class MultiHeadAttention(nn.Module):
    def __init__(self, x_to_dim, x_from_dim, hidden_dim, n_heads):
        super(MultiHeadAttention, self).__init__()
        heads_list = []
        for _ in range(n_heads):
            att = Attention(x_to_dim, x_from_dim, hidden_dim)
            heads_list.append(att)
        
        self.heads_list = heads_list
        self.output_projection = nn.Parameter(torch.randn(n_heads))

    def forward(self, x_to, x_from):
        # x_to = [batch size, x_to_len, x_to_dim]
        # x_from = [batch size, x_from_len, x_from_dim]
        attention_list = []

        for head in self.heads_list:
            attention_list.append(head(x_to, x_from)) 

        concat = torch.stack(attention_list) # on obtient ainsi un tenseur représentant la concaténation des résultats des différentes "heads"

        result = torch.einsum('hbxi,h->bxi', concat, self.output_projection)

        return result

class MultiHeadSelfAttention(nn.Module):
    def __init__(self, x_to_dim, hidden_dim, n_heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.multhead = MultiHeadAttention(x_to_dim, x_to_dim, hidden_dim, n_heads)
    
    def forward(self, x_to):
        return self.multhead(x_to, x_to)

class LayerNorm(nn.Module):
    def __init__(self):
        super(LayerNorm, self).__init__()

    def forward(self, x):
        norm_x = torch.norm(x, dim=1).unsqueeze(1).expand_as(x)

        return x/norm_x

class FFN(nn.Sequential):
    def __init__(self, input_dim, dropout_rate=0.1, expansion_factor=2):
        super(FFN, self).__init__()
        self.hidden_layer = nn.Linear(input_dim, input_dim*expansion_factor)
        self.dropout_layer = nn.Dropout(p = dropout_rate)
        self.output_layer = nn.Linear(input_dim*expansion_factor, input_dim)

    def forward(self, x):
        x = self.hidden_layer(x)
        x = nn.LeakyReLU(negative_slope=0.1)(self.dropout_layer(x))
        x = self.output_layer(x)
        return x

class TransformerEncoderBlock(nn.Module):
    def __init__(self, data_dim, hidden_dim, n_heads, dropout_rate=0.1):
        super(TransformerEncoderBlock, self).__init__()

        self.bloc_self_attention = MultiHeadSelfAttention(data_dim, hidden_dim, n_heads)

        self.bloc_normalization_1 = LayerNorm()

        self.bloc_FFN = FFN(data_dim, dropout_rate=dropout_rate)

        self.bloc_normalization_2 = LayerNorm()
        # It's useless to use 2 LayerNorm, cause they are the same. 
        # Yet, because it's my first transformer,
        # I will keep both of them for clarity in my mind


    def forward(self, x):
        # x = [batch size, x_len, hidden dim]
        identity = x
        x = self.bloc_self_attention(x)
        x = self.bloc_normalization_1(x+identity)

        identity = x
        x = self.bloc_FFN(x)
        x = self.bloc_normalization_2(x+identity)

        return x

class SinusoidalPositionalEncoding(nn.Module):
    def __init__(self, hidden_dim, max_len, const = 10000):
        super(SinusoidalPositionalEncoding, self).__init__()

        self.pe = torch.zeros(max_len, hidden_dim)

        for i in range(max_len):
            for j in range(hidden_dim):
                if j%2==0:
                    self.pe[i, j] = math.sin(i/(const**(j/hidden_dim)))
                else:
                    self.pe[i, j] = math.cos(i/(const**((j-1)/hidden_dim)))

    def forward(self, x):
        # x = [batch size, len_x, dim_vect_de_x]
        dim_batch = x.size(0) # dimension du batch
        for k in range(dim_batch):
            x[k] += self.pe[:x[k].size(0),:] # on somme le tenseur PE sur chaque élément du batch
        return x

class LearnedPositionalEncoding(nn.Module):
    def __init__(self, hidden_dim, max_len):
        super(LearnedPositionalEncoding, self).__init__()

        self.pe = nn.Parameter(torch.randn(max_len, hidden_dim))


    def forward(self, x):
        dim_batch = x.size(0) # dimension du batch
        for k in range(dim_batch):
            x[k] += self.pe[:x[k].size(0),:] # on somme le tenseur PE sur chaque élément du batch
        return x

class TransformerEncoder(nn.Module):
    def __init__(self, data_dim,  hidden_dim, n_heads, n_layers, dropout_rate=0.1, positional_encoding="sinusoidal", max_len=1000):
        super(TransformerEncoder, self).__init__()
        if positional_encoding == "sinusoidal":
            self.layer_positional_encoding = SinusoidalPositionalEncoding(data_dim, max_len)
        else:
            self.layer_positional_encoding = LearnedPositionalEncoding(data_dim, max_len)
        
        self.bloc_transformer_list = nn.ModuleList([TransformerEncoderBlock(data_dim, hidden_dim, n_heads, dropout_rate) for i in range(n_layers)])

    def forward(self, x):
        x = self.layer_positional_encoding(x)

        for transformer_block in self.bloc_transformer_list:
            x = transformer_block(x)

        return x
    


### Modele global

In [3]:
# Si ca marche un jour, on peut rajouter dans l'état mémoire (actuellement que les 9 joueurs) une mémoire du board
with open("./data_RNN+transformer", "rb") as temp:
    data = pickle.load(temp)

class MLP(nn.Module):
    def __init__(self, layers_sizes):
        super(MLP, self).__init__()
        self.layers = nn.ModuleList([nn.Linear(in_features=layers_sizes[i], out_features=layers_sizes[i+1]) for i in range(len(layers_sizes)-1)])

    def forward(self, x):
        for layer in self.layers:
            x = nn.LeakyReLU()(layer(x))
        return x


class PlayerModel(nn.Module):
    def __init__(self, dim_stat_adversaire, dim_representation_joueur, dim_representation_carte=2):
        super(PlayerModel, self).__init__()

        self.representation_initiale_joueur = nn.Parameter(torch.randn(dim_representation_joueur))
        self.representation_joueur_couche = nn.Parameter(torch.randn(dim_representation_joueur))

        ## Réseau de changement de la représentation d'un aversaire en fonction de son action des autres adversaires :
        self.repr_adversaire = MLP(layers_sizes = [2 + dim_stat_adversaire + 9*dim_representation_joueur, 32, dim_representation_joueur])

        ## Réseaux de changement de la représentation des joueurs après l'arrivée de nouvelles cartes sur le Board
        self.new_board = MLP(layers_sizes = [5*dim_representation_carte + 2*dim_representation_carte + dim_representation_joueur, 64, 16, dim_representation_joueur])
        self.remplace_carte = torch.tensor([-1]*dim_representation_carte)
        # -> 5 cartes : board, 2 cartes : cartes de 'IlxxxlI' si il s'agit d'un adversaire, rien sinon

        self.transformer_bloc = TransformerEncoder(dim_representation_joueur, 16, 6, 3, positional_encoding='learned', max_len=9)

        self.finalMLP = MLP(layers_sizes=[dim_representation_joueur+ 5*dim_representation_carte + 2*dim_representation_carte, 32, 32, 3])


    def forward(self, batch_x):
        ly = []
        for ind in batch_x:
            game_id, sequence, stats_joueurs, ind_hero, joueurs_presents, cartes_hero = data[ind][0]
            # format de la sequence ? 
            # liste d'actions : ('indice joueur', 'action', 'sizing') ou ('board')
            cartes_hero = torch.tensor(cartes_hero)
            board = torch.stack([self.remplace_carte] * 5)
            joueurs = torch.stack([self.representation_initiale_joueur.clone() if joueurs_presents[ind] else self.representation_joueur_couche for ind in range(9)])

            for token in sequence:
                if len(token)==3:
                    ind, action, sizing = token
                    if action == -1: # fold
                        joueurs[ind, :] = self.representation_joueur_couche
                        joueurs_presents[ind] = False
                    else:
                        input = torch.cat((torch.flatten(joueurs[ind:, :]), torch.flatten(joueurs[:ind, :]),torch.tensor(stats_joueurs[ind]).unsqueeze(dim=0), torch.tensor(action).unsqueeze(dim=0), torch.tensor(sizing).unsqueeze(dim=0)))
                        joueurs[ind, :] = self.repr_adversaire(input.unsqueeze(0))
                else:
                    if token[6]<-0.5:
                        board = torch.cat((torch.tensor(token[:6]), torch.tensor(self.remplace_carte), torch.tensor(self.remplace_carte)))
                    elif token[8]<-0.5:
                        board = torch.cat((torch.tensor(token[:8]), self.remplace_carte))
                    else:
                        board = torch.tensor(token)

                    for ind in range(9):
                        if ind!=ind_hero:
                            cartes = cartes_hero
                        else:
                            cartes = torch.cat((self.remplace_carte, self.remplace_carte)).flatten()
                        
                        if joueurs_presents[ind]:
                            input = torch.cat((board, cartes, joueurs[ind,:].squeeze()))
                            joueurs[ind,:] = self.new_board(input.unsqueeze(0))

            y = self.transformer_bloc(joueurs.unsqueeze(0))[0,ind_hero,:]

            ly.append(nn.Softmax()(self.finalMLP(torch.cat((y, board.flatten(), cartes_hero.flatten())).unsqueeze(0))))
        return torch.stack(ly)



## Entrainement

### Fonctions annexes

Datasets : 

In [4]:
with open("./data_RNN+transformer", "rb") as temp:
    data = pickle.load(temp)

print(data[53])
# il y a 55924 éléments
class PlayerDataset(torch.utils.data.Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        output_data = self.data[idx][1]
        return torch.tensor(idx), torch.tensor(output_data)


training_data = PlayerDataset(data[:50000])
test_data = PlayerDataset(data[50000:55924])

((808937195, [(4, 0, 0.5), (6, 0, 1.0), (7, -1, 0), (8, -1, 0), (0, 3, 2.3), (4, -1, 0), (6, 1, 1.3), [4.0, 0.0, 2.0, 0.0, 2.0, 3.0, -1.0, -1.0, -1.0, -1.0], (6, 0, 0), (0, 0, 0), [4.0, 0.0, 2.0, 0.0, 2.0, 3.0, 5.0, 2.0, -1.0, -1.0], (6, 0, 0)], [28.73, 0, 0, 0, 40.0, 0, 44.14, 94.65, 50.66], 0, [True, False, False, False, True, False, True, True, True], [14.0, 0.0, 8.0, 1.0]), [0.0, 0.0, 1.0])


Fonction d'evaluation :

In [5]:
def success_rate_2(model,test_data):
    loader = torch.utils.data.DataLoader(test_data, batch_size=len(test_data))
    with torch.no_grad():
        bonne_identification = 0

        for x_batch, y_batch in loader:
            
            y_pred = model(x_batch).squeeze()

            for ind in range(len(y_pred)):
                
                if y_batch[ind][np.argmax(y_pred[ind])] >0.5:
                    bonne_identification += 1
            return(bonne_identification/len(test_data))

Fonction d'entrainement

In [6]:

def trainer(train_data, test_data, model, loss_fn,epoch=10,batch_size=1,rate=1e-4):

    optimizer = torch.optim.Adam(model.parameters(), lr=rate)

    loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size)

    for t in trange(epoch, desc='epochs'):
        for x_batch, y_batch in tqdm(loader):

            
            optimizer.zero_grad()
            y_pred = model(x_batch).squeeze(dim=1)
            if isinstance(loss_fn, torch.nn.MSELoss):
                y_batch = y_batch.float()
            loss = loss_fn(y_pred, y_batch)
        
            loss.backward()
            optimizer.step()

        print(success_rate_2(model, test_data))



### Entrainement du modele

In [9]:
# modele de base : 196K parametres
# modele suivant : 310K parametres
modelisation_joueur = PlayerModel(1, 5, 2)

In [10]:
print(success_rate_2(modelisation_joueur, test_data))

  return self._call_impl(*args, **kwargs)
  board = torch.cat((torch.tensor(token[:6]), torch.tensor(self.remplace_carte), torch.tensor(self.remplace_carte)))


0.1850101282916948


In [11]:
trainer(training_data, test_data, modelisation_joueur, loss_fn = nn.CrossEntropyLoss(), epoch=10, batch_size=64)

epochs:   0%|          | 0/10 [00:00<?, ?it/s]

  board = torch.cat((torch.tensor(token[:6]), torch.tensor(self.remplace_carte), torch.tensor(self.remplace_carte)))
100%|██████████| 782/782 [21:58<00:00,  1.69s/it]
epochs:  10%|█         | 1/10 [22:43<3:24:35, 1363.90s/it]

0.5268399729912221


100%|██████████| 782/782 [21:49<00:00,  1.67s/it]
epochs:  20%|██        | 2/10 [45:15<3:00:55, 1356.91s/it]

0.525320729237002


100%|██████████| 782/782 [21:23<00:00,  1.64s/it]
epochs:  30%|███       | 3/10 [1:07:20<2:36:36, 1342.29s/it]

0.525151924375422


100%|██████████| 782/782 [21:26<00:00,  1.65s/it]
epochs:  40%|████      | 4/10 [1:29:29<2:13:41, 1336.93s/it]

0.525151924375422


100%|██████████| 782/782 [26:03<00:00,  2.00s/it]
epochs:  50%|█████     | 5/10 [1:56:22<1:59:42, 1436.49s/it]

0.524983119513842


 11%|█         | 85/782 [12:32:55<102:54:02, 531.48s/it]
epochs:  50%|█████     | 5/10 [14:29:18<14:29:18, 10431.69s/it]


KeyboardInterrupt: 