In [None]:
import torch, copy, os, datetime
import pandas as pd
import torch.nn as nn
import numpy as np
from tqdm import tqdm
from torch.utils.data import DataLoader

VOCAB_SIZE = 100

PYTORCH_DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

if not os.path.exists("./dataset"): os.mkdir("./dataset")

In [None]:
# Crea un tokenizador desde cero
from tokenizers import Tokenizer, models, trainers, pre_tokenizers

tokenizer = Tokenizer(models.WordPiece(unk_token="[UNK]"))
tokenizer.pre_tokenizer = pre_tokenizers.Whitespace()

trainer = trainers.WordPieceTrainer(vocab_size=VOCAB_SIZE, special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"])
tokenizer.train(["./dataset/emotions.csv"], trainer)

tokenizer.save("custom_tokenizer.json")

In [None]:
# Carga tokenizador
from transformers import PreTrainedTokenizerFast
tokenizer = PreTrainedTokenizerFast(tokenizer_file="custom_tokenizer.json")
tokenizer.pad_token = "[PAD]"

In [None]:
class DataGen:
    def __init__(self):
        self.t, self.l = [], []
    
    def append_items(self,t,l):
        self.t.append(t)
        self.l.append(l)

    def to_numpy(self):
        self.t = np.array(self.t)
        self.l = np.array(self.l)

    def __len__(self):
        return len(self.l)

    def __getitem__(self, idx):
        return self.t[idx],self.l[idx]

In [None]:
# Carga dataset
def parse_data():
    file = pd.read_csv("./dataset/emotions.csv")
    data = DataGen()
    for _, row in tqdm(file.iterrows(),total=1,desc="Cargando dataset"):
        text_tokenized = tokenizer(row["text"],padding="max_length",truncation=True,max_length=VOCAB_SIZE,)
        data.append_items(text_tokenized["input_ids"],row["label"])
    data.to_numpy()
    return data

train_gen = parse_data()

In [None]:
print(train_gen.__getitem__(0))
print(train_gen.__len__())

In [None]:
# Modelo
class Emotions(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()

    def save(self, project_name: str, path: str = './models', weights=None):
        model_id = str(project_name)
        weights_to_save = weights if weights is not None else self.state_dict()
        save_dir = os.path.join(path, model_id)
        if not os.path.exists(save_dir): os.mkdir(save_dir)
        weights_dir = os.path.join(save_dir, 'weights.pt')
        model_dir = os.path.join(save_dir, 'model.pt')
        torch.save(weights_to_save, weights_dir)
        model_scripted = torch.jit.script(self)
        model_scripted.save(model_dir)

    @staticmethod
    def load(model_id: str):
        weights_path = os.path.join('./models', model_id, 'weights.pt')
        weights = torch.load(weights_path)
        model = Emotions()
        model.load_state_dict(weights)
        return model
    
class EmotionsNet(Emotions):
    def __init__(self,num_classes=6,vocab_size=100):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size,512,padding_idx=0)

        self.lstm = nn.Sequential(
            nn.LSTM(512,512,1, batch_first=True),
        )

        self.b = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(256, num_classes),
        )

    def forward(self, text):
        out_embed = self.embedding(text)
        out_lstm, _ = self.lstm(out_embed)
        out_b = self.b(out_lstm[:,-1,:])
        return out_b
    
    @staticmethod
    def load(model_id: str):
        return Emotions.load(model_id)

In [None]:
# Cantidad de parametros
model = EmotionsNet().to(PYTORCH_DEVICE)
sum(p.numel() for p in model.parameters())

In [None]:
def train_emotions_net(checkpoint_model_id=None, batch_size=64, vocab_size=100,
                    epochs=100, learning_rate=0.0001, project_name=str(datetime.time()), cant_emotions=6):
    
    data_loader = DataLoader(train_gen,batch_size=batch_size,shuffle=True)

    model = None
    if checkpoint_model_id:
        model = EmotionsNet(num_classes=cant_emotions,vocab_size=vocab_size).load(model_id=checkpoint_model_id).type(torch.FloatTensor).to(PYTORCH_DEVICE)
    else:
        model = EmotionsNet(num_classes=cant_emotions,vocab_size=vocab_size).to(PYTORCH_DEVICE)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    best_state = copy.deepcopy(model.state_dict())
    best_loss = float("inf")
    patience = 30
    patience_count = 0

    try:
        for epoch in range(1,epochs):
            loading_bar = tqdm(total=len(data_loader))
            total_accu, total_count = 0, 0
            running_loss = 0

            for _, data in enumerate(data_loader):
                text, label = data
                text: torch.Tensor = text.type(torch.LongTensor).to(PYTORCH_DEVICE)
                expected_label: torch.Tensor = label.type(torch.LongTensor).to(PYTORCH_DEVICE)

                optimizer.zero_grad()
                op = model(text)
                output_label:torch.Tensor = op

                loss = criterion(output_label, expected_label)
                loss.backward()
                optimizer.step()

                total_accu += (output_label.argmax(1) == expected_label).sum().item()
                total_count += expected_label.size(0)
                running_loss += loss.item() * text.size(0)
                loading_bar.set_description_str(f'Training {project_name} | lr: {learning_rate} | Epoch {epoch}/{epochs-1} | Loss {(running_loss / len(data_loader.dataset)):.8f} |  Acc {((total_accu / total_count) * 100):.4f}')
                loading_bar.update()
            loading_bar.set_description_str(f'Training {project_name} | lr: {learning_rate} | Epoch {epoch}/{epochs-1} | Loss {(running_loss / len(data_loader.dataset)):.8f} |  Acc {((total_accu / total_count) * 100):.4f}')
            loading_bar.close()
            epoch_loss = running_loss / len(data_loader.dataset)
            
            if epoch_loss < best_loss:
                best_loss = epoch_loss
                best_state = copy.deepcopy(model.state_dict())
                patience_count = 0
            else:
                patience_count += 1

            if patience_count == patience:
                break
            
            if epoch%10 == 0: model.save(f"{project_name}{epoch}")

    except Exception as e:
        print("ERROR: ",e)
        model.save(project_name+"Crashed", weights=best_state)
        return

    model.save(project_name, weights=best_state)

In [None]:
#test_data_loader = DataLoader(test_gen_CNN,batch_size=batch_size,shuffle=True)

"""model.eval()  # Poner el modelo en modo de evaluación
            test_correct = 0
            test_total = 0
            test_running_loss = 0.0
            with torch.no_grad():
                for idx, data in enumerate(osu_test_loader):
                    images, expected = data
                    images: torch.Tensor = images.type(torch.FloatTensor).to(PYTORCH_DEVICE)
                    expectedBTN: torch.Tensor = expected[1].type(torch.LongTensor).to(PYTORCH_DEVICE)

                    outputs = model(images)
                    output_btn = outputs[1]

                    test_total += expectedBTN.size(0)
                    test_correct += (output_btn.argmax(1) == expectedBTN).sum().item()

                    test_loss = criterion(output_btn, expectedBTN)
                    test_running_loss += test_loss.item() * images.size(0)
            test_epoch_loss = test_running_loss / len(osu_test_loader.dataset)
            model.train()
            print(f"Test Loss: {test_epoch_loss} || Test Acc: {(test_correct / test_total) * 100}")"""


In [None]:
# Entrenar
NOMBRE = "emotions"
LR = 0.0001
BS = 512
EPOCAS = 56
NUM_EMOTIONS = 6

if os.path.exists(f"./models/{NOMBRE}"):train_emotions_net(project_name=NOMBRE+"NEW",checkpoint_model_id=NOMBRE,
                                                        epochs=EPOCAS,learning_rate=LR, batch_size=BS, vocab_size=VOCAB_SIZE, cant_emotions=NUM_EMOTIONS)
else:train_emotions_net(project_name=NOMBRE,checkpoint_model_id=None,epochs=EPOCAS,learning_rate=LR, batch_size=BS, vocab_size=VOCAB_SIZE, cant_emotions=NUM_EMOTIONS)

In [None]:
def get_model(model_id):
    model = torch.jit.load(os.path.join("./models", model_id, 'model.pt'))
    model.load_state_dict(torch.load(os.path.join("./models", model_id, 'weights.pt')))
    model.to(PYTORCH_DEVICE)
    model.eval()
    return model

modelo = get_model("emotions_test")

In [None]:
test_texto = "I feel really good"
text_tokenized = tokenizer(test_texto,padding="max_length",truncation=True,max_length=VOCAB_SIZE,)

text_tokenized = torch.tensor(text_tokenized["input_ids"]).type(torch.LongTensor).to(PYTORCH_DEVICE).unsqueeze(0)
answ = modelo(text_tokenized)
# 0-sadness  1-joy  2-love  3-anger  4-fear  5-surprise
answ = list(answ[0])
print("Emocion: ", answ.index(max(answ)))