# Imports

In [3]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from dataclasses import dataclass
import numpy as np
from pprint import pprint
import pickle

# Config

In [4]:
@dataclass
class Config:
    MAX_LEN = 45
    BATCH_SIZE = 10000
    LR = 0.001
    VOCAB_SIZE = 29
    EMBED_DIM = 128
    NUM_HEAD = 8  # used in bert model
    FF_DIM = 128  # used in bert model
    NUM_LAYERS = 1


config = Config()

# Load and preprocess Data

In [5]:
data_location = "../data/words_250000_train.txt"
with open(data_location,'r') as f:
    word_list = f.read().splitlines()
word_list[:2]

['aaa', 'aaaaaa']

In [6]:
def preprocess_word_list(word_list):
    sentence_list = []
    new_word_list = list(set(word_list))
    for word in new_word_list:
        if(len(word) < 3):
            continue
        sentence = word.strip().lower()
        sentence = " ".join(list(sentence))
        sentence_list.append(sentence)
    return sentence_list

In [7]:
sentence_list = preprocess_word_list(word_list)

In [8]:
sentence_list[:2]

['a m b i t e n d e n c y', 'u n s h a r p e n i n g']

# Data Preparation for MLM

## Get vectorize layer

In [9]:
vocab_file_location = "vocab.txt"
vectorize_layer = TextVectorization(
    max_tokens=config.VOCAB_SIZE,
    standardize=None,
    output_mode="int",
    output_sequence_length=config.MAX_LEN
)
vectorize_layer.adapt(sentence_list)
vocab = vectorize_layer.get_vocabulary()
vocab = vocab[2 : config.VOCAB_SIZE - 1] + ["_"]
vectorize_layer.set_vocabulary(vocab)

RuntimeError: module compiled against API version 0xf but this version of numpy is 0xd

RuntimeError: module compiled against API version 0xf but this version of numpy is 0xd

RuntimeError: module compiled against API version 0xf but this version of numpy is 0xd

In [8]:
len(vectorize_layer.get_vocabulary())

29

In [9]:
vectorize_layer.get_vocabulary()

['',
 '[UNK]',
 'e',
 'i',
 'a',
 'n',
 'o',
 'r',
 's',
 't',
 'l',
 'c',
 'u',
 'd',
 'p',
 'm',
 'h',
 'g',
 'y',
 'b',
 'f',
 'v',
 'k',
 'w',
 'z',
 'x',
 'q',
 'j',
 '_']

In [10]:
# Get mask token id for masked language model
mask_token_id = vectorize_layer(["_"]).numpy()[0][0]
mask_token_id

28

## Encod Sentences

In [11]:
def encode(texts):
    encoded_texts = vectorize_layer(texts)
    return encoded_texts.numpy()

In [12]:
# Prepare data for masked language model
encoded_sentence_array = encode(sentence_list)

In [13]:
print(encoded_sentence_array.shape)
encoded_sentence_array

(227019, 45)


array([[14,  2, 10, ...,  0,  0,  0],
       [16,  3, 17, ...,  0,  0,  0],
       [14,  4,  7, ...,  0,  0,  0],
       ...,
       [ 7,  2,  9, ...,  0,  0,  0],
       [14,  7,  2, ...,  0,  0,  0],
       [15, 12, 10, ...,  0,  0,  0]])

In [14]:
for c in sentence_list[0].split(" "):
    c_id = vectorize_layer([c]).numpy()[0][0]
    print(f"{c} - {c_id}", end=" | ")

p - 14 | e - 2 | l - 10 | a - 4 | r - 7 | g - 17 | i - 3 | 

In [15]:
print(len(encoded_sentence_array[0]))
encoded_sentence_array[0]

45


array([14,  2, 10,  4,  7, 17,  3,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])

In [16]:
inp_mask = np.random.rand(*encoded_sentence_array.shape) < 0.50
inp_mask

array([[False, False, False, ..., False,  True, False],
       [ True,  True, False, ...,  True,  True, False],
       [False,  True, False, ..., False, False, False],
       ...,
       [False,  True,  True, ..., False, False,  True],
       [ True, False,  True, ..., False, False,  True],
       [False, False,  True, ...,  True,  True,  True]])

## Get masked inputs and labels

In [17]:
def get_masked_input_and_labels(encoded_texts):
    inp_mask = np.random.rand(*encoded_texts.shape) < 0.50
    # Do not mask special tokens
    inp_mask[encoded_texts <= 2] = False
    # Set targets to -1 by default, it means ignore
    labels = -1 * np.ones(encoded_texts.shape, dtype=int)
    # Set labels for masked tokens
    labels[inp_mask] = encoded_texts[inp_mask]

    # Prepare input
    encoded_texts_masked = np.copy(encoded_texts)
    # Set input to [mask] for inp_mask
    encoded_texts_masked[inp_mask] = mask_token_id  # mask token is the last in the dict

    # Prepare sample_weights to pass to .fit() method
    sample_weights = np.ones(labels.shape)
    sample_weights[labels == -1] = 0

    # y_labels would be same as encoded_texts i.e input tokens
    y_labels = np.copy(encoded_texts)

    return encoded_texts_masked, y_labels, sample_weights

In [18]:
x_masked_train, y_masked_labels, sample_weights = get_masked_input_and_labels(encoded_sentence_array)
mlm_ds = tf.data.Dataset.from_tensor_slices((x_masked_train, y_masked_labels, sample_weights))
mlm_ds = mlm_ds.shuffle(250000, reshuffle_each_iteration=True).batch(config.BATCH_SIZE)

# Create BERT model (Pretraining Model) for masked language modeling

In [5]:
def bert_module(query, key, value, i):
    # Multi headed self-attention
    attention_output = layers.MultiHeadAttention(
        num_heads=config.NUM_HEAD,
        key_dim=config.EMBED_DIM // config.NUM_HEAD,
        name="encoder_{}/multiheadattention".format(i),
    )(query, key, value)
    attention_output = layers.Dropout(0.1, name="encoder_{}/att_dropout".format(i))(
        attention_output
    )
    attention_output = layers.LayerNormalization(
        epsilon=1e-6, name="encoder_{}/att_layernormalization".format(i)
    )(query + attention_output)

    # Feed-forward layer
    ffn = keras.Sequential(
        [
            layers.Dense(config.FF_DIM, activation="relu"),
            layers.Dense(config.EMBED_DIM),
        ],
        name="encoder_{}/ffn".format(i),
    )
    ffn_output = ffn(attention_output)
    ffn_output = layers.Dropout(0.1, name="encoder_{}/ffn_dropout".format(i))(
        ffn_output
    )
    sequence_output = layers.LayerNormalization(
        epsilon=1e-6, name="encoder_{}/ffn_layernormalization".format(i)
    )(attention_output + ffn_output)
    return sequence_output


def get_pos_encoding_matrix(max_len, d_emb):
    pos_enc = np.array(
        [
            [pos / np.power(10000, 2 * (j // 2) / d_emb) for j in range(d_emb)]
            if pos != 0
            else np.zeros(d_emb)
            for pos in range(max_len)
        ]
    )
    pos_enc[1:, 0::2] = np.sin(pos_enc[1:, 0::2])  # dim 2i
    pos_enc[1:, 1::2] = np.cos(pos_enc[1:, 1::2])  # dim 2i+1
    return pos_enc


loss_fn = keras.losses.SparseCategoricalCrossentropy(
    reduction=tf.keras.losses.Reduction.NONE
)
loss_tracker = tf.keras.metrics.Mean(name="loss")


class MaskedLanguageModel(tf.keras.Model):
    def train_step(self, inputs):
        if len(inputs) == 3:
            features, labels, sample_weight = inputs
        else:
            features, labels = inputs
            sample_weight = None

        with tf.GradientTape() as tape:
            predictions = self(features, training=True)
            loss = loss_fn(labels, predictions, sample_weight=sample_weight)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value
        return {"loss": loss_tracker.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker]


def create_masked_language_bert_model():
    inputs = layers.Input((config.MAX_LEN,), dtype=tf.int64)

    word_embeddings = layers.Embedding(
        config.VOCAB_SIZE, config.EMBED_DIM, name="word_embedding"
    )(inputs)
    position_embeddings = layers.Embedding(
        input_dim=config.MAX_LEN,
        output_dim=config.EMBED_DIM,
        weights=[get_pos_encoding_matrix(config.MAX_LEN, config.EMBED_DIM)],
        name="position_embedding",
    )(tf.range(start=0, limit=config.MAX_LEN, delta=1))
    embeddings = word_embeddings + position_embeddings

    encoder_output = embeddings
    for i in range(config.NUM_LAYERS):
        encoder_output = bert_module(encoder_output, encoder_output, encoder_output, i)

    mlm_output = layers.Dense(config.VOCAB_SIZE, name="mlm_cls", activation="softmax")(
        encoder_output
    )
    mlm_model = MaskedLanguageModel(inputs, mlm_output, name="masked_bert_model")

    optimizer = keras.optimizers.Adam(learning_rate=config.LR)
    mlm_model.compile(optimizer=optimizer)
    return mlm_model


# id2token = dict(enumerate(vectorize_layer.get_vocabulary()))
# token2id = {y: x for x, y in id2token.items()}

In [20]:
class MaskedTextGenerator(keras.callbacks.Callback):
    def __init__(self, sample_tokens, top_k=5):
        self.sample_tokens = sample_tokens
        self.k = top_k

    def decode(self, tokens):
        return " ".join([id2token[t] for t in tokens if t != 0])

    def convert_ids_to_tokens(self, id):
        return id2token[id]

    def on_epoch_end(self, epoch, logs=None):
        prediction = self.model.predict(self.sample_tokens)

        masked_index = np.where(self.sample_tokens == mask_token_id)
        masked_index = masked_index[1]
        mask_prediction = prediction[0][masked_index]

        top_indices = mask_prediction[0].argsort()[-self.k :][::-1]
        values = mask_prediction[0][top_indices]

        for i in range(len(top_indices)):
            p = top_indices[i]
            v = values[i]
            tokens = np.copy(sample_tokens[0])
            tokens[masked_index[0]] = p
            result = {
                "input_text": self.decode(sample_tokens[0].numpy()),
                "prediction": self.decode(tokens),
                "probability": v,
                "predicted mask token": self.convert_ids_to_tokens(p),
            }
            pprint(result)


sample_tokens = vectorize_layer(["c _ t"])
generator_callback = MaskedTextGenerator(sample_tokens.numpy())

In [21]:
generator_callback

<__main__.MaskedTextGenerator at 0x7ff54b881f10>

In [22]:
bert_masked_model = create_masked_language_bert_model()
bert_masked_model.summary()

Model: "masked_bert_model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 45)]         0                                            
__________________________________________________________________________________________________
word_embedding (Embedding)      (None, 45, 128)      3712        input_1[0][0]                    
__________________________________________________________________________________________________
tf.__operators__.add (TFOpLambd (None, 45, 128)      0           word_embedding[0][0]             
__________________________________________________________________________________________________
encoder_0/multiheadattention (M (None, 45, 128)      66048       tf.__operators__.add[0][0]       
                                                                 tf.__operators__.

In [23]:
bert_masked_model.fit(
    mlm_ds, 
    epochs=10, 
    callbacks=[generator_callback]
)

Epoch 1/10


2023-10-21 22:31:53.735885: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)


{'input_text': 'c _ t',
 'predicted mask token': 'n',
 'prediction': 'c n t',
 'probability': 0.124638356}
{'input_text': 'c _ t',
 'predicted mask token': 'a',
 'prediction': 'c a t',
 'probability': 0.1096268}
{'input_text': 'c _ t',
 'predicted mask token': 'r',
 'prediction': 'c r t',
 'probability': 0.095599815}
{'input_text': 'c _ t',
 'predicted mask token': 'o',
 'prediction': 'c o t',
 'probability': 0.08754144}
{'input_text': 'c _ t',
 'predicted mask token': 'u',
 'prediction': 'c u t',
 'probability': 0.08186258}
Epoch 2/10
{'input_text': 'c _ t',
 'predicted mask token': 'a',
 'prediction': 'c a t',
 'probability': 0.16653284}
{'input_text': 'c _ t',
 'predicted mask token': 'o',
 'prediction': 'c o t',
 'probability': 0.12689862}
{'input_text': 'c _ t',
 'predicted mask token': 'n',
 'prediction': 'c n t',
 'probability': 0.104830734}
{'input_text': 'c _ t',
 'predicted mask token': 'r',
 'prediction': 'c r t',
 'probability': 0.10035587}
{'input_text': 'c _ t',
 'predict

<tensorflow.python.keras.callbacks.History at 0x7ff52fc39850>

# predict function

In [93]:
guessed_letters = []

In [89]:
def predict_letter(model, id2token, token2id, word = "b _ y", special_tokens = ["", "[UNK]", "_"]):
    clean_word = word.strip().lower()
    
    encoded_word = [token2id[c] for c in clean_word.split(" ")]
    len_word = len(encoded_word)
    encoded_word = np.array(encoded_word)
    encoded_word = np.pad(encoded_word, (0, 45-len_word))
    encoded_word = encoded_word.reshape(1, 45)
    
    model_output = model.predict(encoded_word)[0]
    
    blank_index_list = [c_index for c_index, c in enumerate(clean_word.split(" ")) if(c == "_")]
    model_output = model_output[blank_index_list]
    
    model_output = np.max(model_output, axis = 0)
    model_output = np.argsort(model_output)[::-1]
    for id in model_output:
        token = id2token[id]
        if(token not in (guessed_letters + special_tokens)):
            guessed_letters.append(token)
            return token

In [94]:
word = "_ a"
predict_letter(bert_masked_model, id2token, token2id, word)

's'

In [92]:
guessed_letters

['n']

# Save required elements

In [53]:
bert_masked_model.save("bert_mlm", save_format="tf")

2023-10-21 23:27:42.601431: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.


INFO:tensorflow:Assets written to: bert_mlm/assets


INFO:tensorflow:Assets written to: bert_mlm/assets


In [100]:
pickle.dump(id2token, open("id2token.pickle", "wb"))

In [101]:
pickle.dump(token2id, open("token2id.pickle", "wb"))

# Predict

In [9]:
model = tf.keras.models.load_model("bert_mlm.h5", custom_objects = {"MaskedLanguageModel": MaskedLanguageModel})

In [10]:
id2token = pickle.load(open("id2token.pickle", "rb"))
token2id = pickle.load(open("token2id.pickle", "rb"))

In [11]:
def predict_letter(model, id2token, token2id, word = "b _ y", special_tokens = ["", "[UNK]", "_"]):
    clean_word = word.strip().lower()
    
    encoded_word = [token2id[c] for c in clean_word.split(" ")]
    len_word = len(encoded_word)
    encoded_word = np.array(encoded_word)
    encoded_word = np.pad(encoded_word, (0, 45-len_word))
    encoded_word = encoded_word.reshape(1, 45)
    
    model_output = model.predict(encoded_word)[0]
    
    blank_index_list = [c_index for c_index, c in enumerate(clean_word.split(" ")) if(c == "_")]
    model_output = model_output[blank_index_list]
    
    model_output = np.max(model_output, axis = 0)
    model_output = np.argsort(model_output)[::-1]
    for id in model_output:
        token = id2token[id]
        if(token not in (guessed_letters + special_tokens)):
            guessed_letters.append(token)
            return token

In [12]:
guessed_letters = []

In [18]:
word = "a _"
predict_letter(model, id2token, token2id, word)

RuntimeError: module compiled against API version 0xf but this version of numpy is 0xd

RuntimeError: module compiled against API version 0xf but this version of numpy is 0xd

RuntimeError: module compiled against API version 0xf but this version of numpy is 0xd

't'

In [19]:
guessed_letters

['n', 'r', 'l', 'v', 'm', 't']