In [None]:
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from pathlib import Path
from time import perf_counter
from typing import Optional

import lightning as L
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from gensim.models import KeyedVectors
from lightning.pytorch.callbacks import TQDMProgressBar, ModelCheckpoint
from lightning.pytorch.loggers.tensorboard import TensorBoardLogger
from lightning.pytorch.tuner import Tuner
from sklearn.utils.class_weight import compute_class_weight
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence
from torch.utils.data import DataLoader, Dataset
from torchmetrics import MetricCollection
from torchmetrics.classification import MulticlassAccuracy, MulticlassPrecision, MulticlassRecall, MulticlassF1Score
from transformers import AutoModel, AutoTokenizer

## Model training

In [None]:
@contextmanager
def measure_time() -> Iterator[Callable[[], float]]:
    """Measure the wall-clock duration of a ``with`` block.

    Yields:
        A zero-argument callable returning the elapsed time in seconds.
        While the block is still running it returns the time elapsed so far;
        once the block has exited (normally or through an exception) it keeps
        returning the total duration of the block.

    Example:
        with measure_time() as elapsed:
            do_work()
        print(elapsed())
    """
    start = perf_counter()
    end = None

    def elapsed() -> float:
        # Before the block exits ``end`` is still None, so report a running value.
        return (perf_counter() if end is None else end) - start

    try:
        yield elapsed
    finally:
        # Freeze the measurement so later reads do not include time spent
        # after the ``with`` block.
        end = perf_counter()

In [None]:
# Identifier of the pretrained Polish BERT checkpoint passed to `from_pretrained`.
POLISH_TRANSFORMER_MODEL_NAME = "dkleczek/bert-base-polish-cased-v1"
# Input CSV files and pretrained embeddings are looked up relative to the
# directory the notebook is started from.
DATA_PATH = Path.cwd().joinpath("data")
MODELS_PATH = Path.cwd().joinpath("models")

In [None]:
class TransformerWrapper(nn.Module):
    """Pretrained transformer encoder followed by an MLP classification head.

    The encoder's ``pooler_output`` is passed through
    Linear -> SiLU -> Dropout -> Linear -> LogSoftmax, so ``forward`` returns
    per-class log-probabilities.

    Args:
        model_name: Checkpoint name handed to ``AutoModel.from_pretrained``.
        start_training_layer: Index of the first encoder layer left trainable.
            Pass -1 to freeze the whole pretrained model and train only the
            classification head.
        num_classes: Number of output classes.
    """

    def __init__(self, model_name: str = POLISH_TRANSFORMER_MODEL_NAME, start_training_layer: int = -1, num_classes: int = 2):
        super().__init__()

        self.model, model_out_channels = self._get_transformer(model_name=model_name, start_training_layer=start_training_layer)

        self.classifier = nn.Sequential(
            nn.Linear(in_features=model_out_channels, out_features=1024),
            nn.SiLU(),
            nn.Dropout(0.2),
            nn.Linear(in_features=1024, out_features=num_classes),
            # Outputs are log-probabilities. Feeding them to CrossEntropyLoss is
            # still valid: log-softmax of already-normalised log-probabilities
            # returns them unchanged.
            nn.LogSoftmax(dim=1)
        )

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor):
        """Return per-class log-probabilities for a batch of tokenised inputs."""
        pooler_output = self.model(input_ids, attention_mask=attention_mask)["pooler_output"]

        return self.classifier(pooler_output)

    def _get_transformer(self, model_name: str, start_training_layer: int):
        """Load the pretrained transformer and freeze part of its parameters.

        Args:
            model_name (str): Checkpoint name handed to ``AutoModel.from_pretrained``.
            start_training_layer (int): Index of the first encoder layer that stays
                trainable; every earlier layer and the embeddings are frozen.
                Pass -1 to freeze the whole model.

        Returns:
            Tuple of the loaded model and the size of its pooler output.
        """
        model = AutoModel.from_pretrained(model_name)
        out_features = model.pooler.dense.out_features

        if start_training_layer == -1:
            # Head-only training: nothing in the pretrained model receives gradients.
            for param in model.parameters():
                param.requires_grad = False
            return model, out_features

        for param in model.embeddings.parameters():
            param.requires_grad = False

        # Freeze per layer rather than per flattened parameter index, so the
        # result does not depend on how many parameter tensors a layer holds.
        for layer_idx, layer in enumerate(model.encoder.layer):
            trainable = layer_idx >= start_training_layer
            for param in layer.parameters():
                param.requires_grad = trainable

        # The pooler sits on top of the unfrozen layers, so it is trained as well.
        for param in model.pooler.parameters():
            param.requires_grad = True

        return model, out_features

In [None]:
class TransfromerDataset(Dataset):
    """Text-classification dataset tokenised for a transformer model.

    All texts are tokenised once in the constructor. Each item is a triple of
    ``input_ids``, ``attention_mask`` and a one-hot float target.

    Args:
        data_df: Frame holding the texts and their labels.
        target_column: Name of the label column.
        text_column: Name of the text column.
        model_name: Checkpoint name used to load the matching tokenizer.
    """

    def __init__(self, data_df: pd.DataFrame, target_column: str, text_column: str, model_name: str = POLISH_TRANSFORMER_MODEL_NAME):
        super().__init__()

        self.data, self.target = self._prepare_data_to_transformer(
            data_df=data_df,
            target_column=target_column,
            text_column=text_column,
            model_name=model_name
        )

        # Sorted unique labels -> consecutive class indices.
        self.class_mapping = {
            class_name: idx for idx, class_name in enumerate(np.unique(self.target))
        }

        # Indices are 0..len-1, so the class count is the mapping size
        # (also well defined for an empty dataset).
        self.num_classes = len(self.class_mapping)

    def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Return ``(input_ids, attention_mask, one_hot_target)`` for one sample."""
        sample_data_input_id = torch.tensor(self.data["input_ids"][index])
        sample_data_attention_mask = torch.tensor(self.data["attention_mask"][index])
        sample_target = F.one_hot(
            torch.tensor(self.class_mapping[self.target[index]]), num_classes=self.num_classes
        ).float()

        return sample_data_input_id, sample_data_attention_mask, sample_target

    def _prepare_data_to_transformer(
        self, data_df: pd.DataFrame, target_column: str, text_column: str, model_name: str = POLISH_TRANSFORMER_MODEL_NAME
    ):
        """Tokenise every text and collect the raw labels.

        Returns:
            Tuple of the tokenizer output (every sequence padded or truncated
            to 512 tokens) and the list of raw labels.
        """
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        data = tokenizer.batch_encode_plus(
            data_df[text_column].tolist(),
            max_length=512,
            padding='max_length',
            truncation=True
        )

        target = data_df[target_column].tolist()

        return data, target

    def __len__(self) -> int:
        return len(self.target)

    def get_labels(self) -> list[int]:
        """Return the class index of every sample, in dataset order."""
        return [self.class_mapping[label] for label in self.target]

In [None]:
class TransformerDatasetModule(L.LightningDataModule):
    """Lightning data module serving the transformer train and test datasets.

    All constructor arguments are stored as hyperparameters. The module reads
    the following: ``data_root`` (directory holding ``train.csv`` and
    ``test.csv``), ``target_column``, ``text_column``, ``model_name`` and
    ``batch_size``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.save_hyperparameters()

    def setup(self, stage: Optional[str] = None):
        """Load and tokenise both splits (``stage`` is ignored)."""
        self.train = TransfromerDataset(
            data_df=pd.read_csv(self.hparams.data_root / "train.csv"),
            target_column=self.hparams.target_column,
            text_column=self.hparams.text_column,
            model_name=self.hparams.model_name
        )
        self.test = TransfromerDataset(
            data_df=pd.read_csv(self.hparams.data_root / "test.csv"),
            target_column=self.hparams.target_column,
            text_column=self.hparams.text_column,
            model_name=self.hparams.model_name
        )

    def train_dataloader(self):
        return DataLoader(self.train, batch_size=self.hparams.batch_size, shuffle=True)

    def test_dataloader(self):
        return DataLoader(self.test, batch_size=self.hparams.batch_size, shuffle=False)

    def get_class_weights(self) -> torch.Tensor:
        """Return 'balanced' class weights computed on the training labels.

        Requires ``setup()`` to have been called. The weights are returned as
        float32 so they match the dtype of the model outputs when passed to
        the loss.
        """
        labels = self.train.get_labels()
        weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
        return torch.tensor(weights, dtype=torch.float32)

In [None]:
class TransformerModule(L.LightningModule):
    """Lightning module training a ``TransformerWrapper`` classifier.

    All constructor arguments are stored as hyperparameters. The module reads
    the following: ``model_name``, ``start_training_layer``, ``num_classes``,
    ``class_weights`` and ``lr``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.save_hyperparameters()

        self.model = TransformerWrapper(
            model_name=self.hparams.model_name,
            start_training_layer=self.hparams.start_training_layer,
            num_classes=self.hparams.num_classes
        )

        metrics = MetricCollection([
            MulticlassAccuracy(self.hparams.num_classes, average=None),
            MulticlassPrecision(self.hparams.num_classes, average=None),
            MulticlassRecall(self.hparams.num_classes, average=None),
            MulticlassF1Score(self.hparams.num_classes, average=None)
        ])
        # A plain dict would hide the metrics from nn.Module, so their state
        # would not follow the module to the training device. The keys carry a
        # suffix because "train" clashes with nn.Module.train and is rejected
        # by nn.ModuleDict.
        self.metrics = nn.ModuleDict({
            "train_metrics": metrics.clone(prefix='train_'),
            "test_metrics": metrics.clone(prefix='test_')
        })

        class_weights = self.hparams.class_weights
        if class_weights is not None:
            # Keep the loss weights in float32 regardless of how they were computed.
            class_weights = torch.as_tensor(class_weights, dtype=torch.float32)
        self.criterion = nn.CrossEntropyLoss(weight=class_weights)

    def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor):
        return self.model(input_ids, attention_mask)

    def training_step(self, batch, batch_idx):
        return self._shared_eval(batch, batch_idx, "train")

    def test_step(self, batch, batch_idx):
        return self._shared_eval(batch, batch_idx, "test")

    def _shared_eval(self, batch, batch_idx, stage):
        """Run one step for ``stage``: compute and log the loss, update metrics."""
        input_ids, attention_mask, targets = batch
        outputs = self(input_ids, attention_mask)

        loss = self.criterion(outputs, targets)

        # Targets arrive one-hot encoded; the metrics expect class indices.
        self.metrics[f"{stage}_metrics"].update(torch.argmax(outputs, -1), torch.argmax(targets, -1))

        self.log(f"{stage}_loss", loss, on_epoch=True, on_step=True)
        return loss

    def _log_epoch_metrics(self, stage: str) -> None:
        """Log every per-class metric accumulated for ``stage`` and reset it."""
        collection = self.metrics[f"{stage}_metrics"]

        for metric_name, values in collection.compute().items():
            # average=None yields one value per class.
            for idx, value in enumerate(values):
                self.log(f"{metric_name}_class_{idx}", value, on_epoch=True)

        collection.reset()

    def on_train_epoch_end(self) -> None:
        self._log_epoch_metrics("train")

    def on_test_epoch_end(self) -> None:
        self._log_epoch_metrics("test")

    def configure_optimizers(self):
        """AdamW with the learning rate multiplied by 0.97 at every scheduler step."""
        optimizer = optim.AdamW(self.parameters(), lr=self.hparams.lr)
        scheduler = optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.97)
        return [optimizer], [scheduler]

# Transformer Training

## Setup
- only classification head
- unfreeze last encoder layer + classification head
- unfreeze last 2 encoder layers + classification head

In [None]:
# Test results of every run, keyed by the `start_training_layer` used.
transformer_scores = {}

# The data and the class weights do not depend on which layers are unfrozen,
# so they are prepared once and shared by all runs.
datamodule = TransformerDatasetModule(
    target_column="label",
    text_column="preprocessed_text",
    batch_size=128,
    model_name=POLISH_TRANSFORMER_MODEL_NAME,
    data_root=DATA_PATH
)
datamodule.setup()
class_weights = datamodule.get_class_weights()

# -1 trains only the classification head; any other value unfreezes the encoder
# layers from that index upwards together with the pooler.
# NOTE(review): the setup list above mentions the "last" and "last 2" encoder
# layers. If the encoder has 12 layers (indices 0-11), 9 and 10 unfreeze the
# last 3 and last 2 layers respectively. Confirm the intended values.
for start_training_layer in [-1, 9, 10]:
    model = TransformerModule(
        model_name=POLISH_TRANSFORMER_MODEL_NAME,
        num_classes=2,
        start_training_layer=start_training_layer,
        lr=2e-5,
        class_weights=class_weights
    )

    trainer = L.Trainer(
        max_epochs=30,
        accelerator="cuda" if torch.cuda.is_available() else "cpu",
        devices=1,
        callbacks=[TQDMProgressBar(refresh_rate=20)],
        logger=TensorBoardLogger(save_dir="logs/"),
        log_every_n_steps=20,
    )

    tuner = Tuner(trainer)

    # Run the learning-rate finder before the actual training.
    tuner.lr_find(
        model=model,
        datamodule=datamodule,
        method="fit"
    )

    trainer.fit(model, datamodule=datamodule)
    transformer_test_scores = trainer.test(model, datamodule=datamodule)

    transformer_scores[start_training_layer] = transformer_test_scores

In [None]:
class Word2VecWrapper(nn.Module):
    """Bidirectional LSTM classifier over pre-computed word embeddings.

    Args:
        num_classes: Number of output classes.
        embedding_dim: Size of each input word vector.
        hidden_size: Hidden size of the LSTM, per direction.
    """

    def __init__(self, num_classes: int = 2, embedding_dim: int = 100, hidden_size: int = 256):
        super().__init__()

        # No `dropout` argument: LSTM dropout only acts between stacked layers,
        # so with a single layer it has no effect.
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_size, batch_first=True, num_layers=1, bidirectional=True)

        self.fcn = nn.Sequential(
            # The head receives the final states of both directions.
            nn.Linear(2 * hidden_size, 512),
            nn.SiLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, sequence):
        """Return class logits for a padded batch or a ``PackedSequence``."""
        _, (last_hidden, _) = self.lstm(sequence)

        # For one bidirectional layer, last_hidden[-2] is the final forward
        # state and last_hidden[-1] the final backward state. Use both so the
        # forward pass over the sequence is not thrown away.
        summary = torch.cat((last_hidden[-2], last_hidden[-1]), dim=-1)

        return self.fcn(summary)

    def _get_word2vec(self, model_path: str):
        """Load word vectors stored in the word2vec text format."""
        return KeyedVectors.load_word2vec_format(model_path)

In [None]:
class LSTMModule(L.LightningModule):
    """Lightning module training the ``Word2VecWrapper`` LSTM classifier.

    All constructor arguments are stored as hyperparameters. The module reads
    the following: ``num_classes``, ``class_weights`` and ``lr``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.save_hyperparameters()

        self.model = Word2VecWrapper(
            num_classes=self.hparams.num_classes
        )

        metrics = MetricCollection([
            MulticlassAccuracy(self.hparams.num_classes, average=None),
            MulticlassPrecision(self.hparams.num_classes, average=None),
            MulticlassRecall(self.hparams.num_classes, average=None),
            MulticlassF1Score(self.hparams.num_classes, average=None)
        ])
        # A plain dict would hide the metrics from nn.Module, so their state
        # would not follow the module to the training device. The keys carry a
        # suffix because "train" clashes with nn.Module.train and is rejected
        # by nn.ModuleDict.
        self.metrics = nn.ModuleDict({
            "train_metrics": metrics.clone(prefix='train_'),
            "test_metrics": metrics.clone(prefix='test_')
        })

        class_weights = self.hparams.class_weights
        if class_weights is not None:
            # Keep the loss weights in float32 regardless of how they were computed.
            class_weights = torch.as_tensor(class_weights, dtype=torch.float32)
        self.criterion = nn.CrossEntropyLoss(weight=class_weights)

    def forward(self, sequence):
        return self.model(sequence)

    def training_step(self, batch, batch_idx):
        return self._shared_eval(batch, batch_idx, "train")

    def test_step(self, batch, batch_idx):
        return self._shared_eval(batch, batch_idx, "test")

    def _shared_eval(self, batch, batch_idx, stage):
        """Run one step for ``stage``: compute and log the loss, update metrics."""
        sequences, targets = batch
        # Passed to `self.log` explicitly, taken from the targets.
        batch_size = targets.shape[0]
        logits = self(sequences)

        loss = self.criterion(logits, targets)

        # Targets arrive one-hot encoded; the metrics expect class indices.
        self.metrics[f"{stage}_metrics"].update(torch.argmax(logits, -1), torch.argmax(targets, -1))

        self.log(f"{stage}_loss", loss, on_epoch=True, on_step=True, batch_size=batch_size)
        return loss

    def _log_epoch_metrics(self, stage: str) -> None:
        """Log every per-class metric accumulated for ``stage`` and reset it."""
        collection = self.metrics[f"{stage}_metrics"]

        for metric_name, values in collection.compute().items():
            # average=None yields one value per class.
            for idx, value in enumerate(values):
                self.log(f"{metric_name}_class_{idx}", value, on_epoch=True)

        collection.reset()

    def on_train_epoch_end(self) -> None:
        self._log_epoch_metrics("train")

    def on_test_epoch_end(self) -> None:
        self._log_epoch_metrics("test")

    def configure_optimizers(self):
        """AdamW with the learning rate multiplied by 0.97 at every scheduler step."""
        optimizer = optim.AdamW(self.parameters(), lr=self.hparams.lr)
        scheduler = optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.97)
        return [optimizer], [scheduler]

In [None]:
class LSTMDataset(Dataset):
    """Text-classification dataset of per-word embedding sequences.

    Each text is split on single spaces and every word is replaced by its
    pretrained vector. Words missing from the vocabulary share one fixed
    out-of-vocabulary (OOV) vector. Each item is a pair of a
    ``(num_words, vector_size)`` float tensor and a one-hot float target.

    Args:
        data_df: Frame holding the texts and their labels.
        target_column: Name of the label column.
        text_column: Name of the text column.
        model_path: Path to word vectors in the word2vec text format.
    """

    # Fixed seed for the OOV vector, so every dataset instance (e.g. train and
    # test) represents unknown words with the same embedding.
    _OOV_SEED = 0

    def __init__(self, data_df: pd.DataFrame, target_column: str, text_column: str, model_path: str = "glove_100_3_polish.txt"):
        super().__init__()

        self.word2vec = KeyedVectors.load_word2vec_format(model_path)

        self.data, self.target = self._prepare_data_to_transformer(
            data_df=data_df,
            target_column=target_column,
            text_column=text_column,
        )

        # Sorted unique labels -> consecutive class indices.
        self.class_mapping = {
            class_name: idx for idx, class_name in enumerate(np.unique(self.target))
        }

        self.num_classes = len(self.class_mapping)

    def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor]:
        """Return ``(embedded_sequence, one_hot_target)`` for one sample."""
        # Convert through a single ndarray; building a tensor from a list of
        # arrays is much slower.
        sample_data = torch.tensor(np.asarray(self.data[index], dtype=np.float32))

        sample_target = F.one_hot(
            torch.tensor(self.class_mapping[self.target[index]]), num_classes=self.num_classes
        ).float()

        return sample_data, sample_target

    def _prepare_data_to_transformer(
        self, data_df: pd.DataFrame, target_column: str, text_column: str
    ):
        """Embed every text word by word and collect the raw labels.

        Returns:
            Tuple of a list with one ``(num_words, vector_size)`` float32 array
            per text and the list of raw labels.
        """
        vocabulary = self.word2vec.key_to_index
        # Seeded so the same vector is produced on every call and instance.
        oov_embedding = np.random.default_rng(self._OOV_SEED).random(self.word2vec.vector_size)

        data = []
        for text in data_df[text_column].tolist():
            vectors = [
                self.word2vec.get_vector(word) if word in vocabulary else oov_embedding
                for word in text.split(" ")
            ]
            data.append(np.stack(vectors).astype(np.float32))

        target = data_df[target_column].tolist()

        return data, target

    def __len__(self) -> int:
        return len(self.target)

    def get_labels(self) -> list[int]:
        """Return the class index of every sample, in dataset order."""
        return [self.class_mapping[label] for label in self.target]

In [None]:
class LSTMDatasetModule(L.LightningDataModule):
    """Lightning data module serving the LSTM train and test datasets.

    All constructor arguments are stored as hyperparameters. The module reads
    the following: ``data_root`` (directory holding ``train.csv`` and
    ``test.csv``), ``target_column``, ``text_column``, ``batch_size`` and,
    optionally, ``model_path`` (location of the pretrained word vectors).
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.save_hyperparameters()

    def setup(self, stage: Optional[str] = None):
        """Load and embed both splits (``stage`` is ignored)."""
        dataset_kwargs = {
            "target_column": self.hparams.target_column,
            "text_column": self.hparams.text_column,
        }
        # Forward the configured embeddings path. Without it the dataset falls
        # back to its own default file name.
        model_path = self.hparams.get("model_path")
        if model_path is not None:
            dataset_kwargs["model_path"] = str(model_path)

        self.train = LSTMDataset(
            data_df=pd.read_csv(self.hparams.data_root / "train.csv"),
            **dataset_kwargs,
        )
        self.test = LSTMDataset(
            data_df=pd.read_csv(self.hparams.data_root / "test.csv"),
            **dataset_kwargs,
        )

    def train_dataloader(self):
        return DataLoader(self.train, batch_size=self.hparams.batch_size, shuffle=True, collate_fn=self._collate_fn)

    def test_dataloader(self):
        return DataLoader(self.test, batch_size=self.hparams.batch_size, shuffle=False, collate_fn=self._collate_fn)

    def _collate_fn(self, batch: list[tuple[torch.Tensor, torch.Tensor]]):
        """Pad the variable-length sequences, pack them, and stack the targets."""
        sequences, targets = [seq for seq, _ in batch], [target for _, target in batch]

        # Original lengths let the packed sequence skip the padding.
        lengths = [len(seq) for seq in sequences]

        padded_seqs = pad_sequence(sequences, batch_first=True)

        # enforce_sorted=False: batches are not ordered by length.
        packed_seqs = pack_padded_sequence(padded_seqs, lengths, batch_first=True, enforce_sorted=False)

        return packed_seqs, torch.stack(targets)

    def get_class_weights(self) -> torch.Tensor:
        """Return 'balanced' class weights computed on the training labels.

        Requires ``setup()`` to have been called. The weights are returned as
        float32 so they match the dtype of the model outputs when passed to
        the loss.
        """
        labels = self.train.get_labels()
        weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
        return torch.tensor(weights, dtype=torch.float32)

# LSTM Training

## Setup
- Word embeddings from GloVe + LSTM

In [None]:
datamodule = LSTMDatasetModule(
    target_column="label",
    text_column="preprocessed_text",
    batch_size=128,
    model_path=MODELS_PATH / "glove_100_3_polish.txt",
    data_root=DATA_PATH
)
# Called explicitly so the class weights can be computed before training starts.
datamodule.setup()

model = LSTMModule(
    model_path=MODELS_PATH / "glove_100_3_polish.txt",
    num_classes=2,
    lr=1e-3,
    class_weights=datamodule.get_class_weights()
)

trainer = L.Trainer(
    max_epochs=30,
    # Fall back to the CPU when no GPU is available, as in the transformer runs.
    accelerator="cuda" if torch.cuda.is_available() else "cpu",
    callbacks=[
        TQDMProgressBar(refresh_rate=20),
    ],
    logger=TensorBoardLogger(save_dir="logs/"),
    log_every_n_steps=20,
)

tuner = Tuner(trainer)

# Run the learning-rate finder before the actual training.
tuner.lr_find(
    model=model,
    datamodule=datamodule,
    method="fit"
)

trainer.fit(model, datamodule=datamodule)
lstm_test_scores = trainer.test(model, datamodule=datamodule)

## Data generation

In [None]:
from transformers import BertForMaskedLM, BertTokenizer, pipeline
from typing import Iterable, Iterator


class FillingMaskDataGenerator:
    """Generate sentence variants by letting a masked language model fill '[MASK]'."""

    def __init__(self) -> None:
        model = BertForMaskedLM.from_pretrained("dkleczek/bert-base-polish-uncased-v1")
        tokenizer = BertTokenizer.from_pretrained("dkleczek/bert-base-polish-uncased-v1")
        self.nlp = pipeline('fill-mask', model=model, tokenizer=tokenizer, top_k=3)

    def get_for_single(self, masked_sentence: str, n: int = 3) -> Iterator[str]:
        """Create n examples with filled mask

        Args:
            masked_sentence (str): Sentence with '[MASK]' where to fill
            n (int, optional): n examples. Defaults to 3.

        Yields:
            The completed sentences (the "sequence" field of each result).
        """
        # Pass `n` at call time; otherwise the pipeline always returns the
        # top_k configured in the constructor and the argument is ignored.
        # NOTE(review): assumes exactly one '[MASK]' per sentence. Verify how
        # the pipeline shapes its output for several masks before relying on it.
        for result in self.nlp(masked_sentence, top_k=n):
            yield result["sequence"]

    def get_for_iterable(self, masked_sequences: Iterable[str], n: int = 3) -> Iterator[str]:
        """Yield n filled examples for every masked sentence, in input order."""
        for masked_sequence in masked_sequences:
            yield from self.get_for_single(masked_sentence=masked_sequence, n=n)