In [None]:
import sys
from collections import Counter
from importlib import reload
from pathlib import Path

import torch
import torch.nn as nn
from sklearn import metrics
from torch.utils.data import DataLoader, Dataset

sys.path.append("..")


from src import plotly_plots as pp
from src import torch_util as tu
from src import util

# Re-import the local helper modules so that edits made to them on disk are
# picked up without restarting the kernel.
for _module in (util, pp):
    reload(_module)

# Load the split indices (split_date is returned alongside them), then convert
# every dataset split with util.dataset_to_df, keyed by split name.
split_idx, split_date = util.load_split_idx()
_dataset_splits = util.load_dataset_splits(split_idx, Path("../data/dataset.ndjson"))
df_splits = {
    split_name: util.dataset_to_df(split_data)
    for split_name, split_data in _dataset_splits.items()
}

In [None]:
# Size summary of the training split: examples, tokens and characters.
print("training data:")
train_split = df_splits["train"]
n_examples = len(train_split)
token_count = train_split["length"].sum()
# Flatten the token lists, split every token on the empty string, flatten
# again and count the resulting pieces.
char_count = (
    train_split["tokens"]
    .list.explode()
    .str.split("")
    .explode()
    .len()
)
summary_lines = [
    f"we have {n_examples} examples",
    f" -> {token_count} tokens (avg {token_count / n_examples:.1f} tokens/example)",
    f" -> {char_count} characters (avg {char_count / token_count:.1f} chars/token)",
]
print(*summary_lines, sep="\n")

# Split every training example into characters: util.split_to_chars receives
# the (tokens, tags) of one row and its result becomes one row of the new frame.
char_examples = (
    df_splits["train"]
    .select("tokens", "tags")
    .map_rows(
        lambda row: util.split_to_chars(*row, only_starts=False),
    )
)
# Name the two result columns explicitly, then carry over name and lang.
char_examples.columns = ["chars", "char_tags"]
char_examples = char_examples.with_columns(
    name=df_splits["train"]["name"], lang=df_splits["train"]["lang"]
)
print(f"splitted data {char_examples.columns}")

# Frequency tables of characters and character tags, most frequent first.
# The vocabulary cell further down reads `char_counts` / `char_tag_counts`;
# they were previously never defined, so the notebook failed on a fresh kernel.
char_counts = dict(
    Counter(
        char for seq in char_examples["chars"].to_list() for char in seq
    ).most_common()
)
char_tag_counts = dict(
    Counter(
        tag for seq in char_examples["char_tags"].to_list() for tag in seq
    ).most_common()
)
print(
    f"\nthere are {len(char_counts)} unique characters",
    f" and {len(char_tag_counts)} unique tags",
)

# Train/validation split used by the following cells (`train_df`, `val_df`).
# Restored from the previously commented-out call.
# NOTE(review): confirm util.data_split is still available and that 0.3 is the
# validation fraction.
train_df, val_df = util.data_split(char_examples, 0.3)


## Vocabulary


In [None]:
# Character vocabulary: the two special symbols followed by the first ten
# keys of char_counts.
vocab = ["<pad>", "<unk>", *list(char_counts.keys())[:10]]
token2idx = {symbol: position for position, symbol in enumerate(vocab)}

# Tag vocabulary: the padding symbol followed by every key of char_tag_counts.
tag_vocab = ["<pad>", *char_tag_counts.keys()]
tag2idx = {symbol: position for position, symbol in enumerate(tag_vocab)}

print("vocab (tokens):", vocab)
print("vocab (tags)  :", tag_vocab)


def _encode_chars(sequences):
    """Map every character to its vocab index; unseen characters map to 1 (``<unk>``)."""
    return [[token2idx.get(char, 1) for char in seq] for seq in sequences]


def _encode_tags(sequences):
    """Map every tag to its tag-vocab index; an unknown tag raises KeyError."""
    return [[tag2idx[tag] for tag in seq] for seq in sequences]


# Index-encode the training data (each result is a list of lists).
train_token_idx = _encode_chars(train_df["chars"])
train_tag_idx = _encode_tags(train_df["char_tags"])
print(f"\ntraining examples of length: {[len(e) for e in train_token_idx]}")

# Index-encode the validation data the same way.
val_token_idx = _encode_chars(val_df["chars"])
val_tag_idx = _encode_tags(val_df["char_tags"])
print(f"validation examples of length: {[len(e) for e in val_token_idx]}")

# Convert the variable-length index lists with tu.seqs2padded_tensor.
train_token_tensors, train_tag_tensors = (
    tu.seqs2padded_tensor(train_token_idx),
    tu.seqs2padded_tensor(train_tag_idx),
)
val_token_tensors, val_tag_tensors = (
    tu.seqs2padded_tensor(val_token_idx),
    tu.seqs2padded_tensor(val_tag_idx),
)

In [None]:
class SequenceDataset(Dataset):
    """Dataset that pairs token sequences with their label sequences.

    Args:
        tokens: sized, indexable collection of token sequences.
        labels: sized, indexable collection of label sequences; item ``i``
            belongs to ``tokens[i]``.

    Raises:
        ValueError: if ``tokens`` and ``labels`` do not have the same length.
    """

    def __init__(self, tokens, labels):
        # Fail early: a mismatch would otherwise either drop labels silently
        # or raise an IndexError in the middle of an epoch.
        if len(tokens) != len(labels):
            raise ValueError(
                f"tokens and labels must have the same length, "
                f"got {len(tokens)} and {len(labels)}"
            )
        self.tokens = tokens
        self.labels = labels

    def __len__(self):
        """Return the number of examples."""
        return len(self.tokens)

    def __getitem__(self, idx):
        """Return the ``(tokens, labels)`` pair stored at position ``idx``."""
        return self.tokens[idx], self.labels[idx]


# Wrap the training tensors in a dataset and serve them in shuffled
# mini-batches of two examples.
train_dataset = SequenceDataset(tokens=train_token_tensors, labels=train_tag_tensors)
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=2)


## Model

Can we tokenize the text using a classifier model?


In [None]:
class LSTMTagger(nn.Module):
    """Embedding -> LSTM -> linear layer, yielding one score vector per input position.

    Args:
        vocab_size: number of rows in the embedding table.
        tagset_size: number of scores produced per position.
        embedding_dim: size of each embedding vector.
        hidden_dim: size of the LSTM hidden state.
    """

    def __init__(self, vocab_size, tagset_size, embedding_dim, hidden_dim):
        super().__init__()
        # Index 0 is the padding index of the embedding table.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.hidden2tag = nn.Linear(in_features=hidden_dim, out_features=tagset_size)

    def forward(self, sentences):
        """Return tag scores of shape (batch_size, seq_len, tagset_size)."""
        # (batch_size, seq_len) -> (batch_size, seq_len, embedding_dim)
        embedded = self.embedding(sentences)
        # Keep the per-position outputs, (batch_size, seq_len, hidden_dim);
        # the final hidden/cell states are not needed.
        hidden_states, _final_states = self.lstm(embedded)
        return self.hidden2tag(hidden_states)


In [None]:
# Hyperparameters
SEED = 0
embedding_dim = 4
hidden_dim = 32
vocab_size = len(vocab)
tagset_size = len(tag_vocab)
epochs = 20

# Seed the global torch RNG so that weight initialisation and the shuffling
# done by train_loader are repeatable across runs.
torch.manual_seed(SEED)

model = LSTMTagger(vocab_size, tagset_size, embedding_dim, hidden_dim)
# Exclude padded positions from the loss; without ignore_index they are
# trained on as if "<pad>" were a real target tag.
# NOTE(review): assumes tu.seqs2padded_tensor pads with index 0, matching
# "<pad>" in tag_vocab and padding_idx=0 in the model - confirm.
loss_function = nn.CrossEntropyLoss(ignore_index=tag2idx["<pad>"])
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)

# Training loop; `losses` collects the loss of every mini-batch.
losses = []
model.train()
for epoch in range(epochs):
    for examples, labels in train_loader:
        optimizer.zero_grad()

        # Forward pass: (batch_size, seq_len, tagset_size)
        tag_scores = model(examples)

        # Flatten batch and sequence dimensions: CrossEntropyLoss expects
        # (N, C) scores and (N,) targets. reshape also handles
        # non-contiguous tensors, which view would reject.
        loss = loss_function(
            tag_scores.reshape(-1, tagset_size), labels.reshape(-1)
        )

        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        losses.append(loss.item())

    # print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")
# pp.scatter(y=losses)

## Evaluate


In [None]:
# Score the whole validation tensor in one forward pass, in eval mode and
# without gradient tracking.
model.eval()
with torch.no_grad():
    tag_scores = model(val_token_tensors)
    # Highest-scoring tag index per position -> shape (batch_size, seq_len)
    predictions = tag_scores.argmax(dim=-1)

# Flatten to tag names. Each prediction row is cut to the length of the
# corresponding un-padded target sequence, so padded positions are not scored.
pred_tags = []
true_tags = []
for pred, true_t in zip(predictions, val_tag_idx):
    true_len = len(true_t)
    true_tags += [tag_vocab[tag_id] for tag_id in true_t]
    pred_tags += [tag_vocab[tag_id] for tag_id in pred[:true_len].tolist()]

# Confusion matrix with rows/columns ordered as in tag_vocab, plus accuracy.
confmat = metrics.confusion_matrix(true_tags, pred_tags, labels=tag_vocab)
acc = metrics.accuracy_score(true_tags, pred_tags)
print("accuracy", acc)

confmat_figure = pp.heatmap(confmat)
confmat_figure.show()