In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

from _functions import build_vocab, plot_result
from _objects import Decoder, Encoder, Seq2Seq, TextNormalizationDataset

# The seed for random state
_SEED: int = 574

# Set seed for reproducibility
torch.manual_seed(_SEED)

# Get the device for training; fall back to the CPU when CUDA is unavailable
# so the notebook still runs on machines without a GPU.
device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the labelled data and hold out 20% of it as the test split
df: pd.DataFrame = pd.read_csv("en_train.csv")
train_df, test_df = train_test_split(df, test_size=0.2, random_state=_SEED)

# Build vocabularies for input and output (token -> index).
# NOTE: built from the non-test rows before the validation split below, so the
# vocabulary sizes are the same as they were before the split was moved.
input_vocab: dict[str, int] = build_vocab(train_df["before"])
output_vocab: dict[str, int] = build_vocab(train_df["after"])

# Inverse vocabularies for decoding (index -> token)
inverse_input_vocab: dict[int, str] = {v: k for k, v in input_vocab.items()}
inverse_output_vocab: dict[int, str] = {v: k for k, v in output_vocab.items()}

# Carve the validation set out of the training rows BEFORE the training
# dataset is created. Doing it afterwards would leave every validation row
# inside the training dataset, making the validation loss (and therefore early
# stopping and best-checkpoint selection) meaningless.
train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=_SEED)


# Define model hyperparameters
INPUT_DIM: int = len(input_vocab)
OUTPUT_DIM: int = len(output_vocab)
ENC_EMB_DIM: int = 256
DEC_EMB_DIM: int = 256
HID_DIM: int = 512
N_LAYERS: int = 2
ENC_DROPOUT: float = 0.2
DEC_DROPOUT: float = 0.2
BATCH_SIZE: int = 128

# Create the encoder and decoder
encoder: Encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT)
decoder: Decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT)

# Create the model
model: Seq2Seq = Seq2Seq(encoder, decoder, device).to(device)

# Define the loss function and optimizer; padding positions do not contribute
# to the loss.
criterion: nn.CrossEntropyLoss = nn.CrossEntropyLoss(ignore_index=output_vocab["<pad>"])
optimizer: optim.Optimizer = optim.Adam(model.parameters())

# Training dataset (validation rows already removed above)
train_dataset: TextNormalizationDataset = TextNormalizationDataset(
    train_df, input_vocab, output_vocab
)
train_loader: DataLoader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True
)

# Validation dataset
val_dataset: TextNormalizationDataset = TextNormalizationDataset(
    val_df, input_vocab, output_vocab
)
val_loader: DataLoader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

# Testing dataset
test_dataset: TextNormalizationDataset = TextNormalizationDataset(
    test_df, input_vocab, output_vocab
)
test_loader: DataLoader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
# Function to train the model
def train(
    model: Seq2Seq,
    dataloader: DataLoader,
    optimizer: optim.Optimizer,
    criterion: nn.CrossEntropyLoss,
) -> float:
    """Run one optimisation pass over ``dataloader`` and return the mean batch loss.

    Each batch is a mapping with ``"input"`` and ``"output"`` tensors, which are
    moved to the module-level ``device``. Progress is printed in place every
    10 batches and on the final batch.

    Args:
        model: The sequence-to-sequence model; switched to training mode here.
        dataloader: Source of training batches.
        optimizer: Optimizer whose gradients are reset and stepped per batch.
        criterion: Loss applied to the flattened predictions and targets.

    Returns:
        The sum of per-batch losses divided by the number of batches.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model.train()

    n_batches: int = len(dataloader)
    running_loss: float = 0.0

    # In-place progress line (carriage return keeps it on one row)
    print(f"\nTraining: 0/{n_batches} batches processed", end="\r")

    for step, batch in enumerate(dataloader, start=1):
        source = batch["input"].to(device)
        target = batch["output"].to(device)

        optimizer.zero_grad()

        # target: [batch_size, trg_len]
        # logits: [batch_size, trg_len, output_dim]
        logits = model(source, target)

        # Drop the first position (<sos>) and flatten batch/time together so
        # the criterion sees one row per predicted token.
        flat_logits = logits[:, 1:].reshape(-1, logits.shape[-1])
        flat_target = target[:, 1:].reshape(-1)

        loss = criterion(flat_logits, flat_target)
        loss.backward()

        # Clip the gradient norm to 1 before the parameter update
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1)
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss

        # Refresh the progress line every 10 batches and at the very end
        if step == n_batches or step % 10 == 0:
            print(
                f"Training: {step}/{n_batches} batches processed | Current loss: {batch_loss:.5f}",
                end="\r",
            )

    mean_loss: float = running_loss / n_batches
    print(f"\nTraining completed. Average batch loss: {mean_loss:.5f}" + " " * 30)
    return mean_loss


# Function to evaluate the model
def evaluate(
    model: Seq2Seq, dataloader: DataLoader, criterion: nn.CrossEntropyLoss
) -> float:
    """Compute the mean batch loss of ``model`` over ``dataloader`` without training.

    The model is put in eval mode, gradients are disabled, and the model is
    called with ``0`` as its third argument (teacher forcing turned off).
    Batches are moved to the module-level ``device``.

    Args:
        model: The sequence-to-sequence model to score.
        dataloader: Source of batches with ``"input"`` and ``"output"`` tensors.
        criterion: Loss applied to the flattened predictions and targets.

    Returns:
        The sum of per-batch losses divided by the number of batches.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model.eval()

    n_batches: int = len(dataloader)
    running_loss: float = 0.0

    # In-place progress line
    print(f"Evaluating: 0/{n_batches} batches processed", end="\r")

    with torch.no_grad():
        for step, batch in enumerate(dataloader, start=1):
            source = batch["input"].to(device)
            target = batch["output"].to(device)

            # target: [batch_size, trg_len]
            # logits: [batch_size, trg_len, output_dim]
            logits = model(source, target, 0)  # Turn off teacher forcing

            # Skip the first position and flatten batch/time together
            flat_logits = logits[:, 1:].reshape(-1, logits.shape[-1])
            flat_target = target[:, 1:].reshape(-1)

            running_loss += criterion(flat_logits, flat_target).item()

            # Refresh the progress line every 10 batches and at the very end
            if step == n_batches or step % 10 == 0:
                print(f"Evaluating: {step}/{n_batches} batches processed", end="\r")

    mean_loss: float = running_loss / n_batches
    print(f"\nEvaluation completed. Average batch loss: {mean_loss:.5f}" + " " * 30)
    return mean_loss


# Function to predict on test data
def predict(
    model: Seq2Seq,
    dataloader: DataLoader,
    inverse_output_vocab: dict[int, str],
    max_len: int = 50,
):
    """Greedily decode every batch of ``dataloader`` into text.

    Special-token indices are read from the module-level ``output_vocab`` and
    tensors are placed on the module-level ``device``.

    Args:
        model: Model exposing ``encoder`` and ``decoder``; put in eval mode.
        dataloader: Yields mappings with an ``"input"`` tensor and an ``"id"``
            entry indexable per example.
        inverse_output_vocab: Maps output token indices back to their text.
            Indices missing from the mapping decode to an empty string.
        max_len: Maximum number of decoding steps per sequence (default 50,
            the previously hard-coded limit).

    Returns:
        A ``(ids, predictions)`` tuple of parallel lists: the id entry of each
        example as provided by the batch, and its decoded string (cut at the
        first ``<eos>``, with ``<sos>``/``<eos>``/``<pad>`` removed).
    """
    model.eval()
    predictions = []
    ids = []

    sos_idx = output_vocab["<sos>"]
    eos_idx = output_vocab["<eos>"]
    special_tokens = {sos_idx, eos_idx, output_vocab["<pad>"]}

    # Create a progress bar
    total_batches: int = len(dataloader)
    print(f"Predicting: 0/{total_batches} batches processed", end="\r")

    with torch.no_grad():
        # `batch_idx` is deliberately distinct from the inner loop variables:
        # reusing one name made the progress check below look at the last
        # example index instead of the batch number.
        for batch_idx, batch in enumerate(dataloader):
            src = batch["input"].to(device)
            id_batch = batch["id"]

            batch_size = src.shape[0]

            # Encoder outputs
            _, hidden = model.encoder(src)

            # First input to the decoder is the <sos> token
            decoder_input = torch.full(
                (batch_size,), sos_idx, dtype=torch.long, device=device
            )

            # Tracks which sequences have already produced <eos>
            finished = torch.zeros(batch_size, dtype=torch.bool, device=device)
            step_tokens = []

            for _ in range(max_len):
                output, hidden = model.decoder(decoder_input, hidden)
                pred_tokens = output.argmax(1)
                step_tokens.append(pred_tokens)

                # Anything after the first <eos> is discarded below, so stop
                # as soon as every sequence in the batch has emitted it.
                finished |= pred_tokens == eos_idx
                if finished.all():
                    break

                # Update input for next time step
                decoder_input = pred_tokens

            # One device-to-host transfer per batch instead of one per token
            if step_tokens:
                batch_outputs = torch.stack(step_tokens, dim=1).tolist()
            else:
                batch_outputs = [[] for _ in range(batch_size)]

            # Convert token indices to characters for each sequence
            for row, tokens in enumerate(batch_outputs):
                # Cut the sequence at the first <eos> token, if any
                if eos_idx in tokens:
                    tokens = tokens[: tokens.index(eos_idx)]

                text: str = "".join(
                    inverse_output_vocab.get(token, "")
                    for token in tokens
                    if token not in special_tokens
                )

                predictions.append(text)
                ids.append(id_batch[row])

            # Update progress bar periodically
            if (batch_idx + 1) % 10 == 0 or (batch_idx + 1) == total_batches:
                print(
                    f"Predicting: {batch_idx+1}/{total_batches} batches processed",
                    end="\r",
                )

    print(
        f"\nPrediction completed. Generated {len(predictions)} predictions." + " " * 30
    )
    return ids, predictions

In [None]:
# Training loop
N_EPOCHS: int = 50
# How many epochs for early stopping, -1 to disable, 0 to stop immediately
early_stop_patience: int = 5
# Epochs since the last best validation loss
current_patience: int = 0
# Best validation loss seen so far
best_valid_loss: float = float("inf")

print(f"Starting training for {N_EPOCHS} epochs...")
print(f"Training on {len(train_dataset)} examples")
print(f"Validating on {len(val_dataset)} examples")
print(f"Testing on {len(test_dataset)} examples")

# For tracking metrics
train_losses: list[float] = []
valid_losses: list[float] = []

for epoch in range(N_EPOCHS):

    print(f"\n{'='*50}")
    print(f"Epoch: {epoch+1}/{N_EPOCHS}")

    train_loss: float = train(model, train_loader, optimizer, criterion)
    valid_loss: float = evaluate(model, val_loader, criterion)

    train_losses.append(train_loss)
    valid_losses.append(valid_loss)

    print(f"Train Loss: {train_loss:.5f}")
    print(f"Validation Loss: {valid_loss:.5f}")

    if valid_loss < best_valid_loss:
        # New best: reset the patience counter and checkpoint the weights
        best_valid_loss = valid_loss
        current_patience = 0
        torch.save(model.state_dict(), "best-model.pt")
        print(f"New best model saved with validation loss: {valid_loss:.5f}")
    else:
        current_patience += 1
        print(
            f"{current_patience}/{early_stop_patience} epochs since loss decreases: {best_valid_loss:.5f}"
        )

    # Early stopping check. `>=` (not `>`) so training stops once the counter
    # reaches the patience value: this matches the "N/N" counter printed above,
    # the message below, and the documented "0 to stop immediately" setting.
    if early_stop_patience >= 0 and current_patience >= early_stop_patience:
        print(
            f"Validation loss increasing for {early_stop_patience} consecutive epochs. Early stopping..."
        )
        break

print(f"\n{'='*50}")
print("Training complete!")
print(f"Best validation loss: {best_valid_loss:.5f}")

# Print training summary
print("\nTraining Summary:")
for epoch, (t_loss, v_loss) in enumerate(zip(train_losses, valid_losses), 1):
    print(f"Epoch {epoch}: Train Loss = {t_loss:.5f}, Valid Loss = {v_loss:.5f}")

# Plotting training/validation loss
plot_result(train_losses, valid_losses)

In [None]:
# Load best model for prediction
print("\nLoading best model for testing...")
# map_location keeps the checkpoint loadable when it was saved on a different
# device than the one currently in use.
model.load_state_dict(torch.load("best-model.pt", map_location=device))

# Create a dictionary mapping (stringified) ids to row positions for easier lookup
test_id_to_idx: dict[str, int] = {
    id_val: i for i, id_val in enumerate(test_df["id"].astype(str))
}

# Make predictions on test data
ids, predictions = predict(model, test_loader, inverse_output_vocab)

# The lookup keys above are strings, so normalise the returned ids the same
# way. Tensor elements are unwrapped first (assumption: the loader may collate
# numeric ids into tensors; for ids that are already strings this is a no-op).
id_keys: list[str] = [
    str(pred_id.item()) if isinstance(pred_id, torch.Tensor) else str(pred_id)
    for pred_id in ids
]

# Pull the columns out once instead of materialising a row Series per lookup
test_after: list = test_df["after"].tolist()
test_before: list = test_df["before"].tolist()

# Prepare result lists
actual_values: list[str] = []
before_values: list[str] = []

# Match predictions with actual values using ids
for id_key in id_keys:
    idx = test_id_to_idx.get(id_key)
    if idx is None:
        # If id not found, use empty strings as placeholders
        actual_values.append("")
        before_values.append("")
    else:
        actual_values.append(test_after[idx])
        before_values.append(test_before[idx])

# Calculate accuracy (exact string match between target and prediction)
correct: int = sum(1 for act, pred in zip(actual_values, predictions) if act == pred)
accuracy: float = correct * 100 / len(actual_values) if actual_values else 0
print(f"Test Accuracy: {accuracy:.2f}%")

# Save predictions for analysis
results_df: pd.DataFrame = pd.DataFrame(
    {
        "id": id_keys,
        "before": before_values,
        "actual": actual_values,
        "predicted": predictions,
    }
)
results_df.to_csv("en_test_submission.csv", index=False)

In [None]:
# Load best model for prediction
print("\nLoading best model for submission...")
# map_location keeps the checkpoint loadable when it was saved on a different
# device than the one currently in use.
model.load_state_dict(torch.load("best-model.pt", map_location=device))

# Submission dataframe. Dedicated `submission_*` names are used so this cell
# does not overwrite the validation split/loader that the training cell reads.
submission_df: pd.DataFrame = pd.read_csv("./en_test.csv.zip")
submission_df["before"] = submission_df["before"].astype(str)

# Submission dataset
submission_dataset: TextNormalizationDataset = TextNormalizationDataset(
    submission_df, input_vocab, output_vocab
)
submission_loader: DataLoader = DataLoader(submission_dataset, batch_size=BATCH_SIZE)

# Make predictions on the submission data
ids, predictions = predict(model, submission_loader, inverse_output_vocab)

# Build the submission frame. Tensor ids are unwrapped before stringifying so
# they are written as plain values (assumption: the loader may collate numeric
# ids into tensors; for ids that are already strings this is a no-op).
en_sample_submission = pd.DataFrame(
    {
        "id": [
            str(pred_id.item()) if isinstance(pred_id, torch.Tensor) else str(pred_id)
            for pred_id in ids
        ],
        "after": predictions,
    }
).astype(str)

In [None]:
# Write the submission frame (ids and predicted "after" strings) to CSV,
# omitting the DataFrame index column.
en_sample_submission.to_csv("en_sample_submission.csv", index=False)
# Alternative kept for reference: same call targeting a ".zip" path instead.
# en_sample_submission.to_csv("en_sample_submission.csv.zip", index=False)