In [None]:
!pip install transformers==4.35.2 torch==2.1.0 pandas==1.5.3 scikit-learn==1.2.2 tqdm==4.66.1 --quiet

In [None]:
import re
import torch
import numpy as np
import pandas as pd

from pathlib import Path
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, recall_score
from transformers import AutoTokenizer, BertModel
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader

In [None]:
# ARGs
datasets_path = Path('.')           # directory containing the CSV splits
train_filename = 'train.csv'
val_filename = 'valid.csv'          # optional; see `validation_ratio`
test_filename = 'test.csv'

bert_ckpt = 'bert-base-uncased'     # checkpoint used for both tokenizer and encoder
max_length = 200                    # every text is padded/truncated to this many tokens
best_ckpt_name = 'best_model.pt'    # where the best dev-recall weights are written
num_epochs = 5
batch_size = 32
lr = 5e-4
validation_ratio = 0.1 # used if `val_filename` doesn't exist
seed = 42

# Seed NumPy and PyTorch so weight initialisation, dropout masks and DataLoader
# shuffling are repeatable across "Restart & Run All".
np.random.seed(seed)
torch.manual_seed(seed)

# Read Data

In [None]:
def transform_remove_URL(text):
    """Delete http(s):// links and bare ``www.`` links (up to the next whitespace) from ``text``."""
    return re.sub(r'https?://\S+|www\.\S+', r'', text)


def transform_remove_html(text):
    """Delete HTML tags and lowercase named/decimal/hex character entities from ``text``."""
    tag_or_entity = r'<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});'
    return re.compile(tag_or_entity).sub('', text)


def transform_remove_usernames(text):
    """Delete ``@handle`` mentions and any ``scheme://...`` token from ``text``."""
    mention_or_uri = r'([@][A-Za-z0-9_]+)|(\w+:\/\/\S+)'
    return re.sub(mention_or_uri, r'', text)


def transform_remove_hashtags(text):
    """Replace every ``#word`` hashtag in ``text`` with a single space."""
    hashtag = re.compile(r'#\w+')
    return hashtag.sub(' ', text)


def transform_remove_digits(text):
    """Replace each run of digits in ``text`` with a single space."""
    digit_run = re.compile(r'\d+')
    return digit_run.sub(' ', text)


# Compiled once at definition time instead of on every call.
#
# The previous character class contained the range U+24C2..U+1F251, which
# (together with the supplementary-plane catch-all) matched every code point
# from U+24C2 upward -- i.e. it also erased CJK ideographs, kana, Hangul,
# fullwidth forms, etc. Only the two endpoints' neighbourhoods were intended,
# so the BMP part is now limited to symbol blocks and individual emoji.
_EMOJI_PATTERN = re.compile(
    "["
    "\U00010000-\U0010FFFF"     # all supplementary planes: emoticons, pictographs, transport, flags, ...
    "\u2500-\u2BEF"             # box drawing .. misc symbols and arrows (includes U+2600-U+2B55 and dingbats)
    "\u24C2"                    # circled latin capital letter M
    "\u231A\u23CF\u23E9"        # watch, eject symbol, fast-forward
    "\u3030\u303D\u3297\u3299"  # wavy dash, part alternation mark, circled ideographs used as emoji
    "\u200D\uFE0F"              # zero-width joiner and variation selector-16 (emoji sequence glue)
    "]+"
)


def transform_remove_emoji(text):
    """Replace each run of emoji / pictographic symbols in ``text`` with one space.

    Ordinary letters of any script (including CJK, kana and Hangul in the
    Basic Multilingual Plane) are left untouched.
    """
    return _EMOJI_PATTERN.sub(r' ', text)


def transform_lowercase(text):
    """Return ``text`` converted to lowercase."""
    lowered = text.lower()
    return lowered


def transform_fix_i(text):
    """Collapse 'i' followed by a combining dot above into a plain 'i'.

    Lowercasing a capital dotted I yields exactly that two-code-point
    sequence, so this is meant to run after ``transform_lowercase``.
    """
    return re.sub(r'i̇', r'i', text)


def transform_fix_whitespace(text):
    """Trim ``text`` and collapse every whitespace run into a single space."""
    words = text.split()
    return ' '.join(words)

In [None]:
def apply_transforms(text, transforms):
    """Pipe ``text`` through each callable in ``transforms``, in order.

    Each callable receives the previous callable's return value; the final
    value is returned. An empty ``transforms`` returns ``text`` unchanged.
    """
    result = text
    for step in transforms:
        result = step(result)
    return result

In [None]:
# Define transformations (applied to every text in this order)
transforms = [
    transform_remove_URL,
    transform_remove_html,
    transform_remove_usernames,
    transform_remove_emoji,
    transform_lowercase,
    transform_fix_i,
    transform_fix_whitespace,
]

# Load and preprocess data
train_path = datasets_path / train_filename
val_path = datasets_path / val_filename
test_path = datasets_path / test_filename


def _clean_text_column(df):
    """Clean the 'text' column of ``df`` in place and return ``df``.

    The column is converted to ``str`` *before* the transforms run: the
    regex-based transforms raise ``TypeError`` on non-string cells, which is
    what pandas produces (float NaN) for empty CSV fields. Missing values
    become empty strings.
    """
    df['text'] = (
        df['text']
        .fillna('')
        .astype(str)
        .apply(lambda raw: apply_transforms(raw, transforms))
    )
    return df


train_df = _clean_text_column(pd.read_csv(train_path))
test_df = _clean_text_column(pd.read_csv(test_path))

# The validation file is optional; when absent a split is carved out of train later.
has_validation = val_path.exists()
if has_validation:
    val_df = _clean_text_column(pd.read_csv(val_path))

In [None]:
le = LabelEncoder()
le.fit(train_df['label'].tolist())  # class ids are defined by the training labels only


def _encode_labels(frame):
    """Return ``frame['label']`` as a flat array of integer class ids from ``le``."""
    return le.transform(frame['label'].tolist()).ravel()


x_train, y_train = train_df['text'], _encode_labels(train_df)
x_test, y_test = test_df['text'], _encode_labels(test_df)

if not has_validation:
    # No validation file: hold out part of the training data instead.
    x_train, x_dev, y_train, y_dev = train_test_split(
        x_train, y_train, test_size=validation_ratio, random_state=42
    )
else:
    x_dev, y_dev = val_df['text'], _encode_labels(val_df)

## Tokenize

In [None]:
class TextDataset(Dataset):
    """Dataset that tokenizes one text per lookup and pairs it with its label.

    ``texts`` and ``labels`` are indexed with the same integer position;
    ``tokenizer`` is called with padding to ``max_length`` and truncation on.
    """

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with flattened 'input_ids', 'attention_mask' and a long 'labels' tensor."""
        text, label = self.texts[idx], self.labels[idx]
        tokenized = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt',
        )
        # The tokenizer output is flattened so each field is 1-D per example.
        item = {field: tokenized[field].flatten() for field in ('input_ids', 'attention_mask')}
        item['labels'] = torch.tensor(label, dtype=torch.long)
        return item

In [None]:
tokenizer = AutoTokenizer.from_pretrained(bert_ckpt)


def _as_dataset(texts, labels):
    """Wrap one split as a TextDataset using the shared tokenizer and ``max_length``."""
    return TextDataset(texts.tolist(), labels.tolist(), tokenizer, max_length=max_length)


# Create datasets
train_dataset = _as_dataset(x_train, y_train)
dev_dataset = _as_dataset(x_dev, y_dev)
test_dataset = _as_dataset(x_test, y_test)

# Create DataLoaders (only the training split is shuffled)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader, test_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=False)
    for split in (dev_dataset, test_dataset)
)

# Model Training

In [None]:
class MyModel(nn.Module):
    """Frozen BERT encoder feeding a Conv1d -> bidirectional LSTM -> linear head.

    ``forward`` returns one raw logit per example (last dimension of size 1).
    """

    def __init__(self, model_name):
        super().__init__()
        self.bert_encoder = BertModel.from_pretrained(model_name)
        # Freeze BERT: only the layers defined below receive gradients.
        self.bert_encoder.requires_grad_(False)
        self.dropout = nn.Dropout(0.2)
        self.conv1d = nn.Conv1d(768, 32, kernel_size=3, padding='same')
        self.lstm = nn.LSTM(32, 100, bidirectional=True, batch_first=True)
        self.dropout_final = nn.Dropout(0.1)
        self.fc = nn.Linear(200, 1)  # 200 = 2 directions x 100 hidden units

    def forward(self, input_ids, attention_mask):
        encoded = self.bert_encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Conv1d convolves over the last axis, so move the hidden size to axis 1.
        features = encoded.last_hidden_state.transpose(1, 2)
        features = self.conv1d(self.dropout(features))
        # Back to (batch, time, channels) for the batch_first LSTM.
        sequence_out, _ = self.lstm(features.transpose(1, 2))
        # We want the last time step.
        # NOTE(review): TextDataset pads every input to max_length, so this
        # position is presumably a padding token for short texts, and the
        # backward direction has only seen that one step here -- confirm intended.
        final_step = sequence_out[:, -1, :]
        return self.fc(self.dropout_final(final_step))

In [None]:
# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Initialize the model and move it to the device
model = MyModel(bert_ckpt).to(device)

# The model emits a single raw logit per example, so use the logit-based binary loss.
criterion = nn.BCEWithLogitsLoss()

# Hand the optimizer only the parameters that can actually change: the BERT
# encoder is frozen (requires_grad=False), so tracking its weights is dead weight.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.Adam(trainable_params, lr=lr)


def test_model(model, data_loader, split_name):
    """Evaluate ``model`` on ``data_loader`` and report binary classification metrics.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` and
            returning one logit per example (e.g. ``MyModel``).
        data_loader: iterable of batches with 'input_ids', 'attention_mask'
            and 'labels' tensors.
        split_name: label used in the progress bar and the printed summary.

    Returns:
        dict with keys 'accuracy', 'recall_macro', 'recall_negative' and
        'f1_score'. The same values are printed.

    Side effects:
        Puts ``model`` in eval mode; reads the notebook-level ``device``.
    """
    model.eval()
    true_labels, predicted_labels = [], []

    with torch.no_grad():
        for batch in tqdm(data_loader, desc=f"Testing on {split_name}"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            logits = model(input_ids, attention_mask)
            # reshape(-1) rather than squeeze(): a bare squeeze() turns a
            # batch of one into a 0-d tensor, and extending a list with a
            # 0-d array raises. A logit > 0 means probability > 0.5.
            predictions = (logits.reshape(-1) > 0).int()

            true_labels.extend(labels.int().cpu().numpy())
            predicted_labels.extend(predictions.cpu().numpy())

    metrics = {
        'accuracy': accuracy_score(true_labels, predicted_labels),
        'recall_macro': recall_score(true_labels, predicted_labels, average='macro'),
        'recall_negative': recall_score(true_labels, predicted_labels, pos_label=0, average='binary'),
        'f1_score': f1_score(true_labels, predicted_labels, average='binary')
    }

    print(f"{split_name} - Accuracy: {metrics['accuracy']}, Recall (Macro): {metrics['recall_macro']}, Negative Recall: {metrics['recall_negative']}, F1 Score (Binary): {metrics['f1_score']}")
    return metrics

def train_model(model, train_loader, dev_loader, num_epochs, best_ckpt_name):
    """Train ``model`` and checkpoint the weights with the best dev macro recall.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` and
            returning one logit per example.
        train_loader: batches used for optimisation.
        dev_loader: batches evaluated before training and after every epoch.
        num_epochs: number of passes over ``train_loader``.
        best_ckpt_name: path the best ``state_dict`` is saved to.

    Side effects:
        Updates ``model`` in place, writes ``best_ckpt_name``, prints progress.
        Uses the notebook-level ``optimizer``, ``criterion`` and ``device``.
    """
    # Start below any attainable recall so the first epoch always produces a checkpoint.
    best_recall = float('-inf')

    # Test on dev before starting training
    test_model(model, dev_loader, "Dev Split (Pre-Training)")

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f"Training Epoch {epoch}"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            logits = model(input_ids, attention_mask)
            # reshape(-1) keeps a 1-D tensor even for a final batch of one;
            # a bare squeeze() would yield a 0-d tensor and the loss would
            # reject it for not matching the 1-D label tensor.
            loss = criterion(logits.reshape(-1), labels.float())
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch}: Loss - {avg_loss}")

        # Test on dev split after each training epoch
        dev_metrics = test_model(model, dev_loader, f"Dev Split ({epoch+1}. epoch)")

        # Check if this is the best model so far and save it
        if dev_metrics['recall_macro'] > best_recall:
            best_recall = dev_metrics['recall_macro']
            torch.save(model.state_dict(), best_ckpt_name)
            print(f"Saved new best model with recall: {best_recall}")

In [None]:
train_model(model, train_loader, dev_loader, num_epochs, best_ckpt_name)

# Report test metrics for the checkpoint that scored best on dev, not for
# whatever weights the final epoch happened to leave in memory.
if num_epochs > 0 and Path(best_ckpt_name).exists():
    model.load_state_dict(torch.load(best_ckpt_name, map_location=device))

test_model(model, test_loader, 'Test Split')