In [1]:
import json
import requests
import random
import string
import secrets
import time
import re
import collections

try:
    from urllib.parse import parse_qs, urlencode, urlparse
except ImportError:
    from urlparse import parse_qs, urlparse
    from urllib import urlencode

from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [2]:
from numpy import loadtxt
from tensorflow.keras.models import load_model
!pip install keras_preprocessing
from keras_preprocessing.sequence import pad_sequences
from copy import deepcopy
from tensorflow.keras.utils import to_categorical
from tensorflow import keras
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Embedding, Bidirectional, Dense, LSTM, Dropout




In [11]:
class HangmanAPI(object):
    def __init__(self, access_token=None, session=None, timeout=None):
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.guessed_letters = []
        self.len_max=29
        self.ts=0.3
        self.bs=256
        self.epochs=25
        
        full_dictionary_location = "/kaggle/input/hangman-training-data/words_250000_train.txt"
        self.full_dictionary = self.build_dictionary(full_dictionary_location)        
        self.full_dictionary_common_letter_sorted = collections.Counter("".join(self.full_dictionary)).most_common()
        self.model=load_model('/kaggle/input/hangman-model-2/hangman_model2.keras')
        
        self.curr_dictionary = []
        
    @staticmethod
    def determine_hangman_url():
        links = ['https://trexsim.com', 'https://sg.trexsim.com']

        data = {link: 0 for link in links}

        for link in links:

            requests.get(link)

            for i in range(10):
                s = time.time()
                requests.get(link)
                data[link] = time.time() - s

        link = sorted(data.items(), key=lambda x: x[1])[0][0]
        link += '/trexsim/hangman'
        return link

    def guess(self, word): # word input example: "_ p p _ e "
        ###############################################
        # Replace with your own "guess" function here #
        ###############################################

        # Removing space characters
        word_no_space = word[::2]

        # encode dash as 27 and a to z as 1 to 26
        word_encoded = self.word_encoder(word_no_space)

        # Post padding
        word_padded_post = pad_sequences([word_encoded], maxlen=self.len_max, padding='post')
        
        #self.train_model()

        # Prediction
        prediction = self.model.predict(word_padded_post)

        # Picking the letter with max probability which has not been guessed yet
        prediction = self.get_order(prediction[0])

        for i in prediction:
            if i not in self.guessed_letters:
                letter_guessed = i
                break
        return letter_guessed

    def word_encoder(self, word):
   
        word_encoded = []
        for i in word:
            if ord(i) == ord("_"):
                word_encoded.append(27)
            else:
                word_encoded.append(ord(i)- ord("a")+1)
        return word_encoded

    def get_xy(self):

        

        # Encode the entire dictionary
        encoded_dictionary = []
        for i in self.full_dictionary:
            encoded_dictionary.append(self.word_encoder(i))

        X = []
        y = []
        for encoded_word in encoded_dictionary:
            dic = {cha:[] for cha in list(set(encoded_word))}
            for i in range(len(encoded_word)):
                dic[encoded_word[i]].append(i)
            cha_lis = list(dic.keys())
            for cha,lis in dic.items():
                # We create a copy of the encoded word and mask (hide) every position of the occurence of the word
                masked_word = deepcopy(encoded_word)
                for pos in lis:
                    masked_word[pos] = 27

                target = cha -1
                X.append(masked_word +[i]+[i])
                y.append(target)
                times = 0
                seen = [cha]
                # A new masked word is generated by masking randomly more characters in the previously masked word
                new_masked_word = deepcopy(masked_word)
                while times < len(cha_lis):
                    j = random.randint(0,len(cha_lis)-1)
                    times+=1
                    if cha_lis[j] in seen:
                        continue
                    seen.append(cha_lis[j])
                    for pos in dic[cha_lis[j]]:
                        new_masked_word[pos] = 27
                    X.append(new_masked_word+[i]+[j])
                    y.append(target)

        
            #print(masked_word)
            #print(new_masked_word)

        

        # Max length of the words is found and set
        for i in range(len(X)):
            X[i] = X[i][:-2]
            self.len_max = max(self.len_max, len(X[i]))

        # Padding words to the maximum length
        X = pad_sequences(X, maxlen=self.len_max, padding='post')
        y = to_categorical(y, num_classes=26)
        train_x, test_x, train_y, test_y = train_test_split(X, y, test_size=self.ts, random_state=42)
        return (train_x, test_x, train_y, test_y,self.len_max)

    def train_model(self): # Five-layer bidirectional LSTM
        (train_x, test_x, train_y, test_y) = self.get_xy()

        # Adding embedding layer and Bidirectional LSTM layers followed by Dropout layer for regularization and Activated Layer
        self.model = Sequential()
        self.model.add(Embedding(28, 128, input_length=self.len_max))
        self.model.add(Bidirectional(LSTM(128, return_sequences=True)))
        self.model.add(Bidirectional(LSTM(128, return_sequences=True)))
        self.model.add(Bidirectional(LSTM(128, return_sequences=True)))
        self.model.add(Bidirectional(LSTM(64, return_sequences=True)))
        self.model.add(Bidirectional(LSTM(64)))
        self.model.add(Dropout(0.25))
        self.model.add(Dense(26, activation='softmax'))

        # compile the model with early stopping
    
        self.model.compile(optimizer="adam",loss = "categorical_crossentropy", metrics=['accuracy', 'top_k_categorical_accuracy'])
        from keras.callbacks import EarlyStopping
        earlyStop=EarlyStopping(monitor="val_loss",verbose=2,mode='min',patience=3)

        

        # fit and evaluate the model
        self.model.fit(train_x, train_y,
                batch_size=self.bs,
                epochs=self.epochs,
                validation_data=[test_x, test_y],callbacks=[earlyStop])
        
        # summarize the model
        print(self.model.summary())

        return self.model

    def get_order(self, prediction):

        # Sorting Characters by Decreasing Probs
        d1 = {i:prediction[i] for i in range(len(prediction))}
        list_char_sort = [chr(index + ord("a")) for index, value in sorted(d1.items(), key=lambda x:x[1],reverse = True)]
        return list_char_sort
         

    ##########################################################
    # You'll likely not need to modify any of the code below #
    ##########################################################
    
    def build_dictionary(self, dictionary_file_location):
        text_file = open(dictionary_file_location,"r")
        full_dictionary = text_file.read().splitlines()
        text_file.close()
        return full_dictionary
                
    def start_game(self, practice=True, verbose=True):
        # reset guessed letters to empty set and current plausible dictionary to the full dictionary
        self.guessed_letters = []
        self.curr_dictionary = self.full_dictionary
                         
        response = self.request("/new_game", {"practice":practice})
        if response.get('status')=="approved":
            game_id = response.get('game_id')
            word = response.get('word')
            tries_remains = response.get('tries_remains')
            if verbose:
                print("Successfully start a new game! Game ID: {0}. # of tries remaining: {1}. Word: {2}.".format(game_id, tries_remains, word))
            while tries_remains>0:
                # get guessed letter from user code
                guess_letter = self.guess(word)
                    
                # append guessed letter to guessed letters field in hangman object
                self.guessed_letters.append(guess_letter)
                if verbose:
                    print("Guessing letter: {0}".format(guess_letter))
                    
                try:    
                    res = self.request("/guess_letter", {"request":"guess_letter", "game_id":game_id, "letter":guess_letter})
                except HangmanAPIError:
                    print('HangmanAPIError exception caught on request.')
                    continue
                except Exception as e:
                    print('Other exception caught on request.')
                    raise e
               
                if verbose:
                    print("Sever response: {0}".format(res))
                status = res.get('status')
                tries_remains = res.get('tries_remains')
                if status=="success":
                    if verbose:
                        print("Successfully finished game: {0}".format(game_id))
                    return True
                elif status=="failed":
                    reason = res.get('reason', '# of tries exceeded!')
                    if verbose:
                        print("Failed game: {0}. Because of: {1}".format(game_id, reason))
                    return False
                elif status=="ongoing":
                    word = res.get('word')
        else:
            if verbose:
                print("Failed to start a new game")
        return status=="success"
        
    def my_status(self):
        return self.request("/my_status", {})
    
    def request(
            self, path, args=None, post_args=None, method=None):
        if args is None:
            args = dict()
        if post_args is not None:
            method = "POST"

        # Add `access_token` to post_args or args if it has not already been
        # included.
        if self.access_token:
            # If post_args exists, we assume that args either does not exists
            # or it does not need `access_token`.
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)

        num_retry, time_sleep = 50, 2
        for it in range(num_retry):
            try:
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args,
                    verify=False
                )
                break
            except requests.HTTPError as e:
                response = json.loads(e.read())
                raise HangmanAPIError(response)
            except requests.exceptions.SSLError as e:
                if it + 1 == num_retry:
                    raise
                time.sleep(time_sleep)

        headers = response.headers
        if 'json' in headers['content-type']:
            result = response.json()
        elif "access_token" in parse_qs(response.text):
            query_str = parse_qs(response.text)
            if "access_token" in query_str:
                result = {"access_token": query_str["access_token"][0]}
                if "expires" in query_str:
                    result["expires"] = query_str["expires"][0]
            else:
                raise HangmanAPIError(response.json())
        else:
            raise HangmanAPIError('Maintype was not text, or querystring')

        if result and isinstance(result, dict) and result.get("error"):
            raise HangmanAPIError(result)
        return result
    
class HangmanAPIError(Exception):
    def __init__(self, result):
        self.result = result
        self.code = None
        try:
            self.type = result["error_code"]
        except (KeyError, TypeError):
            self.type = ""

        try:
            self.message = result["error_description"]
        except (KeyError, TypeError):
            try:
                self.message = result["error"]["message"]
                self.code = result["error"].get("code")
                if not self.type:
                    self.type = result["error"].get("type", "")
            except (KeyError, TypeError):
                try:
                    self.message = result["error_msg"]
                except (KeyError, TypeError):
                    self.message = result

        Exception.__init__(self, self.message)

In [None]:
for cha,lis in dic.items():
            #print(cha)
            #print(lis)
            # We create a copy of the encoded word and mask (hide) every position of the occurence of the word
            masked_word = deepcopy(encoded_word)
            for pos in lis:
                #print(pos)
                #print(lis)
                masked_word[pos] = 27

            #print(masked_word)    
            target = cha -1
            #print(target)
            X.append(masked_word)
            #print(masked_word)
            y.append(target)
            times = 0
            seen = [cha]
            # Randomly masking more characters in masked version of word
            new_masked_word = deepcopy(masked_word)
            print(len(cha_lis))
            while times < len(cha_lis):
                j = random.randint(0,len(cha_lis)-1)
                #print(j)
                times+=1
                if cha_lis[j] in seen:
                    continue
                seen.append(cha_lis[j])
                for pos in dic[cha_lis[j]]:
                    new_masked_word[pos] = 27
                X.append(new_masked_word)
                y.append(target)

In [12]:
api = HangmanAPI(access_token="1696416f55200d032a8ca2ebf645ec", timeout=2000)


In [None]:
''''api.start_game(practice=1,verbose=True)
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
#practice_success_rate = total_practice_successes / total_practice_runs
#print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))'''


In [20]:
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
practice_success_rate = total_practice_successes / total_practice_runs
print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))

run 401 practice games out of an allotted 100,000. practice success rate so far = 0.646


In [13]:
for i in range(100):
        api.start_game(practice=1,verbose=True)

Successfully start a new game! Game ID: 8f07ce3fc25b. # of tries remaining: 6. Word: _ _ _ _ _ _ _ .
Guessing letter: e
Sever response: {'game_id': '8f07ce3fc25b', 'status': 'ongoing', 'tries_remains': 6, 'word': '_ _ _ _ _ _ e '}
Guessing letter: i
Sever response: {'game_id': '8f07ce3fc25b', 'status': 'ongoing', 'tries_remains': 6, 'word': '_ _ _ _ _ i e '}
Guessing letter: a
Sever response: {'game_id': '8f07ce3fc25b', 'status': 'ongoing', 'tries_remains': 5, 'word': '_ _ _ _ _ i e '}
Guessing letter: o
Sever response: {'game_id': '8f07ce3fc25b', 'status': 'ongoing', 'tries_remains': 5, 'word': '_ o _ _ _ i e '}
Guessing letter: r
Sever response: {'game_id': '8f07ce3fc25b', 'status': 'ongoing', 'tries_remains': 4, 'word': '_ o _ _ _ i e '}
Guessing letter: l
Sever response: {'game_id': '8f07ce3fc25b', 'status': 'ongoing', 'tries_remains': 4, 'word': '_ o _ _ l i e '}
Guessing letter: n
Sever response: {'game_id': '8f07ce3fc25b', 'status': 'ongoing', 'tries_remains': 4, 'word': '_ o _ 

In [15]:
for i in range(1000):
    print('Playing ', i, ' th game')
    # Uncomment the following line to execute your final runs. Do not do this until you are satisfied with your submission
    api.start_game(practice=0,verbose=False)
    
    # DO NOT REMOVE as otherwise the server may lock you out for too high frequency of requests
    time.sleep(0.5)

Playing  0  th game
Playing  1  th game
Playing  2  th game
Playing  3  th game
Playing  4  th game
Playing  5  th game
Playing  6  th game
Playing  7  th game
Playing  8  th game
Playing  9  th game
Playing  10  th game
Playing  11  th game
Playing  12  th game
Playing  13  th game
Playing  14  th game
Playing  15  th game
Playing  16  th game
Playing  17  th game
Playing  18  th game
Playing  19  th game
Playing  20  th game
Playing  21  th game
Playing  22  th game
Playing  23  th game
Playing  24  th game
Playing  25  th game
Playing  26  th game
Playing  27  th game
Playing  28  th game
Playing  29  th game
Playing  30  th game
Playing  31  th game
Playing  32  th game
Playing  33  th game
Playing  34  th game
Playing  35  th game
Playing  36  th game
Playing  37  th game
Playing  38  th game
Playing  39  th game
Playing  40  th game
Playing  41  th game
Playing  42  th game
Playing  43  th game
Playing  44  th game
Playing  45  th game
Playing  46  th game
Playing  47  th game
Pl

HangmanAPIError: {'error': 'You have reached 1000 of games', 'status': 'denied'}

In [21]:
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
success_rate = total_recorded_successes/total_recorded_runs
print(total_recorded_runs)
print('overall success rate = %.3f' % success_rate)

1000
overall success rate = 0.610
