In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer
import numpy as np

# Load dataset
df = pd.read_csv('datasets/cleaned_OLID.tsv', sep="\t")

# sample_size = 8000
# positive_ratio = 0.5

# pos_df = df[df['label'] == 1].sample(n=np.floor(sample_size*positive_ratio).astype(int), random_state=1)

# neg_df = df[df['label'] == 0].sample(n=np.floor(sample_size*(1-positive_ratio)).astype(int), random_state=1)

# df = pd.concat([pos_df, neg_df])
df = df.sample(frac=1, random_state=42)

tweets = df['tweet'].values
labels = df['label'].values


# Split the dataset into training and validation sets
train_texts, val_texts, train_labels, val_labels = train_test_split(tweets, labels, test_size=0.2, random_state=42)

# Initialize BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')


# Tokenize and encode the training and validation texts
train_encodings = tokenizer(train_texts.tolist(), truncation=True, padding=True)
val_encodings = tokenizer(val_texts.tolist(), truncation=True, padding=True)


vocab_size = len(tokenizer.get_vocab())

  _torch_pytree._register_pytree_node(


In [2]:
train_encodings.keys()

dict_keys(['input_ids', 'token_type_ids', 'attention_mask'])

In [3]:
import torch

class TweetDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        # item = {'input_ids': torch.tensor(self.encodings[idx])}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

train_dataset = TweetDataset(train_encodings, train_labels)
val_dataset = TweetDataset(val_encodings, val_labels)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=12, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=12, shuffle=False)


In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
# LSTM Model
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, output_size)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, batch, device):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        embedded = self.embedding(input_ids)
        lstm_out, _ = self.lstm(embedded)

        # Apply attention mask after LSTM
        # Masking the lstm output by zeroing out the effects of padding
        expanded_mask = attention_mask.unsqueeze(-1).expand(lstm_out.size()).float()
        lstm_out = lstm_out * expanded_mask


        lstm_out = torch.cat((lstm_out[:, -1, :self.hidden_size], lstm_out[:, 0, self.hidden_size:]), dim=1)
        out = self.fc(lstm_out)
        return self.sigmoid(out)

# Model setup
input_size = len(tokenizer.get_vocab())
hidden_size = 512
output_size = 1
model = LSTMModel(input_size, hidden_size, output_size, num_layers=2)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Training
num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        # input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].float().to(device)
        labels = labels.squeeze()
        # attention_mask = batch['attention_mask'].to(device)
        
        optimizer.zero_grad()
        outputs = model(batch, device)
        outputs = outputs.squeeze()

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    average_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {average_loss:.4f}")

Epoch [1/3], Average Loss: 0.6377
Epoch [2/3], Average Loss: 0.5807
Epoch [3/3], Average Loss: 0.4445


In [5]:
import numpy as np
import time
# Evaluation
model.eval()
# Perform evaluation on validation set and calculate metrics as needed
# Example: calculate accuracy
correct = 0
total = 0
i = 0
prediction_list = np.array([])

with torch.no_grad():
    test_start  = time.time()
    print('start')
    for batch in val_loader:
        # input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].float()
        # attention_mask = batch['attention_mask'].to(device)

        outputs = model(batch, device).detach().cpu()
        predictions = torch.round(outputs.squeeze())
        correct += ((outputs.squeeze() > 0.5) == labels).sum().item()

        total += labels.size(0)
        prediction_list = np.append(prediction_list, predictions.numpy())
    print('end')
    
    test_end = time.time()

accuracy = correct / total
print(f'Validation Accuracy: {accuracy}')


start
end
Validation Accuracy: 0.7613293051359517


In [6]:
# torch.save(model.state_dict(), 'models/OLID_lstm.pth')

In [7]:
print(model)

LSTMModel(
  (embedding): Embedding(30522, 512)
  (lstm): LSTM(512, 512, num_layers=2, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=1024, out_features=1, bias=True)
  (sigmoid): Sigmoid()
)


In [8]:
from sklearn.metrics import classification_report

# Assuming you have the true labels in `val_labels` and the predicted labels in `prediction_list`
report = classification_report(val_labels, prediction_list)

print(report)

              precision    recall  f1-score   support

           0       0.77      0.90      0.83      1764
           1       0.71      0.48      0.57       884

    accuracy                           0.76      2648
   macro avg       0.74      0.69      0.70      2648
weighted avg       0.75      0.76      0.75      2648



In [9]:
from sklearn.metrics import confusion_matrix

# Assuming you have the true labels in `val_labels` and the predicted labels in `prediction_list`
cm = confusion_matrix(val_labels, prediction_list)

# Extract TP, TN, FP, FN from the confusion matrix
TP = cm[1, 1]
TN = cm[0, 0]
FP = cm[0, 1]
FN = cm[1, 0]

print(f"True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")


True Positives (TP): 420
True Negatives (TN): 1596
False Positives (FP): 168
False Negatives (FN): 464


In [10]:
# import pandas as pd

# # Create a DataFrame with the validation texts and labels
# validation_df = pd.DataFrame({'text': val_texts, 'label': val_labels})

# # Add the prediction list as a new column to the DataFrame
# validation_df['prediction'] = prediction_list

# # Save the DataFrame as a CSV file
# validation_df.to_csv('OLID_validation_with_predictions.csv', index=False)
