In [None]:
import torch
import pandas as pd
from tqdm.notebook import tqdm

In [None]:
# Read the annotation file (no header row is assumed: the column names are
# supplied here) and use the 'id' column as the row index.
df = pd.read_csv(
    '/content/smile-annotations-final.csv',
    names=['id', 'text', 'category'],
).set_index('id')

In [None]:
# Preview the first rows of the loaded frame.
df.head()

In [None]:
# Spot-check the raw text of a single row (positional row 78).
df.text.iloc[78]

In [None]:
# Count how many rows carry each category value before any filtering.
df.category.value_counts()

In [None]:
# Keep only single-category rows: drop every row whose category contains a
# literal '|'. The pattern is a raw string so that '\|' reaches the regex
# engine as an escaped pipe instead of being parsed as an (invalid) Python
# string escape sequence.
df=df[~df.category.str.contains(r'\|')]

In [None]:
# Drop rows labelled 'nocode'. The explicit copy() makes df an independent
# frame, so the column assignments in later cells (df['labels'],
# df['data_type']) do not write into a slice of the filtered data and raise
# SettingWithCopyWarning.
df=df[df.category != 'nocode'].copy()

In [None]:
# Re-check the per-category row counts after the two filters above.
df.category.value_counts()

In [None]:
# Distinct category values remaining in the filtered frame.
possible_labels=df.category.unique()

In [None]:
# Map every category name to a consecutive integer id, numbered in the order
# the names occur in possible_labels.
label_dict = {
    category: label_id
    for label_id, category in enumerate(possible_labels)
}

In [None]:
# Show the category -> integer id mapping.
label_dict

In [None]:
# Encode each category string as its integer id from label_dict.
# Series.map is a plain dictionary lookup and yields an integer column
# directly, whereas Series.replace depends on implicit object -> int
# downcasting that pandas has deprecated (leaving an object column would break
# the torch.tensor(...) conversion of the labels later on).
# assign() returns a new frame, so nothing is written into a filtered slice.
df=df.assign(labels=df.category.map(label_dict))
df.head(10)

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Split the row ids (the frame index) into train/validation sets, holding out
# 15% for validation and stratifying on the integer label so both sets keep
# the same class proportions. random_state pins the split.
X_train, X_val, Y_train, Y_val = train_test_split(
    df.index.values,
    df['labels'].values,
    stratify=df['labels'].values,
    test_size=0.15,
    random_state=17,
)

In [None]:
# Start every row as 'not_set'; the next cell overwrites this with the split name.
df['data_type'] = 'not_set'

In [None]:
# Tag each row with the split it was assigned to, selecting rows by index label.
# NOTE(review): this presumes the 'id' index values are unique; with duplicate
# ids a label lookup would tag every row sharing that id — TODO confirm.
df.loc[X_train, 'data_type']='train'
df.loc[X_val, 'data_type']='val'

In [None]:
# Row counts per (category, label id, split) to sanity-check the stratified split.
df.groupby(['category','labels','data_type']).count()

In [None]:
from transformers import BertTokenizer
from torch.utils.data import TensorDataset

In [None]:
# Load the tokenizer for the 'bert-base-uncased' checkpoint, the same
# checkpoint name the classification model is loaded from later on.
tokenizer=BertTokenizer.from_pretrained(
    'bert-base-uncased',
    do_lower_case=True
)

In [None]:
def _encode_split(split_name, max_length=256):
    """Tokenize the texts of one data split and collect its label tensor.

    Args:
        split_name: Value of df.data_type selecting the rows ('train' or 'val').
        max_length: Every sequence is padded or truncated to this many tokens.

    Returns:
        A tuple (encoded, labels): the tokenizer output holding 'input_ids'
        and 'attention_mask' as PyTorch tensors, and a tensor with the integer
        labels of the same rows in the same order.
    """
    split_df = df[df.data_type == split_name]
    encoded = tokenizer(
        # The tokenizer call expects a Python list of strings, not a NumPy array.
        split_df.text.tolist(),
        add_special_tokens=True,
        return_attention_mask=True,
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors='pt',
    )
    labels = torch.tensor(split_df.labels.values)
    return encoded, labels


# Both splits go through the same helper so their preprocessing cannot drift apart.
encoded_data_train, labels_train = _encode_split('train')
encoded_data_val, labels_val = _encode_split('val')

input_ids_train = encoded_data_train['input_ids']
attention_masks_train = encoded_data_train['attention_mask']

input_ids_val = encoded_data_val['input_ids']
attention_masks_val = encoded_data_val['attention_mask']


In [None]:
# Bundle token ids, attention masks and labels per split; each dataset item is
# the triple (input_ids, attention_mask, label) that the loops index as batch[0..2].
dataset_train=TensorDataset(input_ids_train, attention_masks_train, labels_train)
dataset_val=TensorDataset(input_ids_val, attention_masks_val, labels_val)

In [None]:
# Number of training examples.
len(dataset_train)

In [None]:
# Number of validation examples.
len(dataset_val)

In [None]:
from transformers import BertForSequenceClassification

In [None]:
# Load 'bert-base-uncased' with a sequence-classification head. num_labels is
# taken from label_dict so the output size matches the number of categories;
# attention weights and hidden states are not requested in the outputs.
model=BertForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=len(label_dict),
    output_attentions=False,
    output_hidden_states=False
)

In [None]:
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler

In [None]:
# Training loader: batches of 4, drawn in random order.
batch_size = 4
dataloader_train = DataLoader(
    dataset_train,
    batch_size=batch_size,
    sampler=RandomSampler(dataset_train),
)

# Validation loader: batches of 32, visited in dataset order.
# Note that batch_size is rebound here and holds 32 from this point on.
batch_size = 32
dataloader_val = DataLoader(
    dataset_val,
    batch_size=batch_size,
    sampler=SequentialSampler(dataset_val),
)

In [None]:
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

In [None]:
# AdamW over all model parameters with a learning rate of 1e-5.
optimizer=AdamW(
    model.parameters(),
    lr=1e-5,
    eps=1e-8
)

In [None]:
# Number of passes over the training loader.
epochs=10

# Linear learning-rate schedule with no warmup steps. The total step count is
# batches-per-epoch * epochs, matching the one scheduler.step() per batch made
# in the training loop.
scheduler=get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=len(dataloader_train)*epochs
)

In [None]:
import numpy as np

In [None]:
from sklearn.metrics import f1_score

In [None]:
# NOTE(review): this module-level list is not read by any cell shown in this
# notebook; f1_score_func and accuracy_per_class use their own `preds` parameter.
preds=[]

In [None]:
def f1_score_func(preds, labels):
    """Weighted F1 score of the arg-max class predictions.

    Args:
        preds: 2-D array of per-class scores; the highest-scoring column of
            each row (axis 1) is taken as the predicted class.
        labels: NumPy array of true class ids; it is flattened before scoring.

    Returns:
        The value of f1_score(..., average='weighted').
    """
    predicted_classes = np.asarray(preds).argmax(axis=1).ravel()
    true_classes = labels.ravel()
    return f1_score(true_classes, predicted_classes, average='weighted')

In [None]:
def accuracy_per_class(preds, labels):
    """Print, for each class present in `labels`, correct / total predictions.

    Args:
        preds: 2-D array of per-class scores; arg-max over axis 1 gives the
            predicted class id.
        labels: NumPy array of true class ids.

    Class ids are translated back to category names through the module-level
    label_dict. Nothing is returned; the report is written with print().
    """
    id_to_name = {label_id: name for name, label_id in label_dict.items()}

    predicted = np.argmax(preds, axis=1).flatten()
    actual = labels.flatten()

    for class_id in np.unique(actual):
        # Boolean mask selecting the samples whose true class is class_id.
        in_class = actual == class_id
        n_total = int(np.count_nonzero(in_class))
        n_correct = int(np.count_nonzero(predicted[in_class] == class_id))
        print(f'Class: {id_to_name[class_id]}')
        print(f'Accuracy: {n_correct}/{n_total} \n')

In [None]:
import random

# Seed the Python, NumPy and PyTorch (CPU and all CUDA devices) generators.
# NOTE(review): this cell runs after BertForSequenceClassification.from_pretrained
# in an earlier cell, so any random initialization performed by that call is
# not covered by this seed — consider seeding before the model is created.
seed_val = 17
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU, and move
# the model there. Batches are moved to the same device inside the loops.
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

print(device)

In [None]:
def evaluate(dataloader_val):
    """Run the module-level model over a dataloader without gradient tracking.

    Args:
        dataloader_val: Iterable of batches whose first three elements are
            (input_ids, attention_mask, labels) tensors.

    Returns:
        A tuple (loss_val_avg, predictions, true_vals): the mean of the
        per-batch losses, the logits of all batches concatenated along axis 0
        as a NumPy array, and the matching true labels as a NumPy array.

    The model is switched to eval mode and is left in that mode on return.
    """
    model.eval()

    running_loss = 0
    logit_chunks = []
    label_chunks = []

    with torch.no_grad():
        for raw_batch in dataloader_val:
            on_device = tuple(tensor.to(device) for tensor in raw_batch)
            batch_labels = on_device[2]

            outputs = model(
                input_ids=on_device[0],
                attention_mask=on_device[1],
                labels=batch_labels,
            )

            running_loss += outputs.loss.item()

            # Move results to host memory so they can be concatenated with NumPy.
            logit_chunks.append(outputs.logits.detach().cpu().numpy())
            label_chunks.append(batch_labels.detach().cpu().numpy())

    # Average over the number of batches (not the number of samples).
    mean_loss = running_loss / len(dataloader_val)

    all_logits = np.concatenate(logit_chunks, axis=0)
    all_labels = np.concatenate(label_chunks, axis=0)

    return mean_loss, all_logits, all_labels

In [None]:
# Fine-tuning loop: one optimizer and scheduler step per batch, followed by a
# validation pass and a short report at the end of every epoch.
for epoch in range(1, epochs + 1):

    model.train()  # evaluate() leaves the model in eval mode, so reset it each epoch
    loss_train_total = 0

    progress_bar = tqdm(dataloader_train, desc=f'Epoch {epoch}', leave=False)

    for batch in progress_bar:
        model.zero_grad()

        batch = tuple(b.to(device) for b in batch)
        inputs = dict(
            input_ids=batch[0],
            attention_mask=batch[1],
            labels=batch[2],
        )

        outputs = model(**inputs)
        loss = outputs.loss

        # Read the scalar once and reuse it for the running total and the bar.
        step_loss = loss.item()
        loss_train_total += step_loss

        loss.backward()

        # Cap the global gradient norm at 1.0 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        optimizer.step()
        scheduler.step()

        progress_bar.set_postfix({
            'train_loss': f'{step_loss:.3f}'
        })

    # Mean of the per-batch training losses for this epoch.
    loss_train_avg = loss_train_total / len(dataloader_train)

    val_loss, predictions, true_vals = evaluate(dataloader_val)
    val_f1 = f1_score_func(predictions, true_vals)

    print(f'\nEpoch {epoch}')
    print(f'Training loss: {loss_train_avg:.4f}')
    print(f'Validation loss: {val_loss:.4f}')
    print(f'Validation F1 (weighted): {val_f1:.4f}')

In [None]:
import os

# Make sure the output directory exists before writing into it.
os.makedirs("models", exist_ok=True)

# Save the model weights after the last epoch together with the
# category -> id mapping, in a single checkpoint file.
torch.save({
    'model_state_dict': model.state_dict(),
    'label_dict': label_dict
}, 'models/bert_sentiment.pt')

In [None]:
# Reload the checkpoint written above, mapping its tensors onto the active device.
checkpoint = torch.load('models/bert_sentiment.pt', map_location=device)

# Restore the weights into the existing model object. The 'label_dict' stored
# in the checkpoint is not read here; accuracy_per_class uses the in-memory one.
model.load_state_dict(checkpoint['model_state_dict'])
model.to(device)
model.eval()

# Final validation pass with the restored weights.
val_loss, predictions, true_vals = evaluate(dataloader_val)

print("Final Validation Loss:", val_loss)
print("Final Validation F1:", f1_score_func(predictions, true_vals))

# Per-class breakdown of correct / total predictions.
accuracy_per_class(predictions, true_vals)