<a href="https://colab.research.google.com/github/AshaSharm/Demo/blob/main/MiniLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
from transformers import AutoTokenizer, AutoModel, AutoConfig
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import pickle
from collections import Counter
from sklearn import metrics


In [2]:
# NOTE(review): this install cell is numbered after the import cell, which
# already imports `transformers`; on a runtime where the package is missing the
# imports would fail before this line runs. Consider moving it to the top.
!pip install transformers




In [3]:
# Constants and parameters
MAX_LEN = 128  # tokenizer max_length used by SentimentDataset (pad / truncate target)
# NOTE(review): the notebook, the checkpoint file name and the model class all
# say "MiniLM", but the checkpoint loaded here is DeBERTa-base — confirm which
# backbone is intended.
PRE_TRAINED_MODEL = "microsoft/deberta-base"
# Output width of the classifier head built in main().
# NOTE(review): hard-coded; presumably has to equal the number of classes the
# MultiLabelBinarizer finds in encode_labels — verify after preprocessing.
NUM_LABELS = 23
MODEL_PATH = "minilm_model.pth.tar"  # default destination of save_checkpoint
ENCODER_PATH = "encoder.pkl"  # file name of the pickled MultiLabelBinarizer written by encode_labels
LEARNING_RATE = 1e-04  # learning rate handed to Adam in main()
EPOCHS = 5  # number of train + validation passes in main()
# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# Tokenizer
# Loaded once at module level for the checkpoint named by PRE_TRAINED_MODEL;
# get_loader() passes this same object to both the train and test datasets.
tokenizer = AutoTokenizer.from_pretrained(PRE_TRAINED_MODEL)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]



config.json:   0%|          | 0.00/474 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

In [5]:
#Data Preprocessing Functions
# Remove null labels
def remove_empty(text):
    """Return the labels in ``text`` with every empty-string entry dropped.

    ``text`` is an iterable of labels; the order of the surviving labels is
    preserved and a new list is returned.
    """
    kept = []
    for label in text:
        if label != '':
            kept.append(label)
    return kept

# Remove extra spaces from labels
def remove_space(text):
    """Return a new list with leading/trailing whitespace stripped from each label."""
    return list(map(lambda label: label.strip(), text))

# Replace duplicate labels with correct labels
def replace_label(df, src, trg):
    """Rewrite every occurrence of label ``src`` as ``trg`` in ``df['target']``.

    The 'target' column (one list of labels per row) is reassigned on ``df``
    itself; the function returns nothing.
    """
    def _swap(labels):
        swapped = []
        for label in labels:
            swapped.append(trg if label == src else label)
        return swapped

    df['target'] = df['target'].map(_swap)

# Get all noisy labels: labels seen fewer than 5 times that contain no sentiment word
def get_noisy_labels(df):
    """Collect rare labels whose text carries no sentiment word.

    A label is reported when it occurs fewer than 5 times across
    ``df.target`` and neither 'positive' nor 'negative' is one of its
    whitespace-separated words. Missing values (produced when an empty label
    list is exploded) are skipped.

    Returns:
        list of label strings, in first-seen order.
    """
    frequencies = Counter(df.target.explode())
    rare_without_sentiment = []
    for label, count in frequencies.items():
        if pd.isna(label) or count >= 5:
            continue
        words = label.split()
        if 'positive' in words or 'negative' in words:
            continue
        rare_without_sentiment.append(label)
    return rare_without_sentiment

# Remove noisy labels from the dataframe
def remove_noisy_labels(df):
    """Drop the known outlier row and strip noisy labels from every row.

    Steps:
      1. Remove the row with index label 384 (an outlier with 14 labels) when
         it is present.
      2. Ask ``get_noisy_labels`` which labels count as noise.
      3. Rebuild each row's label list without those labels.
      4. Discard rows whose label list ends up empty.

    Args:
        df: DataFrame with a 'target' column holding one list of labels per row.

    Returns:
        A new DataFrame. The input frame and the label lists it holds are not
        modified.
    """
    print("Removing noisy labels...")
    # errors='ignore' keeps this usable on data that has no row labelled 384
    # (the bare drop raised KeyError in that case).
    df = df.drop([384], errors='ignore')  # outlier with 14 labels
    noisy_labels = set(get_noisy_labels(df))

    def strip_noisy(labels):
        return [label for label in labels if label not in noisy_labels]

    # Build fresh lists instead of calling list.remove() on the stored ones:
    # the stored lists are shared with the caller's frame, and remove() only
    # deletes the first occurrence of a repeated label. Addressing the column
    # by name also avoids relying on 'target' sitting at position 1.
    df = df.assign(target=df['target'].map(strip_noisy))

    df = df[df["target"].str.len() != 0]
    return df

# Combine labels that have very low frequency to a single label based on threshold
def combine_labels(df, min_samples=100):
    """Fold infrequent labels into a shared 'extra <sentiment>' label.

    Every label that occurs fewer than ``min_samples`` times across
    ``df['target']`` is replaced by ``'extra ' + <last word of the label>``
    (the last word is treated as the sentiment). Frequent labels are kept
    unchanged.

    Args:
        df: DataFrame with a 'target' column holding one list of labels per row.
        min_samples: minimum number of occurrences a label needs to be kept.

    Returns:
        ``df`` itself, with its 'target' column reassigned.
    """
    print("Combining labels...")
    label_counts = df['target'].explode().value_counts()
    # Select rare labels with a boolean mask rather than label_counts[i]:
    # integer keys on a string-indexed Series depend on a positional fallback
    # that pandas has deprecated. A set gives O(1) membership checks below.
    fewer_labels = set(label_counts[label_counts < min_samples].index)

    def replace_fewer(labels):
        return [
            ' '.join(['extra', label.split(' ')[-1]]) if label in fewer_labels else label
            for label in labels
        ]

    df['target'] = df['target'].map(replace_fewer)
    return df

# Undersample very frequent labels
def undersample_labels(df, labels, frac, random_state=None):
    """Keep only a random fraction of the rows whose label list equals ``labels``.

    Rows are matched with ``row_labels == labels``, i.e. the lists must be
    equal element by element and in the same order. Rows that do not match
    are kept unchanged.

    Args:
        df: DataFrame with a 'target' column holding one list of labels per row.
        labels: the exact label list to undersample.
        frac: fraction of the matching rows to keep.
        random_state: optional seed forwarded to ``DataFrame.sample`` so the
            selection can be reproduced; ``None`` keeps the unseeded behaviour.

    Returns:
        A new DataFrame: the non-matching rows followed by the sampled
        matching rows (original index labels are preserved).
    """
    is_match = df['target'].apply(lambda row_labels: row_labels == labels)
    matched = df[is_match]
    kept = matched.sample(frac=frac, random_state=random_state)
    remainder = df.drop(matched.index)
    return pd.concat([remainder, kept])

# Encode labels for training
def encode_labels(df, encoder_path=ENCODER_PATH):
    """Multi-hot encode the label lists and persist the fitted encoder.

    Fits a ``MultiLabelBinarizer`` on ``df.target``, stores the resulting
    0/1 rows as lists in a new 'encoded' column on ``df``, and pickles the
    fitted binarizer to ``encoder_path``.

    Args:
        df: DataFrame with 'text' and 'target' columns.
        encoder_path: where to write the pickled binarizer. Defaults to the
            module-level ENCODER_PATH (same pattern as save_checkpoint uses
            for MODEL_PATH) instead of a duplicated string literal.

    Returns:
        A DataFrame containing only the 'text' and 'encoded' columns.
    """
    print("Encoding Labels...")
    le = MultiLabelBinarizer()
    df['encoded'] = le.fit_transform(df.target.tolist()).tolist()
    df = df[['text', 'encoded']]

    with open(encoder_path, 'wb') as encoder:
        pickle.dump(le, encoder)

    return df

# Split the dataset and save it
def split_and_save(df, split_size=0.2, random_state=None,
                   train_path='train.pkl', test_path='test.pkl'):
    """Split ``df`` into train/test parts and pickle each part.

    Both parts get a fresh 0..n-1 index before being written.

    Args:
        df: the DataFrame to split.
        split_size: value passed to ``train_test_split`` as ``test_size``.
        random_state: optional seed for the split so it can be reproduced;
            ``None`` keeps the unseeded behaviour.
        train_path: destination of the pickled training part.
        test_path: destination of the pickled test part.

    Returns:
        True once both files have been written.
    """
    df_train, df_test = train_test_split(
        df, test_size=split_size, random_state=random_state
    )
    df_train = df_train.reset_index(drop=True)
    df_test = df_test.reset_index(drop=True)
    df_train.to_pickle(train_path)
    df_test.to_pickle(test_path)
    print("Preprocessed and Saved...")
    return True

# Loading the dataframe and doing basic preprocessing
def loader(dfPath):
    """Read the raw CSV and run the whole label-preprocessing pipeline.

    The CSV is read without a header row. Its first column is taken as the
    text and every remaining column as one label slot; the label slots of a
    row are gathered into a single list in a 'target' column. The frame then
    goes through label renaming, noisy-label removal, rare-label combining,
    undersampling of the most frequent label combinations and multi-hot
    encoding.

    Args:
        dfPath: path of the CSV file.

    Returns:
        The DataFrame produced by ``encode_labels``.
    """
    df = pd.read_csv(dfPath, header=None)
    df = df.fillna('')

    # To get all the multi labels in one column.
    # The number of label columns is derived from the file instead of being
    # fixed at 14, so a CSV with a different width no longer fails on the
    # column assignment.
    labels = ['label_' + str(idx) for idx in range(1, df.shape[1])]
    df.columns = ['text'] + labels

    df['target'] = df[labels].values.tolist()
    # Strip before filtering: a whitespace-only cell becomes '' once stripped
    # and must then be dropped, otherwise '' survives as a label.
    df['target'] = df['target'].map(remove_space)
    df['target'] = df['target'].map(remove_empty)

    # Explicit copy so the in-place edits below act on an independent frame
    # rather than on a column slice of the raw one.
    df = df[['text', 'target']].copy()

    # Replacing labels that are similar but have some spelling mistakes
    replace_label(df, 'advisor/agent service positive', 'advisoragent service positive')
    replace_label(df, 'advisor/agent service negative', 'advisoragent service negative')
    replace_label(df, 'tyre age/dot code negative', 'tyre agedot code negative')

    # Removing noisy labels
    df = remove_noisy_labels(df)
    # Combining labels that have frequency less than 100
    df = combine_labels(df)
    # Undersampling high frequency labels datapoints
    df = undersample_labels(df, ['value for money positive'], 0.1)
    df = undersample_labels(df, ['garage service positive'], 0.2)
    df = undersample_labels(df, ['value for money positive', 'garage service positive'], 0.2)
    # Encoding the labels for training
    df = encode_labels(df)

    return df

# Run the preprocessing pipeline on the uploaded CSV, then write the train/test
# pickles that SentimentDataset reads later.
# NOTE(review): absolute Colab path — the file must exist under /content
# before this cell runs.
df = loader("/content/Evaluation-dataset.csv")
split_and_save(df)


Removing noisy labels...
Combining labels...
Encoding Labels...
Preprocessed and Saved...


True

In [6]:
# Define the custom dataset
class SentimentDataset(Dataset):
    """Map-style dataset that tokenizes text rows of a pickled DataFrame.

    The pickle at ``df_path`` must hold a DataFrame with a 'text' column and
    an 'encoded' column (one list of 0/1 label flags per row).

    Args:
        df_path: path of the pickled DataFrame.
        tokenizer: object exposing ``encode_plus`` that returns a mapping with
            'input_ids' and 'attention_mask'.
        max_len: length every sequence is padded / truncated to.
    """

    def __init__(self, df_path, tokenizer, max_len):
        # NOTE: unpickling can execute arbitrary code — only load files this
        # notebook produced itself.
        self.df = pd.read_pickle(df_path)
        self.texts = self.df.text
        self.targets = self.df.encoded
        self.max_len = max_len
        self.tokenizer = tokenizer

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.df)

    def __getitem__(self, index):
        """Return the tokenized sample at position ``index``.

        Returns:
            dict with long tensors 'ids', 'mask' and 'targets'.
        """
        # Positional lookup: samplers hand out positions 0..len-1, which only
        # coincide with index labels when the frame has a default RangeIndex.
        text = self.texts.iloc[index]
        target = self.targets.iloc[index]
        # token_type_ids are no longer requested because nothing reads them.
        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'targets': torch.tensor(target, dtype=torch.long)
        }

In [7]:
#Data loader
def get_loader(train_batch_size=32, test_batch_size=8, shuffle=True, num_workers=1, pin_memory=True):
    """Build the train and test DataLoaders from 'train.pkl' and 'test.pkl'.

    Args:
        train_batch_size: batch size of the training loader.
        test_batch_size: batch size of the test loader.
        shuffle: whether the training loader reshuffles its samples.
        num_workers: worker processes used by each loader.
        pin_memory: forwarded to both loaders.

    Returns:
        (trainLoader, testLoader, tokenizer) where ``tokenizer`` is the
        module-level tokenizer both datasets were built with.
    """
    trainDataset = SentimentDataset('train.pkl', tokenizer, MAX_LEN)
    testDataset = SentimentDataset('test.pkl', tokenizer, MAX_LEN)

    trainLoader = DataLoader(
        dataset=trainDataset,
        batch_size=train_batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=pin_memory
    )

    # Evaluation data is never shuffled: sample order does not change the
    # aggregated loss or metrics, and a fixed order keeps runs comparable.
    testLoader = DataLoader(
        dataset=testDataset,
        batch_size=test_batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=pin_memory
    )

    return trainLoader, testLoader, tokenizer

# Define the model
class SentimentMultilabelMiniLM(torch.nn.Module):
    """Pretrained encoder followed by dropout and a linear multi-label head.

    The encoder is whatever checkpoint PRE_TRAINED_MODEL names; the head maps
    the encoder's hidden size to ``num_labels`` raw scores (logits).
    """

    def __init__(self, num_labels):
        super().__init__()
        encoder = AutoModel.from_pretrained(PRE_TRAINED_MODEL)
        self.minilm = encoder
        self.drop = torch.nn.Dropout(p=0.3)
        self.classifier = torch.nn.Linear(
            in_features=encoder.config.hidden_size,
            out_features=num_labels,
        )

    def forward(self, input_ids, attention_mask):
        """Return one row of ``num_labels`` logits per input sequence."""
        encoded = self.minilm(input_ids=input_ids, attention_mask=attention_mask)
        # Hidden state at sequence position 0 serves as the sentence summary.
        first_token = encoded.last_hidden_state[:, 0, :]
        logits = self.classifier(self.drop(first_token))
        return logits

In [8]:
# Define training and evaluation functions
def loss_fun(outputs, targets):
    """Binary cross-entropy on raw logits, averaged over all elements."""
    criterion = torch.nn.BCEWithLogitsLoss()
    loss = criterion(outputs, targets)
    return loss

def save_metrics(eval_metrics, file_name):
    """Pickle ``eval_metrics`` to '<file_name>_metrics.pkl' and return True."""
    destination = '{}_metrics.pkl'.format(file_name)
    with open(destination, 'wb') as handle:
        pickle.dump(eval_metrics, handle)
    return True

def save_checkpoint(state, filename=MODEL_PATH):
    """Announce the save, then serialise ``state`` to ``filename`` via torch.save."""
    print("=> Saving Model")
    torch.save(obj=state, f=filename)

def print_metrics(true, pred, loss, type, threshold=0.35):
    """Binarise predictions, compute multi-label metrics, print and return them.

    Args:
        true: ground-truth label rows (0/1 per label).
        pred: predicted scores per label; a score counts as positive when it
            is >= ``threshold``.
        loss: loss value to report alongside the metrics.
        type: tag printed in front of the line (e.g. 'Train', 'Validation').
        threshold: decision threshold applied to ``pred``; defaults to the
            previously hard-coded 0.35.

    Returns:
        dict with keys 'loss', 'hamming_loss' and
        '{precision,recall,f1}_{micro,macro}'.

    Note:
        All scores are computed with ``zero_division=1``.
        NOTE(review): this presumably scores labels that are never predicted
        as 1.0, which can lift macro precision — confirm this is intended.
    """
    pred = np.array(pred) >= threshold

    results = {
        'loss': loss,
        'hamming_loss': metrics.hamming_loss(true, pred),
    }
    scorers = (
        ('precision', metrics.precision_score),
        ('recall', metrics.recall_score),
        ('f1', metrics.f1_score),
    )
    # Micro first, then macro, so the dict keeps the same key order as the
    # printed line.
    for average in ('micro', 'macro'):
        for name, scorer in scorers:
            key = '{}_{}'.format(name, average)
            results[key] = scorer(true, pred, average=average, zero_division=1)

    print(
        "{} loss: {:.4f} | Hamming Loss: {:.4f} | Precision Micro: {:.4f} | Recall Micro: {:.4f} | F1 Micro: {:.4f} | Precision Macro: {:.4f} | Recall Macro: {:.4f} | F1 Macro: {:.4f}".format(
            type,
            results['loss'],
            results['hamming_loss'],
            results['precision_micro'],
            results['recall_micro'],
            results['f1_micro'],
            results['precision_macro'],
            results['recall_macro'],
            results['f1_macro'],
        )
    )

    return results

In [9]:
def train(model, device, train_loader, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Each batch is moved to ``device``, pushed through the model, scored with
    ``loss_fun`` and used for one optimizer step.

    Returns:
        (outputs, targets, mean_loss): sigmoid-activated model outputs and
        the matching targets as nested Python lists, plus the loss averaged
        over the number of batches.
    """
    model.train()
    all_targets, all_probs = [], []
    running_loss = 0

    for batch in train_loader:
        input_ids = batch['ids'].to(device, dtype=torch.long)
        attention_mask = batch['mask'].to(device, dtype=torch.long)
        # BCE-with-logits needs float targets
        labels = batch['targets'].to(device, dtype=torch.float)

        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        batch_loss = loss_fun(logits, labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        all_targets += labels.cpu().detach().numpy().tolist()
        all_probs += torch.sigmoid(logits).cpu().detach().numpy().tolist()

    mean_loss = running_loss / len(train_loader)
    return all_probs, all_targets, mean_loss


In [10]:
def validation(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` without updating any weights.

    The model is switched to eval mode and gradients are disabled for the
    whole pass.

    Returns:
        (outputs, targets, mean_loss): sigmoid-activated model outputs and
        the matching targets as nested Python lists, plus the loss averaged
        over the number of batches.
    """
    model.eval()
    all_targets, all_probs = [], []
    running_loss = 0

    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['ids'].to(device, dtype=torch.long)
            attention_mask = batch['mask'].to(device, dtype=torch.long)
            # BCE-with-logits needs float targets
            labels = batch['targets'].to(device, dtype=torch.float)

            logits = model(input_ids, attention_mask)
            running_loss += loss_fun(logits, labels).item()

            all_targets += labels.cpu().detach().numpy().tolist()
            all_probs += torch.sigmoid(logits).cpu().detach().numpy().tolist()

    mean_loss = running_loss / len(test_loader)
    return all_probs, all_targets, mean_loss

In [11]:
# Main function
def main():
    """Train for EPOCHS epochs, checkpointing on every validation-loss improvement.

    Per epoch: one training pass, one validation pass, metrics printed for
    both, a checkpoint saved when the validation loss beats the best so far,
    and both metric dicts pickled under 'train_epoch_<n>' / 'val_epoch_<n>'.
    """
    # The tokenizer returned by get_loader is not needed here.
    train_loader, test_loader, _ = get_loader()

    model = SentimentMultilabelMiniLM(NUM_LABELS).to(device)
    optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

    best_loss = np.inf

    for epoch in range(EPOCHS):
        print(f"Epoch {epoch + 1}/{EPOCHS}")

        train_outputs, train_targets, train_loss = train(model, device, train_loader, optimizer)
        train_metrics = print_metrics(train_targets, train_outputs, train_loss, 'Train')

        val_outputs, val_targets, val_loss = validation(model, device, test_loader)
        val_metrics = print_metrics(val_targets, val_outputs, val_loss, 'Validation')

        improved = val_loss < best_loss
        if improved:
            best_loss = val_loss
            checkpoint = {
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': best_loss,
            }
            save_checkpoint(checkpoint)

        # Save metrics for each epoch
        for metric_set, prefix in (
            (train_metrics, f'train_epoch_{epoch + 1}'),
            (val_metrics, f'val_epoch_{epoch + 1}'),
        ):
            save_metrics(metric_set, prefix)

        print(f"Epoch {epoch + 1} completed.\n")

    print("Training complete.")

In [12]:
# Start training when this cell runs. Inside a notebook kernel __name__ is
# "__main__", so the guard only matters if the code is exported to a module
# and imported from elsewhere.
if __name__ == "__main__":
    main()

pytorch_model.bin:   0%|          | 0.00/559M [00:00<?, ?B/s]

Epoch 1/5
Train loss: 0.2127 | Hamming Loss: 0.0885 | Precision Micro: 0.5095 | Recall Micro: 0.4393 | F1 Micro: 0.4718 | Precision Macro: 0.2393 | Recall Macro: 0.1911 | F1 Macro: 0.2057
Validation loss: 0.1566 | Hamming Loss: 0.0577 | Precision Micro: 0.7256 | Recall Micro: 0.5908 | F1 Micro: 0.6513 | Precision Macro: 0.7552 | Recall Macro: 0.3214 | F1 Macro: 0.3330
=> Saving Model
Epoch 1 completed.

Epoch 2/5
Train loss: 0.1366 | Hamming Loss: 0.0493 | Precision Micro: 0.7501 | Recall Micro: 0.6780 | F1 Micro: 0.7122 | Precision Macro: 0.5712 | Recall Macro: 0.4248 | F1 Macro: 0.4558
Validation loss: 0.1291 | Hamming Loss: 0.0478 | Precision Micro: 0.7659 | Recall Micro: 0.6862 | F1 Micro: 0.7239 | Precision Macro: 0.6922 | Recall Macro: 0.4537 | F1 Macro: 0.4781
=> Saving Model
Epoch 2 completed.

Epoch 3/5
Train loss: 0.1070 | Hamming Loss: 0.0383 | Precision Micro: 0.7952 | Recall Micro: 0.7728 | F1 Micro: 0.7838 | Precision Macro: 0.6801 | Recall Macro: 0.5944 | F1 Macro: 0.621