### Importing required libraries

In [None]:
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup
import torch
import os
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from collections import defaultdict
from textwrap import wrap
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm

### Setting up library constants

In [None]:
%matplotlib inline
%config InlineBackend.figure_format='retina'

sns.set(style='whitegrid', palette='muted', font_scale=1.2)
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))

rcParams['figure.figsize'] = 12, 8

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)


### Importing & Cleaning up database

In [None]:
datapath = 'dataset.tsv'
df = pd.read_csv(datapath, sep='\t')

In [None]:

df = df.drop(df[df.category == 'Not Dark Pattern'].index)
df = df.drop(df[df.category == 'Forced Action'].index)
df = df.drop(df[df.category == 'Sneaking'].index)
df = df.drop(df[df.category == 'Obstruction'].index)

sns.countplot(df.category)

In [None]:
labels = {}
for val,key in enumerate(np.unique(df.category)):
    labels[key] = val

class_names = list(labels.keys())

### Setting up constants

In [None]:
PRETRAINED = 'bert-base-cased'
BATCHSIZE = 32
TRAIN_RATIO = 0.8
EVAL_RATIO = 0.9
DROPOUT = 0.1
NUM_LABELS = len(class_names)
MODULE_ROOT = os.getcwd()
PROJECT_ROOT = Path(MODULE_ROOT).resolve().parents[1]
PRETRAINED_MODEL_DIR = os.path.join(PROJECT_ROOT, "dark_pattern_spotter\\pretrained_models")
PRETRAINED_MODEL = os.path.join(PRETRAINED_MODEL_DIR, os.listdir(PRETRAINED_MODEL_DIR)[-1])
EPOCHS = 10
LR = 3e-5
MODEL_NAME = "best-model"

### Creating a tokenizer

In [None]:
torch.cuda.empty_cache()
use_cuda = torch.cuda.is_available()
#use_cuda = False
device = torch.device("cuda" if use_cuda else "cpu")
#device = torch.device('cpu')
print(f"Device: {device}")
tokenizer = BertTokenizer.from_pretrained(PRETRAINED)

### Selecting max length

In [None]:

token_lens = []

for txt in df.text:
  tokens = tokenizer.encode(txt, max_length=512)
  token_lens.append(len(tokens))

sns.distplot(token_lens)
plt.xlim([0, 256])
plt.xlabel('Token count')

MAXLENGTH = 32

### Creating a dataset consisting of text & labels

In [None]:
class DarkPatternDataset(Dataset):
    def __init__(self, df):
        self.texts = [text for text in df['text']]
        self.targets = [labels[category] for category in df['category']]
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = str(self.texts[idx])
        target = self.targets[idx]

        encoding = tokenizer.encode_plus(text,
                                         add_special_tokens=True,
                                         max_length=MAXLENGTH,
                                         return_token_type_ids=False,
                                         padding='max_length',
                                         return_attention_mask=True,
                                         truncation=True,
                                         return_tensors='pt')
        
        return {
            'text': text,
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long)
        }


### Initialise Dataloaders for training, evaluation and testing

In [None]:
df_train, df_test = train_test_split(df, test_size=1-TRAIN_RATIO, random_state=RANDOM_SEED)
df_val, df_test = train_test_split(df_test, test_size=0.5)


def create_data_loader(df, batch_size):

  ds = DarkPatternDataset(df)

  return DataLoader(
    ds,
    batch_size=batch_size,
    shuffle=True,
  )

train_dataloader = create_data_loader(df_train, BATCHSIZE)
val_dataloader = create_data_loader(df_val, BATCHSIZE)
test_dataloader = create_data_loader(df_test, BATCHSIZE)

### Creating a specialised model for our task

In [None]:
class DarkPatternClassifier(nn.Module):

    def __init__(self, dropout=DROPOUT):
        super(DarkPatternClassifier, self).__init__()
        self.bert = BertModel.from_pretrained(PRETRAINED)
        self.drop = nn.Dropout(dropout)
        self.out = nn.Linear(self.bert.config.hidden_size, NUM_LABELS)

    def forward(self, input_ids, attention_mask):
        _, pooled_output = self.bert(input_ids=input_ids, attention_mask=attention_mask, return_dict=False)
        output = self.drop(pooled_output)
        return self.out(output)
    

### Initialising the model & assigning it to the device

In [None]:
model = DarkPatternClassifier()
model = model.to(device)

### Checking the shape of model's output

In [None]:
data = next(iter(train_dataloader))

input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)


### Initialising an `optimizer`, `scheduler` & a `loss_fn`

In [None]:
optimizer = AdamW(model.parameters(), lr=LR, correct_bias=False)
total_steps = len(train_dataloader) * EPOCHS

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps
)

loss_fn = nn.CrossEntropyLoss().to(device)

### Creating a training function

In [None]:
def train_epoch(model, dataloader, loss_fn, optimizer, device, scheduler, n_examples):
    model = model.train()

    losses = []
    correct_predictions = 0

    for d in tqdm(dataloader):
        input_ids = d['input_ids'].to(device)
        attention_mask = d['attention_mask'].to(device)
        targets = d['targets'].to(device)

        outputs = model(input_ids, attention_mask)
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)

        correct_predictions += torch.sum(preds == targets)
        losses.append(loss.item())

        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
    
    return correct_predictions.double() / n_examples, np.mean(losses)


### Creating an evaluation function

In [None]:
def eval_model(model, dataloader, loss_fn, device, n_examples):
    model = model.eval()

    losses = []
    correct_predictions = 0

    with torch.no_grad():
        for d in dataloader:
            input_ids = d['input_ids'].to(device)
            attention_mask = d['attention_mask'].to(device)
            targets = d['targets'].to(device)

            outputs = model(input_ids, attention_mask)
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets)

            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

    return correct_predictions.double() / n_examples, np.mean(losses)


### Importing our pre-trained Dark Pattern Spotter model

In [None]:

checkpoint = torch.load(PRETRAINED_MODEL)
model_dict = model.state_dict() 
pretrained_dict = {k: v for k, v in checkpoint.items() if k in model_dict}
model_dict.update(pretrained_dict)
model.load_state_dict(model_dict)

### Model training

In [None]:
%%time

history = defaultdict(list)
best_accuracy = 0

for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-'*10)

    train_acc, train_loss = train_epoch(model, train_dataloader, loss_fn, optimizer, device, scheduler, len(df_train))
    print(f'Train loss {train_loss} accuracy {train_acc}')

    val_acc, val_loss = eval_model(model, val_dataloader, loss_fn, device, len(df_val))
    print(f'Val   loss {val_loss} accuracy {val_acc}\n')

    history['train_acc'].append(train_acc)
    history['train_loss'].append(train_loss)
    history['val_acc'].append(val_acc)
    history['val_loss'].append(val_loss)

    if val_acc > best_accuracy:
        model_path = os.path.join(MODULE_ROOT, f"models\\{MODEL_NAME}.pth")
        torch.save(model.state_dict(), model_path)
        best_accuracy = val_acc

### Training analytics

In [None]:
history['train_acc'] = [x.cpu() for x in history['train_acc']]
history['val_acc'] = [x.cpu() for x in history['val_acc']]

In [None]:
plt.plot(history['train_acc'], label='train accuracy')
plt.plot(history['val_acc'], label='validation accuracy')
plt.title('Training history')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.ylim([0, 1])

### Model evaluation

In [None]:
test_acc, _ = eval_model(model, test_dataloader, loss_fn, device, len(df_test))

test_acc.item()

### A helper function to get more analytics

In [None]:
def get_predictions(model, dataloader):
    model = model.eval()

    texts = []
    predictions = []
    predictions_probs = []
    real_vals = []

    with torch.no_grad():
        for d in dataloader:
            text = d["text"]
            input_ids = d['input_ids'].to(device)
            attention_mask = d['attention_mask'].to(device)
            targets = d['targets'].to(device)

            outputs = model(input_ids, attention_mask)
            _, preds = torch.max(outputs, dim=1)

            texts.extend(text)
            predictions.extend(preds)
            predictions_probs.extend(outputs)
            real_vals.extend(targets)
    
    predictions = torch.stack(predictions).cpu()
    predictions_probs = torch.stack(predictions_probs).cpu()
    real_vals = torch.stack(real_vals).cpu()

    return texts, predictions, predictions_probs, real_vals

### Model accuracy

In [None]:
y_texts, y_pred, y_pred_probs, y_test = get_predictions(model, test_dataloader)

In [None]:
# One might need to remove categories with low frequencies in the dataset
# because there might not be any results with said category in the test dataset

#class_names.remove("Forced Action")

print(classification_report(y_test, y_pred, target_names=class_names))

### Generating a confusion matrix

In [None]:
def show_confusion_matrix(confusion_matrix):
    hmap = sns.heatmap(confusion_matrix, annot=True, fmt='d', cmap='Blues')
    hmap.yaxis.set_ticklabels(hmap.yaxis.get_ticklabels(), rotation=0, ha='right')
    hmap.xaxis.set_ticklabels(hmap.xaxis.get_ticklabels(), rotation=30, ha='right')
    plt.ylabel('True Category')
    plt.xlabel('Predicted category')

cm = confusion_matrix(y_test, y_pred)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
show_confusion_matrix(df_cm)

### Running model on raw text

In [None]:
someText = "Some sample text"
encoded = tokenizer.encode_plus(
        someText,
        add_special_tokens=True,
        max_length=MAXLENGTH,
        return_token_type_ids=False,
        padding='max_length',
        return_attention_mask=True,
        truncation=True,
        return_tensors='pt')

input_ids = encoded['input_ids'].to(device)
attention_mask = encoded['attention_mask'].to(device)

output = model(input_ids, attention_mask)
print(torch.max(output, dim=1))