<a href="https://colab.research.google.com/github/Amanuel-jissa/Hnagman_game-machine/blob/main/Final_Trexquant_training__model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# A CNN+Bi-LSTM+GCN approach to a Hangman game player model

 The model I have worked on combines several components to predict the optimal next character guess.
 Specifically, the model combines three distinct neural network methods:

 1. CNNs: This network helps the model capture local patterns, answering questions like "When I see the letters 't', 'h', '_' next to each other in that order, what is the most likely character for the blank?". I'm more familiar with CNNs applied to 2D images, where a network might learn to recognize a specific part of an image by finding a particular arrangement of pixels, such as a small dark circle (pupil) inside a larger light circle (iris). The filter only cares about that local pattern of pixels.

  However, in the Hangman AI, I treat the space as the 1D sequence of the word mask. The CNN's Conv1d layer slides a small window (for example 3 or 5 positions wide; the code below uses a kernel size of 3) across this sequence. Each filter is a learned set of weights that activates when it encounters a specific, recognizable pattern of characters and blanks.





 2. Bi-LSTM: I used this to process the word mask
    as a sequence, capturing contextual and long-range dependencies between characters.
    First, the word mask is converted into a sequence of integer indices, which are then passed through an embedding layer to create a dense vector representation for each character.
    I then constructed a multi-layer bidirectional LSTM that processes the embedded sequence, so that the network can gather information from both left-to-right and right-to-left contexts for every position in the sequence. The final hidden states from the forward and backward passes of the last layer are concatenated to form the LSTM feature vector.



 3. GCNs: I implemented this component to model the intrinsic relationships between
    characters in the English language. After a graph is constructed where characters are nodes
    and edges are weighted by co-occurrence frequency, the model learns
    from the structural properties of the language itself.

    The model is trained on a dataset of game states generated through simulations against an oracle. This oracle calculates the true character frequency distribution from all
    possible words consistent with the current game state. The model's loss function is a
    combination of standard cross-entropy and a KL divergence term, which encourages the model's output distribution to align with the oracle's.
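
The combined loss described above can be sketched on a toy three-letter alphabet. This is only an illustration under stated assumptions: the function names, the fixed `kl_weight=0.2`, and the direction of the KL term (oracle relative to model) are hypothetical choices, and label smoothing is left out. It is kept dependency-free so it can be run on its own.

```python
import math

def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def combined_loss(logits, target_index, oracle_probs, kl_weight=0.2):
    """Cross-entropy on the chosen letter plus a weighted KL(oracle || model) term.

    kl_weight and the KL direction are assumptions made for this sketch.
    """
    model_probs = softmax(logits)
    cross_entropy = -math.log(model_probs[target_index])
    # Terms where the oracle probability is 0 contribute nothing to the KL sum.
    kl = sum(p * math.log(p / q) for p, q in zip(oracle_probs, model_probs) if p > 0)
    return cross_entropy + kl_weight * kl

oracle_probs = [0.8, 0.1, 0.1]
# An undecided model (uniform output) versus one that already leans towards letter 0.
loss_far = combined_loss([0.0, 0.0, 0.0], target_index=0, oracle_probs=oracle_probs)
loss_near = combined_loss([2.0, 0.0, 0.0], target_index=0, oracle_probs=oracle_probs)
print(loss_far > loss_near)
```

The KL term drops to zero when the model's distribution matches the oracle's exactly, so it pulls the whole output distribution towards the oracle rather than only the sampled letter.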

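
To make the sliding-window idea from the CNN item concrete, here is a minimal sketch in plain Python. A hand-written pattern stands in for a learned filter (real Conv1d filters are real-valued weights, not exact character matches), and `slide_filter` is a hypothetical helper, not part of the model code.

```python
def slide_filter(mask, pattern):
    """Return, for each window position, how many characters match the pattern."""
    width = len(pattern)
    scores = []
    for start in range(len(mask) - width + 1):
        window = mask[start:start + width]
        scores.append(sum(1 for a, b in zip(window, pattern) if a == b))
    return scores

scores = slide_filter("_th_r", "th_")
print(scores)       # [0, 3, 0]: the filter fires only where 't', 'h', '_' line up
print(max(scores))  # 3: max pooling keeps the strongest response
```

Taking the maximum over positions plays the same role as the `nn.AdaptiveMaxPool1d(1)` layer in the model: it records that the pattern occurred somewhere in the mask, regardless of where.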

In [None]:
from __future__ import annotations
import os
import random
import string
import pickle
import time
from pathlib import Path
from collections import defaultdict, Counter
from functools import lru_cache
from itertools import combinations
import numpy as np
import networkx as nx
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm.auto import tqdm
from torch.cuda.amp import autocast, GradScaler
from torch.optim.lr_scheduler import LinearLR, CosineAnnealingLR, SequentialLR

# CONFIGURATION

In [None]:
dictionary_file_path = "/content/drive/MyDrive/trained1/sim/words_250000_train.txt"
model_save_directory = "/content/drive/MyDrive/trained2"
cached_simulation_path = "/content/drive/MyDrive/trained1/sim/0_77_simulated_game_states.pkl"
Path(model_save_directory).mkdir(exist_ok=True, parents=True)
# parameters
max_word_len = 50
max_lives = 6
alphabet = list(string.ascii_lowercase)
blank_char = "_"
# the character vocabulary includes all letters and the blank character
char_vocab_size = len(alphabet) + 1
life_feature_dim = max_lives + 1 # input dimension for the lives remaining feature (0 to 6)
# for the sake of reproducibility and comparing models with different updates.
random_seed = 2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
random.seed(random_seed); np.random.seed(random_seed); torch.manual_seed(random_seed)
if torch.cuda.is_available(): torch.cuda.manual_seed_all(random_seed)
# Hyperparameters for the training
cnn_layer_1_filters, cnn_layer_2_filters, cnn_layer_3_filters = 1024, 512, 256
embedding_dim, lstm_hidden_dim = 256, 512
batch_size = 1024
total_epochs = 30
warmup_lr, max_lr = 2e-4, 2e-3
cosine_annealing_period = 24
label_smoothing_factor = 0.05
gradient_clip_value = 2.0
# We gradually introduce the KL divergence loss to stabilize initial training.
kl_annealing_decay = 1.0
kl_warmup_epochs = 4
kl_start_weight, kl_min_weight = 0.21, 0.02
# Training strategy
early_stopping_patience = 4
endgame_oversample_factor = 3  # Re-add states from near-solved games to focus on the endgame
hard_word_min_len, hard_word_max_len = 4, 10
hard_word_oversample_factor = 4 # Add more mediumlength words, which are often trickier

In [None]:
# loading and preparing the dictionary
with open(dictionary_file_path) as f:
    all_words_from_file = [word.strip().lower() for word in f if word.strip().isalpha()]
print("loading dictionanry")
# split into a main set for training and simulation and a holdout set for validation (2% of all words)
random.shuffle(all_words_from_file)
split_index = int(0.98 * len(all_words_from_file))
main_word_list = all_words_from_file[:split_index]
holdout_word_list = all_words_from_file[split_index:]
# group words by length for faster lookups by the oracle
words_by_length = defaultdict(list)
for word in main_word_list:
    words_by_length[len(word)].append(word)

# convert word lists to numpy character arrays to increase speed
char_arrays_by_length = {
    length: np.array([list(word) for word in word_list], dtype="<U1")
    for length, word_list in words_by_length.items() if length <= max_word_len
}

# calculate positional letter frequencies
positional_char_freqs = np.zeros((max_word_len, len(alphabet)), np.float32)
position_counts = np.zeros(max_word_len)
for word in main_word_list:
    for i, char in enumerate(word[:max_word_len]):
        positional_char_freqs[i, alphabet.index(char)] += 1
        position_counts[i] += 1
# normalizing to get probabilities
positional_char_freqs /= position_counts[:, None] + 1e-8 # add small epsilon to avoid division by zero

loading dictionanry


In [None]:
#build graph based features
char_to_index = {c: i for i, c in enumerate(alphabet)}
num_chars = len(alphabet)
# build a graph where nodes are letters and edges represent co-occurrence in words
cooccurrence_counts = np.zeros((num_chars, num_chars), dtype=np.uint32)
for word in main_word_list:
    # for every pair of unique letters in a word increment their co-occurrence count
    for char1, char2 in combinations(set(word), 2):
        idx1, idx2 = char_to_index[char1], char_to_index[char2]
        cooccurrence_counts[idx1, idx2] += 1
        cooccurrence_counts[idx2, idx1] += 1 # symmetric relationship

# normalize each row to create a transition-like matrix
cooc_matrix = cooccurrence_counts.astype(np.float32)
cooc_matrix /= cooc_matrix.sum(axis=1, keepdims=True) + 1e-9


In [None]:

#  this script analyzes a given dictionary file to recommend a data-driven
#  alpha (damping factor) for the PageRank algorithm.
#
#  It does this by:
#  1. Building a character co-occurrence graph from the dictionary.
#  2. Analyzing the graph's structural properties (density and clustering).
#  3. Applying heuristics to adjust a baseline alpha, making it tailored
#     to the specific structure of the language in the dictionary.
import networkx as nx
import numpy as np
from itertools import combinations
import string
from pathlib import Path
def analyze_dictionary_and_recommend_alpha(file_path: str) -> float:
    """
    Reads a dictionary, builds a character graph, and recommends a PageRank alpha.
    args:
        file_path: The full path to the dictionary text file.
    returns:
        The recommended alpha value as a float.
    """
    with open(file_path) as f:
        # Read words, ensuring they are clean and lowercase
        words = [word.strip().lower() for word in f if word.strip().isalpha()]


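    # NOTE: the graph is built from the global `cooccurrence_counts` matrix computed
    # in the previous cell (training split only), not from the `words` read above.
    character_graph = nx.from_numpy_array(cooccurrence_counts, create_using=nx.Graph)

    # Metric 1: Density
    # Fraction of all possible letter pairs that are connected by an edge.
    density = nx.density(character_graph)
    print(f"Graph Density is {density:.4f}")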

    # Metric 2: Average Clustering Coefficient
    # Measures the "cliqueness" of the graph. High clustering means that letters
    # that are connected to the same letter are also likely connected to each other.
    # We use the 'weight' parameter to consider the strength of connections.
    avg_clustering = nx.average_clustering(character_graph, weight='weight')
    print(f"Average Clustering Coefficient: {avg_clustering:.4f}")
    # Heuristic for Density:
    # If the graph is very dense, the links are less meaningful, so we should
    # rely more on random "teleportation" (i.e., a lower alpha).
    # If the graph is sparse, links are rare and important, so we should follow
    # them more often (i.e., a higher alpha).
    # NOTE: These thresholds are examples and can be tuned.
    base_alpha=0.85
    if density > 0.85:
        print("Heuristic Applied: Graph is very dense. Lowering alpha.")
        base_alpha -= 0.05
    elif density < 0.50:
        print("Heuristic Applied: Graph is sparse. Increasing alpha.")
        base_alpha += 0.05

    # Heuristic for Clustering:
    # If the graph is highly clustered, it has meaningful communities of letters
    # (like 's', 't', 'r'). We want to encourage the algorithm to explore these
    # local communities, so we increase alpha.
    # NOTE: This threshold is an example and can be tuned.
    if avg_clustering > 0.65:
        print("Heuristic Applied: Graph is highly clustered. Increasing alpha.")
        base_alpha += 0.05

    # Finalize and return the recommended value.
    # Clamp the adjusted alpha so it stays within a reasonable, stable range.
    final_alpha = max(0.70, min(0.99, base_alpha))


    print(f"recommended Alpha value is: {final_alpha:.4f}")


    return final_alpha

if __name__ == "__main__":
    recommended_alpha = analyze_dictionary_and_recommend_alpha(dictionary_file_path)



Graph Density is 1.0000
Average Clustering Coefficient: 0.1567
Heuristic Applied: Graph is very dense. Lowering alpha.
recommended Alpha value is: 0.8000


As expected, the graph density is 1.0, which means the character co-occurrence graph is fully connected or, as we call it in math, a complete graph.

In the context of this project, this means that for every possible pair of the 26 letters in the alphabet, there is at least one word in the dictionary file that contains both of those letters.

The clustering coefficient measures how "cliquey" a network is: an average clustering value of 0 indicates no clustering and 1 indicates maximal clustering. An average clustering of 15.67% is therefore low. Because the graph is complete, this low value reflects the edge weights rather than missing edges. We can understand this with an example: if the letter 'C' is frequently found in words with two other letters like 'H' and 'T', then, roughly speaking, there is only a 15.7% chance that 'H' and 'T' are also strongly connected to each other.
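
The sketch below shows how a graph can have density 1.0 and still score low on weighted clustering. It uses a geometric-mean weighted clustering coefficient with weights scaled by the largest weight, which is, as far as I understand, the definition behind `nx.average_clustering(..., weight=...)`; the helper names and the toy weights are hypothetical.

```python
from itertools import combinations

def density(num_nodes, edges):
    """Fraction of possible undirected edges that are present."""
    return 2 * len(edges) / (num_nodes * (num_nodes - 1))

def average_weighted_clustering(num_nodes, edges):
    """Average geometric-mean weighted clustering coefficient.

    `edges` maps frozenset({u, v}) -> weight. Weights are scaled by the largest
    weight, so a triangle only counts fully if all three of its edges are as
    strong as the strongest edge in the graph.
    """
    max_w = max(edges.values())
    neighbors = {u: set() for u in range(num_nodes)}
    for edge in edges:
        u, v = tuple(edge)
        neighbors[u].add(v)
        neighbors[v].add(u)
    total = 0.0
    for u in range(num_nodes):
        deg = len(neighbors[u])
        if deg < 2:
            continue  # nodes with fewer than two neighbours contribute 0
        tri = 0.0
        for v, w in combinations(sorted(neighbors[u]), 2):
            if frozenset((v, w)) in edges:
                product = (edges[frozenset((u, v))] * edges[frozenset((u, w))]
                           * edges[frozenset((v, w))]) / max_w ** 3
                tri += product ** (1 / 3)
        total += 2 * tri / (deg * (deg - 1))
    return total / num_nodes

# Two complete 3-node graphs: one with equal weights, one with a single dominant edge.
even = {frozenset((0, 1)): 5, frozenset((0, 2)): 5, frozenset((1, 2)): 5}
uneven = {frozenset((0, 1)): 100, frozenset((0, 2)): 1, frozenset((1, 2)): 1}
print(density(3, even), average_weighted_clustering(3, even))
print(density(3, uneven), average_weighted_clustering(3, uneven))
```

Both toy graphs are complete, but only the evenly weighted one keeps a high clustering value. The same effect is a plausible reading of the 0.1567 reported above: a few very common letter pairs dominate the weights.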

In [None]:
"""This part of the code converts the raw data on letter pairs into a network,
so that advanced graph algorithms can be applied to measure the strategic
importance of each letter, which is then used to help the model make smarter guesses.
I got the idea from https://pi.math.cornell.edu/~mec/Winter2009/RalucaRemus/Lecture3/lecture3.html """
character_graph = nx.from_numpy_array(cooccurrence_counts, create_using=nx.Graph)
# feature 1 - PageRank - identifies generally important letters
pagerank_scores = nx.pagerank(character_graph, alpha=0.8, weight="weight")
pagerank_vector = np.array([pagerank_scores[i] for i in range(num_chars)], dtype=np.float32)
# feature 2- personalized pageRank - measures proximity of all letters to a given letter
# we pre-calculate the PPR vector for each possible starting letter.
def calculate_personalized_pagerank(seed_char_index, alpha=0.85):
    personalization_vector = {i: 0.0 for i in range(num_chars)}
    personalization_vector[seed_char_index] = 1.0
    ppr = nx.pagerank(character_graph, alpha=alpha, weight="weight", personalization=personalization_vector)
    return np.array([ppr[i] for i in range(num_chars)], dtype=np.float32)

ppr_matrix = np.stack([calculate_personalized_pagerank(i) for i in range(num_chars)], axis=1)
@lru_cache(maxsize=None)
def build_game_state_features(mask: str):
    """creates a feature vector from the current game mask and revealed letters."""
    padded_mask = mask.ljust(max_word_len, blank_char)[:max_word_len]

    # feature Set 1- Positional character frequencies for all blank spots
    pos_freq_features = [
        positional_char_freqs[i, j] if padded_mask[i] == blank_char else 0.0
        for i in range(max_word_len) for j in range(len(alphabet))
    ]
    base_features = np.array(pos_freq_features, np.float32)

    # feature Set 2- Graph-based features derived from revealed letters
    revealed_chars = [char for char in padded_mask if char != blank_char]
    if revealed_chars:
        revealed_indices = [char_to_index[ch] for ch in revealed_chars]
        # Sum of co-occurrence scores with all revealed letters
        cooc_sum_features = cooc_matrix[:, revealed_indices].sum(axis=1)
        # average ppr from the perspective of each revealed letter
        ppr_avg_features = ppr_matrix[:, revealed_indices].mean(axis=1)
    else:
        # if no letters are revealed, these features are zero
        cooc_sum_features = np.zeros(num_chars, np.float32)
        ppr_avg_features = np.zeros(num_chars, np.float32)

    # Z-score normalization helps stabilize the features
    def z_normalize(v):
        return (v - v.mean()) / (v.std() + 1e-6)

    # concatenate all features into a single vector
    graph_features = np.concatenate([pagerank_vector, z_normalize(cooc_sum_features), z_normalize(ppr_avg_features)], axis=0)
    return np.concatenate([base_features, graph_features], axis=0)

# we define the feature dimension dynamically from a sample call
feature_vector_dim = len(build_game_state_features(blank_char * max_word_len))

print("the dimension of the feature is=", feature_vector_dim)

the dimension of the feature is= 1378
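
The printed dimension can be checked by hand from the pieces concatenated in `build_game_state_features`. The variable names below are only for illustration.

```python
max_word_len = 50   # from the configuration cell
num_letters = 26    # len(alphabet)

# Positional frequencies: one value per (position, letter) pair.
positional_block = max_word_len * num_letters
# Graph features: PageRank vector + co-occurrence sums + averaged personalized PageRank.
graph_block = 3 * num_letters

print(positional_block + graph_block)  # 1378
```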


In [None]:
@lru_cache(maxsize=16384)
def oracle(mask: str, guessed_letters: frozenset):
    """
    This function calculates the ground-truth probability distribution of the next
    correct letter, given the current state, by looking at the entire dictionary.
    We use this to generate better training targets for the model.
    """
    word_len = len(mask)

    # In case we don't have any words of this length in our dictionary, we can't make an
    # informed guess, so we return a uniform probability distribution over all letters.
    # This safety case is most useful when experimenting with small subsets of the
    # dictionary to tune hyperparameters.
    if word_len not in char_arrays_by_length:
        return np.full(len(alphabet), 1 / len(alphabet), np.float32)

    # Retrieve the pre-computed numpy array of all words of the correct length.
    candidate_words = char_arrays_by_length[word_len]

    # Create a boolean array, initially all True, to mark which words are still valid candidates.
    is_valid = np.ones(len(candidate_words), bool)
    guessed_list = list(guessed_letters)

    # Loop through each character of the current mask.
    for i, char in enumerate(mask):
        # If the character is a revealed letter (not a blank).
        if char != blank_char:
            # A valid candidate word MUST have this same letter at this exact position.
            # '&=' progressively eliminates words that don't match.
            is_valid &= (candidate_words[:, i] == char)
        # If the character is a blank, and we have guessed some letters.
        elif guessed_list:
            # The letter at this blank position can't be one of the previously guessed letters
            # (since those guesses were either wrong or are already revealed elsewhere).
            is_valid &= ~np.isin(candidate_words[:, i], guessed_list)
    valid_candidates = candidate_words[is_valid]

    # If our filtering rules have eliminated all possible words, return a uniform distribution.
    if valid_candidates.size == 0:
        return np.full(len(alphabet), 1 / len(alphabet), np.float32)

    # Count letter frequencies in the remaining valid words.

    # Flatten the 2D array of candidate words into a single 1D array of all possible letters.
    flat_chars = valid_candidates.ravel()
    unguessed_chars = flat_chars[~np.isin(flat_chars, guessed_list)]

    # If there are no unguessed letters left in any of the candidate words, we can't proceed.
    if unguessed_chars.size == 0:
        return np.full(len(alphabet), 1 / len(alphabet), np.float32)

    # Calculate the final probability distribution.
    char_counts = np.zeros(len(alphabet), np.float32)
    unique_chars, counts = np.unique(unguessed_chars, return_counts=True)
    for char, count in zip(unique_chars, counts):
        char_counts[alphabet.index(char)] = count
    # this gives us the probability of each letter being the correct next guess.
    return char_counts / char_counts.sum()
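
The filtering and counting logic of the oracle can be followed on a five-word dictionary. This is a simplified, dependency-free sketch: `toy_oracle` and the word list are hypothetical, and it returns a dict instead of a 26-element array.

```python
from collections import Counter

def toy_oracle(mask, guessed, words, blank="_"):
    """Letter distribution over the words consistent with the mask and past guesses."""
    def is_consistent(word):
        if len(word) != len(mask):
            return False
        for m, c in zip(mask, word):
            if m != blank and c != m:
                return False  # revealed letters must match exactly
            if m == blank and c in guessed:
                return False  # a guessed letter cannot hide under a blank
        return True

    candidates = [w for w in words if is_consistent(w)]
    # Count every occurrence of every letter that has not been guessed yet.
    counts = Counter(c for w in candidates for c in w if c not in guessed)
    total = sum(counts.values())
    return {c: n / total for c, n in counts.items()} if total else {}

probs = toy_oracle("_at", {"a", "t", "s"}, ["cat", "bat", "sat", "cot", "dog"])
print(probs)  # {'c': 0.5, 'b': 0.5}
```

Here "sat" is rejected because 's' was already guessed and is not revealed, and "cot" is rejected because its second letter does not match the revealed 'a', leaving "cat" and "bat".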

The function below is basically a "game simulator" whose only job is to create a good-quality dataset for the initial training of the model. It plays thousands of games against itself with the help of the oracle. To do this, the function loops through a list of words. For each word, it resets the game state (no letters guessed, no wrong guesses). The while loop continues until the game is won or lost. At every turn, the simulator calls the oracle function. The oracle looks at the entire dictionary and calculates the ground-truth probability distribution for the next guess. For example, it might determine that 'e' has a 12% chance of being correct, 's' has a 9% chance, and so on.

If the simulator always chose the single best letter from the oracle, it would create a very boring and narrow training set. The model would only learn about the perfect path and would be confused if it ever encountered a slightly suboptimal state. This is why I used a parameter called temperature, which controls how closely we stick to the oracle's advice (low temperature) or explore other options (high temperature).

If the temperature is low, like 0.2, then 1 / temperature is a large number (5). Raising the probabilities to the 5th power and renormalizing makes the highest probability much larger than the others, which causes the simulation to almost always pick the oracle's best guess.
But if the temperature is high, say 2, then 1 / temperature is a small number (0.5). Taking the square root of the probabilities brings them closer to each other. This gives unlikely letters a better chance of getting picked.
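
The effect of the temperature can be seen on a small example distribution. This is a minimal sketch of the same power-and-renormalize step; `apply_temperature` and the three probabilities are made up for illustration.

```python
def apply_temperature(probs, temperature):
    """Raise each probability to the power 1/temperature and renormalize."""
    powered = [p ** (1.0 / temperature) for p in probs]
    total = sum(powered)
    return [p / total for p in powered]

oracle_probs = [0.5, 0.3, 0.2]
sharp = apply_temperature(oracle_probs, 0.2)  # exponent 5: strongly favours the top letter
flat = apply_temperature(oracle_probs, 2.0)   # exponent 0.5: closer to uniform
print([round(p, 3) for p in sharp])
print([round(p, 3) for p in flat])
```

With temperature 0.2 the top letter takes more than 90% of the probability mass, while with temperature 2 the three letters end up much closer together. A temperature of 1 leaves the oracle's distribution unchanged.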

In [None]:
def generate_training_data(words_to_play: list[str], temperature: float = 0.80):
    """
    Simulates hangman games to generate training samples.
    It uses the oracle to make probabilistic guesses, creating a dataset of tuples:
    (mask, guessed letters, chosen guess, oracle distribution, feature vector, lives remaining).
    """
    training_samples = []
    for word in tqdm(words_to_play, desc="simulating gameplay", leave=False):
        secret_word = word[:max_word_len]
        guessed_so_far, wrong_guesses = set(), 0

        while wrong_guesses < max_lives:
            # create the current mask
            mask = "".join(c if c in guessed_so_far else blank_char for c in secret_word)
            if blank_char not in mask:
                break # when word is solved

            # get the better probability distribution from the oracle
            oracle_probabilities = oracle(mask, frozenset(guessed_so_far)).copy()

            # to generate varied gameplay, we sample from the oracle's distribution
            # instead of always picking the best letter.
            sampling_probs = oracle_probabilities.copy()
            for g in guessed_so_far: # Can't guess the same letter twice
                sampling_probs[alphabet.index(g)] = 0
            if sampling_probs.sum() < 1e-8:
                break
            sampling_probs = np.power(sampling_probs + 1e-9, 1 / temperature)
            sampling_probs /= sampling_probs.sum()


            chosen_guess = np.random.choice(alphabet, p=sampling_probs)
            lives_remaining = max_lives - wrong_guesses

            training_samples.append((
                mask,
                guessed_so_far.copy(),
                chosen_guess,
                oracle_probabilities,
                build_game_state_features(mask),
                lives_remaining
            ))
            guessed_so_far.add(chosen_guess)
            if chosen_guess not in secret_word:
                wrong_guesses += 1
    # This part was added later, after testing the model: I collected data on where
    # the model underperforms and used it to decide what to oversample.
    # Here, states from near-solved games (at most two blanks left) are re-added.
    endgame_states = [sample for sample in training_samples if sample[0].count(blank_char) <= 2]
    training_samples += endgame_states * (endgame_oversample_factor - 1)
    random.shuffle(training_samples)
    return training_samples

In [None]:
class HangmanDataset(Dataset):
    """simple PyTorch dataset to wrap our list of training samples."""
    def __init__(self, experiences):
        self.experiences = experiences
    def __len__(self):
        return len(self.experiences)
    def __getitem__(self, idx):
        return self.experiences[idx]

def create_mask_tensor(masks: list[str]) -> np.ndarray:
    """converts a batch of string masks into a one hot encoded tensor."""
    batch_size = len(masks)
    local_max_len = min(max_word_len, max(len(m) for m in masks))
    tensor = np.zeros((batch_size, char_vocab_size, local_max_len), np.float32)

    for i, mask_str in enumerate(masks):
        for j, char in enumerate(mask_str[:local_max_len]):
            idx = alphabet.index(char) if char in alphabet else char_vocab_size - 1
            tensor[i, idx, j] = 1.0
    return tensor
def create_guessed_letters_tensor(guessed_sets: list[set]) -> np.ndarray:
    """creates a multi hot tensor indicating which letters have been guessed."""
    tensor = np.zeros((len(guessed_sets), len(alphabet)), np.float32)
    for i, guessed in enumerate(guessed_sets):
        for char in guessed:
            tensor[i, alphabet.index(char)] = 1.0
    return tensor
def collate_batch(batch):
    """processes a batch of samples into tensors for the model."""
    masks, guessed_sets, chosen_letters, oracle_dists, feature_vectors, lives = zip(*batch)
    # padded one hot tensor of the word mask
    mask_tensors = torch.from_numpy(create_mask_tensor(masks))
    # padded sequence of character indices for the LSTM
    seq_len = mask_tensors.shape[2]
    mask_indices_list = []
    for s in masks:
        indices = [alphabet.index(c) if c in alphabet else char_vocab_size - 1 for c in s[:seq_len]]
        indices += [char_vocab_size - 1] * (seq_len - len(indices))
        mask_indices_list.append(indices)
    mask_indices = torch.tensor(mask_indices_list, dtype=torch.long)
    # multi hot tensor of guessed letters
    guessed_chars_tensor = torch.from_numpy(create_guessed_letters_tensor(guessed_sets))
    # stacked tensor of pre computed features
    game_state_features = torch.from_numpy(np.stack(feature_vectors))
    # onehot tensor for remaining lives
    lives_tensor = F.one_hot(torch.tensor(lives), num_classes=life_feature_dim).float()
    # target letter indices for the loss function
    target_letter_indices = torch.tensor([alphabet.index(letter) for letter in chosen_letters])
    # oracle's ground-truth probability distributions
    oracle_distributions = torch.from_numpy(np.stack(oracle_dists))
    return mask_tensors, mask_indices, guessed_chars_tensor, game_state_features, lives_tensor, target_letter_indices, oracle_distributions


The GraphConvolutionalOperator class below is a PyTorch implementation of a two-layer GCN. It is useful for refining the features of each node in a graph by sharing information from its neighbors. In the context of the Hangman AI, the nodes represent the letters of the alphabet, and the graph structure (the adjacency matrix) tells us the relationships between them, such as how often they co-occur in English words. This operator takes a set of initial node features and the graph's adjacency matrix as input. It then produces a single, fixed-size vector that summarizes the entire graph's state after propagating information between the letters.

The core of a GCN layer is its propagation rule, which mathematically defines how node features are updated. The canonical form for a single GCN layer is: $$ H^{(l+1)} = \sigma(\hat{A} H^{(l)} W^{(l)}) $$
where:

- $H^{(l)}$ is the matrix of node features at layer $l$. Each row corresponds to a node, and each column is a feature. For the first layer, this is the initial input feature matrix, often denoted as $X$.
- $W^{(l)}$ is the learnable weight matrix for layer $l$. This is the primary parameter that the model learns during training. Its role is to apply a linear transformation to the node features, projecting them into a different feature space.
- $\hat{A}$ is the normalized adjacency matrix of the graph. This matrix encodes the graph's structure. The element $\hat{A}_{ij}$ is non-zero if node $i$ and node $j$ are connected. The normalization (often symmetric) is crucial for stabilizing the learning process.
- $\sigma$ is a non-linear activation function, such as ReLU ($\text{max}(0, x)$), applied element-wise.
- $H^{(l+1)}$ is the matrix of node features for the next layer, which serves as the output of the current layer.

To better understand this, we can consider the product $H^{(l)} W^{(l)}$ as applying a learned linear transformation to the features of every node independently. In the code below, this is done by the nn.Linear layers. The product $\hat{A} (\dots)$ then performs the "convolution": for each node, it computes a weighted sum of the feature vectors of its neighbors (as defined by $\hat{A}$). This step smooths features across the graph.
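
A single propagation step can be traced by hand on a three-node graph. The sketch below is dependency-free and purely illustrative: the toy adjacency matrix (a path 0 - 1 - 2 with self-loops, row-normalized), the feature values, and the weights are all assumptions, whereas in the model the weights are learned and the adjacency comes from the row-normalized co-occurrence matrix.

```python
def matmul(a, b):
    """Multiply two matrices given as lists of rows."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]

def relu(m):
    """Element-wise max(0, x)."""
    return [[max(0.0, v) for v in row] for row in m]

def gcn_layer(a_hat, h, w, activation=relu):
    """One propagation step: activation(A_hat @ (H @ W))."""
    return activation(matmul(a_hat, matmul(h, w)))

# Row-normalized adjacency for the path 0 - 1 - 2 with self-loops (toy assumption).
a_hat = [[0.5, 0.5, 0.0],
         [1 / 3, 1 / 3, 1 / 3],
         [0.0, 0.5, 0.5]]
h0 = [[1.0, 0.0],   # initial features, one row per node
      [0.0, 1.0],
      [1.0, 1.0]]
w0 = [[1.0, -1.0],  # hypothetical weights; learned in the real model
      [0.0, 1.0]]

h1 = gcn_layer(a_hat, h0, w0)
print(h1)
```

Each output row is the average of the transformed features of a node and its neighbours, which is the smoothing effect described above. Node 1, connected to everything, ends up with a blend of all three transformed rows.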

In [None]:
# defining model to be used
class GraphConvolutionalOperator(nn.Module):
    """
    This is a simple two-layer GCN. I like to think of it as an operator that transforms
    node features based on the graph's structure. It learns a transformation and then
    shares the transformed features with neighboring nodes.
    """
    def __init__(self, input_dim=128, hidden_dim=128, output_dim=128):
        super().__init__()
        # These linear layers are the learnable transformations, like W in the GCN formula
        self.feature_transform_1 = nn.Linear(input_dim, hidden_dim, bias=False)
        self.feature_transform_2 = nn.Linear(hidden_dim, output_dim, bias=False)
    def forward(self, node_features, adjacency_matrix):
        """
        Applies the graph convolution A @ X @ W in two layers (ReLU after the first),
        where adjacency_matrix is A (the graph structure)
        and node_features is X (the features at each node).
        """
        # First layer of propagation
        transformed_features_1 = self.feature_transform_1(node_features)
        propagated_features_1 = torch.relu(adjacency_matrix @ transformed_features_1)

        # Second layer of propagation
        transformed_features_2 = self.feature_transform_2(propagated_features_1)
        propagated_features_2 = adjacency_matrix @ transformed_features_2

        # We take the mean of all character features to get a single graph representation
        return propagated_features_2.mean(dim=1)

class HangmanAI_Network(nn.Module):
    """
    The main neural network for guessing letters. It combines several sub-modules:
    - An LSTM to understand the sequence of the mask
    - A CNN to capture local patterns in the mask.
    - A GCN to reason about character relationships.
    """
    def __init__(self, static_feature_dim: int, emb_dim: int, lstm_h_dim: int, gcn_embedding_dim: int):
        super().__init__()
        #LSTM for sequential mask processing
        self.character_embedding = nn.Embedding(char_vocab_size, emb_dim)
        self.mask_encoder_lstm = nn.LSTM(emb_dim, lstm_h_dim, num_layers=2, batch_first=True, bidirectional=True)
        # CNN for local pattern detection
        self.cnn_layer_1 = nn.Conv1d(char_vocab_size, cnn_layer_1_filters, 3, padding=1)
        self.bn1 = nn.BatchNorm1d(cnn_layer_1_filters)
        self.cnn_layer_2 = nn.Conv1d(cnn_layer_1_filters, cnn_layer_2_filters, 3, padding=1)
        self.bn2 = nn.BatchNorm1d(cnn_layer_2_filters)
        self.cnn_layer_3 = nn.Conv1d(cnn_layer_2_filters, cnn_layer_3_filters, 3, padding=1)
        self.bn3 = nn.BatchNorm1d(cnn_layer_3_filters)
        self.pool = nn.AdaptiveMaxPool1d(1)

        # GCN for character graph reasoning
        self.character_node_embeddings = nn.Parameter(torch.randn(len(alphabet), gcn_embedding_dim))
        self.graph_processor = GraphConvolutionalOperator(
            input_dim=gcn_embedding_dim, hidden_dim=gcn_embedding_dim, output_dim=gcn_embedding_dim
        )
        # the adjacency matrix is fixed, so we register it as a non-trainable buffer
        adj_matrix = cooc_matrix / (cooc_matrix.sum(1, keepdims=True) + 1e-9)
        self.register_buffer("adjacency_matrix", torch.from_numpy(adj_matrix).float())

        # final Combination and Output Head
        combined_feature_dim = (cnn_layer_3_filters + (2 * lstm_h_dim) + len(alphabet) +
                                static_feature_dim + life_feature_dim + gcn_embedding_dim)

        self.combiner_head = nn.Sequential(
            nn.Linear(combined_feature_dim, 512), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(512, 256), nn.ReLU(), nn.Dropout(0.1)
        )
        self.output_layer = nn.Linear(256, len(alphabet))

    def forward(self, mask_tensors, mask_indices, guessed_chars_tensor, static_features, lives_tensor):
        # process mask with LSTM
        embedded_sequence = self.character_embedding(mask_indices)
        _, (last_hidden_state, _) = self.mask_encoder_lstm(embedded_sequence)
        # concatenate the final forward and backward hidden states
        lstm_features = torch.cat([last_hidden_state[-2], last_hidden_state[-1]], dim=1)

        # process mask with CNN
        cnn_out = F.relu(self.bn1(self.cnn_layer_1(mask_tensors)))
        cnn_out = F.relu(self.bn2(self.cnn_layer_2(cnn_out)))
        cnn_out = F.relu(self.bn3(self.cnn_layer_3(cnn_out)))
        cnn_features = self.pool(cnn_out).squeeze(2)
        # process character graph with GCN
        batch_size = mask_tensors.size(0)
        # Expand the character embeddings for each item in the batch
        node_features = self.character_node_embeddings.unsqueeze(0).expand(batch_size, -1, -1)
        gcn_features = self.graph_processor(node_features, self.adjacency_matrix)

        # concatenate all features and predict
        full_feature_vector = torch.cat([
            cnn_features, lstm_features, guessed_chars_tensor, static_features, lives_tensor, gcn_features
        ], dim=1)

        combined_output = self.combiner_head(full_feature_vector)
        return self.output_layer(combined_output)


In [None]:
# Agent code
class HangmanAgent:
    """
    an advanced agent that uses a hybrid strategy for guessing letters.
    - It uses the trained neural network to get a shortlist of promising letters.
    - It then uses a typical information gain approach on this shortlist to
       make a final decision, which adds robustness.
    """
    def __init__(self, model: nn.Module, device: torch.device):
        self.device = device
        self.alphabet = list(string.ascii_lowercase)
        self.model = model
        self.model.to(self.device)
        self.model.eval()
        # The agent needs access to the dictionary to calculate information gain
        self.char_arrays_by_length = char_arrays_by_length
    @lru_cache(maxsize=2048)
    def find_possible_words(self, mask: str, guessed: frozenset) -> np.ndarray:
        # Efficiently filter the dictionary down to the words consistent with the current game state.
        L = len(mask)
        if L not in self.char_arrays_by_length:
            return np.empty((0, L), "<U1")
        arr = self.char_arrays_by_length[L]
        is_valid = np.ones(len(arr), bool)
        glist = list(guessed)
        for i, ch in enumerate(mask):
            if ch != blank_char:
                is_valid &= (arr[:, i] == ch)
            elif glist:
                is_valid &= ~np.isin(arr[:, i], glist)
        return arr[is_valid]
    @torch.no_grad()
    def decide_next_guess(self, mask: str, guessed_letters: frozenset, wrong_guesses: int) -> str:
        """The core decision-making logic of the agent."""
        # Get predictions from the neural network
        mask_tensor = torch.from_numpy(create_mask_tensor([mask])).to(self.device)
        mask_len = mask_tensor.shape[2]
        indices = [self.alphabet.index(c) if c in self.alphabet else char_vocab_size - 1 for c in mask[:mask_len]]
        indices += [char_vocab_size - 1] * (mask_len - len(indices))
        mask_indices = torch.tensor([indices], dtype=torch.long, device=self.device)
        guessed_tensor = torch.from_numpy(create_guessed_letters_tensor([guessed_letters])).to(self.device)
        features_tensor = torch.from_numpy(build_game_state_features(mask)[None, :]).to(self.device)
        lives_left = max_lives - wrong_guesses
        lives_tensor = F.one_hot(torch.tensor([lives_left], device=self.device), num_classes=life_feature_dim).float()
        logits = self.model(mask_tensor, mask_indices, guessed_tensor, features_tensor, lives_tensor)[0]
        # mask out already-guessed letters before applying softmax to get a clean distribution
        for g in guessed_letters:
            if g in self.alphabet:
                logits[self.alphabet.index(g)] = -torch.inf
        probabilities = F.softmax(logits, dim=0).cpu().numpy()
        # Information Gain to re-rank the top candidates
        num_candidates_from_model = 7
        top_indices = np.argsort(-probabilities)[:num_candidates_from_model]
        model_candidates = [self.alphabet[i] for i in top_indices if self.alphabet[i] not in guessed_letters]
        if not model_candidates: # Fallback if all top candidates were somehow already guessed
            return next(c for c in self.alphabet if c not in guessed_letters)
        possible_words = self.find_possible_words(mask, guessed_letters)
        num_possible_words = len(possible_words)
        # If the word space is very small, just trust the model's best guess.
        if num_possible_words <= 1:
            return model_candidates[0]
        best_letter, min_expected_size = None, float('inf')
        # to keep this step fast, we sample from the candidate words if the list is too long.
        max_words_for_ig = 1000
        if num_possible_words > max_words_for_ig:
            sample_indices = np.random.choice(num_possible_words, max_words_for_ig, replace=False)
            info_gain_word_sample = possible_words[sample_indices]
        else:
            info_gain_word_sample = possible_words

        sample_size = len(info_gain_word_sample)
        # Boolean index of the still-unknown positions. Comparing the mask string
        # to blank_char directly gives a single bool, not a per-position mask,
        # so the index is built character by character.
        blank_positions = np.array([ch == blank_char for ch in mask])
        unknown_letters = info_gain_word_sample[:, blank_positions]
        for letter in model_candidates:
            # we want the guess that best splits the remaining possible words.
            # a good method for this is minimizing the expected size of the remaining word list.
            # Count the words that have the letter in any of their unknown slots
            hit_count = int((unknown_letters == letter).any(axis=1).sum())
            p_hit = hit_count / sample_size
            # Expected size = P(hit)*(size if hit)+P(miss)*(size if miss)
            expected_remaining_size = p_hit * hit_count + (1 - p_hit) * (sample_size - hit_count)
            if expected_remaining_size < min_expected_size:
                min_expected_size = expected_remaining_size
                best_letter = letter
        return best_letter if best_letter is not None else model_candidates[0]
    def play_game(self, secret_word: str) -> bool:
        """Plays a single game of Hangman against a given word."""
        guessed, wrong_guesses = set(), 0
        mask = blank_char * len(secret_word)
        while blank_char in mask and wrong_guesses < max_lives:
            letter_to_guess = self.decide_next_guess(mask, frozenset(guessed), wrong_guesses)
            guessed.add(letter_to_guess)
            if letter_to_guess not in secret_word:
                wrong_guesses += 1
            mask = ''.join(c if c in guessed else blank_char for c in secret_word)
        return blank_char not in mask # Return True for a win, False for a loss
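
The re-ranking step in `decide_next_guess` scores each shortlisted letter by the expected size of the word list after a hit/miss split. To make that score concrete, here is a minimal pure-Python sketch on a toy word list. The words, the `_` blank symbol and the helper name `expected_remaining_size` are made up for this illustration and are not part of the agent.

```python
def expected_remaining_size(words, mask, letter, blank="_"):
    """Expected candidate-list size after guessing `letter` (hit/miss split)."""
    blank_positions = [i for i, ch in enumerate(mask) if ch == blank]
    n = len(words)
    # A word is a "hit" if the letter sits in any still-unknown slot
    hits = sum(any(w[i] == letter for i in blank_positions) for w in words)
    p_hit = hits / n
    return p_hit * hits + (1 - p_hit) * (n - hits)


# Toy candidate list, all consistent with the mask "_a_"
words = ["cat", "bat", "hat", "can", "ban", "man"]
mask = "_a_"

scores = {c: expected_remaining_size(words, mask, c) for c in "tcz"}
best = min(scores, key=scores.get)
print(scores, best)
```

A letter that appears in exactly half of the candidates ('t' here) gets the lowest score, while a letter that appears in none of them ('z') leaves the list unchanged and gets the worst score.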

# Training

In [None]:
def get_kl_coefficient(epoch: int) -> float:
    """Schedule for the KL divergence loss weight: linear warm-up, hold, then exponential decay."""
    if epoch < kl_warmup_epochs:
        return (epoch / kl_warmup_epochs) * kl_start_weight
    if epoch < 8:
        return kl_start_weight
    #after a few epochs, start decaying the weight
    return max(kl_start_weight * (kl_annealing_decay ** (epoch - 8)), kl_min_weight)
def run_training_loop(model: nn.Module, dataset: Dataset):
    # split data into training and validation sets
    train_set_size = int(0.9 * len(dataset))
    val_set_size = len(dataset) - train_set_size
    train_set, validation_set = random_split(
        dataset, [train_set_size, val_set_size],
        generator=torch.Generator().manual_seed(random_seed)
    )
    dataloader_config = {'batch_size': batch_size,'collate_fn': collate_batch,'num_workers': 0, 'pin_memory': True}
    train_loader = DataLoader(train_set, shuffle=True, **dataloader_config)
    validation_loader = DataLoader(validation_set, **dataloader_config)
    optimizer = optim.AdamW(model.parameters(), lr=max_lr, weight_decay=1e-5)
    scaler = GradScaler() # for mixed-precision training
    # learning rate schedule: linear warm-up followed by cosine annealing
    warmup_scheduler = LinearLR(optimizer, start_factor=warmup_lr / max_lr, end_factor=1.0, total_iters=7)
    cosine_scheduler = CosineAnnealingLR(optimizer, T_max=cosine_annealing_period)
    scheduler = SequentialLR(optimizer, [warmup_scheduler, cosine_scheduler], milestones=[7])
    best_validation_loss = float('inf')
    epochs_without_improvement = 0
    best_model_path = None
    for epoch in range(total_epochs):
        model.train()
        total_train_loss = 0.0


        print(f"\n {epoch+1}")
        for batch in train_loader:
            mask_t, mask_idx, guessed_t, features_t, lives_t, targets, oracle_dist = [t.to(device) for t in batch]

            with autocast():
                logits = model(mask_t, mask_idx, guessed_t, features_t, lives_t)
                # loss is a combination of two objectives:
                # - guess the letter correctly (supervised learning).
                # - match the Oracle's probability distribution.
                ce_loss = F.cross_entropy(logits, targets, label_smoothing=label_smoothing_factor)
                kld_loss = F.kl_div(F.log_softmax(logits, -1), oracle_dist, reduction='batchmean')
                loss = ce_loss + get_kl_coefficient(epoch) * kld_loss
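            optimizer.zero_grad(set_to_none=True)
            scaler.scale(loss).backward()
            # unscale the gradients first so that clipping sees their true magnitudes
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), gradient_clip_value)
            scaler.step(optimizer)
            scaler.update()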

            total_train_loss += loss.item() * targets.size(0)
        # --- Validation Phase ---
        model.eval()
        total_validation_loss, validation_samples = 0.0, 0
        with torch.no_grad():
            for batch in validation_loader:
                mask_t, mask_idx, guessed_t, features_t, lives_t, targets, _ = [t.to(device) for t in batch]
                with autocast():
                    logits = model(mask_t, mask_idx, guessed_t, features_t, lives_t)
                    # for validation, we only care about the primary cross entropy loss
                    val_loss_batch = F.cross_entropy(logits, targets, reduction='sum')
                total_validation_loss += val_loss_batch.item()
                validation_samples += targets.size(0)
        avg_train_loss = total_train_loss / train_set_size
        avg_validation_loss = total_validation_loss / validation_samples
        # evaluation on a small holdout subset
        # create a temporary agent with the current model state to check win rate
        validation_agent = HangmanAgent(model=model, device=device)
        holdout_sample = holdout_word_list[:50]
        num_wins = sum(validation_agent.play_game(w) for w in holdout_sample)
        win_rate = num_wins / len(holdout_sample)
        print(f"Ep{epoch}, Val Loss: {avg_validation_loss:.3f}, Holdout WR: {win_rate*100:5.2f}%")
        # model Checkpointing and Early Stopping
        if avg_validation_loss < best_validation_loss - 1e-4:
            best_validation_loss = avg_validation_loss
            epochs_without_improvement = 0
            model_path = Path(model_save_directory) / f"best_model_epoch_{epoch+1}.pth"
            torch.save(model.state_dict(), model_path)
            best_model_path = model_path
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= early_stopping_patience:
                print("Early stopping")
                break
        scheduler.step()
    return best_model_path
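
To see what shape `get_kl_coefficient` produces over training, here is a standalone sketch of the same warm-up, hold and decay logic. The default values below (3 warm-up epochs, start weight 0.5, decay 0.9, floor 0.05) are hypothetical placeholders I picked for the illustration, not the values of `kl_warmup_epochs`, `kl_start_weight`, `kl_annealing_decay` and `kl_min_weight` used in this notebook.

```python
def kl_coefficient(epoch, warmup_epochs=3, start_weight=0.5,
                   hold_until=8, decay=0.9, min_weight=0.05):
    """Linear warm-up, constant hold, then exponential decay with a floor."""
    if epoch < warmup_epochs:
        return (epoch / warmup_epochs) * start_weight
    if epoch < hold_until:
        return start_weight
    return max(start_weight * decay ** (epoch - hold_until), min_weight)


# Print the weight for the first few epochs to inspect the three phases
for epoch in range(12):
    print(epoch, round(kl_coefficient(epoch), 4))
```

With these placeholder values the weight climbs from 0 to 0.5 over the first three epochs, stays at 0.5 through epoch 8, and then shrinks by 10% per epoch until it reaches the floor.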


In [None]:
if __name__ == "__main__":
    # load the cached gameplay simulations used as training data
    with open(cached_simulation_path, "rb") as f:
        gameplay_experiences = pickle.load(f)
    print(f"total training samples: {len(gameplay_experiences):,}")
    full_dataset = HangmanDataset(gameplay_experiences)
    # instantiate the model
    hangman_model = HangmanAI_Network(static_feature_dim=feature_vector_dim,emb_dim=embedding_dim,lstm_h_dim=lstm_hidden_dim,gcn_embedding_dim=128).to(device)
    # start training
    best_model_file = run_training_loop(hangman_model, full_dataset)
    # final evaluation
    if best_model_file and best_model_file.is_file():
        print("calculating win rate")
        final_model = HangmanAI_Network(static_feature_dim=feature_vector_dim,emb_dim=embedding_dim,lstm_h_dim=lstm_hidden_dim,gcn_embedding_dim=128)
        final_model.load_state_dict(torch.load(best_model_file, map_location=device))
        # create the final agent with the loaded model
        final_agent = HangmanAgent(model=final_model, device=device)
        total_wins = 0
        for word in holdout_word_list:
            if final_agent.play_game(word):
                total_wins += 1
        final_win_rate = total_wins / len(holdout_word_list)
        print(f"final win rate is: {final_win_rate * 100:.2f}% ({total_wins}/{len(holdout_word_list)})")
    else:
        print("\n no model was saved.")

total training samples: 8,096,982





 1




Ep0, Val Loss: 2.276, Holdout WR: 54.00%

 2
Ep1, Val Loss: 2.223, Holdout WR: 60.00%

 3
Ep2, Val Loss: 2.188, Holdout WR: 58.00%

 4


KeyboardInterrupt: 