# Project 2.2: Sentiment classification with MLPs

System imports to skip silly warnings:

In [1]:
import os; os.environ["PYTORCHINDUCTOR_LOGLEVEL"] = "ERROR"
import warnings; warnings.simplefilter(action = "ignore", category = UserWarning)

Standard `python` imports:

In [2]:
import argparse
from collections import Counter
from functools import wraps
import json
import math
from pathlib import Path
import random
import re
import string
from typing import Callable, Iterable, Literal, Self

Useful only when running notebook locally:

In [3]:
from rich import print
from rich.progress import Progress, track

Data and numerical libraries. Note that we set `torch` to use `cuda`.

In [4]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn.metrics
import sklearn.utils
import torch; torch.set_default_device("cuda")

`nltk` specific import and initialization:

In [None]:
import nltk
import nltk.stem
import nltk.corpus

[nltk_data] Downloading package wordnet to
[nltk_data]     /home/terraformer/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /home/terraformer/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
!unzip -o /usr/share/nltk_data/corpora/wordnet.zip   -d /usr/share/nltk_data/corpora/
!unzip -o /usr/share/nltk_data/corpora/stopwords.zip -d /usr/share/nltk_data/corpora/

Create a `cache` folder for caching results:

In [None]:
# Folder holding cached results and the saved model state.
# `mkdir` is called on the folder itself: calling it on `.parent` would only
# (re)create the working directory and leave `cache/` missing.
cache_path = Path.cwd() / "cache"
cache_path.mkdir(
	parents = True,
	exist_ok = True,
)

## Utilities

Some utility functions that are needed here and there and that we did not bother encapsulating in an object-oriented style.

Fix seed across numerical libraries for reproducibility:

In [None]:
def fix_seed(seed: int = 42):
	"""Seed every random number generator used in this notebook.

	Seeds the `random` module, numpy's global generator and torch (CPU and
	CUDA), then asks cuDNN for deterministic kernels.

	Args:
		seed: Value handed to every generator.
	"""
	random.seed(seed)

	# The former `sklearn.utils.check_random_state(seed)` call only built a
	# `RandomState` object and threw it away, so it seeded nothing. Per the
	# scikit-learn docs, `random_state=None` falls back to numpy's global
	# generator, which is seeded here.
	np.random.seed(seed)

	torch.manual_seed(seed)
	torch.cuda.manual_seed(seed)
	torch.cuda.manual_seed_all(seed)  # if using multi-GPU

	# Trade cuDNN autotuning speed for run-to-run reproducibility:
	torch.backends.cudnn.deterministic = True
	torch.backends.cudnn.benchmark = False

Decorator that makes any method returning a serializable output cacheable. It is a quick and dirty cache for a single pass for this project.

In [None]:
def preload(method):
	"""Cache the result of `method` on disk under `cache/<method name>.pt`.

	The first call runs `method` and stores its result with `torch.save`;
	every later call returns the stored result with `torch.load` without
	running `method` again.

	NOTE: the cache key is the method name only. Arguments (and `self`) are
	ignored, so calling the decorated method with different arguments returns
	whatever was stored first. Delete the cache file to force a recomputation.

	NOTE: `torch.load` unpickles the file; only load caches written by this
	notebook.
	"""
	cache_path = Path.cwd() / "cache"
	# Create the cache folder itself. `cache_path.parent.mkdir(...)` only
	# touched the working directory, so the first write below used to fail.
	cache_path.mkdir(parents = True, exist_ok = True)
	cache_file = (cache_path / method.__name__).with_suffix(".pt")

	@wraps(method)
	def wrapper(self, *args, **kwargs):
		if cache_file.is_file():
			with cache_file.open("rb") as f:
				return torch.load(f)

		result = method(self, *args, **kwargs)

		with cache_file.open("w+b") as f:
			torch.save(result, f)

		return result

	return wrapper

## Preprocessing

Utilities focused on preprocessing text in the data according to the task given (sentiment classification on messages).

We have Twitter messages so remove:

- `@` mentions
- `#` hashtags
- emails
- punctuation

We also lower the case.

In [None]:
class Preprocessor:
	"""Strip emails, `@` mentions, `#` hashtags and punctuation from a message.

	Emails are removed first. Removing mentions first would cut
	`john@doe.com` down to `john.com`, which the email rule can no longer
	recognise. Case is left untouched here.
	"""

	# Compiled once for all instances and calls.
	_email = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")  # local part, `@`, dotted domain
	_mention = re.compile(r"@\w+")
	_hashtag = re.compile(r"#\w+")
	_punctuation = str.maketrans("", "", string.punctuation)

	def __call__(self, text: str) -> str:
		"""Return `text` with emails, mentions, hashtags and punctuation removed."""
		text = self._email.sub("", text)  # remove emails (must precede mentions)
		text = self._mention.sub("", text)  # remove mentions
		text = self._hashtag.sub("", text)  # remove hashtags

		return text.translate(self._punctuation)  # remove punctuation

Tokenize text. This is where we lower the case. It happens in two places for verbosity and safety. Tokenization accounts for English stopwords, as the messages are in English.

In [None]:
class Tokenizer:
	"""Split a message into lower-cased, lemmatized and stemmed tokens.

	Empty tokens, purely numeric tokens and English stopwords are dropped.
	"""

	def __init__(self):
		english_stopwords = nltk.corpus.stopwords.words("english")

		self.stopwords = set(english_stopwords)  # set for fast membership tests
		self.lemmatizer = nltk.WordNetLemmatizer()
		self.stemmer = nltk.stem.PorterStemmer()

		tweet_options = dict(
			preserve_case = False,  # ignore case (lower it)
			reduce_len = True,  # shorten runs of repeated characters
			strip_handles = True,  # remove @mentions, also done in preprocessing
		)
		self.tokenizer = nltk.tokenize.TweetTokenizer(**tweet_options)

	def __call__(self, text: str):
		"""Return the list of normalized tokens found in `text`."""
		kept = []

		for raw in self.tokenizer.tokenize(text):
			word = raw.lower()

			# Guard clause: skip anything that carries no signal.
			if not word or word.isdigit() or word in self.stopwords:
				continue

			# Lemmatize first, then stem the lemma.
			kept.append(self.stemmer.stem(self.lemmatizer.lemmatize(word)))

		return kept

Both are callable so we can stack them in one preprocessing callable:

In [None]:
def preprocess_and_tokenize(text: str) -> list[str]:
	"""Run `Preprocessor` then `Tokenizer` on `text` and return the tokens.

	Both stages are built on the first call and reused afterwards.
	Constructing a `Tokenizer` reloads the stopword list, which is wasteful
	when this function is called once per message.
	"""
	stages = getattr(preprocess_and_tokenize, "_stages", None)

	if stages is None:
		stages = (Preprocessor(), Tokenizer())
		preprocess_and_tokenize._stages = stages  # memoized on the function object

	preprocessor, tokenizer = stages

	return tokenizer(preprocessor(text))

## Data

What follows are utilities that help us import and process incoming data for the actual computation layers of the pipeline.

The following is a word index augmented with padding and unknown tokens at `0` and `1` respectively. Calling it on an iterable of tokens will return a sequence of their indices.

In [None]:
class Vocabulary(dict[str, int]):
	"""Word index that also knows its padding and unknown tokens.

	`pad_idx` / `unk_idx` are read from the mapping when the special tokens
	are present in it, and default to `0` / `1` otherwise. Calling the
	vocabulary on tokens returns their indices, with `unk_idx` standing in
	for tokens that are not in the mapping.
	"""

	def __init__(self, word2idx: dict[str, int], *,
		pad_token = "<pad>",
		unk_token = "<unk>",
	):
		super().__init__(word2idx)

		self.pad_token, self.unk_token = pad_token, unk_token

		self.pad_idx = self[pad_token] if pad_token in self else 0
		self.unk_idx = self[unk_token] if unk_token in self else 1

	def __call__(self, tokens: list[str]) -> list[int]:
		fallback = self.unk_idx
		lookup = self.get

		return [lookup(word, fallback) for word in tokens]

The following is a callable that translates incoming text into token indices to be fed to an embedding layer (to follow). It uses a given vocabulary (word index), regardless of where it came from or how it was built, to produce the indices. It applies the preprocessing and tokenization to the text. Lastly, it trims or pads the output index sequence to a fixed length, if one is given.

In [None]:
class TextTransform:
	"""Turn a raw message into a tensor of vocabulary indices.

	The text goes through the optional `preprocessor`, is split by the
	optional `tokenizer` (plain whitespace split when none is given), mapped
	through `vocabulary`, and finally padded with `vocabulary.pad_idx` or
	truncated to `max_len` when a length is set.
	"""

	def __init__(self,
		vocabulary: Vocabulary,
		preprocessor: Callable | None = None,
		tokenizer: Callable | None = None,
		max_len: int | None = None,
	):
		self.vocabulary = vocabulary
		self.preprocessor = preprocessor
		self.tokenizer = tokenizer
		self.max_len = max_len

	def __call__(self, text: str) -> torch.Tensor:
		cleaned = self.preprocessor(text) if self.preprocessor else text
		tokens = self.tokenizer(cleaned) if self.tokenizer else cleaned.split()

		indices = self.vocabulary(tokens)
		limit = self.max_len

		if limit is not None:
			missing = limit - len(indices)

			if missing > 0:
				# Too short: right-pad up to the fixed length.
				indices += [self.vocabulary.pad_idx] * missing
			else:
				# Long enough: keep only the first `limit` indices.
				indices = indices[:limit]

		return torch.tensor(indices, dtype = torch.long)


This is a custom dataset object tailored to work with `torch` data loaders. We instantiate it based on desired split for simplicity.

In [None]:
class TwitterDataset(torch.utils.data.Dataset):

	def __init__(self, split: Literal["train", "val", "test"], *,
		transform: Callable,
	):
		self.data = self.load_data(split).reset_index()
		self.transform = transform

	def __len__(self) -> int:
		return len(self.data)

	def __getitem__(self, idx: int | slice):
		return self.transform(self.data.Text[idx]), torch.tensor(self.data.Label[idx],
			dtype = torch.float,
		)  # return tensor of token indices and label


	@classmethod
	def load_data(cls, split: Literal[
			"train",
			"val",
			"test",
		],
		root: str = "/kaggle/input/ai-2-dl-for-nlp-2025-homework-2",
		index: str = "ID",
	):
		return pd.read_csv(Path(root) / f"{split}_dataset.csv",
			index_col = index,  # Set ID column as index
			encoding = "utf-8",
		)

## Pipeline

The various steps from messages to sentiment.

### Embedding layer

The following is a custom embedding layer made to:

- load `glove` embeddings (of our chosen dimension) into `word2vec` format
- extract a vocabulary (word index) and a sequence of vectors from the `word2vec` we made
- create a pretrained embedding layer for our pipeline with the weights matching text based on the word index and vector sequence we got from the `word2vec` we made

Note that we statically store the word index in case we later edit it (for example sort it based on term frequency in a corpus). This is a dangerous practice and maybe can be done in a better way, but eventually we do not use `prune_with_frequencies` so we are not worried.

In [None]:
class Embedding(torch.nn.Embedding):
	"""Pretrained embedding layer that carries the word index of its rows.

	`word2idx` maps each word to the row of `weight` holding its vector.
	"""

	word2idx: Vocabulary  # word to index mapping


	@classmethod
	def from_glove(cls,
		embedding_dim: Literal[50, 100, 200, 300] = 50,
		freeze: bool = False,
		pad_token: str = "<pad>",
		unk_token: str = "<unk>",
	**kwargs) -> Self:
		"""Build the layer from `embeddings/glove.6B.<dim>d.txt`.

		The GloVe file is converted to `word2vec` text format next to it,
		loaded, and prefixed with a zero padding vector (row `0`) and a small
		random unknown vector (row `1`). Extra keyword arguments are passed
		on to `from_pretrained`.
		"""
		source_path: Path = Path("embeddings") / f"glove.6B.{embedding_dim}d.txt"
		target_path: Path = source_path.with_suffix(".word2vec.txt")

		cls.convert_glove_to_word2vec(source_path, target_path)

		word2idx, tensor = cls.load_word2vec_format(target_path)

		# Insert special tokens:
		pad_vector = torch.zeros(tensor.shape[1], device = tensor.device)
		unk_vector = torch.randn(tensor.shape[1], device = tensor.device) * 0.1  # smaller variance

		# Rebuild mapping with special tokens (every loaded row moves down by two):
		word2idx = {
			pad_token: 0,
			unk_token: 1,
		**{word: idx + 2 for word, idx in word2idx.items()}}

		# Stack special vectors:
		tensor = torch.vstack(
			[
				pad_vector,
				unk_vector,
			tensor]
		)

		self = cls.from_pretrained(tensor,
			freeze = freeze,
		**kwargs)
		self.word2idx = Vocabulary(word2idx,
			pad_token = pad_token,
			unk_token = unk_token,
		)

		return self

	@classmethod
	def convert_glove_to_word2vec(cls,
		source_path: Path,
		target_path: Path | None = None,
	):
		"""Write `source_path` in `word2vec` text format.

		The `word2vec` format is the GloVe file preceded by a header line
		`"<number of tokens> <dimension>"`. When `target_path` is `None` the
		source file is rewritten in place.

		Raises:
			ValueError: if the source file contains no embedding lines.
		"""
		# Read-only mode: the source is never written through this handle.
		with open(source_path, "r", encoding="utf-8") as source_file:
			lines = source_file.readlines()  # read all lines

		# A two-field first line is an existing `word2vec` header (an embedding
		# line has a word plus its vector). Drop it so an in-place conversion
		# can be repeated without stacking headers.
		if lines and len(lines[0].split()) == 2:
			lines = lines[1:]

		if not lines:
			raise ValueError(f"no embeddings found in {source_path}")

		num_tokens, embedding_dim = len(lines), len(lines[0].strip().split()) - 1  # count vocabulary size and embeddings dimension

		with open(target_path if target_path is not None else source_path, "w+", encoding="utf-8") as target_file:
			target_file.write(f"{num_tokens} {embedding_dim}\n")  # write the `word2vec` header

			for line in track(lines, "converting glove to word2vec".ljust(32), num_tokens):
				target_file.write(line)  # copy the rest of the lines

	@classmethod
	def load_word2vec_format(cls, word2vec_path: Path) -> tuple[Vocabulary, torch.Tensor]:
		"""Load a `word2vec` text file into a word index and a matrix of vectors.

		Row `i` of the returned tensor is the vector of the word mapped to `i`.
		"""
		with open(word2vec_path, "r", encoding="utf-8") as word2vec_file:
			num_tokens, embedding_dim = map(int, word2vec_file.readline().strip().split())

			word2idx = {}  # word to index mapping
			vectors = torch.zeros(num_tokens, embedding_dim)  # preallocate tensor for word vectors

			for index, line in track(enumerate(word2vec_file), "get embeddings from word2vec".ljust(32), num_tokens):
				word, *vec = line.strip().split()  # split word and vector
				vec = torch.tensor(list(map(float, vec)))  # convert vector to tensor

				word2idx[word] = index  # map word to index
				vectors[index] = vec  # assign vector to the corresponding index

		return Vocabulary(word2idx), vectors


	def index(self, key: str | Iterable[str]) -> int | list[int]:
		"""Return the row index of one word, or the list of indices of several.

		Raises:
			KeyError: if a word is not in `word2idx`.
		"""
		# A `str` is itself iterable, so it must be tested first; otherwise a
		# single word would be looked up character by character.
		if isinstance(key, str):
			return self.word2idx[key]

		return [self.word2idx[item] for item in key]

	def get(self, key: str | Iterable[str]) -> torch.Tensor:
		"""Return the embedding vector(s) of the given word(s)."""
		return self.weight[self.index(key)]

	def prune_with_frequencies(self,
		frequencies: Counter[str],
		min_frequency: int = 1,
		max_vocab_size: int | None = None,
		pad_token: str = "<pad>",
		unk_token: str = "<unk>",
	) -> Self:
		"""Return a new layer restricted to the frequent, known tokens.

		Tokens are kept when they occur at least `min_frequency` times and
		exist in the current word index. They are ordered by descending
		frequency (ties alphabetically) and cut to `max_vocab_size`. Rows `0`
		and `1` of the new layer are a zero padding vector and a fresh small
		random unknown vector. The new layer is trainable exactly when this
		one is.
		"""
		special = (pad_token, unk_token)
		tokens = [
			token for token, frequency in frequencies.items()
			if frequency >= min_frequency and token in self.word2idx and token not in special  # specials get their own rows below
		]

	# 	Sort tokens by frequency (desc), then alphabetically (asc):
		tokens.sort(
			key = lambda token: (-frequencies[token], token)
		)

	#	Apply vocab size cap:
		tokens = tokens[:max_vocab_size]

	#	Build pruned token list:
		all_tokens = [
			pad_token,
			unk_token,
		] + tokens

		word2idx = {token: idx for idx, token in enumerate(all_tokens)}

	#	Build new weight matrix (detached: this is a copy, not part of the graph):
		old_weights = self.weight.detach()
		new_weights = torch.zeros(len(all_tokens), self.embedding_dim,
			device = old_weights.device,
			dtype = old_weights.dtype,
		)
		new_weights[1] = torch.randn(self.embedding_dim, device = old_weights.device) * 0.1

		if tokens:
			old_rows = torch.tensor([self.word2idx[token] for token in tokens],
				dtype = torch.long,
				device = old_weights.device,
			)
			new_weights[2:] = old_weights[old_rows]

	#	Create new embedding layer with new weights. `from_pretrained` freezes
	#	by default, so carry over whether this layer is trainable:
		new_embedding = self.from_pretrained(new_weights,
			freeze = not self.weight.requires_grad,
		)
		new_embedding.word2idx = Vocabulary(word2idx,
			pad_token = pad_token,
			unk_token = unk_token,
		)

		return new_embedding

### Compound (hidden) layer

In creating a basic deep feed-forward network, it is customary to:

- apply activation on each layer otherwise it is not deep learning we are doing (deep linear networks reduce to flat linear regression)
- apply dropout for statistical regularization (create random variations of the model during training instead of explicitly training various models)

Notice the choices we make about activation:

- We decided to use `SiLU`, which is a smooth variant of `ReLU` with negative slope near the switch point, shown to perform slightly better.
- We apply it as preactivation, meaning activation precedes the layer. This makes no difference for the hidden layers, but only for the input and output layers. That way we apply activation on the input embeddings (it is not necessary, but it is not harmful either) and the output layer is treated as a logit anyways, so we do not mind if the output is not non-negative (strictly speaking even with `SiLU` it would be slightly negative).

Dropout is also applied before the layer, so we randomly drop input features too, which is desirable to avoid memorizing input data and to create small data variations during training.

This compound layer is intended to be repeated as many times as we like as a hidden layer.

In [None]:
class TwitterLayer(torch.nn.Sequential):

	def __init__(self,
		inputs_dim: int,
		output_dim: int | None = None,
		dropout: float = 0.5,
	):
		super().__init__(
			torch.nn.SiLU(),
			torch.nn.Dropout(
				p = dropout,
			),
			torch.nn.Linear(inputs_dim, output_dim or inputs_dim),
		)

### Model

This is the model we use for the task. It consists of:

- an embedding layer
- an input layer mapping to a fixed `hidden_dim` dimension
- several (up to a given number) hidden layers of fixed `hidden_dim` dimension
- an output layer with one dimension which is enough for a `0` or (to) `1` classification (or probability/logistic regression)

Notice that the embedding layer uses average pooling of the word embeddings it extracts to form a sentence embedding for the incoming message, before feeding it to the neural network.

In [None]:
class TwitterModel(torch.nn.Module):
	"""Embedding layer followed by a stack of `TwitterLayer` blocks.

	The stack maps the embedding dimension through the hidden dimensions to
	a single output. `hidden_dim` is either one width repeated `num_layers`
	times or an explicit list of widths. The model summary is printed on
	construction.
	"""

	def __init__(self, embedding: Embedding,
		hidden_dim: int | list[int] = 100,
		num_layers: int = 2,  # ignored if hidden_dim is a list
		dropout: float = 0.5,
	):
		super().__init__()

		self.embedding = embedding

		self.input_dim = self.embedding.embedding_dim
		self.output_dim = 1  # binary classification (positive/negative)

		widths = [hidden_dim] * num_layers if isinstance(hidden_dim, int) else hidden_dim

		# Consecutive pairs of this list are the (input, output) sizes of each block:
		layer_dims = [self.input_dim] + widths + [self.output_dim]

		blocks = [
			TwitterLayer(inputs_dim = fan_in, output_dim = fan_out, dropout = dropout)
			for fan_in, fan_out in zip(layer_dims, layer_dims[1:])
		]
		self.model = torch.nn.Sequential(*blocks)

		print()
		print("Model summary:")
		print()
		print(self)
		print()

	def forward(self, input: torch.Tensor) -> torch.Tensor:
		"""Average the non-padding token embeddings and run the layer stack."""
		pad_idx = self.embedding.word2idx.pad_idx

		vectors = self.embedding.forward(input)
		keep = (input != pad_idx).unsqueeze(-1).float()  # 1 for real tokens, 0 for padding

		vectors *= keep
		counts = keep.sum(1).clamp(min = 1e-6)  # avoid dividing by zero on all-padding rows
		pooled = vectors.sum(1) / counts

		return self.model.forward(pooled).squeeze(-1)

### Classifier

All this is put into one classifier pipeline which contains training and evaluation loops plus other utilities:

- `compile` assigns an optimizer and a loss for training for the pipeline with sensible defaults
- `prune` is a helper function to trim the word index of the embedding layer according to word frequency in the dataset fed to it
- `fit` is the training loop given a dataset (supports `torch` data loader keyword arguments on top)
- `evaluate` is the test loop given a dataset
- `predict` translates model output to readable output (a crisp sentiment decision `0` or `1` instead of a probability) using threshold `.5`
- `compute` is a generic computation loop to avoid repetition of code in the `fit` and `evaluate` methods
- `submit` will generate the requested `submission.csv` file

There are several other methods focused on producing meaningful reports and plots upon evaluation.

Also note that the classifier is a context manager too. If used that way, it will try and load a previous cached trained model state to continue from or evaluate on.

Finally, the `preload` decorator is applied on all methods that return metrics so that they are not recomputed. This will be removed soon.

In [None]:
class TwitterClassifier:
	"""Training, evaluation and reporting pipeline around a `TwitterModel`.

	Used as a context manager it loads a previously saved model state on
	entry (when `path` exists) and saves the current state on exit.

	The public `fit`, `evaluate`, `predict`, `predict_proba` and `roc_curve`
	are cached on disk by `preload`, whose cache is keyed by method name only.
	Code inside this class that needs a fresh result per dataset or per epoch
	therefore goes through the uncached `_infer` / `_evaluate` / `_predict`
	helpers instead.
	"""

	def __init__(self, model: TwitterModel,
		max_len: int = 32,
		path: Path = cache_path / "model.pt",
	):
		self.model = model
		self.max_len = max_len
		self.path = path

	def __enter__(self) -> Self:
		if self.path is not None and self.path.is_file():
			with self.path.open("rb") as file:
				self.model.load_state_dict(torch.load(file))

		return self

	def __exit__(self, *_):
		if self.path is not None:
			# The target folder may not exist yet on a first run.
			self.path.parent.mkdir(parents = True, exist_ok = True)

			with self.path.open("wb") as file:
				torch.save(self.model.state_dict(), file)


	def compile(self,
		learning_rate: float = 1e-3,
		weight_decay : float = 0,
	):
		"""Move the model to the available device and set optimizer, loss and metrics.

		Every metric except the loss receives hard `0`/`1` predictions (see
		`compute`). Returns `self`.
		"""
		def accuracy(y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
			# Fraction of matching labels. The former `(y_pred * y_true).mean()`
			# only counted true positives over all samples.
			return (y_pred == y_true).float().mean()

		def precision(y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
			true_positive = (y_pred * y_true).sum()
			predicted_positive = y_pred.sum()

			return true_positive / predicted_positive.clamp(1e-6)

		def recall(y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
			true_positive = (y_pred * y_true).sum()
			actual_positive = y_true.sum()

			return true_positive / actual_positive.clamp(1e-6)

		def f1(y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor:
			p = precision(y_pred, y_true)
			r = recall(y_pred, y_true)

			return 2 * p * r / (p + r).clamp(1e-6)

		self.model.to(device = torch.device("cuda" if torch.cuda.is_available() else "cpu"))

		# Kept so the optimizer can be rebuilt when `prune` swaps the embedding:
		self._optimizer_kwargs = {
			"lr": learning_rate,
			"weight_decay": weight_decay,
		}
		self.optimizer = torch.optim.AdamW(self.model.parameters(), **self._optimizer_kwargs)
		self.loss_fn = torch.nn.BCEWithLogitsLoss()
		self.metrics = {
			"loss"     : self.loss_fn,
			"accuracy" : accuracy,
			"precision": precision,
			"recall"   : recall,
			"f1"       : f1,
		}

		return self

	def compute(self,
		y_pred: torch.Tensor,
		y_true: torch.Tensor,
	) -> dict[str, float]:
		"""Compute every metric from logits `y_pred` and labels `y_true`.

		The loss is computed on the logits. All other metrics are computed on
		predictions thresholded at `.5`, the same decision rule as `predict`.
		"""
		y_hat = (torch.sigmoid(y_pred) > 0.5).to(y_true.dtype)

		return {name: metric(y_pred if name == "loss" else y_hat, y_true).item() for name, metric in self.metrics.items()}

	def prune(self, train_dataset: TwitterDataset,
		min_frequency: int = 1,
		max_vocab_size: int | None = None,
		pad_token: str = "<pad>",
		unk_token: str = "<unk>",
	):
		"""Replace the embedding with one pruned by token frequency in `train_dataset`.

		Nothing happens unless `min_frequency > 1` or `max_vocab_size` is set.
		"""
		if min_frequency > 1 or max_vocab_size is not None:
			frequencies = Counter()

			for text in track(train_dataset.data.Text, "counting frequencies".ljust(32), len(train_dataset.data)):
				frequencies.update(preprocess_and_tokenize(text))

			self.model.embedding = self.model.embedding.prune_with_frequencies( frequencies,
				min_frequency = min_frequency,
				max_vocab_size = max_vocab_size,
				pad_token = pad_token,
				unk_token = unk_token,
			)
			train_dataset.transform.vocabulary = self.model.embedding.word2idx

			# An optimizer built earlier still holds the parameters of the
			# replaced embedding; rebuild it over the current parameters.
			optimizer_kwargs = getattr(self, "_optimizer_kwargs", None)

			if optimizer_kwargs is not None:
				self.optimizer = torch.optim.AdamW(self.model.parameters(), **optimizer_kwargs)

	@preload
	def fit(self,
		train_dataset: TwitterDataset,
		val_dataset  : TwitterDataset,
		epochs: int = 1,
		min_frequency: int = 1,
		max_vocab_size: int | None = None,
	**kwargs) -> dict[str, list[float]]:
		"""Train for `epochs` epochs and return per-epoch metric histories.

		Extra keyword arguments are passed to the training `DataLoader`. The
		result maps each metric name, and its `val_`-prefixed twin, to a list
		with one value per epoch, in chronological order.
		"""
		self.prune(train_dataset,
			min_frequency = min_frequency,
			max_vocab_size = max_vocab_size,
			pad_token = self.model.embedding.word2idx.pad_token,
			unk_token = self.model.embedding.word2idx.unk_token,
		)
		val_dataset.transform.vocabulary = self.model.embedding.word2idx

		# Plain dict of lists. `Counter.update` adds values, which would
		# prepend each epoch and reverse the history.
		metrics: dict[str, list[float]] = {f"{prefix}{name}": [] for name in self.metrics for prefix in ("", "val_")}
		total_loss = 0.

		print()
		print(f"Training for {epochs} epochs with:")
		print(
			json.dumps(kwargs,
				indent = 4,
				default = repr,  # log-only: do not fail on non-JSON loader options
			)
		)

		train_loader = torch.utils.data.DataLoader(train_dataset, **kwargs)
		batches = len(train_loader)  # exact, including a trailing partial batch

		self.model.train()

		print()

		with Progress() as progress:
			train_task = progress.add_task(description = "finished epoch ---/---".ljust(32), total = epochs )
			batch_task = progress.add_task(description = "training loss -.------".ljust(32), total = batches)

			for epoch in range(epochs):
				progress.reset(batch_task)

				self.model.train()  # evaluation below switches to eval mode

				for batch_index, batch in enumerate(train_loader,
					start = epoch * batches + 1,
				):
					x, y_true = batch

					self.optimizer.zero_grad()
					y_pred = self.model.forward(x)
					loss = self.loss_fn.forward(
						y_pred,
						y_true,
					)
					loss.backward()
					self.optimizer.step()

					total_loss += loss.item()

					progress.update(batch_task,
						description = f"training loss {total_loss/batch_index:.6f}".ljust(32),
						total = batches,
						advance = 1,
					)

				# Uncached evaluation: the cached `evaluate` would return the
				# first stored result for every epoch and for both datasets.
				for name, value in self._evaluate(train_dataset).items():
					metrics[name].append(value)

				for name, value in self._evaluate(val_dataset).items():
					metrics[f"val_{name}"].append(value)

				progress.update(train_task,
					description = f"finished epoch {epoch+1:3d}/{epochs:3d}".ljust(32),
					total = epochs,
					advance = 1,
				)

		return metrics

	@torch.no_grad()
	def _infer(self, dataset: TwitterDataset, **kwargs) -> tuple[torch.Tensor, torch.Tensor]:
		"""Run the model over `dataset` once and return `(logits, labels)`.

		Both tensors come from the same loader pass, so they stay aligned
		whatever loader options are given in `kwargs`.
		"""
		self.model.eval()

		logits = []
		labels = []

		for x, y_true in torch.utils.data.DataLoader(dataset, **kwargs):
			logits.append(self.model(x))
			labels.append(y_true)

		return torch.cat(logits), torch.cat(labels)

	def _evaluate(self, test_dataset: TwitterDataset, **kwargs) -> dict[str, float]:
		"""Uncached evaluation; defaults to a single batch holding the whole dataset."""
		kwargs.setdefault("batch_size", len(test_dataset))

		return self.compute(*self._infer(test_dataset, **kwargs))

	def _predict_proba(self, dataset: TwitterDataset, **kwargs) -> torch.Tensor:
		"""Uncached positive-class probabilities."""
		logits, _ = self._infer(dataset, **kwargs)  # ignore labels

		return torch.sigmoid(logits)

	def _predict(self, dataset: TwitterDataset, **kwargs) -> torch.Tensor:
		"""Uncached `0`/`1` decisions using threshold `.5`."""
		return (self._predict_proba(dataset, **kwargs) > 0.5).long()

	@preload
	def evaluate(self, test_dataset: TwitterDataset, **kwargs) -> dict[str, float]:
		"""Return the metrics on `test_dataset` (cached by `preload`)."""
		return self._evaluate(test_dataset, **kwargs)

	@preload
	def predict(self, dataset: TwitterDataset, **kwargs) -> torch.Tensor:
		"""Return `0`/`1` decisions for `dataset` (cached by `preload`)."""
		return self._predict(dataset, **kwargs)

	@preload
	def predict_proba(self, dataset: TwitterDataset, **kwargs) -> torch.Tensor:
		"""Return positive-class probabilities for `dataset` (cached by `preload`)."""
		return self._predict_proba(dataset, **kwargs)


	def classification_report_str(self, dataset: TwitterDataset, **kwargs) -> str:
		"""Return scikit-learn's classification report for `dataset` as text."""
		logits, y_true = self._infer(dataset, **kwargs)
		y_pred = (torch.sigmoid(logits) > 0.5).long()

		report = sklearn.metrics.classification_report(
			y_true.numpy(force = True),
			y_pred.numpy(force = True), digits = 6
		)

		return str(report)

	def roc_auc(self, dataset: TwitterDataset, **kwargs) -> float:
		"""Return the area under the ROC curve on `dataset`."""
		logits, y_true = self._infer(dataset, **kwargs)

		score = sklearn.metrics.roc_auc_score(
			y_true.numpy(force = True),
			torch.sigmoid(logits).numpy(force = True),
		)

		return float(score)

	@preload
	def roc_curve(self, dataset: TwitterDataset, **kwargs) -> tuple[
		np.ndarray,
		np.ndarray,
		np.ndarray,
	]:
		"""Return scikit-learn's `roc_curve` output on `dataset` (cached by `preload`)."""
		logits, y_true = self._infer(dataset, **kwargs)

		return sklearn.metrics.roc_curve(
			y_true.numpy(force = True),
			torch.sigmoid(logits).numpy(force = True),
		)

	def plot_roc_curve(self, dataset: TwitterDataset, **kwargs):
		"""Plot the ROC curve of `dataset`, save it to `roc_curve.png` and show it."""
		# One pass feeds both the curve and its area, so they describe the
		# same predictions.
		logits, labels = self._infer(dataset, **kwargs)
		y_true = labels.numpy(force = True)
		y_prob = torch.sigmoid(logits).numpy(force = True)

		fpr, tpr, _ = sklearn.metrics.roc_curve(y_true, y_prob)
		auc = float(sklearn.metrics.roc_auc_score(y_true, y_prob))

		plt.figure(
			figsize = (
				6,
				6,
			)
		)
		plt.plot(fpr, tpr, label=f"ROC AUC = {auc:.4f}")
		plt.plot(
			[0, 1],
			[0, 1], linestyle = "--", color = "gray"
		)
		plt.xlabel("False Positive Rate")
		plt.ylabel("True Positive Rate")
		plt.title("ROC Curve")
		plt.legend()
		plt.grid(True)
		plt.tight_layout()
		plt.savefig("roc_curve.png")
		plt.show()

	def plot_learning_curve(self, metrics: dict[str, list[float]],
		keys: Iterable[str] = (
			"loss",
			"accuracy",
			"precision",
			"recall",
			"f1",
		),
	):
		"""Plot training and validation histories, one subplot per key.

		Saves the figure to `learning_curve.png` and shows it. `keys` is an
		ordered, immutable default so the subplot order is the same on every
		run.
		"""
		keys = list(keys)

		plt.figure(
			figsize = (
				15,
				4,
			)
		)

		for i, key in enumerate(keys, 1):
			plt.subplot(1, len(keys), i)

			for label in (key, f"val_{key}"):
				plt.plot(metrics[label], label = label)

			plt.xlabel("epoch")
			plt.ylabel(key)
			plt.title(f"{key} vs. epoch")
			plt.legend()
			plt.grid(True)

		plt.tight_layout()
		plt.savefig("learning_curve.png")
		plt.show()

	def submit(self, dataset: TwitterDataset, *,
		submission_path: Path = Path("submission.csv"),
	):
		"""Write the `ID`/`Label` predictions for `dataset` to `submission_path`."""
		# Uncached on purpose: the cached `predict` may hold another dataset's output.
		y_pred = self._predict(dataset)

		# `TwitterDataset` resets its index, which turns `ID` into a column; the
		# frame index is then only the row position.
		frame = dataset.data
		ids = frame["ID"] if "ID" in frame.columns else frame.index

		submission = pd.DataFrame(
			{
				"ID": np.asarray(ids),
				"Label": y_pred.numpy(force = True),  # host array; the tensor may live on the GPU
			}
		)

		submission.to_csv(submission_path,
			index = False,
			encoding = "utf-8",
		)


## Main

This is where the magic happens. After a few manual trials the following settings were chosen. Optuna was not attempted, because running a single experiment already burns my GPU for several minutes, and I do not think I will benefit personally from super fine tuning my model.

In [None]:
# Experiment settings. `num_layers` and `hidden_dim` must exist because the
# experiment cell reads them. They only take effect when `layer_dims` is
# empty, since an explicit list of widths takes precedence.
args = argparse.Namespace(
	seed = 42,
	epochs = 4,
	glove_dim = 300,
	freeze = False,
	max_len = 256,
	learning_rate = 1e-3,
	weight_decay = 1e-1,
	dropout = 1e-1,
	num_layers = 3,
	hidden_dim = 300,
	layer_dims = [
		150,
		100,
		75,
	]
)

This is a helper function to prettify output of reports with fixed decimal places.

In [None]:
def round_metrics(metrics: dict[str, list[float]],
	digits: int = 6,
) -> dict[str, list[float]]:
	"""Return a copy of `metrics` with every value rounded to `digits` decimals."""
	rounded = {}

	for name, series in metrics.items():
		rounded[name] = [round(value, digits) for value in series]

	return rounded

This is the experiment:

1. generate an embedding layer
2. build a model and assign it an optimizer and a loss
3. build a classifier using said model
4. build a transformation layer that includes:
   - preprocessing
   - tokenization
   - token indexing
5. instantiate the datasets used
   - a training dataset
   - a validation dataset that we used for model selection
6. train the model on the training dataset which also produces evaluation metrics
7. produce reports for evaluation

In [None]:
fix_seed(args.seed)

embedding = Embedding.from_glove(args.glove_dim,
	freeze = args.freeze,
)
# `layer_dims`, `hidden_dim` and `num_layers` are optional settings: read them
# defensively and fall back to the defaults of `TwitterModel` when absent.
model = TwitterModel(embedding,
	hidden_dim = getattr(args, "layer_dims", None) or getattr(args, "hidden_dim", 100),
	num_layers = getattr(args, "num_layers", 2),
	dropout    = args.dropout   ,
)
model.compile()

# Entering the classifier loads a saved model state if there is one; leaving it saves the state.
with TwitterClassifier(model) as classifier:
	classifier.compile(
		learning_rate = args.learning_rate,
		weight_decay  = args.weight_decay ,
	)

#	Generate a preprocessing and tokenization transform function for the dataset:
	transform = TextTransform(embedding.word2idx,
		preprocessor = Preprocessor(),
		tokenizer = Tokenizer(),
		max_len = args.max_len,
	)

#	Create datasets:
	train_data = TwitterDataset("train", transform = transform)
	val_data   = TwitterDataset("val"  , transform = transform)
	test_data  = TwitterDataset("test" , transform = transform)

#	Train the model:
	metrics = classifier.fit(
		train_data,
		val_data,
		epochs = args.epochs,
		batch_size = int(math.log10(len(train_data) + len(val_data))) + 1,
	)

#	Dump metrics to file:
	with open("sdi2200160.json", "w+",
		encoding = "utf-8",
	) as file:
		json.dump(round_metrics(metrics), file,
			indent = 4,
		)

#	Generate report:
	print()
	print(classifier.classification_report_str(val_data))
	print("ROC AUC:", classifier.roc_auc(val_data))
	print()

	classifier.plot_roc_curve(val_data)
	classifier.plot_learning_curve(metrics)

#	Submit predictions:
	classifier.submit(test_data,
		submission_path = Path("submission.csv"),
	)