In [None]:
import numpy as np 
import matplotlib.pyplot as plt
import matplotlib as mpl
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
import torch
from torch.nn import functional as F
from torch import nn, Tensor

import sys

sys.path.append('../modules/')

from training import train_model

%load_ext autoreload
%autoreload 2

In [None]:
# Naive Bayes benchmark
def get_NB_accuracy(x0s,xis_int,train_frac):
    """Fit a Gaussian Naive Bayes classifier and return its held-out accuracy.

    Arguments:
        x0s: labels, one entry per sample.
        xis_int: feature array; it is transposed before being handed to
            scikit-learn, so its last axis must index the samples.
        train_frac: fraction of the samples used for fitting (float).

    Returns:
        Fraction of held-out samples whose predicted label equals the true label.
    """
    n_samples = len(x0s)
    # Use integer split sizes. A float test_size is rounded *up* by scikit-learn,
    # so round-off in 1 - train_frac (e.g. 1 - 0.7 == 0.30000000000000004) can
    # silently move one sample from the training set to the test set.
    n_train = int(round(train_frac * n_samples))
    n_test = n_samples - n_train
    xi_train, xi_test, x0_train, x0_test = train_test_split(
        xis_int.T, x0s, train_size=n_train, test_size=n_test, random_state=0
    )
    gnb = GaussianNB()
    gnb.fit(xi_train, x0_train)
    y_pred = gnb.predict(xi_test)
    return np.mean(x0_test == y_pred)

In [None]:
# Data that will be used
q = 4
l = 4
sigma = 1.0
epsilon = 0.0
seed = 31

N_learn = 500

# NOTE: allow_pickle=True unpickles arbitrary Python objects; only load files you trust.
# The unpacking below rebinds q, l, sigma and epsilon to the values stored in the file.
[q,l,sigma,epsilon,x0s,xis,M_s] = np.load('../data/labeled_data_{}_{}_{}_{:.5f}.npy'.format(q,l,sigma,epsilon),allow_pickle=True)
# Pick the realisation indexed by `seed` along the last axis.
x0 = x0s[:,seed]
xi = xis[:,:,seed]
n_total = len(x0)
train_frac = N_learn/n_total
phi_NB = get_NB_accuracy(x0,xi,train_frac)
# Report the requested integer counts; int(train_frac*n) / int((1-train_frac)*n)
# truncate float round-off and can be off by one (e.g. int((1-0.9)*10) == 0).
print('Naives Bayes accuracy: {:.2f}, trained on {} and tested on {} trees.'.format(phi_NB,N_learn,n_total-N_learn))

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

device

In [None]:
# Convert the NumPy data into int64 tensors living on `device`.
def _as_long_tensor(array):
    """Wrap a NumPy array as an int64 tensor and move it to `device`."""
    return torch.from_numpy(array).to(dtype=torch.long).to(device=device)

# The first N_learn entries form the training split, the remainder the test split.
# Sequence arrays are transposed so that the sliced axis comes first.
sequences = _as_long_tensor(xi[:, :N_learn].T)
labels = _as_long_tensor(x0[:N_learn])
sequences_test = _as_long_tensor(xi[:, N_learn:].T)
labels_test = _as_long_tensor(x0[N_learn:])

In [None]:
# Lookup table mapping each of the `num_embeddings` symbols to a vector of size `embedding_size`.
num_embeddings = q  # number of possible values for each position
embedding_size = 128  # some number to be played with
embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_size)
embedding = embedding.to(device=device)

# Embed the training sequences once, outside of the model.
embedded_sequences = embedding(sequences)

embedded_sequences.shape

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch-first input, then apply dropout.

    Arguments:
        d_model: size of the last (feature) dimension of the inputs; may be odd.
        dropout: dropout probability applied after the encoding is added.
        max_len: number of positions for which the encoding is precomputed.
    """

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
        angles = position * div_term
        pe = torch.zeros(1, max_len, d_model)
        pe[0, :, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd-indexed column than even-indexed
        # ones, so trim the cosine block; for even d_model the slice is a no-op.
        pe[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Buffer: follows .to(device) and state_dict, but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[batch_size, seq_len, embedding_dim]``
               (batch first: the encoding is sliced along dimension 1).

        Returns:
            Tensor of the same shape as ``x``.
        """
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)


In [None]:
num_heads = 4

# One batch-first encoder layer, used as the template for a stack of four layers.
encoder_layer = nn.TransformerEncoderLayer(
    d_model=embedding_size,
    nhead=num_heads,
    dim_feedforward=2048,
    batch_first=True,
)
transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=4)
transformer_encoder = transformer_encoder.to(device=device)

positional_encoder = PositionalEncoding(d_model=embedding_size).to(device=device)

# Smoke test: push the embedded training sequences through both modules.
out = transformer_encoder(
    positional_encoder(embedded_sequences)
)

out

In [None]:
class TransformerClassifier(nn.Module):
    """Chain positional encoding, a transformer encoder, flattening and a final layer.

    Arguments:
        transformer_encoder: module applied after the positional encoding.
        positional_encoder: module applied first to the input.
        final_layer: module applied to the flattened encoder output.
        seq_len: stored as ``self.seq_len``; not read by ``forward``.
    """

    def __init__(self, transformer_encoder, positional_encoder, final_layer, seq_len):
        super().__init__()

        self.seq_len = seq_len

        # Registration order is kept as-is: it determines the order in which
        # parameters()/named_parameters() enumerate the submodules.
        self.transformer_encoder = transformer_encoder
        self.final_layer = final_layer
        self.pos_encoder = positional_encoder

        # Merge the last two dimensions into one before the final layer.
        self.flatten = nn.Flatten(start_dim=-2, end_dim=-1)

    def forward(self, x):
        """Return ``final_layer`` applied to the flattened encoder output of ``x``."""
        encoded = self.transformer_encoder(self.pos_encoder(x))
        flat = self.flatten(encoded)
        return self.final_layer(flat)

# Number of epochs
num_epochs = 500
# History buffers read by the plotting cells further down. Nothing in this cell
# writes to them, so initialise with NaN (drawn as gaps by matplotlib) instead
# of np.empty, whose arbitrary memory contents would otherwise be plotted.
losses = np.full(num_epochs, np.nan)
acurracies = np.full(num_epochs, np.nan)

# Define the final layer: it consumes the flattened (seq_len * embedding_size) encoder output
final_layer = nn.Linear(embedded_sequences.shape[-2] * embedded_sequences.shape[-1],q).to(device=device)

# Create the model
model = TransformerClassifier(transformer_encoder, positional_encoder, final_layer, embedded_sequences.shape[1]).to(device=device)

# Define the loss function
loss_fn = nn.CrossEntropyLoss()

# NOTE(review): `embedding` is not a submodule of `model`; the model only ever
# sees pre-embedded inputs. Confirm that leaving the embedding out of training is intended.
# NOTE(review): if train_model returns per-epoch losses/accuracies, copy them
# into `losses` / `acurracies` above — confirm its return value.
training_result = train_model(
    model=model,
    training_data=(embedded_sequences, labels),
    test_data=(embedding(sequences_test), labels_test),
    n_epochs=num_epochs,  # single source of truth for the epoch count
    loss_fn=loss_fn,
    learning_rate=1e-3,
    batch_size=32,
    early_stopper=None
)

training_result

In [None]:
# Loss history, one point per epoch.
fig, ax = plt.subplots()
ax.plot(losses)
ax.set_ylabel('X-entropy loss')
ax.set_xlabel('epochs')
plt.show()

In [None]:
# Accuracy history, with the Naive Bayes benchmark drawn as a dashed black reference line.
fig, ax = plt.subplots()
ax.plot(acurracies)
epoch_axis = np.arange(num_epochs)
ax.plot(epoch_axis, np.full(num_epochs, phi_NB), color='k', ls='--')
ax.set_ylabel('train accuracy')
ax.set_xlabel('epochs')
plt.show()

In [None]:
# Compute the accuracy on the rest of dataset
# Switch to evaluation mode so dropout (positional encoder and encoder layers)
# is disabled, and skip autograd bookkeeping for this inference-only pass.
model.eval()
with torch.no_grad():
    embedded_sequences_test = embedding(sequences_test)
    out_test = model(embedded_sequences_test)

out_test

In [None]:
# Normalise the outputs into probabilities along the last dimension.
predicted_probs_test = torch.softmax(out_test, dim=-1)

predicted_probs_test

In [None]:
# Predicted label = index of the largest probability along the last dimension.
predicted_labels = predicted_probs_test.argmax(dim=-1)

predicted_labels

In [None]:
# Fraction of test labels that the model reproduces.
n_correct = (predicted_labels == labels_test).sum().item()
accuracy_test = n_correct / len(labels_test)

print('Accuracy on the rest of the dataset: {:.2f}'.format(accuracy_test))

In [None]:
# Number of parameter tensors (not scalar weights) held by the model.
sum(1 for _ in model.parameters())

In [None]:
# Print name, values, shape and grad flag of every trainable parameter.
trainable = (
    (name, param)
    for name, param in model.named_parameters()
    if param.requires_grad
)
for name, param in trainable:
    print(name, param.data, param.data.shape, param.requires_grad)