# Import

In [None]:
import transformers
from keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModel,BertTokenizer, BertModel,GPT2Tokenizer, GPT2Model

In [None]:
from torch import optim
from torch import nn
import torch.nn.functional as F
import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

In [None]:
import pandas as pd
import re
import matplotlib.pyplot as plt
from tqdm import tqdm, trange
import numpy as np
import pickle

In [None]:
import seaborn as sn
from sklearn.metrics import accuracy_score, f1_score
import math
from scipy.stats import wilcoxon

In [None]:
# `torch.cuda.device(0)` only builds a context-manager object; without a
# `with` statement it selects nothing. `set_device` is the call that makes
# GPU 0 the current device, and the guard keeps CPU-only machines working.
if torch.cuda.is_available():
    torch.cuda.set_device(0)

# Import data

In [None]:
# Load the dataset from the working directory; later cells read its
# `Domain` and `Label` columns.
df = pd.read_csv("data.csv")
# Preview the first rows of the table.
df.head()



In [None]:
# Show how many rows carry each distinct value of the `Label` column.
df.Label.value_counts()

In [None]:
# Keep only the rows whose label is 0 or 1. The same mask is applied to the
# texts and to the labels: filtering the labels alone would leave `sentences`
# longer than `labels`, so the two could no longer be paired row by row.
valid_rows = df.Label.isin([0, 1])

sentences = df.Domain[valid_rows].values
# Plain Python ints, as before (0 for label 0, 1 for label 1).
target_clean_train = [int(x) for x in df.Label[valid_rows]]

labels = np.array(target_clean_train)


In [None]:
# Distinct label values and the number of samples carrying each of them.
unique, counts = np.unique(labels, return_counts = True)

print(unique, counts)

In [None]:
def calculate_mean(number_list):
    """Return the arithmetic mean of ``number_list``.

    Args:
        number_list: Iterable of numbers (list, tuple, generator, ...).

    Returns:
        The sum of the values divided by how many there are.

    Raises:
        ZeroDivisionError: If ``number_list`` is empty.
    """
    # Materialise once so that one-shot iterables (e.g. generators) can be
    # both summed and counted.
    values = list(number_list)
    # The built-in sum() replaces the hand-written loop, whose accumulator
    # was itself called ``sum`` and shadowed that built-in.
    return sum(values) / len(values)

def calculate_standard_deviation(number_list):
    """Return the population standard deviation of ``number_list``.

    The squared distance of every value from the mean (obtained from
    ``calculate_mean``) is averaged over ``len(number_list)`` -- there is no
    ``n - 1`` correction -- and the square root of that average is returned.
    """
    centre = calculate_mean(number_list)

    squared_total = 0
    for value in number_list:
        squared_total += (value - centre) ** 2

    variance = squared_total / len(number_list)
    return math.sqrt(variance)

In [None]:
def flat_accuracy(preds, labels):
    """Return the fraction of rows of ``preds`` whose arg-max equals the label.

    Args:
        preds: 2-D array of scores; the predicted class of a row is the index
            of its largest value along axis 1.
        labels: NumPy array of true class indices; it is flattened before the
            comparison.

    Returns:
        Number of matching positions divided by the number of labels.
    """
    predicted_classes = np.argmax(preds, axis=1).reshape(-1)
    true_classes = labels.reshape(-1)
    n_hits = np.sum(predicted_classes == true_classes)
    return n_hits / true_classes.size

## **Transformers**
* **Bert**
    * Tokenizzatore -->  BertTokenizer.from_pretrained("bert-base-uncased", do_lower_case=True)
    * Modello --> BertModel.from_pretrained('bert-base-uncased')


* **Multilingua-Bert** 
    * Tokenizzatore --> BertTokenizer.from_pretrained('bert-base-multilingual-uncased')
    * Modello --> BertModel.from_pretrained("bert-base-multilingual-uncased")
    
* **Electra** 
    * Tokenizzatore --> ElectraTokenizer.from_pretrained('google/electra-base-discriminator')
    * Modello --> ElectraModel.from_pretrained('google/electra-base-discriminator')
    
* **XLNet** 
    * Tokenizzatore --> XLNetTokenizer.from_pretrained('xlnet-base-cased')
    * Modello --> XLNetModel.from_pretrained('xlnet-base-cased')
    
* **Ernie** 
    * Tokenizzatore --> AutoTokenizer.from_pretrained("nghuyong/ernie-2.0-en")
    * Modello --> AutoModel.from_pretrained("nghuyong/ernie-2.0-en")

In [None]:
def define_input(seed, random_state, sentences, model_type, epochs):
    """Tokenise ``sentences`` and build train/validation/test DataLoaders.

    The labels are NOT a parameter: they are read from the module-level
    ``labels`` array, which must hold one entry per sentence, in the same
    order as ``sentences``.

    The data is split 70% / 30% into train+validation / test, and the first
    part is split again 90% / 10% into train / validation.

    Args:
        seed: Integer seed for the PyTorch random generators.
        random_state: Seed handed to both ``train_test_split`` calls.
        sentences: Iterable of raw texts; every item goes through ``str()``.
        model_type: One of 'Bert', 'Electra', 'XLNet', 'Multilingua-Bert',
            'Ernie', 'Roberta', 'distilbert', 'ita_bert'. Any other value
            re-uses the tokenizer and model left in the module globals by an
            earlier call and emits a warning.
        epochs: Unused; kept so that existing callers keep working.

    Returns:
        Tuple ``(train_dataloader, validation_dataloader, test_dataloader,
        model_architecture, device, test_labels)``.

    Raises:
        ValueError: If ``model_type`` is unknown and no tokenizer/model has
            been loaded by a previous call.

    Side effects:
        Rebinds the module-level names ``tokenizer`` and
        ``model_architecture``.
    """
    import warnings  # only needed for the fallback warning below

    global tokenizer
    global model_architecture

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Seed the CPU generator too: RandomSampler shuffling and the weights of
    # layers created on the CPU draw from it, so seeding CUDA alone left the
    # runs non-reproducible.
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # model_type -> (tokenizer class, model class, checkpoint, tokenizer kwargs)
    known_models = {
        'Bert': (BertTokenizer, BertModel, "bert-base-uncased", {"do_lower_case": True}),
        'Electra': (AutoTokenizer, AutoModel, "google/electra-base-discriminator", {}),
        'XLNet': (AutoTokenizer, AutoModel, "xlnet-base-cased", {}),
        'Multilingua-Bert': (BertTokenizer, BertModel, "bert-base-multilingual-uncased", {}),
        'Ernie': (AutoTokenizer, AutoModel, "nghuyong/ernie-2.0-en", {}),
        'Roberta': (AutoTokenizer, AutoModel, "roberta-base", {}),
        'distilbert': (AutoTokenizer, AutoModel, "distilbert-base-uncased", {}),
        'ita_bert': (AutoTokenizer, AutoModel,
                     "m-polignano-uniba/bert_uncased_L-12_H-768_A-12_italian_alb3rt0", {}),
    }

    spec = known_models.get(model_type) if isinstance(model_type, str) else None
    if spec is not None:
        tokenizer_cls, model_cls, checkpoint, tokenizer_kwargs = spec
        tokenizer = tokenizer_cls.from_pretrained(checkpoint, **tokenizer_kwargs)
        model_architecture = model_cls.from_pretrained(checkpoint).to(device)
    else:
        shown = model_type if isinstance(model_type, str) else "<%s object>" % type(model_type).__name__
        if "tokenizer" not in globals() or "model_architecture" not in globals():
            raise ValueError(
                "Unknown model_type %r; expected one of %s" % (shown, sorted(known_models))
            )
        # Backward compatibility: earlier versions silently fell through to
        # whatever a previous call had loaded. Keep that, but make it visible.
        warnings.warn(
            "Unknown model_type %r: re-using the tokenizer and model loaded by "
            "a previous call" % (shown,),
            stacklevel=2,
        )

    MAX_LEN = 128

    # Use the tokenizer's own special tokens: the literal "[CLS]"/"[SEP]"
    # strings are special tokens only in BERT-style vocabularies.
    cls_token = tokenizer.cls_token or "[CLS]"
    sep_token = tokenizer.sep_token or "[SEP]"
    # Likewise the padding id is not 0 in every vocabulary.
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0

    token_ids = [
        tokenizer.convert_tokens_to_ids(
            tokenizer.tokenize(cls_token + " " + str(sentence) + " " + sep_token)
        )
        for sentence in sentences
    ]
    input_ids = pad_sequences(token_ids, maxlen=MAX_LEN, dtype="long",
                              truncating="post", padding="post", value=pad_id)

    # 1.0 on real tokens, 0.0 on padding. Built from the sequence lengths so
    # that it does not depend on which id the padding token happens to have.
    attention_masks = np.zeros(input_ids.shape, dtype=np.float32)
    for row, ids in enumerate(token_ids):
        attention_masks[row, :min(len(ids), MAX_LEN)] = 1.0

    # Splitting ids, labels and masks in ONE call guarantees that the three
    # stay aligned.
    (X_inputs, test_inputs,
     X_labels, test_labels,
     X_masks, test_masks) = train_test_split(
        input_ids, labels, attention_masks,
        random_state=random_state, test_size=0.3)

    (train_inputs, validation_inputs,
     train_labels, validation_labels,
     train_masks, validation_masks) = train_test_split(
        X_inputs, X_labels, X_masks,
        random_state=random_state, test_size=0.1)

    train_inputs = torch.tensor(train_inputs)
    train_labels = torch.tensor(train_labels)
    train_masks = torch.tensor(train_masks)

    validation_inputs = torch.tensor(validation_inputs)
    validation_labels = torch.tensor(validation_labels)
    validation_masks = torch.tensor(validation_masks)

    test_inputs = torch.tensor(test_inputs)
    test_labels = torch.tensor(test_labels)
    test_masks = torch.tensor(test_masks)

    batch_size = 32

    # Training batches are shuffled; validation and test keep their order so
    # that predictions line up with ``test_labels``.
    train_data = TensorDataset(train_inputs, train_masks, train_labels)
    train_dataloader = DataLoader(train_data, sampler=RandomSampler(train_data),
                                  batch_size=batch_size)

    validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
    validation_dataloader = DataLoader(validation_data,
                                       sampler=SequentialSampler(validation_data),
                                       batch_size=batch_size)

    test_data = TensorDataset(test_inputs, test_masks, test_labels)
    test_dataloader = DataLoader(test_data, sampler=SequentialSampler(test_data),
                                 batch_size=batch_size)

    return train_dataloader, validation_dataloader, test_dataloader, model_architecture, device, test_labels
    

In [None]:
def define_input_with_test(seed, random_state, sentences , sentences_test, model_type, epochs):
    """Build DataLoaders from a training corpus and a separate test corpus.

    The labels are NOT parameters: they are read from the module-level names
    ``labels`` (one entry per item of ``sentences``) and ``labels_test`` (one
    entry per item of ``sentences_test``). ``labels_test`` is not defined in
    the visible part of this file and must exist before the call.

    Args:
        seed: Integer seed for the PyTorch random generators.
        random_state: Seed handed to every ``train_test_split`` call.
        sentences: Iterable of training texts; every item goes through
            ``str()``.
        sentences_test: Iterable of test texts; every item goes through
            ``str()``.
        model_type: One of 'Bert', 'Electra', 'XLNet', 'Multilingua-Bert',
            'Ernie', 'Roberta', 'distilbert', 'ita_bert'. Any other value
            re-uses the tokenizer and model left in the module globals by an
            earlier call and emits a warning.
        epochs: Unused; kept so that existing callers keep working.

    Returns:
        Tuple ``(train_dataloader, validation_dataloader, test_dataloader,
        model_architecture, device, test_labels)``.

    Raises:
        ValueError: If ``model_type`` is unknown and no tokenizer/model has
            been loaded by a previous call.

    Side effects:
        Rebinds the module-level names ``tokenizer`` and
        ``model_architecture``.
    """
    import warnings  # only needed for the fallback warning below

    global tokenizer
    global model_architecture

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Seed the CPU generator too: RandomSampler shuffling and the weights of
    # layers created on the CPU draw from it.
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # model_type -> (tokenizer class, model class, checkpoint, tokenizer kwargs)
    # Same set of names as define_input accepts.
    known_models = {
        'Bert': (BertTokenizer, BertModel, "bert-base-uncased", {"do_lower_case": True}),
        'Electra': (AutoTokenizer, AutoModel, "google/electra-base-discriminator", {}),
        'XLNet': (AutoTokenizer, AutoModel, "xlnet-base-cased", {}),
        'Multilingua-Bert': (BertTokenizer, BertModel, "bert-base-multilingual-uncased", {}),
        'Ernie': (AutoTokenizer, AutoModel, "nghuyong/ernie-2.0-en", {}),
        'Roberta': (AutoTokenizer, AutoModel, "roberta-base", {}),
        'distilbert': (AutoTokenizer, AutoModel, "distilbert-base-uncased", {}),
        'ita_bert': (AutoTokenizer, AutoModel,
                     "m-polignano-uniba/bert_uncased_L-12_H-768_A-12_italian_alb3rt0", {}),
    }

    spec = known_models.get(model_type) if isinstance(model_type, str) else None
    if spec is not None:
        tokenizer_cls, model_cls, checkpoint, tokenizer_kwargs = spec
        tokenizer = tokenizer_cls.from_pretrained(checkpoint, **tokenizer_kwargs)
        model_architecture = model_cls.from_pretrained(checkpoint).to(device)
    else:
        shown = model_type if isinstance(model_type, str) else "<%s object>" % type(model_type).__name__
        if "tokenizer" not in globals() or "model_architecture" not in globals():
            raise ValueError(
                "Unknown model_type %r; expected one of %s" % (shown, sorted(known_models))
            )
        # Backward compatibility: earlier versions silently fell through to
        # whatever a previous call had loaded. Keep that, but make it visible.
        warnings.warn(
            "Unknown model_type %r: re-using the tokenizer and model loaded by "
            "a previous call" % (shown,),
            stacklevel=2,
        )

    MAX_LEN = 128

    # Use the tokenizer's own special tokens: the literal "[CLS]"/"[SEP]"
    # strings are special tokens only in BERT-style vocabularies.
    cls_token = tokenizer.cls_token or "[CLS]"
    sep_token = tokenizer.sep_token or "[SEP]"
    # Likewise the padding id is not 0 in every vocabulary.
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0

    def encode(texts):
        """Return (padded id matrix, float attention-mask matrix) for texts."""
        token_ids = [
            tokenizer.convert_tokens_to_ids(
                tokenizer.tokenize(cls_token + " " + str(text) + " " + sep_token)
            )
            for text in texts
        ]
        ids = pad_sequences(token_ids, maxlen=MAX_LEN, dtype="long",
                            truncating="post", padding="post", value=pad_id)
        # 1.0 on real tokens, 0.0 on padding, derived from the lengths.
        masks = np.zeros(ids.shape, dtype=np.float32)
        for row, seq in enumerate(token_ids):
            masks[row, :min(len(seq), MAX_LEN)] = 1.0
        return ids, masks

    input_ids, attention_masks = encode(sentences)
    input_ids_test, attention_masks_test = encode(sentences_test)

    # NOTE(review): the two splits below keep only their first part, i.e. 10%
    # of the training corpus and 1% of the test corpus are shuffled away and
    # never used. This is unchanged from the previous implementation --
    # confirm it is intended.
    X_inputs, _, X_labels, _, X_masks, _ = train_test_split(
        input_ids, labels, attention_masks,
        random_state=random_state, test_size=0.1)

    test_inputs, _, test_labels, _, test_masks, _ = train_test_split(
        input_ids_test, labels_test, attention_masks_test,
        random_state=random_state, test_size=0.01)

    # Splitting ids, labels and masks in ONE call guarantees that the three
    # stay aligned.
    (train_inputs, validation_inputs,
     train_labels, validation_labels,
     train_masks, validation_masks) = train_test_split(
        X_inputs, X_labels, X_masks,
        random_state=random_state, test_size=0.1)

    train_inputs = torch.tensor(train_inputs)
    train_labels = torch.tensor(train_labels)
    train_masks = torch.tensor(train_masks)

    validation_inputs = torch.tensor(validation_inputs)
    validation_labels = torch.tensor(validation_labels)
    validation_masks = torch.tensor(validation_masks)

    test_inputs = torch.tensor(test_inputs)
    test_labels = torch.tensor(test_labels)
    test_masks = torch.tensor(test_masks)

    batch_size = 32

    # Training batches are shuffled; validation and test keep their order so
    # that predictions line up with ``test_labels``.
    train_data = TensorDataset(train_inputs, train_masks, train_labels)
    train_dataloader = DataLoader(train_data, sampler=RandomSampler(train_data),
                                  batch_size=batch_size)

    validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
    validation_dataloader = DataLoader(validation_data,
                                       sampler=SequentialSampler(validation_data),
                                       batch_size=batch_size)

    test_data = TensorDataset(test_inputs, test_masks, test_labels)
    test_dataloader = DataLoader(test_data, sampler=SequentialSampler(test_data),
                                 batch_size=batch_size)

    return train_dataloader, validation_dataloader, test_dataloader, model_architecture, device, test_labels
    

# Definiamo i modelli che utilizzeremo

### Solo Bert

In [None]:
class Model_Transformer(nn.Module):
    """Linear classification head on top of a transformer encoder.

    The encoder is always called under ``torch.no_grad()``, so no gradient
    flows into it; ``sem_linear`` is the only layer that receives gradients.
    """

    def __init__(self, input_dim_bert, output_dim, model_architecture):
        """Store the encoder and create the head.

        Args:
            input_dim_bert: Size of the encoder vectors fed to the head.
            output_dim: Number of output logits.
            model_architecture: Encoder, called as
                ``encoder(x_sem, attention_mask)``.
        """
        super().__init__()
        self.bert = model_architecture

        self.dropout = nn.Dropout(p=0.1)
        self.sem_linear = nn.Linear(in_features=input_dim_bert, out_features=output_dim)

    def forward(self, x_sem, attention_mask):
        """Return the logits for a batch of token ids and attention masks."""
        with torch.no_grad():
            encoder_outputs = self.bert(x_sem, attention_mask)
            # First element of the encoder output, vector at sequence
            # position 0 (presumably the [CLS] hidden state -- depends on
            # how the inputs were built).
            first_token_state = encoder_outputs[0][:, 0, :]
            regularised = self.dropout(first_token_state)

        return self.sem_linear(regularised)

In [None]:
def execute_Transformer(epochs, model_architecture, train_dataloader, validation_dataloader, test_dataloader, device, test_labels):
    """Train a ``Model_Transformer`` head and score it on the test set.

    Args:
        epochs: Number of passes over ``train_dataloader``.
        model_architecture: Encoder wrapped by ``Model_Transformer``.
        train_dataloader: Yields ``(input_ids, attention_mask, labels)``
            batches used for training.
        validation_dataloader: Same batch layout; used to report the
            validation accuracy after every epoch.
        test_dataloader: Same batch layout; must iterate in the same order
            as ``test_labels``.
        device: Device on which the model and the batches are placed.
        test_labels: Tensor with the true test labels.

    Returns:
        Tuple ``(accuracy, macro_f1, weighted_f1, per_class_f1)`` where
        ``per_class_f1`` always has two entries, for labels 0 and 1.

    Raises:
        ValueError: If ``test_dataloader`` yields no batch.
    """
    # Size the head from the encoder's configuration when it exposes one;
    # 768 is the value that used to be hard-coded.
    hidden_size = getattr(getattr(model_architecture, "config", None), "hidden_size", 768)
    Alone_model = Model_Transformer(hidden_size, 2, model_architecture)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(Alone_model.parameters(), lr=2e-5)

    # Honour the `device` argument instead of calling .cuda(), which fails
    # on machines without a GPU.
    Alone_model.to(device)

    epoch_bar = trange(epochs, desc="Epoch")
    for _ in epoch_bar:
        # NOTE(review): train() also switches the wrapped encoder to training
        # mode, so any dropout inside it is active while it is used as a
        # frozen feature extractor -- confirm this is intended.
        Alone_model.train()

        tr_loss, nb_tr_steps = 0.0, 0
        for batch in train_dataloader:
            b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)
            optimizer.zero_grad()

            # Forward pass
            target_hat = Alone_model(b_input_ids, b_input_mask)
            loss = criterion(target_hat, b_labels)

            # Backward pass
            loss.backward()
            optimizer.step()
            tr_loss += loss.item()
            nb_tr_steps += 1

        ## VALIDATION
        Alone_model.eval()
        n_correct, n_seen = 0, 0
        for batch in validation_dataloader:
            b_input_ids, b_input_mask, b_labels = (t.to(device) for t in batch)
            with torch.no_grad():
                logits = Alone_model(b_input_ids, b_input_mask)
            n_correct += (logits.argmax(dim=1) == b_labels).sum().item()
            n_seen += b_labels.size(0)

        # Surface the per-epoch numbers, which were previously computed and
        # then discarded.
        epoch_bar.set_postfix(
            train_loss=tr_loss / nb_tr_steps if nb_tr_steps else float("nan"),
            val_acc=n_correct / n_seen if n_seen else float("nan"),
        )

    ## TEST
    Alone_model.eval()
    batch_logits = []
    for batch in test_dataloader:
        b_input_ids, b_input_mask, _ = (t.to(device) for t in batch)
        with torch.no_grad():
            logits = Alone_model(b_input_ids, b_input_mask)
        batch_logits.append(logits.detach().cpu().numpy())

    if not batch_logits:
        raise ValueError("test_dataloader produced no batches")

    # Collapse the logits once, after the loop, instead of rebuilding the
    # full prediction list on every batch.
    flat_predictions = np.argmax(np.concatenate(batch_logits, axis=0), axis=1)

    y_true = test_labels.cpu().numpy()
    A = accuracy_score(y_true, flat_predictions)
    B = f1_score(y_true, flat_predictions, average='macro')
    C = f1_score(y_true, flat_predictions, average='weighted')
    # Fixing the label set keeps D two entries long even when a class is
    # absent from both the test labels and the predictions.
    D = f1_score(y_true, flat_predictions, average=None, labels=[0, 1])

    return A,B,C,D

In [None]:
# One (seed, random_state) pair per repetition of the experiment; both lists
# are indexed with the same run index.
seed = [10046, 10023, 17, 54, 31]
random_state = [1024, 3333, 1995, 2780, 3833]
# Names that define_input recognises as `model_type`.
model_architecture_list = ['Bert','XLNet','Multilingua-Bert', 'Electra', 'Ernie','Roberta', 'distilbert', 'ita_bert']
# Number of training epochs handed to execute_Transformer.
epochs = 2

# Model evaluated in this run. NOTE: define_input declares
# `global model_architecture` and rebinds it to the loaded model object, so
# this name holds a string only until the first call.
model_architecture = model_architecture_list[0]

In [None]:
# Per-run metrics: accuracy (in percent), macro F1, weighted F1 and the F1 of
# class 0 and class 1.
accuracy_list = []
macro_list = []
weighted_list = []
other_0 = []
other_1 = []

# Remember the model *name* before the loop: `model_architecture` is rebound
# to the loaded model object on the first repetition, so passing it again
# would hand define_input a model instead of a name from the second
# repetition onwards.
model_type = model_architecture

# Only the first two (seed, random_state) pairs are used.
for i in range(0, 2):
    #train_dataloder, validation_dataloader, test_dataloder, model_architecture, device, test_labels = define_input_with_test(seed[i], random_state[i], sentences, sentences_test, model_type, epochs)
    train_dataloder, validation_dataloader, test_dataloder, model_architecture, device, test_labels = define_input(seed[i], random_state[i], sentences, model_type, epochs)

    A,B,C,D = execute_Transformer(epochs, model_architecture, train_dataloder, validation_dataloader, test_dataloder, device, test_labels)

    accuracy_list.append(A*100)
    macro_list.append(B)
    weighted_list.append(C)
    other_0.append(D[0])
    other_1.append(D[1])

In [None]:
# For every metric print a banner, then the mean and the standard deviation
# over the runs, both rounded to two decimals.
for banner, scores in (
    ('*********** ACCURACY', accuracy_list),
    ('*********** MACRO', macro_list),
    ('*********** WEIGHTED', weighted_list),
):
    print(banner)
    print(round(calculate_mean(scores), 2))
    print(round(calculate_standard_deviation(scores), 2))