In [None]:
import os
from collections import Counter
from itertools import product
from statistics import mean, mode, median

import matplotlib.pyplot as plt
import nltk
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from nltk.tokenize import TweetTokenizer, word_tokenize
from sklearn.metrics import classification_report, f1_score
from sklearn.model_selection import train_test_split
from torch import optim
from tqdm.auto import tqdm
from transformers import AutoModel
from transformers import BertModel, RobertaModel, AlbertModel, BartForSequenceClassification
from transformers import BertTokenizerFast, RobertaTokenizerFast, AlbertTokenizerFast, BartTokenizer

nltk.download('punkt')

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Column names for the TSV, supplied explicitly through `names=` when reading the file.
names = ['id', 'json', 'label', 'statement', 'subject', 'speaker', 'job', 'state', 'party', 'credit', 'barely_true',
         'false', 'half_true', 'mostly_true', 'pants_on_fire', 'context', 'justification']
# NOTE(review): the path is relative to the notebook's working directory.
df = pd.read_csv('LIAR-PLUS-master/dataset/tsv/train2.tsv', sep='\t', names=names)

In [None]:
# Number of rows per label value in the loaded file.
df['label'].value_counts()

In [None]:
# Keep only rows labelled 'true' or 'false'; rows with any other label are left out.
# Both lists are built as "all 'true' rows, then all 'false' rows", so they stay aligned index by index.
texts = list(df[df['label'] == 'true']['statement']) + list(df[df['label'] == 'false']['statement'])
labels = list(df[df['label'] == 'true']['label']) + list(df[df['label'] == 'false']['label'])

In [None]:
import random

# Shuffle statements and labels together so every text keeps its own label.
# A dedicated generator with a fixed seed makes the order identical on every
# "Restart & Run All" without touching the global `random` state.
temp = list(zip(texts, labels))
random.Random(42).shuffle(temp)
# zip(*...) turns the shuffled pairs back into two aligned tuples.
texts, labels = zip(*temp)

In [None]:
# len() of every un-tokenised statement (presumably its character count - statements are expected to be strings).
lengths = [len(text) for text in texts]

In [None]:
# Prints every length, longest first. This is the full list, one entry per statement.
print(sorted(lengths, reverse=True))

In [None]:
# Summary statistics of the raw statement lengths. The mode is computed from the data
# (the previous version printed the literal 1 in its place).
print(f"median: {median(lengths)}\nmode: {mode(lengths)}\nmean: {round(mean(lengths))}\nmax: {max(lengths)}\nmin: {min(lengths)}")


In [None]:
# NLTK TweetTokenizer instance. Note that the variable is called `tokenize` and is used through its `.tokenize()` method.
tokenize = TweetTokenizer()

In [None]:
# Each statement becomes the list of tokens returned by the tokenizer.
ready_texts = [tokenize.tokenize(text) for text in texts]
# Binary targets: 1 for 'true', 0 for anything else (only 'false' is left after the earlier filtering).
ready_labels = [1 if label == 'true' else 0 for label in labels]

In [None]:
# Hold out 10% of the examples as the test set. `random_state` pins the split for a given
# input order, so re-running the cell does not move examples between train and test.
X_train, X_test, y_train, y_test = train_test_split(ready_texts, ready_labels, test_size=0.1, random_state=42)

In [None]:
# Sanity check: texts and labels must have matching sizes in each split.
len(X_train), len(y_train), len(X_test), len(y_test)

In [None]:
# Carve the first 368 training examples off as the validation set; everything after them stays in train.
# The whole right-hand side is evaluated before any name is rebound, so the [:368] slices still see the original lists.
# Not idempotent: running this cell again removes another 368 examples from X_train / y_train.
X_train, y_train, X_val, y_val = X_train[368:], y_train[368:], X_train[:368], y_train[:368]

In [None]:
# Class balance of the training split. Labels were encoded as 1 for 'true' and 0 for 'false',
# so the positive count is c[1] and the negative count is c[0].
c = Counter(y_train)
print(f"Number of positive examples: {c[1]}\nNumber of negative examples: {c[0]}")

In [None]:
# `lengths` is rebound here: it now holds the number of tokens per training example
# (it previously held the lengths of the raw statements).
lengths = [len(x) for x in X_train]
print(
    f"median: {median(lengths)}\nmode: {mode(lengths)}\nmean: {round(mean(lengths))}\nmax: {max(lengths)}\nmin: {min(lengths)}")

In [None]:
# Histogram of training-example lengths in tokens.
data = lengths
num_bins = 57
plt.hist(data, num_bins, color='purple', alpha=0.5, rwidth=0.85)
plt.title('Sentence Length Distribution')
plt.xlabel('Sentence Length')
plt.ylabel('Frequency')
plt.show()

In [None]:
# Vocabulary built from the training split only. Ids 0 and 1 are reserved for padding
# and for words that are not in the vocabulary.
word2token = {'PAD': 0, 'UNK': 1}
all_words = set()
for text in X_train:
    all_words.update(text)
# Iterate in sorted order: plain set iteration order for strings can change between
# interpreter sessions, which would give the same word a different id in a fresh kernel
# and make previously saved checkpoints inconsistent with the rebuilt vocabulary.
for word in sorted(all_words):
    word2token[word] = len(word2token)

In [None]:
class RNNclassifier(nn.Module):
    """Embedding -> RNN -> dropout -> masked mean over time -> linear layer.

    Args:
        device: device every sub-module is moved to.
        emb_size: number of rows of the embedding table, i.e. the vocabulary size
            (the embedding dimension itself is ``hidden_size``).
        num_classes: number of output logits per example.
        dropout: dropout probability applied to the RNN outputs.
        hidden_size: embedding dimension and RNN hidden size.
    """

    def __init__(self, device, emb_size, num_classes=1, dropout=0.4, hidden_size=100):
        super(RNNclassifier, self).__init__()
        self.device = device
        self.hidden_size = hidden_size
        self.emb_size = emb_size
        self.dropout = nn.Dropout(dropout).to(self.device)
        self.num_classes = num_classes
        self.embedding = nn.Embedding(self.emb_size, self.hidden_size).to(self.device)
        self.rnn = nn.RNN(self.hidden_size, self.hidden_size, batch_first=True).to(self.device)
        self.linear = nn.Linear(self.hidden_size, self.num_classes).to(self.device)

    def forward(self, tokens, attention_ids, length):
        """Return one row of ``num_classes`` logits per example.

        Args:
            tokens: integer token ids, indexed as (batch, time).
            attention_ids: 1 for real tokens and 0 for padding, same layout as ``tokens``.
            length: number of real tokens per example (one value per batch row).
        """
        embs = self.embedding(tokens)
        rnn_out, _ = self.rnn(embs)
        drop_out = self.dropout(rnn_out)
        # Zero the RNN outputs at padded positions, then sum over the time axis.
        mask = attention_ids.to(drop_out.device).unsqueeze(-1)
        summed = (drop_out * mask).sum(dim=1)
        # The DataLoader collates plain Python ints into a CPU tensor, so `length` has to be
        # moved next to the activations. Clamping to 1 prevents 0/0 = NaN for empty examples.
        length = torch.as_tensor(length, device=summed.device).clamp(min=1)
        out = summed / length.unsqueeze(-1)
        return self.linear(out)

In [None]:
class Dataset(torch.utils.data.Dataset):
    """Turns tokenised texts into fixed-length id tensors with a padding mask.

    Args:
        texts: sequence of token lists.
        labels: sequence of labels, aligned with ``texts``.
        maxlen: every example is truncated or padded to exactly this many ids.
        word2token: mapping from token to id; must contain 'PAD'.
        device: device the returned tensors are created on.
    """

    def __init__(self, texts, labels, maxlen, word2token, device):
        self.texts = texts
        self.labels = labels
        self.device = device
        self.maxlen = maxlen
        self.word2token = word2token

    def __getitem__(self, item):
        """Return ``(token_ids, length, attention_ids, label)`` for one example."""
        text = self.texts[item]
        label = self.labels[item]
        # Unknown words map to the 'UNK' id (1 when the vocabulary has no such entry).
        unk_id = self.word2token.get('UNK', 1)
        token_ids = [self.word2token.get(word, unk_id) for word in text][:self.maxlen]
        # Count the real tokens BEFORE padding; measuring the padded tensor would always
        # give maxlen and produce an all-ones mask.
        n_real = len(token_ids)
        n_pad = self.maxlen - n_real
        transformed_text = torch.tensor(
            token_ids + [self.word2token['PAD']] * n_pad,
            dtype=torch.long, device=self.device)
        attention_ids = torch.tensor(
            [1] * n_real + [0] * n_pad,
            dtype=torch.long, device=self.device)
        # The length is used as a divisor for mean pooling, so it is never reported as 0:
        # an empty example has an all-zero mask and a length of 1.
        return transformed_text, max(n_real, 1), attention_ids, label

    def __len__(self):
        return len(self.texts)

In [None]:
def _rnn_dev_macro_f1(model, dev_dataloader):
    """Macro-F1 of ``model`` on ``dev_dataloader``, computed in eval mode without gradients."""
    was_training = model.training
    model.eval()  # dropout must be off while validating
    predicted, true = [], []
    try:
        with torch.no_grad():
            for dev_sentence, dev_length, dev_attention, dev_label in dev_dataloader:
                logits = model(dev_sentence, dev_attention, dev_length)
                # Flattening instead of .item() also supports dev batches larger than 1.
                predicted.extend((torch.sigmoid(logits) > 0.5).type(torch.int).view(-1).tolist())
                true.extend(int(v) for v in dev_label.view(-1).tolist())
    finally:
        # Hand the model back in the mode the caller left it in.
        model.train(was_training)
    return f1_score(true, predicted, average='macro')


def train_model(model, dataloader, dev_dataloader, epoches, optim=optim.RMSprop, lr=0.01, eval_every=10):
    """Train a binary classifier with BCE-with-logits and checkpoint on dev macro-F1.

    Args:
        model: module called as ``model(sentence, attention_ids, length)``.
        dataloader: yields ``(sentence, length, attention_ids, label)`` training batches.
        dev_dataloader: same layout, used for validation.
        epoches: number of passes over ``dataloader``.
        optim: optimizer class, instantiated as ``optim(model.parameters(), lr=lr)``.
        lr: learning rate.
        eval_every: validate (and save) before every ``eval_every``-th training step.

    Returns:
        The best dev macro-F1 seen during training.

    Side effects:
        Writes 'model.pt' at every validation and '<rounded f1>model.pt' whenever the
        dev score improves (both via ``torch.save`` of the whole model object).
    """
    optimizer = optim(model.parameters(), lr=lr)  # Adam, AdamW, Adadelta, Adagrad, SGD, RMSProp
    binary = nn.BCEWithLogitsLoss()
    best_f = 0
    f1 = 0.0
    for epoch in range(epoches):
        print(epoch + 1, "epoch")
        t = tqdm(dataloader)
        for i, (sentence, length, attention_ids, label) in enumerate(t):
            if i % eval_every == 0:
                torch.save(model, 'model.pt')
                f1 = _rnn_dev_macro_f1(model, dev_dataloader)
                if f1 > best_f:
                    torch.save(model, f"{round(f1, 3)}model.pt")
                    best_f = f1
                    print("Saving with score", best_f)
            pred = model(sentence, attention_ids, length)
            # Labels are collated on the CPU; move them next to the predictions.
            target = label.to(device=pred.device, dtype=torch.float32)
            loss = binary(pred.view(-1), target)
            t.set_description(f"loss: {round(float(loss), 3)}, f-macro: {round(f1, 3)}")
            t.refresh()
            loss.backward()
            optimizer.step()
            model.zero_grad()
    return best_f

In [None]:
# Every example is truncated or padded to 50 token ids (the third argument is `maxlen`).
trainds = Dataset(X_train, y_train, 50, word2token, device)
devds = Dataset(X_val, y_val, 50, word2token, device)
testds = Dataset(X_test, y_test, 50, word2token, device)

In [None]:
# No `shuffle` argument is passed, so the training loader uses the DataLoader default.
train_dataloader = torch.utils.data.DataLoader(trainds, batch_size=128)
# Dev and test use batch_size=1 because the validation/evaluation code calls `.item()` on each prediction.
dev_dataloader = torch.utils.data.DataLoader(devds, batch_size=1)
test_dataloader = torch.utils.data.DataLoader(testds, batch_size=1)

In [None]:
# Positional arguments: device, vocabulary size, num_classes=1, dropout=0.4, hidden_size=100.
model = RNNclassifier(device, len(word2token), 1, 0.4, 100)
model.train()
# The return value (best dev macro-F1) is the cell's displayed output.
train_model(model, train_dataloader, dev_dataloader, epoches=10)

In [None]:
def param_optim(lr_list, optim_list):
    """Grid-search learning rate x optimizer class for the RNN classifier.

    A fresh RNNclassifier is trained for 10 epochs for every combination, using the
    notebook-level `device`, `word2token`, `train_dataloader` and `dev_dataloader`.

    Returns:
        (best dev macro-F1, [lr, optimizer class]) - the list is empty when no
        combination scored above 0.
    """
    top_score = 0
    top_config = []
    for rate, optimizer_cls in product(lr_list, optim_list):
        candidate = RNNclassifier(device, len(word2token), 1, 0.4, 100)
        candidate.train()
        score = train_model(candidate, train_dataloader, dev_dataloader,
                            epoches=10, optim=optimizer_cls, lr=rate)
        if top_score < score:
            top_score, top_config = score, [rate, optimizer_cls]
    return top_score, top_config

In [None]:
# Search grid: 4 learning rates x 6 optimizer classes = 24 training runs of 10 epochs each.
lr_list = [0.1, 0.01, 0.001, 0.0001]
op_list = [optim.Adam, optim.AdamW, optim.Adadelta, optim.Adagrad, optim.SGD, optim.RMSprop]

In [None]:
best_f, best_params = param_optim(lr_list, op_list)

In [None]:
best_f, best_params

In [None]:
# `model` here is the notebook-level RNN created before the search; the models trained
# inside param_optim are local to that function.
model.eval()

In [None]:
def evaluate(model, test_dataloader):
    """Print a classification report for ``model`` on ``test_dataloader``.

    Args:
        model: module called as ``model(sentence, attention_ids, length)``.
        test_dataloader: yields ``(sentence, length, attention_ids, label)`` batches.

    The model is put into eval mode for the duration of the call (a model restored
    from a training checkpoint may still be in train mode, with dropout active) and
    returned to its previous mode afterwards.
    """
    was_training = model.training
    model.eval()
    predicted = []
    true = []
    try:
        with torch.no_grad():
            for sentence, length, attention_ids, label in test_dataloader:
                pred = model(sentence, attention_ids, length)
                # Flattening instead of .item() also supports batches larger than 1.
                predicted.extend((torch.sigmoid(pred) > 0.5).type(torch.int).view(-1).tolist())
                true.extend(int(v) for v in label.view(-1).tolist())
    finally:
        model.train(was_training)
    print(classification_report(true, predicted))

In [None]:
# Evaluates a checkpoint written by train_model. The file name embeds the rounded dev macro-F1
# of one particular run, so this file only exists if a run reached exactly that score.
# torch.load unpickles the whole model object - only load checkpoints you created yourself.
evaluate(torch.load('0.595model.pt'), test_dataloader)

In [None]:
class CNNclassifier(nn.Module):
    """Embedding -> Conv1d (kernel 3) -> max over time -> linear layer.

    Args:
        device: device every parameterised sub-module is moved to.
        maxlen: fixed input length; the pooling window is ``maxlen - 2``, the length of
            the convolution output for a kernel of 3, so pooling leaves one position.
        max_pool: stored on the instance but not used by the layers.
        emb_size: number of rows of the embedding table, i.e. the vocabulary size.
        num_classes: number of output logits per example.
        hidden_size: embedding dimension; the convolution outputs half as many channels.
    """

    def __init__(self, device, maxlen, max_pool, emb_size, num_classes=2, hidden_size=100):
        super(CNNclassifier, self).__init__()
        self.device = device
        self.hidden_size = hidden_size
        self.maxlen = maxlen
        self.max_pool = max_pool
        self.emb_size = emb_size
        conv_channels = int(self.hidden_size / 2)
        self.embedding = nn.Embedding(self.emb_size, self.hidden_size).to(self.device)
        # The convolution has weights too and must live on the same device as the
        # embedding and the linear layer, otherwise the forward pass fails on a GPU.
        self.cnn = nn.Conv1d(self.hidden_size, conv_channels, 3).to(self.device)
        self.maxpool = nn.MaxPool1d(int(self.maxlen - 2))
        self.linear = nn.Linear(conv_channels, num_classes).to(self.device)

    def forward(self, tokens, attention_ids, length):
        """Return one row of logits per example.

        ``attention_ids`` and ``length`` are accepted so the model can be driven by the
        same training loop as the RNN classifier; they are not used here.
        """
        embs = self.embedding(tokens)
        # Conv1d convolves over the last axis, so channels go to axis 1.
        embs = embs.permute(0, 2, 1)
        cnn_out = self.cnn(embs)
        max_out = self.maxpool(cnn_out)
        max_out = max_out.permute(0, 2, 1)
        out = self.linear(max_out)
        out = out.squeeze(1)
        return out

In [None]:
# Positional arguments: device, maxlen=50 (matches the Dataset padding length), max_pool=3,
# vocabulary size, num_classes=1.
cn = CNNclassifier(device, 50, 3, len(word2token), 1)
cn.train()

In [None]:
# Reuses the RNN training loop and data loaders; 10 epochs.
train_model(cn, train_dataloader, dev_dataloader, 10)

In [None]:
cn.eval()

In [None]:
# Loads a checkpoint whose name embeds the rounded dev macro-F1 of one particular run;
# the file only exists if a run reached exactly that score.
evaluate(torch.load('0.55model.pt'), test_dataloader)

In [None]:
# Tokenizer for the BERT-based models defined below.
model_name = "bert-base-multilingual-cased"
tokenizer = BertTokenizerFast.from_pretrained(model_name)

In [None]:
class bertDataset(torch.utils.data.Dataset):
    """Encodes pre-tokenised texts with a BERT-style tokenizer, one example at a time.

    Each item is ``(tokens, length, label)``: the tokenizer output with the leading
    batch axis squeezed away, the number of attended positions, and the label as a
    float32 tensor on ``device``.
    """

    # Fields of the tokenizer output whose batch axis is removed.
    _FIELDS = ('input_ids', 'attention_mask', 'token_type_ids')

    def __init__(self, texts, labels, tokenizer, device):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.device = device

    def __getitem__(self, item):
        words, target = self.texts[item], self.labels[item]
        # Pad / truncate to exactly 50 positions; the input is already a list of words.
        encoded = self.tokenizer(words, padding='max_length', max_length=50, truncation=True,
                                 return_tensors='pt', is_split_into_words=True)
        tokens = encoded.to(self.device)
        for field in self._FIELDS:
            tokens[field] = torch.squeeze(tokens[field], 0)[:512]
        # The attention mask holds 1 for attended positions, so its sum is their count.
        length = sum(tokens['attention_mask']).item()
        label = torch.tensor(target, dtype=torch.float32, device=self.device)
        return tokens, length, label

    def __len__(self):
        return len(self.texts)

In [None]:
# Same train / validation / test splits as for the RNN and CNN models, encoded with the BERT tokenizer.
bert_train = bertDataset(X_train, y_train, tokenizer, device)
bert_dev = bertDataset(X_val, y_val, tokenizer, device)
bert_test = bertDataset(X_test, y_test, tokenizer, device)
bert_tdataloader = torch.utils.data.DataLoader(bert_train, batch_size=64)
# batch_size=1 because the validation/evaluation code calls `.item()` on each prediction.
bert_ddataloader = torch.utils.data.DataLoader(bert_dev, batch_size=1)
bert_ttdataloader = torch.utils.data.DataLoader(bert_test, batch_size=1)

In [None]:
class BERT_GRU(nn.Module):
    """BERT encoder (run without gradients) -> GRU -> dropout -> masked mean -> linear."""

    def __init__(self, device, num_classes=1, hidden_size=100, model_name="bert-base-multilingual-cased"):
        super().__init__()
        self.device = device
        self.bert = BertModel.from_pretrained(model_name).to(self.device)
        # 768 is the size of the encoder's hidden states that feed the GRU.
        self.gru = nn.GRU(input_size=768, hidden_size=hidden_size, batch_first=True).to(self.device)
        self.dropout = nn.Dropout(0.4).to(self.device)
        self.linear = nn.Linear(hidden_size, num_classes).to(self.device)

    def forward(self, tokens, length):
        """Return one row of logits per example.

        Args:
            tokens: mapping with 'input_ids', 'attention_mask' and 'token_type_ids'.
            length: number of attended positions per example, used as the mean's divisor.
        """
        mask = tokens['attention_mask']
        # The encoder call is wrapped in no_grad, so no gradients flow into BERT.
        with torch.no_grad():
            encoded = self.bert(input_ids=tokens['input_ids'], attention_mask=mask,
                                token_type_ids=tokens['token_type_ids'], return_dict=True)
        # last_hidden_state is indexed as (batch, sequence, hidden).
        gru_out, _ = self.gru(encoded['last_hidden_state'])
        gru_out = self.dropout(gru_out)
        # Zero the padded positions, then average over the sequence axis.
        masked = (gru_out * mask.unsqueeze(-1)).to(self.device)
        summed = torch.sum(masked, 1).to(self.device)
        pooled = summed / length.to(self.device).unsqueeze(-1)
        return self.linear(pooled)

In [None]:
def _bert_dev_macro_f1(model, dev_dataloader):
    """Macro-F1 of ``model`` on ``dev_dataloader``, computed in eval mode without gradients."""
    was_training = model.training
    model.eval()  # dropout must be off while validating
    predicted, true = [], []
    try:
        with torch.no_grad():
            for dev_sentence, dev_length, dev_label in dev_dataloader:
                logits = model(dev_sentence, dev_length)
                # Flattening instead of .item() also supports dev batches larger than 1.
                predicted.extend((torch.sigmoid(logits) > 0.5).type(torch.int).view(-1).tolist())
                true.extend(int(v) for v in dev_label.view(-1).tolist())
    finally:
        # Hand the model back in the mode the caller left it in.
        model.train(was_training)
    return f1_score(true, predicted, average='macro')


def bert_train_model(model, dataloader, dev_dataloader, epoches, eval_every=300):
    """Train a transformer-based binary classifier and checkpoint on dev macro-F1.

    Args:
        model: module called as ``model(sentence, length)``.
        dataloader: yields ``(sentence, length, label)`` training batches.
        dev_dataloader: same layout, used for validation.
        epoches: number of passes over ``dataloader``.
        eval_every: validate (and save) before every ``eval_every``-th training step.

    Returns:
        The best dev macro-F1 seen during training.

    Side effects:
        Writes 'model.pt' at every validation and '<rounded f1>model.pt' whenever the
        dev score improves (both via ``torch.save`` of the whole model object).
    """
    optimizer = optim.Adam(model.parameters(), lr=1e-5)  #lr 1e-5 and adam for roberta and albert
    binary = nn.BCEWithLogitsLoss()
    best_f = 0
    f1 = 0.0
    for epoch in range(epoches):
        print(epoch + 1, "epoch")
        t = tqdm(dataloader)
        for i, (sentence, length, label) in enumerate(t):
            if i % eval_every == 0:
                torch.save(model, 'model.pt')
                f1 = _bert_dev_macro_f1(model, dev_dataloader)
                if f1 > best_f:
                    torch.save(model, f"{round(f1, 3)}model.pt")
                    best_f = f1
                    print("Saving with score", best_f)
            pred = model(sentence, length)
            loss = binary(pred.view(-1), label)
            t.set_description(f"loss: {round(float(loss), 3)}, f-macro: {round(f1, 3)}")
            t.refresh()
            loss.backward()
            optimizer.step()
            model.zero_grad()
    return best_f

In [None]:
def bert_evaluate(model, test_dataloader):
    """Print a classification report for ``model`` on ``test_dataloader``.

    Args:
        model: module called as ``model(sentence, length)``.
        test_dataloader: yields ``(sentence, length, label)`` batches.

    The model is put into eval mode for the duration of the call, so the result does
    not depend on the caller having done so, and is returned to its previous mode
    afterwards.
    """
    was_training = model.training
    model.eval()
    predicted = []
    true = []
    try:
        with torch.no_grad():
            for sentence, length, label in test_dataloader:
                pred = model(sentence, length)
                # Flattening instead of .item() also supports batches larger than 1.
                predicted.extend((torch.sigmoid(pred) > 0.5).type(torch.int).view(-1).tolist())
                true.extend(int(v) for v in label.view(-1).tolist())
    finally:
        model.train(was_training)
    print(classification_report(true, predicted))

In [None]:
# Positional arguments: device, num_classes=1, hidden_size=100.
m = BERT_GRU(device, 1, 100)
m.train()

In [None]:
# 10 epochs over the BERT-encoded training data, validating on the dev loader.
bert_train_model(m, bert_tdataloader, bert_ddataloader, 10)

In [None]:
class BERTclassifier(nn.Module):
    """BERT encoder with one linear layer on the first position's final hidden state."""

    def __init__(self, device, num_classes=1, model_name="bert-base-multilingual-cased"):
        super().__init__()
        self.device = device
        encoder = BertModel.from_pretrained(model_name)
        self.bert = encoder.to(self.device)
        # 768 is the size of the encoder's hidden states that feed the linear layer.
        self.linear = nn.Linear(768, num_classes).to(self.device)

    def forward(self, tokens, length):
        """Return one row of logits per example; ``length`` is accepted but not used."""
        encoded = self.bert(
            input_ids=tokens['input_ids'],
            attention_mask=tokens['attention_mask'],
            token_type_ids=tokens['token_type_ids'],
            return_dict=True,
        )
        # Keep only position 0 of every sequence: (batch, sequence, hidden) -> (batch, hidden).
        first_position = encoded['last_hidden_state'][:, 0]
        return self.linear(first_position)

In [None]:
# Fine-tuned BERT classifier with a single output logit.
b = BERTclassifier(device, 1)
b.train()

In [None]:
# 3 epochs over the BERT-encoded training data.
bert_train_model(b, bert_tdataloader, bert_ddataloader, 3)

In [None]:
b.eval()

In [None]:
# Evaluates the in-memory model as it is after the last training step, not a saved checkpoint.
bert_evaluate(b, bert_ttdataloader)

In [None]:
# Rebinds the notebook-level `tokenizer` name to the RoBERTa tokenizer. The BERT datasets
# created earlier keep their own reference to the BERT tokenizer object they were given.
tokenizer = RobertaTokenizerFast.from_pretrained('roberta-base', add_prefix_space=True)

In [None]:
class robertaDataset(torch.utils.data.Dataset):
    """Encodes pre-tokenised texts for RoBERTa- or ALBERT-style models.

    Args:
        texts: sequence of word lists.
        labels: sequence of labels, aligned with ``texts``.
        tokenizer: callable tokenizer whose output has 'input_ids' and 'attention_mask'
            (and 'token_type_ids' when ``al`` is true).
        device: device the returned tensors live on.
        al: set to True for tokenizers that also return 'token_type_ids' (used for
            ALBERT in this notebook); that field is then squeezed like the others.

    Each item is ``(tokens, length, label)``.
    """

    def __init__(self, texts, labels, tokenizer, device, al=False):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.device = device
        self.al = al

    def __getitem__(self, item):
        text = self.texts[item]
        label = self.labels[item]
        # `is_split_into_words` tells the tokenizer that `text` is ONE sentence given as a
        # list of words. With the keyword misspelled the flag is not set and the list is
        # not handled as a single pre-tokenised sentence.
        tokens = self.tokenizer(text, padding='max_length', max_length=50, truncation=True, return_tensors='pt',
                                is_split_into_words=True).to(self.device)
        # Drop the leading batch axis added by return_tensors='pt'.
        tokens['input_ids'] = torch.squeeze(tokens['input_ids'], 0)
        tokens['attention_mask'] = torch.squeeze(tokens['attention_mask'], 0)
        if self.al:
            tokens['token_type_ids'] = torch.squeeze(tokens['token_type_ids'], 0)[:512]
        # The attention mask holds 1 for attended positions, so its sum is their count.
        length = sum(tokens['attention_mask']).item()
        label = torch.tensor(label, dtype=torch.float32, device=self.device)
        return tokens, length, label

    def __len__(self):
        return len(self.texts)

In [None]:
class roBERTaclassifier(nn.Module):
    """RoBERTa encoder with one linear layer on the first position's final hidden state."""

    def __init__(self, device, num_classes=1, model_name='roberta-base'):
        super().__init__()
        self.device = device
        encoder = RobertaModel.from_pretrained(model_name)
        self.roberta = encoder.to(self.device)
        # 768 is the size of the encoder's hidden states that feed the linear layer.
        self.linear = nn.Linear(768, num_classes).to(self.device)

    def forward(self, tokens, length):
        """Return one row of logits per example; ``length`` is accepted but not used."""
        # `tokens` is unpacked straight into the encoder as keyword arguments.
        encoded = self.roberta(**tokens)
        # Keep only position 0 of every sequence: (batch, sequence, hidden) -> (batch, hidden).
        first_position = encoded.last_hidden_state[:, 0]
        return self.linear(first_position)

In [None]:
rob = roBERTaclassifier(device)
rob.train()

In [None]:
# `tokenizer` is whatever the notebook-level name is bound to when this cell runs;
# it must be the RoBERTa tokenizer, so the cell that creates it has to run first.
roberta_train = robertaDataset(X_train, y_train, tokenizer, device)
roberta_dev = robertaDataset(X_val, y_val, tokenizer, device)
roberta_test = robertaDataset(X_test, y_test, tokenizer, device)
roberta_tdataloader = torch.utils.data.DataLoader(roberta_train, batch_size=16)
# batch_size=1 because the validation/evaluation code calls `.item()` on each prediction.
roberta_ddataloader = torch.utils.data.DataLoader(roberta_dev, batch_size=1)
roberta_ttdataloader = torch.utils.data.DataLoader(roberta_test, batch_size=1)

In [None]:
# A single epoch of fine-tuning.
bert_train_model(rob, roberta_tdataloader, roberta_ddataloader, 1)

In [None]:
rob.eval()

In [None]:
# Evaluates the in-memory model as it is after the last training step, not a saved checkpoint.
bert_evaluate(rob, roberta_ttdataloader)

In [None]:
# ALBERT gets its own tokenizer variable instead of rebinding `tokenizer`.
albert_tokenizer = AlbertTokenizerFast.from_pretrained('albert-base-v2')

In [None]:
class ALBERTclassifier(nn.Module):
    """ALBERT encoder with one linear layer on the first position's final hidden state."""

    def __init__(self, device, num_classes=1, model_name='albert-base-v2'):
        super().__init__()
        self.device = device
        encoder = AlbertModel.from_pretrained(model_name)
        self.albert = encoder.to(self.device)
        # 768 is the size of the encoder's hidden states that feed the linear layer.
        self.linear = nn.Linear(768, num_classes).to(self.device)

    def forward(self, tokens, length):
        """Return one row of logits per example; ``length`` is accepted but not used."""
        # `tokens` is unpacked straight into the encoder as keyword arguments.
        encoded = self.albert(**tokens)
        # Keep only position 0 of every sequence: (batch, sequence, hidden) -> (batch, hidden).
        first_position = encoded.last_hidden_state[:, 0]
        return self.linear(first_position)

In [None]:
# The RoBERTa dataset class is reused with al=True (last argument), which makes it also
# squeeze the 'token_type_ids' field of the tokenizer output.
albert_train = robertaDataset(X_train, y_train, albert_tokenizer, device, True)
albert_dev = robertaDataset(X_val, y_val, albert_tokenizer, device, True)
albert_test = robertaDataset(X_test, y_test, albert_tokenizer, device, True)
albert_tdataloader = torch.utils.data.DataLoader(albert_train, batch_size=16)
# batch_size=1 because the validation/evaluation code calls `.item()` on each prediction.
albert_ddataloader = torch.utils.data.DataLoader(albert_dev, batch_size=1)
albert_ttdataloader = torch.utils.data.DataLoader(albert_test, batch_size=1)

In [None]:
albert = ALBERTclassifier(device)
albert.train()

In [None]:
# A single epoch of fine-tuning.
bert_train_model(albert, albert_tdataloader, albert_ddataloader, 1)

In [None]:
albert.eval()

In [None]:
# Evaluates the in-memory model as it is after the last training step, not a saved checkpoint.
bert_evaluate(albert, albert_ttdataloader)