In [None]:
# BERT stuff

"""
NOTE: Implementation inspired by Curiousily's tutorial on sentiment analysis
with BERT and Hugging Face:
https://curiousily.com/posts/sentiment-analysis-with-bert-and-hugging-face-using-pytorch-and-python/
"""
!pip install -q tensorflow-text
!pip install -q tf-models-official
!pip install transformers==3

import transformers
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup
import torch

import shutil

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text
from official.nlp import optimization  # to create AdamW optmizer

tf.get_logger().setLevel('ERROR')

import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from matplotlib.pyplot import figure
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from collections import defaultdict
from textwrap import wrap
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
import re
import seaborn as sn
from sklearn.preprocessing import LabelBinarizer
from tqdm.notebook import tqdm

# Seed the NumPy and PyTorch global random number generators with one shared value.
RANDOM_SEED = 1
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
%cd drive/MyDrive/Colab

## Processing the data 


In [None]:
# Import the dataset from the Excel file in the current working directory.

# Path Kodjo (local path of one contributor, currently unused)
# path = '/home/anselme/Desktop/Etude/MVA_S1/DL/MVA_DL/Data Mining'

# skiprows=0 skips no rows; later cells read the 'Class' and 'Lyrics' columns.
dataset = pd.read_excel('dataset.xlsx', skiprows=0)

In [None]:
# Bar chart of the number of rows per distinct value of the 'Class' column.
dataset['Class'].value_counts().plot(kind='bar')
plt.show()

In [None]:
# Dataset shuffle occurs here

from sklearn.utils import shuffle

# An explicit random_state makes this cell produce the same row order every
# time it is run, instead of depending on how far the global NumPy RNG has
# already advanced in the current kernel session.
dataset = shuffle(dataset, random_state=RANDOM_SEED)

# Preview the first rows only rather than rendering the whole frame.
dataset.head()

In [None]:
# Clean lyrics and display their lengths
PRE_TRAINED_MODEL_NAME = 'bert-base-cased'
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

# Remove every "[...]" span, turn newlines into spaces, then drop any bracket
# characters that are still left over. The whole column is assigned in one
# statement: the previous per-row `dataset['Lyrics'][x] = ...` was chained
# indexing, which pandas may apply to a temporary copy instead of `dataset`.
dataset['Lyrics'] = (
    dataset['Lyrics']
    .str.replace(r"\[.*?\]", "", regex=True)
    .str.replace("\n", " ", regex=False)
    .str.replace("[", "", regex=False)
    .str.replace("]", "", regex=False)
)

# Number of tokens the tokenizer produces for each cleaned lyric.
token_lens = [len(tokenizer.encode(lyric)) for lyric in dataset['Lyrics']]

# Longest lyric, in tokens.
print(max(token_lens))

# Plot token lengths
sns_plot = sns.distplot(token_lens)
# Label the axis BEFORE saving so the exported figure carries the label too.
sns_plot.set_xlabel('Token count')
fig = sns_plot.get_figure()
fig.savefig('dist.eps', dpi=300)

In [None]:
# Display the data set as it stands after the lyric clean-up cell.
dataset

In [None]:
# Import BERT tokenization elements
# NOTE(review): this repeats the tokenizer setup already done in the
# lyric-cleaning cell; both load the same 'bert-base-cased' vocabulary.
PRE_TRAINED_MODEL_NAME = 'bert-base-cased'
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
# Sequence length passed to the data loaders; LyricsDataset pads/truncates
# every encoded lyric to this many tokens.
MAX_LEN = 512

In [None]:
# Class for data set
# Sorted list of the distinct class names; a name's position in this list is
# the integer label that replaces it in the 'Class' column below.
alphabetical = sorted(set(dataset['Class']))

print(alphabetical)

# Replace each class name by its index in `alphabetical` with a single column
# assignment. The earlier nested loop wrote through chained indexing
# (`dataset['Class'][x] = y`), which pandas may apply to a temporary copy, and
# it kept comparing after overwriting the cell, so a value could be remapped twice.
class_to_index = {name: index for index, name in enumerate(alphabetical)}
dataset['Class'] = dataset['Class'].map(class_to_index)

# Binarizes the mood labels
encoder = LabelBinarizer()
moods = encoder.fit_transform(dataset['Class'].tolist())

print(moods)

# Data set for BERT created here !
class LyricsDataset(Dataset):
    """Map-style dataset that tokenizes one lyric per item.

    Args:
        lyrics: indexable collection of lyric texts.
        targets: indexable collection of labels, aligned with ``lyrics``.
        tokenizer: object exposing ``encode_plus`` (a BERT tokenizer in this notebook).
        max_len: length every encoded lyric is padded/truncated to.
    """

    def __init__(self, lyrics, targets, tokenizer, max_len):
        self.lyrics, self.targets = lyrics, targets
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        """Number of lyrics held by the dataset."""
        return len(self.lyrics)

    def __getitem__(self, item):
        """Return the text, flattened token ids / attention mask and target of one lyric."""
        text = str(self.lyrics[item])
        label = self.targets[item]

        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        encoded = self.tokenizer.encode_plus(text, **tokenizer_options)

        sample = {'lyric_text': text}
        # The tokenizer output is flattened to one dimension per item.
        for key in ('input_ids', 'attention_mask'):
            sample[key] = encoded[key].flatten()
        sample['targets'] = torch.tensor(label, dtype=torch.long)
        return sample

In [None]:
# Split data set (80/10/10)
# Step 1: keep 80% of the rows for training and set the remaining 20% aside.
df_train, df_holdout = train_test_split(dataset, test_size=0.2, random_state=RANDOM_SEED)

# Step 2: cut the 20% hold-out in half to obtain the validation and test sets.
df_val, df_test = train_test_split(df_holdout, test_size=0.5, random_state=RANDOM_SEED)

df_train.shape, df_val.shape, df_test.shape

In [None]:
# Prepare data for BERT

# Fit the label encoder ONCE on the full data set so that every split is
# one-hot encoded against the same, complete set of classes.
encoder = LabelBinarizer()
moods = encoder.fit_transform(dataset['Class'].tolist())

def create_data_loader(df, tokenizer, max_len, batch_size, shuffle=False):
    """Wrap a data-frame split in a LyricsDataset and return a DataLoader over it.

    Args:
        df: frame with a 'Lyrics' column and a 'Class' column.
        tokenizer: tokenizer handed to LyricsDataset.
        max_len: padded/truncated token length handed to LyricsDataset.
        batch_size: number of items per batch.
        shuffle: whether the loader reshuffles the items on every pass
            (defaults to False, the previous behaviour).

    Returns:
        torch.utils.data.DataLoader yielding the dict batches built by LyricsDataset.
    """
    ds = LyricsDataset(
        lyrics    = df.Lyrics.to_numpy(),
        # transform(), not fit_transform(): re-fitting on a single split would
        # derive the one-hot columns from only the classes present in that
        # split, so a split missing a class would get a different encoding.
        targets   = encoder.transform(df['Class'].tolist()),
        tokenizer = tokenizer,
        max_len   = max_len
    )

    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=4
    )

BATCH_SIZE = 12

# Only the training loader is reshuffled each epoch; validation and test keep
# a fixed order so predictions line up with the rows of their frames.
train_data_loader = create_data_loader(df_train, tokenizer, MAX_LEN, BATCH_SIZE, shuffle=True)
val_data_loader   = create_data_loader(df_val, tokenizer, MAX_LEN, BATCH_SIZE)
test_data_loader  = create_data_loader(df_test, tokenizer, MAX_LEN, BATCH_SIZE)

In [None]:
# Pull one batch from the training loader and print the tensor shapes as a sanity check.
data = next(iter(train_data_loader))

print(data['input_ids'].shape)
print(data['attention_mask'].shape)
print(data['targets'].shape)

In [None]:
# BERT from pre-trained architecture with drop-out and linear layer
class LyricClassifier(nn.Module):
    """Pre-trained BERT encoder followed by dropout and a linear classification layer.

    Args:
        n_classes: number of output units of the final linear layer.
    """

    def __init__(self, n_classes):
        super(LyricClassifier, self).__init__()
        self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the linear layer's output for the encoder's pooled output.

        Args:
            input_ids: token ids forwarded to the BERT encoder.
            attention_mask: attention mask forwarded to the BERT encoder.
        """
        bert_outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        # Take the second output by position instead of unpacking exactly two
        # values: unpacking fails when the encoder returns extra outputs and
        # yields the field names when it returns a dict-like output object.
        pooled_output = bert_outputs[1]

        return self.out(self.drop(pooled_output))

In [None]:
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# One output unit per distinct class name collected in `alphabetical`.
model = LyricClassifier(len(alphabetical))
model = model.to(device)

In [None]:
# Move the sample batch pulled from the training loader onto the selected device.
input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)

print(input_ids.shape) # batch size x seq length
print(attention_mask.shape) # batch size x seq length

In [None]:
# Number of full passes over the training loader.
EPOCHS = 5

# Create optimizer and its schedule and loss
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)

# train_epoch steps the scheduler once per batch, so the schedule has to span
# every batch of every epoch.
total_steps = len(train_data_loader) * EPOCHS

scheduler = get_linear_schedule_with_warmup(
  optimizer,
  num_warmup_steps=0,
  num_training_steps=total_steps
)

loss_fn = nn.CrossEntropyLoss().to(device)

In [None]:
# Function for training network
def train_epoch(
    model,
    data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    n_examples,
    epoch
):
    """Run one training pass over ``data_loader`` and update the model.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of dict batches with the keys "input_ids",
            "attention_mask" and "targets".
        loss_fn: callable applied to (model outputs, class indices).
        optimizer: optimizer stepped once per batch.
        device: device the batch tensors are moved to.
        scheduler: learning-rate scheduler stepped once per batch.
        n_examples: denominator used to turn the correct count into an accuracy.
        epoch: integer shown in the progress-bar description.

    Returns:
        (accuracy, mean loss): accuracy is a double tensor, mean loss the
        NumPy mean of the per-batch losses.

    NOTE(review): class indices are recovered with argmax over dim 1 of
    "targets", which presumes one column per class - confirm the label
    encoder never emits a single-column encoding for this data.
    """
    model = model.train()
    losses = []
    correct_predictions = 0

    progress_bar = tqdm(data_loader, desc='Epoch {:1d}'.format(epoch), leave=False, disable=False)

    for d in progress_bar:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)

        outputs = model(
          input_ids=input_ids,
          attention_mask=attention_mask
        )

        _, preds = torch.max(outputs, dim=1)

        # Collapse the per-class target columns to one class index per item.
        targets_1d = torch.argmax(targets, 1)

        loss = loss_fn(outputs, targets_1d)
        correct_predictions += torch.sum(preds == targets_1d)
        losses.append(loss.item())
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

        # Show the batch loss as returned by loss_fn. It used to be divided by
        # len(d), which is the number of keys in the batch dict, not the
        # batch size, so the displayed value was mis-scaled.
        progress_bar.set_postfix({'training_loss': '{:.3f}'.format(loss.item())})

    # as_tensor keeps the return type a tensor even when no batch was seen
    # (correct_predictions is then still the plain integer 0).
    return torch.as_tensor(correct_predictions).double() / n_examples, np.mean(losses)

In [None]:
# Function for evaluating network
def eval_model(model, data_loader, loss_fn, device, n_examples, epoch=None):
    """Evaluate the model on ``data_loader`` without updating it.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of dict batches with the keys "input_ids",
            "attention_mask" and "targets".
        loss_fn: callable applied to (model outputs, class indices).
        device: device the batch tensors are moved to.
        n_examples: denominator used to turn the correct count into an accuracy.
        epoch: optional integer shown in the progress-bar description. The
            function used to read a global ``epoch`` left behind by the
            training loop and raised NameError when none existed.

    Returns:
        (accuracy, mean loss): accuracy is a double tensor, mean loss the
        NumPy mean of the per-batch losses.
    """
    model = model.eval()
    losses = []
    correct_predictions = 0

    description = 'Evaluation' if epoch is None else 'Epoch {:1d}'.format(epoch)

    with torch.no_grad():

        progress_bar = tqdm(data_loader, desc=description, leave=False, disable=False)

        for d in progress_bar:

            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )

            # Collapse the per-class target columns to one class index per item.
            targets_1d = torch.argmax(targets, 1)

            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, targets_1d)
            correct_predictions += torch.sum(preds == targets_1d)
            losses.append(loss.item())

            # Show the batch loss as returned by loss_fn; len(d) is the number
            # of keys in the batch dict, not the batch size.
            progress_bar.set_postfix({'validation_loss': '{:.3f}'.format(loss.item())})

    # as_tensor keeps the return type a tensor even when no batch was seen.
    return torch.as_tensor(correct_predictions).double() / n_examples, np.mean(losses)

In [None]:
%%time

torch.cuda.empty_cache()

# Per-epoch metrics, keyed by 'train_acc', 'train_loss', 'val_acc', 'val_loss'.
history = defaultdict(list)
best_accuracy = 0

for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)

    train_acc, train_loss = train_epoch(
        model,
        train_data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(df_train),
        epoch
    )
    # The accuracy comes back as a 0-dim tensor living on `device`. Convert it
    # to a plain float so the history can be plotted: matplotlib cannot
    # convert CUDA tensors to NumPy arrays.
    train_acc = float(train_acc)

    print(f'Train loss {train_loss} accuracy {train_acc}')

    val_acc, val_loss = eval_model(
        model,
        val_data_loader,
        loss_fn,
        device,
        len(df_val)
    )
    val_acc = float(val_acc)

    print(f'Val   loss {val_loss} accuracy {val_acc}')
    print()

    history['train_acc'].append(train_acc)
    history['train_loss'].append(train_loss)
    history['val_acc'].append(val_acc)
    history['val_loss'].append(val_loss)

    # Keep the weights of the epoch with the best validation accuracy so far.
    if val_acc > best_accuracy:
        torch.save(model.state_dict(), 'best_model_state.bin')
        best_accuracy = val_acc

In [None]:
# Plot the per-epoch training and validation accuracy recorded in `history`.
plt.plot(history['train_acc'], label='train accuracy')
plt.plot(history['val_acc'], label='validation accuracy')
plt.title('BERT Training History')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
# Fix the y-axis to the [0, 1] range; the trailing ';' hides the cell's text output.
plt.ylim([0, 1]);

In [None]:
# Evaluate on the test split; the returned mean loss is discarded.
test_acc, _ = eval_model(
    model,
    test_data_loader,
    loss_fn,
    device,
    len(df_test)
)

# Display the accuracy as a plain Python number instead of a tensor.
test_acc.item()

In [None]:
# Function to get predictions for more in-depth results
def get_predictions(model, data_loader, device=None):
    """Run the model over ``data_loader`` and collect its predictions.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: iterable of dict batches with the keys "lyric_text",
            "input_ids", "attention_mask" and "targets".
        device: device the batch tensors are moved to. Defaults to the device
            of the model's first parameter (CPU for a parameter-free model),
            so the function no longer depends on a global ``device``.

    Returns:
        Tuple ``(texts, predictions, prediction_probs, real_values)``:
        the lyric texts as a list, and three CPU tensors holding the predicted
        class index per item, the softmax of the model outputs per item, and
        the class index recovered from the targets per item.
    """
    model = model.eval()

    if device is None:
        first_parameter = next(model.parameters(), None)
        device = first_parameter.device if first_parameter is not None else torch.device('cpu')

    review_texts = []
    prediction_batches = []
    probability_batches = []
    target_batches = []

    with torch.no_grad():
        for d in data_loader:
            texts = d["lyric_text"]
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )

            _, preds = torch.max(outputs, dim=1)

            # Collapse the per-class target columns to one class index per item.
            targets_1d = torch.argmax(targets, 1)

            review_texts.extend(texts)
            prediction_batches.append(preds.cpu())
            # Normalise the raw outputs so the returned values really are
            # probabilities (each row sums to 1); the raw outputs used to be
            # returned under the "probs" name.
            probability_batches.append(torch.softmax(outputs, dim=1).cpu())
            target_batches.append(targets_1d.cpu())

    if not prediction_batches:
        # Nothing was iterated: return empty tensors instead of failing on an
        # empty concatenation.
        empty_indices = torch.empty(0, dtype=torch.long)
        return review_texts, empty_indices, torch.empty(0), empty_indices.clone()

    # Concatenate whole batches instead of stacking one row at a time.
    predictions = torch.cat(prediction_batches)
    prediction_probs = torch.cat(probability_batches)
    real_values = torch.cat(target_batches)

    return review_texts, predictions, prediction_probs, real_values

In [None]:
# Collect the texts, predicted classes, model outputs and true classes for the test split.
y_review_texts, y_pred, y_pred_probs, y_test = get_predictions(
  model,
  test_data_loader
)

In [None]:
# Print scikit-learn's classification report for the test predictions,
# labelled with the class names held in `alphabetical`.
print(classification_report(y_test, y_pred, target_names=alphabetical))

In [None]:
def show_confusion_matrix(confusion_matrix):
    """Draw ``confusion_matrix`` as an annotated blue heatmap with labelled axes.

    Args:
        confusion_matrix: matrix-like data handed straight to ``sns.heatmap``.
    """
    heatmap_axes = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")

    # Re-apply the existing tick labels: y labels unrotated, x labels rotated by 30.
    for axis, angle in ((heatmap_axes.yaxis, 0), (heatmap_axes.xaxis, 30)):
        axis.set_ticklabels(axis.get_ticklabels(), rotation=angle, ha='right')

    plt.ylabel('True sentiment')
    plt.xlabel('Predicted sentiment')

# Confusion matrix of the true vs. predicted classes of the test split.
cm = confusion_matrix(y_test, y_pred)

# Formats and displays the confusion matrix
figure(num=None, figsize=(5, 5), dpi=300)
# Label both rows and columns with the class names held in `alphabetical`.
df_cm = pd.DataFrame(cm, index=alphabetical, columns=alphabetical)
# `sn` is the second alias under which seaborn is imported at the top of the notebook.
sn.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap=plt.cm.Blues, fmt='g', cbar=False)
plt.title('BERT Confusion Matrix', fontsize=9)
plt.xlabel("Predicted Class", fontsize=9)
plt.ylabel("Actual Class", fontsize=9)
plt.show()