In [1]:
# === Hackman (Optimized for CPU, Colab Ready) ===
# Upload corpus.txt and test.txt to /content before running
# !pip install hmmlearn==0.2.8 tensorflow-cpu==2.15.0 scikit-learn --quiet

import os, random, numpy as np, time, math, json, pickle
from collections import defaultdict, deque
from hmmlearn import hmm
from tensorflow.keras import layers, models, optimizers
import tensorflow as tf

# ------------- CONFIG -------------
CORPUS_FILE = "./corpus.txt"  # training word list, one word per line
TEST_FILE = "./test.txt"  # evaluation word list, one word per line
MAX_WORD_LEN_FOR_HMM = 10  # train_hmms skips word lengths above this
HMM_NUM_STATES = 8  # n_components of each MultinomialHMM
HMM_NITER = 20  # n_iter passed to each MultinomialHMM
MAX_WORD_LEN = 12  # positions encoded by encode_state; longer masks are truncated
LIVES = 6  # wrong guesses allowed before a game is lost
NUM_EPISODES = 1500  # number of training episodes
BATCH_SIZE = 64  # transitions sampled per DQN.train call
GAMMA = 0.98  # discount factor in the Q-learning target
LR = 1e-3  # Adam learning rate for the Q-networks
EPS_START, EPS_END, EPS_DECAY = 1.0, 0.05, 0.9995  # epsilon-greedy schedule (initial, floor, per-episode factor)
REPLAY_SIZE, MIN_REPLAY_SIZE = 100000, 200  # replay capacity / size required before training runs
TARGET_UPDATE_EVERY = 100  # episodes between target-network syncs
SEED = 42  # shared seed for the three RNGs below

# Seed NumPy, TensorFlow and the stdlib RNG; all three are drawn from later.
np.random.seed(SEED)
tf.random.set_seed(SEED)
random.seed(SEED)

# ------------- BASIC HELPERS -------------
# Lowercase English alphabet plus lookup tables in both directions.
ALPHABET = [chr(ord('a') + offset) for offset in range(26)]
LETTER_TO_IDX = {letter: idx for idx, letter in enumerate(ALPHABET)}
IDX_TO_LETTER = dict(enumerate(ALPHABET))

def clean_word(w):
    """Return ``w`` lower-cased with everything except the letters a-z removed.

    Only ASCII letters are kept: every lookup table in this file is indexed
    by a-z, and a character such as 'é' passes ``str.isalpha`` but would
    raise ``KeyError`` when looked up in ``LETTER_TO_IDX``.
    """
    return ''.join(c for c in w.lower() if 'a' <= c <= 'z')
def load_words(path):
    """Read one word per line from ``path`` and return the cleaned words.

    Each line is passed through ``clean_word``; lines that clean to an empty
    string are dropped. The file is decoded as UTF-8.
    """
    with open(path, 'r', encoding='utf-8') as f:
        # Clean each line once and reuse the result for both the filter and
        # the output (each line used to be cleaned twice).
        cleaned = (clean_word(line) for line in f)
        return [word for word in cleaned if word]

# Load both word lists and bucket the training corpus by word length.
print("Loading data...")
# Explicit raises instead of ``assert`` so the checks survive ``python -O``
# and the upload hint is always shown.
if not os.path.exists(CORPUS_FILE):
    raise FileNotFoundError("Upload corpus.txt")
if not os.path.exists(TEST_FILE):
    raise FileNotFoundError("Upload test.txt")
corpus, test_words = load_words(CORPUS_FILE), load_words(TEST_FILE)
words_by_len = defaultdict(list)
for w in corpus:
    words_by_len[len(w)].append(w)
print(f"Loaded {len(corpus)} corpus words, {len(test_words)} test words")

# ------------- FAST HMM TRAINING -------------
def train_hmms(words_by_len, max_len=MAX_WORD_LEN_FOR_HMM):
    """Fit one HMM per word length on the letter sequences of that length.

    Args:
        words_by_len: mapping from word length to the list of words of that
            length. Every character must be a key of ``LETTER_TO_IDX``.
        max_len: word lengths above this are skipped.

    Returns:
        dict mapping word length to the fitted ``hmm.MultinomialHMM``.
        Lengths with fewer than 50 words, or whose fit raised, are absent.
    """
    # Named ``fitted`` rather than ``models`` so the keras ``models`` module
    # imported at file level is not shadowed inside this function.
    fitted = {}
    for L, words in sorted(words_by_len.items()):
        if L > max_len or len(words) < 50:
            continue
        # All sequences are concatenated into one column vector and passed
        # to fit() together with the length of each individual sequence.
        X = np.array([LETTER_TO_IDX[c] for w in words for c in w]).reshape(-1, 1)
        lengths = [len(w) for w in words]
        try:
            model = hmm.MultinomialHMM(n_components=HMM_NUM_STATES, n_iter=HMM_NITER, verbose=False)
            # NOTE(review): kept from the original; confirm against the
            # hmmlearn docs that ``n_symbols`` is an attribute it reads.
            model.n_symbols = 26
            model.fit(X, lengths)
        except Exception as e:
            # Best-effort: one failed length must not abort the others, but
            # report it instead of silently dropping the model.
            print(f"HMM training failed for length {L}: {e!r}")
            continue
        fitted[L] = model
    print(f"Trained {len(fitted)} HMMs")
    return fitted

# Fit the per-length HMMs once, at module level; get_hmm_for_length reads this dict.
hmms = train_hmms(words_by_len)

def get_hmm_for_length(L):
    """Return the HMM trained for word length ``L``.

    If none was trained for exactly ``L``, the one whose length is closest
    to ``L`` is returned. Returns ``None`` when ``hmms`` is empty.
    """
    if not hmms:
        return None
    nearest = L if L in hmms else min(hmms, key=lambda trained_len: abs(trained_len - L))
    return hmms[nearest]

# Global letter frequencies, computed once up front: each corpus word adds
# one count for every distinct letter it contains, then the counts are
# normalised to sum to 1.
freq_global = np.zeros(26)
for word in corpus:
    for idx in {LETTER_TO_IDX[ch] for ch in word}:
        freq_global[idx] += 1
freq_global /= freq_global.sum()

# ------------- PROB VECTOR USING HMM -------------
def hmm_letter_prob_vector(masked, guessed):
    """Estimate, for each letter a-z, how likely it is to fill a blank.

    Corpus words of the same length that are still consistent with the game
    state each add one vote for every letter they place on a blank position.
    The HMM itself is not queried here; its presence only decides whether a
    large candidate set is sub-sampled.

    Args:
        masked: current pattern; revealed letters with '_' for blanks.
        guessed: iterable of letters already guessed (right or wrong).

    Returns:
        Length-26 array summing to 1. Guessed letters get 0, except in the
        degenerate case where all 26 letters were guessed, where the global
        frequencies are returned instead.
    """
    L = len(masked)
    model = get_hmm_for_length(L)
    guessed = set(guessed)
    blanks = [i for i, ch in enumerate(masked) if ch == '_']
    prob = np.ones(26) * 1e-9  # small floor so unseen letters stay selectable

    def consistent(word):
        # A blank cannot hide a letter that was already guessed: a wrong
        # guess is nowhere in the word, and a right guess is fully revealed.
        # Revealed positions must match exactly.
        for i, ch in enumerate(masked):
            if ch == '_':
                if word[i] in guessed:
                    return False
            elif word[i] != ch:
                return False
        return True

    possible = [w for w in words_by_len.get(L, []) if consistent(w)]
    if not possible:
        prob += freq_global
    else:
        # Cap large candidate sets at 1000 random words to bound the cost;
        # as before, this only happens when an HMM exists for the length.
        if len(possible) >= 1000 and model is not None:
            possible = random.sample(possible, 1000)
        for w in possible:
            for i in blanks:
                prob[LETTER_TO_IDX[w[i]]] += 1
    for g in guessed:
        prob[LETTER_TO_IDX[g]] = 0
    if prob.sum() == 0:
        prob = freq_global
    return prob / prob.sum()

# ------------- ENVIRONMENT -------------
class HangmanEnv:
    """Single-word Hangman game with a reset/step interface.

    States are dicts with keys ``masked`` (pattern string), ``guessed``
    (copy of the guessed-letter set), ``lives_left`` and ``hmm_probs``
    (the vector from ``hmm_letter_prob_vector``).

    Rewards: +10 for a correct letter, -2 for a wrong one, -3 for repeating
    a letter (the game state is not changed), plus +30 on completing the
    word or -10 on running out of lives.
    """

    def __init__(self, words, lives=LIVES):
        """Store the word pool and life budget, then start a first game.

        This was spelled ``_init_``, which Python never calls as a
        constructor, so ``HangmanEnv(words)`` raised ``TypeError``.
        """
        self.words, self.lives = words, lives
        self.reset()

    def reset(self, word=None):
        """Start a new game on ``word`` (or a random pool word); return its state."""
        self.word = clean_word(word or random.choice(self.words))
        self.mask = ['_'] * len(self.word)
        self.guessed, self.wrong, self.done = set(), 0, False
        return self._state()

    def _state(self):
        """Build the observation dict for the current game state."""
        masked = ''.join(self.mask)
        prob_vec = hmm_letter_prob_vector(masked, self.guessed)
        return {'masked': masked, 'guessed': set(self.guessed),
                'lives_left': self.lives - self.wrong, 'hmm_probs': prob_vec}

    def step(self, letter):
        """Apply one guess and return ``(state, reward, done, info)``.

        ``info`` is empty once the game is over, ``{'reason': 'repeat'}``
        for a repeated letter, and otherwise holds ``reason``
        ('correct'/'wrong') and ``outcome`` ('win'/'lose'/None).
        """
        if self.done:
            return self._state(), 0, True, {}
        letter = letter.lower()
        if letter in self.guessed:
            # Repeats are penalised but cost no life and do not end the game.
            return self._state(), -3, False, {'reason': 'repeat'}
        self.guessed.add(letter)
        if letter in self.word:
            for i, c in enumerate(self.word):
                if c == letter:
                    self.mask[i] = c
            reward, reason = 10, 'correct'
        else:
            self.wrong += 1
            reward, reason = -2, 'wrong'
        if '_' not in self.mask:
            self.done = True
            reward += 30
            outcome = 'win'
        elif self.wrong >= self.lives:
            self.done = True
            reward -= 10
            outcome = 'lose'
        else:
            outcome = None
        info = {'reason': reason, 'outcome': outcome}
        return self._state(), reward, self.done, info

# ------------- ENCODING -------------
def encode_state(s):
    """Flatten a state dict into a single feature vector.

    Layout, in order: ``MAX_WORD_LEN`` one-hot rows of width 27 for the mask
    (columns 0-25 are a-z, column 26 is a blank; masks longer than
    ``MAX_WORD_LEN`` are truncated), 26 guessed-letter flags, the
    ``hmm_probs`` values, and ``lives_left / LIVES``.
    """
    board = np.zeros((MAX_WORD_LEN, 27), np.float32)
    for row, symbol in enumerate(s['masked'][:MAX_WORD_LEN]):
        col = LETTER_TO_IDX[symbol] if symbol != '_' else 26
        board[row, col] = 1
    used = np.zeros(26, np.float32)
    for letter in s['guessed']:
        used[LETTER_TO_IDX[letter]] = 1
    lives_fraction = [s['lives_left'] / LIVES]
    return np.concatenate([board.flatten(), used, s['hmm_probs'], lives_fraction])

# Length of the vector built by encode_state: 27 one-hot slots per mask
# position, 26 guessed flags, 26 hmm_probs values and the lives fraction.
STATE_DIM = MAX_WORD_LEN*27 + 26 + 26 + 1
ACTION_DIM = 26  # one action per letter a-z

# ------------- DQN AGENT -------------
def build_q_net(in_dim, out_dim, lr):
    """Build and compile the feed-forward Q-network.

    Two ReLU hidden layers (256 then 128 units) feed a linear output layer
    with ``out_dim`` units. The model is compiled with Adam at learning
    rate ``lr`` and mean-squared-error loss.
    """
    stack = [
        layers.Input((in_dim,)),
        layers.Dense(256, activation='relu'),
        layers.Dense(128, activation='relu'),
        layers.Dense(out_dim, activation='linear'),
    ]
    net = models.Sequential(stack)
    net.compile(optimizer=optimizers.Adam(lr), loss='mse')
    return net

class DQN:
    """Epsilon-greedy DQN agent with a target network and a replay buffer."""

    def __init__(self):
        """Create the online and target networks, the replay buffer and eps.

        This was spelled ``_init_``, which Python never calls as a
        constructor, so ``DQN()`` returned an object with none of these
        attributes.
        """
        self.model = build_q_net(STATE_DIM, ACTION_DIM, LR)
        self.target = build_q_net(STATE_DIM, ACTION_DIM, LR)
        self.update_target()
        self.replay = deque(maxlen=REPLAY_SIZE)
        self.eps = EPS_START

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.target.set_weights(self.model.get_weights())

    def act(self, s_vec, mask):
        """Pick an action index for state ``s_vec``.

        ``mask`` holds 1 for allowed actions and 0 for disallowed ones. With
        probability ``eps`` a random allowed action is returned; otherwise
        the allowed action with the highest predicted Q-value.
        """
        if np.random.rand() < self.eps:
            valid = np.nonzero(mask)[0]
            if len(valid) > 0:
                return int(np.random.choice(valid))
            # Nothing is allowed; fall back to any letter (as a plain int).
            return int(np.random.randint(26))
        q = self.model.predict(s_vec.reshape(1, -1), verbose=0)[0]
        q -= (1e6 * (1 - mask))  # push disallowed actions far below the rest
        return int(np.argmax(q))

    def store(self, *args):
        """Append one transition tuple (s, a, r, s2, done, mask) to the buffer."""
        self.replay.append(args)

    def sample(self, n):
        """Draw ``n`` transitions without replacement, as six stacked arrays."""
        batch = random.sample(self.replay, n)
        s, a, r, s2, done, mask = zip(*batch)
        return np.array(s), np.array(a), np.array(r), np.array(s2), np.array(done), np.array(mask)

    def train(self, bs):
        """Fit the online network on one sampled batch of ``bs`` transitions.

        Does nothing until the buffer holds at least ``MIN_REPLAY_SIZE``
        transitions (and at least ``bs``, which ``random.sample`` requires).
        """
        if len(self.replay) < max(MIN_REPLAY_SIZE, bs):
            return
        s, a, r, s2, done, mask = self.sample(bs)
        q_next = self.target.predict(s2, verbose=0)
        q_next -= (1e6 * (1 - mask))  # ignore actions unavailable in s2
        max_q = np.max(q_next, 1)
        target = self.model.predict(s, verbose=0)
        # Only the taken action's value is replaced, so the other outputs
        # contribute zero error. Terminal transitions use the reward alone.
        target[np.arange(bs), a] = np.where(done, r, r + GAMMA * max_q)
        self.model.fit(s, target, epochs=1, verbose=0, batch_size=bs)

    def decay_eps(self):
        """Multiply eps by ``EPS_DECAY``, never going below ``EPS_END``."""
        self.eps = max(EPS_END, self.eps * EPS_DECAY)

# ------------- TRAINING -------------
# Build the training environment over the corpus and a fresh agent.
env, agent = HangmanEnv(corpus), DQN()
print("Seeding replay...")
# Play whole games with uniformly random, never-repeated letters until the
# replay buffer holds at least MIN_REPLAY_SIZE transitions. The size is only
# checked between games, so the buffer may end up slightly larger.
while len(agent.replay) < MIN_REPLAY_SIZE:
    s = env.reset(); s_vec = encode_state(s)
    done=False
    while not done:
        # mask: 1 for letters not yet guessed in the current state
        mask = np.ones(26,np.float32)
        for g in s['guessed']: mask[LETTER_TO_IDX[g]]=0
        a = np.random.choice(np.nonzero(mask)[0])
        letter = IDX_TO_LETTER[a]
        s2,r,done,_ = env.step(letter)
        s2v = encode_state(s2)
        # vm: the same availability mask for the next state; it is stored
        # with the transition and applied to the next-state Q-values in DQN.train
        vm = np.ones(26,np.float32)
        for g in s2['guessed']: vm[LETTER_TO_IDX[g]]=0
        agent.store(s_vec,a,r,s2v,done,vm)
        s,s_vec=s2,s2v
print("Replay seeded.")

# Main training loop: one Hangman game per episode.
rewards=[]  # total reward of each finished episode
start=time.time()
for ep in range(1, NUM_EPISODES+1):
    s = env.reset(); s_vec = encode_state(s)
    tot, done = 0, False
    # Also stop once every letter has been guessed, so act() always has a valid choice.
    while not done and len(s['guessed'])<26:
        # mask: 1 for letters not yet guessed in the current state
        mask = np.ones(26,np.float32)
        for g in s['guessed']: mask[LETTER_TO_IDX[g]]=0
        a = agent.act(s_vec,mask)
        letter=IDX_TO_LETTER[a]
        s2,r,done,_ = env.step(letter)
        s2v=encode_state(s2)
        # vm: availability mask for the next state, stored with the transition
        vm=np.ones(26,np.float32)
        for g in s2['guessed']: vm[LETTER_TO_IDX[g]]=0
        agent.store(s_vec,a,r,s2v,done,vm)
        # agent.train is called after every environment step
        agent.train(BATCH_SIZE)
        tot+=r; s,s_vec=s2,s2v
    rewards.append(tot)
    # Epsilon decays once per episode, not once per step.
    agent.decay_eps()
    if ep%TARGET_UPDATE_EVERY==0: agent.update_target()
    if ep%100==0:
        # Progress line: mean reward over the last 100 episodes, current eps, elapsed minutes.
        print(f"Ep{ep}/{NUM_EPISODES} avgR={np.mean(rewards[-100:]):.1f} eps={agent.eps:.2f} time={(time.time()-start)/60:.1f}m")

# ------------- EVALUATION -------------
def evaluate(agent, test_words, n=500):
    """Play up to ``n`` games on words sampled from ``test_words``.

    The agent is evaluated greedily: if it has an ``eps`` attribute it is
    set to 0 for the duration and restored afterwards, so the reported
    numbers are not diluted by leftover exploration.

    Args:
        agent: object with ``act(s_vec, mask)`` returning an action index.
        test_words: list of words to sample from (without replacement).
        n: maximum number of games.

    Returns:
        dict with ``games``, ``wins``, ``success_rate``, ``avg_wrong`` and
        ``avg_repeat``. All rates are 0.0 when no game can be played.
    """
    games = min(n, len(test_words))
    if games <= 0:
        # Nothing to play: avoid dividing by zero (and sampling from nothing).
        return {'games': 0, 'wins': 0, 'success_rate': 0.0,
                'avg_wrong': 0.0, 'avg_repeat': 0.0}
    env = HangmanEnv(test_words)
    wins = wrong = rep = 0
    saved_eps = getattr(agent, 'eps', None)
    if saved_eps is not None:
        agent.eps = 0.0
    try:
        for w in random.sample(test_words, games):
            s = env.reset(w)
            done = False
            while not done:
                mask = np.ones(26, np.float32)
                for g in s['guessed']:
                    mask[LETTER_TO_IDX[g]] = 0
                if mask.sum() == 0:
                    break  # every letter used; nothing left to try
                s_vec = encode_state(s)
                a = agent.act(s_vec, mask)
                s, r, done, info = env.step(IDX_TO_LETTER[a])
                reason = info.get('reason')
                if reason == 'wrong':
                    wrong += 1
                elif reason == 'repeat':
                    rep += 1
            if '_' not in env.mask:
                wins += 1
    finally:
        if saved_eps is not None:
            agent.eps = saved_eps
    return {'games': games, 'wins': wins, 'success_rate': wins/games,
            'avg_wrong': wrong/games, 'avg_repeat': rep/games}

# Evaluate the trained agent on the test words and show the summary.
print("Evaluating...")
res=evaluate(agent,test_words)
print(res)

# Save the Q-network, the fitted HMMs and the evaluation summary.
os.makedirs("hackman_models", exist_ok=True)
agent.model.save("hackman_models/dqn_model.h5")
with open("hackman_models/hmms.pkl","wb") as f: pickle.dump(hmms,f)
# Context manager so the JSON file is flushed and closed deterministically;
# the handle was previously opened inline and never closed.
with open("hackman_models/eval.json","w") as f:
    json.dump(res, f, indent=2)
print("Done.")

ModuleNotFoundError: No module named 'hmmlearn'