In [5]:
import re
import math
import time
import pandas as pd
from nltk.tokenize import word_tokenize
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from transformers import BertModel, BertTokenizer

from torchinfo import summary
from torchviz import make_dot
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Define the model
class bert_for_binary_classification(nn.Module):
    """Multilingual BERT encoder topped with a single-logit sigmoid head.

    The attribute names (``bert``, ``dropout``, ``classifier``, ``sigmoid``)
    double as state-dict key prefixes, so they must stay stable for saved
    checkpoints to keep loading.
    """

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-multilingual-uncased')
        # Reuse the encoder's own dropout rate and hidden width for the head
        bert_config = self.bert.config
        self.dropout = nn.Dropout(bert_config.hidden_dropout_prob)
        self.classifier = nn.Linear(bert_config.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Return the sigmoid probability for each sequence in the batch.

        ``squeeze()`` removes every size-1 dimension, so a batch of several
        sequences yields a 1-D tensor while a single sequence yields a 0-d
        tensor.
        """
        encoded = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        pooled = self.dropout(encoded.pooler_output)
        return self.sigmoid(self.classifier(pooled)).squeeze()
    
def load_bert(dir):
    """Build the classifier and restore its weights from a saved state dict.

    Args:
        dir: Path to the checkpoint file written by ``torch.save`` (despite
            the name, this is a file path, not a directory).

    Returns:
        The ``bert_for_binary_classification`` instance with the checkpoint
        weights loaded. The model is left on the CPU; callers move it to
        their target device.
    """
    model = bert_for_binary_classification()

    # map_location='cpu' lets a checkpoint saved from a GPU run load on a
    # CPU-only machine, and avoids materialising a second copy of the
    # weights on the GPU before they are copied into the (CPU) model.
    # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
    state_dict = torch.load(dir, map_location='cpu')
    model.load_state_dict(state_dict)

    return model

def create_bert_dataloader(texts, labels, batch_size, shuffle=True):
    """Tokenize ``texts`` and wrap them with ``labels`` in a DataLoader.

    Every text is padded/truncated to 300 tokens. Each batch yields, in
    order: input ids, token type ids, attention mask, float32 labels.
    """
    # The stock pretrained tokenizer is sufficient here
    encoded = BertTokenizer.from_pretrained("bert-base-multilingual-uncased")(
        texts,
        max_length=300,
        padding='max_length',
        truncation=True,
        return_tensors="pt",
    )

    # Tensor order defines the order in which batches unpack downstream
    tensors = [encoded[key] for key in ("input_ids", "token_type_ids", "attention_mask")]
    tensors.append(torch.tensor(labels, dtype=torch.float32))

    return DataLoader(TensorDataset(*tensors), batch_size=batch_size, shuffle=shuffle)

def find_device(verbose=False):
    """Return the CUDA device when one is available, otherwise the CPU.

    When ``verbose`` is true, a one-line notice of the choice is printed.
    """
    has_cuda = torch.cuda.is_available()

    if verbose:
        print("### Cuda Found :: Using GPU ###" if has_cuda
              else "### Cuda not found :: Using CPU ###")

    return torch.device("cuda" if has_cuda else "cpu")

# Cleans a raw text and encodes it for BERT
def prepare_text(text):
    """Normalise ``text`` and return its BERT encoding.

    Steps: lowercase, strip @mentions, strip link-like spans, word-tokenize,
    strip non-letter characters from each token, drop tokens shorter than two
    characters, then re-join and pass the result to ``tokenize_text``.
    """
    lowered = text.lower()

    # Strip user mentions, then anything that looks like an external link
    without_mentions = re.sub(r'@\w+', '', lowered)
    without_links = re.sub(
        r'([hH][tT]{2}[pP][sS]?:\/\/)?[\w~-]+(\.[\w~-]+)+(\/[\w~-]*)*', '', without_mentions)

    # Remove punctuation, digits and underscores from every word token
    stripped = (re.sub(r'[\W\d_]+', '', word) for word in word_tokenize(without_links))

    # Keep only tokens that still have at least two characters
    kept = [word for word in stripped if len(word) > 1]

    # TODO: remove stop words (e.g. derive them for this dataset with TF-IDF)

    return tokenize_text(' '.join(kept))

def tokenize_text(text):
    """Encode ``text`` with the multilingual BERT tokenizer.

    Returns a 3-tuple of PyTorch tensors: (input ids, token type ids,
    attention mask), padded/truncated to 300 tokens.
    """
    bert_tokenizer = BertTokenizer.from_pretrained("bert-base-multilingual-uncased")

    encoding = bert_tokenizer(
        text,
        max_length=300,
        padding='max_length',
        truncation=True,
        return_tensors="pt",
    )

    # Hand the three parts back separately, in the order callers unpack them
    return tuple(encoding[field] for field in ("input_ids", "token_type_ids", "attention_mask"))

def predict_sentiment(text, model, device):
    """Classify a single text and report the confidence in that label.

    Returns ``('POSITIVE', p)`` when the model output ``p`` exceeds 0.5 and
    ``('NEGATIVE', 1 - p)`` otherwise; the confidence is the model's output
    tensor (or its complement), not a Python float. Puts ``model`` in eval
    mode as a side effect.
    """
    # Clean + encode the text, then move every part to the target device
    input_ids, type_ids, mask = (part.to(device) for part in prepare_text(text))

    model.eval()
    with torch.no_grad():
        output = model(input_ids, mask, type_ids)

    is_positive = (output > 0.5).float().item() == 1

    # Confidence is always expressed relative to the predicted label
    return ('POSITIVE', output) if is_positive else ('NEGATIVE', (1.0 - output))
    
def bert_benchmark(model, dataloader, device, verbose=False):
    """Evaluate ``model`` on ``dataloader`` and return mean loss and accuracy.

    Args:
        model: Callable as ``model(input_ids, attention_mask, token_type_ids)``
            returning one probability per sample.
        dataloader: Sized iterable of batches, each unpacking to
            ``(input_ids, token_type_ids, attention_mask, targets)``.
        device: Device the batch tensors are moved to before the forward pass.
        verbose: When true, print a single-line progress readout per batch.

    Returns:
        ``(mean BCE loss per sample, fraction of correct predictions)``.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    criterion = nn.BCELoss()

    total_loss = 0.0
    total_correct = 0
    total_samples = 0
    total_duration = 0
    num_batches = len(dataloader)

    model.eval()
    with torch.no_grad():
        for batch_index, batch in enumerate(dataloader):
            # Note the time before processing this batch
            prev_time = time.time()

            # Fetch data from batch and move to the target device
            input_ids, token_type_ids, attention_mask, targets = batch
            input_ids = input_ids.to(device)
            token_type_ids = token_type_ids.to(device)
            attention_mask = attention_mask.to(device)
            targets = targets.to(device)
            batch_size = targets.size(0)

            # Run the model. The classifier squeezes its output, so a batch of
            # one comes back 0-d; align it with the targets so BCELoss and the
            # accuracy comparison see matching shapes.
            outputs = model(input_ids, attention_mask, token_type_ids).reshape(targets.shape)

            # BCELoss averages over the batch; re-weight by batch size so the
            # final mean is per-sample even when the last batch is smaller.
            loss = criterion(outputs, targets)
            total_loss += loss.item() * batch_size

            # Find the accuracy and accumulate
            predictions = (outputs > 0.5).float()
            total_correct += (predictions == targets).sum().item()

            total_samples += batch_size

            # Find how long the batch took
            total_duration += time.time() - prev_time

            # Estimate remaining runtime from the mean batch duration so far
            avg_runtime = total_duration / (batch_index + 1)
            remaining_batches = num_batches - (batch_index + 1)
            remaining_time = avg_runtime * remaining_batches

            # Print the progress, loss, and accuracy
            if verbose: print(f'Batch {batch_index+1}/{num_batches} {progress_bar(batch_index+1, num_batches, 50)}'
                              f' - Loss: {total_loss / total_samples:.4f}'
                              f' - Accuracy: {total_correct / total_samples * 100:.4f}%'
                              f' - Remaining Time: {int(remaining_time // 60)}:{int(remaining_time % 60):02d}'
                              , end='\r')

    # Terminate the '\r'-overwritten progress line; stay silent otherwise
    if verbose: print()

    if total_samples == 0:
        raise ValueError('bert_benchmark received a dataloader with no samples')

    return (total_loss/total_samples), (total_correct/total_samples)

def progress_bar(numerator, denominator, length):
    """Render a fixed-width text progress bar such as ``[===   ]``.

    Args:
        numerator: Units completed so far.
        denominator: Total units; a non-positive value renders an empty bar.
        length: Number of cells between the brackets.

    Returns:
        A string of ``length + 2`` characters (for non-negative ``length``).
    """
    if denominator <= 0:
        filled = 0
    else:
        # Multiply before dividing: 29 / 100 * 100 floors to 28 in floating
        # point, whereas 29 * 100 / 100 is exactly 29.
        filled = math.floor(numerator * length / denominator)

    # Clamp so an overshooting numerator cannot widen the bar
    filled = max(0, min(length, filled))

    return '[' + ('=' * filled) + (' ' * (length - filled)) + ']'

In [2]:
# Pick the compute device, then restore the saved classifier straight onto it
device = find_device()
model = load_bert('models/BERT.pth').to(device)

Some weights of the model checkpoint at bert-base-multilingual-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.predictions.bias', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Find Model Summary

In [43]:
# Save the printed module structure of the loaded model as a text file
summary_path = 'plots/model_summary.txt'
with open(summary_path, 'w') as summary_file:
    summary_file.write(str(model))

Test the model on various texts

In [3]:
# Spot-check the classifier on a single hand-written sentence.
# `confidence` is the model's probability for the label it predicted.
text = 'I hate you!'

sentiment, confidence = predict_sentiment(text, model, device)

print(f'The predicted sentiment of the text "{text}" is {sentiment} with {confidence*100:0.2f}% confidence.')

The predicted sentiment of the text "I hate you!" is NEGATIVE with 90.42% confidence.


In [29]:
# Same spot-check with the opposite verb, to compare against 'I hate you!'
text = 'I love you!'

sentiment, confidence = predict_sentiment(text, model, device)

print(f'The predicted sentiment of the text "{text}" is {sentiment} with {confidence*100:0.2f}% confidence.')

The predicted sentiment of the text "I love you!" is POSITIVE with 93.78% confidence.


In [30]:
# Spot-check a longer sentence where only one word ('bad') carries the sentiment
text = 'I hope you have a really bad day!'

sentiment, confidence = predict_sentiment(text, model, device)

print(f'The predicted sentiment of the text "{text}" is {sentiment} with {confidence*100:0.2f}% confidence.')

The predicted sentiment of the text "I hope you have a really bad day!" is NEGATIVE with 89.53% confidence.


In [31]:
# Same sentence with 'good' in place of 'bad', to check the prediction flips
text = 'I hope you have a really good day!'

sentiment, confidence = predict_sentiment(text, model, device)

print(f'The predicted sentiment of the text "{text}" is {sentiment} with {confidence*100:0.2f}% confidence.')

The predicted sentiment of the text "I hope you have a really good day!" is POSITIVE with 89.52% confidence.


Test the model on the *entire* dataset (keep and display a running total of accuracy)

In [None]:
# Load every prepared example and batch it for evaluation
data = pd.read_csv('data/s140-prepared.csv')

# Coerce each entry to str so non-string cells (e.g. NaN) can still be tokenized
texts = list(map(str, data['text'].to_list()))
targets = data['target'].to_list()

# Batches of 100; create_bert_dataloader shuffles by default
dataloader = create_bert_dataloader(texts, targets, 100)


In [68]:
# Running accuracy over `dataloader`, refreshed on one line after every batch
total_correct = 0
total = 0

model.eval()
with torch.no_grad():
    for batch_index, batch in enumerate(dataloader):
        input_ids, type_ids, mask, y = batch

        input_ids = input_ids.to(device)
        type_ids = type_ids.to(device)
        mask = mask.to(device)
        y = y.to(device)

        # The model squeezes its output, so a batch of one comes back 0-d;
        # reshape to match the labels before comparing.
        output = model(input_ids, mask, type_ids).reshape(y.shape)

        # Tensor .sum().item() counts on-device and yields a plain int; the
        # builtin sum() would iterate the tensor element by element and keep
        # the running total as a device tensor.
        total_correct += ((output > 0.5).float() == y).sum().item()
        total += len(y)

        print(f'Accuracy: {total_correct/total*100:.2f}% - ({batch_index+1}/{len(dataloader)})', end='\r')

### Creating Dataloader ###
### Done - Running Tests ###
Accuracy: 78.16 - (91/16000)

Recover the lost test loss and accuracy

In [6]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split

# Configuration for rebuilding the data split.
# `state` seeds every sampling and splitting call below.
state = 42
# Fraction of each class that is kept from the full dataset
data_frac = 0.03125
data_dir = 'data/'
verbose = True
batch_size = 15

# Read and separate the data
data = pd.read_csv(os.path.join(data_dir, 's140-prepared.csv'))

# Split our data into negative and positive examples
data_neg = data[data['target'] == 0]
data_pos = data[data['target'] == 1]

# Randomly shuffle and truncate each data frame
# (the same fraction is drawn from each class)
data_neg = data_neg.sample(frac=data_frac, random_state=state)
data_pos = data_pos.sample(frac=data_frac, random_state=state)


# Combine the shuffled and truncated data frames
data = pd.concat([data_neg, data_pos])

# Pull the texts and labels out as plain Python lists
X = data['text'].to_list()
y = data['target'].to_list()

# Coerce every entry to str so non-string cells can still be tokenized
X = [str(x) for x in X]

# Hold out 20% for testing, then 25% of the remaining 80% (20% overall) for validation
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=state)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=state)

print(f'Train Size: {len(y_train)}\tTest Size: {len(y_test)}\tVal Size: {len(y_val)}')

Train Size: 30000	Test Size: 10000	Val Size: 10000


In [6]:


# Get the same test dataset to obtain the loss and accuracy that was lost after training







# Create dataloaders for easy batching
# (only the training loader shuffles; test and validation pass shuffle=False)
# NOTE(review): only test_dataloader is used in this cell; the train and
# validation loaders are tokenized here but not consumed below - confirm
# whether a later cell needs them.
train_dataloader = create_bert_dataloader(X_train, y_train, batch_size)
test_dataloader = create_bert_dataloader(X_test, y_test, batch_size, False)
val_dataloader = create_bert_dataloader(X_val, y_val, batch_size, False)


# Evaluate the loaded model on the held-out test split
if verbose: print('### Testing ###')
loss, accuracy = bert_benchmark(model, test_dataloader, device, verbose=verbose)

# Mean per-sample BCE loss and fraction of correct predictions
print (f'Final Loss: {loss}')
print (f'Final Accuracy: {accuracy}')




### Testing ###
Final Loss: 0.45682513979263606
Final Accuracy: 0.7864583333333334
Train Size: 15360	Test Size: 4800	Val Size: 3840
