In [None]:
# default_exp model.core

# Model

> API details. @nathan

In [None]:
# export
import json

import tensorflow as tf

from abc import ABC, abstractmethod
from pathlib import Path
from tokenizers import Tokenizer

In [None]:
# hide
# Setup
import numpy as np
import pandas as pd

from icodegen.data.core import convert_df_to_tfds, java_special_tokens, train_tokenizer

# Tiny two-row fixture with a single "code" column; the test cells below
# train and evaluate on it.
df_fake = pd.DataFrame(
    ["aaaa(bb(aaaa(bb()()ccc)dd)()ccc)dd", "aaaa(bb()ccccc)dd"], columns=["code"]
)

# Train a tokenizer on the fixture, then turn the frame into a dataset.
# NOTE(review): presumably `max_length` is the padded/truncated sequence length
# and `vocab_sz` the target vocabulary size — confirm in icodegen.data.core.
max_length = 16
batch_size = 1
vocab_sz = 100
tokenizer = train_tokenizer(df_fake, java_special_tokens, max_length, vocab_sz=vocab_sz)
dataset = convert_df_to_tfds(df_fake, tokenizer, max_length, batch_size)

In [None]:
# export
def _loss(labels, logits):
    """Sparse categorical cross-entropy between `labels` and raw `logits`.

    `from_logits=True` tells Keras that `logits` have not been passed through
    a softmax. The function name is also the key registered in
    `custom_objects` when a saved model is loaded, so it must stay `_loss`.
    """
    cross_entropy = tf.keras.losses.sparse_categorical_crossentropy
    return cross_entropy(labels, logits, from_logits=True)

In [None]:
# export
class Model(ABC):
    """Abstract base class that pairs a tokenizer with an underlying model.

    Concrete subclasses must implement every abstract method below before they
    can be instantiated.
    """

    def __init__(self, tokenizer, model):
        """Store the tokenizer and the wrapped model on the instance.

        Args:
            tokenizer: tokenizer object used by the subclass.
            model: the underlying model object.
        """
        self.tokenizer = tokenizer
        self.model = model

    # `from_path` takes no `self`, so it is declared static; `staticmethod`
    # must be the outermost decorator for the method to remain abstract.
    @staticmethod
    @abstractmethod
    def from_path(path):
        """Build and return a model from the artifacts stored under `path`."""
        pass

    @abstractmethod
    def get_probs(self, inputs):
        """Return the model's output probabilities for `inputs`."""
        pass

    @abstractmethod
    def save(self, path):
        """Persist the model to `path`."""
        pass

    @abstractmethod
    def tokenize(self, method):
        """Tokenize `method` with `self.tokenizer`."""
        pass

    @abstractmethod
    def train(self, ds, epochs):
        """Train the model on `ds` for `epochs` epochs."""
        pass

In [None]:
# export
class TransformerModel(Model):
    """Wrapper around a transformer model and its tokenizer.

    Only `get_probs` and `tokenize` are implemented. `from_path`, `generate`,
    `save` and `train` are placeholders that do nothing and return None.
    """

    # Declared static because it takes no `self`; without the decorator,
    # calling it on an instance would pass the instance as `path`.
    @staticmethod
    def from_path(path):
        """Placeholder: loading from `path` is not implemented (returns None)."""
        pass

    def generate(self, n):
        """Placeholder: generation is not implemented (returns None)."""
        pass

    def get_probs(self, inputs):
        """Return softmax probabilities for `inputs`.

        Calls `self.model(inputs)`, treats the first element of the result as
        the logits, and applies `tf.nn.softmax` to them.
        """
        # NOTE(review): presumably the model returns a tuple-like output whose
        # first entry holds the logits — confirm against the wrapped model.
        outputs = self.model(inputs)
        logits = outputs[0]
        probs = tf.nn.softmax(logits)

        return probs

    def save(self, path):
        """Placeholder: saving to `path` is not implemented (returns None)."""
        pass

    def tokenize(self, method):
        """Tokenize `method` by calling `self.tokenizer` with `return_tensors="tf"`."""
        return self.tokenizer(method, return_tensors="tf")

    def train(self, ds, epochs):
        """Placeholder: training is not implemented (returns None)."""
        pass

In [None]:
# export
class RNNModel(Model):
    """Recurrent language model (SimpleRNN, GRU or LSTM) with its tokenizer.

    The network is an embedding layer, `n_layers` recurrent layers and a dense
    projection back to the vocabulary. Artifacts (checkpoints, TensorBoard
    logs, saved model) live under `<out_path>/<config_name>`.
    """

    # Maps the `rnn_type` string to the Keras recurrent layer class.
    _RNN_TYPE = {
        "rnn": tf.keras.layers.SimpleRNN,
        "gru": tf.keras.layers.GRU,
        "lstm": tf.keras.layers.LSTM,
    }

    def __init__(
        self,
        rnn_type,
        n_layers,
        vocab_size,
        embedding_dim,
        rnn_units,
        batch_size,
        out_path,
        tokenizer,
    ):
        """Build the Keras model, the output directories and the callbacks.

        Args:
            rnn_type: key of `_RNN_TYPE` ("rnn", "gru" or "lstm").
            n_layers: number of stacked recurrent layers.
            vocab_size: size of the embedding input and of the output layer.
            embedding_dim: dimensionality of the token embeddings.
            rnn_units: number of units in every recurrent layer.
            batch_size: accepted for interface compatibility; not used here.
            out_path: parent directory; artifacts go to `out_path/config_name`.
            tokenizer: tokenizer stored on the instance.

        Raises:
            KeyError: if `rnn_type` is not a key of `_RNN_TYPE`.
        """
        self.rnn_type = rnn_type
        self.n_layers = n_layers
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.rnn_units = rnn_units

        self.config_name = (
            f"{rnn_type}_vocab{vocab_size}_embed{embedding_dim}_units{rnn_units}"
        )
        self.out_path = Path(out_path) / self.config_name
        # parents=True so that a not-yet-existing `out_path` does not raise.
        self.out_path.mkdir(parents=True, exist_ok=True)
        tensorboard_path = self.out_path / "tensorboard_logs"
        tensorboard_path.mkdir(exist_ok=True)
        self.callbacks = [
            tf.keras.callbacks.ModelCheckpoint(
                # Pass a plain string: the "{epoch}" placeholder is formatted by
                # Keras and not every TF release accepts a Path object here.
                filepath=str(self.out_path / "ckpt_{epoch}"),
                save_weights_only=True,
            ),
            tf.keras.callbacks.TensorBoard(
                log_dir=str(tensorboard_path),
                histogram_freq=0,  # How often to log histogram visualizations
                embeddings_freq=0,  # How often to log embedding visualizations
                update_freq="epoch",  # How often to write logs (once per epoch)
            ),
            tf.keras.callbacks.EarlyStopping(
                # Stop training when `val_loss` is no longer improving
                monitor="val_loss",
                # "no longer improving" being defined as "no better than 1e-2 less"
                min_delta=1e-2,
                # "no longer improving" being further defined as "for at least 5 epochs"
                patience=5,
                verbose=1,
            ),
        ]

        layer = RNNModel._RNN_TYPE[rnn_type]
        rnn_layers = [
            layer(
                rnn_units,
                return_sequences=True,
                recurrent_initializer="glorot_uniform",
                # following BigCode != Big Vocab Paper
                dropout=0.5,
            )
            for _ in range(n_layers)
        ]
        model = tf.keras.Sequential(
            [
                tf.keras.layers.Embedding(
                    input_dim=vocab_size,
                    output_dim=embedding_dim,
                    mask_zero=True,  # Zero cannot be used in the vocabulary
                ),
            ]
            + rnn_layers
            + [
                tf.keras.layers.Dense(vocab_size),
            ]
        )

        super().__init__(tokenizer, model)

    @staticmethod
    def from_path(path):
        """Load a model previously written by `save` from the directory `path`.

        Reads `tokenizer.json` and `model_config.json` from `path`, rebuilds the
        wrapper from the stored hyperparameters (with batch size 1) and replaces
        its Keras model with the one loaded from `path`.
        """
        path = Path(path)

        tokenizer = Tokenizer.from_file(str(path / "tokenizer.json"))
        with open(path / "model_config.json", "r") as f:
            model_config = json.load(f)

        # NOTE(review): `path` is passed as `out_path`, so the constructor
        # creates `path/<config_name>` (plus its tensorboard_logs directory)
        # and the loaded wrapper's `out_path` points there — confirm intended.
        model = RNNModel(
            model_config["rnn_type"],
            model_config["n_layers"],
            model_config["vocab_size"],
            model_config["embedding_dim"],
            model_config["rnn_units"],
            1,
            path,
            tokenizer,
        )
        model.model = tf.keras.models.load_model(
            str(path), custom_objects={"_loss": _loss}
        )

        return model

    def get_probs(self, method):
        """Return next-token probabilities for every position of `method`.

        The text is prefixed with "<sos>", encoded, run through the model as a
        batch of one, and the softmax of the logits is returned as a NumPy
        array with the batch dimension removed.
        """
        text_generated = self.tokenizer.encode("<sos>" + method).ids
        input_eval = tf.expand_dims(text_generated, 0)

        logits = self.model(input_eval)
        probs = tf.nn.softmax(logits)[0].numpy()

        return probs

    def generate(self, n, temperature=1.0):
        """Sample `n` tokens after "<sos>" and return the decoded text.

        Args:
            n: number of tokens to sample.
            temperature: divisor applied to the logits before sampling; it
                must be non-zero.

        Returns:
            The decoded sequence, special tokens included.
        """
        # Converting our start string to numbers (vectorizing)
        text_generated = [self.tokenizer.encode("<sos>").ids[0]]
        input_eval = tf.expand_dims(text_generated, 0)

        # Here batch size == 1
        self.model.reset_states()
        for i in range(n):
            predictions = self.model(input_eval)
            # remove the batch dimension
            predictions = tf.squeeze(predictions, 0)

            # using a categorical distribution to predict the token
            # returned by the model; only the last position's sample is kept
            predictions = predictions / temperature
            # Convert the NumPy scalar to a plain int so the id list handed to
            # the tokenizer's decode() holds only Python ints.
            predicted_id = int(
                tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
            )

            text_generated.append(predicted_id)
            # The whole sequence generated so far is fed back as the next input
            input_eval = tf.expand_dims(text_generated, 0)

        return self.tokenizer.decode(text_generated, skip_special_tokens=False)

    def save(self, path=None):
        """Write the tokenizer, the Keras model and the hyperparameters to disk.

        Args:
            path: target directory. Defaults to `self.out_path`, which keeps
                the original no-argument behaviour while also accepting the
                `save(path)` call shape declared by the base class.
        """
        target = self.out_path if path is None else Path(path)
        # The tokenizer is written first, so the directory must already exist.
        target.mkdir(parents=True, exist_ok=True)

        self.tokenizer.save(str(target / "tokenizer.json"), pretty=True)
        self.model.save(str(target))
        model_config = {
            "rnn_type": self.rnn_type,
            "n_layers": self.n_layers,
            "vocab_size": self.vocab_size,
            "embedding_dim": self.embedding_dim,
            "rnn_units": self.rnn_units,
        }
        with open(target / "model_config.json", "w") as f:
            json.dump(model_config, f)

    def tokenize(self, method):
        """Tokenize `method` and return the result as TensorFlow tensors.

        A callable tokenizer is invoked exactly as before. A tokenizer that
        only exposes `encode(...).ids` (the API the rest of this class relies
        on) is not callable, so its ids are returned as a batch-of-one tensor.
        """
        if callable(self.tokenizer):
            return self.tokenizer(method, return_tensors="tf")

        return tf.expand_dims(self.tokenizer.encode(method).ids, 0)

    def train(self, ds_trn, ds_val, epochs):
        """Compile the model and fit it, returning the Keras history.

        Args:
            ds_trn: training dataset passed to `fit`.
            ds_val: validation dataset; the early-stopping callback monitors
                `val_loss`.
            epochs: maximum number of epochs.
        """
        self.model.compile(optimizer="adam", loss=_loss)
        history = self.model.fit(
            ds_trn, epochs=epochs, callbacks=self.callbacks, validation_data=ds_val
        )

        return history

In [None]:
# Expected result of gru.model.count_params() for the configuration below.
# NOTE(review): presumably 257 * vocab_size + 99_072 with vocab_size == 100
# (embedding + single GRU layer + dense) — re-derive if hyperparameters change.
PARAM_COUNT = 124_772

# Hyperparameters of the model under test; later cells reuse these names.
rnn_type = "gru"
n_layers = 1
vocab_size = tokenizer.get_vocab_size()
embedding_dim = 128
rnn_units = 128
batch_size = 1  # passed through, but RNNModel.__init__ does not read it
out_path = "/tmp"
gru = RNNModel(
    rnn_type,
    n_layers,
    vocab_size,
    embedding_dim,
    rnn_units,
    batch_size,
    out_path,
    tokenizer,
)

assert PARAM_COUNT == gru.model.count_params()

In [None]:
EPOCHS = 1
# Rebuild the output directory the same way RNNModel does: out_path joined
# with the config name derived from the hyperparameters.
chkpt_path = Path(out_path) / (
    f"{rnn_type}_vocab{vocab_size}_embed{embedding_dim}_units{rnn_units}"
)
# The same tiny dataset serves as both training and validation data.
history = gru.train(dataset, dataset, EPOCHS)

assert chkpt_path.exists()
# One "*.index" file is expected per epoch.
# NOTE(review): presumably written by the ModelCheckpoint callback; stale files
# left by an earlier run with more epochs would break this count — confirm.
assert EPOCHS == len(list(chkpt_path.glob("*.index")))
assert EPOCHS == len(history.history["loss"])

In [None]:
# Sanity check for get_probs (open question: how to test this more rigorously?)
text = "test"
text_generated = tokenizer.encode("<sos>" + text).ids
input_eval = tf.expand_dims(text_generated, 0)

# Raw logits straight from the Keras model vs. probabilities from the wrapper.
logits = gru.model(input_eval)[0].numpy()
probs = gru.get_probs(text)

# Each position must be a valid distribution that preserves the logits' argmax.
for i, step_probs in enumerate(probs):
    assert np.isclose(1.0, step_probs.sum())
    assert np.argmax(logits[i]) == np.argmax(step_probs)

In [None]:
# TODO: add a test case for the EarlyStopping callback.

In [None]:
# # NOTE: disabled because it is flaky. Encoding sometimes adds space prefixes
# # to some tokens, which changes the token count; with add_prefix_space=False
# # it should be fine.
# NUM_TOKENS = 10
# text = gru.generate(NUM_TOKENS)
# tokenizer.no_padding()
# ids = tokenizer.encode(text).ids
# tokenizer.enable_padding(length=max_length)

# # -1 for the <sos> token that's always prepended
# assert NUM_TOKENS == len(ids) - 1

In [None]:
gru.save()

# Every artifact save() is expected to leave in the output directory.
expected_artifacts = (
    "assets",
    "model_config.json",
    "saved_model.pb",
    "tokenizer.json",
    "variables",
)
for artifact in expected_artifacts:
    assert (gru.out_path / artifact).exists()

In [None]:
# Round trip: a model loaded from disk must match the one that was saved.
loaded_gru = RNNModel.from_path(str(gru.out_path))

assert gru.tokenizer.get_vocab() == loaded_gru.tokenizer.get_vocab()
assert gru.model.count_params() == loaded_gru.model.count_params()
# The losses are floats produced by two separate evaluation runs, so compare
# them with a tolerance instead of exact equality.
np.testing.assert_allclose(
    gru.model.evaluate(dataset, verbose=2),
    loaded_gru.model.evaluate(dataset, verbose=2),
    rtol=1e-5,
)

In [None]:
# hide
from nbdev.export import notebook2script

# nbdev export step; the cell output lists each notebook it converted.
notebook2script()

Converted 00_data.core.ipynb.
Converted 01_data.transforms.ipynb.
Converted 02_model.core.ipynb.
Converted 04_evaluation.core.ipynb.
Converted index.ipynb.
