In [1]:
import numpy as np
import pandas as pd
from pathlib import Path
from collections import Counter

import tensorflow as tf
from tensorflow.keras.models import save_model, load_model

import string

from pathlib import Path
from collections import Counter


# Steps involved in RNN modeling

1. preprocessing
2. training
3. post-processing

* **Pre-processing**

  * This step involves converting
      * the raw game state and guessed letters into a format that can be fed into the model.
  * For example,
  * Game state can be represented as
      * a sequence of integers, where each integer corresponds to a letter of the alphabet
      * (with a special value for unknown letters).
  * The guessed letters could be
      * represented as a binary vector,
      * where each element indicates whether a particular letter has been guessed.



*  **Training**

 * Training involves
     * feeding the preprocessed data into the model and
     * adjusting the model’s parameters based on its predictions.
 * The goal is to
     * minimize the difference between the model’s predictions and the actual outcomes.
 * This is typically done using a variant of stochastic gradient descent.
 * You would need a large dataset of words to train the model.
 * The loss function could be the
     * cross-entropy loss between the model’s predictions and the actual next letters.

### NN Architecture

 1. state embedding - current state of the game
 2. guessed letters embedding
 3. Model - combine above two embeddings and dense layers

In [2]:
def generate_state_embedding(maxlen = 29):
    """Build the sub-model that turns an encoded board state into a 64-d vector.

    Pipeline: integer input of length `maxlen` -> embedding -> two stacked
    bidirectional LSTMs -> average pooling over the sequence axis -> two
    dense ReLU layers (128 then 64 units).

    Parameters:
      * maxlen: length of the integer sequence the model accepts.

    Returns:
      A `tf.keras.models.Model` named 'StateEmbedding'.
    """
    layers = tf.keras.layers

    state_input = layers.Input(shape=(maxlen,))

    # mask_zero=True: index 0 is reserved for padding (encode_state pads with 0).
    features = layers.Embedding(30, 100, mask_zero=True)(state_input)

    # Two recurrent layers, differing only in their dropout rate.
    for dropout_rate in (0.3, 0.2):
        recurrent = layers.LSTM(100, dropout=dropout_rate, return_sequences=True)
        features = layers.Bidirectional(recurrent)(features)

    features = layers.GlobalAveragePooling1D()(features)

    for units in (128, 64):
        features = layers.Dense(units, activation='relu')(features)

    return tf.keras.models.Model(state_input, features, name='StateEmbedding')


def generate_guessed_embedding(num_letters=26):
    """Build the sub-model that embeds the guessed-letters indicator vector.

    The input is a vector with one entry per letter (see `encode_guess`), which
    is passed through three dense ReLU layers (256 -> 128 -> 64 units).

    Parameters:
      * num_letters (optional): size of the indicator vector; defaults to 26.

    Returns:
      A `tf.keras.models.Model` named 'GuessedEmbedding'.
    """
    # `shape` must be a tuple: `(26)` is just the integer 26, which newer
    # Keras versions reject, so the trailing comma matters here.
    inp = tf.keras.layers.Input(shape=(num_letters,))
    x = tf.keras.layers.Dense(256, activation='relu')(inp)
    x = tf.keras.layers.Dense(128, activation='relu')(x)
    x = tf.keras.layers.Dense(64, activation='relu')(x)
    return tf.keras.models.Model(inp, x, name='GuessedEmbedding')


def generate_action_network(maxlen=29):
    """Assemble and compile the full letter-prediction network.

    The state embedding and the guessed-letters embedding are concatenated,
    passed through two dense ReLU layers (128, 64) and a 26-way softmax.

    Parameters:
      * maxlen: sequence length forwarded to `generate_state_embedding`.

    Returns:
      A compiled `tf.keras.models.Model` named 'action_network' taking
      `[state_input, guessed_input]` and producing 26 probabilities.
    """
    state_branch = generate_state_embedding(maxlen=maxlen)
    guessed_branch = generate_guessed_embedding()

    merged = tf.keras.layers.Concatenate()([state_branch.output, guessed_branch.output])
    for units in (128, 64):
        merged = tf.keras.layers.Dense(units, activation='relu')(merged)
    letter_probs = tf.keras.layers.Dense(26, activation='softmax')(merged)

    action_network = tf.keras.models.Model(
        [state_branch.input, guessed_branch.input],
        letter_probs,
        name='action_network',
    )

    # Nadam with learning rate 1e-3 and gradient-norm clipping at 1.
    optimizer = tf.keras.optimizers.Nadam(1e-3, clipnorm=1)
    action_network.compile(loss='categorical_crossentropy', optimizer=optimizer)
    return action_network

# Build a fresh (untrained) network with the default sequence length and
# print its layer-by-layer summary.
rnn_model = generate_action_network()
rnn_model.summary()


Model: "action_network"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 29)]                 0         []                            
                                                                                                  
 embedding (Embedding)       (None, 29, 100)              3000      ['input_1[0][0]']             
                                                                                                  
 bidirectional (Bidirection  (None, 29, 200)              160800    ['embedding[0][0]']           
 al)                                                                                              
                                                                                                  
 bidirectional_1 (Bidirecti  (None, 29, 200)              240800    ['bidirectional[0

In [4]:
def encode_state(eps_states, inference=False, word_len=29):
    '''
    Convert game states into a zero-padded integer matrix.

    Encoding: 'a' -> 1, 'b' -> 2, ..., 'z' -> 26, '_' (unknown letter) -> 27.
    The value 0 is used only as right-padding up to `word_len`.

    Parameters:
      * eps_states: sequence of game states; each state is a string or a list of single characters.
      * inference (optional): if truthy, `eps_states` is one single state and is wrapped into a batch of one.
      * word_len (optional): number of columns in the output; the default of 29
        matches the default `maxlen` of the state-embedding model.

    Returns:
      numpy float array of shape (number of states, word_len).

    Raises:
      ValueError: if a state is longer than `word_len`.
    '''
    if inference:
        eps_states = [eps_states]

    unknown_code = 27

    # Start from all zeros so that unused trailing positions are padding.
    eps_state_int_np = np.zeros((len(eps_states), word_len))
    for eps_inx, state in enumerate(eps_states):
        if len(state) > word_len:
            raise ValueError(
                f"State of length {len(state)} does not fit into word_len={word_len}"
            )
        eps_state_int_np[eps_inx, :len(state)] = [
            unknown_code if c == '_' else ord(c) - ord('a') + 1 for c in state
        ]

    return eps_state_int_np


The `encode_guess` function is used to convert a list of guessed letters (`eps_guessed_letters`) into a binary numerical representation. Here's a step-by-step explanation:

1. **Input**: The function takes two arguments:
   - `eps_guessed_letters`: A list of strings where each string represents a set of guessed letters.
   - `inference` (optional): A boolean flag that defaults to `False`. If it's `True`, the function treats `eps_guessed_letters` as a single string rather than a list of strings.

2. **Encoding Scheme**: Each character in the string is converted to an integer based on its position in the English alphabet (i.e., 'a' is 0, 'b' is 1, ..., 'z' is 25). The underscore placeholder ('_') is not a valid guess: the output has only 26 columns (indices 0–25), so the guessed-letter strings should contain lowercase letters only.

3. **Array Initialization**: A numpy array `eps_guessed_int` of shape `(len(eps_guessed_letters), num_letters)` is created and filled with zeros, where `num_letters` is the number of letters in the English alphabet (26).

4. **Filling the Array**: The encoded integers are then used to fill the `eps_guessed_int` array. Each row of the array corresponds to one string from `eps_guessed_letters`. For each guessed letter in a string, the corresponding position in the array is set to 1.

5. **Output**: The function returns the numpy array `eps_guessed_int` containing the binary encoded representation of `eps_guessed_letters`.

This function could be used in various applications such as natural language processing or machine learning where textual data needs to be converted into a numerical format that can be processed by algorithms. It's important to note that the specific encoding scheme used here (i.e., 'a' to 0, 'b' to 1, etc.) is just one of many possible ways to encode textual data. The best encoding scheme to use can depend on the specific requirements of the application.

In [5]:
def encode_guess(eps_guessed_letters, inference=False):
    '''
    Convert sets of guessed letters into 26-wide binary indicator rows.

    Encoding scheme: 'abz' : [1, 1, 0, 0, 0, ..... 0, 1]
    At start of game, encoded vector = zeros(26)

    Parameters:
    * eps_guessed_letters: sequence where each element is a string or list of guessed letters.
    * inference (optional): if truthy, `eps_guessed_letters` is one single set of
      guessed letters and is wrapped into a batch of one.

    Returns:
      numpy float array of shape (number of sets, 26); column k is 1 when the
      k-th letter of the alphabet has been guessed.

    Raises:
      ValueError: if a guessed character is neither a lowercase a-z letter nor '_'.
    '''
    if inference:
        eps_guessed_letters = [eps_guessed_letters]

    num_letters = 26
    eps_guessed_int = np.zeros((len(eps_guessed_letters), num_letters))

    for eps_inx, eps_guess_list in enumerate(eps_guessed_letters):
        for c in eps_guess_list:
            # '_' is the unknown-letter placeholder, not a guess; it has no
            # column in the 26-wide output, so it is skipped.
            if c == '_':
                continue
            letter_inx = ord(c) - ord('a')
            # Reject anything outside a-z explicitly: a negative offset would
            # otherwise wrap around and silently flag an unrelated letter.
            if not 0 <= letter_inx < num_letters:
                raise ValueError(f"Guessed letters must be lowercase a-z, got {c!r}")
            eps_guessed_int[eps_inx, letter_inx] = 1

    return eps_guessed_int

### Predict and play hangman game

 1. Generate raw state from hangman
 2. Initialise the guessed-letter list
 3. encode the state and guessed_letters to format suitable for NN
 4. Generate prediction
 5. Decode the prediction
 6. Feed it to hangman and get next state

In [6]:
# Locate the training word list relative to the current working directory.
current_working_dir = Path('.')
words_file = current_working_dir.absolute() / 'words_250000_train.txt'

# Read the whole file and split on whitespace to get one entry per word.
with open(words_file) as file:
    content = file.read()
words = content.split()

# All words glued together; used below to inspect the character inventory.
data_letters = "".join(words)
len(words), words[:5], len(data_letters), data_letters[:20],


(227300,
 ['aaa', 'aaaaaa', 'aaas', 'aachen', 'aaee'],
 2124746,
 'aaaaaaaaaaaasaachena')

In [7]:
# Tally every character of the concatenated word list; the cell displays the
# number of distinct characters found.
count_letters = Counter(data_letters)
len(count_letters)

26

In [8]:
# Index -> letter lookup table ('a' at 0 ... 'z' at 25) used when decoding
# model outputs and building training targets.
ascii_letters = [letter for letter in string.ascii_lowercase]
ascii_letters

['a',
 'b',
 'c',
 'd',
 'e',
 'f',
 'g',
 'h',
 'i',
 'j',
 'k',
 'l',
 'm',
 'n',
 'o',
 'p',
 'q',
 'r',
 's',
 't',
 'u',
 'v',
 'w',
 'x',
 'y',
 'z']

In [10]:
def model_prediction(rnn_model, current_state, guessed_letters, policy='greedy'):
    '''
    Pick the next letter to guess from the model's output.

    Parameters:
    * rnn_model: callable model; called with `[encoded_state, encoded_guesses]`
      and expected to return a tensor with a `.numpy()` method holding one
      score per letter of the alphabet.
    * current_state: the current state of the game (string or list of characters, '_' for unknown).
    * guessed_letters: list of letters already guessed.
    * policy (optional):
      * 'greedy' (default) - the not-yet-guessed letter with the highest score.
      * 'stochastic' - a not-yet-guessed letter sampled in proportion to its score.

    Returns:
      The chosen letter as a one-character string.

    Raises:
      ValueError: if every letter has already been guessed, or `policy` is unknown.
    '''

    current_state = "".join(current_state)

    # encode
    eps_state_int = encode_state(current_state, inference=True)
    eps_guessed_int = encode_guess(guessed_letters, inference=True)

    probs = rnn_model([eps_state_int, eps_guessed_int])
    # float64 keeps the renormalised vector within np.random.choice's tolerance.
    probs = probs.numpy().squeeze().astype(np.float64)

    already_guessed = np.array([letter in guessed_letters for letter in ascii_letters])
    if already_guessed.all():
        # Without this guard the search below would have nothing left to pick.
        raise ValueError("All letters have already been guessed")

    if policy == 'greedy':
        # Walk the letters from highest to lowest score and take the first unused one.
        ranked = probs.argsort()[::-1]
        idx_act = next(inx for inx in ranked if not already_guessed[inx])
    elif policy == 'stochastic':
        # Zero out used letters and renormalise. This is the same distribution
        # that re-drawing until an unused letter appears would give, but it
        # cannot loop forever when the model puts all its mass on used letters.
        masked = np.where(already_guessed, 0.0, probs)
        total = masked.sum()
        if total > 0:
            masked = masked / total
        else:
            # No mass left on unused letters: fall back to a uniform choice among them.
            masked = (~already_guessed) / (~already_guessed).sum()
        idx_act = np.random.choice(masked.shape[0], p=masked)
    else:
        raise ValueError(f"Unknown policy {policy!r}; expected 'greedy' or 'stochastic'")

    # extract the letter for the chosen index
    guess_char = ascii_letters[idx_act]

    return guess_char



In [11]:

import random

def hangman_game(random_word, rnn_model, display=False, max_attempts=None):
    '''
    Play one game of hangman with the model choosing the letters.

    Parameters:
    * random_word: the word to be guessed.
    * rnn_model: model passed through to `model_prediction`.
    * display (optional): if truthy, print the progress of the game.
    * max_attempts (optional): number of wrong guesses allowed; when None
      (default) it equals the length of the word.

    Returns:
      Tuple `(won, eps_states, eps_guessed_letters, random_word)` where
      * won: True if every letter was revealed before running out of attempts.
      * eps_states: list of board states (lists of characters), starting with
        the all-'_' board and followed by the board after each guess.
      * eps_guessed_letters: list of guessed-letter lists, aligned with `eps_states`.
      * random_word: the word that was played.
    '''
    state = ['_' for _ in random_word]
    guessed_letters = []
    attempts = len(random_word) if max_attempts is None else max_attempts

    if display:
        print('TRUE WORD:', random_word)

    # Snapshots are copies, so later in-place edits do not alter the history.
    eps_states = [state.copy()]
    eps_guessed_letters = [guessed_letters.copy()]

    while attempts > 0 and '_' in state:
        if display:
            print('\t \n' + ' '.join(state))
        guess = model_prediction(rnn_model, state, guessed_letters, policy='greedy')

        if display:
            print(f'Current state: {state}, \t Prediction: {guess}, \t Guesssed :{guessed_letters}')

        if (guess in random_word) and (guess not in guessed_letters):
            # Reveal every position holding the guessed letter.
            for i, letter in enumerate(random_word):
                if letter == guess:
                    state[i] = guess
        else:
            attempts -= 1
            if display:
                print(f"\t That letter - '{guess}' doesn't appear in the word; Attempts left: {attempts}")

        guessed_letters.append(guess)
        eps_states.append(state.copy())
        eps_guessed_letters.append(guessed_letters.copy())

        if '_' not in state:
            if display:
                print(f"You guessed the word! \n You survived! \n Word was: {random_word}")
            return True, eps_states, eps_guessed_letters, random_word

    # Reaching this point means the attempts ran out (or there was nothing to play).
    if display:
        print(f"You lost! \nWord was: {random_word}")
    return False, eps_states, eps_guessed_letters, random_word

# Smoke test: play a single game on a fixed word with `rnn_model` and print
# the returned (won, states, guessed letters, word) tuple.
random_word = 'ascaridiasis' # 'zebra'
result = hangman_game(random_word, rnn_model)
print(result)

(False, [['_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_'], ['_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_'], ['_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_', '_'], ['_', '_', 'c', '_', '_', '_', '_', '_', '_', '_', '_', '_'], ['_', '_', 'c', '_', '_', '_', '_', '_', '_', '_', '_', '_'], ['_', '_', 'c', '_', '_', '_', '_', '_', '_', '_', '_', '_'], ['_', '_', 'c', '_', '_', '_', '_', '_', '_', '_', '_', '_'], ['_', '_', 'c', '_', '_', 'i', '_', 'i', '_', '_', 'i', '_'], ['_', '_', 'c', '_', '_', 'i', '_', 'i', '_', '_', 'i', '_'], ['_', '_', 'c', '_', 'r', 'i', '_', 'i', '_', '_', 'i', '_'], ['_', '_', 'c', '_', 'r', 'i', '_', 'i', '_', '_', 'i', '_'], ['_', '_', 'c', '_', 'r', 'i', '_', 'i', '_', '_', 'i', '_'], ['_', '_', 'c', '_', 'r', 'i', '_', 'i', '_', '_', 'i', '_'], ['_', '_', 'c', '_', 'r', 'i', 'd', 'i', '_', '_', 'i', '_'], ['_', 's', 'c', '_', 'r', 'i', 'd', 'i', '_', 's', 'i', 's'], ['_', 's', 'c', '_', 'r', 'i', 'd', 'i', '_', 's', 'i', 's'],

In [12]:

def prepare_training_data(eps_states, eps_guessed_letters, answer):
    '''
    Turn one played episode into supervised training arrays.

    For every step the target is a probability vector spread uniformly over
    the letters that occur in `answer` and have not been guessed yet.

    Parameters:
    * eps_states: the states of the game at each step.
    * eps_guessed_letters: the letters that have been guessed at each step.
    * answer: the correct word that is being guessed.

    Returns:
      Tuple `(eps_state_int, eps_guessed_int, target)` of numpy arrays with one
      row per retained step. A step with no remaining correct letter (the
      final state of a won game) is dropped, since it has no valid target.

    Raises:
      ValueError: if more than one step has no remaining correct letter.
    '''

    # encode in proper format
    eps_state_int = encode_state(eps_states)
    eps_guessed_int = encode_guess(eps_guessed_letters)

    # Target - SL
    # correct action = letter that exists in the word and is still unused
    correct_vector = np.array([1.0 if l in answer else 0.0 for l in ascii_letters])
    target = correct_vector * (1 - eps_guessed_int)
    norm_ = target.sum(axis=1)

    # Rows whose target is all zeros cannot be normalised.
    solved = np.isclose(norm_, 0.0)
    if solved.sum() > 1:
        raise ValueError(
            "More than one step has no remaining correct letter: "
            f"norm={norm_}, "
            f"eps_guessed_letters={eps_guessed_letters[-1]}, "
            f"eps_states={eps_states[-1]}, "
            f"answer={answer}"
        )
    if solved.any():
        # Drop exactly the step that has nothing left to predict.
        keep = ~solved
        eps_state_int = eps_state_int[keep, :]
        eps_guessed_int = eps_guessed_int[keep, :]
        target = target[keep, :]
        norm_ = norm_[keep]

    # Normalise each row into a probability distribution.
    target /= norm_.reshape(-1, 1)

    # Target - RL
    # if action_letter part of answer = 1 else 0
    # target = np.array(answer)

    return eps_state_int, eps_guessed_int, target


In [13]:

import logging
import time
from sklearn.model_selection import train_test_split

# Send INFO-level logs to 'rnn_embedding.log' in the current working
# directory; filemode='w' overwrites the file instead of appending to it.
logging.basicConfig(filename= current_working_dir.absolute() / 'rnn_embedding.log', level=logging.INFO, filemode='w')
logging.info(f" \n \nRNN w/ embedding")


In [15]:

# --- Training state and hyper-parameters ---
buffer_states_data = None      # rolling buffer of encoded states from past episodes
buffer_guessed_letters = None  # matching buffer of encoded guessed letters
buffer_target = None           # matching buffer of target distributions
results = []
losses = []
callabck_freq = 50     # report / checkpoint every this many episodes
epochs = 2500          # number of episodes (games) to play
batch_size = 300       # rows sampled from the buffer per training step
buffer_size = 2000     # once the buffer exceeds this many rows, the oldest rows are evicted
wins_in_epoch = 0
training_iters = 3     # gradient steps per episode on the same batch
patience = 5           # reporting windows without improvement tolerated before stopping
patience_counter = 0
best_loss = np.inf

rnn_model = generate_action_network()
start_time = time.time()

# `epochs + 1` so that exactly `epochs` episodes are played and the final
# reporting window is not cut one episode short.
for eps in range(1, epochs + 1):

    # word for the game
    random_word = random.choice(words)
    logging.info(f"\n Episode: {eps}, Random word: {random_word}")

    # play the game
    result, eps_states, eps_guessed_letters, answer = hangman_game(random_word, rnn_model)
    # encode the episode data
    eps_state_int, eps_guessed_int, target = prepare_training_data(eps_states, eps_guessed_letters, answer)

    eps_len = eps_state_int.shape[0]
    logging.info(f"eps len: {eps_len}")
    logging.info(eps_guessed_letters[-1])

    # fill the data in buffer
    if eps == 1:
        buffer_states_data, buffer_guessed_letters, buffer_target = eps_state_int, eps_guessed_int, target
        print(eps, buffer_states_data.shape, buffer_guessed_letters.shape, buffer_target.shape)
    else:
        # When the buffer is over capacity, drop as many of the oldest rows as
        # the new episode adds.
        if buffer_states_data.shape[0] > buffer_size:
            start_inx = eps_len
        else:
            start_inx = 0

        buffer_states_data = np.vstack([buffer_states_data[start_inx:, :], eps_state_int])
        buffer_guessed_letters = np.vstack([buffer_guessed_letters[start_inx:, :], eps_guessed_int])
        buffer_target = np.vstack([buffer_target[start_inx:, :], target])

        assert np.array_equal(buffer_states_data[-1, :], eps_state_int[-1, :])

    # Sample from the older rows only (the last `eps_len` rows are the current
    # episode, which is appended in full below).
    if buffer_states_data.shape[0] > (batch_size + eps_len + 1):
        idx = np.random.choice(buffer_states_data.shape[0] - eps_len, batch_size, replace=False)
    else:
        idx = np.arange(buffer_states_data.shape[0] - eps_len)

    # sample data from buffer for training and testing
    tt_states_data = np.vstack([buffer_states_data[idx, :], eps_state_int])
    tt_guessed_letters = np.vstack([buffer_guessed_letters[idx, :], eps_guessed_int])
    tt_target = np.vstack([buffer_target[idx, :], target])

    inx_range = np.arange(tt_states_data.shape[0])
    train_inx, test_inx = train_test_split(inx_range, test_size=0.2, random_state=42)

    # train and test samples
    train_states_data, test_states_data = tt_states_data[train_inx, :], tt_states_data[test_inx, :]
    train_guessed_letters, test_guessed_letters = tt_guessed_letters[train_inx, :], tt_guessed_letters[test_inx, :]
    train_target, test_target = tt_target[train_inx, :], tt_target[test_inx, :]

    # train the model
    for _ in range(training_iters):
        loss = rnn_model.train_on_batch([train_states_data, train_guessed_letters], train_target)

    val_loss = rnn_model.evaluate([test_states_data, test_guessed_letters], test_target, verbose=0)

    results.append(result)
    losses.append(val_loss)
    wins_in_epoch += result
    if eps % callabck_freq == 0:

        avg_val_loss = sum(losses)/len(losses)

        # Track the same quantity that is compared (the window average).
        # Storing the window minimum here would make later improvements
        # harder to register and trigger early stopping too soon.
        if avg_val_loss < best_loss:
            best_loss = avg_val_loss
            patience_counter = 0
            save_model(rnn_model, current_working_dir.absolute() / 'rnn_model_current_best.h5')
        else:
            patience_counter += 1
        if patience_counter > patience:
            print("Early stopping")
            break

        # Build the message from adjacent literals so that source indentation
        # does not leak into the printed / logged text.
        summary = (
            f"Loss: {avg_val_loss:.2f}, "
            f"#WINS (in last {callabck_freq} eps): {wins_in_epoch}, "
            f"Time taken: {time.time() - start_time:.2f} secs"
        )
        print(f"Episode: {eps}, {summary}")
        print('\t', eps_guessed_letters[-1])
        logging.info(summary)

        # reset the per-window statistics
        losses = []
        wins_in_epoch = 0
        start_time = time.time()


1 (11, 29) (11, 26) (11, 26)


  save_model(rnn_model, current_working_dir.absolute() / 'rnn_model_current_best.h5')


Episode: 50, Loss: 2.25, #WINS (in last 50 eps): 21,               Time taken: 96.91 secs
	 ['s', 'h', 'r', 'e', 't', 'n', 'p', 'u', 'd', 'c', 'b', 'a', 'o', 'l', 'm', 'i', 'k', 'x', 'w', 'g', 'z', 'f', 'y', 'v']
Episode: 100, Loss: 2.04, #WINS (in last 50 eps): 10,               Time taken: 51.41 secs
	 ['e', 'r', 'i', 's', 'n', 'a', 'l', 't', 'u', 'h', 'm', 'o', 'p', 'g', 'w', 'd', 'c', 'y']
Episode: 150, Loss: 2.00, #WINS (in last 50 eps): 20,               Time taken: 44.20 secs
	 ['r', 's', 'w', 'i', 'o', 'd', 'e']
Episode: 200, Loss: 1.93, #WINS (in last 50 eps): 11,               Time taken: 40.41 secs
	 ['e', 'f', 's', 't', 'w', 'k', 'u', 'i', 'n', 'o']
Episode: 250, Loss: 1.86, #WINS (in last 50 eps): 18,               Time taken: 41.81 secs
	 ['i', 'o', 'e', 't', 'u', 'a', 's', 'b', 'h', 'f', 'c', 'r', 'l', 'm', 'y', 'd', 'p', 'n']
Episode: 300, Loss: 1.84, #WINS (in last 50 eps): 15,               Time taken: 42.88 secs
	 ['e', 'a', 's', 'm', 'i', 'h', 'u', 'r', 't']
Episode

## Saving and testing the model

In [16]:
# Persist the model as it stands after the training loop (HDF5 file in the
# current working directory).
save_model(rnn_model, current_working_dir.absolute() / 'rnn_model.h5')

  save_model(rnn_model, current_working_dir.absolute() / 'rnn_model.h5')


In [17]:
# del rnn_model


In [18]:
# Reload the model from the file written above, to check that it round-trips.
loaded_model = load_model(current_working_dir.absolute() / 'rnn_model.h5')


In [22]:
# Quick evaluation: play 9 games (i = 1..9) on randomly chosen words with the
# reloaded model and print the running win count and win percentage.
count = 0
for i in range(1, 10):
    random_word = random.choice(words)

    result, eps_states, eps_guessed_letters, answer = hangman_game(random_word, loaded_model, display=False)

    # `result` is a bool, so adding it counts the wins.
    count += result
    print(f'Eps: {i}, WIN: {count}, %WIN: {(count / i)*100:.2f}')

Eps: 1, WIN: 1, %WIN: 100.00
Eps: 2, WIN: 1, %WIN: 50.00
Eps: 3, WIN: 1, %WIN: 33.33
Eps: 4, WIN: 1, %WIN: 25.00
Eps: 5, WIN: 2, %WIN: 40.00
Eps: 6, WIN: 2, %WIN: 33.33
Eps: 7, WIN: 3, %WIN: 42.86
Eps: 8, WIN: 4, %WIN: 50.00
Eps: 9, WIN: 5, %WIN: 55.56
