In [None]:
# Mount Google Drive at /content/drive; the dataset read later in this notebook
# is addressed through a path under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
import transformers
from transformers import BertTokenizer, BertModel, BertConfig
import sklearn
from sklearn import metrics
import numpy as np

In [None]:
pip install scikit-multilearn

In [None]:
!pip install openpyxl

In [None]:
from skmultilearn.model_selection import iterative_train_test_split

In [None]:
# Select the compute device: the GPU when CUDA is available, otherwise the CPU.
from torch import cuda
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
# Read the spreadsheet from the mounted Drive, then collapse every column after the
# first into one per-row list stored under 'list'.
# NOTE(review): this presumes the first column is 'Title' and all remaining columns
# are label indicators — confirm against the spreadsheet.
df = pd.read_excel("/content/drive/My Drive/Datasets/oversampled_dataset.xlsx")
label_columns = df.columns[1:]
df['list'] = df[label_columns].values.tolist()

# Keep only the text and its label list; .copy() detaches the result from df.
new_df = df[['Title', 'list']].copy()
new_df.head()

In [None]:
# Pull out the row index, the text column and the per-row label lists.
idx, text, labels = new_df.index, new_df['Title'], new_df['list']

In [None]:
# Quick visual check of the text column.
print(text)

In [None]:
# Turn the per-row label lists into one integer NumPy array (one row per sample).
label_rows = [np.asarray(row, dtype=int) for row in labels]
labels = np.array(label_rows)
print(labels)

In [None]:
# Per-label weights: one minus each label's share of all positive entries, so a label
# with more positives receives a smaller weight.
positives_per_label = labels.sum(axis=0)
total_positives = labels.sum()
label_weights = 1 - positives_per_label / total_positives

# A disabled alternative based on sklearn.utils.class_weight.compute_class_weight
# ('balanced', classes=[0, 1], over the flattened labels) was previously kept here.

In [None]:
# Stratified train/validation/test split for a multilabel dataset.
# Row positions (not the texts) are split, so the splitter only handles a small
# integer array; the texts are looked up afterwards. test_size=0.2 followed by
# test_size=0.5 on the remainder is intended to give roughly 80% / 10% / 10%.
# NOTE(review): the input file is named "oversampled_dataset" — if rows were
# duplicated before this split, copies of one sample can end up in several
# splits and inflate validation/test scores; confirm how the file was built.
row_ids = np.arange(len(labels))
train_idx, y_train, remaining_idx, y_remaining = iterative_train_test_split(
    row_ids[:, np.newaxis], labels, test_size=0.2
)
# remaining_idx already has shape (m, 1); adding another axis would hand the
# splitter a 3-D array, so it is passed through unchanged.
val_idx, y_val, test_idx, y_test = iterative_train_test_split(
    remaining_idx, y_remaining, test_size=0.5
)

# Extract corresponding texts for each split.
# row_ids are positions, so use positional .iloc rather than label-based text[i],
# which would pick the wrong rows (or raise) if the index is not 0..n-1.
x_train = text.iloc[train_idx.flatten()].tolist()
x_val = text.iloc[val_idx.flatten()].tolist()
x_test = text.iloc[test_idx.flatten()].tolist()

# Every sample must land in exactly one split.
assert len(x_train) + len(x_val) + len(x_test) == len(labels), "split sizes do not add up"


In [None]:
# Configuration: tokenisation length, batch sizes and optimisation settings.
MAX_LEN = 200            # sequences are padded/truncated to this many tokens
TRAIN_BATCH_SIZE = 16    # batch size of the training DataLoader
VALID_BATCH_SIZE = 8     # batch size of the validation and test DataLoaders
EPOCHS = 8               # number of passes over the training set
LEARNING_RATE = 2e-5     # learning rate given to the optimizer

# Tokenizer matching the 'bert-base-uncased' checkpoint used by the model.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset that pairs each text with its label vector.

    Args:
        texts: Indexable, sized collection of texts; each item is passed through ``str``.
        labels: Indexable, sized collection of per-sample label vectors, aligned
            position-by-position with ``texts``.
        tokenizer: Callable tokenizer. It is called with the keyword arguments used in
            ``__getitem__`` and must return a mapping containing ``'input_ids'``,
            ``'attention_mask'`` and ``'token_type_ids'``.
        max_len: Length every sequence is padded or truncated to.

    Raises:
        ValueError: If ``texts`` and ``labels`` do not have the same length.
    """

    def __init__(self, texts, labels, tokenizer, max_len=200):
        # Fail early on misaligned inputs instead of silently mis-pairing
        # samples or raising an IndexError in the middle of an epoch.
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, index):
        """Tokenize sample ``index`` and return its tensors.

        Returns:
            dict with ``'ids'``, ``'mask'`` and ``'token_type_ids'`` as ``torch.long``
            tensors and ``'targets'`` as a ``torch.float`` tensor.
        """
        # Collapse any run of whitespace (spaces, tabs, newlines) into one space.
        text = " ".join(str(self.texts[index]).split())

        # Call the tokenizer directly; encode_plus is the deprecated entry point
        # for the same single-sequence encoding.
        encoded = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_token_type_ids=True,
        )

        return {
            'ids': torch.tensor(encoded['input_ids'], dtype=torch.long),
            'mask': torch.tensor(encoded['attention_mask'], dtype=torch.long),
            'token_type_ids': torch.tensor(encoded['token_type_ids'], dtype=torch.long),
            # Float targets, as required by the BCE-with-logits loss used for training.
            'targets': torch.tensor(self.labels[index], dtype=torch.float),
        }

# Build one dataset per split; all three share the tokenizer and MAX_LEN.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(split_texts, split_labels, tokenizer, max_len=MAX_LEN)
    for split_texts, split_labels in ((x_train, y_train), (x_val, y_val), (x_test, y_test))
)

# Wrap the datasets in loaders: only the training loader shuffles, and the
# validation and test loaders share VALID_BATCH_SIZE.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=TRAIN_BATCH_SIZE)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=VALID_BATCH_SIZE)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=VALID_BATCH_SIZE)

In [None]:
# Report the number of samples overall and in each split.
total_samples = sum(len(split) for split in (x_train, x_val, x_test))
print("FULL Dataset: {}".format(total_samples))
print("TRAIN Dataset: {}".format(len(train_dataset)))
print("VALIDATION Dataset: {}".format(len(val_dataset)))
print("TEST Dataset: {}".format(len(test_dataset)))

In [None]:
class BERTClass(torch.nn.Module):
    """BERT encoder followed by dropout and a linear multilabel head.

    Args:
        num_labels: Number of output logits (one per label).
        model_name: Pretrained checkpoint passed to ``BertModel.from_pretrained``.
            Defaults to ``'bert-base-uncased'``.
        dropout: Dropout probability applied before the linear head. Defaults to 0.3.

    The sub-module names ``l1``/``l2``/``l3`` are the keys used in ``state_dict()``,
    so they must stay unchanged for saved weights to keep loading.
    """

    def __init__(self, num_labels, model_name='bert-base-uncased', dropout=0.3):
        super().__init__()
        self.l1 = transformers.BertModel.from_pretrained(model_name)
        self.l2 = torch.nn.Dropout(dropout)
        # Read the encoder width from its config instead of hard-coding 768, so the
        # head also fits checkpoints with a different hidden size.
        self.l3 = torch.nn.Linear(self.l1.config.hidden_size, num_labels)

    def forward(self, ids, mask, token_type_ids):
        """Return raw logits (no sigmoid applied) for a batch of encoded inputs.

        Args:
            ids: Token id tensor passed to the encoder.
            mask: Attention mask tensor.
            token_type_ids: Segment id tensor.
        """
        # With return_dict=False the encoder returns a tuple; only its second
        # element (the pooled output per the Transformers BertModel docs) is used.
        _, pooled = self.l1(ids, attention_mask=mask, token_type_ids=token_type_ids, return_dict=False)
        return self.l3(self.l2(pooled))

# Size the classification head from the label matrix (one output per label column),
# build the model and move it to the selected device.
_, num_labels = y_train.shape
model = BERTClass(num_labels)
model.to(device)

In [None]:
# Loss function with class weights.
# The weight tensor and the loss module are created once here rather than on every
# batch, which avoids a host-to-device copy per training step. The weights are stored
# as float32 to match the model's logits (label_weights itself is a float64 array).
_label_weight_tensor = torch.tensor(label_weights, dtype=torch.float32, device=device)
_criterion = torch.nn.BCEWithLogitsLoss(weight=_label_weight_tensor)


def loss_fn(outputs, targets):
    """Weighted binary cross-entropy on raw logits.

    Args:
        outputs: Logits produced by the model (sigmoid is applied inside the loss).
        targets: Float tensor of 0/1 targets with the same shape as ``outputs``.

    Returns:
        Scalar loss tensor (mean reduction, the BCEWithLogitsLoss default).
    """
    return _criterion(outputs, targets)

# Optimizer
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
# Empty per-epoch histories for the metrics tracked during training.
train_losses, val_losses = [], []
accuracies = []
f1_micro_scores, f1_macro_scores = [], []

In [None]:
# Training function
def train(epoch):
    """Run one optimisation pass over ``train_loader``.

    Args:
        epoch: Index of the current epoch; not used inside the function.

    Returns:
        Mean of the per-batch losses (sum of batch losses / number of batches).
    """
    model.train()
    running_loss = 0
    for batch in train_loader:
        ids = batch['ids'].to(device)
        mask = batch['mask'].to(device)
        token_type_ids = batch['token_type_ids'].to(device)
        targets = batch['targets'].to(device)

        # Clear gradients left over from the previous step before the forward pass.
        optimizer.zero_grad()
        logits = model(ids, mask, token_type_ids)
        batch_loss = loss_fn(logits, targets)
        running_loss += batch_loss.item()

        batch_loss.backward()
        optimizer.step()

    return running_loss / len(train_loader)

In [None]:
# Validation function
def validation(epoch):
    """Evaluate the model on ``val_loader`` without updating weights.

    Args:
        epoch: Index of the current epoch; not used inside the function.

    Returns:
        Tuple ``(mean_batch_loss, f1_micro, f1_macro)``; the F1 scores are computed
        on sigmoid outputs thresholded at 0.5.
    """
    model.eval()
    total_loss = 0
    all_targets, all_probs = [], []
    with torch.no_grad():
        for batch in val_loader:
            ids = batch['ids'].to(device)
            mask = batch['mask'].to(device)
            token_type_ids = batch['token_type_ids'].to(device)
            targets = batch['targets'].to(device)

            logits = model(ids, mask, token_type_ids)

            total_loss += loss_fn(logits, targets).item()
            all_targets.extend(targets.cpu().tolist())
            all_probs.extend(torch.sigmoid(logits).cpu().tolist())

    # Binarise the probabilities: a label is predicted when its probability is >= 0.5.
    predictions = np.array(all_probs) >= 0.5
    micro = metrics.f1_score(all_targets, predictions, average='micro')
    macro = metrics.f1_score(all_targets, predictions, average='macro')

    return total_loss / len(val_loader), micro, macro

In [None]:
# Training loop: one training pass and one validation pass per epoch, recording
# the losses and F1 scores so they can be plotted afterwards.
train_losses, val_losses = [], []
f1_scores_micro, f1_scores_macro = [], []

for epoch in range(EPOCHS):
    train_loss = train(epoch)
    val_loss, f1_micro, f1_macro = validation(epoch)

    # Append this epoch's numbers to the histories.
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    f1_scores_micro.append(f1_micro)
    f1_scores_macro.append(f1_macro)

    print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, F1 Micro: {f1_micro:.4f}, F1 Macro: {f1_macro:.4f}")

In [None]:
# Persist only the learned parameters (the state dict), not the whole module object.
trained_weights = model.state_dict()
torch.save(trained_weights, "MyBert_multilabel_model.pt")
print("Model saved successfully!")

In [None]:
# Testing function
def test_model(test_loader, model):
    """Evaluate ``model`` on ``test_loader`` and print accuracy and F1 scores.

    Args:
        test_loader: Iterable of batches with 'ids', 'mask', 'token_type_ids'
            and 'targets' entries.
        model: Model called as ``model(ids, mask, token_type_ids)`` returning logits.

    Returns:
        None; the metrics are printed.
    """
    model.eval()
    all_targets, all_probs = [], []
    with torch.no_grad():
        for batch in test_loader:
            logits = model(
                batch['ids'].to(device),
                batch['mask'].to(device),
                batch['token_type_ids'].to(device),
            )
            # Targets are only used for scoring, so they are not moved to the device.
            all_targets.extend(batch['targets'].cpu().tolist())
            all_probs.extend(torch.sigmoid(logits).cpu().tolist())

    # A label counts as predicted when its sigmoid probability is at least 0.5.
    predictions = np.array(all_probs) >= 0.5
    accuracy = metrics.accuracy_score(all_targets, predictions)
    f1_micro = metrics.f1_score(all_targets, predictions, average='micro')
    f1_macro = metrics.f1_score(all_targets, predictions, average='macro')

    print("Test Metrics:")
    print(f"Accuracy = {accuracy:.4f}")
    print(f"F1 Micro = {f1_micro:.4f}")
    print(f"F1 Macro = {f1_macro:.4f}")


In [None]:
# Test the model: evaluate the trained model on the held-out test split and
# print accuracy, micro-F1 and macro-F1.
test_model(test_loader, model)

In [None]:
# Plot metrics
def _plot_history(curves, ylabel, title):
    """Draw one figure with a line per (label, values) pair, indexed by epoch."""
    plt.figure(figsize=(12, 6))
    for curve_label, values in curves:
        plt.plot(values, label=curve_label, marker='o')
    plt.xlabel("Epochs")
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    plt.grid()
    plt.show()


# Training vs. validation loss per epoch.
_plot_history(
    (("Train Loss", train_losses), ("Validation Loss", val_losses)),
    "Loss",
    "Loss Over Epochs",
)

# Micro- and macro-averaged validation F1 per epoch.
_plot_history(
    (("F1 Micro", f1_scores_micro), ("F1 Macro", f1_scores_macro)),
    "F1 Score",
    "F1 Scores Over Epochs",
)

In [None]:
# Download the saved state-dict file from the Colab runtime to the local machine.
from google.colab import files

files.download('MyBert_multilabel_model.pt')