In [1]:
%run local_functions.py
from local_functions import *

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re


from datasets import load_dataset
from datasets import Dataset, DatasetDict
from transformers import BertConfig, BertModel


from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification

from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from transformers import EvalPrediction
import torch
from transformers import TrainingArguments, Trainer


from sklearn.model_selection import train_test_split

plt.style.use("dark_background")

pd.set_option("display.max_columns", 2500)
pd.set_option("display.max_rows", 50)
pd.set_option("display.max_colwidth", 50)


plt.style.use("dark_background")

%load_ext lab_black

2023-09-09 09:47:48.605953: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-09-09 09:47:48.757426: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Model

In [10]:
model = AutoModelForSequenceClassification.from_pretrained(
    "bert-base-uncased", num_labels=2, problem_type="multi_label_classification"
)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [15]:
model

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12,

In [23]:
import torch
import torch.nn as nn


class BertEmbeddings(nn.Module):
    def __init__(
        self, vocab_size, hidden_size, max_position_embeddings, type_vocab_size
    ):
        super(BertEmbeddings, self).__init__()
        self.word_embeddings = nn.Embedding(vocab_size, hidden_size, padding_idx=0)
        self.position_embeddings = nn.Embedding(max_position_embeddings, hidden_size)
        self.token_type_embeddings = nn.Embedding(type_vocab_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(0.1)

    def forward(self, input_ids, token_type_ids):
        input_shape = input_ids.size()
        seq_length = input_shape[1]
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        )
        position_ids = position_ids.unsqueeze(0).expand(input_shape)

        word_embeddings = self.word_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        token_type_embeddings = self.token_type_embeddings(token_type_ids)

        embeddings = word_embeddings + position_embeddings + token_type_embeddings
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings


class BertSelfAttention(nn.Module):
    def __init__(self, hidden_size, num_attention_heads):
        super(BertSelfAttention, self).__init__()
        self.num_attention_heads = num_attention_heads
        self.attention_head_size = hidden_size // num_attention_heads
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(hidden_size, self.all_head_size)
        self.key = nn.Linear(hidden_size, self.all_head_size)
        self.value = nn.Linear(hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(0.1)

    def transpose_for_scores(self, x):
        new_x_shape = x.size()[:-1] + (
            self.num_attention_heads,
            self.attention_head_size,
        )
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(self, hidden_states, attention_mask):
        mixed_query_layer = self.query(hidden_states)
        mixed_key_layer = self.key(hidden_states)
        mixed_value_layer = self.value(hidden_states)

        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)

        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        attention_scores = attention_scores + attention_mask

        attention_probs = nn.Softmax(dim=-1)(attention_scores)
        attention_probs = self.dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = (
            context_layer.permute(0, 2, 1, 3).contiguous().view(hidden_states.size())
        )
        return context_layer


class BertIntermediate(nn.Module):
    def __init__(self, hidden_size, intermediate_size):
        super(BertIntermediate, self).__init__()
        self.dense = nn.Linear(hidden_size, intermediate_size)
        self.intermediate_act_fn = nn.GELU()

    def forward(self, hidden_states):
        intermediate_output = self.dense(hidden_states)
        intermediate_output = self.intermediate_act_fn(intermediate_output)
        return intermediate_output


class BertOutput(nn.Module):
    def __init__(self, intermediate_size, hidden_size):
        super(BertOutput, self).__init__()
        self.dense = nn.Linear(intermediate_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(0.1)

    def forward(self, intermediate_output, hidden_states):
        hidden_states = self.dense(intermediate_output)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + hidden_states)
        return hidden_states


class BertLayer(nn.Module):
    def __init__(self, hidden_size, num_attention_heads, intermediate_size):
        super(BertLayer, self).__init__()
        self.attention = BertSelfAttention(hidden_size, num_attention_heads)
        self.intermediate = BertIntermediate(hidden_size, intermediate_size)
        self.output = BertOutput(intermediate_size, hidden_size)

    def forward(self, hidden_states, attention_mask):
        attention_output = self.attention(hidden_states, attention_mask)
        intermediate_output = self.intermediate(attention_output)
        layer_output = self.output(intermediate_output, attention_output)
        return layer_output


class BertEncoder(nn.Module):
    def __init__(
        self, num_hidden_layers, hidden_size, num_attention_heads, intermediate_size
    ):
        super(BertEncoder, self).__init__()
        layer = BertLayer(hidden_size, num_attention_heads, intermediate_size)
        self.layer = nn.ModuleList([layer for _ in range(num_hidden_layers)])

    def forward(self, hidden_states, attention_mask):
        all_hidden_states = ()
        all_attentions = ()
        for layer_module in self.layer:
            hidden_states = layer_module(hidden_states, attention_mask)
            all_hidden_states = all_hidden_states + (hidden_states,)
        return all_hidden_states, all_attentions


class BertPooler(nn.Module):
    def __init__(self, hidden_size):
        super(BertPooler, self).__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states):
        first_token_tensor = hidden_states[:, 0]
        pooled_output = self.dense(first_token_tensor)
        pooled_output = self.activation(pooled_output)
        return pooled_output


class MyBertForSequenceClassification(nn.Module):
    def __init__(self, num_classes):
        super(MyBertForSequenceClassification, self).__init__()

        self.embeddings = BertEmbeddings(30522, 768, 512, 2)
        self.encoder = BertEncoder(12, 768, 12, 3072)
        self.pooler = BertPooler(768)
        self.classifier = nn.Sequential(nn.Linear(768, num_classes))
        self.dropout = nn.Dropout(0.1)

        if id2label is not None and label2id is not None:
            self.id2label = id2label
            self.label2id = label2id

    def forward(self, input_ids, token_type_ids, attention_mask):
        embedding_output = self.embeddings(input_ids, token_type_ids)
        all_hidden_states, _ = self.encoder(embedding_output, attention_mask)
        sequence_output = all_hidden_states[-1]
        pooled_output = self.pooler(sequence_output)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        return logits

In [25]:
# Create an instance of the model


MyBertForSequenceClassification(
  (embeddings): BertEmbeddings(
    (word_embeddings): Embedding(30522, 768, padding_idx=0)
    (position_embeddings): Embedding(512, 768)
    (token_type_embeddings): Embedding(2, 768)
    (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): BertEncoder(
    (layer): ModuleList(
      (0-11): 12 x BertLayer(
        (attention): BertSelfAttention(
          (query): Linear(in_features=768, out_features=768, bias=True)
          (key): Linear(in_features=768, out_features=768, bias=True)
          (value): Linear(in_features=768, out_features=768, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (intermediate): BertIntermediate(
          (dense): Linear(in_features=768, out_features=3072, bias=True)
          (intermediate_act_fn): GELU(approximate='none')
        )
        (output): BertOutput(
          (dense): Linear(in_features=3072, out_feat

# Other

In [16]:
df = pd.read_csv("datasets/wiki_movie_plots_deduped.csv")
df = df[["Plot", "Genre"]]
df = df.loc[df.Genre != "unknown"].reset_index(drop=True)
df["text"] = df["Plot"].apply(text_normalization_2)
df.drop("Plot", axis=1, inplace=True)
df = df[["text", "Genre"]]
# retain rows with top 100 genres
top_100_genres = df["Genre"].value_counts().head(100).index.tolist()
df = df.loc[df["Genre"].isin(top_100_genres)].reset_index(drop=True)

In [17]:
training_df, testing_df = train_test_split(df, test_size=0.25, random_state=42)

In [20]:
encoded_df = pd.get_dummies(
    training_df["Genre"], columns=["Genre"], prefix="", prefix_sep=""
)
encoded_df = encoded_df.astype(bool)
encoded_df_con = pd.concat([training_df["text"], encoded_df], axis=1)

train_df, temp_df = train_test_split(encoded_df_con, test_size=0.3, random_state=42)
valid_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Convert the split DataFrames into Datasets
train = Dataset.from_pandas(train_df, split="train")
valid = Dataset.from_pandas(valid_df, split="validation")
test = Dataset.from_pandas(test_df, split="test")

dataset = DatasetDict({"train": train, "validation": valid, "test": test})

labels = [
    label
    for label in dataset["train"].features.keys()
    if label not in ["text", "__index_level_0__"]
]
id2label = {idx: label for idx, label in enumerate(labels)}
label2id = {label: idx for idx, label in enumerate(labels)}

In [22]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")


def preprocess_data(examples):
    # take a batch of texts
    text = examples["text"]
    # encode them
    encoding = tokenizer(text, padding="max_length", truncation=True, max_length=256)
    # add labels
    labels_batch = {k: examples[k] for k in examples.keys() if k in labels}
    # create numpy array of shape (batch_size, num_labels)
    labels_matrix = np.zeros((len(text), len(labels)))
    # fill numpy array
    for idx, label in enumerate(labels):
        labels_matrix[:, idx] = labels_batch[label]

    encoding["labels"] = labels_matrix.tolist()

    return encoding


encoded_dataset = dataset.map(
    preprocess_data, batched=True, remove_columns=dataset["train"].column_names
)

encoded_dataset.set_format("torch")

Map:   0%|          | 0/12770 [00:00<?, ? examples/s]

Map:   0%|          | 0/2737 [00:00<?, ? examples/s]

Map:   0%|          | 0/2737 [00:00<?, ? examples/s]

# More

In [28]:
model_1 = MyBertForSequenceClassification(
    num_classes=len(labels),
    id2label=id2label,
    label2id=label2id,
)

TypeError: MyBertForSequenceClassification.__init__() got an unexpected keyword argument 'id2label'