# Full BERT Multilabel Classification Pipeline for Social Media Text

This notebook has everything you need to do multilabel BERT text classification on social media text. All you need to do is provide labeled data in jsonlines format with one-hot encoded labels. This notebook will:

*   clean the data (unidecode, demojize, remove hashtags)
*   augment the data (backtranslation, contextual word embedding)
*   compute the class weights and use custom weighted trainer
*   compute metrics
*   save the model

The user only needs to import their data in the "Import Data" section, and define the constants in the "Define Constants" section.









# Import Data

In [None]:
# Import data
import os
from google.colab import files
import shutil

# Upload Files
# Runs Colab's file upload; the uploaded file is saved into the working
# directory, which is where the "Load Data" cell looks for INPUT_DATA.
uploaded = files.upload()

Saving nurse_personal_health_professional_3-30-23.jsonl to nurse_personal_health_professional_3-30-23.jsonl


# Define Constants

In [None]:
# --- Run identification ---
# Today's date
DATE = "04-Apr-23"
# Savename to write the augmented data to
DATASET_NAME = "personal-health-professional"
# Savename for the model
MODEL_SAVE_NAME = "professional_health_personal"

# --- Model ---
# Model you want to use (from Huggingface Hub)
MODEL = "RoBERTa-base"

# --- Input data ---
# Dataset name (the uploaded jsonlines file)
INPUT_DATA = "nurse_personal_health_professional_3-30-23.jsonl"
# Name of the text-containing column in your dataset
TEXT_COLUMN = "caption"
# Names of the labels in your dataset
LABEL_NAMES = [
    "personal_life",
    "health",
    "professional_life",
]
# int to sample, None to use all data
DATA_SAMPLE_SIZE = None

# Install Packages

In [None]:
pip install transformers evaluate datasets nlpaug sentencepiece sacremoses jsonlines emoji unidecode

# Load Data

In [None]:
import jsonlines
import random

In [None]:
# Load data: read every record of the jsonlines file into a list of dicts
path = "./"
infile = INPUT_DATA

with jsonlines.open(path + infile) as reader:
    data = list(reader)

# Shuffle data (fixed seed so runs are reproducible).
# This happens BEFORE sampling so that DATA_SAMPLE_SIZE draws a random subset
# instead of always taking the first rows of the file.
random.seed(42)
random.shuffle(data)

# Optionally keep only DATA_SAMPLE_SIZE records (None keeps everything)
if DATA_SAMPLE_SIZE:
    data = data[:DATA_SAMPLE_SIZE]

# Clean Data

In [None]:
import re
from emoji import demojize
from unidecode import unidecode

def remove_hashtag(text: str):
    """Return ``text`` with every ``#word`` token deleted.

    A hashtag is a ``#`` immediately followed by one or more word
    characters; a lone ``#`` is left untouched. Surrounding whitespace is
    not collapsed.
    """
    hashtag_pattern = re.compile(r"#\w+")
    return hashtag_pattern.sub("", text)

# Encode emojis as :emoji_names:, normalize unicode text, and remove hashtags.
# The cleaned text is written back into each record (previously the loop only
# read the value and discarded it, so nothing was cleaned).
# demojize runs first so emojis are already plain-text names by the time
# unidecode transliterates the remaining non-ASCII characters.
for datum in data:
    datum[TEXT_COLUMN] = remove_hashtag(unidecode(demojize(datum[TEXT_COLUMN])))

# Augment Data

In [None]:
import nlpaug.augmenter.word as naw

# Reserve some data for your test dataset - you should not test on augmented data.
# The split index is computed once, before `data` is truncated, so the reserved
# 30% is the true tail of the dataset and never overlaps the 70% that is
# augmented and used for training.
split_index = int(len(data) * 0.7)
reserved_data = data[split_index:]
data = data[:split_index]

# Receives every original record plus each augmented variant of it
augmented_data = []

# Contextual word embedding: inserts extra words using MODEL
aug = naw.ContextualWordEmbsAug(model_path=MODEL, action="insert", device="cuda")

# Back-translation through the en->de and de->en wmt19 models
back_translation_aug = naw.BackTranslationAug(
    from_model_name="facebook/wmt19-en-de",
    to_model_name="facebook/wmt19-de-en",
    device="cuda",
)

# Augment and add to dataset
total = len(data)
for index, record in enumerate(data):
    # Progress report every 10 records
    if index % 10 == 0:
        print(f"augmenting {index} of {total}")

    # The original record is always kept
    augmented_data.append(record)

    # Remove :emoji_names: before augmenting, then collapse whitespace runs
    cleaned = re.sub(r"\s+", " ", re.sub(r":[a-z_]*?:", "", record[TEXT_COLUMN]))

    # Nothing left to augment
    if cleaned in ("", " "):
        continue

    # Back-translation always runs; word insertion only for captions < 300 words
    augmenters = [back_translation_aug]
    if len(cleaned.split(" ")) < 300:
        augmenters.insert(0, aug)

    for augmenter in augmenters:
        variant = record.copy()
        variant[TEXT_COLUMN] = augmenter.augment(cleaned)[0]
        # Keep the variant only if the augmenter actually changed the text
        if variant[TEXT_COLUMN] != cleaned:
            augmented_data.append(variant)

    

In [None]:
# Output file names: augmented training data and untouched (reserved) test data
train_outfile = f"TRAIN-augmented_{DATASET_NAME}_{DATE}.jsonl"
test_outfile = f"TEST-{DATASET_NAME}_{DATE}.jsonl"

# Write each record list to its jsonlines file, training file first
for outfile, records in (
    (train_outfile, augmented_data),
    (test_outfile, reserved_data),
):
    with jsonlines.open(outfile, "w") as writer:
        writer.write_all(records)

# Train Model

In [None]:
"""
https://colab.research.google.com/github/NielsRogge/Transformers-Tutorials/blob/master/BERT/Fine_tuning_BERT_(and_friends)_for_multi_label_text_classification.ipynb#scrollTo=PgS0wMWExcqP
Modified for multilabel with weighted trainer
"""

from evaluate import evaluator
from datasets import load_dataset, concatenate_datasets
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, EvalPrediction
import numpy as np

from sklearn.metrics import f1_score, roc_auc_score, accuracy_score

from torch import nn
import torch

from collections import defaultdict
import re


# Clear CUDA Cache
torch.cuda.empty_cache()

# Load separate dataset files (augmented training data and reserved test data)
train_dataset = load_dataset("json", data_files=f"TRAIN-augmented_{DATASET_NAME}_{DATE}.jsonl")["train"]
test_dataset = load_dataset("json", data_files=f"TEST-{DATASET_NAME}_{DATE}.jsonl")["train"]

# Join the separate files into single dataset: training rows first, test rows last
dataset = concatenate_datasets([train_dataset, test_dataset])

# Make a train/test split in the single dataset.
# shuffle=False keeps the row order, so the "test" split is exactly the
# reserved, un-augmented rows from the end of the concatenation. The default
# (shuffle=True) would mix augmented rows into the test split and reserved
# rows into the training split.
dataset = dataset.train_test_split(test_size=len(test_dataset), shuffle=False)

# Index -> label name, in LABEL_NAMES order, and the inverse lookup
id2label = dict(enumerate(LABEL_NAMES))
label2id = {name: index for index, name in id2label.items()}

def get_labels(
        multilabel_dataset: list[dict], 
        label_names: list[str],
        unique: bool = False
    ):
    """Collect the label vector of every record in a multilabel dataset.

    Each vector is ordered like ``label_names`` (not like the record's own
    key order), so position ``i`` always refers to ``label_names[i]``. This
    keeps anything derived from the vectors, such as class weights, aligned
    with the label order used elsewhere.

    Args:
        multilabel_dataset (list[dict]): jsonlines type dataset
        label_names (list[str]): The names of the labels in dataset
                                 ["label1", "label2", "label3"]
        unique (bool): Keep only the first occurrence of each distinct
                       label vector

    Returns:
        list[list]: The labels in the dataset in nested list format, for example:
                    [[0, 1, 0], [0, 1, 1], [1, 1, 1], etc..]
    """
    labels = []

    for datum in multilabel_dataset:
        # Walk label_names (not datum) so the vector order is deterministic;
        # names missing from a record are skipped, as before.
        label = [datum[name] for name in label_names if name in datum]
        if unique and label in labels:
            continue
        labels.append(label)

    return labels


def compute_multilabel_weights(labels: list[list]):
    """Generate class weights given a set of one-hot multi-label labels.

    Uses the "balanced" formula ``n_samples / (n_classes * class_frequency)``.
    A class that never occurs gets a neutral weight of 1.0 instead of a
    division by zero.

    Args:
        labels (list[list]): List of one-hot encoded labels: [[0, 1, 1], [1, 1, 0], etc...]

    Returns:
        list[float]: One weight per class, as plain Python floats. An empty
                     ``labels`` list yields an empty list.

    Credit:
        Inspired by https://gist.github.com/angeligareta/83d9024c5e72ac9ebc34c9f0b073c64c
    """
    # Nothing to weight: avoid indexing labels[0] on an empty dataset
    if len(labels) == 0:
        return []

    n_samples = len(labels)
    n_classes = len(labels[0])
    # Column sums = number of samples in which each class is active
    class_count = np.array(labels).sum(axis=0)

    # Compute class weights using balanced method; float() strips the numpy
    # scalar type so the result is a homogeneous list of Python floats
    return [
        float(n_samples / (n_classes * freq)) if freq > 0 else 1.0
        for freq in class_count
    ]

# Class weights are derived from the label vectors of the training split only
train_records = list(dataset["train"])
all_labels = get_labels(train_records, LABEL_NAMES)
weights = compute_multilabel_weights(all_labels)


class WeightedTrainer(Trainer):
    """Trainer whose loss is a class-weighted multilabel (sigmoid) loss."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Custom loss function to apply weightings when calculating loss.

        Each label is an independent yes/no decision, so the loss is binary
        cross-entropy on the logits (one sigmoid per label) rather than
        softmax cross-entropy, which would force the labels to compete. This
        matches the sigmoid + 0.5 threshold used for metrics and prediction.

        Args:
            model: The model being trained.
            inputs (dict): Batch of model inputs, including "labels".
            return_outputs (bool): Also return the model outputs.
            **kwargs: Extra arguments passed by newer Trainer versions
                      (accepted and ignored).

        Returns:
            The loss, or a ``(loss, outputs)`` tuple if ``return_outputs``.
        """
        labels = inputs.get("labels")
        # forward pass
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Build the weight tensor on the logits' own device/dtype so this
        # works on CPU as well as GPU (the module-level `weights` list holds
        # one weight per class and is broadcast across the batch).
        class_weights = torch.tensor(weights, dtype=logits.dtype, device=logits.device)
        criterion = nn.BCEWithLogitsLoss(weight=class_weights)
        loss = criterion(logits, labels.to(logits.dtype))
        return (loss, outputs) if return_outputs else loss

# Tokenizer belonging to the pretrained checkpoint named by MODEL
tokenizer = AutoTokenizer.from_pretrained(MODEL)


def preprocess_data(examples, labels=LABEL_NAMES, tokenizer=tokenizer):
    """Tokenize a batch of examples and attach its label matrix.

    Args:
        examples: Batch of dataset columns; must contain TEXT_COLUMN and one
                  column per entry of ``labels``.
        labels: Label column names; column ``i`` of the label matrix holds
                the values of ``labels[i]``.
        tokenizer: Tokenizer applied to the text column.

    Returns:
        The tokenizer encoding with an added "labels" entry: one list of
        floats per example.
    """
    texts = examples[TEXT_COLUMN]
    # Pad/truncate everything to 512 tokens - set to max length for instagram posts
    encoded = tokenizer(texts, padding="max_length", truncation=True, max_length=512)

    per_label_values = [examples[name] for name in labels]
    label_matrix = np.zeros((len(texts), len(labels)))
    for column, values in enumerate(per_label_values):
        label_matrix[:, column] = values

    encoded["labels"] = label_matrix.tolist()
    return encoded

# Tokenize both splits, dropping the raw columns, and expose torch tensors
encoded_dataset = dataset.map(
    preprocess_data,
    batched=True,
    remove_columns=dataset["train"].column_names,
)
encoded_dataset.set_format("torch")

# Define model: one output per entry of LABEL_NAMES
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL,
    num_labels=len(LABEL_NAMES),
    id2label=id2label,
    label2id=label2id,
)

batch_size = 16
metric_name = "f1"

# Evaluate and checkpoint once per epoch; reload the best checkpoint
# (by metric_name) when training finishes
training_options = dict(
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=5,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model=metric_name,
)
args = TrainingArguments("model_training_progress", **training_options)

def multi_label_metrics(predictions, labels, threshold: float = 0.5):
    """Compute micro-averaged multilabel metrics from raw logits.

    Args:
        predictions: Raw model logits, one row per example and one column
                     per label.
        labels: Ground-truth one-hot label matrix of the same shape.
        threshold (float): Probability above which a label counts as
                           predicted.

    Returns:
        dict: "f1" (micro), "roc_auc" (micro) and "accuracy" (exact match of
              the whole label vector).
    """
    # Independent sigmoid per label turns logits into probabilities
    probs = torch.sigmoid(torch.Tensor(predictions)).numpy()
    y_pred = (probs > threshold).astype(int)
    y_true = labels
    f1_micro_average = f1_score(y_true=y_true, y_pred=y_pred, average="micro")
    # ROC AUC ranks examples by score, so it needs the probabilities;
    # thresholded 0/1 predictions collapse the curve to a single point.
    roc_auc = roc_auc_score(y_true, probs, average="micro")
    accuracy = accuracy_score(y_true, y_pred)
    return {
        "f1": f1_micro_average,
        "roc_auc": roc_auc,
        "accuracy": accuracy
    }

def compute_metrics(p: EvalPrediction):
    """Adapter between the Trainer's EvalPrediction and multi_label_metrics.

    When the predictions arrive as a tuple, only its first element is used.
    """
    raw_predictions = p.predictions
    if isinstance(raw_predictions, tuple):
        raw_predictions = raw_predictions[0]
    return multi_label_metrics(predictions=raw_predictions, labels=p.label_ids)

# Wire the model, training arguments, encoded splits and metrics together
trainer = WeightedTrainer(
    model=model,
    args=args,
    train_dataset=encoded_dataset["train"],
    eval_dataset=encoded_dataset["test"],
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)

# Fine-tune, evaluate on the test split, then save the model to "trained_model"
trainer.train()
trainer.evaluate()
trainer.save_model("trained_model")


# Prediction function for testing

In [None]:
# Define prediction function
def predict(text: str, return_label = True):
    """Multilabel prediction for a single piece of text.

    Args:
        text (str): The text to classify.
        return_label (bool): If True, return the names of the predicted
                             labels; otherwise return the 0/1 tensor.

    Returns:
        list[str] of predicted label names, or a 1-D tensor holding one 0/1
        value per label.
    """
    encoding = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    encoding = {k: v.to(trainer.model.device) for k, v in encoding.items()}

    # Inference only: disable dropout and gradient tracking
    trainer.model.eval()
    with torch.no_grad():
        logits = trainer.model(**encoding).logits

    # One independent sigmoid per label. squeeze(0) drops only the batch
    # dimension, so a single-label model still yields a 1-D tensor; .cpu()
    # makes the result usable regardless of the model's device.
    probs = torch.sigmoid(logits).squeeze(0).cpu()
    predictions = torch.where(probs > 0.5, 1, 0)
    print(predictions)
    if return_label:
        predicted_label = [id2label[i] for i, value in enumerate(predictions) if value.item() == 1]
        return predicted_label
    else:
        return predictions


# Zip and download the model

In [None]:
!zip -r trained_model.zip trained_model

In [None]:
# Package the saved "trained_model" directory as <MODEL_SAVE_NAME>.zip and
# download it. The previous call was missing the f-string prefix, so it asked
# for a file literally named "{MODEL_SAVE_NAME}.zip", which never existed.
archive_path = shutil.make_archive(
    MODEL_SAVE_NAME, "zip", root_dir=".", base_dir="trained_model"
)
files.download(archive_path)