Text classification based on recurrent neural networks. Three architectures were tested: a classic RNN, an LSTM, and a GRU. Bidirectional variants were also tested.

**1.Import libraries and parse arguments**

In [13]:
import argparse
from collections import Counter
from typing import Tuple

import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader

# Parse arguments
def _str2bool(value):
    """Convert a command-line string such as "False" or "1" to a bool.

    `type=bool` must not be used with argparse: `bool("False")` is `True`
    because every non-empty string is truthy, so the flag could never be
    switched off from the command line.
    """
    if isinstance(value, bool):
        return value
    normalized = value.strip().lower()
    if normalized in {"true", "t", "yes", "y", "on", "1"}:
        return True
    # The empty string is kept falsy, matching what `bool("")` used to give.
    if normalized in {"false", "f", "no", "n", "off", "0", ""}:
        return False
    raise argparse.ArgumentTypeError(f"Expected a boolean value, got {value!r}.")


parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", default=16, type=int, help="Batch size.")
parser.add_argument("--epochs", default=20, type=int, help="Number of epochs.")
parser.add_argument("--rnn", default="GRU", choices=["LSTM", "GRU", "RNN"], help="RNN layer type.")
parser.add_argument("--rnn_dim", default=64, type=int, help="RNN layer dimension.")
parser.add_argument("--seed", default=42, type=int, help="Random seed.")
parser.add_argument("--threads", default=1, type=int, help="Maximum number of threads to use.")
parser.add_argument("--we_dim", default=128, type=int, help="Word embedding dimension.")
parser.add_argument("--bidirectional", default=True, type=_str2bool, help="Use bidirectional RNN.")

_StoreAction(option_strings=['--bidirectional'], dest='bidirectional', nargs=None, const=None, default=True, type=<class 'bool'>, choices=None, required=False, help='Use bidirectional RNN.', metavar=None)

**2.Tokenize the words and encode them using unique IDs.** Filter out words that appear only once (to reduce noise, though further testing is needed to confirm its usefulness) and limit the vocabulary to a predefined maximum size (max_size).

In [14]:
# ----- Tokenize -----
def simple_tokenize(text):
    """Lower-case `text` and split it into tokens on runs of whitespace."""
    lowered = text.lower()
    return lowered.split()

def build_vocab(texts, min_freq=2, max_size=None):
    """Build a token -> id mapping from an iterable of raw texts.

    Args:
        texts: Iterable of strings; each one is split with `simple_tokenize`.
        min_freq: Minimum number of occurrences a token needs to be kept.
        max_size: Optional cap on the number of regular (non-special) tokens.

    Returns:
        A dict mapping tokens to consecutive integer ids. `"<pad>"` is always
        0 and `"<unk>"` is always 1; the remaining tokens follow, ordered from
        the most frequent, with ties broken alphabetically.
    """
    word_counts = Counter()
    for text in texts:
        word_counts.update(simple_tokenize(text))

    # Order: PAD, UNK, then the most frequent tokens.
    vocab = {"<pad>": 0, "<unk>": 1}

    # Drop rare tokens. Also skip corpus tokens that spell a reserved symbol:
    # re-inserting them would overwrite the reserved id and make the next
    # token share that id, because `len(vocab)` would not grow.
    items = [
        (word, count)
        for word, count in word_counts.items()
        if count >= min_freq and word not in vocab
    ]

    # Sort from the most frequent; the token itself makes the order deterministic.
    items.sort(key=lambda item: (-item[1], item[0]))

    # Reduce size
    if max_size is not None:
        items = items[:max_size]

    # Assign consecutive ids after the special tokens.
    for word, _ in items:
        vocab[word] = len(vocab)
    return vocab

def encode(X, vocab):
    """Turn each text in `X` into a list of vocabulary ids.

    Tokens that are missing from `vocab` are mapped to the id of `"<unk>"`.
    Returns one list of ids per input text, in the input order.
    """
    return [
        [vocab.get(token, vocab["<unk>"]) for token in simple_tokenize(text)]
        for text in X
    ]

**3.Data and batch loaders**

In [15]:
# Dataset
class TextDataset(Dataset):
    """Dataset pairing encoded texts with their labels, addressed by position."""

    def __init__(self, X_ids, y):
        # The labels are copied into a plain list so that `self.y[i]` is a
        # positional lookup whatever container `y` was passed in.
        self.X, self.y = X_ids, list(y)

    def __len__(self):
        """Return the number of encoded texts."""
        return len(self.X)

    def __getitem__(self, i):
        """Return the `(token_ids, label)` pair stored at position `i`."""
        sample = (self.X[i], self.y[i])
        return sample

# collate function
def collate_fn(batch, pad_id):
    """Collate `(token_ids, label)` pairs into padded batch tensors.

    Args:
        batch: Non-empty sequence of `(token_ids, label)` pairs.
        pad_id: Id used to right-pad shorter sequences.

    Returns:
        A tuple `(x, y, lengths)` of `torch.long` tensors: `x` holds the
        batch-first padded token ids, `y` the labels and `lengths` the
        unpadded length of every sequence.
    """
    word_ids, labels = zip(*batch)

    # An empty document would get length 0, which
    # `torch.nn.utils.rnn.pack_padded_sequence` rejects. Represent it as a
    # single padding token so the batch can still be packed.
    word_ids = [
        torch.as_tensor(ids if len(ids) > 0 else [pad_id], dtype=torch.long)
        for ids in word_ids
    ]

    x = torch.nn.utils.rnn.pad_sequence(word_ids, batch_first=True, padding_value=pad_id)
    y = torch.as_tensor(labels, dtype=torch.long)
    lengths = torch.as_tensor([len(seq) for seq in word_ids], dtype=torch.long)

    return x, y, lengths

**4.Define model architecture.** The model consists of a word embedding layer, followed by an RNN layer, and finally an output linear layer with 5 outputs (corresponding to the 5 classes).

In [16]:
# Define model
class Model(torch.nn.Module):
    """Text classifier: word embedding -> RNN/LSTM/GRU -> linear output layer.

    Args:
        args: Namespace providing `we_dim` (embedding size), `rnn` (one of
            "LSTM", "GRU", "RNN"), `rnn_dim` (hidden size) and
            `bidirectional`.
        train_string_vocab: Token -> id mapping; its length sets the number of
            embeddings and its `"<pad>"` entry is used as `padding_idx`.
        num_classes: Number of output logits. Defaults to 5.

    Raises:
        ValueError: If `args.rnn` is not a supported RNN type.
    """

    def __init__(self, args: argparse.Namespace, train_string_vocab, num_classes: int = 5) -> None:
        super().__init__()

        # Embed the word ids into vectors of size `args.we_dim`.
        self._word_embedding = torch.nn.Embedding(
            num_embeddings=len(train_string_vocab),
            embedding_dim=args.we_dim,
            padding_idx=train_string_vocab["<pad>"],
        )

        # All three recurrent layers take the same constructor arguments, so
        # pick the class from a table instead of repeating the call.
        rnn_classes = {
            "LSTM": torch.nn.LSTM,
            "GRU": torch.nn.GRU,
            "RNN": torch.nn.RNN,
        }
        if args.rnn not in rnn_classes:
            raise ValueError(f"Unsupported RNN type: {args.rnn}")
        self._word_rnn = rnn_classes[args.rnn](
            input_size=args.we_dim,
            hidden_size=args.rnn_dim,
            bidirectional=args.bidirectional,
            batch_first=True,
        )

        # The two directions are summed (not concatenated) in `forward`, so
        # the classifier input is `args.rnn_dim` wide in both modes.
        self._output_layer = torch.nn.Linear(
            in_features=args.rnn_dim,
            out_features=num_classes,
        )

    def forward(self, word_ids: torch.Tensor, lengths: torch.Tensor) -> torch.Tensor:
        """Compute class logits for a padded batch of token ids.

        Args:
            word_ids: Batch-first tensor of padded token ids.
            lengths: Unpadded length of every sequence in the batch.

        Returns:
            One row of logits per sequence.
        """
        emb = self._word_embedding(word_ids)

        # Pack the batch so the RNN stops at each sequence's true length
        # instead of running over padding. The lengths are moved to the CPU
        # before being handed to `pack_padded_sequence`.
        packed = torch.nn.utils.rnn.pack_padded_sequence(
            emb, lengths.cpu(), batch_first=True, enforce_sorted=False
        )

        _, state = self._word_rnn(packed)
        # An LSTM returns `(h_n, c_n)`; the other layers return `h_n` directly.
        h_n = state[0] if isinstance(self._word_rnn, torch.nn.LSTM) else state

        if self._word_rnn.bidirectional:
            # Sum the final hidden states of the forward and backward directions.
            h = h_n[-2] + h_n[-1]
        else:
            h = h_n[-1]

        return self._output_layer(h)

**5.Train and evaluate functions**

In [17]:
def train_one_epoch(
    model: torch.nn.Module,
    dataloader: DataLoader,
    optimizer: torch.optim.Optimizer,
    loss_fn: torch.nn.Module,
    device: torch.device,
) -> Tuple[float, float]:
    """Run one optimisation pass over `dataloader`.

    Every batch is moved to `device`, the loss is back-propagated and the
    optimizer takes one step.

    Returns:
        `(average loss per example, accuracy)` over all examples seen.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_x, batch_y, batch_lengths in dataloader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        batch_lengths = batch_lengths.to(device)

        optimizer.zero_grad()
        batch_logits = model(batch_x, batch_lengths)
        batch_loss = loss_fn(batch_logits, batch_y)
        batch_loss.backward()
        optimizer.step()

        # Bookkeeping only: no gradients are needed for the running metrics.
        with torch.no_grad():
            batch_n = batch_y.size(0)
            hits = batch_logits.argmax(dim=1) == batch_y
            n_correct += hits.sum().item()
            n_seen += batch_n
            # The loss is weighted by batch size so that a smaller last batch
            # does not skew the per-example average.
            loss_sum += batch_loss.item() * batch_n

    # Guard against division by zero when the loader yields nothing.
    denom = max(1, n_seen)
    return loss_sum / denom, n_correct / denom


@torch.no_grad()
def evaluate(
    model: torch.nn.Module,
    dataloader: DataLoader,
    loss_fn: torch.nn.Module,
    device: torch.device,
) -> Tuple[float, float]:
    """Score `model` on `dataloader` without updating any weights.

    The model is switched to evaluation mode and the whole function runs with
    gradient tracking disabled.

    Returns:
        `(average loss per example, accuracy)` over all examples seen.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_x, batch_y, batch_lengths in dataloader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)
        batch_lengths = batch_lengths.to(device)

        batch_logits = model(batch_x, batch_lengths)
        batch_loss = loss_fn(batch_logits, batch_y)

        batch_n = batch_y.size(0)
        hits = batch_logits.argmax(dim=1) == batch_y
        n_correct += hits.sum().item()
        n_seen += batch_n
        # Weight by batch size so the result is a true per-example mean.
        loss_sum += batch_loss.item() * batch_n

    # Guard against division by zero when the loader yields nothing.
    denom = max(1, n_seen)
    return loss_sum / denom, n_correct / denom

**6. Main loop.** The main loop was implemented in a standard Python script and executed from the terminal. In this Jupyter Notebook version, loading arguments from the command line does not make much sense, but the original code was kept unchanged for consistency.

In [18]:
def main(args: argparse.Namespace):
    """Train and validate the text classifier, then save the best weights.

    Reads `df_file.csv` (columns `Text` and `Label`), holds out 20 % of the
    rows for validation, trains for `args.epochs` epochs and writes the
    weights of the epoch with the highest validation accuracy to
    `best_model.pt`.

    Args:
        args: Parsed command-line arguments (see the module-level `parser`).
    """
    # Apply the reproducibility and threading options, which were previously
    # parsed but never used.
    torch.manual_seed(args.seed)
    if args.threads is not None and args.threads > 0:
        torch.set_num_threads(args.threads)

    # Load data
    data = pd.read_csv("df_file.csv")
    X_train, X_val, y_train, y_val = train_test_split(
        data["Text"], data["Label"], test_size=0.20, random_state=args.seed
    )

    # Tokenize; the vocabulary is built from the training split only.
    vocab = build_vocab(X_train, max_size=25000)
    X_train_ids = encode(X_train, vocab)
    X_val_ids = encode(X_val, vocab)

    # Load datasets
    train_ds = TextDataset(X_train_ids, y_train)
    val_ds = TextDataset(X_val_ids, y_val)

    pad_id = vocab["<pad>"]

    def collate(batch):
        return collate_fn(batch, pad_id)

    train_loader = DataLoader(train_ds, batch_size=args.batch_size, shuffle=True,
                              collate_fn=collate)
    val_loader = DataLoader(val_ds, batch_size=args.batch_size, shuffle=False,
                            collate_fn=collate)

    # model, optimizer, loss
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = Model(args, vocab).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
    loss_fn = torch.nn.CrossEntropyLoss()

    best_val_acc = 0.0
    best_state = None

    for epoch in range(1, args.epochs + 1):
        train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, loss_fn, device)
        val_loss, val_acc = evaluate(model, val_loader, loss_fn, device)

        print(
            f"Epoch {epoch:02d}/{args.epochs} | "
            f"train loss {train_loss:.4f}, acc {train_acc:.3f} | "
            f"val loss {val_loss:.4f}, acc {val_acc:.3f}"
        )

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # `Tensor.cpu()` returns the very same tensor when the model is
            # already on the CPU, so without `clone()` the snapshot would
            # alias the live parameters and be overwritten by later epochs.
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }

    if best_state is not None:
        model.load_state_dict(best_state)
        torch.save(model.state_dict(), "best_model.pt")
        print(f"Best val acc: {best_val_acc:.3f} (model saved to best_model.pt)")


if __name__ == "__main__":
    # When `__file__` is not defined (e.g. in a notebook kernel), an empty
    # argument list is parsed so that only the defaults apply; otherwise
    # `None` makes argparse read the real command line.
    main_args = parser.parse_args(None if "__file__" in globals() else [])
    main(main_args)

Epoch 01/20 | train loss 1.5885, acc 0.267 | val loss 1.5595, acc 0.299
Epoch 02/20 | train loss 1.4421, acc 0.438 | val loss 1.4872, acc 0.373
Epoch 03/20 | train loss 1.3015, acc 0.560 | val loss 1.4071, acc 0.467
Epoch 04/20 | train loss 1.1357, acc 0.665 | val loss 1.3071, acc 0.515
Epoch 05/20 | train loss 0.9225, acc 0.767 | val loss 1.1567, acc 0.566
Epoch 06/20 | train loss 0.6442, acc 0.828 | val loss 0.9093, acc 0.658
Epoch 07/20 | train loss 0.4003, acc 0.913 | val loss 0.8162, acc 0.701
Epoch 08/20 | train loss 0.2419, acc 0.956 | val loss 0.6896, acc 0.744
Epoch 09/20 | train loss 0.1640, acc 0.974 | val loss 0.5976, acc 0.787
Epoch 10/20 | train loss 0.0982, acc 0.996 | val loss 0.5884, acc 0.804
Epoch 11/20 | train loss 0.1063, acc 0.982 | val loss 0.6275, acc 0.778
Epoch 12/20 | train loss 0.0615, acc 0.997 | val loss 0.5466, acc 0.811
Epoch 13/20 | train loss 0.0339, acc 1.000 | val loss 0.5466, acc 0.827
Epoch 14/20 | train loss 0.0297, acc 1.000 | val loss 0.5783, ac

Results: <br>
epochs=20, batch=16 <br>

| Model | Bidirectional | Accuracy |
|:------|:--------------:|:---------:|
| RNN   | False          | 0.449     |
| RNN   | True           | 0.661     |
| LSTM  | False          | 0.703     |
| LSTM  | True           | 0.804     |
| GRU   | False          | 0.724     |
| GRU   | True           | 0.831     |

To get better results: tune hyperparameters (mainly rnn_dim and we_dim), use pretrained word embeddings (e.g. Word2Vec developed by Tomáš Mikolov), 
add a cosine learning-rate decay schedule, try adding more layers, or switch to transformers.