<a href="https://colab.research.google.com/github/yogasgm/prototype_finetuning_pytorch/blob/main/Prototype_Multiclass_Classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive under /content/gdrive; the checkpoint paths used later in
# this notebook live below that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

# Importing libraries

In [None]:
!pip install transformers

import torch
import torch.nn as nn
import pandas as pd
import numpy as np
import random
import shutil
import sys
from sklearn.model_selection import train_test_split

# Setting seed for reproducibility

In [None]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    seed: integer handed unchanged to Python's `random`, NumPy and PyTorch
          (CPU generator and all CUDA generators).

    cuDNN is additionally switched to deterministic mode with autotuning
    (benchmark) turned off.
    """
    # Same seed for each library's global generator.
    for seeder in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    # Prefer repeatable kernels over the fastest ones.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

In [None]:
# Fix all RNG seeds once, before any data splitting or model initialisation.
set_seed(43)

# Downloading dataset

In [None]:
from requests import get as rget

# Raw-file URL of the dataset CSV on GitHub.
url = "https://raw.githubusercontent.com/Fal186/Mapping-web3/refs/heads/main/dataset/web3_Stage3_communicative_intent.csv"

# Bound the wait and fail loudly on HTTP errors; otherwise a 404/500 error page
# would be written to disk and handed to the CSV parser.
res = rget(url, timeout=60)
res.raise_for_status()

# Write-only binary mode is enough: the file is re-read through pandas below.
with open('file.csv', 'wb') as f:
    f.write(res.content)

train_df = pd.read_csv('file.csv')

In [None]:
# Print column names, dtypes and non-null counts of the freshly loaded dataset.
train_df.info()

In [None]:
# Give every distinct label a consecutive integer id, numbered in the order
# the labels come back from unique().
possible_labels = train_df.Predicted_Label.unique()
label_dict = {label: idx for idx, label in enumerate(possible_labels)}
label_dict

In [None]:
# Swap every label value for its integer id from label_dict, overwriting the column.
train_df['Predicted_Label'] = train_df.Predicted_Label.replace(label_dict)

In [None]:
# Preview the first rows to confirm the label column now holds integer ids.
train_df.head()

# Preparing the tokenizer

In [None]:
# Maximum sequence length in tokens; 512 is the upper limit for BERT.
MAX_LEN = 512

In [None]:
from transformers import BertTokenizer, BertModel

In [None]:
# Load the tokenizer that belongs to the 'bert-base-uncased' checkpoint
# (downloaded if it is not cached yet).
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Map-style dataset that tokenizes one row of a DataFrame per access.

    df: DataFrame with a 'text' column and a 'Predicted_Label' column holding
        integer class ids.
    tokenizer: object exposing `encode_plus` (a BERT tokenizer in this notebook).
    max_len: every sample is padded or truncated to exactly this many tokens.

    Each item is a dict with 1-D tensors 'input_ids', 'attention_mask' and
    'token_type_ids', plus a scalar long tensor 'targets'.
    """

    def __init__(self, df, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.df = df
        self.text = df['text']
        self.targets = self.df['Predicted_Label']
        self.max_len = max_len

    def __len__(self):
        """Number of rows (samples) in the wrapped frame."""
        return len(self.text)

    def __getitem__(self, index):
        """Tokenize and return the sample at integer position `index`."""
        # Positional access (.iloc): a DataLoader asks for positions 0..len-1,
        # which only coincide with the frame's index labels when the caller
        # happened to reset the index. Label-based access would otherwise raise
        # KeyError or silently fetch a different row.
        text = str(self.text.iloc[index])
        # Collapse runs of whitespace (including newlines) into single spaces.
        text = " ".join(text.split())
        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            return_token_type_ids=True,
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        # The tokenizer output is flattened so each sample is a 1-D tensor
        # and default batching can stack samples into one batch.
        return {
            'input_ids': inputs['input_ids'].flatten(),
            'attention_mask': inputs['attention_mask'].flatten(),
            'token_type_ids': inputs["token_type_ids"].flatten(),
            'targets': torch.tensor(int(self.targets.iloc[index]), dtype=torch.long)
        }

# Splitting & Tokenizing Dataset

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Show which device was selected.
device

In [None]:
# 80/10/10 split: hold out 20% of the rows first, then cut that hold-out in
# half to obtain the validation and test sets.
# NOTE(review): train_df is overwritten here, so re-running this cell splits
# the already reduced training set again.
train_df, temp_df = train_test_split(train_df, test_size=0.2, random_state=43)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=43)

# Give every split a fresh 0..n-1 index.
train_df, val_df, test_df = (
    part.reset_index(drop=True) for part in (train_df, val_df, test_df)
)

In [None]:
# === Compute Class Weights ===
from sklearn.utils.class_weight import compute_class_weight
import numpy as np
import torch

# Labels in the training split are already integer ids (see label_dict).
y = train_df['Predicted_Label'].values
classes = np.unique(y)

# 'balanced' weighting gives rarer classes proportionally larger weights.
weights = compute_class_weight(class_weight='balanced', classes=classes, y=y)

# The loss needs exactly one weight per model output, i.e. len(label_dict)
# entries. A class that did not land in the training split gets a neutral 1.0
# placeholder (it never occurs as a training target), which keeps the vector
# aligned with the class ids instead of coming out too short.
full_weights = np.ones(len(label_dict), dtype=np.float64)
full_weights[classes.astype(int)] = weights
class_weights = torch.tensor(full_weights, dtype=torch.float).to(device)

# === Updated Loss Function with Class Weights ===
def criterion (outputs, targets):
    """Class-weighted cross-entropy between logits `outputs` and class ids `targets`.

    Reads the module-level `class_weights` tensor at call time.
    """
    return torch.nn.functional.cross_entropy(outputs, targets, weight=class_weights)

In [None]:
# Inspect the per-class loss weights (indexed by class id).
print (class_weights)

In [None]:
# Class distribution of each split, to check how balanced the splits are.

# For training set
print(train_df['Predicted_Label'].value_counts())

# For validation set
print(val_df['Predicted_Label'].value_counts())

# For test set
print(test_df['Predicted_Label'].value_counts())

In [None]:
# (rows, columns) of the training split.
train_df.shape

In [None]:
# (rows, columns) of the validation split.
val_df.shape

In [None]:
# Display the validation split.
val_df

In [None]:
# Wrap each split in a CustomDataset; rows are tokenized on access and
# padded/truncated to MAX_LEN tokens.
train_dataset = CustomDataset(train_df, tokenizer, MAX_LEN)
valid_dataset = CustomDataset(val_df, tokenizer, MAX_LEN)
test_dataset = CustomDataset(test_df, tokenizer, MAX_LEN)

In [None]:
# Number of training samples.
len(train_dataset)

# Setting hyperparameters

In [None]:
# Batch size of the training loader.
TRAIN_BATCH_SIZE = 16
# Batch size of the validation loader (also reused for the test loader).
VALID_BATCH_SIZE = 16
# Upper bound on training epochs; early stopping may end training sooner.
EPOCHS = 5
# Learning rate handed to the AdamW optimizer.
LEARNING_RATE = 5e-5

In [None]:
# Data loaders
# Training batches are reshuffled every epoch so the optimizer does not see the
# exact same batch sequence each time. The order stays reproducible because the
# torch RNG is seeded in set_seed. Validation order does not affect the metrics,
# so that loader keeps the dataset order.
train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=TRAIN_BATCH_SIZE, shuffle=True)
val_data_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=VALID_BATCH_SIZE)

In [None]:
# Checking for available device (repeats the assignment made in the earlier
# device cell, so `device` keeps the same value).
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Show which device was selected.
device

# Additional functions for loading and saving checkpoints

In [None]:
def load_ckp(checkpoint_fpath, model, optimizer, map_location='cpu'):
    """
    Restore model and optimizer state from a checkpoint written by the training loop.

    checkpoint_fpath: path of the checkpoint file to load
    model: model that we want to load checkpoint parameters into
    optimizer: optimizer we defined in previous training
    map_location: where torch.load places the stored tensors. The default
        'cpu' lets a checkpoint saved on a GPU runtime be opened on a machine
        without CUDA; load_state_dict then copies the values into the model's
        and optimizer's existing tensors on whatever device they live on.

    Returns (model, optimizer, epoch, valid_loss_min) where epoch and
    valid_loss_min are the values stored under the checkpoint's 'epoch' and
    'valid_loss_min' keys.
    """
    # load check point
    checkpoint = torch.load(checkpoint_fpath, map_location=map_location)
    # initialize state_dict from checkpoint to model
    model.load_state_dict(checkpoint['state_dict'])
    # initialize optimizer from checkpoint to optimizer
    optimizer.load_state_dict(checkpoint['optimizer'])
    # validation loss recorded together with the checkpoint
    valid_loss_min = checkpoint['valid_loss_min']
    # return model, optimizer, epoch value, min validation loss
    return model, optimizer, checkpoint['epoch'], valid_loss_min

def save_ckp(state, is_best, checkpoint_path, best_model_path):
    """
    Write a checkpoint to disk and, for a best-so-far model, duplicate it.

    state: checkpoint object to serialise with torch.save
    is_best: truthy when this checkpoint should also become the best model
    checkpoint_path: file the checkpoint is always written to
    best_model_path: file that receives a copy when is_best is truthy
    """
    # Always persist the checkpoint itself.
    torch.save(state, checkpoint_path)

    # Nothing more to do unless this is the best model so far.
    if not is_best:
        return

    # Duplicate the file just written instead of serialising the state twice.
    shutil.copyfile(checkpoint_path, best_model_path)

# Training the Model

Defining and Initializing the BERT Classification Model

In [None]:
# Define the model
class BERTClass(torch.nn.Module):
    """BERT encoder followed by dropout and a linear classification head.

    num_labels: number of output classes; when omitted it falls back to
        len(label_dict), the notebook's label mapping.
    dropout: dropout probability applied to the pooled BERT output.
    """

    def __init__(self, num_labels=None, dropout=0.3):
        super().__init__()
        if num_labels is None:
            num_labels = len(label_dict)
        self.bert_model = BertModel.from_pretrained('bert-base-uncased')
        self.dropout = torch.nn.Dropout(dropout)
        # Size the head from the encoder's own config instead of a literal 768,
        # so the layer stays correct if the checkpoint name above is changed.
        self.linear = torch.nn.Linear(self.bert_model.config.hidden_size, num_labels)

    def forward(self, input_ids, attn_mask, token_type_ids):
        """Return unnormalised class scores (logits), one row per input sequence."""
        output = self.bert_model(
            input_ids,
            attention_mask=attn_mask,
            token_type_ids=token_type_ids
        )
        # pooler_output is the encoder's pooled sequence-level representation.
        output_dropout = self.dropout(output.pooler_output)
        output = self.linear(output_dropout)
        return output

In [None]:
# Build the classifier and move its parameters to the selected device.
model = BERTClass()
model.to(device)

Setting Up the Loss Function and Optimizer

In [None]:
# Plain (unweighted) cross-entropy for multi-class classification.
def loss_fn(outputs, targets):
    """Mean cross-entropy between logits `outputs` and integer class ids `targets`."""
    return torch.nn.functional.cross_entropy(outputs, targets)

# AdamW over all model parameters (BERT encoder and classification head alike).
optimizer = torch.optim.AdamW(params =  model.parameters(), lr=LEARNING_RATE)

Initialization of Validation Target and Output Lists

In [None]:
# Module-level lists that train_model appends to during validation:
# ground-truth class ids and the per-class model outputs.
val_targets=[]
val_outputs=[]

Training and Validation Loop with Early Stopping

In [None]:
def train_model(n_epochs, training_loader, validation_loader, model, optimizer, checkpoint_path, best_model_path, patience):
    """Fine-tune `model`, validating after every epoch, with early stopping.

    n_epochs: maximum number of epochs to run.
    training_loader / validation_loader: iterables yielding dict batches with
        the keys 'input_ids', 'attention_mask', 'token_type_ids' and 'targets'.
    model: called as model(input_ids, attention_mask, token_type_ids); returns logits.
    optimizer: optimizer stepping the model's parameters.
    checkpoint_path: file that receives the checkpoint of every finished epoch.
    best_model_path: file that receives a copy of that checkpoint whenever the
        validation loss is at least as good as the best seen so far.
    patience: number of consecutive epochs without improvement after which
        training stops.

    Relies on notebook-level names: `device`, `criterion` (class-weighted
    training loss), `loss_fn` (unweighted validation loss), `save_ckp`, and
    the lists `val_targets` / `val_outputs`, which after each epoch hold that
    epoch's validation targets and softmax probabilities.

    Returns the model as it is after the last executed epoch (which is not
    necessarily the best checkpoint).
    """
    valid_loss_min = np.inf
    no_improve = 0

    for epoch in range(1, n_epochs + 1):
        train_loss = 0.0
        valid_loss = 0.0

        model.train()
        print('############# Epoch {}: Training Start #############'.format(epoch))

        for batch_idx, data in enumerate(training_loader):
            ids = data['input_ids'].to(device, dtype=torch.long)
            mask = data['attention_mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.long)

            optimizer.zero_grad()
            outputs = model(ids, mask, token_type_ids)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Incremental mean: after the last batch this already IS the
            # average loss per batch, so it must not be divided again.
            train_loss = train_loss + ((1 / (batch_idx + 1)) * (loss.item() - train_loss))

        print('############# Epoch {}: Training End #############'.format(epoch))

        print('############# Epoch {}: Validation Start #############'.format(epoch))
        model.eval()

        # Keep only the current epoch's predictions; otherwise the lists would
        # mix outputs produced by different versions of the weights.
        val_targets.clear()
        val_outputs.clear()

        with torch.no_grad():
            for batch_idx, data in enumerate(validation_loader, 0):
                ids = data['input_ids'].to(device, dtype=torch.long)
                mask = data['attention_mask'].to(device, dtype=torch.long)
                token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
                targets = data['targets'].to(device, dtype=torch.long)

                outputs = model(ids, mask, token_type_ids)
                # NOTE(review): validation uses the unweighted loss_fn while
                # training uses the class-weighted criterion, so the two
                # printed losses are not on the same scale - confirm intent.
                loss = loss_fn(outputs, targets)

                valid_loss = valid_loss + ((1 / (batch_idx + 1)) * (loss.item() - valid_loss))

                val_targets.extend(targets.cpu().numpy().tolist())
                # Classes are mutually exclusive, so probabilities come from a
                # softmax over the class dimension (not an element-wise sigmoid).
                val_outputs.extend(torch.softmax(outputs, dim=1).cpu().numpy().tolist())

        print('############# Epoch {}: Validation End #############'.format(epoch))

        print('Epoch: {} \tAvgerage Training Loss: {:.6f} \tAverage Validation Loss: {:.6f}'.format(
            epoch, train_loss, valid_loss))

        is_best = valid_loss <= valid_loss_min
        if is_best:
            print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min, valid_loss))
            valid_loss_min = valid_loss
            no_improve = 0
        else:
            no_improve += 1

        checkpoint = {
            # Epoch to resume from when training is continued later.
            'epoch': epoch + 1,
            # Best validation loss seen so far (equals this epoch's loss for a best checkpoint).
            'valid_loss_min': valid_loss_min,
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict()
        }
        # Always write the latest checkpoint; save_ckp additionally copies it
        # to best_model_path when this epoch is the best so far.
        save_ckp(checkpoint, is_best, checkpoint_path, best_model_path)

        if not is_best and no_improve >= patience:
            print("Early stopping due to no improvement in validation loss")
            break

    return model

In [None]:
import os

# Ensure directories exist
os.makedirs("/content/gdrive/MyDrive/ckpt_path", exist_ok=True)
os.makedirs("/content/gdrive/MyDrive/Best_Model", exist_ok=True)

# Define paths for checkpoint and best model.
# Each variable now points at the file whose directory and name match its
# role; previously the two assignments were swapped, so the best model ended
# up in the "ckpt_path" folder under a "ckpth" file name.
# NOTE(review): a best model saved by an earlier run sits at the old
# ckpt_path/ckpth(...) location and must be moved or retrained.
ckpt_path = "/content/gdrive/MyDrive/ckpt_path/ckpth(16-5e-5-stage3-BERT-class-weightsv3).pth"
best_model_path = "/content/gdrive/MyDrive/Best_Model/best_model(16-5e-5-stage3-BERT-class-weightsv3).pth"



# Start Train

In [None]:
# Run fine-tuning for at most EPOCHS epochs; training stops early once the
# validation loss has failed to improve for 2 epochs in a row.
trained_model = train_model(EPOCHS, train_data_loader, val_data_loader, model, optimizer, ckpt_path, best_model_path, patience = 2)

In [None]:
# Restore the model weights and optimizer state stored at best_model_path, so
# testing uses the best checkpoint rather than the weights of the last epoch.
model, optimizer, start_epoch, valid_loss_min = load_ckp(best_model_path, model, optimizer)

print(f'The validation loss of the best saved model is: {valid_loss_min}')

# Test

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
# Switch the model to evaluation mode so dropout is disabled during testing.
model.eval()

In [None]:
# Accumulators filled by the test loop: predicted class ids and ground-truth
# class ids. Re-run this cell before re-running the loop to avoid duplicates.
predictions = []
true_labels = []

In [None]:
# Loader over the held-out test split; it reuses the validation batch size and
# keeps the dataset order.
test_data_loader = torch.utils.data.DataLoader(test_dataset, batch_size=VALID_BATCH_SIZE)

In [None]:
# Iterate over the test data with autograd disabled: no gradients are needed
# for evaluation, and skipping the graph avoids holding activations in memory
# for every batch.
with torch.no_grad():
    for data in test_data_loader:
        # move tensors to GPU if available
        ids = data['input_ids'].to(device, dtype = torch.long)
        mask = data['attention_mask'].to(device, dtype = torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
        targets = data['targets'].to(device, dtype = torch.long)

        # compute the model's logits for this batch
        outputs = model(ids, mask, token_type_ids)

        # the predicted class is the index of the largest logit in each row
        _, pred = torch.max(outputs, dim=1)

        predictions.extend(pred.cpu().numpy().tolist())
        true_labels.extend(targets.cpu().numpy().tolist())

In [None]:
# Calculate and print the classification report.
# The class ids are passed explicitly so they always line up with the names:
# without `labels`, scikit-learn infers the classes from the data and raises
# when a class from label_dict is absent from both true_labels and predictions.
print(classification_report(
    true_labels,
    predictions,
    labels=list(label_dict.values()),
    target_names=list(label_dict.keys()),
))

In [None]:
# Print the confusion matrix (rows: true classes, columns: predicted classes).
print(confusion_matrix(true_labels, predictions))

# Predict New Text Input

In [None]:
def predict_target(model, tokenizer, text, max_len=128, label_map=None):
    """Classify one text and return its label name plus the class probabilities.

    model: classifier called as model(input_ids, attention_mask, token_type_ids)
        and returning logits with one row per input.
    tokenizer: tokenizer exposing `encode_plus`.
    text: the string to classify.
    max_len: length in tokens the input is padded/truncated to.
        NOTE(review): the default is 128 while the notebook trains with
        MAX_LEN; pass max_len=MAX_LEN to truncate long texts the same way.
    label_map: optional {label name: class index} mapping; defaults to the
        notebook-level `label_dict`.

    Returns (predicted_label, probabilities): the name of the most probable
    class and a 1-D NumPy array of softmax probabilities indexed by class id.
    """
    if label_map is None:
        label_map = label_dict

    model.eval()

    # Run on whatever device the model's parameters live on, so the function
    # works no matter which device the caller moved the model to.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    # Preprocess the text
    inputs = tokenizer.encode_plus(
        text,
        None,
        add_special_tokens=True,
        max_length=max_len,
        padding='max_length',
        return_token_type_ids=True,
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )

    ids = inputs['input_ids'].to(target_device)
    mask = inputs['attention_mask'].to(target_device)
    token_type_ids = inputs["token_type_ids"].to(target_device)

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logits = model(ids, mask, token_type_ids)
        outputs = torch.softmax(logits, dim=1).cpu().numpy()

    # Get the label with highest probability (single input, so row 0)
    predicted_label_idx = int(np.argmax(outputs, axis=1)[0])

    # Reverse the label mapping to get the name of the class
    reverse_label_dict = {v: k for k, v in label_map.items()}
    predicted_label = reverse_label_dict[predicted_label_idx]

    # Get probabilities for each class
    probabilities = outputs[0]

    return predicted_label, probabilities


In [None]:
# Placeholder input: replace the empty string with the text to classify.
text = ""
predicted_label, probabilities = predict_target(model, tokenizer, text)
print(f"The predicted label for the text is: {predicted_label}")
print(f"The probabilities for each class are: {probabilities}")

In [None]:
# Placeholder input: replace the empty string with the text to classify.
text = ""
predicted_label, probabilities = predict_target(model, tokenizer, text)
print(f"The predicted label for the text is: {predicted_label}")
print(f"The probabilities for each class are: {probabilities}")

In [None]:
# Placeholder input: replace the empty string with the text to classify.
text = ""
predicted_label, probabilities = predict_target(model, tokenizer, text)
print(f"The predicted label for the text is: {predicted_label}")
print(f"The probabilities for each class are: {probabilities}")