In [1]:
import os
import requests
import zipfile
import io
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
from english_words import get_english_words_set

  from .autonotebook import tqdm as notebook_tqdm


In [7]:
def download_and_extract_glove(dim=50, out_folder='.', force=False):
    """
    Download the 'glove.6B.zip' archive (if needed) from Stanford NLP
    and extract only the glove.6B.{dim}d.txt file.

    The archive is streamed to a temporary '<zip>.part' file and renamed into
    place only after the download finished, so an interrupted download never
    leaves a truncated 'glove.6B.zip' that later runs would mistake for a
    valid cache.

    :param dim: Dimension of the GloVe vectors (50, 100, 200, or 300).
    :param out_folder: Folder in which the .txt file will be placed.
    :param force: If True, re-download and re-extract even if the file exists.
    :return: Full path to the extracted GloVe file (e.g., './glove.6B.50d.txt').
    :raises ValueError: if `dim` is not a dimension shipped in glove.6B.zip,
                        or the expected member is missing from the archive.
    :raises requests.RequestException: on network / HTTP failures.
    """
    glove_zip_filename = 'glove.6B.zip'
    glove_zip_filepath = os.path.join(out_folder, glove_zip_filename)
    glove_filename = f'glove.6B.{dim}d.txt'
    glove_filepath = os.path.join(out_folder, glove_filename)

    # 1) If file already exists (and not force), just return it
    if os.path.isfile(glove_filepath) and not force:
        print(f"[download_and_extract_glove] Found existing file: {glove_filepath}")
        return glove_filepath

    # 2) Reject unsupported dimensions *before* a large download.
    #    Compared as strings so both 50 and "50" keep working.
    supported_dims = ('50', '100', '200', '300')
    if str(dim) not in supported_dims:
        raise ValueError(
            f"[download_and_extract_glove] Unsupported dim={dim!r}; "
            f"glove.6B.zip only contains dimensions {', '.join(supported_dims)}."
        )

    # 3) Ensure out_folder exists
    os.makedirs(out_folder, exist_ok=True)

    # 4) If GloVe zip not present (or force), download it
    if force or not os.path.isfile(glove_zip_filepath):
        url = 'https://nlp.stanford.edu/data/glove.6B.zip'
        print(f"[download_and_extract_glove] Downloading GloVe.6B.zip to {glove_zip_filepath}...")
        partial_filepath = glove_zip_filepath + '.part'
        try:
            # (connect timeout, read timeout) so a stalled server cannot hang the notebook forever.
            with requests.get(url, stream=True, timeout=(10, 60)) as response:
                response.raise_for_status()
                with open(partial_filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
            # Atomic on the same filesystem: the final name only ever points at a complete file.
            os.replace(partial_filepath, glove_zip_filepath)
        except BaseException:
            # Also covers KeyboardInterrupt (kernel interrupt): never leave a partial file behind.
            if os.path.exists(partial_filepath):
                os.remove(partial_filepath)
            raise
        print("[download_and_extract_glove] Download complete.")

    # 5) Extract only the needed dimension file
    with zipfile.ZipFile(glove_zip_filepath, 'r') as zf:
        if glove_filename not in zf.namelist():
            raise ValueError(f"[download_and_extract_glove] {glove_filename} not found in zip!")
        print(f"[download_and_extract_glove] Extracting {glove_filename} to {out_folder}...")
        zf.extract(glove_filename, path=out_folder)

    print(f"[download_and_extract_glove] GloVe file ready at: {glove_filepath}")
    return glove_filepath

def load_words_with_pandas(file_path):
    """
    Load words from a text file (one word per line).

    The file is read line by line rather than through ``pd.read_csv``: CSV
    parsing silently dropped legitimate vocabulary words that look like
    missing values ("null", "NA", "nan", "None", ...), split lines containing
    a comma, and interpreted double quotes. The function name is kept so
    existing callers continue to work.

    :param file_path: Path to the text file
    :return: A list of strings (words), whitespace-stripped, skipping empty lines
    """
    # 'utf-8-sig' reads plain UTF-8 and also drops a leading BOM if present,
    # so the first word never carries an invisible '\ufeff' prefix.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        stripped_lines = (line.strip() for line in f)
        return [word for word in stripped_lines if word]


class DecryptoClueRecommender:
    """
    A class to recommend Decrypto clues from a fixed vocabulary using two
    N x N matrices (N = len(vocab)):
      - A perplexity matrix M_p (via the causal LM named in self.model_name)
      - A similarity matrix M_s (via GloVe embeddings, cosine similarity)

    The matrices are cached as .npy files; the language model and the GloVe
    vectors are only loaded into memory when the matrices have to be (re)built.
    """

    # Sentinel stored on the diagonal of M_p: a word is never a good clue for itself.
    SELF_CLUE_PERPLEXITY = 9999.0

    def __init__(self, vocab_list, glove_path,
                 matrix_dir='matrices',
                 matrix_prefix='decrypto',
                 force_matrix_build=False):
        """
        Initialize with a given vocabulary list of words and path to a GloVe file.
        Also specify paths/prefixes for saving/loading M_p and M_s.

        :param vocab_list: list of strings (our vocabulary)
        :param glove_path: file path to GloVe embeddings (e.g., "glove.6B.50d.txt")
        :param matrix_dir: folder where M_p.npy and M_s.npy will be saved/loaded
        :param matrix_prefix: prefix to use for matrix files (e.g., "decrypto")
        :param force_matrix_build: if True, rebuild matrices even if they exist
        """
        self.vocab = vocab_list
        self.word_to_idx = {word: i for i, word in enumerate(self.vocab)}
        self.N = len(self.vocab)

        # Decide which device we're using (GPU if available)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # LLM & tokenizer for perplexity
        self.model_name = "meta-llama/Llama-3.2-1B-Instruct"

        # Heavy resources; populated on demand by _ensure_model / _ensure_embeddings.
        self.glove_path = glove_path
        self.tokenizer = None
        self.model = None
        self.embeddings = None

        # Setup matrix paths
        self.matrix_dir = matrix_dir
        os.makedirs(self.matrix_dir, exist_ok=True)
        self.matrix_prefix = matrix_prefix
        self.m_p_path = os.path.join(self.matrix_dir, f"{self.matrix_prefix}_Mp.npy")
        self.m_s_path = os.path.join(self.matrix_dir, f"{self.matrix_prefix}_Ms.npy")

        # Prepare placeholders for the two matrices
        self.M_p = None  # Perplexity matrix
        self.M_s = None  # Similarity matrix

        # Optionally load from disk if they exist
        if not force_matrix_build:
            self._try_load_matrices()

        # Load the model/embeddings whenever the matrices are NOT in memory.
        # Checking file existence instead would skip loading when the files
        # exist but were rejected (force rebuild, stale shape, only one file),
        # and build_matrices() would then fail on a missing tokenizer/embeddings.
        if self.M_p is None or self.M_s is None:
            self._ensure_model()
            self._ensure_embeddings()

    def _ensure_model(self):
        """
        Load the tokenizer and language model into memory if not already loaded.
        """
        if getattr(self, 'model', None) is not None and getattr(self, 'tokenizer', None) is not None:
            return
        print(f"[DecryptoClueRecommender] Initializing {self.model_name}...")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(self.model_name).to(self.device)
        self.model.eval()  # put in eval mode

    def _ensure_embeddings(self):
        """
        Load the GloVe embeddings from self.glove_path if not already loaded.
        """
        if getattr(self, 'embeddings', None) is None:
            self.embeddings = self._load_glove_embeddings(self.glove_path)

    def _try_load_matrices(self):
        """
        Attempt to load M_p and M_s from self.m_p_path and self.m_s_path.
        Both files must exist and both must have shape (N, N) for the current
        vocabulary; otherwise neither matrix is set.
        """
        loaded = False
        if os.path.isfile(self.m_p_path) and os.path.isfile(self.m_s_path):
            # Check shapes to ensure it matches our current vocabulary
            M_p_temp = np.load(self.m_p_path)
            M_s_temp = np.load(self.m_s_path)
            if M_p_temp.shape == (self.N, self.N) and M_s_temp.shape == (self.N, self.N):
                self.M_p = M_p_temp
                self.M_s = M_s_temp
                loaded = True
                print(f"[DecryptoClueRecommender] Loaded matrices from disk:\n"
                      f"  M_p: {self.m_p_path}\n"
                      f"  M_s: {self.m_s_path}")

        if not loaded:
            print("[DecryptoClueRecommender] No valid saved matrices found. Call build_matrices().")

    def _load_glove_embeddings(self, glove_file):
        """
        Load GloVe embeddings from a local file into a dict: {word: np.array([...])}

        Blank lines are skipped. The dimension is taken from the first vector;
        any later line with a different number of coefficients is ignored.

        :raises FileNotFoundError: if glove_file does not exist
        """
        if not os.path.isfile(glove_file):
            raise FileNotFoundError(f"[DecryptoClueRecommender] GloVe file not found: {glove_file}")

        print(f"[DecryptoClueRecommender] Loading GloVe embeddings from {glove_file}...")

        embeddings_dict = {}
        dim = None
        with open(glove_file, 'r', encoding='utf-8') as f:
            for line in f:
                values = line.strip().split()
                if not values:
                    # Blank (or whitespace-only) line: nothing to parse.
                    continue
                word = values[0]
                coefs = np.asarray(values[1:], dtype='float32')
                if dim is None:
                    dim = len(coefs)
                if len(coefs) == dim:
                    embeddings_dict[word] = coefs
        print(f"[DecryptoClueRecommender] Loaded {len(embeddings_dict)} word vectors (dim={dim}).")
        return embeddings_dict

    def _get_glove_vector(self, word):
        """
        Retrieve the GloVe vector for the given word (looked up lowercased).
        If the word isn't in GloVe, return a zero vector of the embedding
        dimension (length 0 if no embeddings are loaded at all).
        """
        # GloVe is mostly lowercase; might need to lower() your words for best coverage
        lookup_word = word.lower()
        if lookup_word in self.embeddings:
            return self.embeddings[lookup_word]
        any_vector = next(iter(self.embeddings.values()), None)
        dim = 0 if any_vector is None else any_vector.shape[0]
        return np.zeros(dim, dtype='float32')

    def _cosine_similarity(self, v1, v2):
        """
        Compute cosine similarity between two vectors.
        Returns 0.0 if either vector has zero norm.
        """
        norm1 = np.linalg.norm(v1)
        norm2 = np.linalg.norm(v2)
        if norm1 == 0 or norm2 == 0:
            return 0.0
        return float(np.dot(v1, v2) / (norm1 * norm2))

    @torch.no_grad()
    def _get_perplexity(self, prompt):
        """
        Approximate perplexity of a prompt using self.model_name.
        We'll do a direct calculation on the entire prompt: exp of the
        language-modelling loss the model reports with the prompt as its own labels.
        """
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        outputs = self.model(**inputs, labels=inputs["input_ids"])
        loss = outputs.loss
        return float(torch.exp(loss))

    def _build_similarity_matrix(self):
        """
        Return the (N, N) float32 matrix of GloVe cosine similarities.

        Each vocabulary vector is looked up once, L2-normalised, and all pairs
        are computed with a single matrix product. Words without a GloVe vector
        (zero vectors) get similarity 0.0 to every other word; the diagonal is 1.0.
        """
        if self.N == 0:
            return np.zeros((0, 0), dtype=np.float32)

        vectors = np.stack([self._get_glove_vector(word) for word in self.vocab]).astype(np.float32)
        norms = np.linalg.norm(vectors, axis=1)
        # Divide zero-norm rows by 1 instead of 0: they stay all-zero, hence similarity 0.
        safe_norms = np.where(norms > 0, norms, 1.0)
        unit_vectors = vectors / safe_norms[:, None]

        similarity = unit_vectors @ unit_vectors.T
        np.fill_diagonal(similarity, 1.0)
        return similarity.astype(np.float32, copy=False)

    def _build_perplexity_matrix(self):
        """
        Return the (N, N) float32 matrix whose entry [i, j] is the perplexity of
        the prompt presenting vocab[i] as a clue for vocab[j]. One LM forward
        pass per off-diagonal pair.
        """
        perplexities = np.zeros((self.N, self.N), dtype=np.float32)
        for i in tqdm(range(self.N), desc="Building M_p"):
            clue_word = self.vocab[i]
            for j in range(self.N):
                if i == j:
                    perplexities[i, j] = self.SELF_CLUE_PERPLEXITY
                    continue
                keyword_word = self.vocab[j]
                prompt_text = f"Clue for '{keyword_word}': {clue_word}"
                perplexities[i, j] = self._get_perplexity(prompt_text)
        return perplexities

    def build_matrices(self, save_to_disk=True):
        """
        Build M_p (perplexity) and M_s (similarity) for all pairs in vocab.
        M_p[i, j] = perplexity of "vocab[i]" as a clue for "vocab[j]"
        M_s[i, j] = GloVe cosine similarity between vocab[i] and vocab[j]

        The GloVe vectors and the language model are loaded on demand, so this
        also works on an instance whose matrices were loaded from disk.
        M_s is built (and saved) first because it is cheap.

        :param save_to_disk: if True, save M_p and M_s to .npy files after building
        """
        print("[DecryptoClueRecommender] Building similarity matrix M_s (GloVe)...")
        self._ensure_embeddings()
        self.M_s = self._build_similarity_matrix()

        # Optionally save matrices
        if save_to_disk:
            np.save(self.m_s_path, self.M_s)
            print(f"[DecryptoClueRecommender] Saved M_s to {self.m_s_path}")

        print(f"[DecryptoClueRecommender] Building perplexity matrix M_p ({self.model_name})...")
        self._ensure_model()
        self.M_p = self._build_perplexity_matrix()

        # Optionally save matrices
        if save_to_disk:
            np.save(self.m_p_path, self.M_p)
            print(f"[DecryptoClueRecommender] Saved M_p to {self.m_p_path}")

    def _objective(self, c_idx, w1_idx, w2_idx, w3_idx, w4_idx, C_prev_indices,
                   alpha=1.0, beta=0.5, gamma=0.5):
        """
        Score candidate clue c for the primary keyword w1 (higher is better):

        O(c, w1) = -alpha * M_p[c, w1]
                   + beta * (M_s[c, w2] + M_s[c, w3] + M_s[c, w4])
                   - gamma * avg_{c' in C_prev} M_s[c, c']

        All word arguments are vocabulary indices; C_prev_indices may be empty,
        in which case the last term is 0.
        """
        # (A) Encourage low perplexity for (c, w1)
        term_a = -alpha * self.M_p[c_idx, w1_idx]

        # (B) Encourage ambiguity (similar to w2, w3, w4)
        term_b = beta * (
            self.M_s[c_idx, w2_idx] +
            self.M_s[c_idx, w3_idx] +
            self.M_s[c_idx, w4_idx]
        )

        # (C) Dissuade re-using the same domain as previous clues
        if len(C_prev_indices) > 0:
            sim_sum = sum(self.M_s[c_idx, prev_idx] for prev_idx in C_prev_indices)
            avg_sim = sim_sum / len(C_prev_indices)
        else:
            avg_sim = 0.0
        term_c = -gamma * avg_sim

        return term_a + term_b + term_c

    def suggest_clues(self,
                      w1, w2, w3, w4,
                      C_prev=None,
                      top_k=5,
                      alpha=1.0, beta=0.5, gamma=0.5):
        """
        Suggest top-K clues for the primary keyword w1, given the other
        3 secret words (w2, w3, w4) and a list of previously used clues.
        Previous clues that are not in the vocabulary are ignored.
        Returns a list of (clue_word, score), best score first.

        :raises ValueError: if the matrices are not available, or any of
                            w1..w4 is not in the vocabulary
        """
        if self.M_p is None or self.M_s is None:
            raise ValueError("Matrices not built yet! Call build_matrices() or load them from disk.")

        # Convert to indices
        try:
            w1_idx = self.word_to_idx[w1]
            w2_idx = self.word_to_idx[w2]
            w3_idx = self.word_to_idx[w3]
            w4_idx = self.word_to_idx[w4]
        except KeyError as e:
            raise ValueError(f"Keyword {e} not found in vocab!") from e

        if C_prev is None:
            C_prev = []
        C_prev_indices = [
            self.word_to_idx[clue_word]
            for clue_word in C_prev
            if clue_word in self.word_to_idx
        ]

        # Compute score for each word in the vocab
        scored_clues = []
        for c_idx, c_word in enumerate(self.vocab):
            # skip if it's the same as the main keyword
            if c_idx == w1_idx:
                continue

            score = self._objective(
                c_idx, w1_idx, w2_idx, w3_idx, w4_idx,
                C_prev_indices,
                alpha=alpha, beta=beta, gamma=gamma
            )
            scored_clues.append((c_word, score))

        # Sort by descending score and take top_k
        scored_clues.sort(key=lambda x: x[1], reverse=True)
        return scored_clues[:top_k]

In [10]:
# Step 1 - make sure the 50-dimensional GloVe vectors are available locally
# (the helper returns immediately when the .txt file is already present).
glove_50d_path = download_and_extract_glove(dim=50, out_folder='.', force=False)

# Step 2 - the four secret keywords, plus the candidate-clue vocabulary
# read from a one-word-per-line text file.
keywords = ["school", "enemy", "dream", "story"]
vocab = load_words_with_pandas("./cew.txt")

# Step 3 - create the recommender.
#    matrix_dir         : folder the .npy matrix files are saved to / loaded from
#    matrix_prefix      : filename prefix, giving "decrypto_Mp.npy" and "decrypto_Ms.npy"
#    force_matrix_build : False => reuse saved matrices when they exist and match
#                         the vocabulary size; True => ignore them
recommender = DecryptoClueRecommender(
    vocab_list=vocab,
    glove_path=glove_50d_path,
    matrix_dir='matrices',
    matrix_prefix='decrypto',
    force_matrix_build=False,
)

# Step 4 - build (and save) the matrices only when they could not be loaded.
if recommender.M_p is not None and recommender.M_s is not None:
    print("[Main] Matrices were loaded from disk—no need to rebuild.")
else:
    print("[Main] Matrices not loaded from disk; building now...")
    recommender.build_matrices(save_to_disk=True)

# Step 5 - w1 is the keyword we are cluing; w2..w4 are the other secret words.
w1, w2, w3, w4 = keywords

# Clues assumed to have been given for "school" in earlier rounds.
prev_clues_for_school = ["fish", "teach"]

# Step 6 - ask for the five best-scoring clues.
#    alpha : weight of the perplexity term
#    beta  : weight of the similarity-to-other-keywords term
#    gamma : weight of the similarity-to-previous-clues penalty
suggestions = recommender.suggest_clues(
    w1, w2, w3, w4,
    C_prev=prev_clues_for_school,
    top_k=5,
    alpha=0.33,
    beta=0.33,
    gamma=0.33,
)

print(f"\nTop clue suggestions for '{w1}' with keywords '{w2}', '{w3}', '{w4}':")
for clue_word, score in suggestions:
    print(f"  - {clue_word} (score={score:.3f})")

[download_and_extract_glove] Found existing file: .\glove.6B.50d.txt
[DecryptoClueRecommender] Loaded matrices from disk:
  M_p: matrices\decrypto_Mp.npy
  M_s: matrices\decrypto_Ms.npy
[Main] Matrices were loaded from disk—no need to rebuild.

Top clue suggestions for 'school' with keywords 'enemy', 'dream', 'story':
  - don't (score=-155.634)
  - a (score=-162.792)
  - syllable (score=-173.336)
  - noun (score=-174.747)
  - I (score=-183.284)
