### PyTorch Fine Tune Transformers
This notebook demonstrates how to fine-tune a Hugging Face Transformers model for text classification by appending layers to a base model.

In [1]:
from transformers import DistilBertTokenizerFast, DistilBertModel, BatchEncoding
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import pandas as pd
import numpy as np
import pickle
import torch

In [2]:
# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Checkpoint name used for both the tokenizer and the base transformer.
MODEL = 'distilbert-base-uncased'
# Maximum sequence length handed to the tokenizer (model_max_length).
MAX_LEN = 256
# Single seed for every random number generator the notebook relies on.
SEED = 42

# Seed the global RNGs so "Restart Kernel -> Run All" reproduces the same
# sample, train/test split, weight initialisation, dropout masks and batch
# order. pandas' `sample` and scikit-learn's `train_test_split` draw from
# NumPy's global generator when no `random_state` is given; layer init,
# dropout and DataLoader shuffling draw from torch's generator.
np.random.seed(SEED)
torch.manual_seed(SEED)

In [3]:
class TextDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with per-example labels.

    Each item is a dict holding one row of every field in ``encodings``
    plus a float tensor under the key ``"labels"``.
    """

    def __init__(self, encodings: "BatchEncoding", labels: list):
        """
        Args:
            encodings: mapping from field name to an indexable batch of
                values (lists or tensors), one row per example.
            labels: one label vector per example, in the same order as the
                rows of ``encodings``.
        """
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of examples, taken from the number of labels."""
        return len(self.labels)

    @staticmethod
    def _copy_as_tensor(value):
        """Return an independent tensor copy of ``value``.

        ``torch.tensor(existing_tensor)`` emits a copy-construct UserWarning,
        so tensors are copied with ``detach().clone()``; anything else
        (lists, numbers) is converted with ``torch.tensor``.
        """
        if isinstance(value, torch.Tensor):
            return value.detach().clone()
        return torch.tensor(value)

    def __getitem__(self, idx):
        """Build the feature/label dict for the example at position ``idx``."""
        item = {
            key: self._copy_as_tensor(val[idx])
            for key, val in self.encodings.items()
        }
        # Labels are stored as floats, which is what the BCE-with-logits
        # loss used during training expects as its target.
        item["labels"] = torch.tensor(self.labels[idx], dtype=torch.float)
        return item

In [4]:
class Classifier(torch.nn.Module):
    """Transformer encoder with a small dense classification head.

    The hidden state at sequence position 0 is passed through
    ``Linear -> ReLU -> Dropout -> Linear`` to produce ``n`` logits.
    """

    MODEL_FILENAME = 'model.pb'

    def __init__(self, transformer, tokenizer, n, max_seq_len = MAX_LEN):
        """
        Args:
            transformer: base encoder module. It must expose ``config.dim``
                and return its hidden states as the first element of its
                output.
            tokenizer: callable used by ``predict`` to turn raw text into
                ``input_ids`` / ``attention_mask`` tensors.
            n: number of output logits (one per class).
            max_seq_len: kept for backward compatibility. It is stored on
                the instance but not used for truncation; ``predict`` relies
                on the tokenizer's own configuration for the padded length.
        """
        super().__init__()
        self.transformer = transformer
        hidden_dim = self.transformer.config.dim
        self.pooling_layer = torch.nn.Linear(hidden_dim, hidden_dim)
        self.dropout_layer = torch.nn.Dropout(0.3)
        self.classifier_layer = torch.nn.Linear(hidden_dim, n)
        self.tokenizer = tokenizer
        self.n = n
        self.max_seq_len = max_seq_len
        # Move every registered sub-module (base model and head) to DEVICE.
        self.to(DEVICE)

    def save(self, path: str):
        """Pickle the whole classifier object (weights and tokenizer) to ``path``.

        NOTE(review): pickling the full module ties the file to this exact
        class definition, and unpickling a file from an untrusted source can
        execute arbitrary code. Consider saving ``state_dict()`` instead.
        """
        with open(path, 'wb') as file:
            pickle.dump(self, file)

    def forward(self, input_ids, attention_mask):
        """Return raw (un-normalised) logits for a batch of token ids."""
        hidden_state = self.transformer(
            input_ids=input_ids, attention_mask=attention_mask
        )[0]
        # Use the hidden state of the first token as the sequence summary.
        pooled_output = hidden_state[:, 0]
        pooled_output = self.pooling_layer(pooled_output)
        pooled_output = torch.nn.functional.relu(pooled_output)
        pooled_output = self.dropout_layer(pooled_output)
        return self.classifier_layer(pooled_output)

    def learn(
        self,
        data: torch.utils.data.Dataset,
        verbose: bool = False,
        epochs: int = 3,
        batch_size: int = 4,
        lr: float = 5e-5,
    ):
        """Fine-tune the whole model (base transformer and head) on ``data``.

        Args:
            data: dataset whose items are dicts with ``"input_ids"``,
                ``"attention_mask"`` and float ``"labels"`` entries.
            verbose: when true, print the loss of every 5000th batch
                (including the first batch of each epoch).
            epochs: number of passes over ``data``.
            batch_size: examples per optimisation step.
            lr: AdamW learning rate.
        """
        train_loader = torch.utils.data.DataLoader(
            data, batch_size=batch_size, shuffle=True
        )
        optimizer = torch.optim.AdamW(self.parameters(), lr=lr)
        # The loss module has no per-batch state, so build it once.
        loss_fn = torch.nn.BCEWithLogitsLoss()
        self.train()
        for i in range(epochs):
            for j, batch in enumerate(train_loader):
                optimizer.zero_grad()
                input_ids = batch["input_ids"].to(DEVICE)
                attention_mask = batch["attention_mask"].to(DEVICE)
                labels = batch["labels"].to(DEVICE)
                outputs = self(input_ids, attention_mask=attention_mask)
                loss = loss_fn(outputs, labels)
                if verbose and j % 5000 == 0:
                    print(f"Epoch {i+1}, Loss: {loss.item()}")

                loss.backward()
                optimizer.step()

    def predict(
        self,
        sequence: list,
    ) -> list:
        """Predict a one-hot class vector for every text in ``sequence``.

        Args:
            sequence: iterable of raw text strings.

        Returns:
            One list of length ``n`` per input text, with a single 1 at the
            index of the highest logit and 0 elsewhere.
        """
        self.eval()
        predictions = []
        # Inference only: skip autograd bookkeeping to save memory and time.
        with torch.no_grad():
            for text in sequence:
                inputs = self.tokenizer(
                    text, truncation=True, padding='max_length', return_tensors="pt"
                ).to(DEVICE)
                logits = self(inputs["input_ids"], inputs["attention_mask"])
                # Each call scores one text, so row 0 holds its logits.
                best = int(torch.argmax(logits[0]).item())
                y = [0] * self.n
                y[best] = 1
                predictions.append(y)

        return predictions

In [5]:
def build_y(data, dataset):
    """Encode the ``label`` column of ``data`` as 0/1 indicator vectors.

    Args:
        data: frame whose ``label`` column is to be encoded.
        dataset: frame whose ``label`` column defines the classes. Vector
            positions follow the order of ``dataset.label.unique()``.

    Returns:
        A list with one indicator vector (list of 0/1 ints) per row of
        ``data``.
    """
    # Compute the class list once instead of once per row.
    classes = dataset.label.unique()

    def encode(label):
        # A string label names exactly one class. Using `in` here would be
        # a substring test and would let "tech" match "technology".
        if isinstance(label, str):
            return [1 if cls == label else 0 for cls in classes]
        # Non-string labels keep the original membership semantics.
        return [1 if cls in label else 0 for cls in classes]

    return data.label.apply(encode).tolist()

def build_X(data, clf):
    """Tokenize the ``text`` column of ``data`` with the classifier's tokenizer.

    Every text is truncated and padded to the tokenizer's maximum length,
    and the result is returned as PyTorch tensors.
    """
    texts = data.text.tolist()
    tokenizer_options = {
        "truncation": True,
        "padding": "max_length",
        "return_tensors": "pt",
    }
    return clf.tokenizer(texts, **tokenizer_options)

In [6]:
# Load the dataset and keep a small sample of 10 rows per label.
df = pd.read_csv('res/bbc.csv').groupby(by='label').sample(n=10)

In [7]:
# Build the classifier from the pretrained base model and its matching
# tokenizer; the tokenizer's maximum length is set to MAX_LEN, and the head
# gets one output per distinct label in the sampled data.
clf = Classifier(
    transformer=DistilBertModel.from_pretrained(MODEL),
    tokenizer=DistilBertTokenizerFast.from_pretrained(
        MODEL, model_max_length=MAX_LEN,
    ),
    n=len(df.label.unique())
)

Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertModel: ['vocab_projector.weight', 'vocab_transform.weight', 'vocab_layer_norm.weight', 'vocab_layer_norm.bias', 'vocab_projector.bias', 'vocab_transform.bias']
- This IS expected if you are initializing DistilBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [8]:
# Split the sampled rows into a training and a held-out test frame.
# No random_state (or stratify) is passed here, so the split relies on
# scikit-learn's defaults.
train_df, test_df = train_test_split(df)

In [9]:
# Encode labels as indicator vectors. The full sample `df` is passed as the
# second argument so train and test share the same class order.
y_train = build_y(train_df, df)
y_test = build_y(test_df, df)

In [10]:
# Training texts are tokenized up front for the dataset/DataLoader.
X_train = build_X(train_df, clf)
# Test texts stay as raw strings: Classifier.predict tokenizes each one itself.
X_test = test_df.text.tolist()

In [11]:
# Wrap the tokenized training texts and their label vectors in a Dataset.
train = TextDataset(encodings=X_train, labels=y_train)

In [12]:
# Fine-tune with the defaults of Classifier.learn (3 epochs, batch size 4,
# learning rate 5e-5, no loss printing).
clf.learn(data=train)

  if sys.path[0] == '':


In [13]:
# One one-hot prediction vector per raw test text.
pred = clf.predict(X_test)

In [14]:
# Both y_test and pred are lists of 0/1 vectors.
# NOTE(review): presumably accuracy_score counts a row as correct only when
# the whole vector matches — confirm against the scikit-learn docs.
print(accuracy_score(y_test, pred))

0.8461538461538461
