In [None]:
# If in Colab, then import the drive module from google.colab
if 'google.colab' in str(get_ipython()):
  from google.colab import drive
  # Mount the Google Drive to access files stored there
  drive.mount('/content/drive')

  # Install the latest version of torchtext library quietly without showing output

  !pip install torchinfo -qq

  basepath = '/content/drive/MyDrive/NLP UTD/'



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Importing PyTorch library for tensor computations and neural network modules
import torch
import torch.nn as nn

# For working with textual data vocabularies and for displaying model summaries

# General-purpose Python libraries for random number generation and numerical operations
import random
import numpy as np

# Utilities for efficient serialization/deserialization of Python objects and for element tallying
import joblib
from collections import Counter

# For partial function application (used to bind the vocabulary to the collate function)
from functools import partial

# For filesystem path handling, generating and displaying confusion matrices, and date-time manipulations
from pathlib import Path
from sklearn.metrics import confusion_matrix
from datetime import datetime

# For plotting and visualization
import matplotlib.pyplot as plt
import seaborn as sns
# %matplotlib inline

### NEW ##########################
# imports from Huggingface ecosystem
from transformers.modeling_outputs import SequenceClassifierOutput
from transformers import PreTrainedModel, PretrainedConfig
from transformers import TrainingArguments, Trainer

In [None]:
# Project root as a Path object, built from the `basepath` string
base_folder = Path(basepath)

# Sub-folders of the project root, derived in one pass:
#   datasets/         -> CSV files are read from and written to this folder
#   models/           -> passed to the Trainer as its output directory
#   custom-functions/ -> not referenced by the cells shown in this notebook
data_folder, model_folder, custom_functions = (
    base_folder / subfolder
    for subfolder in ('datasets', 'models', 'custom-functions')
)


In [None]:
# Step 2: Import pandas and other necessary libraries
import pandas as pd
import os  # Importing os for checking the current directory
from pathlib import Path  # Importing Path to handle file paths


In [None]:
# Load the labelled training data from train.csv in the datasets folder
train_data=pd.read_csv(data_folder/'train.csv')


In [None]:
# Preview the first rows: an ID, the tweet text, and one 0/1 column per emotion
train_data.head()

Unnamed: 0,ID,Tweet,anger,anticipation,disgust,fear,joy,love,optimism,pessimism,sadness,surprise,trust
0,2017-21441,“Worry is a down payment on a problem you may ...,0,1,0,0,0,0,1,0,0,0,1
1,2017-31535,Whatever you decide to do make sure it makes y...,0,0,0,0,1,1,1,0,0,0,0
2,2017-21068,@Max_Kellerman it also helps that the majorit...,1,0,1,0,1,0,1,0,0,0,0
3,2017-31436,Accept the challenges so that you can literall...,0,0,0,0,1,0,1,0,0,0,0
4,2017-22195,My roommate: it's okay that we can't spell bec...,1,0,1,0,0,0,0,0,0,0,0


In [None]:
import re
def clean_text(text):
    """Normalize a raw tweet into words separated by single spaces.

    The substitutions run in a fixed order:
      1. delete URLs (anything starting with http, https or www up to the next whitespace)
      2. replace every non-word character (punctuation, symbols, whitespace) with a space
      3. drop isolated single ASCII letters that have whitespace on both sides
      4. collapse each run of whitespace into one space

    Args:
        text (str): The raw text.

    Returns:
        str: The cleaned text. It is not stripped, so a single leading or
        trailing space may remain.
    """
    # (pattern, replacement, flags) triples, applied top to bottom
    substitutions = (
        (r'http\S+|www\S+|https\S+', '', re.MULTILINE),
        (r'\W', ' ', 0),
        (r'\s+[a-zA-Z]\s+', ' ', 0),
        (r'\s+', ' ', 0),
    )
    for pattern, replacement, flags in substitutions:
        text = re.sub(pattern, replacement, text, flags=flags)
    return text


In [None]:
# Clean every tweet; this overwrites the raw 'Tweet' column with the cleaned text
train_data['Tweet'] = train_data['Tweet'].apply(clean_text)

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the labelled rows for validation (80% train / 20% validation).
# random_state=42 keeps the split reproducible.
# NOTE: this rebinds `train_data` to the 80% subset, so re-running this cell
# without reloading the CSV would split the already-reduced frame again.
train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=42)

# Check the shapes of the new datasets
print(f"Training Data Shape: {train_data.shape}")
print(f"Validation Data Shape: {val_data.shape}")

Training Data Shape: (6179, 13)
Validation Data Shape: (1545, 13)


In [None]:
# Display the (rows, columns) shape of both splits as a tuple
train_data.shape,val_data.shape

((6179, 13), (1545, 13))

In [None]:
!pip install datasets



In [None]:
# Model inputs: the cleaned tweet text of each split (pandas Series)
X_train=train_data['Tweet']
X_val=val_data['Tweet']

In [None]:
# The eleven emotion columns, in the order used for every label vector
emotion_columns = [
    'anger', 'anticipation', 'disgust', 'fear', 'joy', 'love',
    'optimism', 'pessimism', 'sadness', 'surprise', 'trust',
]

# One list of eleven label values per tweet (multilabel targets)
y_train = train_data[emotion_columns].values.tolist()

y_valid = val_data[emotion_columns].values.tolist()

In [None]:
from datasets import Dataset

In [None]:
# Wrap each split in a Hugging Face Dataset with two columns:
#   'texts'  -> the cleaned tweet text
#   'labels' -> the multilabel target list for that tweet
trainset, validset = (
    Dataset.from_dict({'texts': split_texts, 'labels': split_labels})
    for split_texts, split_labels in ((X_train, y_train), (X_val, y_valid))
)

In [None]:
from transformers import PretrainedConfig

class CustomConfig(PretrainedConfig):
    """Configuration holding the hyper-parameters of the custom MLP classifier.

    Args:
        vocab_size: Number of rows in the embedding table (default 0).
        embedding_dim: Size of each embedding vector (default 0).
        hidden_dim1: Width of the first hidden layer (default 0).
        hidden_dim2: Width of the second hidden layer (default 0).
        num_labels: Number of output labels (default 11).
        **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
    """

    def __init__(self, vocab_size=0, embedding_dim=0, hidden_dim1=0, hidden_dim2=0, num_labels=11, **kwargs):
        # Let the base class process the generic config options first
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim1 = hidden_dim1
        self.hidden_dim2 = hidden_dim2
        # Assigned after super().__init__ so the explicit argument is the final value.
        # NOTE(review): presumably the base class also derives default
        # id2label/label2id entries from num_labels - confirm against the
        # transformers documentation.
        self.num_labels = num_labels

In [None]:
import torch
import torch.nn as nn
from transformers import PreTrainedModel
from transformers.modeling_outputs import SequenceClassifierOutput

class CustomMLP(PreTrainedModel):
    """Bag-of-embeddings MLP for multilabel classification.

    Each text is reduced to one vector by an ``nn.EmbeddingBag``, then passed
    through two hidden blocks (Linear -> BatchNorm1d -> ReLU -> Dropout) and a
    final linear layer that emits one logit per label.
    """
    config_class = CustomConfig

    def __init__(self, config):
        """Build the layers from the sizes stored in ``config``.

        Args:
            config (CustomConfig): Provides vocab_size, embedding_dim,
                hidden_dim1, hidden_dim2 and num_labels.
        """
        super().__init__(config)

        self.embedding_bag = nn.EmbeddingBag(config.vocab_size, config.embedding_dim)
        self.layers = nn.Sequential(
            nn.Linear(config.embedding_dim, config.hidden_dim1),
            nn.BatchNorm1d(num_features=config.hidden_dim1),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(config.hidden_dim1, config.hidden_dim2),
            nn.BatchNorm1d(num_features=config.hidden_dim2),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            # Output layer: sized from the config instead of a hard-coded 11,
            # so the model follows config.num_labels (which defaults to 11)
            nn.Linear(config.hidden_dim2, config.num_labels),
        )

    def forward(self, input_ids, offsets, labels=None):
        """Compute logits and, when labels are given, the multilabel loss.

        Args:
            input_ids: Token indices of all texts in the batch, concatenated
                into one tensor.
            offsets: Start position of each text inside ``input_ids``.
            labels: Optional multi-hot targets, one row per text. They are
                cast to float before the loss is computed.

        Returns:
            SequenceClassifierOutput: ``logits`` holds the raw scores;
            ``loss`` is the BCE-with-logits loss, or None when ``labels`` is None.
        """
        embed_out = self.embedding_bag(input_ids, offsets)
        logits = self.layers(embed_out)
        loss = None
        if labels is not None:
            # BCEWithLogitsLoss expects float targets and applies the sigmoid internally
            loss_fct = nn.BCEWithLogitsLoss()
            loss = loss_fct(logits, labels.float())
        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
        )


In [None]:
from collections import Counter, OrderedDict
from typing import Dict, List, Optional, Union

class Vocab:
    """Minimal bidirectional mapping between tokens and integer indices.

    Looking up an unknown token returns ``default_index`` when one is set and
    raises ``RuntimeError`` otherwise.
    """

    def __init__(self, tokens: List[str]) -> None:
        """Create a vocabulary whose indices follow the order of ``tokens``.

        Args:
            tokens: Tokens in index order. The list is copied, so later
                insertions never modify the caller's list.
        """
        # index -> token
        self.itos: List[str] = list(tokens)
        # token -> index
        self.stoi: Dict[str, int] = {token: i for i, token in enumerate(self.itos)}
        # index returned for unknown tokens; None means "raise instead"
        self.default_index: Optional[int] = None

    def __getitem__(self, token: str) -> int:
        """Return the index of ``token``, or the default index if it is unknown.

        Raises:
            RuntimeError: If the token is unknown and no default index is set.
        """
        if token in self.stoi:
            return self.stoi[token]
        if self.default_index is not None:
            return self.default_index
        raise RuntimeError(f"Token '{token}' not found in vocab")

    def __contains__(self, token: str) -> bool:
        """Return True if ``token`` is in the vocabulary."""
        return token in self.stoi

    def __len__(self) -> int:
        """Return the number of tokens in the vocabulary."""
        return len(self.itos)

    def insert_token(self, token: str, index: int) -> None:
        """Insert ``token`` at ``index``, moving it there if it already exists.

        All indices are recomputed afterwards. ``default_index`` is left
        untouched, so it keeps pointing at the same numeric position.

        Raises:
            ValueError: If ``index`` is negative or greater than the current size.
        """
        if index < 0 or index > len(self.itos):
            raise ValueError("Index out of range")
        if token in self.stoi:
            old_index = self.stoi[token]
            self.itos.pop(old_index)
            # Removing an earlier element shifts the target position left by one
            if old_index < index:
                index -= 1
        self.itos.insert(index, token)

        self.stoi = {tok: i for i, tok in enumerate(self.itos)}

    def append_token(self, token: str) -> None:
        """Add a new token at the end.

        Raises:
            RuntimeError: If the token already exists.
        """
        if token in self.stoi:
            raise RuntimeError(f"Token '{token}' already exists in the vocab")
        self.insert_token(token, len(self.itos))

    def set_default_index(self, index: Optional[int]) -> None:
        """Set the index returned for unknown tokens (None disables the fallback)."""
        self.default_index = index

    def get_default_index(self) -> Optional[int]:
        """Return the index used for unknown tokens, or None if unset."""
        return self.default_index

    def lookup_token(self, index: int) -> str:
        """Return the token stored at ``index``.

        Raises:
            RuntimeError: If ``index`` is outside ``[0, len(self))``.
        """
        if 0 <= index < len(self.itos):
            return self.itos[index]
        raise RuntimeError(f"Index {index} out of range")

    def lookup_tokens(self, indices: List[int]) -> List[str]:
        """Return the tokens for a list of indices."""
        return [self.lookup_token(index) for index in indices]

    def lookup_indices(self, tokens: List[str]) -> List[int]:
        """Return the indices for a list of tokens (unknown tokens use the default index)."""
        return [self[token] for token in tokens]

    def get_stoi(self) -> Dict[str, int]:
        """Return a copy of the token -> index mapping."""
        return self.stoi.copy()

    def get_itos(self) -> List[str]:
        """Return a copy of the index -> token list."""
        return self.itos.copy()

    @classmethod
    def vocab(cls, ordered_dict: Union[OrderedDict, Counter], min_freq: int = 1, specials: Optional[List[str]] = None, special_first: bool = True) -> 'Vocab':
        """Build a vocabulary from a token -> frequency mapping.

        Args:
            ordered_dict: Mapping of token to frequency. Its iteration order
                defines the token order. It is NOT modified.
            min_freq: Minimum frequency a regular token needs to be kept.
            specials: Special tokens that are always included, regardless of
                frequency, and never duplicated among the regular tokens.
            special_first: Place the specials before the regular tokens when
                True, after them when False.

        Returns:
            Vocab: The new vocabulary.
        """
        specials = list(specials or [])
        special_set = set(specials)

        # Filter instead of popping, so the caller's counter is left intact
        tokens = [
            token
            for token, freq in ordered_dict.items()
            if freq >= min_freq and token not in special_set
        ]

        if special_first:
            tokens = specials + tokens
        else:
            tokens = tokens + specials

        return cls(tokens)

In [None]:
def get_vocab(dataset, min_freq=1):
    """
    Build a vocabulary from the whitespace-separated tokens of a dataset.

    Args:
        dataset: An object indexable by the key 'texts' that yields the texts
                 (for example a Hugging Face Dataset with a 'texts' column).
                 Each text is converted with str() before splitting.
        min_freq (int): Minimum number of occurrences a token needs to be
                        included in the vocabulary.

    Returns:
        Vocab: Vocabulary of the tokens that meet `min_freq`, with the special
               token '<unk>' at index 0. Index 0 is also the default index,
               so unknown words map to '<unk>'.
    """
    # Count every token across all texts in a single pass
    token_counts = Counter(
        token
        for text in dataset['texts']
        for token in str(text).split()
    )

    # Keep only the tokens that are frequent enough
    vocabulary = Vocab.vocab(token_counts, min_freq=min_freq)

    # Reserve index 0 for '<unk>' and route unknown words to it
    vocabulary.insert_token('<unk>', 0)
    vocabulary.set_default_index(0)

    return vocabulary

In [None]:
# Creating a function that will be used to get the indices of words from vocab
def tokenizer(text, vocab):
    """Split text on whitespace and look up each token's index in vocab.

    Args:
        text: The text to convert; it is passed through str() first.
        vocab: Any object supporting ``vocab[token]`` lookups.

    Returns:
        list: One index per whitespace-separated token, in order.
    """
    indices = []
    for token in str(text).split():
        indices.append(vocab[token])
    return indices

In [None]:
import torch
import numpy as np

def collate_batch(batch, my_vocab):
    """
    Turn a list of samples into the tensors expected by the model.

    Args:
        batch (list of dict): Samples with the keys 'texts' (the text) and
                              'labels' (the multilabel target list).
        my_vocab: Vocabulary used by `tokenizer` to map tokens to indices.

    Returns:
        dict: A dictionary with three keys:
              - 'input_ids': int64 tensor with the indices of all texts, concatenated.
              - 'offsets': tensor with the start position of each text in 'input_ids'.
              - 'labels': int32 tensor with one row of labels per text.
    """
    # Split the samples into parallel lists of labels and texts
    label_rows, texts = [], []
    for sample in batch:
        label_rows.append(sample['labels'])
        texts.append(sample['texts'])

    # Labels are stored as int32; the model casts them to float for the loss
    labels_tensor = torch.tensor(label_rows, dtype=torch.int32)

    # One list of vocabulary indices per text
    ids_per_text = [tokenizer(text, my_vocab) for text in texts]

    # Flatten all texts into a single 1-D tensor
    input_ids = torch.cat([torch.tensor(ids, dtype=torch.int64) for ids in ids_per_text])

    # Each text starts where the previous one ended, so keep a running total
    # of the lengths seen so far
    starts = []
    position = 0
    for ids in ids_per_text:
        starts.append(position)
        position += len(ids)
    offsets = torch.tensor(starts)

    return {
        'input_ids': input_ids,
        'offsets': offsets,
        'labels': labels_tensor
    }


In [None]:
# Build the vocabulary from the training split only, keeping tokens seen at least twice
multi_vocab = get_vocab(trainset, min_freq=2)
# Bind the vocabulary so the collate function takes only the batch argument
collate_fn = partial(collate_batch, my_vocab=multi_vocab)

In [None]:
# Model hyper-parameters; the embedding table gets one row per vocabulary entry
my_config = CustomConfig(vocab_size=len(multi_vocab),
                         embedding_dim=300,
                         hidden_dim1=200,
                         hidden_dim2=100,
                         num_labels=11)  # One output per emotion label column


In [None]:
# Map each output position to its emotion name; the order matches the label
# columns used to build the targets
my_config.id2label = dict(enumerate([
    'anger',
    'anticipation',
    'disgust',
    'fear',
    'joy',
    'love',
    'optimism',
    'pessimism',
    'sadness',
    'surprise',
    'trust',
]))

In [None]:
# Display the config; label2id still holds the generic LABEL_i names at this point
my_config

CustomConfig {
  "embedding_dim": 300,
  "hidden_dim1": 200,
  "hidden_dim2": 100,
  "id2label": {
    "0": "anger",
    "1": "anticipation",
    "2": "disgust",
    "3": "fear",
    "4": "joy",
    "5": "love",
    "6": "optimism",
    "7": "pessimism",
    "8": "sadness",
    "9": "surprise",
    "10": "trust"
  },
  "label2id": {
    "LABEL_0": 0,
    "LABEL_1": 1,
    "LABEL_10": 10,
    "LABEL_2": 2,
    "LABEL_3": 3,
    "LABEL_4": 4,
    "LABEL_5": 5,
    "LABEL_6": 6,
    "LABEL_7": 7,
    "LABEL_8": 8,
    "LABEL_9": 9
  },
  "transformers_version": "4.44.2",
  "vocab_size": 6945
}

In [None]:
# Build label2id as the inverse of id2label (emotion name -> output position)
my_config.label2id = {name: idx for idx, name in my_config.id2label.items()}

In [None]:
# Display the config again to confirm label2id now uses the emotion names
my_config

CustomConfig {
  "embedding_dim": 300,
  "hidden_dim1": 200,
  "hidden_dim2": 100,
  "id2label": {
    "0": "anger",
    "1": "anticipation",
    "2": "disgust",
    "3": "fear",
    "4": "joy",
    "5": "love",
    "6": "optimism",
    "7": "pessimism",
    "8": "sadness",
    "9": "surprise",
    "10": "trust"
  },
  "label2id": {
    "anger": 0,
    "anticipation": 1,
    "disgust": 2,
    "fear": 3,
    "joy": 4,
    "love": 5,
    "optimism": 6,
    "pessimism": 7,
    "sadness": 8,
    "surprise": 9,
    "trust": 10
  },
  "transformers_version": "4.44.2",
  "vocab_size": 6945
}

In [None]:
# Instantiate the model from the configuration
model = CustomMLP(config=my_config)


In [None]:
# Display the layer structure of the model
model

CustomMLP(
  (embedding_bag): EmbeddingBag(6945, 300, mode='mean')
  (layers): Sequential(
    (0): Linear(in_features=300, out_features=200, bias=True)
    (1): BatchNorm1d(200, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Dropout(p=0.5, inplace=False)
    (4): Linear(in_features=200, out_features=100, bias=True)
    (5): BatchNorm1d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): Dropout(p=0.5, inplace=False)
    (8): Linear(in_features=100, out_features=11, bias=True)
  )
)

In [None]:
from sklearn.metrics import f1_score, accuracy_score, classification_report
def compute_metrics(eval_pred):
    """Compute micro- and macro-averaged F1 for multilabel predictions.

    Args:
        eval_pred: A pair ``(logits, labels)``. The logits are converted to a
            tensor and the labels are cast with ``.astype(int)``, so both are
            presumably numpy arrays as supplied by the Trainer.

    Returns:
        dict: ``{'f1_micro': ..., 'f1_macro': ...}``. A label is predicted
        when its sigmoid probability is at least 0.5. ``zero_division=0``
        scores labels with no predicted or true samples as 0.
    """
    raw_logits, true_labels = eval_pred

    # Decide each label independently by thresholding its probability
    probs = torch.sigmoid(torch.tensor(raw_logits))
    pred_labels = (probs >= 0.5).int().numpy()
    true_labels = true_labels.astype(int)

    scores = {}
    scores['f1_micro'] = f1_score(true_labels, pred_labels, average='micro', zero_division=0)
    scores['f1_macro'] = f1_score(true_labels, pred_labels, average='macro', zero_division=0)
    return scores

In [None]:
# Configure training parameters
training_args = TrainingArguments(
    # Optimisation
    num_train_epochs=100,
    per_device_train_batch_size=120,  # Adjust if necessary
    per_device_eval_batch_size=120,    # Adjust if necessary
    weight_decay=0.5,
    learning_rate=0.0005,
    optim='adamw_torch',
    # Keep the 'texts' column: the custom collate function needs the raw text
    remove_unused_columns=False,

    # Evaluate and checkpoint every 200 steps. The two intervals must line up
    # so the best checkpoint can be restored at the end of training.
    output_dir=str(model_folder),
    # `eval_strategy` is the current name of the deprecated `evaluation_strategy`
    eval_strategy='steps',
    eval_steps=200,
    save_strategy="steps",
    save_steps=200,
    load_best_model_at_end=True,
    save_total_limit=2,

    # Select the best checkpoint by the micro-averaged F1 returned by
    # `compute_metrics` (the Trainer prefixes evaluation metrics with "eval_")
    metric_for_best_model="eval_f1_micro",
    greater_is_better=True,

    # Log every 200 steps and report to Weights & Biases
    logging_strategy='steps',
    logging_steps=200,
    report_to='wandb',
    # NOTE(review): the run name still says "imdb" although the data are
    # emotion-labelled tweets - rename if a more descriptive W&B run name is wanted
    run_name='imdb_hf_trainer',
)




In [None]:
pip install wandb



In [None]:
# Assemble the Trainer from the model, the training arguments, both dataset
# splits, the vocabulary-bound collate function and the metric function
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=trainset,
    eval_dataset=validset,
    data_collator=collate_fn,  # Builds 'input_ids', 'offsets' and 'labels' tensors per batch
    compute_metrics=compute_metrics,  # Returns 'f1_micro' and 'f1_macro'
)


In [None]:
# Run training; loss and F1 metrics are logged every 200 steps
trainer.train()

Step,Training Loss,Validation Loss,F1 Micro,F1 Macro
200,0.5274,0.482315,0.176925,0.092776
400,0.4321,0.441077,0.38437,0.206478
600,0.3774,0.410598,0.480533,0.279623
800,0.3394,0.398941,0.512976,0.331178
1000,0.3125,0.396606,0.534067,0.354819
1200,0.2922,0.396428,0.539832,0.366259
1400,0.2767,0.399167,0.542104,0.37247
1600,0.2626,0.401751,0.539358,0.370541
1800,0.2519,0.408557,0.546599,0.378932
2000,0.2415,0.410464,0.550902,0.386397


In [None]:
# Evaluate the model on the validation split passed to the Trainer as eval_dataset
trainer.evaluate()

In [None]:
# Run prediction on the validation split; the result's fields are inspected below
valid_output = trainer.predict(validset)

In [None]:
# List the field names of the prediction result
valid_output._fields

In [None]:
# Multilabel decoding: each label is decided on its own by thresholding its
# sigmoid probability at 0.5, the same rule used in compute_metrics.
# argmax would force exactly one label per tweet and produce a 1-D array that
# cannot be compared with the multi-hot label rows.
valid_probs = torch.sigmoid(torch.tensor(valid_output.predictions))
valid_preds = (valid_probs >= 0.5).int().numpy()
valid_labels = np.array(valid_output.label_ids)

In [None]:
# Load the test set from test.csv in the datasets folder
test_df=pd.read_csv(data_folder/'test.csv')

In [None]:
# Apply the same cleaning as for the training tweets (overwrites the 'Tweet' column)
test_df['Tweet'] = test_df['Tweet'].apply(clean_text)

In [None]:
# Cleaned test tweets as a numpy array
sample_X=test_df['Tweet'].values

In [None]:
device = 'cpu'
# Map every test tweet to its list of vocabulary indices
list_of_list_of_indices = [tokenizer(text, multi_vocab) for text in sample_X]

# Start position of each tweet inside the flattened index tensor: the running
# sum of the lengths of all preceding tweets (the first tweet starts at 0)
offsets = torch.tensor(
    [0] + [len(ids) for ids in list_of_list_of_indices[:-1]]
).cumsum(dim=0)

# Flatten all per-tweet index lists into one 1-D int64 tensor
indices = torch.cat([torch.tensor(ids, dtype=torch.int64) for ids in list_of_list_of_indices])

In [None]:
# move model to appropriate device
model.to(device)

# put model in evaluation mode
model.eval()

# get outputs (logits) from model
# Inference only: no_grad skips building the autograd graph, which saves
# memory when the whole test set goes through in one forward pass.
# The inputs are moved to the same device as the model.
with torch.no_grad():
    outputs = model(indices.to(device), offsets.to(device))
outputs

In [None]:
# get predicted labels
# Convert logits to one probability per label
probabilities = torch.sigmoid(outputs.logits)
threshold = 0.5
# A label is predicted when its probability reaches the threshold
predicted_labels = (probabilities >= threshold).int()
# Boolean mask of rows where no label reached the threshold
no_labels = predicted_labels.sum(1) == 0
# Fallback for those rows: switch on the single most probable label so every
# row ends up with at least one predicted label
predicted_labels[no_labels, torch.max(probabilities[no_labels], dim=1).indices] = 1

In [None]:
# Translate each multi-hot prediction row into the names of the labels set to 1
binary_predictions = predicted_labels
predicted_label_names = []
for prediction in binary_predictions:
    labels = [
        model.config.id2label[position]
        for position, flag in enumerate(prediction.tolist())
        if flag == 1
    ]
    predicted_label_names.append(labels)

In [None]:
# One column per emotion, named in output-position order
label_names = list(my_config.id2label.values())
final_df = pd.DataFrame(predicted_labels.numpy(), columns=label_names)
# Put the tweet ID in front so each prediction row can be matched to its tweet
final_df.insert(0, 'ID', test_df['ID'])
final_df


In [None]:
# Write the predictions to result.csv in the datasets folder, without the index column
final_df.to_csv(data_folder/'result.csv', index=False)