In [1]:
# Imports
import pandas as pd
import numpy as np
import os

# Preprocessing

In [2]:
# Load data

team_id = '20' #put your team id here
split = 'test_1' # replace by 'test_2' for FINAL submission

df = pd.read_csv('dataset/tweets_train.csv')
df_test = pd.read_csv(f'dataset/tweets_{split}.csv')

In [3]:
df['words_str'] = df['words'].apply(lambda words: ' '.join(eval(words)))
df_test['words_str'] = df_test['words'].apply(lambda words: ' '.join(eval(words)))

In [3]:
def preprocess(text):
    new_text = []
    for t in text.split(" "):
        t = '@user' if t.startswith('@') and len(t) > 1 else t
        t = 'http' if t.startswith('http') else t
        new_text.append(t)
    return " ".join(new_text)

In [4]:
df['words_str'] = df['text'].apply(preprocess)
df_test['words_str'] = df_test['text'].apply(preprocess)

In [5]:
from sklearn import preprocessing
from transformers import BertTokenizer, BertModel, BertPreTrainedModel, TrainingArguments, Trainer
from transformers import RobertaTokenizer, RobertaPreTrainedModel, RobertaModel, AutoTokenizer, AutoModel, PreTrainedModel
from transformers import TrainerCallback
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch
import numpy as np
import torch.nn.functional as F
from torch import optim

2023-08-14 13:27:03.650863: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [6]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [7]:
X = df['words_str']
y_text = df['sentiment']
# y_text = df.sentiment.values
le = preprocessing.LabelEncoder()
le.fit(y_text)
print(f'Original classes {le.classes_}')
print(f'Corresponding numeric classes {le.transform(le.classes_)}')
y =le.transform(y_text)
print(f"X: {X.shape}")
print(f"y: {y.shape} {np.unique(y)}")

Original classes ['negative' 'neutral' 'positive']
Corresponding numeric classes [0 1 2]
X: (8000,)
y: (8000,) [0 1 2]


In [11]:
# Splitting
train_texts, val_texts, train_labels, val_labels = train_test_split(X, y, test_size=0.2, random_state=42)

In [8]:
# Tokenize the input
tokenizer_bert = BertTokenizer.from_pretrained('bert-base-uncased')
tokenizer_roberta = RobertaTokenizer.from_pretrained('roberta-base')
tokenizer_twitter = AutoTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base')
tokenizer_twitter_sentiment = AutoTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')

In [12]:
tokenizer = tokenizer_twitter_sentiment
train_encodings = tokenizer(train_texts.tolist(), truncation=True, padding=True)
val_encodings = tokenizer(val_texts.tolist(), truncation=True, padding=True)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


# Classification

In [13]:
class ClassificationDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels.astype('int') # Change to integer type

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long) # Change to long type for classification
        return item

    def __len__(self):
        return len(self.labels)

class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        log_prob = F.log_softmax(inputs, dim=-1)
        prob = torch.exp(log_prob)
        return F.nll_loss(
            ((1 - prob) ** self.gamma) * log_prob,
            targets,
            reduction=self.reduction
        )


class BertClassification(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = 3
        self.bert = BertModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, self.num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return (loss, logits) if loss is not None else logits

class RobertaClassification(RobertaPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = 3
        self.roberta = RobertaModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, self.num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        return (loss, logits) if loss is not None else logits

    
class RobertaClassificationTwitter(nn.Module):
    def __init__(self):
        super(RobertaClassificationTwitter, self).__init__()
        self.num_labels = 3
        self.roberta = AutoModel.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment')
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.roberta.config.hidden_size, self.num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
        
        return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits}

# Function to compute f1_macro
def f1_macro(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=1)
    return {'f1_macro': f1_score(labels, predictions, average='macro')}



class RobertaClassificationTwitter_2(nn.Module):
    def __init__(self):
        super(RobertaClassificationTwitter_2, self).__init__()
        self.roberta = AutoModel.from_pretrained('cardiffnlp/twitter-roberta-base')
        self.dropout = nn.Dropout(0.1)
        hidden_size = self.roberta.config.hidden_size

        # Adding an additional hidden layer
        self.hidden_layer = nn.Linear(hidden_size, hidden_size//2)
        
        # Adding L2 regularization (weight decay) to the hidden layer
        self.regularization = nn.LayerNorm(hidden_size//2)
        
        # Final classification layer with 3 classes
        self.classifier = nn.Linear(hidden_size//2, 3)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        
        # Passing through the hidden layer with ReLU activation
        hidden_output = self.hidden_layer(pooled_output)
        hidden_output = F.relu(hidden_output)
        
        # Applying Layer Normalization (regularization)
        hidden_output = self.regularization(hidden_output)
        
        logits = self.classifier(hidden_output)
        
        loss = None
        if labels is not None:
            loss = nn.CrossEntropyLoss()(logits, labels)
        
        return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits}


# class RobertaClassificationTwitter_3(nn.Module):
#     def __init__(self):
#         super(RobertaClassificationTwitter_3, self).__init__()
#         self.roberta = AutoModel.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')
#         self.dropout = nn.Dropout(0.1)
#         hidden_size = self.roberta.config.hidden_size

#         # Adding an additional hidden layer
#         self.hidden_layer = nn.Linear(hidden_size, hidden_size//2)
        
#         # Adding L2 regularization (weight decay) to the hidden layer
#         self.regularization = nn.LayerNorm(hidden_size//2)
        
#         # Final classification layer with 3 classes
#         self.classifier = nn.Linear(hidden_size//2, 3)

#     def forward(self, input_ids, attention_mask, labels=None):
#         outputs = self.roberta(input_ids, attention_mask=attention_mask)
#         pooled_output = outputs[1]
#         pooled_output = self.dropout(pooled_output)
        
#         # Passing through the hidden layer with ReLU activation
#         hidden_output = self.hidden_layer(pooled_output)
#         hidden_output = F.relu(hidden_output)
        
#         # Applying Layer Normalization (regularization)
#         hidden_output = self.regularization(hidden_output)
        
#         logits = self.classifier(hidden_output)
        
#         loss = None
#         if labels is not None:
#             loss = nn.CrossEntropyLoss()(logits, labels)
        
#         return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits}


class RobertaClassificationTwitter_3(nn.Module):
    def __init__(self):
        super(RobertaClassificationTwitter_3, self).__init__()
        self.roberta = AutoModel.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')
        self.dropout = nn.Dropout(0.1)
        hidden_size = self.roberta.config.hidden_size

        # Adding an additional hidden layer
        self.hidden_layer = nn.Linear(hidden_size, hidden_size//2)
        
        # Adding L2 regularization (weight decay) to the hidden layer
        self.regularization = nn.LayerNorm(hidden_size//2)
        
        # Final classification layer with 3 classes
        self.classifier = nn.Linear(hidden_size//2, 3)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.roberta(input_ids, attention_mask=attention_mask)
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        
        # Passing through the hidden layer with ReLU activation
        hidden_output = self.hidden_layer(pooled_output)
        hidden_output = F.relu(hidden_output)
        
        # Applying Layer Normalization (regularization)
        hidden_output = self.regularization(hidden_output)
        
        logits = self.classifier(hidden_output)
        
        loss = None
        if labels is not None:
            loss_fn = FocalLoss(alpha=0.25, gamma=2)
            loss = loss_fn(logits, labels)
        
        return {"loss": loss, "logits": logits} if loss is not None else {"logits": logits}

    
    
class ThresholdEarlyStoppingCallback(TrainerCallback):
    def on_evaluate(self, args, state, control, metrics, **kwargs):
        f1 = metrics['eval_f1_macro']
        if f1 > 0.77:
            control.should_training_stop = True
        return control


In [13]:
train_dataset = ClassificationDataset(train_encodings, train_labels)
val_dataset = ClassificationDataset(val_encodings, val_labels)

In [14]:
model_bert = BertClassification.from_pretrained('bert-base-uncased')
model_roberta = RobertaClassification.from_pretrained('roberta-base')
model_twitter = RobertaClassificationTwitter_3()

Some weights of BertClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.weight', 'roberta.pooler.dense.weight', 'roberta.pooler.dense.bias', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [15]:
model = model_twitter.to(device)

# Define training arguments and trainer
training_args = TrainingArguments(
    output_dir='./output',
    per_device_train_batch_size=256,
    per_device_eval_batch_size=256,
    learning_rate=0.00001,
    num_train_epochs=1000,
    logging_dir='./logs',
    evaluation_strategy='steps',
    logging_steps=100,
    weight_decay=0.0001,
    lr_scheduler_type='cosine',  # Using a cosine scheduler
    warmup_steps=100  # Number of warmup steps
)




trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=f1_macro,
    callbacks=[ThresholdEarlyStoppingCallback()],
)


# Train the model
trainer.train()
eval_results = trainer.evaluate()
print(eval_results)



Step,Training Loss,Validation Loss,F1 Macro
100,0.2609,0.157991,0.696736
200,0.1221,0.158308,0.739416
300,0.0702,0.190606,0.735959
400,0.0395,0.247891,0.739504
500,0.0205,0.326379,0.736227
600,0.0112,0.369469,0.737477
700,0.0078,0.378922,0.757081
800,0.0069,0.377149,0.747492
900,0.0052,0.4921,0.742174
1000,0.006,0.443613,0.757841


{'eval_loss': 0.5212607383728027, 'eval_f1_macro': 0.7757241371340746, 'eval_runtime': 2.0557, 'eval_samples_per_second': 778.326, 'eval_steps_per_second': 3.405, 'epoch': 416.0}


In [16]:
# Save the model
trainer.save_model('pretrained_models/roberta-base-twitter-clf')

# model = BertRegression.from_pretrained("./path/to/save/directory")

# Test

In [29]:
# Define a dataset without labels for testing
class ClassificationTestDataset(Dataset):
    def __init__(self, encodings):
        self.encodings = encodings

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        return item

    def __len__(self):
        return len(self.encodings["input_ids"])


In [18]:
# Tokenize the test sentences
sentences = list(df_test.words_str.values)
test_encodings = tokenizer(sentences, truncation=True, padding=True)

# Convert to a PyTorch Dataset (using the renamed class)
test_dataset = ClassificationTestDataset(test_encodings)

# Get predictions with the neural network
predictions = trainer.predict(test_dataset)
y_hat_prob_tensor = torch.tensor(predictions.predictions, dtype=torch.float32)

# Convert the probabilities to class labels
y_hat_labels = torch.argmax(y_hat_prob_tensor, dim=1).cpu().numpy()

# revert the label encoding
y_hat_labels = le.inverse_transform(y_hat_labels)

# Save the results with the specified format
directory = 'results'
np.save(os.path.join(directory, f'{team_id}__{split}__clf_pred.npy'), y_hat_labels)


In [19]:
# Load 20__test_1__reg_pred.npy

d = np.load('results/20__test_1__clf_pred.npy', allow_pickle=True)
d.shape

(1000,)

# Testing if the model is correct

In [20]:
from transformers import TrainingArguments, AutoModel

In [14]:
y = np.load('results/20__test_1__clf_pred.npy', allow_pickle=True)

In [21]:
training_args_path = 'pretrained_models/roberta-base-twitter-clf/training_args.bin'
training_args = TrainingArguments(training_args_path)

In [26]:
model_path = 'pretrained_models/roberta-base-twitter-clf/pytorch_model.bin'
model = RobertaClassificationTwitter_3()
model.load_state_dict(torch.load(model_path))
model.eval() # Set to evaluation mode if doing inference

RobertaClassificationTwitter_3(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (

In [30]:
sentences = list(df_test.words_str.values)
test_encodings = tokenizer(sentences, truncation=True, padding=True)
test_dataset = ClassificationTestDataset(test_encodings)

In [35]:
def predict(model, dataset):
    model.eval()  # Set to evaluation mode
    dataloader = DataLoader(dataset, batch_size=32)  # You can adjust the batch size
    predictions = []
    
    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids']
            attention_mask = batch['attention_mask']
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs["logits"]
            probabilities = torch.nn.functional.softmax(logits, dim=-1)
            predicted_classes = torch.argmax(probabilities, dim=-1).cpu().numpy()
            predictions.extend(predicted_classes)
    
    return np.array(predictions)

In [36]:
y_hat_labels_indices = predict(model, test_dataset)

In [37]:
y_hat_labels = le.inverse_transform(y_hat_labels_indices)

In [38]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Assuming y is your true labels and y_hat_labels is your predicted labels
accuracy = accuracy_score(y, y_hat_labels)
precision = precision_score(y, y_hat_labels, average='macro') # or 'micro', 'weighted', depending on your task
recall = recall_score(y, y_hat_labels, average='macro')
f1 = f1_score(y, y_hat_labels, average='macro')

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)


Accuracy: 1.0
Precision: 1.0
Recall: 1.0
F1 Score: 1.0
