In [None]:
import json
import requests
import random
import string
import secrets
import time
import re
import collections
import string
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

# Data

In [None]:
# Download the Brown corpus through NLTK's downloader; the data-preparation
# cell reads it via nltk.corpus.brown.
import nltk
nltk.download('brown')

In [None]:
from nltk.corpus import brown
import numpy as np

# Seed NumPy's global RNG so the shuffle below is repeatable.
np.random.seed(1)

# Number of word types held out for testing.
TEST_SET_SIZE = 1000

# All tokens of the Brown corpus.
brown_words = brown.words()

# Lowercase the corpus and drop every token that contains non-alphabetic characters.
processed_words = [word.lower() for word in brown_words if word.isalpha()]

# Unique word types. They are sorted before shuffling because the iteration
# order of a set of strings can differ between interpreter runs (string hash
# randomisation), which would make the seeded shuffle irreproducible.
shuffled_words = np.array(sorted(set(processed_words)))

# Shuffle as a NumPy array, then convert back to a plain list of str.
np.random.shuffle(shuffled_words)
unique_words = shuffled_words.tolist()

# test_set stores 1000 word types for testing
test_set = unique_words[:TEST_SET_SIZE]
# training_set stores the remaining word types for training
training_set = unique_words[TEST_SET_SIZE:]

# BERT Model

In [None]:
# Un-reduced loss: the per-position values are weighted by the sample weights
# handed to train_step rather than averaged by the loss object itself.
loss_fn = keras.losses.SparseCategoricalCrossentropy(reduction=keras.losses.Reduction.NONE)
# Running mean of the loss; shared by every MaskedLanguageModel instance.
loss_tracker = keras.metrics.Mean(name="loss")

class MaskedLanguageModel(tf.keras.Model):
  """Keras model with a hand-written training step.

  The step relies on the module-level ``loss_fn`` and ``loss_tracker`` instead
  of a loss or metrics configured through ``compile``.
  """

  def train_step(self, inputs):
    """Run one optimisation step on a single batch.

    Args:
      inputs: ``(features, labels)`` or ``(features, labels, sample_weight)``.

    Returns:
      A dict holding the current value of the loss tracker under ``"loss"``.
    """
    sample_weight = None
    if len(inputs) == 3:
      features, labels, sample_weight = inputs
    else:
      features, labels = inputs

    # Forward pass, recorded so that gradients can be taken afterwards.
    with tf.GradientTape() as tape:
      predictions = self(features, training=True)
      loss = loss_fn(labels, predictions, sample_weight=sample_weight)

    # Backward pass and weight update.
    variables = self.trainable_variables
    grads = tape.gradient(loss, variables)
    self.optimizer.apply_gradients(zip(grads, variables))

    # Fold this batch into the running loss and report it.
    loss_tracker.update_state(loss, sample_weight=sample_weight)
    return {"loss": loss_tracker.result()}

  @property
  def metrics(self):
    """Metrics exposed by the model: only the shared loss tracker."""
    return [loss_tracker]

class BertModel():
  """Character-level masked language model used to guess hidden letters.

  Words are split into characters by a ``TextVectorization`` layer, a small
  transformer encoder is trained to reconstruct masked characters, and
  ``bert_guess`` turns the prediction for the masked positions of a partially
  revealed word into a single letter.
  """

  # Ids 0 (padding) and 1 (out-of-vocabulary) are reserved by the vectorizer;
  # real characters start at this id.
  _FIRST_TOKEN_ID = 2

  def __init__(self, full_dictionary):
    """Store hyper-parameters and fit the character vectorizer.

    Args:
      full_dictionary: iterable of training words.
    """
    self.full_dictionary = full_dictionary
    self.special_tokens = '.'  # character used as the mask token

    self.max_len = 30 # max length of a word
    self.vocab_size = 29 # number of letters plus special tokens
    self.batch_size = 32
    self.lr = 0.001
    self.embed_dim = 29
    self.num_head = 8
    self.ff_dim = 128
    self.num_layers = 1

    # Trained lazily by the first bert_guess() call and reused afterwards.
    self._trained_model = None

    self.texts = self.preprocess_words()
    self.vectorize_layer = self.vectorize_word(self.texts)
    self.mask_token_id = self.get_mask_token_id(self.vectorize_layer)

  # Reset random seeds
  def reset_random_seeds(self):
    """Re-seed Python, NumPy and TensorFlow random number generators."""
    # NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
    # setting it here presumably does not affect the running process - confirm.
    os.environ['PYTHONHASHSEED'] = str(0)
    random.seed(1)
    np.random.seed(2)
    tf.random.set_seed(3)

  # Shuffle all words
  def preprocess_words(self):
    """Return the dictionary words as a shuffled list (global NumPy RNG)."""
    texts = np.array(self.full_dictionary)
    np.random.shuffle(texts)
    texts = texts.tolist()
    return texts

  # Build word vectorization
  def vectorize_word(self, texts):
    """Fit a character-level vectorizer on ``texts`` and add the mask token.

    Args:
      texts: list of words to adapt the vocabulary on.

    Returns:
      The adapted ``TextVectorization`` layer whose vocabulary ends with the
      mask token.
    """
    max_seq = self.max_len

    vectorize_layer = TextVectorization(
        output_mode="int",
        standardize="lower",
        split="character",
        output_sequence_length=max_seq,
    )
    vectorize_layer.adapt(texts)

    # Append the mask token to the learned vocabulary.
    vocab = vectorize_layer.get_vocabulary(include_special_tokens=True)
    vocab = vocab + [self.special_tokens]
    vectorize_layer.set_vocabulary(vocab)
    return vectorize_layer

  # Get special token id
  def get_mask_token_id(self, vectorize_layer):
    """Return the integer id the vectorizer assigns to the mask token."""
    mask_token_id = vectorize_layer([self.special_tokens]).numpy()[0][0]
    return mask_token_id

  # Prepare model's input and output
  def process_data(self):
    """Build masked inputs, labels and sample weights for training.

    15% of the real characters are selected as prediction targets. Of those,
    90% are replaced by the mask token and one ninth of the replaced ones are
    overwritten with a random character; the remainder stay unchanged.

    Returns:
      Tuple ``(encoded_texts_masked, y_labels, sample_weights)`` where
      ``y_labels`` is the unmasked encoding and ``sample_weights`` is 1 at the
      selected positions and 0 everywhere else.
    """
    # Encode all words
    encoded_texts = self.vectorize_layer(self.texts)
    encoded_texts = encoded_texts.numpy()

    self.reset_random_seeds()
    # 15% BERT masking
    inp_mask = np.random.rand(*encoded_texts.shape) < 0.15
    # Do not mask special tokens (padding and OOV). The first real character
    # id must stay eligible, otherwise the most frequent letter would never be
    # a training target.
    inp_mask[encoded_texts < self._FIRST_TOKEN_ID] = False
    # Set labels for masked tokens
    labels = -1 * np.ones(encoded_texts.shape, dtype=int)
    labels[inp_mask] = encoded_texts[inp_mask]

    # Prepare input
    encoded_texts_masked = np.copy(encoded_texts)
    # Leave 10% masked token unchanged
    inp_mask_2mask = inp_mask & (np.random.rand(*encoded_texts.shape) < 0.90)
    encoded_texts_masked[inp_mask_2mask] = self.mask_token_id
    # Set 10% masked token to a random real character (mask token excluded)
    inp_mask_2random = inp_mask_2mask & (np.random.rand(*encoded_texts.shape) < 1 / 9)
    encoded_texts_masked[inp_mask_2random] = np.random.randint(
        self._FIRST_TOKEN_ID, self.mask_token_id, inp_mask_2random.sum()
    )

    # Prepare sample_weights to pass to .fit() method
    sample_weights = np.ones(labels.shape)
    sample_weights[labels == -1] = 0

    # y_labels would be same as encoded_texts i.e input tokens
    y_labels = np.copy(encoded_texts)

    return encoded_texts_masked, y_labels, sample_weights

  # Create BERT (pretraining) model
  def bert_module(self, query, key, value, i):
    """Build encoder block ``i``: self-attention then feed-forward.

    Both sub-layers are followed by dropout, a residual connection and layer
    normalisation.

    Args:
      query, key, value: input tensors of the attention layer.
      i: block index, used only to name the layers.

    Returns:
      The output tensor of the block.
    """
    # Multi headed self-attention
    attention_output = layers.MultiHeadAttention(
        num_heads=self.num_head,
        key_dim=self.embed_dim // self.num_head,
        name="encoder_{}/multiheadattention".format(i),
    )(query, key, value)
    attention_output = layers.Dropout(0.1, name="encoder_{}/att_dropout".format(i))(
        attention_output
    )
    attention_output = layers.LayerNormalization(
        epsilon=1e-6, name="encoder_{}/att_layernormalization".format(i)
    )(query + attention_output)

    # Feed-forward layer
    ffn = keras.Sequential(
        [
            layers.Dense(self.ff_dim, activation="relu"),
            layers.Dense(self.embed_dim),
        ],
        name="encoder_{}/ffn".format(i),
    )
    ffn_output = ffn(attention_output)
    ffn_output = layers.Dropout(0.1, name="encoder_{}/ffn_dropout".format(i))(
        ffn_output
    )
    sequence_output = layers.LayerNormalization(
        epsilon=1e-6, name="encoder_{}/ffn_layernormalization".format(i)
    )(attention_output + ffn_output)
    return sequence_output

  def get_pos_encoding_matrix(self, max_len, d_emb):
    """Return the sinusoidal position-encoding matrix.

    Args:
      max_len: number of positions (rows).
      d_emb: embedding size (columns).

    Returns:
      Array of shape ``(max_len, d_emb)``; row 0 is all zeros, the other rows
      hold sine values in even columns and cosine values in odd columns.
    """
    pos_enc = np.array(
        [
            [pos / np.power(10000, 2 * (j // 2) / d_emb) for j in range(d_emb)]
            if pos != 0
            else np.zeros(d_emb)
            for pos in range(max_len)
        ]
    )
    pos_enc[1:, 0::2] = np.sin(pos_enc[1:, 0::2])  # dim 2i
    pos_enc[1:, 1::2] = np.cos(pos_enc[1:, 1::2])  # dim 2i+1
    return pos_enc

  def create_masked_language_bert_model(self):
    """Assemble and compile the masked language model.

    Returns:
      A compiled ``MaskedLanguageModel`` mapping token ids to a softmax over
      the vocabulary at every position.
    """
    inputs = layers.Input((self.max_len,), dtype=tf.int64)

    word_embeddings = layers.Embedding(
        self.vocab_size, self.embed_dim, name="word_embedding"
    )(inputs)
    # Position embedding initialised with the sinusoidal matrix.
    position_embeddings = layers.Embedding(
        input_dim=self.max_len,
        output_dim=self.embed_dim,
        weights=[self.get_pos_encoding_matrix(self.max_len, self.embed_dim)],
        name="position_embedding",
    )(tf.range(start=0, limit=self.max_len, delta=1))
    embeddings = word_embeddings + position_embeddings

    encoder_output = embeddings
    for i in range(self.num_layers):
      encoder_output = self.bert_module(encoder_output, encoder_output, encoder_output, i)

    mlm_output = layers.Dense(self.vocab_size, name="mlm_cls", activation="softmax")(
        encoder_output
    )
    mlm_model = MaskedLanguageModel(inputs, mlm_output, name="masked_bert_model")

    optimizer = keras.optimizers.Adam(learning_rate=self.lr)
    mlm_model.compile(optimizer=optimizer)
    return mlm_model

  def train(self):
    """Create a fresh model and fit it for one epoch on the masked data.

    Returns:
      The fitted model.
    """
    # Initiate the model
    bert_masked_model = self.create_masked_language_bert_model()

    # Prepare data for masked language model
    x_masked_train, y_masked_labels, sample_weights = self.process_data()
    mlm_ds = tf.data.Dataset.from_tensor_slices(
        (x_masked_train, y_masked_labels, sample_weights)
    )
    mlm_ds = mlm_ds.shuffle(5000).batch(self.batch_size)

    # Fit the model
    bert_masked_model.fit(mlm_ds, epochs=1)
    return bert_masked_model

  def bert_guess(self, clean_word, guessed_letters):
    """Guess the most likely unrevealed letter of ``clean_word``.

    Args:
      clean_word: word pattern in which unknown letters are the mask
        character ``'.'``.
      guessed_letters: letters that must not be proposed again.

    Returns:
      The candidate letter with the largest probability summed over all
      masked positions, or ``'!'`` when no guess can be produced.
    """
    # Sorted so that ties are resolved the same way on every run.
    letters = sorted(set(string.ascii_lowercase) - set(guessed_letters))
    if not letters:
      return '!'

    # Train once and reuse the model; training on every guess is prohibitively
    # slow and would answer each guess with a different network.
    bert_masked_model = getattr(self, '_trained_model', None)
    if bert_masked_model is None:
      bert_masked_model = self.train()
      self._trained_model = bert_masked_model

    try:
      sample_tokens = self.vectorize_layer([clean_word])
      prediction = bert_masked_model.predict(sample_tokens)

      # The single encoded word is row 0; locate the masked positions along
      # its sequence axis only, so that indexing prediction[0] with them
      # selects one probability row per masked position.
      masked_index = np.where(np.asarray(sample_tokens)[0] == self.mask_token_id)[0]
      if masked_index.size == 0:
        return '!'

      vocabulary = self.vectorize_layer.get_vocabulary()
      # Ignore letters the vectorizer has never seen.
      candidates = [letter for letter in letters if letter in vocabulary]
      if not candidates:
        return '!'

      letter_probs = pd.DataFrame(
          prediction[0][masked_index], index=masked_index, columns=vocabulary
      )[candidates]
      guess_letter = letter_probs.sum(axis=0).idxmax()
    except Exception as error:
      # Best effort: report the problem and fall back to the sentinel.
      print('bert_guess failed: ', error)
      guess_letter = '!'

    return guess_letter

# N-gram Model

In [None]:
class NgramModel():
  """Letter guesser based on character n-gram counts of a word list.

  Words are padded with ``'#'`` on both sides, so n-grams also capture word
  beginnings and endings. Unknown letters in a pattern are written as ``'.'``.
  """

  def __init__(self, full_dictionary):
    """Pre-compute all count tables from ``full_dictionary`` (iterable of words)."""
    self.full_dictionary = full_dictionary
    self.n_gram_max = 6
    self.full_dictionary_ngram_count = self.build_dictionary_ngram_count(self.full_dictionary)
    self.full_dictionary_by_length = self.build_dictionary_by_length(self.full_dictionary)
    self.full_dictionary_1gram_freq_by_length = self.build_dictionary_1gram_freq_by_length(self.full_dictionary_by_length)

  def build_dictionary_ngram_count(self, dictionary):
    """Count every character n-gram of length 1..n_gram_max.

    Args:
      dictionary: iterable of words.

    Returns:
      Nested mapping ``counts[n][ngram]`` where ``n`` is the n-gram length
      minus one and ``ngram`` is a tuple of characters. The lone padding
      character is not counted as a 1-gram.
    """
    dict_ngram_count = collections.defaultdict(lambda: collections.defaultdict(float))
    for dict_word in dictionary:
      padded_word = '#' + dict_word + '#'
      for n in range(self.n_gram_max-1, -1, -1):
        for i in range(n, len(padded_word)):
          sliding_word = padded_word[i-n:i+1]
          if sliding_word == '#':
            continue
          dict_ngram_count[n][tuple(sliding_word)] += 1
    return dict_ngram_count

  def build_dictionary_by_length(self, full_dictionary):
    """Group the words by their length.

    Returns:
      Mapping from word length to the list of words of that length.
    """
    full_dictionary_by_length = collections.defaultdict(list)
    for dict_word in full_dictionary:
      full_dictionary_by_length[len(dict_word)].append(dict_word)
    return full_dictionary_by_length

  def build_dictionary_1gram_freq_by_length(self, full_dictionary_by_length):
    """Compute relative letter frequencies separately for each word length.

    Args:
      full_dictionary_by_length: mapping from word length to list of words.

    Returns:
      Nested mapping ``freq[length][letter]``; the values for one length sum
      to 1.
    """
    full_dictionary_1gram_freq = collections.defaultdict(lambda: collections.defaultdict(float))
    for length, words_at_length in full_dictionary_by_length.items():
      letters_count_at_length = collections.Counter("".join(words_at_length))
      length_total_count = sum(letters_count_at_length.values())
      for letter, count in letters_count_at_length.items():
        full_dictionary_1gram_freq[length][letter] = count / length_total_count
    return full_dictionary_1gram_freq

  def ngram_guess(self, clean_word, guessed_letters):
    """Guess the most likely unrevealed letter of ``clean_word``.

    For a fully hidden word the per-length letter frequencies are used.
    Otherwise every window (length 1..n_gram_max) that contains exactly one
    unknown letter votes with the relative n-gram counts of each candidate,
    longer windows carrying a larger weight.

    Args:
      clean_word: word pattern in which unknown letters are ``'.'``.
      guessed_letters: letters that must not be proposed again.

    Returns:
      The best candidate letter, or ``'!'`` when no candidate has any
      support. Ties go to the alphabetically first letter.
    """
    # Weights 1/21 .. 6/21 (for n_gram_max = 6): longer n-grams count more.
    weight_total = sum(range(1, self.n_gram_max+1))
    n_gram_smoothing = [i/weight_total for i in range(1, self.n_gram_max+1)]

    letter_probs = collections.defaultdict(float)
    # Sorted so that ties are resolved the same way on every run.
    letters = sorted(set(string.ascii_lowercase) - set(guessed_letters))

    if clean_word.count('.') == len(clean_word):
      # 1gram frequency by length. .get() is used throughout so that lookups
      # never insert empty entries into the shared default-dict tables.
      freq_at_length = self.full_dictionary_1gram_freq_by_length.get(len(clean_word), {})
      for letter in letters:
        letter_probs[letter] = freq_at_length.get(letter, 0.0)
    else:
      padded_word = '#' + clean_word + '#'
      for n in range(self.n_gram_max-1, -1, -1):
        counts_at_n = self.full_dictionary_ngram_count.get(n)
        if not counts_at_n:
          continue
        for i in range(n, len(padded_word)):
          sliding_word = padded_word[i-n:i+1]
          # Only windows with exactly one unknown letter can vote.
          if sliding_word.count('.') != 1:
            continue
          count_by_letter = {
              letter: counts_at_n.get(tuple(sliding_word.replace('.', letter)), 0)
              for letter in letters
          }
          total_count = sum(count_by_letter.values())
          if total_count > 0:
            for letter in letters:
              letter_probs[letter] += (count_by_letter[letter]/total_count) * n_gram_smoothing[n]

    if sum(letter_probs.values()) == 0:
      guess_letter = '!'
    else:
      guess_letter = max(letter_probs, key=letter_probs.get)
    return guess_letter

# Hangman Game

In [None]:
class Hangman():
  """Plays Hangman against a list of words using the n-gram letter guesser.

  ``guess`` and ``start_game`` are regular methods of the class; they must
  not be indented inside ``__init__`` or they become unreachable locals.
  """

  def __init__(self, full_dictionary):
    """Build the guessing models from ``full_dictionary`` (iterable of words)."""
    self.full_dictionary = full_dictionary
    self.guessed_letters = []
    # (letter, count) pairs over the whole dictionary, most common first.
    self.full_dictionary_common_letter_sorted = collections.Counter("".join(self.full_dictionary)).most_common()
    self.bert_model = BertModel(self.full_dictionary)
    self.ngram_model = NgramModel(self.full_dictionary)

  def guess(self, word):
    """Propose the next letter for a partially revealed word.

    Args:
      word: current pattern; unknown letters are ``'_'`` and spaces are
        ignored.

    Returns:
      The letter proposed by the n-gram model. When that model has no
      suggestion, the most common dictionary letter that has not been guessed
      yet; ``'!'`` if every dictionary letter has already been guessed.
    """
    word = word.lower()
    word = word.replace(" ", "")
    clean_word = word.replace("_",".")

    # apply ngram algo to guess the letter of the highest probability
    guess_letter = self.ngram_model.ngram_guess(clean_word, self.guessed_letters)

    # if word matching so far not found
    if guess_letter == '!':
      sorted_letter_count = self.full_dictionary_common_letter_sorted
      for letter,instance_count in sorted_letter_count:
        if letter not in self.guessed_letters:
          guess_letter = letter
          break

    return guess_letter

  def start_game(self, test_words):
    """Play one game per word, allowing six wrong guesses each.

    Args:
      test_words: sequence of words to play.

    Returns:
      Tuple ``(success, word_fails)``: the number of words solved and the
      list of words that were not solved.
    """
    success = 0
    word_fails = []
    for word in test_words:
      self.guessed_letters = []
      word = word.replace(" ", "")
      mask = ['_'] * len(word)
      masked_word = '_' * len(word)

      # start tries
      tries_remains = 6
      while tries_remains>0:
        print('tries_remains: ', tries_remains)
        # get guessed letter from user code
        guess_letter = self.guess(masked_word)
        print('Guess is ', guess_letter)

        # append guessed letter to guessed letters field in hangman object
        self.guessed_letters.append(guess_letter)
        # the length check rejects '' which is "in" every string
        if guess_letter in word and len(guess_letter) == 1:
          for i, c in enumerate(word):
            if c == guess_letter:
              mask[i] = c
          masked_word = ''.join(mask)
          print('Correct! Word Update: ', mask)
          if '_' not in masked_word:
            success += 1
            print('!succeed!')
            break
        else:
          tries_remains -= 1
      if tries_remains == 0: # fails
        word_fails.append(word)
        print('!fail!')

    return success, word_fails