In [None]:
from google.colab import drive

drive.mount('/content/drive')

!pip install --upgrade huggingface_hub transformers


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import numpy as np
import nltk
from nltk.corpus import stopwords, wordnet
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag
from torch import nn
import torch
from transformers import AutoConfig, AutoModel, AutoTokenizer, AdamW
from sklearn.metrics import f1_score
from torch.optim.lr_scheduler import LinearLR
import re

nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger_eng')

In [None]:
!pip install emoji
import emoji

Creiamo un dataframe pandas contenente i testi dei tweet e le relative etichette di classe (0 = tweet non ironico, 1 = tweet ironico).

In [None]:
root = "/content/drive/MyDrive/NLP/Challenge_2024/irony/"
# Read the tweets explicitly as UTF-8: they contain emoji, and relying on the
# platform default encoding can corrupt or reject them on a non-UTF-8 locale.
with open(root+"train_text.txt", encoding="utf-8") as f_input:
    train_lines = f_input.readlines()  # one tweet per line, trailing newline kept

texts = pd.DataFrame(train_lines, columns=['text'])

In [None]:
texts

In [None]:
labels = pd.read_csv(root+"train_labels.txt", header=None)
labels

In [None]:
data = pd.concat([texts, labels], axis=1)
data.columns = ['text', 'label']

In [None]:
data

Verifichiamo che la distribuzione delle classi sia equilibrata per evitare bias durante l'addestramento del modello

In [None]:
data['label'].value_counts().plot(kind='bar')
plt.title('Distribuzione delle classi')
plt.show()

Analizziamo i tweet alla ricerca di emoji presenti nei testi e la loro influenza sull'ironia del testo.
In particolare contiamo quali e quante emoji sono presenti in tweet ironici, creandoci così un pool di emoji ironiche.

In [None]:
# Count whole emoji rather than individual code points: a per-character test
# against EMOJI_DATA counts multi-codepoint emoji (skin-tone variants, flags,
# ZWJ sequences) as several emoji or misses them altogether.
data['emoji_count'] = data['text'].apply(emoji.emoji_count)

In [None]:
def _emoji_frequency_table(tweets):
    """Return a ('Smiley', 'Count') frame with the occurrences of each emoji in `tweets`."""
    emoji_per_tweet = tweets.apply(
        lambda text: [token.chars for token in emoji.analyze(text, join_emoji=True)
                      if isinstance(token.value, emoji.EmojiMatch)]
    )
    occurrences = emoji_per_tweet.explode().value_counts()
    return occurrences.rename_axis('Smiley').rename('Count').reset_index()


# Emoji frequencies over all training tweets, and over the ironic ones only.
emojidf = _emoji_frequency_table(texts['text'])
emoji_ironic = _emoji_frequency_table(data[data['label'] == 1]['text'])

In [None]:
# Join total and ironic counts per emoji; emoji never seen in an ironic tweet get 0.
emoji_counts_joined = emojidf.merge(
    emoji_ironic, on='Smiley', how='outer', suffixes=('total', 'ironic')
).fillna(0)
# Fraction of each emoji's occurrences that fall in ironic tweets, highest first.
emojidf_comparison = emoji_counts_joined.assign(
    ironic_ratio=emoji_counts_joined['Countironic'] / emoji_counts_joined['Counttotal']
).sort_values(by='ironic_ratio', ascending=False)

Infine creiamo un dizionario dando ad ogni emoji incontrata un peso tra -1 e 1, dove -1 è fortemente non ironico e 1 è fortemente ironico, prendendo ispirazione dal sentiment analyzer VADER.
Questo punteggio viene poi log-scalato per dare peso maggiore alle emoji più presenti.

In [None]:
# Weight of each emoji: (ironic - non_ironic) / total lies in [-1, 1] and is scaled by
# log1p(total) so that frequently seen emoji weigh more.
# The three columns are walked in a single pass instead of re-filtering the whole
# table three times for every emoji.
emoji_weights = {}

for emoji_char, ironic_count, total_count in zip(emojidf_comparison["Smiley"],
                                                 emojidf_comparison["Countironic"],
                                                 emojidf_comparison["Counttotal"]):
  not_ironic_count = total_count - ironic_count
  emoji_weights[emoji_char] = ((ironic_count - not_ironic_count)/total_count) * np.log1p(total_count)

emoji_weights

La funzione sottostante calcola lo score di ogni tweet in base alle emoji presenti; questo produrrà un vettore sparso, in quanto nell'intero dataset pochi tweet contengono emoji.

In [None]:
def calculate_emoji_score(tweet, emoji_weights):
    """Sum the weights of the emoji found in `tweet`.

    Args:
        tweet: text of the tweet.
        emoji_weights: mapping from emoji string to its numeric weight.

    Returns:
        The sum of the weights of every emoji occurrence that has an entry in
        `emoji_weights`; 0 when the tweet contains none.
    """
    found = (match['emoji'] for match in emoji.emoji_list(tweet))
    return sum(emoji_weights[symbol] for symbol in found if symbol in emoji_weights)

data['emoji_score'] = data['text'].apply(lambda tweet: calculate_emoji_score(tweet, emoji_weights))

data

In [None]:
# Only emoji seen more than three times are considered.
relevant_emojis = emojidf_comparison[emojidf_comparison['Counttotal']>3]

# Emoji appearing mostly in ironic tweets (ratio above 0.6).
ironic_rows = relevant_emojis[relevant_emojis['ironic_ratio']>0.6]
print(ironic_rows)
emoji_ironic_pool = ironic_rows['Smiley'].tolist()
print(emoji_ironic_pool)
print("-----------")
# Emoji appearing mostly in non-ironic tweets (ratio below 0.4).
non_ironic_rows = relevant_emojis[relevant_emojis['ironic_ratio']<0.4]
print(non_ironic_rows)
emoji_non_ironic_pool = non_ironic_rows['Smiley'].tolist()
print(emoji_non_ironic_pool)

Creiamo una funzione che normalizza i tweet; abbiamo in parte preso ispirazione dalla funzione normalizeTweet del tokenizzatore di BERTweet.
Questa normalizzazione trasforma i tag, gli URL, i numeri e le emoji in token.
I primi due sono già presenti come token, mentre NUMBER e i token delle emoji verranno aggiunti come nuovi token.

In [None]:
def normalizeToken(token):
    """Map a whitespace-delimited token to its placeholder, if it has one.

    Args:
        token: a single token of the tweet.

    Returns:
        "@USER" for tokens starting with "@user", "HTTPURL" for URL-like
        tokens, otherwise the token unchanged.
    """
    lowercased_token = token.lower()
    if token.startswith("@user"):
      return "@USER"
    # "www" must start a word and be followed by a dot: a plain substring test
    # would also turn elongated words such as "awww" into HTTPURL.
    elif "http" in lowercased_token or re.search(r'\bwww\.', lowercased_token):
      return "HTTPURL"
    else:
      return token

def normalizeTweet(tweet):
    """Normalize a raw tweet into a space-separated string of tokens.

    Numbers become NUMBER, URL-like tokens HTTPURL, user mentions @USER and emoji
    their textual alias (via emoji.demojize); ',', '!', '?', '.' and the '#' of
    hashtags are separated from the neighbouring text.

    Args:
        tweet: raw tweet text.

    Returns:
        The normalized tweet with single spaces between tokens.
    """
    # Straighten curly apostrophes, drop pipes and detach '#' from the hashtag text.
    tokens= " ".join([token.replace("’", "'").replace("|", " ").replace("#","# ") for token in tweet.split()])
    # Numbers: either digits grouped in thousands ("1,000,000") or a plain run of digits
    # of any length, optionally followed by a decimal part. The plain-run alternative is
    # needed because "\d{1,3}" alone can never match 4+ digit numbers such as "2015".
    tokens = re.sub(r'\b(?:\d{1,3}(?:[\.,:]\d{3})+|\d+)(?:[\.,]\d+)?\b', 'NUMBER', tokens)
    tokens = " ,".join(tokens.split(","))
    tokens = " !".join(tokens.split("!"))
    # Anchored at the start of the string, so it only fires when the tweet itself begins
    # with a URL; URLs elsewhere are handled token by token on the next line.
    tokens = re.sub(r'^(http|www)\S*', 'HTTPURL', tokens)
    tokens = " ".join([normalizeToken(token) for token in tokens.split()])
    # Protect runs of dots as a single ellipsis character while lone dots are spaced out,
    # then restore them as "...".
    tokens = re.sub(r'\.{2,}', ' …', tokens)
    tokens = " . ".join(tokens.split("."))
    tokens = tokens.replace("…", "...")
    tokens = " ?".join(tokens.split("?"))
    tokens = emoji.demojize(tokens)
    # Adjacent emoji aliases (":a::b:") are split into separate tokens.
    normTweet = ": :".join(tokens.split("::"))

    return " ".join(normTweet.split())

Analogamente a quanto fatto per le emoji, calcoliamo il peso di ogni parola presente nel dataset, escludendo quelle poco regolari e i verbi, che possono essere utilizzati in qualsiasi tipo di contesto.

Anche qui abbiamo preso ispirazione da VADER.

In [None]:
from collections import defaultdict

stop_words = set(stopwords.words('english'))


def clean_text(text):
    """Return the words of `text` that are eligible for the irony lexicon.

    The text is lowercased and normalized; a word is kept only if it is purely
    alphabetic, is not an English stopword and is not tagged as a verb when
    POS-tagged on its own.
    """
    kept = []
    for word in normalizeTweet(text.lower()).split():
        if not word.isalpha():
            continue
        if word in stop_words:
            continue
        # The word is tagged in isolation (no sentence context); verb tags start with "V".
        if pos_tag([word])[0][1].startswith("V"):
            continue
        kept.append(word)
    return kept

ironic_texts = data[data['label'] == 1]['text']
non_ironic_texts = data[data['label'] == 0]['text']

# explode() yields NaN for tweets whose cleaned word list is empty; drop those rows,
# otherwise NaN is counted and scored as if it were a word.
ironic_words = ironic_texts.apply(clean_text).explode().dropna()
non_ironic_words = non_ironic_texts.apply(clean_text).explode().dropna()

# Occurrences of each word in ironic / non-ironic tweets.
ironic_word_counts = ironic_words.value_counts().to_dict()
non_ironic_word_counts = non_ironic_words.value_counts().to_dict()

ironic_ratio_dict = {}

for word in set(ironic_word_counts) | set(non_ironic_word_counts):
    ironic_count = ironic_word_counts.get(word, 0)
    non_ironic_count = non_ironic_word_counts.get(word, 0)
    # Always >= 1: the word comes from at least one of the two count tables.
    total_count = ironic_count + non_ironic_count

    # Polarity in [-1, 1] scaled by log(total); words seen only once therefore score 0.
    ironic_ratio_dict[word] = ((ironic_count - non_ironic_count) / total_count)*np.log(total_count)

# Highest (most ironic) score first.
ironic_ratio_dict = dict(sorted(ironic_ratio_dict.items(), key=lambda item: item[1], reverse=True))

print("Le parole con l'ironic ratio più alto:")
for word, ratio in list(ironic_ratio_dict.items())[:10]:
    print(f'{word}: {ratio}')


Aggiungiamo manualmente delle parole che secondo il nostro giudizio, se presenti, influiscono maggiormente.

In [None]:
# Hand-picked words that are given a fixed, high irony score.
ironic_words = dict.fromkeys(("irony", "ironic", "sarcasm", "sarcastic"), 5)

# Manual scores override whatever was computed from the corpus.
ironic_ratio_dict.update(ironic_words)

In [None]:
def calculate_irony_polarity(tweet, irony_dict=ironic_ratio_dict):
    """Sum the irony scores of the words of `tweet`.

    Args:
        tweet: raw tweet text; it is normalized with normalizeTweet before lookup.
        irony_dict: mapping from lowercase word to irony score.

    Returns:
        The sum of the scores of the tokens found in `irony_dict` (0 if none is found).
    """
    # No per-tweet debug printing here: this function is applied to every tweet of
    # the train, validation and test sets.
    irony_score = 0

    for word in normalizeTweet(tweet).split():
        word = word.lower()
        if word in irony_dict:
            irony_score += irony_dict[word]

    return irony_score

In [None]:
data['irony_polarity'] = data["text"].apply(lambda x: calculate_irony_polarity(x))

In [None]:
data

In [None]:
x_train = data['text']
y_train = data['label']
emoji_score_train = data['emoji_score']
irony_polarity_train = data['irony_polarity']

In [None]:
x_train

Carichiamo il validation set e applichiamo le feature trovate precedentemente dal train set.

In [None]:
# Explicit UTF-8: the tweets contain emoji and must not depend on the platform default encoding.
with open(root+"val_text.txt", encoding="utf-8") as f_input:
    val_text = f_input.readlines()

val_text_df = pd.DataFrame(val_text, columns=['text'])
val_labels_df = pd.read_csv(root+"val_labels.txt", header=None)
# Features are computed with the lexicons built from the training set only.
val_text_df['irony_polarity'] = val_text_df["text"].apply(calculate_irony_polarity)
val_text_df['emoji_score'] = val_text_df['text'].apply(lambda tweet: calculate_emoji_score(tweet, emoji_weights))
val_text_df['label'] = val_labels_df[0]


In [None]:
x_val = val_text_df['text']
emoji_score_val = val_text_df['emoji_score']
irony_polarity_val = val_text_df['irony_polarity']
y_val = val_labels_df[0]

In [None]:
val_text_df

In [None]:
x_train = x_train.apply(lambda x: x.replace('\r', ' ').replace('\n', ' '))
x_val = x_val.apply(lambda x: x.replace('\r', ' ').replace('\n', ' '))

In [None]:
# Global configuration of the experiment.
# NOTE(review): the tuning cell further down takes epochs, learning_rate, batch_size
# and weight_decay from its own `param_grid`, not from this dictionary.
hyperparameters = {
    "epochs": 5,
    "learning_rate": 5e-5,
    "batch_size": 32,
    "dropout": 0.1,                              # dropout probability for the classifier layers
    "weight_decay": 1e-4,
    "stopwords": False,                          # passed to the datasets as the `stopwords` flag
    "lemmatize": False,                          # passed to the datasets as the `lemmatize` flag
    "language_model": "vinai/bertweet-base",     # checkpoint name used for tokenizer and encoder
    "h_dim": 768,                                # passed to ClassifierDeep as `embeddings_len`
    "patience": 5,                               # EarlyStopping patience
    "min_delta": 0.01,                           # EarlyStopping min_delta
    "extra_features": 2                          # passed to ClassifierDeep as `scores_len` (emoji score + irony polarity)
}

Questa funzione ritorna il POS tag, questo sarà necessario per la lemmatizzazione.
Senza questa funzione, abbiamo notato che il lemmatizer non riusciva a riconoscere i verbi, categorizzandoli spesso come sostantivi.

In [None]:
def get_wordnet_pos(tag):
    """Translate a POS tag into the WordNet part-of-speech constant.

    Tags starting with J, V, N, R map to adjective, verb, noun and adverb
    respectively; any other tag falls back to noun.
    """
    prefix_to_pos = (
        ('J', wordnet.ADJ),
        ('V', wordnet.VERB),
        ('N', wordnet.NOUN),
        ('R', wordnet.ADV),
    )
    for prefix, wordnet_pos in prefix_to_pos:
        if tag.startswith(prefix):
            return wordnet_pos
    return wordnet.NOUN


Creiamo il tokenizzatore e, come detto prima, aggiungiamo i token delle emoji, NUMBER e RT (che sta per ReTweet).

In [None]:
tokenizer = AutoTokenizer.from_pretrained(hyperparameters["language_model"], use_auth_token=False)
# Register the textual alias of every known emoji as a token of its own, so that the
# aliases produced by normalizeTweet are not split into sub-words.
all_emojis = list(emoji.EMOJI_DATA.keys())
emoji_tokens = list(map(emoji.demojize, all_emojis))
tokenizer.add_tokens(emoji_tokens)
# Placeholders used by the normalization plus the retweet marker.
tokenizer.add_tokens(["NUMBER", "RT"])

Creiamo i dataset, questo ci sarà utile per la fase di training e predizione.

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Labelled tweets with their two hand-crafted scores.

    Each item is (text, emoji_score, irony_polarity, label); the text is the
    normalized tweet, optionally stopword-filtered and/or lemmatized.
    """

    def __init__(self, x, emoji_score, irony_polarity, y, tokenizer, stopwords, lemmatize):
        """Pre-process the tweets once, up front.

        Args:
            x: iterable of raw tweet strings.
            emoji_score: iterable with one emoji score per tweet.
            irony_polarity: iterable with one irony-polarity score per tweet.
            y: iterable with one label per tweet.
            tokenizer: unused, kept for backward compatibility. Sub-word tokenization
                is done by the training/evaluation loops on each batch; tokenizing
                here as well would feed already-split sub-word pieces to the
                tokenizer a second time.
            stopwords: if True, English stopwords are removed.
            lemmatize: if True, words are lowercased and lemmatized using their POS tag.
        """
        lemmatizer = WordNetLemmatizer()
        # The boolean parameter `stopwords` shadows the nltk.corpus.stopwords import inside
        # this method, hence the fully qualified name. Built once, as a set, instead of
        # re-reading the list for every word.
        stop_set = set(nltk.corpus.stopwords.words("english")) if stopwords else set()

        text_clean = []
        for tweet in tqdm(list(x), desc='Tokenizing, POS Tagging, Lemmatizing...'):
            words = normalizeTweet(tweet).split()
            # POS tags are only needed (and only computed) when lemmatizing.
            tagged = pos_tag(words) if lemmatize else [(w, None) for w in words]
            kept = []
            for w, tag in tagged:
                # Stopword removal is applied whether or not lemmatization is enabled.
                if w.lower() in stop_set:
                    continue
                kept.append(lemmatizer.lemmatize(w.lower(), get_wordnet_pos(tag)) if lemmatize else w)
            text_clean.append(' '.join(kept))

        self.texts = text_clean
        self.labels = [torch.tensor(label) for label in y]
        self.emoji_score = [torch.tensor(score) for score in emoji_score]
        self.irony_polarity = [torch.tensor(score) for score in irony_polarity]

    def classes(self):
        """Return the list of label tensors."""
        return self.labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        """Return (text, emoji_score, irony_polarity, label) for item `idx`."""
        batch_texts = self.texts[idx]
        batch_labels = np.array(self.labels[idx])
        batch_emoji_score = np.array(self.emoji_score[idx])
        batch_irony_polarity = np.array(self.irony_polarity[idx])

        return batch_texts, batch_emoji_score, batch_irony_polarity, batch_labels

In [None]:
train_dataset = Dataset(x_train, emoji_score_train, irony_polarity_train, y_train, tokenizer,
                        hyperparameters["stopwords"], hyperparameters["lemmatize"])
# The validation set must be paired with its own irony-polarity scores
# (irony_polarity_val), not with the ones computed for the training tweets.
val_dataset = Dataset(x_val, emoji_score_val, irony_polarity_val, y_val, tokenizer,
                      hyperparameters["stopwords"], hyperparameters["lemmatize"])

In [None]:
class EarlyStopping:
    """Signal when the validation loss has stopped improving.

    Call the instance with the validation loss of each epoch; `early_stop`
    becomes True after `patience` consecutive epochs without an improvement
    larger than `min_delta`.
    """

    def __init__(self, patience=5, min_delta=0.0):
        # Number of consecutive non-improving epochs tolerated.
        self.patience = patience
        # Minimum decrease of the validation loss that counts as an improvement.
        self.min_delta = min_delta
        # Consecutive non-improving epochs seen so far.
        self.counter = 0
        # Set to True once patience is exhausted.
        self.early_stop = False
        # Best (lowest) validation loss accepted so far.
        self.min_validation_loss = torch.inf

    def __call__(self, validation_loss):
        stalled = (validation_loss + self.min_delta) >= self.min_validation_loss

        if not stalled:
            # Improvement: record the new best loss and restart the patience count.
            self.min_validation_loss = validation_loss
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True
            print("Early stop!")


Questo classificatore prende in input gli embeddings del testo del tweet che passano da un layer lineare, parallelamente lo stesso succede con le feature delle emoji e delle parole.
Gli output dei due layer si concatenano per poi andare in input in un altro strato lineare.
In questo modo il modello elabora separatamente le feature, dando diversi pesi alle due categorie di feature, in cui l'output del branch degli embeddings ha più peso rispetto agli score.
Sono stati aggiunti dei dropout per diminuire l'overfitting.

In [None]:
class ClassifierDeep(nn.Module):
    """Two-branch binary classifier on top of a pretrained language model.

    One branch projects the language-model embedding of the text, the other the
    two hand-crafted scores; their outputs are concatenated and passed to a
    small head ending in a sigmoid.
    """

    def __init__(self, embeddings_len, scores_len,dropout, model_name):
        """Build the encoder and the three feed-forward branches.

        Args:
            embeddings_len: input size of the text branch (size of the encoder's hidden state).
            scores_len: number of hand-crafted score features fed to the score branch.
            dropout: dropout probability used in all branches.
            model_name: name of the pretrained model loaded with AutoModel.
        """
        super(ClassifierDeep, self).__init__()
        config = AutoConfig.from_pretrained(model_name, use_auth_token=False)
        self.lm_model = AutoModel.from_pretrained(model_name, config=config, use_auth_token=False)
        # Text branch: encoder embedding -> 256 features.
        self.text_branch = nn.Sequential(
          nn.Linear(embeddings_len, 256),
          nn.ReLU(),
          nn.Dropout(dropout)
        )

        # Score branch: hand-crafted scores -> 32 features.
        self.emoji_branch = nn.Sequential(
          nn.Linear(scores_len,32),
          nn.ReLU(),
          nn.Dropout(dropout)
        )

        # Head: concatenated 256 + 32 features -> one sigmoid output.
        self.final_branch = nn.Sequential(
          nn.Linear(256+32, 64),
          nn.ReLU(),
          nn.Dropout(dropout),
          nn.Linear(64, 1),
          nn.Sigmoid()
        )

    def forward(self, input_id_text, attention_mask, emoji_score, irony_polarity):
        """Return the sigmoid output of the head for a batch.

        Args:
            input_id_text: token ids passed to the language model.
            attention_mask: attention mask passed to the language model.
            emoji_score: per-example emoji score — assumed 1-D (one value per example),
                since it is unsqueezed into a column; TODO confirm.
            irony_polarity: per-example irony polarity, same assumption as `emoji_score`.

        Returns:
            Output of the final Linear(64, 1) + Sigmoid, one value per example.
        """
        text_features = self.lm_model(input_id_text, attention_mask).last_hidden_state
        # Keep only the hidden state at sequence position 0 as the text representation.
        text_features = text_features[:,0,:]
        text_features = self.text_branch(text_features)

        # Stack the two scores as columns before the score branch.
        emoji_features = self.emoji_branch(torch.cat((emoji_score.unsqueeze(1),irony_polarity.unsqueeze(1)), dim=1))

        combined_features = torch.cat((text_features, emoji_features), dim=1)
        output = self.final_branch(combined_features)
        return output

Le successive 3 funzioni sono utilizzate per addestrare il modello e valutarlo sul validation set.

In [None]:
def train_loop(model, dataloader, tokenizer, loss, optimizer, device, scheduler):
    """Train `model` for one epoch.

    Args:
        model: classifier called as model(input_ids, attention_mask, emoji_score, irony_polarity).
        dataloader: yields (texts, emoji_score, irony_polarity, labels) batches.
        tokenizer: tokenizer applied to the texts of each batch.
        loss: loss function called as loss(output, labels).
        optimizer: optimizer stepped once per batch.
        device: device the batch tensors are moved to.
        scheduler: learning-rate scheduler stepped once per batch, or None to keep
            the learning rate untouched.

    Returns:
        (mean batch loss over the epoch, number of correctly classified examples).
    """
    model.train()

    epoch_acc = 0
    epoch_loss = 0

    for batch_texts, batch_emoji_score, batch_irony_polarity, batch_labels in tqdm(dataloader, desc='training set'):
        optimizer.zero_grad()

        tokens = tokenizer(list(batch_texts),
                           add_special_tokens=True,
                           return_tensors='pt',
                           padding='max_length',
                           max_length = 128,
                           truncation=True)

        input_id_texts = tokens['input_ids'].to(device)
        mask_texts = tokens['attention_mask'].to(device)
        batch_emoji_score = batch_emoji_score.float().to(device)
        batch_irony_polarity = batch_irony_polarity.float().to(device)
        batch_labels = batch_labels.float().to(device)

        output = model(input_id_texts, mask_texts, batch_emoji_score, batch_irony_polarity).squeeze(1)

        batch_loss = loss(output, batch_labels)
        batch_loss.backward()
        optimizer.step()
        # The scheduler is optional for callers (train_test defaults it to None).
        if scheduler is not None:
            scheduler.step()

        epoch_loss += batch_loss.item()

        # The model outputs probabilities; 0.5 is the decision threshold.
        preds = (output > 0.5).float()
        epoch_acc += (preds == batch_labels).sum().item()

    return epoch_loss/len(dataloader), epoch_acc

In [None]:
def val_loop(model, dataloader, tokenizer, loss, device):
    """Evaluate `model` on a labelled set without updating it.

    Args:
        model: classifier called as model(input_ids, attention_mask, emoji_score, irony_polarity).
        dataloader: yields (texts, emoji_score, irony_polarity, labels) batches.
        tokenizer: tokenizer applied to the texts of each batch.
        loss: loss function called as loss(output, labels).
        device: device the batch tensors are moved to.

    Returns:
        (mean batch loss, number of correctly classified examples, F1 score
        computed over all predictions).
    """
    model.eval()

    epoch_acc = 0
    epoch_loss = 0

    all_preds = []
    all_labels = []

    with torch.no_grad():

        for batch_texts, batch_emoji_score, batch_irony_polarity, batch_labels in tqdm(dataloader, desc='Val set'):

            tokens = tokenizer(list(batch_texts),
                               add_special_tokens=True,
                               return_tensors='pt',
                               padding='max_length',
                               max_length = 128,
                               truncation=True)
            input_id_texts = tokens['input_ids'].to(device)
            mask_texts = tokens['attention_mask'].to(device)
            batch_emoji_score = batch_emoji_score.float().to(device)
            batch_irony_polarity = batch_irony_polarity.float().to(device)
            batch_labels = batch_labels.float().to(device)

            output = model(input_id_texts, mask_texts, batch_emoji_score, batch_irony_polarity).squeeze(1)

            epoch_loss += loss(output, batch_labels).item()

            # The model outputs probabilities; 0.5 is the decision threshold.
            preds = (output > 0.5).float()
            epoch_acc += (preds == batch_labels).sum().item()

            # Collected on the CPU so that the F1 score is computed once, over the whole set.
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch_labels.cpu().numpy())

    f1 = f1_score(all_labels, all_preds)

    return epoch_loss/len(dataloader), epoch_acc, f1

In [None]:
def train_test(model, tokenizer,epochs, optimizer, device, train_data, val_data,
               batch_size, model_name, loss_fn, best_f1,
               early_stopping=None,
               scheduler=None):
    """Train for up to `epochs` epochs, validating after each one.

    Whenever the validation F1 exceeds `best_f1`, the model weights are saved to
    `root + "best_model.pth"`.

    Args:
        model, tokenizer, optimizer, device, loss_fn, scheduler: passed through to
            train_loop / val_loop.
        epochs: maximum number of epochs.
        train_data, val_data: datasets wrapped in DataLoaders of size `batch_size`.
        batch_size: batch size of both DataLoaders.
        model_name: not used by this function.
        best_f1: best validation F1 obtained so far (checkpoint threshold).
        early_stopping: optional callable fed with the validation loss of each epoch;
            training stops when its `early_stop` attribute becomes True.

    Returns:
        (train losses, validation losses, train accuracies, validation accuracies,
        F1 of the last epoch run, best F1 so far).
    """
    # Training examples are reshuffled at every epoch so that batches do not follow the
    # order of the source file; the validation order is irrelevant and left untouched.
    train_dataloader = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_data, batch_size=batch_size)

    save_path = "best_model.pth"

    train_loss = []
    validation_loss = []

    train_acc = []
    validation_acc = []

    # Defined up front so that the return statement is valid even when no epoch is run.
    f1 = 0

    for epoch in tqdm(range(1,epochs+1)):

        epoch_train_loss, epoch_train_acc = train_loop(model, train_dataloader, tokenizer, loss_fn, optimizer, device, scheduler)
        train_loss.append(epoch_train_loss)
        # The loops return counts of correct predictions; convert them to accuracies.
        train_acc.append(epoch_train_acc/len(train_data))

        epoch_val_loss, epoch_val_acc, f1 = val_loop(model, val_dataloader, tokenizer, loss_fn, device)
        validation_loss.append(epoch_val_loss)
        validation_acc.append(epoch_val_acc/len(val_data))

        if f1 > best_f1:
          best_f1 = f1
          print(f"New best F1 score: {f1:.4f}. Saving model...")
          torch.save(model.state_dict(), root+save_path)

        print(f"\nTrain loss: \t\t{epoch_train_loss:6.4f} ----- Val loss: \t\t{epoch_val_loss:6.4f}")
        print(f"Train accuracy: \t{(epoch_train_acc/len(train_data)):6.4f} ----- Val accuracy: \t{(epoch_val_acc/len(val_data)):6.4f}")

        if early_stopping is not None:
                early_stopping(epoch_val_loss)
                if early_stopping.early_stop:
                    break

    return train_loss, validation_loss, train_acc, validation_acc, f1,best_f1

Il seguente blocco viene utilizzato per il tuning degli iperparametri; a causa delle limitazioni di Colab non ci è possibile riportare tutti i tentativi. I risultati vengono però salvati in un file CSV.

In [None]:
from sklearn.model_selection import ParameterGrid
import matplotlib.pyplot as plt



results = []
best_f1=0
# Values explored by the grid search (one training run per combination).
param_grid = {
    "learning_rate": [1e-4],
    "batch_size": [32],
    "dropout": [0.1],
    "weight_decay": [1e-3],
    "epochs": [10]
}

# Best run according to the final validation accuracy.
best_val_acc_acc=0
best_val_loss_acc=1
best_val_f1_acc=0
best_params_acc = {}
# Best run according to the final validation loss.
best_val_acc_loss=0
best_val_loss_loss=1
best_val_f1_loss=0
best_params_loss = {}
# Best run according to the final F1 score.
best_val_acc_f1=0
best_val_loss_f1=1
best_val_f1_f1=0
best_params_f1 = {}

grid = ParameterGrid(param_grid)
for params in grid:
  print("----------------------------------------------------------------")
  print(params)

  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  print(f"Using {device} device")

  # The dropout comes from the grid point being evaluated, so that the value recorded
  # in the results file is the one the model was actually built with.
  model = ClassifierDeep(
                      hyperparameters["h_dim"],
                      hyperparameters["extra_features"],
                      params["dropout"],
                      hyperparameters["language_model"]).to(device)
  print(model)

  # The tokenizer was extended with new tokens, so the embedding matrix must grow accordingly.
  model.lm_model.resize_token_embeddings(len(tokenizer))

  total_params = sum(p.numel() for p in model.parameters())
  print(f"Numbero totale dei parametri: {total_params}")

  criterion = nn.BCELoss()
  optimizer = AdamW(model.parameters(), lr=params["learning_rate"], weight_decay=params["weight_decay"])
  # The training loop steps the scheduler once per batch, so the length of the linear
  # decay is expressed in batches (ceil division), not in epochs; otherwise the learning
  # rate would reach its final value after the first few batches.
  steps_per_epoch = -(-len(train_dataset) // params['batch_size'])
  scheduler = LinearLR(optimizer, start_factor=1.0, end_factor=0.1,
                       total_iters=params['epochs'] * steps_per_epoch)

  early_stopping = EarlyStopping(patience=hyperparameters['patience'], min_delta=hyperparameters['min_delta'])

  train_loss, val_loss, train_acc, val_acc, f1, best_f1 = train_test(model, tokenizer,
                                                                     params['epochs'],
                                                                     optimizer, device, train_dataset,
                                                                     val_dataset, params['batch_size'], hyperparameters['language_model'],
                                                                     criterion, best_f1, early_stopping, scheduler)
  print(f1)

  if val_acc[-1] > best_val_acc_acc:
    best_val_acc_acc = val_acc[-1]
    best_val_loss_acc = val_loss[-1]
    best_val_f1_acc = f1
    best_params_acc = params

  if val_loss[-1] < best_val_loss_loss:
    best_val_acc_loss = val_acc[-1]
    best_val_loss_loss = val_loss[-1]
    best_val_f1_loss = f1
    best_params_loss = params

  if f1 > best_val_f1_f1:
    best_val_acc_f1 = val_acc[-1]
    best_val_loss_f1 = val_loss[-1]
    best_val_f1_f1 = f1
    best_params_f1 = params

  # Learning curves of this run: loss on the left, accuracy on the right.
  fig, axs = plt.subplots(1, 2, figsize=(20, 10))

  axs[0].plot(train_loss, label='training loss')
  axs[0].plot(val_loss, label='validation loss')
  axs[0].legend(loc='upper right')
  axs[0].set_ylim(0,1)

  axs[1].plot(train_acc, label='training accuracy')
  axs[1].plot(val_acc, label='validation accuracy')
  axs[1].legend(loc='lower right')
  axs[1].set_ylim(0,1)

  plt.show()

  # Metrics of the last epoch of this run.
  result = {
        "learning_rate": params["learning_rate"],
        "batch_size": params["batch_size"],
        "dropout": params["dropout"],
        "weight_decay": params["weight_decay"],
        "epochs": params["epochs"],
        "lemmatize": hyperparameters["lemmatize"],
        "train_loss": train_loss[-1],
        "val_loss": val_loss[-1],
        "train_acc": train_acc[-1],
        "val_acc": val_acc[-1],
        "f1_score": f1
    }

  df = pd.DataFrame([result])

  # Append to the results file; the header is written only when the file is created.
  df.to_csv(root+"hyperparameter_results.csv", mode='a', index=False, header=not pd.io.common.file_exists(root+"hyperparameter_results.csv"))

Una volta trovato il miglior modello, iniziamo la fase di test, ripetendo quanto fatto per il validation, eliminando chiaramente qualunque riferimento alle labels.

In [None]:
# Explicit UTF-8: the tweets contain emoji and must not depend on the platform default encoding.
with open(root+"test_text.txt", encoding="utf-8") as f_input:
    test_text = f_input.readlines()

test_text_df = pd.DataFrame(test_text, columns=['text'])
# Same features as for the train and validation sets, using the lexicons built from the training set.
test_text_df['irony_polarity'] = test_text_df["text"].apply(calculate_irony_polarity)
test_text_df['emoji_score'] = test_text_df['text'].apply(lambda tweet: calculate_emoji_score(tweet, emoji_weights))

In [None]:
x_test = test_text_df['text']
emoji_score = test_text_df['emoji_score']
irony_polarity = test_text_df['irony_polarity']

In [None]:
x_test = x_test.apply(lambda x: x.replace('\r', ' ').replace('\n', ' '))

In [None]:
class TestDataset(torch.utils.data.Dataset):
    """Unlabelled tweets with their two hand-crafted scores.

    Each item is (text, emoji_score, irony_polarity); the text is the normalized
    tweet, optionally stopword-filtered and/or lemmatized.
    """

    def __init__(self, x, emoji_score, irony_polarity, tokenizer, stopwords, lemmatize):
        """Pre-process the tweets once, up front.

        Args:
            x: iterable of raw tweet strings.
            emoji_score: iterable with one emoji score per tweet.
            irony_polarity: iterable with one irony-polarity score per tweet.
            tokenizer: unused, kept for backward compatibility. Sub-word tokenization
                is done by the prediction loop on each batch; tokenizing here as well
                would feed already-split sub-word pieces to the tokenizer a second time.
            stopwords: if True, English stopwords are removed.
            lemmatize: if True, words are lowercased and lemmatized using their POS tag.
        """
        lemmatizer = WordNetLemmatizer()
        # The boolean parameter `stopwords` shadows the nltk.corpus.stopwords import inside
        # this method, hence the fully qualified name. Built once, as a set, instead of
        # re-reading the list for every word.
        stop_set = set(nltk.corpus.stopwords.words("english")) if stopwords else set()

        text_clean = []
        for tweet in tqdm(list(x), desc='Tokenizing, POS Tagging, Lemmatizing...'):
            words = normalizeTweet(tweet).split()
            # POS tags are only needed (and only computed) when lemmatizing.
            tagged = pos_tag(words) if lemmatize else [(w, None) for w in words]
            kept = []
            for w, tag in tagged:
                # Stopword removal is applied whether or not lemmatization is enabled.
                if w.lower() in stop_set:
                    continue
                kept.append(lemmatizer.lemmatize(w.lower(), get_wordnet_pos(tag)) if lemmatize else w)
            text_clean.append(' '.join(kept))

        self.texts = text_clean
        self.emoji_score = [torch.tensor(score) for score in emoji_score]
        self.irony_polarity = [torch.tensor(score) for score in irony_polarity]

    def __len__(self):
        return len(self.emoji_score)

    def __getitem__(self, idx):
        """Return (text, emoji_score, irony_polarity) for item `idx`."""
        batch_texts = self.texts[idx]
        batch_emoji_score = np.array(self.emoji_score[idx])
        batch_irony_polarity = np.array(self.irony_polarity[idx])

        return batch_texts, batch_emoji_score, batch_irony_polarity

In [None]:
test_dataset = TestDataset(x_test, emoji_score, irony_polarity, tokenizer,hyperparameters["stopwords"], hyperparameters["lemmatize"])
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32)

In [None]:
def test_loop(model, dataloader, tokenizer, device):
    """Predict the class of every example of an unlabelled set.

    Args:
        model: classifier called as model(input_ids, attention_mask, emoji_score, irony_polarity).
        dataloader: yields (texts, emoji_score, irony_polarity) batches.
        tokenizer: tokenizer applied to the texts of each batch.
        device: device the batch tensors are moved to.

    Returns:
        List with one 0/1 prediction per example, in dataloader order.
    """
    model.eval()

    all_preds = []

    with torch.no_grad():

        for batch_texts, batch_emoji_score, batch_irony_polarity in tqdm(dataloader, desc='Test set'):
            tokens = tokenizer(list(batch_texts),
                               add_special_tokens=True,
                               return_tensors='pt',
                               padding='max_length',
                               max_length = 128,
                               truncation=True)
            input_id_texts = tokens['input_ids'].to(device)
            mask_texts = tokens['attention_mask'].to(device)
            batch_emoji_score = batch_emoji_score.float().to(device)
            batch_irony_polarity = batch_irony_polarity.float().to(device)

            output = model(input_id_texts, mask_texts, batch_emoji_score, batch_irony_polarity).squeeze(1)

            # The model outputs probabilities; 0.5 is the decision threshold.
            preds = (output > 0.5).int()
            all_preds.extend(preds.cpu().numpy())

    return all_preds

In [None]:
# Predict with the weights that achieved the best validation F1 (saved during training
# as "best_model.pth"), not with whatever state the model was left in after the last epoch.
best_model_path = root + "best_model.pth"
if pd.io.common.file_exists(best_model_path):
    model.load_state_dict(torch.load(best_model_path, map_location=device))
y_preds = test_loop(model, test_dataloader, tokenizer, device)

In [None]:
y_preds

In [None]:
preds_df = pd.DataFrame(y_preds, columns=["y"])
preds_df.to_csv(root+"irony_preds.csv", index=False, header=False)

Nella leaderboard abbiamo ottenuto un punteggio finale di 0.8132.
Siamo consapevoli che i risultati hanno un margine di miglioramento, raggiungibile tramite ulteriori ricerche su feature da estrarre e hyperparameter tuning. Inoltre con ulteriore potenza di calcolo sarebbe stato possibile usare modelli più performanti per questi task.