In [38]:
import collections
import re
import time
from urllib.parse import parse_qs

import requests
import torch
import torch.nn as nn

class HangmanAPI(object):
    """Client for the Trexsim hangman server with a layered guessing strategy.

    :meth:`start_game` plays one game by repeatedly asking :meth:`guess` for a
    letter and sending it to the server through :meth:`request`. ``guess``
    combines a per-word-length vowel prior, a dictionary filtered against the
    current board and, when one was supplied, a letter-prediction model.
    """

    # Dictionary file read when the constructor gets no explicit location.
    DEFAULT_DICTIONARY_LOCATION = "/kaggle/input/word-250000/words_250000_train.txt"

    def __init__(self, access_token=None, session=None, timeout=None, model=None,
                 dictionary_location=None):
        """Pick the fastest server mirror, load the dictionary and set up state.

        Args:
            access_token: Token added to every request's parameters (optional).
            session: Object with a ``request`` method; a new
                ``requests.Session`` is created when falsy.
            timeout: Timeout forwarded to ``session.request``.
            model: Optional letter-prediction model; it is called with the
                encoded board and is switched to evaluation mode here.
            dictionary_location: Path of the word list (one word per line).
                Defaults to ``DEFAULT_DICTIONARY_LOCATION``.
        """
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.guessed_letters = []

        # Load the full dictionary and precompute the global letter ranking.
        if dictionary_location is None:
            dictionary_location = self.DEFAULT_DICTIONARY_LOCATION
        self.full_dictionary = self.build_dictionary(dictionary_location)
        self.full_dictionary_common_letter_sorted = collections.Counter(
            "".join(self.full_dictionary)
        ).most_common()

        # Every word is plausible until the first board has been seen.
        self.current_dictionary = self.full_dictionary

        # Compare with None explicitly: a module that defines __len__ (e.g. an
        # empty container) is falsy and would otherwise be skipped.
        self.model = model
        if self.model is not None:
            self.model.eval()

        # Vowel distribution per word length, used for early guesses.
        self.vowel_prior = self.initialize_vowel_prior()

    def initialize_vowel_prior(self):
        """Compute, for each word length 1..35, the share of each vowel.

        Returns:
            dict mapping word length to ``{vowel: probability}``. For a length
            with at least one vowel occurrence only the vowels that actually
            occur are present; for a length with none, all five vowels map to 0.
            Words longer than 35 characters are ignored.
        """
        max_length = 35
        vowels = "aeiou"
        vowel_counts = {length: collections.Counter() for length in range(1, max_length + 1)}

        for word in self.full_dictionary:
            counts = vowel_counts.get(len(word))
            if counts is None:
                continue  # empty line or word longer than max_length
            counts.update(char for char in word if char in vowels)

        vowel_prior = {}
        for length, counts in vowel_counts.items():
            total = sum(counts.values())
            if total > 0:
                vowel_prior[length] = {vowel: count / total for vowel, count in counts.items()}
            else:
                vowel_prior[length] = {vowel: 0 for vowel in vowels}
        return vowel_prior

    @staticmethod
    def determine_hangman_url():
        """Return the hangman endpoint of the mirror with the lowest latency.

        Each mirror gets one untimed warm-up request followed by ten timed
        requests whose durations are summed. A mirror that raises a
        ``requests.RequestException`` is skipped.

        Returns:
            The base URL of the fastest mirror with '/trexsim/hangman' appended.

        Raises:
            requests.RequestException: if every mirror failed (the last error
                is re-raised).
        """
        links = ['https://trexsim.com', 'https://sg.trexsim.com']
        probe_timeout = 10  # seconds; keeps a dead mirror from hanging start-up
        total_latency = {}
        last_error = None

        for link in links:
            try:
                requests.get(link, timeout=probe_timeout)  # warm-up, not timed
                elapsed = 0.0
                for _ in range(10):
                    started = time.perf_counter()
                    requests.get(link, timeout=probe_timeout)
                    # Accumulate: assigning here would keep only the last sample.
                    elapsed += time.perf_counter() - started
            except requests.RequestException as exc:
                last_error = exc
                continue
            total_latency[link] = elapsed

        if not total_latency:
            raise last_error

        # min() returns the first minimum, so ties go to the earlier link.
        fastest = min(total_latency, key=total_latency.get)
        return fastest + '/trexsim/hangman'

    def character_encoding(self, word):
        """Encode a board string as a padded index tensor.

        Letters 'a'..'z' become 1..26; every other character (including the
        '.' placeholders for unknown positions) becomes 0, the same index that
        is used for padding.
        """
        encoded_word = [(ord(c) - ord('a') + 1) if 'a' <= c <= 'z' else 0 for c in word]
        return self.onehot_encoding(encoded_word)

    def onehot_encoding(self, vector, max_len=30):
        """Pad with zeros or truncate ``vector`` to ``max_len`` entries.

        Despite the name no one-hot expansion happens: the result is a
        ``torch.long`` tensor of indices with a leading batch dimension of 1.
        """
        padded_vector = [0] * max_len
        for i, v in enumerate(vector[:max_len]):
            padded_vector[i] = v
        return torch.tensor(padded_vector, dtype=torch.long).unsqueeze(0)

    def next_letter_prediction(self, encoding):
        """Run the model and zero out the scores of already guessed letters.

        Args:
            encoding: Model input, as produced by :meth:`character_encoding`.

        Returns:
            Softmax of the model output over the last dimension, with column
            ``i`` of row 0 set to 0 when letter ``chr(ord('a') + i)`` has
            already been guessed. The output is indexed as (row, letter).
        """
        with torch.no_grad():
            predictions = self.model(encoding)
            probabilities = torch.softmax(predictions, dim=-1)

            for i in range(probabilities.shape[1]):
                if chr(i + ord('a')) in self.guessed_letters:
                    probabilities[0][i] = 0
            return probabilities

    def _first_unguessed(self, ranked_letters):
        """Return the first letter of ``(letter, count)`` pairs not guessed yet, else None."""
        for letter, _ in ranked_letters:
            if letter not in self.guessed_letters:
                return letter
        return None

    def guess(self, word, tries_remains):
        """Choose the next letter for the current board.

        The stages are tried in this order:

        1. While more than 3 tries remain, propose the most probable vowel for
           this word length if it has not been guessed yet.
        2. Narrow ``self.current_dictionary`` to words of the right length that
           agree with the revealed letters and contain no guessed letter in an
           unrevealed position.
        3. With 4 or fewer tries left, a model available and at least one
           candidate word, take the model's highest-scoring unguessed letter.
        4. Otherwise take the most frequent unguessed letter among the
           candidates, falling back to the full-dictionary ranking.

        Args:
            word: Board as sent by the server, e.g. ``"_ p p _ e "``; every
                second character is a separator.
            tries_remains: Number of wrong guesses still allowed.

        Returns:
            The chosen letter. ``'!'`` is returned only if no unguessed letter
            could be found. The letter is recorded in ``self.guessed_letters``.
        """
        # Drop the separators and turn blanks into regex wildcards.
        clean_word = word[::2].replace("_", ".")
        len_word = len(clean_word)

        # Stage 1: single best vowel for this word length.
        if tries_remains > 3:
            priors = self.vowel_prior.get(len_word, {})
            if priors:
                guess_letter = max(priors, key=priors.get)
                if guess_letter not in self.guessed_letters:
                    self.guessed_letters.append(guess_letter)
                    return guess_letter

        # Stage 2: filter candidates. A blank cannot hide a letter that was
        # already guessed: a correct guess reveals every occurrence and a wrong
        # guess means the letter is absent.
        excluded = "".join(sorted({
            ch for ch in self.guessed_letters if len(ch) == 1 and 'a' <= ch <= 'z'
        }))
        blank = "[^{}]".format(excluded) if excluded else "."
        pattern = re.compile(clean_word.replace(".", blank))
        new_dictionary = [
            dict_word for dict_word in self.current_dictionary
            if len(dict_word) == len_word and pattern.fullmatch(dict_word)
        ]
        self.current_dictionary = new_dictionary

        # Stage 3: model prediction late in the game.
        guess_letter = '!'
        if self.model is not None and new_dictionary and tries_remains <= 4:
            encoding = self.character_encoding(clean_word)
            probabilities = self.next_letter_prediction(encoding)
            predicted_index = torch.argmax(probabilities, dim=1).item()
            guess_letter = chr(predicted_index + ord('a'))

        # Stage 4: frequency-based fallbacks.
        if guess_letter == '!':
            candidate_ranking = collections.Counter("".join(new_dictionary)).most_common()
            found = self._first_unguessed(candidate_ranking)
            if found is None:
                found = self._first_unguessed(self.full_dictionary_common_letter_sorted)
            if found is not None:
                guess_letter = found

        # Record the guess once; start_game() may try to record it as well.
        if guess_letter not in self.guessed_letters:
            self.guessed_letters.append(guess_letter)
        return guess_letter

    ##########################################################
    # You'll likely not need to modify any of the code below #
    ##########################################################

    def build_dictionary(self, dictionary_file_location):
        """Read the word list at ``dictionary_file_location``, one word per line."""
        # The context manager closes the file even if reading fails.
        with open(dictionary_file_location, "r") as text_file:
            return text_file.read().splitlines()

    def start_game(self, practice=True, verbose=True):
        """Play one full game against the server.

        Args:
            practice: Forwarded to the server as the ``practice`` parameter.
            verbose: Print progress and server responses when true.

        Returns:
            True if the server reports ``status == "success"``; False if the
            game could not be started, failed, or ran out of tries.
        """
        # Reset per-game state.
        self.guessed_letters = []
        self.current_dictionary = self.full_dictionary

        response = self.request("/new_game", {"practice": practice})
        if response.get('status') == "approved":
            game_id = response.get('game_id')
            word = response.get('word')
            tries_remains = response.get('tries_remains')
            if verbose:
                print(f"Successfully started a new game! Game ID: {game_id}. # of tries remaining: {tries_remains}. Word: {word}.")
            while tries_remains > 0:
                guess_letter = self.guess(word, tries_remains)
                if verbose:
                    print(f"Guessing letter: {guess_letter}")

                # guess() already records its pick; only add the letter here
                # when it is missing so the list holds no duplicates.
                if guess_letter not in self.guessed_letters:
                    self.guessed_letters.append(guess_letter)

                try:
                    res = self.request("/guess_letter", {"request": "guess_letter", "game_id": game_id, "letter": guess_letter})
                except HangmanAPIError:
                    print('HangmanAPIError exception caught on request.')
                    continue
                except Exception:
                    print('Other exception caught on request.')
                    raise

                if verbose:
                    print(f"Server response: {res}")

                status = res.get('status')
                tries_remains = res.get('tries_remains')
                if status == "success":
                    if verbose:
                        print(f"Successfully finished game: {game_id}")
                    return True
                elif status == "failed":
                    reason = res.get('reason', '# of tries exceeded!')
                    if verbose:
                        print(f"Failed game: {game_id}. Reason: {reason}")
                    return False
                elif status == "ongoing":
                    word = res.get('word')
        else:
            if verbose:
                print("Failed to start a new game")
        return False

    def my_status(self):
        """Return the server's reply to ``/my_status`` (the account's game statistics)."""
        return self.request("/my_status", {})

    def request(self, path, args=None, post_args=None, method=None):
        """Send one request to the hangman server and decode the reply.

        Args:
            path: Path appended to ``self.hangman_url``.
            args: Query parameters. The caller's mapping is not modified.
            post_args: Form data; when given, the request is sent as POST. The
                caller's mapping is not modified.
            method: HTTP method; defaults to GET (or POST with ``post_args``).

        Returns:
            The decoded JSON body, or a dict with ``access_token`` (and
            ``expires`` when present) parsed from a query-string body.

        Raises:
            HangmanAPIError: for an HTTP error, an undecodable body, or a
                decoded body that carries an ``error`` entry.
            requests.exceptions.SSLError: if all 50 attempts hit an SSL error.
        """
        # Work on copies so the access token never leaks into caller-owned dicts.
        args = dict(args) if args is not None else dict()
        if post_args is not None:
            post_args = dict(post_args)
            method = "POST"

        if self.access_token:
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)
        num_retry, time_sleep = 50, 2
        for it in range(num_retry):
            try:
                # NOTE(review): verify=False disables TLS certificate checks;
                # kept as in the original client - confirm it is still required.
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args,
                    verify=False
                )
                break
            except requests.HTTPError as e:
                # requests' HTTPError has no read(); the body, if any, is on
                # the attached response object.
                try:
                    payload = e.response.json()
                except (AttributeError, ValueError):
                    payload = str(e)
                raise HangmanAPIError(payload)
            except requests.exceptions.SSLError:
                if it + 1 == num_retry:
                    raise
                time.sleep(time_sleep)

        content_type = response.headers.get('content-type', '')
        if 'json' in content_type:
            result = response.json()
        else:
            query_str = parse_qs(response.text)
            if "access_token" not in query_str:
                raise HangmanAPIError('Maintype was not text, or querystring')
            result = {"access_token": query_str["access_token"][0]}
            if "expires" in query_str:
                result["expires"] = query_str["expires"][0]

        if result and isinstance(result, dict) and result.get("error"):
            raise HangmanAPIError(result)
        return result

class HangmanAPIError(Exception):
    """Error raised for a failed hangman API call.

    The constructor accepts whatever the server (or the client) produced and
    extracts, where available:

    * ``type``    - ``result["error_code"]``, else ``result["error"]["type"]``,
                    else ``""``.
    * ``message`` - ``result["error_description"]``, else
                    ``result["error"]["message"]``, else ``result["error_msg"]``,
                    else ``result`` itself.
    * ``code``    - ``result["error"]["code"]`` when the nested form is used,
                    otherwise ``None``.

    ``result`` is kept unchanged on the instance.
    """

    def __init__(self, result):
        absent = object()  # sentinel: distinguishes "missing" from a stored None

        def fetch(container, key):
            # Subscript lookup that treats a missing key or an unsubscriptable
            # container as "absent".
            try:
                return container[key]
            except (KeyError, TypeError):
                return absent

        self.result = result
        self.code = None

        error_code = fetch(result, "error_code")
        self.type = "" if error_code is absent else error_code

        description = fetch(result, "error_description")
        if description is not absent:
            self.message = description
        else:
            nested = fetch(result, "error")
            nested_message = absent if nested is absent else fetch(nested, "message")
            if nested_message is not absent:
                self.message = nested_message
                self.code = nested.get("code")
                if not self.type:
                    self.type = nested.get("type", "")
            else:
                legacy_message = fetch(result, "error_msg")
                self.message = result if legacy_message is absent else legacy_message

        super().__init__(self.message)


In [2]:
!pip install tqdm




In [3]:
from tqdm import tqdm

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import time  # For timing the training duration

class BiLSTM(nn.Module):
    """Two-layer bidirectional LSTM classifier over index sequences.

    Args:
        vocab_size: Number of real symbols; one extra embedding row (index 0)
            is reserved for padding.
        embedding_dim: Width of the symbol embeddings.
        hidden_dim: Hidden size of each LSTM direction.
        output_size: Number of output scores per sequence.
        max_len: Accepted for interface compatibility; not used by the module.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_size, max_len):
        super().__init__()
        num_tokens = vocab_size + 1  # index 0 is the padding symbol
        self.embedding = nn.Embedding(num_tokens, embedding_dim, padding_idx=0)
        self.bilstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
            bidirectional=True,
        )
        # Forward and backward features are concatenated, hence 2 * hidden_dim.
        self.fc = nn.Linear(in_features=2 * hidden_dim, out_features=output_size)

    def forward(self, x):
        """Map a batch of index sequences to ``output_size`` raw scores each."""
        embedded = self.embedding(x)
        sequence_features = self.bilstm(embedded)[0]
        # Read out the features at the final sequence position (index -1).
        # NOTE(review): with zero-padded inputs this position may be padding -
        # confirm this matches how the training data was built.
        last_position = sequence_features[:, -1, :]
        return self.fc(last_position)

# Training the Bi-LSTM model
def train_model(model, train_loader, criterion, optimizer, num_epochs=10, print_freq=100,
                save_path='/kaggle/working/bilstm_hangman_model.pt'):
    """Train ``model`` and optionally save its ``state_dict``.

    Args:
        model: Module to train; it is put into training mode and left there.
        train_loader: Sized iterable of ``(inputs, targets)`` batches. Both are
            cast to ``long`` before use.
        criterion: Loss called as ``criterion(model(inputs), targets)``.
        optimizer: Optimizer that updates the model's parameters.
        num_epochs: Number of passes over ``train_loader``.
        print_freq: The progress bar's loss is refreshed every ``print_freq``
            batches and on the last batch.
        save_path: Where the ``state_dict`` is written after training. Pass
            ``None`` to skip saving.

    Returns:
        None. One summary line is printed per epoch.
    """
    model.train()
    num_batches = len(train_loader)  # invariant across epochs

    for epoch in range(num_epochs):
        running_loss = 0.0
        start_time = time.time()

        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False)

        for i, (inputs, targets) in enumerate(progress_bar):
            inputs, targets = inputs.long(), targets.long()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Refreshing the postfix on every batch slows the loop down.
            if (i + 1) % print_freq == 0 or (i + 1) == num_batches:
                progress_bar.set_postfix(epoch_loss=f'{running_loss / (i + 1):.4f}')

        epoch_duration = time.time() - start_time
        # An empty loader has no loss to average; report NaN instead of
        # dividing by zero.
        avg_loss = running_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, Time: {epoch_duration:.2f} seconds")

    if save_path is not None:
        torch.save(model.state_dict(), save_path)
        print(f"Model saved at {save_path}")


# Prepare DataLoader for training
from torch.utils.data import TensorDataset, DataLoader

def create_dataloaders(X, y, batch_size=64):
    """Build a shuffling ``DataLoader`` over paired inputs and targets.

    Args:
        X: Inputs, in any form ``torch.tensor`` accepts.
        y: Targets aligned with ``X`` along the first dimension.
        batch_size: Number of samples per batch.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` of ``(X, y)`` that reshuffles
        on every pass.
    """
    features = torch.tensor(X)
    labels = torch.tensor(y)
    paired = TensorDataset(features, labels)
    loader = DataLoader(paired, batch_size=batch_size, shuffle=True)
    return loader

# Load dictionary function
def load_dictionary(file_path):
    """Read a word list with one word per line.

    Args:
        file_path: Path of the text file.

    Returns:
        List of the file's lines without their line terminators, so the words
        can be used directly (``readlines()`` would leave a trailing newline on
        every word).
    """
    with open(file_path, 'r') as file:
        return file.read().splitlines()

# Usage example
# Training entry point: builds the training set from the word list, trains the
# Bi-LSTM and writes its weights to disk (see train_model for the save path).
if __name__ == "__main__":
    # Hyperparameters
    embedding_dim = 50
    hidden_dim = 64
    output_size = 26  # Predict a letter from 'a' to 'z'
    max_len = 30  # Maximum sequence length
    num_epochs = 10
    batch_size = 256
    print_freq = 500  # Update loss every 500 batches

    # Helper object that turns the word list into training pairs.
    # NOTE(review): HangmanModel is not defined in any cell visible here; this
    # cell only runs if that class is already present in the kernel namespace.
    hangman_model = HangmanModel(vocab_size=26, embedding_dim=embedding_dim, max_len=max_len)

    # Load the dictionary (list of words)
    # NOTE(review): this path differs from the '/kaggle/input/word-250000/...'
    # location used by HangmanAPI.__init__ - confirm which dataset is mounted.
    dictionary_path = '/kaggle/input/words-250000-train/words_250000_train.txt'  # Replace with your dictionary file path
    dictionary = load_dictionary(dictionary_path)

    # Generate the training data (X, y)
    X, y = hangman_model.generate_data(dictionary)

    # Create DataLoader (shuffled mini-batches of size batch_size)
    train_loader = create_dataloaders(X, y, batch_size)

    # Initialize Bi-LSTM model
    model = BiLSTM(vocab_size=26, embedding_dim=embedding_dim, hidden_dim=hidden_dim, output_size=output_size, max_len=max_len)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train and save the model
    train_model(model, train_loader, criterion, optimizer, num_epochs=num_epochs, print_freq=print_freq)


                                                                                  

Epoch 1/10, Loss: 1.9867, Time: 564.16 seconds


                                                                                  

Epoch 2/10, Loss: 1.4846, Time: 558.13 seconds


                                                                                  

Epoch 3/10, Loss: 1.3635, Time: 557.41 seconds


                                                                                  

Epoch 4/10, Loss: 1.2975, Time: 556.29 seconds


                                                                                  

Epoch 5/10, Loss: 1.2549, Time: 553.99 seconds


                                                                                  

Epoch 6/10, Loss: 1.2242, Time: 555.74 seconds


                                                                                  

Epoch 7/10, Loss: 1.2000, Time: 563.03 seconds


                                                                                  

Epoch 8/10, Loss: 1.1817, Time: 556.84 seconds


                                                                                  

Epoch 9/10, Loss: 1.1661, Time: 548.76 seconds


                                                                                   

Epoch 10/10, Loss: 1.1532, Time: 554.62 seconds
Model saved at /kaggle/working/bilstm_hangman_model.pt




In [12]:
# Smoke test: reload the trained weights and predict one letter for a masked word.
if __name__ == "__main__":
    # NOTE(review): HangmanModel is not defined in any cell visible here; this
    # cell only runs if that class is already present in the kernel namespace.
    hangman_model = HangmanModel(vocab_size=26, embedding_dim=50, max_len=30)

    # The constructor arguments must match the ones used for training so the
    # saved parameter shapes line up.
    model = BiLSTM(vocab_size=26, embedding_dim=50, hidden_dim=64, output_size=26, max_len=30)
    # weights_only=True limits unpickling to tensors and plain containers,
    # which is all a state_dict contains.
    model.load_state_dict(torch.load('/kaggle/working/bilstm_hangman_model.pt', weights_only=True))
    model.eval()

    # Test with a masked word
    masked_word = "p_werhouse"

    # Predict the next letter
    predicted_letter = hangman_model.predict_and_update(masked_word, model)
    print(f"Predicted letter: {predicted_letter}")
    print(f"Guessed letters: {hangman_model.guessed_letters}")

Predicted letter: o
Guessed letters: {'o'}


  model.load_state_dict(torch.load('/kaggle/working/bilstm_hangman_model.pt'))


In [13]:
import json
import requests
import random
import string
import secrets
import time
import re
import collections

try:
    from urllib.parse import parse_qs, urlencode, urlparse
except ImportError:
    from urlparse import parse_qs, urlparse
    from urllib import urlencode

from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [None]:
class HangmanAPI(object):
    """Minimal hangman client: mirror selection and dictionary loading only.

    NOTE(review): this cell defines ``HangmanAPI`` a second time. Running it
    after the fuller definition earlier in the notebook replaces that class
    with this one, which has no ``guess``, ``start_game`` or ``request``.
    Confirm whether this cell should be kept.
    """

    def __init__(self, access_token=None, session=None, timeout=None):
        """Pick the fastest server mirror and load the dictionary.

        Args:
            access_token: API token stored on the instance.
            session: Session object; a new ``requests.Session`` when falsy.
            timeout: Request timeout stored on the instance.
        """
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.guessed_letters = []

        # Relative path: the file must be in the current working directory.
        full_dictionary_location = "words_250000_train.txt"
        self.full_dictionary = self.build_dictionary(full_dictionary_location)
        self.full_dictionary_common_letter_sorted = collections.Counter("".join(self.full_dictionary)).most_common()

        self.current_dictionary = []

    @staticmethod
    def determine_hangman_url():
        """Return the hangman endpoint of the mirror with the lowest latency.

        Each mirror gets one untimed warm-up request followed by ten timed
        requests whose durations are summed. A mirror that raises a
        ``requests.RequestException`` is skipped.

        Raises:
            requests.RequestException: if every mirror failed (the last error
                is re-raised).
        """
        links = ['https://trexsim.com', 'https://sg.trexsim.com']
        probe_timeout = 10  # seconds; keeps a dead mirror from hanging start-up
        total_latency = {}
        last_error = None

        for link in links:
            try:
                requests.get(link, timeout=probe_timeout)  # warm-up, not timed
                elapsed = 0.0
                for _ in range(10):
                    started = time.perf_counter()
                    requests.get(link, timeout=probe_timeout)
                    # Accumulate: assigning here would keep only the last sample.
                    elapsed += time.perf_counter() - started
            except requests.RequestException as exc:
                last_error = exc
                continue
            total_latency[link] = elapsed

        if not total_latency:
            raise last_error

        # min() returns the first minimum, so ties go to the earlier link.
        fastest = min(total_latency, key=total_latency.get)
        return fastest + '/trexsim/hangman'

    def build_dictionary(self, dictionary_file_location):
        """Read the word list at ``dictionary_file_location``, one word per line.

        Defined here because ``__init__`` calls it; without it, constructing
        this class raises ``AttributeError``.
        """
        with open(dictionary_file_location, "r") as text_file:
            return text_file.read().splitlines()

In [20]:
import getpass
import os

# Take the API token from the HANGMAN_ACCESS_TOKEN environment variable, or
# prompt for it, so the secret is never stored in the notebook source.
# NOTE(review): a token was previously hard-coded in this cell; treat it as
# exposed and rotate it.
api = HangmanAPI(
    access_token=os.environ.get("HANGMAN_ACCESS_TOKEN") or getpass.getpass("Hangman access token: "),
    timeout=2000,
)


In [42]:
# Play one practice game, then fetch the account's running statistics.
api.start_game(practice=1,verbose=True)
# my_status() is unpacked here as four counters, in this order.
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: run and success counts
# NOTE(review): divides by the number of practice runs - presumably at least 1
# after the game above; confirm for a fresh account.
practice_success_rate = total_practice_successes / total_practice_runs
print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))

Successfully started a new game! Game ID: e47269079e10. # of tries remaining: 6. Word: _ _ _ _ _ .
Guessing letter: a
Server response: {'game_id': 'e47269079e10', 'status': 'ongoing', 'tries_remains': 5, 'word': '_ _ _ _ _ '}
Guessing letter: e
Server response: {'game_id': 'e47269079e10', 'status': 'ongoing', 'tries_remains': 4, 'word': '_ _ _ _ _ '}
Guessing letter: s
Server response: {'game_id': 'e47269079e10', 'status': 'ongoing', 'tries_remains': 4, 'word': '_ s _ _ _ '}
Guessing letter: i
Server response: {'game_id': 'e47269079e10', 'status': 'ongoing', 'tries_remains': 4, 'word': 'i s _ _ _ '}
Guessing letter: m
Server response: {'game_id': 'e47269079e10', 'status': 'ongoing', 'tries_remains': 3, 'word': 'i s _ _ _ '}
Guessing letter: l
Server response: {'game_id': 'e47269079e10', 'status': 'ongoing', 'tries_remains': 3, 'word': 'i s _ l _ '}
Guessing letter: o
Server response: {'game_id': 'e47269079e10', 'status': 'ongoing', 'tries_remains': 3, 'word': 'i s o l _ '}
Guessing let