In [1]:
import collections
import json
import math
import pickle
import random
import requests
import string
import time
from urllib.parse import parse_qs
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
import os
from requests.packages.urllib3.exceptions import InsecureRequestWarning
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"


In [15]:
class HangmanAPI(object):
    def __init__(self, access_token=None, session=None, timeout=None):
        """Set up the API client, build the training data and train the model.

        Construction is heavyweight: it probes the candidate servers over the
        network, derives masked training samples from the word list, writes the
        vowel prior and the encoded features to files in the working directory,
        and runs ``train_model`` before returning.

        Args:
            access_token: Token stored on the instance; ``request`` attaches it
                to outgoing requests when it is set.
            session: Optional ``requests.Session`` to reuse; a new one is
                created when omitted (or falsy).
            timeout: Timeout value forwarded to every HTTP call made by
                ``request``.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Uncomment to force CPU execution:
        #self.device = torch.device("cpu")
        self.verify_cuda_usage()
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token

        # Suppress SSL warnings (``request`` sends with verify=False)
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

        self.session = session or requests.Session()
        self.timeout = timeout
        # Letters guessed so far in the current game; reset by start_game
        self.guessed_letters = []
        sample_words = ["apple", "banana", "cherry", "date", "elderberry", "fig", "grape"]

        # Build the dictionary and sample words.
        # NOTE(review): the dictionary file is currently NOT read; the small
        # sample list above is used in its place.
        dictionary_file_location = "words_250000_train.txt"
        #self.full_dictionary = self.build_dictionary(dictionary_file_location)
        self.full_dictionary = sample_words
        self.df_aug = self.create_intermediate_data(self.full_dictionary)
    
        # Create masked dictionary and vowel prior (the prior is also pickled to disk)
        self.masked_dictionary = self.create_masked_dictionary(self.df_aug)
        self.vowel_prior = self.get_vowel_prior(self.df_aug)
        self.save_vowel_prior(self.vowel_prior)

        # Encode data, dump it to text files, then convert it to tensors
        self.input_data, self.target_data = self.encode_words(self.masked_dictionary)
        self.save_input_output_data(self.input_data, self.target_data)
        self.input_tensor, self.target_tensor = self.convert_to_tensor(self.input_data, self.target_data)
        
        # Training runs inside the constructor and saves a checkpoint per epoch
        self.model = self.build_model()
        self.train_model(epochs=8, lr=0.01, batch_size=128)  # Adjust epochs and lr as needed


    def verify_cuda_usage(self):
        if self.device.type == 'cuda':
            print('CUDA is being used.')
        else:
            print('CUDA is not being used.')

    @staticmethod
    def determine_hangman_url():
        """Return the hangman endpoint of the fastest-responding candidate server.

        Each candidate receives one untimed warm-up request followed by ten
        timed requests; the candidate with the lowest total elapsed time wins
        (ties go to the first candidate listed).

        Returns:
            str: Base URL of the chosen server with ``/trexsim/hangman`` appended.
        """
        links = ['https://trexsim.com', 'https://sg.trexsim.com']
        probes_per_link = 10
        elapsed = {link: 0.0 for link in links}

        for link in links:
            requests.get(link)  # warm-up, deliberately not timed
            for _ in range(probes_per_link):
                started = time.time()
                requests.get(link)
                # Accumulate across probes; plain assignment here would keep
                # only the final probe and make the other nine pointless.
                elapsed[link] += time.time() - started

        fastest = min(links, key=elapsed.get)
        return fastest + '/trexsim/hangman'

    def create_intermediate_data(self, words):
        df = pd.DataFrame(words, columns=['word'])
        df['length'] = df['word'].apply(len)
        df['vowels_present'] = df['word'].apply(lambda p: set(p).intersection({'a', 'e', 'i', 'o', 'u'}))
        df['vowels_count'] = df['vowels_present'].apply(len)
        df['unique_char_count'] = df['word'].apply(lambda p: len(set(p)))
        df = df[~((df['unique_char_count'].isin([0, 1, 2])) | (df['length'] <= 3)) & (df['vowels_count'] != 0)]
        return df

    def loop_for_permutation(self, unique_letters, word, all_perm, i):
        random_letters = random.sample(unique_letters, i + 1)
        new_permuted_word = word
        for letter in random_letters:
            new_permuted_word = new_permuted_word.replace(letter, "_")
            all_perm.append(new_permuted_word)

    def permute_all(self, word, vowel_permutation_loop=False):
        unique_letters = list(set(word))
        all_perm = []
        if vowel_permutation_loop:
            for i in range(len(unique_letters) - 1):
                self.loop_for_permutation(unique_letters, word, all_perm, i)
            all_perm = list(set(all_perm))
            return all_perm
        else:
            for i in range(len(unique_letters) - 2):
                self.loop_for_permutation(unique_letters, word, all_perm, i)
            all_perm = list(set(all_perm))
            return all_perm
        
    def permute_consonants(self, word):
        vowel_word = "".join([i if i in "aeiou" else "_" for i in word])
        vowel_idxs = [i for i, letter in enumerate(vowel_word) if letter != "_"]
        abridged_vowel_word = vowel_word.replace("_", "")
        all_permute_consonants = self.permute_all(abridged_vowel_word, vowel_permutation_loop=True)
        permuted_consonants = []
        for permuted_word in all_permute_consonants:
            a = ["_"] * len(word)
            for idx, vowel in enumerate(permuted_word):
                a[vowel_idxs[idx]] = vowel
            permuted_consonants.append("".join(a))
        return permuted_consonants

    def create_masked_dictionary(self, df_aug):
        masked_dictionary = {}
        for counter, word in enumerate(df_aug['word']):
            all_masked_words_for_word = self.permute_all(word)
            all_masked_words_for_word += self.permute_consonants(word)
            masked_dictionary[word] = list(set(all_masked_words_for_word))
            if counter % 10000 == 0:
                print(f"Iteration {counter} completed")
        return masked_dictionary

    def get_vowel_prob(self, df_vowel, vowel):
        try:
            return df_vowel[0].apply(lambda p: vowel in p).value_counts(normalize=True).loc[True]
        except:
            return 0

    def get_vowel_prior(self, df_aug):
        prior_json = {}
        for word_len in range(df_aug['length'].max()):
            prior_json[word_len + 1] = []
            df_vowel = df_aug[df_aug['length'] == word_len]
            for vowel in ['a', 'e', 'i', 'o', 'u']:
                prior_json[word_len + 1].append(self.get_vowel_prob(df_vowel, vowel))
            prior_json[word_len + 1] = pd.DataFrame([pd.Series(['a', 'e', 'i', 'o', 'u']), pd.Series(prior_json[word_len + 1])]).T.sort_values(by=1, ascending=False)
        return prior_json

    def save_vowel_prior(self, vowel_prior):
        with open("prior_probabilities.pkl", "wb") as f:
            pickle.dump(vowel_prior, f)

    def encode_output(self, word):
        char_mapping = self.get_char_mapping()
        last_char = word[-1]
        if last_char not in char_mapping:
            raise ValueError(f"Invalid character '{last_char}' in word")
        return char_mapping[last_char] - 1


    def encode_input(self, word):
        char_mapping = self.get_char_mapping()
        embedding_len = 35
        word = ''.join(char for char in word if char in char_mapping)  # Remove non-mapped characters
        word_vector = [0] * embedding_len
        for idx, letter in enumerate(word, start=embedding_len - len(word)):
            word_vector[idx] = char_mapping[letter]
        return word_vector


    def encode_words(self, masked_dictionary):
        input_data = []
        target_data = []
        for output_word, input_words in masked_dictionary.items():
            output_vector = self.encode_output(output_word)
            for input_word in input_words:
                input_data.append(self.encode_input(input_word))
                target_data.append(output_vector)
        return input_data, target_data

    def save_input_output_data(self, input_data, target_data):
        with open(r'input_features.txt', 'w') as fp:
            for item in input_data:
                fp.write("%s\n" % item)
            print('Done')
        with open(r'target_features.txt', 'w') as fp:
            for item in target_data:
                fp.write("%s\n" % item)
            print('Done')

    def convert_to_tensor(self, input_data, target_data):
        # Ensure input_data and target_data have the same length
        assert len(input_data) == len(target_data)

        # Convert input_data and target_data to tensors
        input_tensor = torch.tensor(input_data, dtype=torch.long)
        target_tensor = torch.tensor(target_data, dtype=torch.long)

        # Reshape the tensors to have the same batch size
        input_tensor = input_tensor.view(-1, 1)
        target_tensor = target_tensor.view(-1, 1)

        return input_tensor, target_tensor
    
    def get_char_mapping(self):
        char_mapping = {char: i for i, char in enumerate(string.ascii_lowercase, 1)}
        char_mapping[' '] = 0  # Add an entry for the space character
        return char_mapping
        
        
    def build_model(self):
        class BiLSTMModel(torch.nn.Module):
            def __init__(self):
                super(BiLSTMModel, self).__init__()
                self.embedding = torch.nn.Embedding(27, 128)
                self.lstm = torch.nn.LSTM(128, 256, bidirectional=True, batch_first=True)
                self.linear = torch.nn.Linear(256 * 2, 27)
            
            def forward(self, x):
                x = self.embedding(x)
                x, _ = self.lstm(x)
                x = self.linear(x[:, -1, :])
                return torch.nn.functional.softmax(x, dim=1)
        
        model = BiLSTMModel()
        return model.to(self.device)  # Move model to GPU if available
    
    def load_model(self, epoch):
        model = self.build_model()
        model.load_state_dict(torch.load(f"model_checkpoint_{epoch}.pth"))
        model = model.to(self.device)
        model.eval()
        self.model = model

    def train_model(self, epochs=10, lr=0.01, batch_size=32):
        dataset = TensorDataset(self.input_tensor, self.target_tensor)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        
        criterion = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
        for epoch in range(epochs):
            self.model.train()
            for batch in dataloader:
                encoded_words_batch, next_letters_batch = batch[0].to(self.device), batch[1].to(self.device)
                
                optimizer.zero_grad()
                output = self.model(encoded_words_batch)
                next_letters_batch = next_letters_batch.view(-1)
                loss = criterion(output, next_letters_batch)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
                optimizer.step()
            scheduler.step()
            print(f"Epoch {epoch+1}, Loss: {loss.item()}")
            torch.save(self.model.state_dict(), f"model_checkpoint_{epoch+1}.pth")


    def guess(self, word, tries_remains):
        word = word.strip()  # Remove leading and trailing whitespace
        max_vowel_guess_limit = 5
        if tries_remains > 4 and len(self.guessed_letters) <= max_vowel_guess_limit:
            return self.guess_vowel(word)
        else:
            return self.guess_bilstm(word)

    def guess_vowel(self, word):
        word_len = len(word)
        prior_prob = self.vowel_prior.get(word_len)
        if not prior_prob.empty:  # Check if the DataFrame is not empty
            for vowel, prob in prior_prob:
                if vowel not in self.guessed_letters:
                    return vowel
        return None
    
    def guess_bilstm(self, word):
        encoded_word = torch.tensor([self.encode_input(word)], dtype=torch.long).to(self.device)
        self.model.eval()
        with torch.no_grad():
            output = self.model(encoded_word)
            _, predicted = torch.max(output.cpu().data, 1)
            predicted_letter = chr(predicted.item() + ord('a'))
        
        if predicted_letter in self.guessed_letters:
            _, topk_predicted = torch.topk(output.data, len(self.guessed_letters) + 1)
            for i in range(len(self.guessed_letters) + 1):
                predicted_letter = chr(topk_predicted[0, i].item() + ord('a'))
                if predicted_letter not in self.guessed_letters:
                    break

        return predicted_letter
    
    def build_dictionary(self, dictionary_file_location):
        text_file = open(dictionary_file_location, "r")
        full_dictionary = text_file.read().splitlines()
        text_file.close()
        return full_dictionary
                
    def start_game(self, practice=True, verbose=True):
        self.guessed_letters = []
        self.current_dictionary = self.full_dictionary
                         
        response = self.request("/new_game", {"practice": practice})
        if response.get('status') == "approved":
            game_id = response.get('game_id')
            word = response.get('word')
            tries_remains = response.get('tries_remains')
            if verbose:
                print(f"Successfully start a new game! Game ID: {game_id}. # of tries remaining: {tries_remains}. Word: {word}.")
            while tries_remains > 0:
                guess_letter = self.guess(word, tries_remains)
                self.guessed_letters.append(guess_letter)
                if verbose:
                    print(f"Guessing letter: {guess_letter}")
                    
                try:    
                    res = self.request("/guess_letter", {"request": "guess_letter", "game_id": game_id, "letter": guess_letter})
                except HangmanAPIError:
                    print('HangmanAPIError exception caught on request.')
                    continue
                except Exception as e:
                    print('Other exception caught on request.')
                    raise e
               
                if verbose:
                    print(f"Sever response: {res}")
                status = res.get('status')
                tries_remains = res.get('tries_remains')
                if status == "success":
                    if verbose:
                        print(f"Successfully finished game: {game_id}")
                    return True
                elif status == "failed":
                    reason = res.get('reason', '# of tries exceeded!')
                    if verbose:
                        print(f"Failed game: {game_id}. Because of: {reason}")
                    return False
                elif status == "ongoing":
                    word = res.get('word')
        else:
            if verbose:
                print("Failed to start a new game")
        return status == "success"
        
    def my_status(self):
        return self.request("/my_status", {})
    
    def request(self, path, args=None, post_args=None, method=None):
        if args is None:
            args = dict()
        if post_args is not None:
            method = "POST"

        if self.access_token:
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)

        num_retry, time_sleep = 50, 2
        for it in range(num_retry):
            try:
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args,
                    verify=False
                )
                break
            except requests.HTTPError as e:
                response = json.loads(e.read())
                raise HangmanAPIError(response)
            except requests.exceptions.SSLError as e:
                if it + 1 == num_retry:
                    raise
                time.sleep(time_sleep)

        headers = response.headers
        if 'json' in headers['content-type']:
            result = response.json()
        elif "access_token" in parse_qs(response.text):
            query_str = parse_qs(response.text)
            if "access_token" in query_str:
                result = {"access_token": query_str["access_token"][0]}
                if "expires" in query_str:
                    result["expires"] = query_str["expires"][0]
            else:
                raise HangmanAPIError(response.json())
        else:
            raise HangmanAPIError('Maintype was not text, or querystring')

        if result and isinstance(result, dict) and result.get("error"):
            raise HangmanAPIError(result)
        return result
    
class HangmanAPIError(Exception):
    """Exception carrying an error payload from the hangman API.

    Attributes:
        result: The payload exactly as passed in.
        type: ``result["error_code"]`` when available, else the nested
            ``error.type`` (if that form is used), else ``""``.
        code: Nested ``error.code`` when the nested form is used, else ``None``.
        message: First available of ``error_description``,
            ``error.message``, ``error_msg``; otherwise the payload itself.
    """

    def __init__(self, result):
        self.result = result
        self.code = None

        try:
            error_type = result["error_code"]
        except (KeyError, TypeError):
            error_type = ""
        self.type = error_type

        unresolved = object()  # sentinel: no message found yet
        message = unresolved

        # Form 1: flat "error_description"
        try:
            message = result["error_description"]
        except (KeyError, TypeError):
            pass

        # Form 2: nested {"error": {"message": ..., "code": ..., "type": ...}}
        if message is unresolved:
            try:
                message = result["error"]["message"]
                self.code = result["error"].get("code")
                if not self.type:
                    self.type = result["error"].get("type", "")
            except (KeyError, TypeError):
                message = unresolved

        # Form 3: flat "error_msg"; failing that, use the payload as the message
        if message is unresolved:
            try:
                message = result["error_msg"]
            except (KeyError, TypeError):
                message = result

        self.message = message
        Exception.__init__(self, self.message)

In [16]:
# The access token is read from the environment so the credential is not stored
# in the notebook; set HANGMAN_ACCESS_TOKEN before running this cell. A missing
# variable fails here, before the constructor starts its long-running setup.
api = HangmanAPI(access_token=os.environ["HANGMAN_ACCESS_TOKEN"], timeout=2000)

CUDA is being used.
Iteration 0 completed
Done
Done


AssertionError: Size mismatch between tensors

: 

In [8]:
api.start_game(practice=1,verbose=True)
# my_status() yields four counters, unpacked in this order
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
# Guard against a zero run count (no practice games recorded yet)
practice_success_rate = total_practice_successes / total_practice_runs if total_practice_runs else 0.0
print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))

Successfully start a new game! Game ID: 0a316d1e1c68. # of tries remaining: 6. Word: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ .
Guessing letter: e
Sever response: {'game_id': '0a316d1e1c68', 'status': 'ongoing', 'tries_remains': 6, 'word': '_ e _ _ _ _ _ _ _ _ _ _ _ _ _ e _ '}
Guessing letter: y
Sever response: {'game_id': '0a316d1e1c68', 'status': 'ongoing', 'tries_remains': 5, 'word': '_ e _ _ _ _ _ _ _ _ _ _ _ _ _ e _ '}
Guessing letter: a
Sever response: {'game_id': '0a316d1e1c68', 'status': 'ongoing', 'tries_remains': 4, 'word': '_ e _ _ _ _ _ _ _ _ _ _ _ _ _ e _ '}
Guessing letter: k
Sever response: {'game_id': '0a316d1e1c68', 'status': 'ongoing', 'tries_remains': 3, 'word': '_ e _ _ _ _ _ _ _ _ _ _ _ _ _ e _ '}
Guessing letter: x
Sever response: {'game_id': '0a316d1e1c68', 'status': 'ongoing', 'tries_remains': 2, 'word': '_ e _ _ _ _ _ _ _ _ _ _ _ _ _ e _ '}
Guessing letter: o
Sever response: {'game_id': '0a316d1e1c68', 'status': 'ongoing', 'tries_remains': 1, 'word': '_ e _ _ _ _ _ _ 