# 1. Download dipendenze

In [None]:
# Notebook shell commands: install the text-processing dependencies.
# torchtext is pinned to 0.17.0.
!pip install torchtext==0.17.0
!pip install spacy

# Install the spaCy Italian language model (used by the dataset tokenizer).
!python -m spacy download it_core_news_sm

# 2. Import

In [None]:
import torch
import torch.nn as nn
import pandas as pd
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import torchtext.transforms as T
from torch.utils.data import Dataset
from torch import Tensor
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np



# 3. Custom Dataset

In [None]:
class CustomDataset(Dataset):
    """Sequence-to-sequence dataset backed by a CSV file.

    The CSV must provide the columns ``Input``, ``Asset``, ``Start`` and
    ``End``. The source sentence is the ``Input`` column; the target sentence
    is the string ``asset: <Asset>, start: <Start>, end: <End>``.

    Both sentences are tokenized with the spaCy Italian tokenizer, mapped to
    vocabulary indices, wrapped in ``<sos>``/``<eos>`` and padded with
    ``<pad>`` so that every sample has exactly ``max_seq_len`` ids.

    Args:
        file_path: Location of the CSV file.
        max_seq_len: Fixed length of every encoded sequence, including the
            ``<sos>`` and ``<eos>`` markers.

    Raises:
        ValueError: If ``max_seq_len`` cannot hold ``<sos>`` and ``<eos>``.
    """

    def __init__(self, file_path: str, max_seq_len: int = 256):
        if max_seq_len < 2:
            raise ValueError("max_seq_len must be at least 2 (<sos> and <eos>)")

        self.file_path = file_path
        self.max_seq_len = max_seq_len
        self.data = pd.read_csv(file_path)
        self.inputs = self.data["Input"].to_list()
        self.outputs = self.data.apply(lambda row: f'asset: {row["Asset"]}, start: {row["Start"]}, end: {row["End"]}', axis=1).tolist()

        self.tokenizer = get_tokenizer(tokenizer="spacy", language="it_core_news_sm")
        self.vocab_specials = ["<unk>", "<pad>", "<sos>", "<eos>"]
        self.source_vocab = self.__build_vocab__(self.inputs)
        self.target_vocab = self.__build_vocab__(self.outputs)

        # Unknown tokens are mapped to <unk> instead of raising.
        self.source_vocab.set_default_index(self.source_vocab["<unk>"])
        self.target_vocab.set_default_index(self.target_vocab["<unk>"])

    def __len__(self):
        """Return the number of records in the CSV."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the encoded (input, expected output) pair at ``index``.

        Returns:
            A dict with ``"input_ids"`` (encoded source sentence) and
            ``"labels"`` (encoded target sentence).
        """
        # 1. Fetch the input and the matching expected output
        source = self.inputs[index]
        target = self.outputs[index]

        # 2. Encode each side with its own vocabulary
        return {
            "input_ids": self.__transform__(source, self.source_vocab),
            "labels": self.__transform__(target, self.target_vocab),
        }

    def __yield_tokens__(self, sentences):
        """Yield the token list of every sentence in ``sentences``."""
        for text in sentences:
            yield self.tokenizer(text)

    def __build_vocab__(self, sentences):
        """Build a vocabulary from ``sentences`` with the specials placed first."""
        return build_vocab_from_iterator(
            iterator=self.__yield_tokens__(sentences),
            specials=self.vocab_specials,
            special_first=True
        )

    def __transform__(self, sentence, vocab) -> Tensor:
        """Encode ``sentence`` as a fixed-length tensor of vocabulary indices.

        Layout: ``<sos> tokens... <eos> <pad>...`` with exactly
        ``self.max_seq_len`` entries.

        Args:
            sentence: Raw text to encode.
            vocab: Token-to-index mapping supporting ``vocab[token]``; it must
                contain ``<sos>``, ``<eos>`` and ``<pad>``.

        Returns:
            A 1-D ``torch.long`` tensor of length ``self.max_seq_len``.
        """
        # Look the special ids up in the vocabulary instead of hard-coding
        # their positions in vocab_specials.
        sos_id, eos_id, pad_id = vocab["<sos>"], vocab["<eos>"], vocab["<pad>"]

        token_ids = [vocab[token] for token in self.tokenizer(sentence)]

        # Reserve two slots for <sos>/<eos> BEFORE adding them. Truncating
        # between the two insertions produced max_seq_len + 1 ids for long
        # sentences, which padding cannot shrink and batching cannot stack.
        token_ids = token_ids[: self.max_seq_len - 2]

        ids = [sos_id, *token_ids, eos_id]
        ids.extend([pad_id] * (self.max_seq_len - len(ids)))
        return torch.tensor(ids, dtype=torch.long)

    def info(self):
        """Print a short summary of the dataset and vocabulary sizes."""
        print("---------------------------------")
        print("Dataset Info")
        print("---------------------------------")
        print(f"Data location:\t\"{self.file_path}\"")
        print(f"Data length:\t{len(self.data)} records")
        print(f"Source vocab length:\t{len(self.source_vocab)} words")
        print(f"Target vocab length:\t{len(self.target_vocab)} words")
        print("---------------------------------")
        print("END Info")
        print("---------------------------------")

if __name__ == "__main__":
    # Smoke test: load the training set, print its summary and a few samples.
    cd = CustomDataset(file_path="datasets/train/train.csv")
    cd.info()

    # Use a dedicated loop variable: reusing `cd` here replaced the dataset
    # object with the last printed sample once the loop had run.
    for i, sample in enumerate(cd):
        if i > 5:
            break
        print(f"Campione {i}: {sample}")


# 4. Model

In [None]:
class Encoder(nn.Module):
    """LSTM encoder that condenses a source sequence into its final state.

    Args:
        input_size: Number of rows of the embedding table (source vocab size).
        embedding_size: Dimension of each token embedding.
        hidden_size: Dimension of the LSTM hidden and cell states.
        num_layers: Number of stacked LSTM layers.
        p: Dropout probability, used on the embeddings and between LSTM layers.
    """

    def __init__(self, input_size, embedding_size, hidden_size, num_layers, p):
        super().__init__()

        self.dropout = nn.Dropout(p)
        self.embedding = nn.Embedding(input_size, embedding_size)
        # batch_first=True: the LSTM consumes (batch, seq, feature) tensors.
        self.rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=p,
            batch_first=True,
        )

    def forward(self, x):
        """Encode a batch of token ids and return the final ``(hidden, cell)``.

        The per-step LSTM outputs are discarded; only the final state is kept.
        """
        token_vectors = self.embedding(x)
        _, final_state = self.rnn(self.dropout(token_vectors))
        hidden, cell = final_state
        return hidden, cell


In [None]:
class Decoder(nn.Module):
    """LSTM decoder that predicts one target token per call.

    Args:
        output_size: Target vocabulary size (embedding rows and logit width).
        embedding_size: Dimension of each token embedding.
        hidden_size: Dimension of the LSTM hidden and cell states.
        num_layers: Number of stacked LSTM layers.
        p: Dropout probability, used on the embeddings and between LSTM layers.
    """

    def __init__(self, output_size, embedding_size, hidden_size, num_layers, p):
        super().__init__()

        self.dropout = nn.Dropout(p)
        self.embedding = nn.Embedding(output_size, embedding_size)
        # Sequence-first LSTM (batch_first is left at its default).
        self.rnn = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=p,
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        """Run a single decoding step.

        Args:
            x: Token ids for the current step, one per batch element.
            hidden: LSTM hidden state from the previous step.
            cell: LSTM cell state from the previous step.

        Returns:
            ``(predictions, hidden, cell)``: the vocabulary logits for this
            step and the updated LSTM state.
        """
        # Add a leading sequence axis of length 1 for the LSTM.
        step_tokens = x.unsqueeze(0)
        step_embedded = self.dropout(self.embedding(step_tokens))
        rnn_out, (hidden, cell) = self.rnn(step_embedded, (hidden, cell))

        # Project to vocabulary logits and drop the length-1 sequence axis.
        logits = self.fc(rnn_out).squeeze(0)
        return logits, hidden, cell


In [None]:
class SpaceAIModel(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with teacher forcing.

    Args:
        encoder: Callable mapping the source batch to ``(hidden, cell)``.
        decoder: Callable mapping ``(tokens, hidden, cell)`` to
            ``(logits, hidden, cell)``; must expose ``fc.out_features``.
        device: Device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Decode ``target`` conditioned on ``source``.

        Args:
            source: Source token ids passed to the encoder.
            target: Target token ids, indexed as ``[batch, step]``.
            teacher_forcing_ratio: Probability, drawn once per step, of feeding
                the ground-truth token instead of the model's own prediction.

        Returns:
            Tensor of shape ``(target_len, batch_size, target_vocab_size)``.
            Row 0 is left at zero because decoding starts from step 1.
        """
        batch_size, target_len = target.shape[0], target.shape[1]
        vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(
            target_len, batch_size, vocab_size, device=self.device
        )
        hidden, cell = self.encoder(source)

        # The first decoder input is the first target column of every sample.
        decoder_input = target[:, 0]

        for step in range(1, target_len):
            step_logits, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[step] = step_logits
            predicted = step_logits.argmax(1)

            use_ground_truth = torch.rand(1).item() < teacher_forcing_ratio
            if use_ground_truth:
                decoder_input = target[:, step]
            else:
                decoder_input = predicted

        return outputs


# 5. Metrics

In [None]:
class Metrics:
    """Evaluation metrics comparing predicted token sequences with references.

    Args:
        real: Reference sequences, one sequence of tokens per sample. Any
            sequence of sequences works (e.g. a list of lists).
        prediction: Predicted sequences, aligned sample by sample with ``real``.
    """

    def __init__(self, real: np.ndarray, prediction: np.ndarray):
        self.real = real
        self.prediction = prediction

    def token_level_accuracy(self):
        """Return the fraction of reference tokens that were predicted exactly.

        Tokens are compared position by position. The denominator is the
        total number of reference tokens, so positions missing from a shorter
        prediction count as errors.

        Returns:
            The accuracy in ``[0, 1]``, or ``0.0`` when there are no reference
            tokens at all (instead of raising ``ZeroDivisionError``).
        """
        correct = 0
        total = 0
        for pred, real in zip(self.prediction, self.real):
            correct += sum(
                pred_token == real_token for pred_token, real_token in zip(pred, real)
            )
            total += len(real)

        if total == 0:
            return 0.0
        return correct / total


# 6. Early Stopping

In [None]:
class EarlyStopping:
    """Stop training when the validation loss stops improving.

    Call the instance once per validation round. The best model seen so far
    is checkpointed, and ``early_stop`` becomes ``True`` after ``patience``
    rounds without improvement.

    Args:
        patience: Number of non-improving rounds tolerated before stopping.
        delta: Minimum decrease of the validation loss that counts as an
            improvement.
        out_path: Directory where the best checkpoints are written.
    """

    def __init__(self, patience=2, delta=0, out_path="out"):
        self.patience = patience
        self.delta = delta
        self.best_model_out_path = out_path
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_loss, model):
        """Record ``val_loss`` and checkpoint ``model`` if it is the best so far."""
        score = val_loss

        if self.best_score is None:
            # First round: nothing to compare against yet.
            self.best_score = score
            self.save_checkpoint(model)
        elif score < self.best_score - self.delta:
            # Improvement detected: save the model and reset the counter.
            # The loss is minimised, so it must drop by more than `delta`.
            # (Comparing against best + delta accepted slightly worse losses.)
            print("EarlyStopping: validation loss improved...")
            self.best_score = score
            self.save_checkpoint(model)
            self.counter = 0
        else:
            # No improvement detected. A NaN loss also lands here because
            # every comparison with NaN is False.
            print("EarlyStopping: validation loss NOT improved...")
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, model):
        """Write the state dict and the pickled model into the output directory."""
        out_dir = Path(self.best_model_out_path)
        # Create the directory on demand so the first save cannot fail.
        out_dir.mkdir(parents=True, exist_ok=True)
        torch.save(model.state_dict(), out_dir / "best_model_sd.pth")
        torch.save(model, out_dir / "best_model.pth")


# 7. Runner

In [None]:
class SpaceAIRunner:
    """Build, train and evaluate the encoder-decoder model.

    Args:
        batch_size: Batch size (stored for reference).
        epochs: Maximum number of training epochs.
        source_vocab: Source vocabulary; its length sizes the encoder
            embedding and it decodes inputs for previews.
        target_vocab: Target vocabulary; its length sizes the decoder and it
            decodes targets and predictions.
        embedding_size: Embedding dimension of encoder and decoder.
        hidden_size: LSTM hidden dimension of encoder and decoder.
        num_layers: Number of LSTM layers of encoder and decoder.
        dropout: Dropout probability of encoder and decoder.
        lr: Learning rate of the Adam optimizer.
    """

    def __init__(
        self,
        batch_size,
        epochs,
        source_vocab,
        target_vocab,
        embedding_size,
        hidden_size,
        num_layers,
        dropout,
        lr,
    ):
        self.batch_size = batch_size
        self.epochs = epochs

        self.source_vocab = source_vocab
        self.target_vocab = target_vocab

        # Checkpoint directory read by test(); train() replaces it with the
        # out_root it was given.
        self.out_root = Path("out")

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        encoder = Encoder(
            len(source_vocab), embedding_size, hidden_size, num_layers, dropout
        ).to(self.device)
        decoder = Decoder(
            len(target_vocab), embedding_size, hidden_size, num_layers, dropout
        ).to(self.device)

        self.net = SpaceAIModel(encoder, decoder, self.device).to(self.device)

        # NOTE(review): padding positions contribute to the loss because no
        # ignore_index is set; consider ignoring the <pad> index.
        self.loss = torch.nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=lr)

    def _batch_loss(self, data):
        """Run the network on one batch and return its cross-entropy loss."""
        source = data["input_ids"].to(self.device)
        target = data["labels"].to(self.device)

        outputs = self.net(source, target)

        # Drop step 0 (never predicted by the model) and flatten both tensors
        # in the same (step, batch) order before comparing them.
        outputs = outputs[1:].reshape(-1, outputs.shape[2])
        target = target.T[1:].reshape(-1)

        return self.loss(outputs, target)

    def train(
        self,
        train_loader: torch.utils.data.DataLoader,
        validation_loader: torch.utils.data.DataLoader,
        out_root: str,
    ):
        """Train the network with periodic validation and early stopping.

        Validation runs every second epoch. The best model is checkpointed by
        ``EarlyStopping``; the last model is saved when training ends, and the
        loss curves are plotted.

        Args:
            train_loader: Batches with ``"input_ids"`` and ``"labels"``.
            validation_loader: Batches used for validation loss and accuracy.
            out_root: Directory receiving the checkpoints (created if missing).
        """
        out_root = Path(out_root)
        out_root.mkdir(parents=True, exist_ok=True)
        # Remember where checkpoints live so test() loads from the same place.
        self.out_root = out_root

        early_stopping = EarlyStopping(patience=2, delta=0, out_path=out_root)

        step_counter = 0
        step_monitor = 5
        ep_monitor = 2

        tr_losses_x, tr_losses_y = [], []
        tr_run_losses_x, tr_run_losses_y = [], []
        va_losses_x, va_losses_y = [], []

        for epoch in range(self.epochs):

            if early_stopping.early_stop:
                print("Early stopping triggered. Stopping training.")
                break

            running_loss = 0.0

            print(f"Epoch {epoch + 1} / {self.epochs}")
            # Validation leaves the network in eval mode; switch back.
            self.net.train()
            for i, data in enumerate(train_loader):
                loss = self._batch_loss(data)

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                running_loss += loss.item()

                if (i + 1) % step_monitor == 0:
                    tr_run_losses_y.append(running_loss / step_monitor)
                    tr_run_losses_x.append(step_counter + 1)
                    print(
                        f"GlobalStep: {(step_counter + 1):5d} - [Epoca: {epoch + 1:3d}, Step: {i + 1:5d}] loss: {loss.item():.6f} - running_loss: {(running_loss / step_monitor):.6f}"
                    )
                    running_loss = 0.0

                tr_losses_x.append(step_counter + 1)
                tr_losses_y.append(loss.item())
                step_counter += 1

            if (epoch + 1) % ep_monitor == 0:
                print("\n *** *** VALIDATION *** ***")

                current_va_loss = 0.0

                self.net.eval()
                with torch.no_grad():
                    for data in validation_loader:
                        # Each batch gets its own loss. Accumulating onto the
                        # leftover training loss tensor inflated the result.
                        current_va_loss += self._batch_loss(data).item()

                # max(..., 1) avoids dividing by zero on an empty loader.
                current_va_loss /= max(len(validation_loader), 1)
                va_losses_x.append(step_counter)
                va_losses_y.append(current_va_loss)

                early_stopping(current_va_loss, self.net)

                t_accuracy = self.test(validation_loader, use_current_net=True)
                print(
                    f"GlobalStep: {step_counter:5d} - [Epoca: {epoch + 1:3d}], token_level_accuracy: {t_accuracy:.2f}"
                )
                print("*** *** FINISHED VALIDATION *** ***\n")

        print("Finished training!")

        torch.save(self.net.state_dict(), out_root / "last_model_sd.pth")
        torch.save(self.net, out_root / "last_model.pth")
        print("Model saved!")

        _, (ax1, ax2) = plt.subplots(1, 2)

        ax1.plot(tr_losses_x, tr_losses_y, label="Training Loss")
        ax1.plot(tr_run_losses_x, tr_run_losses_y, label="Training Running Loss")
        ax1.set_title("Training Loss")

        ax2.plot(va_losses_x, va_losses_y, label="Validation Loss")
        ax2.set_title("Validation Loss")

        ax1.legend()
        ax2.legend()
        plt.tight_layout()
        plt.show()

    def test(self, test_loader: torch.utils.data.DataLoader, use_current_net=False, preview=False):
        """Evaluate the network and return its token-level accuracy.

        Args:
            test_loader: Batches with ``"input_ids"`` and ``"labels"``.
            use_current_net: When ``False``, the best checkpoint is loaded
                into the network before evaluating.
            preview: When ``True``, print input, target and prediction of
                every sample.

        Returns:
            The token-level accuracy, or ``None`` when the best checkpoint
            does not exist.
        """
        net = self.net
        if not use_current_net:
            checkpoint = self.out_root / "best_model_sd.pth"
            try:
                # map_location lets a GPU-saved checkpoint load on CPU too.
                state_dict = torch.load(checkpoint, map_location=self.device)
            except FileNotFoundError:
                print("Model not found.")
                return None
            # Any other failure (e.g. mismatching shapes) is a real error and
            # is no longer reported as a missing file.
            net.load_state_dict(state_dict)

        net.eval()

        total_target = []
        total_prediction = []

        with torch.no_grad():
            for data in test_loader:
                source = data["input_ids"].to(self.device)
                target = data["labels"].to(self.device)

                # No teacher forcing: the model feeds on its own predictions.
                outputs = net(source, target, teacher_forcing_ratio=0)
                # argmax over the vocabulary axis, then one row per sample.
                output_tokens = outputs.argmax(2).T

                for source_ids, target_ids, predicted_ids in zip(
                    source, target, output_tokens
                ):
                    target_json = self.target_vocab.lookup_tokens(target_ids.tolist())
                    predicted_json = self.target_vocab.lookup_tokens(
                        predicted_ids.tolist()
                    )

                    total_target.append(target_json)
                    total_prediction.append(predicted_json)

                    if preview:
                        input_sentence = " ".join(
                            self.source_vocab.lookup_tokens(source_ids.tolist())
                        )
                        print(f"Input: {input_sentence}")
                        print(f"Target: {target_json}")
                        print(f"Prediction: {predicted_json}")

        metrics = Metrics(total_target, total_prediction)
        return metrics.token_level_accuracy()


# 8. Main

In [None]:
# Hyper-parameters and run configuration.
BATCH_SIZE = 20
EPOCHS = 15
SHUFFLE = False
EMB_DIM = 150
HID_DIM = 300
N_LAYERS = 2
DROPOUT = 0.5
TRAIN = True  # True: train the model; False: evaluate the best checkpoint
LR = 0.001


tr_dataset = CustomDataset(file_path="datasets/train/train.csv")
te_dataset = CustomDataset(file_path="datasets/test/test.csv")
va_dataset = CustomDataset(file_path="datasets/validation/validation.csv")

# Every split must be encoded with the TRAINING vocabularies. The model's
# embeddings and output layer are sized from them, so ids built from a
# split's own vocabulary would point at different words or out-of-range rows.
for eval_dataset in (te_dataset, va_dataset):
    eval_dataset.source_vocab = tr_dataset.source_vocab
    eval_dataset.target_vocab = tr_dataset.target_vocab

tr_loader = torch.utils.data.DataLoader(
    tr_dataset, batch_size=BATCH_SIZE, shuffle=SHUFFLE
)
te_loader = torch.utils.data.DataLoader(
    te_dataset, batch_size=BATCH_SIZE, shuffle=SHUFFLE
)
va_loader = torch.utils.data.DataLoader(
    va_dataset, batch_size=BATCH_SIZE, shuffle=SHUFFLE
)

runner = SpaceAIRunner(
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    source_vocab=tr_dataset.source_vocab,
    target_vocab=tr_dataset.target_vocab,
    embedding_size=EMB_DIM,
    hidden_size=HID_DIM,
    num_layers=N_LAYERS,
    dropout=DROPOUT,
    lr=LR,
)

if TRAIN:
    runner.train(train_loader=tr_loader, validation_loader=va_loader, out_root="out")
else:
    print(f"\n *** *** TESTING *** ***")
    t_accuracy = runner.test(test_loader=te_loader, preview=True)
    print("Finished testing!")
    # test() returns None when no checkpoint is available; formatting None
    # with :.2f would raise a TypeError.
    if t_accuracy is not None:
        print(f"token_level_accuracy: {t_accuracy:.2f}")
