<a href="https://colab.research.google.com/github/hishamp3/MasterThesis-Lies-DeceptiveText/blob/main/Linear_Classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers
!pip install sentencepiece

In [None]:
import pandas as pd
import re
import string

In [None]:
def clean_text(text):
    """Normalise a raw review string before tokenisation.

    Steps, in order: lower-case, drop URLs, turn line breaks/tabs into
    spaces, strip ASCII punctuation, strip any remaining character that is
    not a space or a word character, and drop every token that contains a
    digit. Runs of spaces left behind by removed tokens are not collapsed.

    Args:
        text: The raw review. ``None`` and NaN (pandas' missing-value
            marker) are treated as an empty string; any other non-string
            value is converted with ``str``.

    Returns:
        The cleaned, lower-cased string.
    """
    # Missing values would otherwise crash on .lower() inside DataFrame.apply.
    if text is None or (isinstance(text, float) and text != text):
        return ''
    if not isinstance(text, str):
        text = str(text)

    # to lower case
    text = text.lower()
    # remove links (http and https); this has to run before punctuation is
    # stripped, otherwise "://" is gone and the URL is no longer recognisable
    text = re.sub(r'https?://\S+', '', text)
    # line breaks and tabs separate words: map them to a space instead of
    # deleting them, so neighbouring words are not glued together
    text = re.sub(r'\s', ' ', text)
    # remove punctuation
    text = re.sub('[%s]' % re.escape(string.punctuation), '', text)
    # remove whatever is left that is neither a space nor a word character
    text = re.sub(r'[^ \w\.]', '', text)
    # remove words containing numbers
    text = re.sub(r'\w*\d\w*', '', text)

    return text

In [None]:
# Load only the raw review text ("text_") and its class label ("label") from the CSV.
df = pd.read_csv("./sample_data/fake reviews dataset.csv",usecols=["text_","label"])

In [None]:
# Store a cleaned copy of every review in a new "text" column; the raw "text_" column is kept.
df['text'] = df['text_'].apply(clean_text)

In [None]:
# Sanity check: compare the raw ("text_") and cleaned ("text") columns on the first five rows.
print(df.head(5))

  label                                              text_  \
0    CG  Love this!  Well made, sturdy, and very comfor...   
1    CG  love it, a great upgrade from the original.  I...   
2    CG  This pillow saved my back. I love the look and...   
3    CG  Missing information on how to use it, but it i...   
4    CG  Very nice set. Good quality. We have had the s...   

                                                text  
0  love this  well made sturdy and very comfortab...  
1  love it a great upgrade from the original  ive...  
2  this pillow saved my back i love the look and ...  
3  missing information on how to use it but it is...  
4  very nice set good quality we have had the set...  


In [None]:
# Hugging Face checkpoint name; the tokenizer and the encoder are both loaded from it.
model_name = 'xlm-roberta-base'
# Alternative checkpoints (uncomment exactly one to switch):
#model_name = 'roberta-base'
#model_name = 'bert-base-uncased'

In [None]:
from transformers import BertTokenizer
from transformers import RobertaTokenizer, TFRobertaModel
from transformers import XLNetTokenizer, XLNetModel
from transformers import AutoTokenizer,XLMRobertaForMaskedLM

# Tokenize a single sentence, padded/truncated to 10 tokens, to inspect the encoding.
tokenizer = AutoTokenizer.from_pretrained(model_name)
example_text = 'The Weather is good tonight'
bert_input = tokenizer(example_text,padding='max_length', max_length = 10,
                       truncation=True, return_tensors="pt")


# input_ids holds the token ids; attention_mask is 1 for real tokens and 0 for padding.
print(bert_input['input_ids'])
print(bert_input['attention_mask'])

Downloading (…)lve/main/config.json:   0%|          | 0.00/615 [00:00<?, ?B/s]

Downloading (…)tencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/9.10M [00:00<?, ?B/s]

tensor([[     0,    581, 214526,     83,   4127, 179028,      2,      1,      1,
              1]])
tensor([[1, 1, 1, 1, 1, 1, 1, 0, 0, 0]])


In [None]:
# Decode the ids back to text to reveal the special tokens the tokenizer inserted.
# Note: this rebinds example_text to the decoded string.
example_text = tokenizer.decode(bert_input.input_ids[0])
print(example_text)

<s> The Weather is good tonight</s><pad><pad><pad>


In [None]:
import torch
import numpy as np
from transformers import BertTokenizer

# Tokenizer for the chosen checkpoint; the Dataset class below reads this module-level name.
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Maps the dataset's string labels to integer class ids.
# NOTE(review): presumably 'OR' = original (human-written) and 'CG' = computer-generated — confirm.
labels = {'OR':0,
          'CG':1
          }

class Dataset(torch.utils.data.Dataset):
    """Map-style dataset of tokenized texts paired with integer class labels.

    All texts are tokenized eagerly in ``__init__`` and kept in memory, each
    padded/truncated to ``max_length`` tokens.
    """

    def __init__(self, df, tokenizer_fn=None, label_map=None, max_length=512):
        """Tokenize every row of ``df`` and encode its label.

        Args:
            df: DataFrame with a 'label' column (keys of ``label_map``) and a
                'text' column (strings to tokenize).
            tokenizer_fn: Callable invoked as ``tokenizer_fn(text,
                padding='max_length', max_length=..., truncation=True,
                return_tensors="pt")``. Defaults to the notebook-level
                ``tokenizer``.
            label_map: Mapping from label value to integer class id.
                Defaults to the notebook-level ``labels``.
            max_length: Length every sequence is padded/truncated to.

        Raises:
            KeyError: If a label in ``df`` is missing from ``label_map``.
        """
        # Fall back to the notebook globals so existing Dataset(df) calls keep working.
        if tokenizer_fn is None:
            tokenizer_fn = tokenizer
        if label_map is None:
            label_map = labels

        self.labels = [label_map[label] for label in df['label']]
        self.texts = [tokenizer_fn(text,
                                   padding='max_length', max_length=max_length,
                                   truncation=True, return_tensors="pt")
                      for text in df['text']]

    def classes(self):
        """Return the list of integer class ids, one per sample."""
        return self.labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label at ``idx`` wrapped in a NumPy array."""
        return np.array(self.labels[idx])

    def get_batch_texts(self, idx):
        """Return the tokenizer output stored for the sample at ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return ``(tokenizer_output, label)`` for the sample at ``idx``."""
        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)

        return batch_texts, batch_y

In [None]:
np.random.seed(112)

# Shuffle once (reproducibly via random_state), then cut 80% / 10% / 10% by position.
# Plain .iloc slices are used instead of np.split(DataFrame, ...): np.split goes
# through DataFrame.swapaxes, which recent pandas versions deprecate.
df_shuffled = df.sample(frac=1, random_state=42)
train_end = int(.8 * len(df))
val_end = int(.9 * len(df))
df_train = df_shuffled.iloc[:train_end]
df_val = df_shuffled.iloc[train_end:val_end]
df_test = df_shuffled.iloc[val_end:]

print(len(df_train),len(df_val), len(df_test))

32345 4043 4044


In [None]:
from torch import nn
from transformers import BertModel
from transformers import RobertaTokenizer, RobertaModel
from transformers import XLMRobertaForCausalLM, AutoConfig
from transformers import AutoModelForMaskedLM
from transformers import AutoModel

class Classifier(nn.Module):
    """Pretrained transformer encoder followed by dropout and a 2-class linear head."""

    def __init__(self, dropout=0.6):
        """Load the encoder named by the notebook-level ``model_name`` and build the head.

        Args:
            dropout: Dropout probability applied to the pooled encoder output.
        """
        super(Classifier, self).__init__()
        self.model = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(dropout)
        # Size the head from the checkpoint's config instead of hard-coding 768,
        # so checkpoints with a different hidden width also work.
        self.linear = nn.Linear(self.model.config.hidden_size, 2)

    def forward(self, input_id, mask):
        """Return raw (unnormalised) class logits, two per input sequence.

        Args:
            input_id: Token ids passed to the encoder as ``input_ids``.
            mask: Attention mask passed to the encoder as ``attention_mask``.

        Returns:
            The output of the linear head. No activation is applied:
            CrossEntropyLoss expects unnormalised logits, and a ReLU here
            would clamp them to >= 0, so whenever both logits were clamped
            to zero argmax would always pick class 0 and no gradient would flow.
        """
        # With return_dict=False the encoder returns a tuple; the second item is the pooled output.
        _, pooled_output = self.model(input_ids= input_id, attention_mask=mask,return_dict=False)
        dropout_output = self.dropout(pooled_output)
        return self.linear(dropout_output)

In [None]:
from torch.optim import Adam
from tqdm import tqdm

def train(model, train_data,learning_rate, epochs, batch_size=8):
    """Fine-tune ``model`` on ``train_data`` with Adam and cross-entropy loss.

    Prints the mean per-sample loss and the accuracy after every epoch. The
    accuracy is measured on the training batches while the model is in
    training mode (dropout active).

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns per-class scores.
        train_data: DataFrame accepted by ``Dataset`` ('text' and 'label' columns).
        learning_rate: Learning rate for Adam.
        epochs: Number of passes over ``train_data``.
        batch_size: Samples per optimisation step.
    """
    train_dataset = Dataset(train_data)
    train_dataloader = torch.utils.data.DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    model = model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = Adam(model.parameters(), lr=learning_rate)

    n_samples = len(train_dataset)

    for epoch_num in range(epochs):

        # Make sure dropout is active even if the model was left in eval mode.
        model.train()

        total_acc_train = 0
        total_loss_train = 0

        for train_input, train_label in tqdm(train_dataloader):

            train_label = train_label.to(device).long()
            # Each Dataset item comes from a tokenizer call with return_tensors="pt"
            # and carries an extra dimension at index 1 after batching; drop it
            # from the mask exactly as for input_ids.
            mask = train_input['attention_mask'].squeeze(1).to(device)
            input_id = train_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)

            batch_loss = criterion(output, train_label)
            # criterion returns the batch mean; weight it by the batch size so
            # that dividing by the sample count gives a true per-sample mean.
            total_loss_train += batch_loss.item() * train_label.size(0)

            total_acc_train += (output.argmax(dim=1) == train_label).sum().item()

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

        print(
            f'Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / n_samples: .3f} '
            f'| Train Accuracy: {total_acc_train / n_samples: .3f}'
        )


In [None]:
import gc
# Collect unreachable Python objects, then release PyTorch's cached CUDA memory
# so the training cell below starts with as much free GPU memory as possible.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Training
EPOCHS = 2  # number of full passes over df_train
model = Classifier()
LR = 1e-6  # learning rate handed to Adam inside train()

train(model, df_train, LR, EPOCHS)

Downloading model.safetensors:   0%|          | 0.00/1.12G [00:00<?, ?B/s]

100%|██████████| 4044/4044 [47:58<00:00,  1.40it/s]


Epochs: 1 | Train Loss:  0.034                 | Train Accuracy:  0.896


100%|██████████| 4044/4044 [48:04<00:00,  1.40it/s]


Epochs: 2 | Train Loss:  0.012                 | Train Accuracy:  0.964


In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score

def evaluate(model, test_data, batch_size=8):
    """Evaluate ``model`` on ``test_data`` and print accuracy, loss, confusion matrix and F1.

    The model is switched to eval mode for the duration of the call (so
    dropout is disabled) and restored to its previous mode afterwards.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns per-class scores.
        test_data: DataFrame accepted by ``Dataset`` ('text' and 'label' columns).
        batch_size: Samples per forward pass.
    """
    test_dataset = Dataset(test_data)
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    model = model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    total_loss_test = 0
    total_acc_test = 0
    y_pred = []
    y_true = []

    was_training = model.training
    # Without eval() dropout stays active and randomly perturbs the predictions.
    model.eval()
    try:
        with torch.no_grad():

            for test_input, test_label in test_dataloader:

                test_label = test_label.to(device).long()
                # Drop the extra dimension at index 1 from the mask exactly as for input_ids.
                mask = test_input['attention_mask'].squeeze(1).to(device)
                input_id = test_input['input_ids'].squeeze(1).to(device)

                output = model(input_id, mask)

                batch_loss = criterion(output, test_label)
                # criterion returns the batch mean; weight by batch size so the
                # final division by the sample count is a per-sample mean.
                total_loss_test += batch_loss.item() * test_label.size(0)

                predictions = output.argmax(dim=1)
                total_acc_test += (predictions == test_label).sum().item()

                y_pred.extend(predictions.cpu().numpy())
                y_true.extend(test_label.cpu().numpy())
    finally:
        model.train(was_training)

    n_samples = len(test_dataset)
    if n_samples == 0:
        print('No samples to evaluate.')
        return

    # Computed once over all predictions rather than once per batch.
    cf_matrix = confusion_matrix(y_true, y_pred)
    score_f1 = f1_score(y_true, y_pred)

    print(f'Test Accuracy: {total_acc_test / n_samples: .3f}|Test Loss: {total_loss_test / n_samples: .3f}')
    print(f'Confusion Matrix: {cf_matrix}')
    print(f'F1 score: {score_f1}')

In [None]:
# train() never uses a validation set, so the validation and test splits are
# concatenated into a single, larger held-out set for evaluation.
frames = [df_val, df_test]
test_set = pd.concat(frames)

In [None]:
# Evaluation: print accuracy, loss, confusion matrix and F1 on the combined held-out set
evaluate(model, test_set)