In [None]:
! pip install -U accelerate
! pip install -U transformers
! pip install datasets

In [1]:
import os
import pandas as pd
import numpy as np
import shutil
import sys
import tqdm.notebook as tq
from collections import defaultdict

import torch
import torch.nn as nn

In [2]:
import spacy
import re
import pickle
import ast
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, hamming_loss, jaccard_score
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from transformers import DataCollatorWithPadding
from datasets import Dataset
from datasets import load_metric
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [1]:
from models import captionning, description
from bert_functions import trad, get_description
import os

# The second argument of trad.final used to be a literal that looks like a
# service credential (presumably the translation API key -- confirm against
# bert_functions.trad). It is now read from the environment so the secret does
# not live in the notebook.
# NOTE(review): the previously committed key should be treated as leaked and rotated.
api_key = os.environ.get('TRANSLATION_API_KEY')
if not api_key:
    raise RuntimeError(
        'Set the TRANSLATION_API_KEY environment variable before running this cell.'
    )

df = trad.final('excel.xlsx', api_key)
df = get_description.desc(df, '../Images')
# Persist the result so the following cells can start from the file.
df.to_excel('described.xlsx', index=False)

  from .autonotebook import tqdm as notebook_tqdm


Files already downloaded and verified


In [None]:
from bert_functions import get_ready
# Reload the described table, pass it through get_ready.ready (which hands back
# the frame bound to `df` and a second value bound to `y_true`), then store the
# frame for the next cell.
described = pd.read_excel('described.xlsx')
df, y_true = get_ready.ready(described)
df.to_excel('ready.xlsx', index=False)

In [None]:
# 'Bildbez' column of the source workbook; a later cell uses it to build image file names.
pictures = pd.read_excel('excel.xlsx')['Bildbez']
# Working frame and ground-truth labels, both reloaded from disk.
# NOTE(review): nothing visible in this notebook writes 'y_true.xlsx' --
# presumably get_ready.ready does; confirm.
df = pd.read_excel('ready.xlsx')
y_true = pd.read_excel('y_true.xlsx')

In [None]:
from sklearn.model_selection import train_test_split
# 70 % of the rows go to training; the remaining 30 % form the hold-out frame.
df_train, df_testing = train_test_split(
    df,
    test_size=0.30,
    shuffle=True,
    random_state=77,
)
# The hold-out frame is halved into the test and validation frames.
df_test, df_valid = train_test_split(
    df_testing,
    test_size=0.50,
    shuffle=True,
    random_state=88,
)

In [None]:
from transformers import BertTokenizer, BertModel
# Tokenizer used by CustomDataset to encode the 'combined' text column.
tokenizer = BertTokenizer.from_pretrained(pretrained_model_name_or_path='bert-base-uncased')

In [None]:
class CustomDataset(torch.utils.data.Dataset):
    """Map-style dataset that tokenizes the ``combined`` text column on access.

    Args:
        df: frame holding a ``combined`` column and every column named in
            ``target_list``.
        tokenizer: object exposing ``encode_plus``.
        max_len: length every encoded sequence is padded or truncated to.
        target_list: names of the label columns.
    """

    def __init__(self, df, tokenizer, max_len, target_list):
        self.df = df
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.title = [text for text in df['combined']]
        self.targets = df[target_list].values

    def __len__(self):
        """Number of text rows."""
        return len(self.title)

    def __getitem__(self, index):
        """Return the encoded text, its label vector and the cleaned text for one row."""
        # str() copes with non-string cells; split/join collapses runs of whitespace.
        text = " ".join(str(self.title[index]).split())
        encoding = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_token_type_ids=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        # Each encoded field is flattened to 1-D, as the original per-key code did.
        sample = {
            key: encoding[key].flatten()
            for key in ('input_ids', 'attention_mask', 'token_type_ids')
        }
        sample['targets'] = torch.FloatTensor(self.targets[index])
        sample['title'] = text
        return sample


In [None]:
# Label columns are every column of df except the last one.
# NOTE(review): this presumably relies on 'combined' being the last column -- confirm.
target_list = df.columns[:-1].tolist()
target_list

In [None]:
# Hyperparameters
MAX_LEN = 256        # sequence length handed to CustomDataset
TRAIN_BATCH_SIZE = VALID_BATCH_SIZE = TEST_BATCH_SIZE = 32
EPOCHS = 200         # epochs per training run in the loops below
LEARNING_RATE = 1e-5

In [None]:
# One CustomDataset per frame: train / validation / test splits, the full
# table, and the undivided 30 % hold-out.
train_dataset, valid_dataset, test_dataset, final_dataset, testing_dataset = (
    CustomDataset(frame, tokenizer, MAX_LEN, target_list)
    for frame in (df_train, df_valid, df_test, df, df_testing)
)

In [None]:
# Data loaders

def data_loader (TRAIN_BATCH_SIZE, VALID_BATCH_SIZE, TEST_BATCH_SIZE):
  """Build the train, validation and test DataLoaders.

  The datasets are the notebook-level ``train_dataset``, ``valid_dataset`` and
  ``test_dataset``, looked up when this function is called. Only the training
  loader shuffles.

  Returns:
      (train_data_loader, val_data_loader, test_data_loader)
  """
  def _build(dataset, size, reshuffle):
    return torch.utils.data.DataLoader(
        dataset,
        batch_size=size,
        shuffle=reshuffle,
        num_workers=0
    )

  return (
      _build(train_dataset, TRAIN_BATCH_SIZE, True),
      _build(valid_dataset, VALID_BATCH_SIZE, False),
      _build(test_dataset, TEST_BATCH_SIZE, False),
  )



**Pre-training**

In [None]:
from datasets import load_dataset
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments, DataCollatorWithPadding
import torch
from sklearn.metrics import f1_score
import numpy as np

# GoEmotions corpus used for the pre-training stage
go_emotions = load_dataset(path='go_emotions')

# Same BERT tokenizer checkpoint as the earlier tokenizer cell
tokenizer = BertTokenizer.from_pretrained(pretrained_model_name_or_path='bert-base-uncased')

# Tokenize the dataset
def tokenize_function(examples):
    """Encode ``examples['text']`` with the notebook-level tokenizer.

    Sequences are truncated and padded to a fixed length of 512.
    """
    encode_options = {'truncation': True, 'padding': 'max_length', 'max_length': 512}
    return tokenizer(examples['text'], **encode_options)

tokenized_go_emotions = go_emotions.map(tokenize_function, batched=True, remove_columns=['text'])

# Prepare the datasets.
# The GoEmotions splits get their own names. They used to be bound to
# `train_dataset` / `test_dataset`, which silently replaced the CustomDataset
# objects that data_loader() reads as globals, so the later fine-tuning loop
# would have iterated over GoEmotions batches that have no 'targets' key.
go_train_dataset = tokenized_go_emotions['train']
go_val_dataset = tokenized_go_emotions['validation']
go_test_dataset = tokenized_go_emotions['test']

# Convert labels to multi-hot encoding
def format_labels(examples):
    """Replace each example's list of label ids with a 28-wide multi-hot vector.

    Works on a batch: ``examples['labels']`` is a list of id lists and is
    overwritten with a list of float lists (1.0 at every listed id).
    """
    num_labels = 28
    multi_hot_labels = np.zeros((len(examples['labels']), num_labels))
    for i, labels in enumerate(examples['labels']):
        for label in labels:
            multi_hot_labels[i][label] = 1
    examples['labels'] = multi_hot_labels.tolist()
    return examples

go_train_dataset = go_train_dataset.map(format_labels, batched=True)
go_val_dataset = go_val_dataset.map(format_labels, batched=True)
go_test_dataset = go_test_dataset.map(format_labels, batched=True)


# Load pretrained model
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=28)


# Training arguments
# Paths are relative to the working directory: BERTClass later loads the
# weights from './pre_trained', so writing to the filesystem root ('/...')
# would save them somewhere the loader never looks.
training_args = TrainingArguments(
    output_dir='./results',
    overwrite_output_dir=True,
    num_train_epochs=3,
    per_device_train_batch_size=8,
    save_steps=10_000,
    save_total_limit=2,
    prediction_loss_only=True,
)

# Custom Data Collator to ensure correct label formatting
class CustomDataCollator(DataCollatorWithPadding):
    """Collator that pads as usual, then rebuilds ``labels`` as a float tensor."""

    def __call__(self, features):
        batch = super().__call__(features)
        labels = torch.tensor([f['labels'] for f in features], dtype=torch.float)
        batch['labels'] = labels
        return batch

data_collator = CustomDataCollator(tokenizer)

# Initialize Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=go_train_dataset,
    eval_dataset=go_val_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator)

# Train the model
trainer.train()

# Evaluate the model
eval_results = trainer.evaluate()
print(f"Evaluation Results: {eval_results}")

# Save the model where BERTClass expects to find it
trainer.save_model('./pre_trained')


In [None]:
class BERTClass(torch.nn.Module):
    """BERT encoder loaded from ``./pre_trained`` with a dropout + linear head.

    The head maps the 768-wide pooled output to one logit per entry of the
    notebook-level ``target_list``.

    Args:
        dropout: dropout probability applied to the pooled output.
    """

    def __init__(self,dropout):
        super().__init__()
        self.bert_model = BertModel.from_pretrained('./pre_trained', return_dict=True)
        self.dropout = torch.nn.Dropout(p=dropout)
        self.linear = torch.nn.Linear(in_features=768, out_features=len(target_list))

    def forward(self, input_ids, attn_mask, token_type_ids):
        """Return the raw (pre-sigmoid) logits for a batch."""
        encoded = self.bert_model(
            input_ids,
            attention_mask=attn_mask,
            token_type_ids=token_type_ids
        )
        return self.linear(self.dropout(encoded.pooler_output))


# Kept for reference -- freezing the BERT layers was tested and converged worse:
# for param in model.bert_model.parameters():
#     param.requires_grad = False

# Use the GPU when one is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [None]:
import torch
from transformers import AdamW
import torch.nn as nn
import numpy as np
import tqdm as tq
from collections import defaultdict
import os
from sklearn.metrics import hamming_loss

def loss_fn(outputs, targets):
    """Mean binary cross-entropy between raw logits and 0/1 float targets."""
    return torch.nn.functional.binary_cross_entropy_with_logits(outputs, targets)

# Define the optimizer
def optimize(model, lr):
    """Create the AdamW optimizer for every parameter of ``model``.

    Uses ``torch.optim.AdamW`` instead of the deprecated ``transformers.AdamW``.
    ``eps`` and ``weight_decay`` are set explicitly to the values the
    transformers implementation used by default (1e-6 and 0.0), because the
    PyTorch defaults differ (1e-8 and 1e-2).

    Args:
        model: module whose parameters are optimized.
        lr: learning rate.

    Returns:
        A ``torch.optim.AdamW`` instance.
    """
    return torch.optim.AdamW(model.parameters(), lr=lr, eps=1e-6, weight_decay=0.0)


In [None]:

# Training of the model for one epoch
def train_model(training_loader, model, optimizer, threshold=0.5):
    """Run one training epoch and report epoch-level metrics.

    The Hamming loss is computed over every batch of the epoch (it used to be
    computed from the last batch only), which matches how eval_model reports
    its metric.

    Args:
        training_loader: iterable of batches with 'input_ids',
            'attention_mask', 'token_type_ids' and 'targets' entries.
        model: network called as ``model(ids, mask, token_type_ids)``.
        optimizer: optimizer stepping the model's parameters.
        threshold: sigmoid probability at or above which a label counts as
            predicted; it only affects the reported Hamming loss.

    Returns:
        (model, Hamming loss over the epoch, mean batch loss)

    Raises:
        ValueError: if ``training_loader`` yields no batches.
    """
    losses = []
    targets_all = []
    preds_all = []
    model.train()

    loop = tq.tqdm(enumerate(training_loader), total=len(training_loader), leave=True, colour='BLUE')
    for batch_idx, data in loop:
        ids = data['input_ids'].to(device, dtype=torch.long)
        mask = data['attention_mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        targets = data['targets'].to(device, dtype=torch.float)

        outputs = model(ids, mask, token_type_ids)
        loss = loss_fn(outputs, targets)
        losses.append(loss.item())

        # Hard predictions come from the same forward pass that produced the
        # loss, i.e. before this batch's optimizer step.
        preds = (torch.sigmoid(outputs.detach()) >= threshold).float()
        targets_all.append(targets.detach().cpu())
        preds_all.append(preds.cpu())

        optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        loop.set_postfix(batch_loss=loss.item())

    if not losses:
        raise ValueError('training_loader yielded no batches')

    targets_cpu = torch.cat(targets_all, dim=0).numpy()
    preds_cpu = torch.cat(preds_all, dim=0).numpy()

    return model, hamming_loss(targets_cpu, preds_cpu), np.mean(losses)

def eval_model(validation_loader, model, threshold=0.5):
    """Evaluate ``model`` on every batch of ``validation_loader`` without gradients.

    Args:
        validation_loader: iterable of batches with 'input_ids',
            'attention_mask', 'token_type_ids' and 'targets' entries.
        model: network called as ``model(ids, mask, token_type_ids)``.
        threshold: sigmoid probability at or above which a label counts as predicted.

    Returns:
        (Hamming loss over all batches, mean batch loss)
    """
    model.eval()
    batch_losses, target_chunks, pred_chunks = [], [], []

    with torch.no_grad():
        for batch in validation_loader:
            ids = batch['input_ids'].to(device, dtype=torch.long)
            mask = batch['attention_mask'].to(device, dtype=torch.long)
            token_type_ids = batch['token_type_ids'].to(device, dtype=torch.long)
            targets = batch['targets'].to(device, dtype=torch.float)

            logits = model(ids, mask, token_type_ids)
            batch_losses.append(loss_fn(logits, targets).item())

            hard_preds = (torch.sigmoid(logits) >= threshold).float()

            # Collected on the CPU so the concatenation below needs no device transfer.
            target_chunks.append(targets.cpu())
            pred_chunks.append(hard_preds.cpu())

    stacked_targets = torch.cat(target_chunks, dim=0).numpy()
    stacked_preds = torch.cat(pred_chunks, dim=0).numpy()

    return hamming_loss(stacked_targets, stacked_preds), np.mean(batch_losses)


In [None]:

history = defaultdict(list)
best_accuracy = 0

thresholds = [0.2,0.3, 0.4, 0.5, 0.6, 0.7,0.8]
learning_rate = [1e-05, 5e-05, 1e-04, 5e-04]
batch_size = [8, 16, 32]
dropout = 0.3
accs = []

# torch.save does not create missing directories.
os.makedirs('./bert', exist_ok=True)
# Lowest validation Hamming loss seen across ALL combinations; the checkpoint
# on disk is only replaced when this improves, so the file really holds the
# best model of the whole search rather than the latest combination's best.
best_saved_hamming = 1

i=0
for lr in learning_rate:
    for batch in batch_size:
        train_data_loader, val_data_loader, test_data_loader = data_loader(batch, batch, batch)

        for threshold in thresholds:
            i+=1
            # Every combination starts from a fresh model, optimizer and best
            # score. Previously these were created once per (lr, batch), so
            # each later threshold kept training the model left by the earlier
            # ones and inherited their best score, making the entries of
            # `accs` incomparable.
            # NOTE(review): the threshold does not influence training, so a
            # cheaper design would train once per (lr, batch) and sweep the
            # thresholds at evaluation time.
            model = BERTClass(dropout)
            model.to(device)
            optimizer = optimize(model, lr)
            best_hamming = 1

            for epoch in range(1, EPOCHS + 1):
                print(f'Epoch {epoch}/{EPOCHS}, Threshold {threshold}, Combination {i}')
                model, train_hamming, train_loss = train_model(train_data_loader, model, optimizer, threshold)
                val_hamming, val_loss = eval_model(val_data_loader, model, threshold)

                print(f'train_loss={train_loss:.4f}, val_loss={val_loss:.4f} train_hamming={train_hamming:.4f}, val_hamming={val_hamming:.4f}')

                history['train_hamming'].append(train_hamming)
                history['train_loss'].append(train_loss)
                history['val_hamming'].append(val_hamming)
                history['val_loss'].append(val_loss)

                if val_hamming < best_hamming:
                    best_hamming = val_hamming
                if val_hamming < best_saved_hamming:
                    torch.save(model.state_dict(),'./bert/best_model_state.bin' )
                    best_saved_hamming = val_hamming
            # One entry per (lr, batch, threshold) combination, in loop order.
            accs.append(best_hamming)




In [None]:
# Best validation Hamming loss recorded for each grid-search combination,
# in the order the combinations were run.
accs

In [None]:
# Hyperparameters used for the final training run below.
lr, batch, threshold, dropout = 1e-04, 32, 0.4, 0.3


In [None]:
history = defaultdict(list)
best_accuracy = 0
train_data_loader, val_data_loader, test_data_loader = data_loader(batch, batch, batch)
model = BERTClass(dropout)
model.to(device)
optimizer = optimize(model, lr)
best_hamming = 1

# torch.save does not create missing directories; without this the first
# checkpoint write fails on a fresh checkout, after a full epoch of training.
os.makedirs('./bert', exist_ok=True)

for epoch in range(1, EPOCHS + 1):
    print(f'Epoch {epoch}/{EPOCHS}, Threshold {threshold}')
    model, train_hamming, train_loss = train_model(train_data_loader, model, optimizer, threshold)
    val_hamming, val_loss = eval_model(val_data_loader, model, threshold)

    print(f'train_loss={train_loss:.4f}, val_loss={val_loss:.4f} train_hamming={train_hamming:.4f}, val_hamming={val_hamming:.4f}')

    history['train_hamming'].append(train_hamming)
    history['train_loss'].append(train_loss)
    history['val_hamming'].append(val_hamming)
    history['val_loss'].append(val_loss)

    # Keep only the weights with the lowest validation Hamming loss so far.
    if val_hamming < best_hamming:
        torch.save(model.state_dict(),'./bert/best_model_state.bin')
        best_hamming = val_hamming



In [None]:
# Unshuffled loaders over the full table and over the 30 % hold-out, so the
# prediction order follows the row order of the underlying frames.
final_data_loader, testing_data_loader = (
    torch.utils.data.DataLoader(dataset, batch_size=batch, shuffle=False, num_workers=0)
    for dataset in (final_dataset, testing_dataset)
)

# Rebuild the network and restore the best checkpoint. The weights are mapped
# to the CPU on load and moved to `device` afterwards.
model = BERTClass(dropout)
model.load_state_dict(
    torch.load('./bert/best_model_state.bin', map_location=torch.device('cpu'))
)
model.to(device)
model.eval()

In [None]:
from bert_functions import get_prediction
# Run the restored model over the full table first, then over the hold-out.
(input_ids, predictions), (input_ids2, predictions2) = (
    get_prediction.pred(loader, model, threshold)
    for loader in (final_data_loader, testing_data_loader)
)

In [None]:
# Column names for the prediction frames: all but the last four label columns,
# followed by 'Productivity' and 'Valence'.
target = target_list[:-4] + ['Productivity', 'Valence']

In [None]:
# Predictions for the full table, one column per entry of `target`.
y_pred = pd.DataFrame(data=predictions, columns=target)

In [None]:
from bert_functions import eval

# NOTE(review): `eval` here is the project module imported from bert_functions;
# the import shadows Python's built-in eval() for the rest of the notebook.
good=eval.eval(y_pred,y_true)
# Bare expression so the notebook renders the result.
good

In [None]:
# Predictions and ground truth for the 30 % hold-out frame.
y_pred2 = pd.DataFrame(predictions2, columns=target)
holdout_without_text = df_testing.drop(columns=['combined'])
# NOTE(review): y_true2 is laid out with `target_list` columns, while y_pred2
# uses `target` (two fewer names) -- confirm eval.eval copes with the difference.
y_true2 = pd.DataFrame(holdout_without_text, columns=target_list)

good2 = eval.eval(y_pred2, y_true2)

In [None]:
# Among the first 50 rows, show the samples whose predicted label set is
# non-empty and exactly equal to the true label set. The last two columns of
# each row are reported separately (printed as Productivity / Valence) and are
# not part of the label sets.
# Bounded by the frame lengths so short tables do not raise an IndexError.
n_preview = min(50, len(y_pred), len(y_true))
for i in range(n_preview):
    pred_row=y_pred.iloc[i]
    true_row=y_true.iloc[i]
    pred_labels=[col for col in pred_row.index[:-2] if pred_row[col]==1]
    true_labels=[col for col in true_row.index[:-2] if true_row[col]==1]

    # Set equality is the same condition as the old precision + recall == 2,
    # but cannot divide by zero when a row has no true labels.
    if not pred_labels or set(pred_labels)!=set(true_labels):
        continue

    c=sum(1 for label in pred_labels if label in true_labels)
    a=pictures[i]
    try:
        picture=Image.open(f'../Images/{a}.jpg')
    except FileNotFoundError:
        # Still best-effort, but the skip is reported instead of being silent.
        print(f'Image not found, skipping: ../Images/{a}.jpg')
        continue

    display(picture)
    print(df['combined'][i])
    # .iloc makes the positional access explicit; Series[-2] with a label
    # index is deprecated in pandas.
    print('True labels : ',true_labels,', Productivity : ',true_row.iloc[-2],', Valence : ',true_row.iloc[-1])
    print('Predicted labels : ',pred_labels,', Productivity : ',pred_row.iloc[-2],', Valence : ',pred_row.iloc[-1])
    print(c/len(true_labels),c/len(pred_labels))
