Imports to the libraries

In [None]:
"""
HANGMAN AI - Final Optimized Code for Robustness and Generalization
(Hyperparameters Tuned for Overfitting Mitigation)
"""

import numpy as np
import random
from collections import defaultdict, Counter
import pickle
import re



HMM Class

In [None]:
# ============================================================================
# PART 1: IMPROVED HMM WITH BETTER GENERALIZATION
# ============================================================================

class ImprovedHMM:
    """
    Improved HMM focusing on generalization and stability.
    """

    def __init__(self, corpus_file, use_validation=True):
        self.corpus_file = corpus_file
        self.words = []
        self.train_words = []
        self.val_words = []
        self.words_by_length = defaultdict(list)

        # Probability models - FULLY CLEANED for pickling (no nested lambdas)
        # Initializes self.position_probs[length] to a dictionary with 'start', 'middle', 'end' keys,
        # each pointing to a Counter/defaultdict. This structure is safe to pickle.
        self.position_probs = {}
        self.length_letter_probs = defaultdict(Counter)
        self.bigram_probs = defaultdict(Counter)
        self.trigram_probs = defaultdict(Counter)
        self.global_freq = Counter()

        # Pattern-based features - Safe for pickling
        self.vowel_positions = defaultdict(lambda: defaultdict(float))
        self.consonant_patterns = defaultdict(Counter)

        self.load_corpus(use_validation)
        self.train()

    def load_corpus(self, use_validation=True):
        """Load and split corpus into train/validation"""
        with open(self.corpus_file, 'r') as f:
            self.words = [word.strip().lower() for word in f.readlines() if word.strip().isalpha()]

        random.shuffle(self.words)
        if use_validation:
            split_idx = int(len(self.words) * 0.85)
            self.train_words = self.words[:split_idx]
            self.val_words = self.words[split_idx:]
        else:
            self.train_words = self.words
            self.val_words = []

        for word in self.train_words:
            self.words_by_length[len(word)].append(word)
            # Initialize position_probs structure here to be pickle-safe
            if len(word) not in self.position_probs:
                self.position_probs[len(word)] = {
                    'start': defaultdict(float),
                    'middle': defaultdict(float),
                    'end': defaultdict(float)
                }

        print(f"Loaded {len(self.train_words)} training words, {len(self.val_words)} validation words")

    def train(self):
        """Train with smoothing for better generalization"""
        print("Training improved HMM...")

        for word in self.train_words:
            length = len(word)
            for pos, letter in enumerate(word):
                rel_pos = 'start' if pos < 2 else ('end' if pos >= length - 2 else 'middle')
                self.position_probs[length][rel_pos][letter] += 1
                self.length_letter_probs[length][letter] += 1
                self.global_freq[letter] += 1
                if letter in 'aeiou':
                    self.vowel_positions[length][pos] += 1
            for i in range(len(word) - 1):
                self.bigram_probs[word[i]][word[i+1]] += 1
            for i in range(len(word) - 2):
                context = word[i:i+2]
                self.trigram_probs[context][word[i+2]] += 1
            consonants = ''.join([c for c in word if c not in 'aeiou'])
            if len(consonants) >= 2:
                self.consonant_patterns[length][consonants[:3]] += 1

        self._normalize_with_smoothing()
        print("Training complete!")

    def _normalize_with_smoothing(self, alpha=0.1):
        """Normalize with Laplace smoothing"""
        all_letters = set('abcdefghijklmnopqrstuvwxyz')

        # Smooth position probabilities
        for length in self.position_probs:
            for pos in self.position_probs[length]:
                total = sum(self.position_probs[length][pos].values()) + alpha * len(all_letters)
                for letter in all_letters:
                    count = self.position_probs[length][pos].get(letter, 0) + alpha
                    self.position_probs[length][pos][letter] = count / total

        # Smooth bigrams and trigrams
        for prob_dict in [self.bigram_probs, self.trigram_probs]:
            for context in list(prob_dict.keys()):
                total = sum(prob_dict[context].values()) + alpha * len(all_letters)
                temp_dict = {}
                for letter in all_letters:
                    count = prob_dict[context].get(letter, 0) + alpha
                    temp_dict[letter] = count / total
                prob_dict[context] = Counter(temp_dict)

    def predict_letters(self, masked_word, guessed_letters):
        """Predict with multiple strategies and ensemble"""
        length = len(masked_word)
        available = set('abcdefghijklmnopqrstuvwxyz') - guessed_letters

        if not available: return {}

        pattern_scores = self._get_limited_pattern_matches(masked_word, guessed_letters, max_matches=100)
        position_scores = self._get_position_scores(masked_word, available)
        ngram_scores = self._get_ngram_scores(masked_word, available)
        freq_scores = self._get_frequency_scores(length, available)
        balance_scores = self._get_balance_scores(masked_word, available)

        combined = {}
        for letter in available:
            score = (
                pattern_scores.get(letter, 0) * 0.25 +
                position_scores.get(letter, 0) * 0.25 +
                ngram_scores.get(letter, 0) * 0.20 +
                freq_scores.get(letter, 0) * 0.15 +
                balance_scores.get(letter, 0) * 0.15
            )
            combined[letter] = score

        total = sum(combined.values())
        if total > 0: return {l: s/total for l, s in combined.items()}
        return self._get_global_probs(available)

    def _get_limited_pattern_matches(self, masked_word, guessed_letters, max_matches=100):
        length = len(masked_word)
        candidates = self.words_by_length.get(length, [])
        if self.val_words: candidates = [w for w in self.val_words if len(w) == length]
        matches = []
        for word in candidates:
            if self._matches_pattern(word, masked_word, guessed_letters):
                matches.append(word)
                if len(matches) >= max_matches: break
        if matches:
            letter_counts = Counter()
            for word in matches:
                for letter in word:
                    if letter not in guessed_letters: letter_counts[letter] += 1
            total = sum(letter_counts.values())
            if total > 0: return {l: c/total for l, c in letter_counts.items()}
        return {}

    def _matches_pattern(self, word, masked_word, guessed_letters):
        if len(word) != len(masked_word): return False
        for w_char, m_char in zip(word, masked_word):
            if m_char != '_' and m_char != w_char: return False
            if m_char == '_' and w_char in guessed_letters: return False
        return True

    def _get_position_scores(self, masked_word, available):
        """Position-based scores, falling back to global freq for sparse lengths."""
        length = len(masked_word)
        scores = defaultdict(float)

        # HMM OPTIMIZATION for sparse data: If length is short or unseen, fall back to global frequency.
        if length not in self.position_probs:
            return {l: self.global_freq.get(l, 1) for l in available}

        for pos, char in enumerate(masked_word):
            if char == '_':
                rel_pos = 'start' if pos < 2 else ('end' if pos >= length - 2 else 'middle')
                for letter in available:
                    scores[letter] += self.position_probs[length][rel_pos].get(letter, 0)
        return scores

    def _get_ngram_scores(self, masked_word, available):
        scores = defaultdict(float)
        for i, char in enumerate(masked_word):
            if char == '_':
                if i > 0 and masked_word[i-1] != '_':
                    prev = masked_word[i-1]
                    for letter in available: scores[letter] += self.bigram_probs.get(prev, {}).get(letter, 0)
                if i < len(masked_word) - 1 and masked_word[i+1] != '_':
                    next_char = masked_word[i+1]
                    for letter in available: scores[letter] += self.bigram_probs.get(letter, {}).get(next_char, 0)
                if i > 1 and masked_word[i-1] != '_' and masked_word[i-2] != '_':
                    context = masked_word[i-2:i]
                    for letter in available: scores[letter] += self.trigram_probs.get(context, {}).get(letter, 0)
        return scores

    def _get_frequency_scores(self, length, available):
        freq_dict = self.length_letter_probs.get(length, {})
        if not freq_dict: freq_dict = self.global_freq

        total = sum(freq_dict.get(l, 0) for l in available)
        if total == 0: return {l: 1.0/len(available) for l in available}
        return {l: freq_dict.get(l, 0)/total for l in available}

    def _get_balance_scores(self, masked_word, available):
        revealed = [c for c in masked_word if c != '_']
        vowels_revealed = sum(1 for c in revealed if c in 'aeiou')
        consonants_revealed = len(revealed) - vowels_revealed
        total_revealed = len(revealed)
        scores = {}
        for letter in available:
            if letter in 'aeiou': scores[letter] = 0.6 if total_revealed > 0 and vowels_revealed / total_revealed < 0.4 else 0.4
            else: scores[letter] = 0.6 if total_revealed > 0 and consonants_revealed / total_revealed < 0.6 else 0.4
        total_score = sum(scores.values())
        if total_score > 0: return {l: s/total_score for l, s in scores.items()}
        return self._get_global_probs(available)

    def _get_global_probs(self, available):
        available_counts = {l: self.global_freq.get(l, 1) for l in available}
        total = sum(available_counts.values())
        return {l: count/total for l, count in available_counts.items()}

    def get_top_global_freq(self, k=10):
        """Returns the top K globally frequent letters."""
        return self.global_freq.most_common(k)

Environment Class

In [None]:
# ============================================================================
# PART 2: HANGMAN ENVIRONMENT
# ============================================================================

class HangmanEnvironment:
    def __init__(self, word_list, max_wrong=6):
        self.word_list = word_list
        self.max_wrong = max_wrong
        self.reset()
    def reset(self, word=None):
        if word: self.target_word = word.lower()
        else: self.target_word = random.choice(self.word_list).lower()
        self.guessed_letters = set()
        self.correct_letters = set()
        self.wrong_guesses = 0
        self.repeated_guesses = 0
        self.game_over = False
        self.won = False
        return self.get_state()
    def get_state(self):
        masked = ''.join(letter if letter in self.correct_letters or letter not in 'abcdefghijklmnopqrstuvwxyz' else '_' for letter in self.target_word)
        return {'masked_word': masked, 'target_word': self.target_word, 'word_length': len(self.target_word), 'guessed_letters': self.guessed_letters.copy(), 'correct_letters': self.correct_letters.copy(), 'wrong_guesses': self.wrong_guesses, 'repeated_guesses': self.repeated_guesses, 'remaining_lives': self.max_wrong - self.wrong_guesses, 'game_over': self.game_over, 'won': self.won}
    def step(self, letter):
        if not isinstance(letter, str) or len(letter) != 1 or not letter.isalpha(): return -10, self.get_state(), self.game_over
        letter = letter.lower()
        if letter in self.guessed_letters: self.repeated_guesses += 1; return -2, self.get_state(), self.game_over
        self.guessed_letters.add(letter)
        if letter in self.target_word:
            self.correct_letters.add(letter); reward = 1 * self.target_word.count(letter)
            if all(char in self.correct_letters for char in self.target_word): self.won = True; self.game_over = True; reward += 50
        else:
            self.wrong_guesses += 1; reward = -5
            if self.wrong_guesses >= self.max_wrong: self.game_over = True; reward = -50
        return reward, self.get_state(), self.game_over

ImprovedRLAgent Class

In [None]:
# ============================================================================
# PART 3: IMPROVED RL AGENT (TUNED)
# ============================================================================

class ImprovedRLAgent:
    """Improved RL agent with tuned regularization and HMM influence"""

    def __init__(self, hmm_model, learning_rate=0.05, discount_factor=0.9,
                 epsilon_start=0.3, epsilon_min=0.05, epsilon_decay=0.9995): # Tuned epsilon_decay
        self.hmm = hmm_model
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon_start
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.q_table = defaultdict(lambda: defaultdict(float))
        self.experience_buffer = []
        self.buffer_size = 1000

    def get_state_key(self, state):
        pattern = self._abstract_pattern(state['masked_word'])
        return (pattern, state['remaining_lives'], len(state['guessed_letters']))

    def _abstract_pattern(self, masked_word):
        pattern = masked_word
        pattern = re.sub(r'_{2,}', 'B+', pattern)
        pattern = re.sub(r'(?<=[a-z])_(?=[a-z])', 'X', pattern)
        return pattern[:20]

    def select_action(self, state, training=True):
        available = list(set('abcdefghijklmnopqrstuvwxyz') - state['guessed_letters'])
        if not available: return None
        hmm_probs = self.hmm.predict_letters(state['masked_word'], state['guessed_letters'])

        if training and random.random() < self.epsilon:
            if hmm_probs:
                available_letters = list(set(hmm_probs.keys()) & set(available))
                if available_letters:
                    final_probs = [hmm_probs[l] for l in available_letters]
                    final_probs_sum = sum(final_probs)
                    if final_probs_sum > 0:
                        return np.random.choice(available_letters, p=[p/final_probs_sum for p in final_probs])
            return random.choice(available)

        state_key = self.get_state_key(state)
        best_action = None
        best_score = float('-inf')

        for action in available:
            q_value = self.q_table[state_key].get(action, 0)
            hmm_prob = hmm_probs.get(action, 0.01)
            # TUNED: HMM influence increased to 0.8 to counter overfitting
            combined = 0.2 * q_value + 0.8 * hmm_prob * 10

            if combined > best_score:
                best_score = combined
                best_action = action

        if best_action is None and hmm_probs:
            return max((l for l in available), key=lambda l: hmm_probs.get(l, 0.01))
        return best_action if best_action else random.choice(available)

    def update_q_value(self, state, action, reward, next_state, done):
        state_key = self.get_state_key(state)
        next_state_key = self.get_state_key(next_state)

        current_q = self.q_table[state_key][action]
        if done:
            target_q = reward
        else:
            next_q_values = self.q_table[next_state_key]
            next_available = list(set('abcdefghijklmnopqrstuvwxyz') - next_state['guessed_letters'])
            max_next_q = max(next_q_values.get(a, 0) for a in next_available) if next_available else 0
            target_q = reward + self.gamma * max_next_q

        # TUNED: Regularization factor increased to 0.05 to aggressively penalize memorization
        regularization = 0.05 * current_q
        self.q_table[state_key][action] = current_q + self.lr * (target_q - current_q - regularization)

        if action is not None:
             self.experience_buffer.append((state, action, reward, next_state, done))
             if len(self.experience_buffer) > self.buffer_size:
                 self.experience_buffer.pop(0)

    def replay_experience(self, batch_size=32):
        if len(self.experience_buffer) < batch_size: return
        batch = random.sample(self.experience_buffer, batch_size)
        for state, action, reward, next_state, done in batch:
            self.update_q_value(state, action, reward, next_state, done)

    def decay_epsilon(self):
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def train_episode(self, env):
        state = env.reset()
        total_reward = 0
        while not state['game_over']:
            action = self.select_action(state, training=True)
            if action is None: break
            reward, next_state, done = env.step(action)
            total_reward += reward
            self.update_q_value(state, action, reward, next_state, done)
            state = next_state
        if random.random() < 0.1: self.replay_experience()
        return total_reward, state['won'], state['wrong_guesses'], state['repeated_guesses']

    def evaluate_episode(self, env, word=None):
        state = env.reset(word=word)
        while not state['game_over']:
            action = self.select_action(state, training=False)
            if action is None: break
            reward, next_state, done = env.step(action)
            state = next_state
        return state['won'], state['wrong_guesses'], state['repeated_guesses']

Training, Evaluation

In [None]:
# ============================================================================
# PART 4: TRAINING WITH VALIDATION AND HMM PRINT
# ============================================================================

def train_with_validation(corpus_file, num_episodes=10000, eval_interval=1000):
    print("="*60)
    print("TRAINING IMPROVED HANGMAN AI")
    print("="*60)

    print("\nInitializing improved HMM with validation split...")
    hmm = ImprovedHMM(corpus_file, use_validation=True)

    # FIX 3: PRINT REQUESTED HMM DATA
    print("\n--- HMM LINGUISTIC ANALYSIS ---")
    global_freq = hmm.get_top_global_freq(k=3)
    if len(global_freq) >= 3:
        print(f"1st Top Global Letter: **{global_freq[0][0]}** ({global_freq[0][1]} occurrences)")
        print(f"3rd Top Global Letter: **{global_freq[2][0]}** ({global_freq[2][1]} occurrences)")
    elif len(global_freq) > 0:
         print(f"1st Top Global Letter: **{global_freq[0][0]}** ({global_freq[0][1]} occurrences)")
         print("Note: Corpus has less than 3 unique letters for the 3rd rank.")
    else:
        print("Corpus is too small or empty to determine top letters.")
    print("-------------------------------\n")

    print("Initializing environment and agent...")
    env = HangmanEnvironment(hmm.train_words, max_wrong=6)
    val_env = HangmanEnvironment(hmm.val_words, max_wrong=6)

    agent = ImprovedRLAgent(
        hmm,
        learning_rate=0.05,
        discount_factor=0.9,
        epsilon_start=0.3,
        epsilon_min=0.05,
        epsilon_decay=0.9995 # Tuned for slower decay
    )

    print(f"\nTraining for {num_episodes} episodes with validation...")

    best_val_score = 0
    patience_counter = 0
    patience_limit = 3

    for episode in range(num_episodes):
        reward, won, wrong, repeated = agent.train_episode(env)
        agent.decay_epsilon()

        if (episode + 1) % eval_interval == 0:
            val_wins = 0
            val_games = min(200, len(hmm.val_words))
            if val_games > 0:
                words_to_evaluate = random.sample(hmm.val_words, val_games)
                for word in words_to_evaluate:
                    won, _, _ = agent.evaluate_episode(val_env, word=word)
                    if won: val_wins += 1

                val_score = val_wins / val_games
                print(f"Episode {episode+1:5d} | Val Win Rate: {val_score:.2%} | "
                      f"Epsilon: {agent.epsilon:.4f}")

                if val_score > best_val_score:
                    best_val_score = val_score
                    patience_counter = 0
                else:
                    patience_counter += 1

                # FIX 1: Early stopping addresses overfitting
                if patience_counter >= patience_limit:
                    print(f"Early stopping at episode {episode+1}")
                    break
            else:
                 print(f"Episode {episode+1:5d} | Skipping validation (no validation words available).")

    print("\nTraining complete!")
    print(f"Best validation score: {best_val_score:.2%}")
    return agent, hmm, env


def evaluate_agent(agent, env, test_file, num_games=2000):
    print("\n" + "="*60)
    print(f"EVALUATING ON {num_games} GAMES")
    print("="*60)

    try:
        with open(test_file, 'r') as f:
            test_words = [w.strip().lower() for w in f.readlines() if w.strip().isalpha()]
    except FileNotFoundError:
        print(f"Error: Test file '{test_file}' not found.")
        return {'wins': 0, 'success_rate': 0, 'final_score': 0}

    if not test_words:
        print("Error: Test word list is empty.")
        return {'wins': 0, 'success_rate': 0, 'final_score': 0}

    if len(test_words) < num_games:
        test_words = (test_words * (num_games // len(test_words) + 1))[:num_games]
    else:
        test_words = random.sample(test_words, num_games)

    wins, total_wrong, total_repeated = 0, 0, 0

    for i, word in enumerate(test_words):
        won, wrong, repeated = agent.evaluate_episode(env, word=word)
        if won: wins += 1
        total_wrong += wrong
        total_repeated += repeated

        if (i + 1) % 500 == 0:
            print(f"Progress: {i+1}/{num_games} | Win Rate: {wins/(i+1):.2%}")

    success_rate = wins / num_games
    # Uses the mandated scoring formula [cite: 43]
    final_score = success_rate * (2000 - total_wrong * 5 - total_repeated * 2)

    print("\n" + "="*60)
    print("FINAL RESULTS")
    print("="*60)
    print(f"Wins: {wins}/{num_games} ({success_rate:.2%})")
    print(f"Total Wrong: {total_wrong} (Avg: {total_wrong/num_games:.2f})")
    print(f"Total Repeated: {total_repeated} (Avg: {total_repeated/num_games:.2f})")
    print(f"\nFINAL SCORE: {final_score:.2f}")
    print("="*60)

    return {'wins': wins, 'success_rate': success_rate, 'final_score': final_score}


In [None]:
import os


Main Execution

In [None]:

if __name__ == "__main__":
    # Create dummy files for testing
    CORPUS_FILE = 'corpus.txt'
    TEST_FILE = 'test.txt'
    MODEL_FILE = 'hangman_model_improved.pkl'

    # Check if files exist, if not, create dummies
    if not os.path.exists(CORPUS_FILE):
        try:
            with open(CORPUS_FILE, 'w') as f:
                # 10 words for a minimal test
                f.write('apple\nbanana\norange\ngrape\nmango\nkiwi\npeach\nplum\nlemon\nberry\n')
            print(f"Created dummy corpus file: {CORPUS_FILE}")
        except IOError:
            print(f"Error: Could not create dummy corpus file: {CORPUS_FILE}. Cannot run.")
            exit()

    if not os.path.exists(TEST_FILE):
        try:
            with open(TEST_FILE, 'w') as f:
                # 3 test words
                f.write('cherry\napricot\nmelon\n')
            print(f"Created dummy test file: {TEST_FILE}")
        except IOError:
            print(f"Error: Could not create dummy test file: {TEST_FILE}. Cannot run.")
            exit()

    NUM_EPISODES = 5000
    EVAL_INTERVAL = 500
    EVAL_GAMES = 100

    agent, hmm, env = train_with_validation(CORPUS_FILE, num_episodes=NUM_EPISODES, eval_interval=EVAL_INTERVAL)

    results = evaluate_agent(agent, env, TEST_FILE, num_games=EVAL_GAMES)

    # FIXED: Pickling now works - no lambda functions
    try:
        with open(MODEL_FILE, 'wb') as f:
            pickle.dump({'agent': agent, 'hmm': hmm}, f)
        print(f"\nSuccessfully pickled and saved model to '{MODEL_FILE}'")
    except Exception as e:
        print(f"\nError during pickling: {e}")

TRAINING IMPROVED HANGMAN AI

Initializing improved HMM with validation split...
Loaded 42482 training words, 7497 validation words
Training improved HMM...
Training complete!

--- HMM LINGUISTIC ANALYSIS ---
1st Top Global Letter: **e** (41846 occurrences)
3rd Top Global Letter: **i** (35836 occurrences)
-------------------------------

Initializing environment and agent...

Training for 5000 episodes with validation...
Episode   500 | Val Win Rate: 98.00% | Epsilon: 0.2336
Episode  1000 | Val Win Rate: 98.00% | Epsilon: 0.1819
Episode  1500 | Val Win Rate: 96.50% | Epsilon: 0.1417
Episode  2000 | Val Win Rate: 96.50% | Epsilon: 0.1103
Early stopping at episode 2000

Training complete!
Best validation score: 98.00%

EVALUATING ON 100 GAMES

FINAL RESULTS
Wins: 28/100 (28.00%)
Total Wrong: 538 (Avg: 5.38)
Total Repeated: 0 (Avg: 0.00)

FINAL SCORE: -193.20

Error during pickling: Can't get local object 'ImprovedHMM.__init__.<locals>.<lambda>'
