In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
import pandas as pd

LSTM MODEL

In [2]:
class WordleLSTM(nn.Module):
    """
    LSTM-based neural network for predicting Wordle words.

    Architecture rationale:

    - Separate Embedding layers:
        * Letter embedding converts integer-encoded letters into dense vectors.
          This allows the model to learn semantic relationships between letters.
        * Feedback embedding converts integer feedback values (0=wrong, 1=wrong position, 2=correct)
          into dense vectors, allowing the model to treat feedback as a separate concept.

    - LSTM:
        * Captures sequential dependencies between consecutive guesses within a Wordle game.
        * Later guesses depend on earlier feedback, which the LSTM hidden states can learn.
        * Packed sequences are used to efficiently handle variable-length games.

    - Dropout layers:
        * Applied after LSTM and fully connected layers to reduce overfitting
          and improve generalization.

    - Fully connected layers:
        * Transform LSTM hidden states into predictions for each of the 5 letter positions.
        * Output dimension = 5 letters * vocab_size (number of possible letters)
        * Softmax applied during training to compute probability distribution over letters.

    - Overall:
        * The model can predict a 5-letter word at each timestep based on prior guesses and feedback.
        * This design separates letter and feedback representations while capturing temporal dependencies.

    """
    def __init__(
        self,
        vocab_size=29,           # Harf sayısı
        letter_embedding_dim=16, # Harf embedding boyutu
        feedback_embedding_dim=4,# Feedback embedding boyutu (0,1,2)
        hidden_dim=128,
        num_layers=2,
        dropout=0.3
    ):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Çıkış boyutu: 5 harf * vocab_size
        self.output_dim = 5 * vocab_size

        # Harf ve feedback embedding
        self.letter_embedding = nn.Embedding(vocab_size, letter_embedding_dim)
        self.feedback_embedding = nn.Embedding(3, feedback_embedding_dim)

        # LSTM input boyutu = 5 harf * letter_emb + 5 feedback * feedback_emb
        self.lstm_input_dim = 5 * letter_embedding_dim + 5 * feedback_embedding_dim

        self.lstm = nn.LSTM(
            input_size=self.lstm_input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )

        self.post_lstm_dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, self.output_dim)

    def forward(self, x, lengths):
        """
        x: (batch_size, seq_len, 10) -> 5 harf + 5 feedback
        lengths: (batch_size,) sequence uzunlukları
        """
        batch_size, seq_len, _ = x.size()

        # Harf ve feedback ayrımı
        letters = x[:, :, :5]   # pg1..pg5
        feedback = x[:, :, 5:]  # l1..l5

        # Embedding
        letters_emb = self.letter_embedding(letters)       # (B, S, 5, letter_embedding_dim)
        feedback_emb = self.feedback_embedding(feedback)   # (B, S, 5, feedback_embedding_dim)

        # Concatenate embeddingler
        x_embed = torch.cat([letters_emb, feedback_emb], dim=-1)  # (B, S, 5, letter+fb emb)
        x_embed = x_embed.view(batch_size, seq_len, -1)            # (B, S, LSTM_input_dim)

        # Packed sequence ile LSTM
        packed_x = nn.utils.rnn.pack_padded_sequence(
            x_embed,
            lengths.cpu(),
            batch_first=True,
            enforce_sorted=False
        )

        packed_out, (h, c) = self.lstm(packed_x)
        out, _ = nn.utils.rnn.pad_packed_sequence(packed_out, batch_first=True)

        # Dropout + FC
        out = self.post_lstm_dropout(out)
        out = F.relu(self.fc1(out))
        out = self.post_lstm_dropout(out)
        out = self.fc2(out)

        # Çıkış reshape: (B, S, 5 harf, vocab_size)
        out = out.view(batch_size, seq_len, 5, -1)

        return out

TRAIN AND TEST FUNCTION

In [3]:
def train(model, dataloader, optimizer, criterion, device):
    """
    Trains the given model for one epoch using the provided DataLoader.

    This function performs a full training loop:
    - Iterates over all batches in the DataLoader
    - Moves data to the target device (CPU/GPU)
    - Computes model predictions (logits)
    - Calculates loss and backpropagates gradients
    - Updates model weights using the optimizer
    - Tracks average loss and overall accuracy

    Args:
        model (torch.nn.Module): The model to be trained
        dataloader (torch.utils.data.DataLoader): Provides training batches
        optimizer (torch.optim.Optimizer): Optimization algorithm (e.g., Adam, SGD)
        criterion (torch.nn.Module): Loss function (e.g., CrossEntropyLoss)
        device (torch.device): Target device ("cpu" or "cuda")

    Returns:
        avg_loss (float): Average loss over all batches
        accuracy (float): Overall prediction accuracy (in %)
    """
    model.train()
    total_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for X, Y, lengths in dataloader:
        X, Y, lengths = X.to(device), Y.to(device), lengths.to(device)
        optimizer.zero_grad()

        logits = model(X, lengths)
        logits_flat = logits.view(-1, 29)
        Y_flat = Y.view(-1)

        loss = criterion(logits_flat, Y_flat)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(logits_flat, dim=1)
        correct_predictions += (predicted == Y_flat).sum().item()
        total_predictions += Y_flat.size(0)

    avg_loss = total_loss / len(dataloader)
    accuracy = 100.0 * correct_predictions / total_predictions if total_predictions > 0 else 0.0
    return avg_loss, accuracy

In [4]:
def test(model, dataloader, criterion, device):
    """
    Evaluates the given model on a validation or test dataset.

    This function performs an evaluation loop:
    - Sets the model to evaluation mode (disables dropout, batchnorm updates)
    - Iterates over all batches in the DataLoader without computing gradients
    - Computes model predictions (logits)
    - Calculates loss and overall accuracy
    - Returns average loss and accuracy across the dataset

    Args:
        model (torch.nn.Module): The model to be evaluated
        dataloader (torch.utils.data.DataLoader): Provides test/validation batches
        criterion (torch.nn.Module): Loss function (e.g., CrossEntropyLoss)
        device (torch.device): Target device ("cpu" or "cuda")

    Returns:
        avg_loss (float): Average loss over all batches
        accuracy (float): Overall prediction accuracy (in %)
    """
    model.eval()
    total_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():
        for X, Y, lengths in dataloader:
            X, Y, lengths = X.to(device), Y.to(device), lengths.to(device)

            logits = model(X, lengths)
            logits_flat = logits.view(-1, 29)
            Y_flat = Y.view(-1)

            loss = criterion(logits_flat, Y_flat)
            total_loss += loss.item()

            _, predicted = torch.max(logits_flat, dim=1)
            correct_predictions += (predicted == Y_flat).sum().item()
            total_predictions += Y_flat.size(0)

    avg_loss = total_loss / len(dataloader) if len(dataloader) > 0 else 0.0
    accuracy = 100.0 * correct_predictions / total_predictions if total_predictions > 0 else 0.0
    return avg_loss, accuracy

DATASET PREPARATION

In [5]:
class WordleSequenceDataset(Dataset):
    """
    A custom PyTorch Dataset for handling Wordle gameplay data in sequence format.
    - Each game (identified by 'gameid') contains multiple attempts (rows).
    - Each attempt has encoded features for guesses and feedback.
    - The dataset groups and converts them into sequential tensors for model training.
    """
    def __init__(self, data_frame):
        super().__init__()

        self.sequences = []
        self.labels = []
        self.lengths = []
        # Group all attempts by game ID (each game is treated as one sequence)
        grouped = data_frame.groupby("gameid")
        # Process each game sequence separately
        for gameid, group in grouped:
            group_sorted = group.sort_values(by="attempt_index")

            # Input vector X:
            # - pg1..pg5: previous guessed letters
            # - l1..l5: feedback for each letter (e.g., correct/wrong position)
            # Combined feature dimension = 10 per timestep
            X = group_sorted[[
                "pg1", "pg2", "pg3", "pg4", "pg5",
                "l1", "l2", "l3", "l4", "l5"
                        ]].values

            # Target vector Y:
            # - t1..t5: target word (true letters encoded as integers)
            Y = group_sorted[["t1", "t2", "t3", "t4", "t5"]].values

            X_tensor = torch.tensor(X, dtype=torch.long)
            Y_tensor = torch.tensor(Y, dtype=torch.long)

            self.sequences.append(X_tensor)
            self.labels.append(Y_tensor)
            self.lengths.append(X_tensor.size(0))

    def __len__(self):
        # Return the number of games in the dataset
        return len(self.sequences)

    def __getitem__(self, idx):
        # Return one game sequence (X, Y, length)
        return self.sequences[idx], self.labels[idx], self.lengths[idx]


def collate_fn(batch):
    """
    Custom collate function for DataLoader.
    Pads sequences of different lengths so they can form uniform batches.
    Each batch is a list of tuples: (X_seq, Y_seq, seq_len)
    """
    sequences = [item[0] for item in batch]
    labels = [item[1] for item in batch]
    lengths = [item[2] for item in batch]
    # Pad sequences with zeros to match the length of the longest sequence in the batch
    # This allows variable-length games to be trained together efficiently
    padded_X = pad_sequence(sequences, batch_first=True)
    padded_Y = pad_sequence(labels, batch_first=True)
    # Convert lengths to a tensor (used later for masking or packing)
    lengths_tensor = torch.tensor(lengths, dtype=torch.long)

    return padded_X, padded_Y, lengths_tensor

In [6]:
from google.colab import drive
import os
drive.mount('/content/drive')
save_dir = "/content/drive/MyDrive/colab_kfold_results"
os.makedirs(save_dir, exist_ok=True)

Mounted at /content/drive


In [10]:
def kfold_training_save_report(df, model_class, k=5,lr=0.001, batch_size=16, num_epochs=10, device="cpu",
                               random_state=42, save_path="best_model.pth", report_path="training_report.csv"):
    """
    Performs K-Fold Cross-Validation on Wordle dataset, saves the best model,
    and writes training report to CSV.

    Args:
        df (pd.DataFrame): Full dataset with 'gameid' column.
        model_class: Model class (e.g., WordleLSTM).
        k (int): Number of folds.
        batch_size (int): Batch size for DataLoader.
        num_epochs (int): Training epochs per fold.
        device (str): "cpu" or "cuda".
        random_state (int): Random seed for reproducibility.
        save_path (str): File path to save the best model.
        report_path (str): File path to save training report CSV.

    Returns:
        fold_results (list of tuples): (train_loss, train_acc, test_loss, test_acc) for each fold.
    """

    all_gameids = df['gameid'].unique()
    kf = KFold(n_splits=k, shuffle=True, random_state=random_state)
    fold_results = []

    best_test_acc = 0.0
    best_model_state = None

    # Records for CSV
    records = []

    for fold, (train_index, test_index) in enumerate(kf.split(all_gameids)):
        print(f"\n=== Fold {fold+1}/{k} ===")

        # ID setlerini oluştur
        train_ids = set(all_gameids[train_index])
        test_ids = set(all_gameids[test_index])

        # Mask ile filtreleme
        train_mask = df['gameid'].map(lambda x: x in train_ids)
        test_mask = df['gameid'].map(lambda x: x in test_ids)
        train_df = df[train_mask]
        test_df = df[test_mask]

        train_dataset = WordleSequenceDataset(train_df)
        test_dataset = WordleSequenceDataset(test_df)

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

        model = model_class(
                    vocab_size=29,      # letter numbers in alphabet
                    letter_embedding_dim=16,    # Previous Guess Embedding dimension
                    feedback_embedding_dim=4,   # Feedback Embedding dimension
                    hidden_dim=256,     # LSTM hidden dimension
                    num_layers=4,       # LSTM layer number
                    dropout=0.3         # Drop out
                ).to(device)


        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        criterion = torch.nn.CrossEntropyLoss()

        # Training loop
        for epoch in range(num_epochs):
            print(f"Fold {fold+1}, Epoch {epoch+1}: ")
            train_loss, train_acc = train(model, train_loader, optimizer, criterion, device)
            test_loss, test_acc = test(model, test_loader, criterion, device)

            print(f"Train Loss {train_loss:.4f}, Train Acc {train_acc:.2f}%, "
                  f"Test Loss {test_loss:.4f}, Test Acc {test_acc:.2f}%")

            # Save metrics for CSV
            records.append({
                "fold": fold+1,
                "epoch": epoch+1,
                "train_loss": train_loss,
                "train_acc": train_acc,
                "test_loss": test_loss,
                "test_acc": test_acc
            })


            # Save best model
            if test_acc > best_test_acc:
                best_test_acc = test_acc
                best_model_state = model.state_dict()


        fold_results.append((train_loss, train_acc, test_loss, test_acc))

        # CSV’ye ekle (append)
        df_report = pd.DataFrame(records)
        df_report.to_csv(os.path.join(save_dir,report_path), mode='a', index=False, header=False)
         # en iyi modeli kaydet
        model_path = os.path.join(save_dir, save_path)
        torch.save(best_model_state, model_path)

    print(f"\nTraining report saved at: {report_path}")
    print(f"Best Test Accuracy across folds: {best_test_acc:.2f}%")
    print(f"Best model saved at: {save_path}")

    return fold_results


In [13]:
def main(csv_path="",
         epochs=1,
         batch_size=1,
         lr=1e-3,
         shuffle=True):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Running on: {device}")


    df = pd.read_csv(csv_path,header=None)
    df.columns = [
    "gameid",
    "attempt_index",
    "pg1", "pg2", "pg3", "pg4", "pg5",
    "l1", "l2", "l3", "l4", "l5",
    "g1", "g2", "g3", "g4", "g5",
    "t1", "t2", "t3", "t4", "t5"
    ]

    # Kontrol
    print(df.head())
    results = kfold_training_save_report(
    df=df,
    model_class=WordleLSTM,
    k=5,lr=lr,
    batch_size=batch_size,
    num_epochs=epochs,
    device=device,
    save_path="best_wordle_model.pth",
    report_path="training_report.csv"
    )

In [14]:
if __name__ == "__main__":
    main(
        csv_path="/content/turkish1000000.csv",
        epochs=100,
        batch_size=256,
        lr=1e-4,
        shuffle=True
    )


Running on: cuda
   gameid  attempt_index  pg1  pg2  pg3  pg4  pg5  l1  l2  l3  ...  g1  g2  \
0       0              0    1   24   27   17   23   0   0   0  ...   9  17   
1       0              1    9   17   19   19    0   0   2   0  ...   2  17   
2       0              2    2   17   22   15    0   0   2   0  ...  14  17   
3       0              3   14   17   16    7    0   0   2   0  ...   6  17   
4       0              4    6   17    4   20    0   2   2   0  ...   6  17   

   g3  g4  g5  t1  t2  t3  t4  t5  
0  19  19   0   6  17  20  28   0  
1  22  15   0   6  17  20  28   0  
2  16   7   0   6  17  20  28   0  
3   4  20   0   6  17  20  28   0  
4  20  28   0   6  17  20  28   0  

[5 rows x 22 columns]

=== Fold 1/5 ===
Fold 1, Epoch 1: 
Train Loss 1.7899, Train Acc 56.54%, Test Loss 1.2978, Test Acc 61.76%
Fold 1, Epoch 2: 
Train Loss 1.2421, Train Acc 63.15%, Test Loss 1.1635, Test Acc 65.19%
Fold 1, Epoch 3: 
Train Loss 1.1649, Train Acc 65.18%, Test Loss 1.1140, Test A