## Imports

In [None]:
!pip install transformers



In [None]:
# Mount Google Drive at /content/drive so files stored there can be read
# through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras 
import torch
import transformers
import seaborn as sns
import random

# Data Loading

## Loading the dataset and splitting the data into train and test sets

In [None]:
def to_bool(col):
  """Overwrite every entry of `col`, in place, with a truth flag.

  An entry that compares equal to 1 is replaced by True; every other entry
  is replaced by False. The container itself is modified and handed back.

  Args:
    col: mutable, integer-indexable sequence (e.g. a list or a NumPy array).

  Returns:
    The same `col` object after modification.
  """
  for position in range(len(col)):
    col[position] = True if col[position] == 1 else False
  return col

In [None]:
# Read the CSV from the mounted Drive, naming its two columns 'text' and 'humor'.
df = pd.read_csv('/content/drive/MyDrive/dev/f_correct_training.csv', names=['text', 'humor'])
# to_bool rewrites the label array in place: entries equal to 1 become True,
# everything else becomes False.
df['humor'] = to_bool(np.asanyarray(df['humor']))
# Cast the labels to int, which turns True/False into 1/0.
df['humor'] = df['humor'].astype(int)

In [None]:
from sklearn.model_selection import train_test_split

# A fresh seed is drawn on every run and printed, so a particular run can be
# reproduced later by assigning the printed value to RANDOM_SEED instead.
RANDOM_SEED = np.random.randint(0, 1000)
print('random seed: ', RANDOM_SEED)

# Seed every random number generator the notebook relies on. Python's own
# `random` module must be included because dynamic_batching picks batch
# positions with random.randint; without it batching is not reproducible.
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
random.seed(int(RANDOM_SEED))

# Hold out 25% of the rows as the test set.
df_train, df_test = train_test_split(df, test_size=0.25, random_state=RANDOM_SEED)
df_train.shape, df_test.shape

random seed:  99


((1001, 2), (334, 2))

In [None]:
# Unpack the train and test frames into plain Python lists of sentences and
# labels, keeping both lists of a split in the same row order.
train_text = list(df_train.text)
train_labels = list(df_train.humor)

test_text = list(df_test.text)
test_labels = list(df_test.humor)

# Model Creation & Setup

## The classifier consists of a:

*   Pre-Trained BERT Model
*   Dropout Layer (*p=0.1*)
*   Fully Connected Layer



In [None]:
class HumorClassifier(torch.nn.Module):
  """Pre-trained BERT encoder followed by dropout and one linear output layer."""

  def __init__(self, n_classes):
    """Load the encoder and create the classification head.

    Args:
      n_classes: number of output units of the final linear layer.
    """
    super().__init__()
    # The checkpoint name is read from the module-level PRE_TRAINED_MODEL_NAME.
    self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
    self.drop = torch.nn.Dropout(p=0.1)
    hidden_size = self.bert.config.hidden_size
    self.out = torch.nn.Linear(hidden_size, n_classes)

  def forward(self, input_ids, attention_mask):
    """Return the output-layer scores for a batch.

    Args:
      input_ids: token ids, passed straight to the encoder.
      attention_mask: mask passed straight to the encoder.

    Returns:
      The linear layer applied to the dropped-out pooled encoder output.
    """
    encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
    return self.out(self.drop(encoded.pooler_output))

In [None]:
from transformers import BertModel, BertTokenizer

# Checkpoint name shared by the tokenizer and by the encoder that
# HumorClassifier loads.
PRE_TRAINED_MODEL_NAME = 'bert-base-cased'

# Tokenizer for that checkpoint, plus a classifier with two output units.
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
model = HumorClassifier(n_classes=2)

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook does not crash on a runtime without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

## Initialization of parameters for the model and training

*  Max length of the padded input = 50
*  Batch size = 16
*  Learning rate = 2e-5
*  Bias correction = True
*  Weight decay = 0.01
*  warmup = 10% of total steps


In [None]:
# torch.optim.AdamW replaces the deprecated transformers.AdamW. It always
# applies bias correction, which matches the former `correct_bias=True`.
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup
from torch.nn import CrossEntropyLoss, NLLLoss, BCEWithLogitsLoss

MAX_LEN = 50
BATCH_SIZE = 16
EPOCHS = 6

optimizer = AdamW(model.parameters(),
                  lr=2e-5,
                  eps=1e-6,  # larger than the optimizer's default of 1e-8
                  weight_decay=0.01)

# The scheduler is stepped once per batch, so its horizon must be counted in
# batches, not in samples. dynamic_batching yields ceil(n / BATCH_SIZE)
# batches per epoch. Counting samples instead makes the schedule roughly
# BATCH_SIZE times too long, so the warmup phase alone would outlast training.
steps_per_epoch = (len(train_text) + BATCH_SIZE - 1) // BATCH_SIZE
total_steps = steps_per_epoch * EPOCHS
warmup_steps = int(0.1 * total_steps)  # first 10% of the optimizer steps

scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps=warmup_steps,
                                            num_training_steps=total_steps)


loss_fn = CrossEntropyLoss().to(device)

## Helper function for creating training batches according to the *dynamic approach*

1. The tokenized texts are sorted by length.
2. A contiguous run of up to `batch_size` examples is taken from a random position in the sorted array and put together as one batch; this is repeated until no examples remain.
3. Inputs within a batch are padded to the length of the longest input sequence in that batch.




In [None]:
def dynamic_batching(all_text, all_labels, batch_size):
  """Tokenize texts and group them into length-homogeneous, padded batches.

  The texts are encoded with the module-level `tokenizer` (truncated to
  `MAX_LEN` tokens), paired with their labels and sorted by token count.
  Batches are then cut out of the sorted pool as contiguous runs starting at
  a random position, and each batch is padded only up to its own longest
  sequence.

  Args:
    all_text: iterable of strings to encode.
    all_labels: labels, in the same order as `all_text`.
    batch_size: maximum number of examples per batch.

  Returns:
    A tuple of three parallel lists with one tensor per batch:
    (padded input ids, attention masks, labels).
  """
  print('Tokenizing Text...')
  encoded = [
      tokenizer.encode(text=sentence,
                       add_special_tokens=True,
                       max_length=MAX_LEN,
                       truncation=True,
                       padding=False)
      for sentence in all_text
  ]

  # Shortest sequences first, so neighbours in the pool have similar lengths.
  pool = list(zip(encoded, all_labels))
  pool.sort(key=lambda pair: len(pair[0]))

  print('Selecting Batches...')
  chunks = []
  while pool:
    size = min(batch_size, len(pool))
    # Random start such that a full run of `size` items still fits.
    start = random.randint(0, len(pool) - size)
    stop = start + size
    chunks.append(pool[start:stop])
    del pool[start:stop]

  print('Padding...')
  final_input_ids, final_att_masks, final_labels = [], [], []
  for chunk in chunks:
    widest = max(len(ids) for ids, _ in chunk)

    padded_rows = []
    mask_rows = []
    for ids, _ in chunk:
      filler = widest - len(ids)
      padded_rows.append(ids + [tokenizer.pad_token_id] * filler)
      # 1 marks a real token, 0 marks padding.
      mask_rows.append([1] * len(ids) + [0] * filler)

    final_input_ids.append(torch.tensor(padded_rows))
    final_att_masks.append(torch.tensor(mask_rows))
    final_labels.append(torch.tensor([label for _, label in chunk]))

  print('Batches Created')
  return (final_input_ids, final_att_masks, final_labels)

# Training Loop

In [None]:
%%time

# One dict of metrics per finished epoch.
training_stats = []

for epoch in range(0, EPOCHS):
  print('-' * 20)
  print('Epoch: ', epoch+1)
  print('-' * 20)

  print('Creating training batches...')

  # Batches are rebuilt every epoch; dynamic_batching draws them at random,
  # so their composition and order differ from epoch to epoch.
  (input_ids, att_masks, labels) = dynamic_batching(train_text, train_labels, BATCH_SIZE)
  print('-' * 15)
  print('Trainiing on: ', len(input_ids), ' batches')

  losses = []
  correct_predictions = 0

  model.train()

  for b_input_ids, b_att_mask, b_labels in zip(input_ids, att_masks, labels):

    b_input_ids = b_input_ids.to(device)
    b_att_mask = b_att_mask.to(device)
    b_labels = b_labels.to(device)

    # Clear gradients left over from the previous step; one reset per
    # iteration is enough.
    model.zero_grad()

    logits = model(b_input_ids, b_att_mask)

    # The predicted class is the index of the largest score in each row.
    _, preds = torch.max(logits, dim=1)
    loss = loss_fn(logits, b_labels)

    correct_predictions += torch.sum(preds == b_labels).item()
    losses.append(loss.item())

    loss.backward()

    # Cap the global gradient norm at 1.0 before the update.
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

    optimizer.step()
    # The learning-rate schedule advances once per batch.
    scheduler.step()

  avg_train_loss = np.sum(losses) / len(input_ids)
  accuracy = correct_predictions / len(train_text)
  print('avg training loss: ', avg_train_loss)
  print('accuracy: ', accuracy)

  # Keep the numbers so they can be inspected or plotted after training.
  training_stats.append({
      'epoch': epoch + 1,
      'avg_train_loss': avg_train_loss,
      'train_accuracy': accuracy,
  })

print('training done!')

--------------------
Epoch:  1
--------------------
Creating training batches...
Tokenizing Text...
Selecting Batches...
Padding...
Batches Created
---------------
Trainiing on:  63  batches
avg training loss:  0.6914909264397999
accuracy:  0.5564435564435565
--------------------
Epoch:  2
--------------------
Creating training batches...
Tokenizing Text...
Selecting Batches...
Padding...
Batches Created
---------------
Trainiing on:  63  batches
avg training loss:  0.6875141792827182
accuracy:  0.5554445554445554
--------------------
Epoch:  3
--------------------
Creating training batches...
Tokenizing Text...
Selecting Batches...
Padding...
Batches Created
---------------
Trainiing on:  63  batches
avg training loss:  0.667988291808537
accuracy:  0.5924075924075924
--------------------
Epoch:  4
--------------------
Creating training batches...
Tokenizing Text...
Selecting Batches...
Padding...
Batches Created
---------------
Trainiing on:  63  batches
avg training loss:  0.61895667

# Evaluation

In [None]:
# The batched label tensors get their own name. Rebinding `test_labels` here
# would replace the raw label list, and re-running this cell would then feed
# tensors back into dynamic_batching.
test_inputs, test_att_masks, test_label_batches = dynamic_batching(test_text, test_labels, BATCH_SIZE)

# Evaluation mode: dropout is disabled.
model.eval()

predictions = []
true_labels = []

print('Test Set Predictions...')

for batch in range(0, len(test_inputs)):

  b_input_ids = test_inputs[batch].to(device)
  b_att_mask = test_att_masks[batch].to(device)
  b_labels = test_label_batches[batch]

  # No gradients are needed for inference.
  with torch.no_grad():
    logits = model(b_input_ids, b_att_mask)
    # Turn the raw scores into per-class probabilities.
    probs = torch.nn.functional.softmax(logits, dim=1)

  # Store predictions and true labels
  predictions.append(probs.cpu().numpy())
  true_labels.append(b_labels.to('cpu').numpy())

print('Evaluation Done!')

print('-' * 20)
# Stitch the per-batch arrays back into one array each; rows stay aligned
# because both lists were filled in the same batch order.
predictions = np.concatenate(predictions, axis=0)
true_labels = np.concatenate(true_labels, axis=0)

preds = np.argmax(predictions, axis=1).flatten()
print('Accuracy:')
print(np.sum(preds == true_labels) / len(true_labels))


Tokenizing Text...
Selecting Batches...
Padding...
Batches Created
Test Set Predictions...
Evaluation Done!
--------------------
Accuracy:
0.7934131736526946


In [None]:
from sklearn.metrics import f1_score, precision_recall_fscore_support

# Binary-averaged precision / recall / F-score / support on the test set,
# plus the F1 score on its own for a labelled print-out.
prec_rec_f1 = precision_recall_fscore_support(y_true=true_labels, y_pred=preds, average='binary')
f1 = f1_score(y_true=true_labels, y_pred=preds)

print('F1-Score: ', f1)
print(prec_rec_f1)

F1-Score:  0.8217054263565892
(0.8548387096774194, 0.7910447761194029, 0.8217054263565892, None)
