# CROHME 2023

## Imports

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds
import keras
from keras import layers
import matplotlib.pyplot as plt

# Print how many GPU devices TensorFlow reports.
print("Num GPUs Available: ", len(tf.config.list_physical_devices("GPU")))


Import `crohme_dataset`

In [None]:
import datasets.crohme_dataset  # Register `crohme_dataset`

# Load the dataset by its registered name; the result is indexed by split name below.
ds = tfds.load("crohme_dataset")  # `crohme_dataset` registered
# Bind each split to its own notebook-level name.
test: tf.data.Dataset = ds["test"]
train: tf.data.Dataset = ds["train"]
validation: tf.data.Dataset = ds["validation"]

### Extra: Previewing InkML Files

I also created a little utility in C++ and GTK to render out an inkml file from the dataset. It reads the InkML file, and renders out the strokes as well as the LaTeX of what it's supposed to be. It was a fun project!

In [None]:
import os

import subprocess

# Draw one example from a shuffled view of the validation split.
random_data_point = next(iter(validation.shuffle(200_000).take(1)))
# The path is stored as bytes; UTF-8 is a superset of ASCII, so paths with
# non-ASCII characters no longer raise UnicodeDecodeError.
filepath = random_data_point["filepath"].numpy().decode("utf-8")
# Pass the path as its own argv entry instead of interpolating it into a shell
# command line: spaces or shell metacharacters in the path can then neither
# break the command nor inject another one.
try:
    subprocess.run(["inkmlviewer", filepath], check=False)
except FileNotFoundError:
    # The viewer is an optional extra; keep the notebook running without it.
    print("inkmlviewer is not installed or not on PATH; skipping preview")

## Preprocessing

### Text Vectorization

We will use `pylatexenc` to parse the LaTeX into nodes for custom splitting

In [33]:
from pylatexenc.latexwalker import (
    LatexWalker,
    LatexMacroNode,
    LatexEnvironmentNode,
    LatexCharsNode,
    LatexGroupNode,
)

# Sentinel tokens that tokenize_fn places before and after every tokenized string.
START_TOKEN, END_TOKEN = "<START>", "<END>"


# Define the tokenization function using pylatexenc
def latex_tokenizer(latex_string):
    """
    Tokenizes a LaTeX string into tokens using pylatexenc.

    Args:
        latex_string: LaTeX source text. A falsy value (empty string or
            ``None``) yields an empty list.

    Returns:
        A flat list of string tokens in document order:
          * macro            -> ``\\name`` followed by the tokens of its arguments
          * environment      -> ``\\begin{name}``, argument tokens, body tokens,
                                ``\\end{name}``
          * plain characters -> one token per character
          * group            -> opening delimiter, content tokens, closing
                                delimiter
        Other node types contribute no tokens.
    """
    import warnings

    if not latex_string:
        return []
    walker = LatexWalker(latex_string)

    def parsed_arguments(node):
        # `nodeargd` (or its `argnlist`) may be None when nothing was parsed for
        # the node. Treat that as "no arguments" rather than letting an
        # AttributeError wipe out the tokens already collected for its siblings.
        return getattr(node.nodeargd, "argnlist", None) or []

    def parse_node(nodelist):
        if not nodelist:
            return []
        try:
            tokens = []
            for node in nodelist:
                if not node:
                    # Empty slots in a node list (e.g. an argument that was not
                    # supplied) carry no tokens.
                    continue
                elif node.isNodeType(LatexMacroNode):
                    tokens.append(f"\\{node.macroname}")
                    # Parse arguments if they exist
                    tokens += parse_node(parsed_arguments(node))
                elif node.isNodeType(LatexEnvironmentNode):
                    tokens.append(f"\\begin{{{node.environmentname}}}")
                    tokens += parse_node(parsed_arguments(node))
                    tokens += parse_node(node.nodelist)
                    tokens.append(f"\\end{{{node.environmentname}}}")
                elif node.isNodeType(LatexCharsNode):
                    tokens += list(node.chars)
                elif node.isNodeType(LatexGroupNode):
                    tokens.append(node.delimiters[0])
                    tokens += parse_node(node.nodelist)
                    tokens.append(node.delimiters[1])
            return tokens
        except Exception as exc:
            # Stay best-effort (this node list contributes nothing), but make
            # the data loss visible instead of swallowing it silently.
            warnings.warn(
                f"latex_tokenizer: dropped tokens while parsing {latex_string!r}: {exc!r}",
                stacklevel=2,
            )
            return []

    nodelist, _, _ = walker.get_latex_nodes()
    return parse_node(nodelist)


# Wrap the tokenizer for use in TextVectorization
def tokenize_fn(latex_tensor):
    """
    Tokenizes every LaTeX string in `latex_tensor` and wraps each token list in
    START_TOKEN / END_TOKEN.

    Each element is read with `.numpy()` and decoded as UTF-8 before being
    handed to `latex_tokenizer`. Returns a string ragged constant with one row
    per input element.
    """

    def wrapped_tokens(raw):
        text = raw.numpy().decode("utf-8")
        return [START_TOKEN] + latex_tokenizer(text) + [END_TOKEN]

    rows = [wrapped_tokens(raw) for raw in latex_tensor]
    return tf.ragged.constant(rows, dtype=tf.string)


# Create a TensorFlow-compatible wrapper
def tf_tokenizer(latex_string):
    """
    Runs `tokenize_fn` through `tf.py_function`, declaring the result as a
    two-dimensional ragged string tensor.
    """
    ragged_tokens_spec = tf.RaggedTensorSpec([None, None], dtype=tf.string)
    return tf.py_function(func=tokenize_fn, inp=[latex_string], Tout=ragged_tokens_spec)

Create the vectorizer and use a vocabulary file to adapt it

In [None]:
# Create the TextVectorization layer
max_tokens = 10_000  # Adjust depending on your vocabulary size

vectorizer = layers.TextVectorization(
    max_tokens=max_tokens,
    standardize=None,  # Custom tokenizer, so no built-in preprocessing
    split=tf_tokenizer,  # Splitting is delegated to the pylatexenc-based tokenizer
    ragged=True,
)
# Read the vocabulary file line by line.
dataset = tf.data.TextLineDataset("vocabulary.txt")
# Wrap each line in a one-element list before it reaches the vectorizer.
dataset = dataset.map(lambda line: [line])
# Build the vectorizer's vocabulary from those lines.
vectorizer.adapt(dataset)

Test it out to make sure it works properly

In [None]:
def latex_to_token(string):
    """Vectorizes `string` with the notebook-level `vectorizer`."""
    token_ids = vectorizer(string)
    return token_ids


# Lookup table from token id to vocabulary entry, in vocabulary order.
id_to_token = dict(enumerate(vectorizer.get_vocabulary()))


def token_to_latex(tokens):
    """Looks up every id in `tokens` and concatenates the vocabulary entries."""
    pieces = (id_to_token[token_id] for token_id in tokens.numpy())
    return "".join(pieces)


# Sample formulas: plain characters, macros with arguments, sub/superscripts
# and an environment.
latex_array = [
    r"E = mc^2",
    r"\frac{a}{b} + \sqrt{c}",
    r"\sum_{i=1}^n i^2 = \frac{n(n+1)(2n+1)}{6}",
    r"A = \pi r^2",
    r"G=\begin{bmatrix}1&\dots&1&0&\dots&0\\ \ast&\ast&\ast&&G^{\prime}&\\ \end{bmatrix}",
]
latex_data = tf.constant(latex_array)

# Tokenize and vectorize
tokenized_output = latex_to_token(latex_data)
print(tokenized_output)

# Round-trip the first formula back to text to check the id <-> token mapping.
E_mc2 = token_to_latex(tokenized_output[0])
print(E_mc2)

Save the vocabulary just in case. You would still need the custom latex parser function imported to use it in another project

In [None]:
# Write the vocabulary one entry per line, each terminated by a newline.
with open("vectorizer_vocabulary.txt", "w") as f:
    f.writelines(f"{word}\n" for word in vectorizer.get_vocabulary())

To re-create the vectorizer using the vocabulary, run the following code

In [34]:
max_tokens = 10_000
vectorizer = layers.TextVectorization(
    max_tokens=max_tokens,
    standardize=None,  # Custom tokenizer, so no built-in preprocessing
    split=tf_tokenizer,
    ragged=True,
)
with open("vectorizer_vocabulary.txt", "r") as f:
    # Strip only the line terminator. Slicing off the last character
    # unconditionally would truncate the final token whenever the file does not
    # end with a newline (e.g. after a manual edit), and stripping all
    # whitespace would destroy whitespace tokens.
    lines = [line.rstrip("\n") for line in f]
# The file is fully read at this point, so it can be closed before the
# vocabulary is installed.
vectorizer.set_vocabulary(lines)

Test it out to make sure it works properly

In [None]:
def latex_to_token(string):
    """Vectorizes `string` with the notebook-level `vectorizer`."""
    token_ids = vectorizer(string)
    return token_ids


# Lookup table from token id to vocabulary entry, rebuilt from the current vectorizer.
id_to_token = dict(enumerate(vectorizer.get_vocabulary()))


def token_to_latex(tokens):
    """Looks up every id in `tokens` and concatenates the vocabulary entries."""
    pieces = (id_to_token[token_id] for token_id in tokens.numpy())
    return "".join(pieces)


# Sample formulas: plain characters, macros with arguments, sub/superscripts
# and an environment.
latex_array = [
    r"E = mc^2",
    r"\frac{a}{b} + \sqrt{c}",
    r"\sum_{i=1}^n i^2 = \frac{n(n+1)(2n+1)}{6}",
    r"A = \pi r^2",
    r"G=\begin{bmatrix}1&\dots&1&0&\dots&0\\ \ast&\ast&\ast&&G^{\prime}&\\ \end{bmatrix}",
]
latex_data = tf.constant(latex_array)

# Tokenize and vectorize
tokenized_output = latex_to_token(latex_data)
print(tokenized_output)

# Round-trip the first formula back to text to check the id <-> token mapping.
E_mc2 = token_to_latex(tokenized_output[0])
print(E_mc2)

Try an actual data point

In [None]:
# Take the first validation example (no shuffling here).
random_data_point = next(iter(validation.take(1)))
# The vectorizer is called with a one-element list holding the ground-truth bytes.
tokenized_data_point = latex_to_token([random_data_point['ground_truth'].numpy()])
print(tokenized_data_point)
# Map the ids of that single row back to text for a visual comparison.
detokenized_data_point = token_to_latex(tokenized_data_point[0])
print(detokenized_data_point)

### Preprocessing Strokes

Instead of images, this model takes in a stream of strokes, such as writing with a stylus on a tablet. Our dataset gives us a list of strokes, and each stroke is itself a list of coordinates [x, y] of the position of the stylus. Both the number of strokes and the length of each stroke change for every value in our dataset, so we are going to pre-process the stroke data so it will be normalized (scaled to be between 0 and 1), and always fit in a tensor with shape `(64, 64, 2)`. For this, I am using the [Ramer-Douglas-Peucker Algorithm](https://en.wikipedia.org/wiki/Ramer%E2%80%93Douglas%E2%80%93Peucker_algorithm) for polyline decimation.

In [123]:
@tf.function
def preprocess_strokes(strokes: tf.RaggedTensor):
    """
    Normalizes stroke coordinates and thins out every stroke with the
    Ramer-Douglas-Peucker (RDP) algorithm.

    Args:
        strokes: the strokes of one example. Assumed to be a ragged tensor of
            shape [num_strokes, (num_points), 2] holding (x, y) floats --
            TODO confirm against the dataset builder.

    Returns:
        The result of `tf.map_fn` over the normalized strokes: one simplified
        stroke per input stroke, each keeping a subset of its points in their
        original order. Stroke lengths still vary; nothing is padded or
        truncated to a fixed shape here.
    """
    # First, scale values to between 0.0 and 1.0
    min_vals = tf.reduce_min(strokes, axis=(0, 1))
    max_vals = tf.reduce_max(strokes, axis=(0, 1))
    normalized_strokes = tf.map_fn(
        elems=strokes,
        # The small constant avoids a division by zero when an axis has no extent.
        fn=lambda stroke: (stroke - min_vals) / (max_vals - min_vals + 1e-6),
    )

    def point_line_distances(points, start, end):
        """
        Perpendicular distance from every row of `points` ([m, 2]) to the line
        through `start` and `end`.

        When `start` and `end` coincide (a closed stroke) the line is undefined;
        the plain Euclidean distance to `start` is used instead so that loops
        are still subdivided rather than collapsing to their endpoints.
        """
        chord = end - start
        offsets = points - start
        # Magnitude of the 2-D cross product = area of the parallelogram
        # spanned by the chord and the offset.
        cross = tf.abs(chord[0] * offsets[:, 1] - chord[1] * offsets[:, 0])
        chord_length = tf.norm(chord)
        return tf.where(
            chord_length > 0,
            tf.math.divide_no_nan(cross, chord_length),
            tf.norm(offsets, axis=1),
        )

    def douglas_peucker(stroke, epsilon=0.02):
        """
        Non-recursive Douglas-Peucker algorithm implementation.

        Kept points are tracked in a boolean mask over the original point
        indices, so the output order always matches the input order regardless
        of the order in which segments are processed.
        """
        num_points = tf.shape(stroke)[0]
        last_idx = num_points - 1
        point_ids = tf.range(num_points)

        # The first and last point are always kept.
        keep = tf.logical_or(tf.equal(point_ids, 0), tf.equal(point_ids, last_idx))

        # Stack of pending (start_index, end_index) segments, shape [k, 2].
        # Strokes with fewer than two points have no segment to examine, so the
        # stack starts empty for them (slice length 0 instead of 1).
        stack = tf.reshape(tf.stack([0, last_idx]), [1, 2])
        stack = stack[: tf.cast(num_points > 1, tf.int32)]

        def has_pending(stack, _keep):
            return tf.shape(stack)[0] > 0

        def step(stack, keep):
            # Pop the most recently pushed segment.
            start_idx, end_idx = stack[-1, 0], stack[-1, 1]
            rest = stack[:-1]

            # Distances of the strictly interior points to the chord.
            distances = point_line_distances(
                stroke[start_idx + 1 : end_idx], stroke[start_idx], stroke[end_idx]
            )

            def split():
                # +1 because the interior slice skips the start point
                split_idx = start_idx + 1 + tf.argmax(distances, output_type=tf.int32)
                halves = tf.reshape(
                    tf.stack([start_idx, split_idx, split_idx, end_idx]), [2, 2]
                )
                return (
                    tf.concat([rest, halves], axis=0),
                    tf.logical_or(keep, tf.equal(point_ids, split_idx)),
                )

            def keep_endpoints_only():
                # Every interior point is within epsilon of the chord (or there
                # are none): the endpoints, already marked, are enough.
                return rest, keep

            # reduce_any over an empty tensor is False, which covers segments
            # without interior points.
            return tf.cond(
                tf.reduce_any(distances > epsilon), split, keep_endpoints_only
            )

        _, keep = tf.while_loop(
            has_pending,
            step,
            [stack, keep],
            # The stack grows and shrinks between iterations.
            shape_invariants=[tf.TensorShape([None, 2]), tf.TensorShape([None])],
        )

        # Return the simplified stroke
        return tf.boolean_mask(stroke, keep)

    downsampled_strokes = tf.map_fn(elems=normalized_strokes, fn=douglas_peucker)
    return downsampled_strokes

Let's try it out first!

In [None]:
# Draw one example from a shuffled view of the validation split.
random_data_point = next(iter(validation.shuffle(100_000).take(1)))
# print(random_data_point['strokes'])
# Show the preprocessed strokes of that example.
print(preprocess_strokes(random_data_point["strokes"]))

In [None]:
# Draw another shuffled validation example and print the strokes before and
# after preprocessing for comparison.
random_data_point = next(iter(validation.shuffle(200_000).take(1)))
print(random_data_point["strokes"])
print(preprocess_strokes(random_data_point["strokes"]))
# filepath = random_data_point["filepath"].numpy().decode("ascii")
# os.system(f"inkmlviewer {filepath}")

### Preprocessing datasets

Now, we can go through our datasets and preprocess all the data. We will need both our vectorizer and our stroke preprocessor together. Since we are going with an encoder-decoder model, we need input data for the decoder as well, which should be the desired output, but just missing the last token, and with the start token added in front.

In [103]:
def preprocess_data(data, stroke_shape=(64, 64, 2)):
    """
    Turns one dataset example into `((input_strokes, decoder_input), decoder_output)`.

    Args:
        data: mapping with a "strokes" entry (passed to `preprocess_strokes`)
            and a "ground_truth" entry (passed to `vectorizer`).
        stroke_shape: dense shape the simplified strokes are padded or truncated
            to. The default matches the encoder's `Input(shape=(64, 64, 2))`.

    Returns:
        input_strokes: the preprocessed strokes, densified to `stroke_shape`
            when `preprocess_strokes` returns a ragged tensor.
        decoder_input: the token ids without the last token.
        decoder_output: the token ids without the first token, i.e. the decoder
            input shifted by one position.
    """
    input_strokes = preprocess_strokes(data["strokes"])
    if isinstance(input_strokes, tf.RaggedTensor):
        # The encoder consumes a fixed-size dense tensor, so ragged strokes are
        # padded (with zeros) or truncated to `stroke_shape`.
        # NOTE(review): zero is also a valid normalized coordinate -- confirm
        # that zero padding is acceptable for the model.
        input_strokes = input_strokes.to_tensor(shape=list(stroke_shape))

    # The vectorizer is called on a batch of one; row 0 is this example's ids.
    token_ids = vectorizer([data["ground_truth"]])[0]
    decoder_input = token_ids[:-1]
    decoder_output = token_ids[1:]
    return (input_strokes, decoder_input), decoder_output

In [None]:
# Preview five shuffled test examples after preprocessing.
for data in test.shuffle(200_000).take(5).map(preprocess_data):
    # pp_data = preprocess_data(data)
    # pp_data = data
    # Each element is ((input_strokes, decoder_input), decoder_output).
    print("Decoder input:", token_to_latex(data[0][1]))
    print("True value:", token_to_latex(data[1]))

In [None]:
# Preprocess, shuffle, batch (32 examples) and prefetch each split.
# The names test/train/validation are rebound to the batched pipelines, so
# re-running this cell would apply preprocess_data to already-batched data.
# NOTE(review): the decoder sequences are slices of variable-length token rows;
# confirm that `.batch` can stack them, or pad them first (e.g. `padded_batch`
# once every component is a dense tensor).
test = test.map(preprocess_data).shuffle(200_000).batch(32).prefetch(tf.data.AUTOTUNE)
train = train.map(preprocess_data).shuffle(200_000).batch(32).prefetch(tf.data.AUTOTUNE)
validation = (
    validation.map(preprocess_data)
    .shuffle(200_000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

Let's test this out to make sure it worked!

In [28]:
# Print the first element of the batched validation pipeline.
print(next(iter(validation.take(1))))

## Model Architecture

My model is an encoder-decoder architecture, with a CNN for the encoder, a feedforward network to get to the latent space, and an LSTM RNN for the decoder.

### Encoder

In [None]:
input_strokes = layers.Input(shape=(64, 64, 2))

# Four convolution + 2x2 max-pooling stages with a growing number of filters.
# With "same" padding only the pooling changes the spatial size: 64 -> 4.
# NOTE(review): the convolutions are created without an activation -- confirm
# that this is intended.
x = input_strokes
for filters in (64, 128, 256, 512):
    x = layers.Conv2D(filters, kernel_size=(3, 3), padding="same")(x)
    x = layers.MaxPooling2D((2, 2))(x)

# Collapse the feature map into one vector per example. Without this the Dense
# layers below act on the last axis only and the "latent space" stays a 4-D
# tensor, which cannot seed the decoder's (batch, units) LSTM state.
x = layers.Flatten()(x)

# Feed-forward stack down to the latent vector: 1024 units, then 4 x 512.
latent_space = layers.Dense(1024, activation="relu")(x)
for _ in range(4):
    latent_space = layers.Dense(512, activation="relu")(latent_space)

### Decoder

In [None]:
# An LSTM's initial hidden and cell states must each have shape (batch, units),
# so the two state projections and the LSTM share a single size.
decoder_units = 256

# Make sure the latent representation is one vector per example (a no-op when
# it already is) before projecting it to the two initial states.
latent_vector = layers.Flatten()(latent_space)
latent_space_h = layers.Dense(decoder_units, activation="relu")(latent_vector)
latent_space_c = layers.Dense(decoder_units, activation="relu")(latent_vector)

# Token ids of arbitrary length, embedded into 128-dimensional vectors.
decoder_input = layers.Input(shape=(None,))
decoder_embedding = layers.Embedding(input_dim=vectorizer.vocabulary_size(), output_dim=128)(
    decoder_input
)
# return_sequences=True yields one output vector per input token.
decoder_lstm = layers.LSTM(decoder_units, return_sequences=True)
decoder_output = decoder_lstm(
    decoder_embedding, initial_state=[latent_space_h, latent_space_c]
)

### Output

In [None]:
# Softmax over the vocabulary, applied to the decoder output.
output = layers.Dense(vectorizer.vocabulary_size(), activation="softmax")(decoder_output)
# Two inputs (strokes and decoder tokens), one output.
model = keras.Model([input_strokes, decoder_input], output)
# Sparse categorical cross-entropy: the targets are integer token ids.
model.compile(
    optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

## Training the Model

Now we can finally train our model! We have a ton of training and validation data to use. We'll also save the history so we can get a graph of the change in loss over each epoch.

In [None]:
# Train for at most 20 epochs. The callbacks come from the same `keras` package
# that built the model; mixing in `tf.keras` objects can fail when the two
# resolve to different Keras installations.
history = model.fit(
    train,
    validation_data=validation,
    epochs=20,
    verbose=1,
    callbacks=[
        # Stop after 3 epochs without improvement and roll back to the best weights.
        keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True),
        # Keep only the best model seen so far on disk.
        keras.callbacks.ModelCheckpoint("model_checkpoint.h5", save_best_only=True),
    ],
)