### Imports

In [1]:
import os
import tensorflow as tf
import matplotlib.pyplot as plt
import datetime
import load_data
import load_transfer_data
import models
import matplotlib.pyplot as plt
import numpy as np
import pickle

# Print the installed TensorFlow version so it is recorded in the cell output.
print(f"TensorFlow version: {tf.__version__}")

Maximum length:  29
Vocab size:  67
TensorFlow version: 2.10.1


### Config Variables

In [2]:
# NOTE(review): wildcard import. The upper-case names used unqualified in the
# cells below (IAM_DATASET_PATH, IMAGE_WIDTH, IMAGE_HEIGHT, BATCH_SIZE,
# LEARNING_RATE, PATIENCE, EPOCHS, MODEL_DIR_NAME, MODEL_NAME) are not defined
# anywhere in this notebook and presumably come from config -- consider
# importing them explicitly so their origin is visible.
from config import *

### Dataset

In [3]:
# Print sample entries for the dataset located at IAM_DATASET_PATH.
load_data.print_samples(IAM_DATASET_PATH)
# Fetch the train / test / validation splits. Each getter returns a pair that
# is unpacked into (image paths, labels), judging by the variable names.
x_train_img_paths, y_train_labels = load_data.get_train_data()
x_test_img_paths, y_test_labels = load_data.get_test_data()
x_val_img_paths, y_val_labels = load_data.get_validation_data()

Total train samples: 10209
Total validation samples: 567
Total test samples: 568
Maximum length:  93
Vocab size:  79


In [4]:
# NOTE(review): this import sits below the data-loading cell rather than in the
# imports cell. If tokenizer reads load_data state at import time (e.g. the
# vocabulary / maximum length printed above), moving it to the top would change
# behaviour -- confirm before relocating.
import tokenizer

# Build the train / validation / test datasets with the same target size
# (IMAGE_WIDTH, IMAGE_HEIGHT) and batch size.
train_ds = tokenizer.prepare_dataset(
    x_train_img_paths, y_train_labels, (IMAGE_WIDTH, IMAGE_HEIGHT), BATCH_SIZE
)
val_ds = tokenizer.prepare_dataset(
    x_val_img_paths, y_val_labels, (IMAGE_WIDTH, IMAGE_HEIGHT), BATCH_SIZE
)
test_ds = tokenizer.prepare_dataset(
    x_test_img_paths, y_test_labels, (IMAGE_WIDTH, IMAGE_HEIGHT), BATCH_SIZE
)
# Augmented variant of the training split; note that no image size is passed
# to this helper, only the paths, labels and batch size.
aug_train_ds = tokenizer.prepare_augmented_dataset(
    x_train_img_paths, y_train_labels, BATCH_SIZE
)

### Load Model

In [None]:
# Number of entries in the tokenizer's vocabulary; passed to the model builder.
char = len(tokenizer.char_to_num.get_vocabulary())
# LEARNING_RATE is handed to the builder and again to the Adam optimizer below.
model = models.build_model9v4(IMAGE_WIDTH, IMAGE_HEIGHT, char, LEARNING_RATE)
# NOTE(review): compile() is called without a loss argument; presumably the
# model attaches its loss internally -- confirm. If build_model9v4 already
# compiles the model, this call replaces that configuration.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE))
model.summary()


### Learning Rate Scheduler

In [None]:
# Cosine-decay learning-rate schedule with a warm-up phase: starts at
# LEARNING_RATE / 10, targets LEARNING_RATE after 100 warm-up steps, and decays
# over 500 steps.
# NOTE(review): the warmup_target / warmup_steps arguments appear to have been
# added to CosineDecay in a TensorFlow release newer than the 2.10.1 reported
# by the first cell -- TODO confirm this cell runs on the installed version.
# NOTE(review): lr_scheduler is not referenced by any other cell shown in this
# notebook; the optimizers are built with fixed learning rates.
lr_scheduler = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=LEARNING_RATE / 10,
    decay_steps=500,
    # NOTE(review): alpha is documented as a fraction of the decayed-from
    # learning rate, not an absolute rate -- verify LEARNING_RATE / 100 is
    # really the intended value.
    alpha=LEARNING_RATE / 100,
    warmup_target=LEARNING_RATE,
    warmup_steps=100,
    name="cosine_decay",
)


### Train Model

In [10]:
# Keras callbacks shared by model.fit and the custom training loop below.
callbacks = [
    # Writes the model to model_checkpoint.h5, keeping only the best one
    # (no monitor is given, so the callback's default monitored quantity is used).
    tf.keras.callbacks.ModelCheckpoint(
        filepath="model_checkpoint.h5", save_best_only=True
    ),
    # Stops training once val_loss has not improved for PATIENCE epochs and
    # restores the best weights seen.
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=PATIENCE, restore_best_weights=True
    ),
    # Writes TensorBoard event files under the "logs" directory.
    tf.keras.callbacks.TensorBoard(log_dir="logs"),
]


In [None]:
model.fit(train_ds, epochs=EPOCHS, validation_data=val_ds, callbacks=callbacks)

### Training Loop with GradientTape

In [6]:
# Running metrics for the custom GradientTape loop: a mean over the per-batch
# loss values and a sparse categorical accuracy, for training and validation.
train_loss = tf.keras.metrics.Mean("train_loss", dtype=tf.float32)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy("train_accuracy")
# NOTE(review): the validation metrics carry the names "test_loss" and
# "test_accuracy" although they are fed from val_ds in the loop below.
val_loss = tf.keras.metrics.Mean("test_loss", dtype=tf.float32)
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy("test_accuracy")


In [7]:
# Loss used by train_step / validation_step; from_logits=True means the model
# outputs are treated as unnormalised scores.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# NOTE(review): this optimizer is separate from the one given to model.compile
# and uses a hard-coded learning rate of 0.01 instead of LEARNING_RATE --
# confirm this is intentional.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

In [8]:
def train_step(model, optimizer, x_train, y_train):
    """Perform a single optimisation step on one training batch.

    The forward pass and loss are recorded on a gradient tape, the gradients
    with respect to the model's trainable variables are applied through
    ``optimizer``, and the module-level ``train_loss`` / ``train_accuracy``
    metrics are updated with this batch.

    Args:
        model: Callable model exposing ``trainable_variables``.
        optimizer: Optimizer providing ``apply_gradients``.
        x_train: Batch of model inputs.
        y_train: Batch of targets passed to ``loss_object``.

    Returns:
        None. Results are accumulated in the module-level metric objects.
    """
    with tf.GradientTape() as tape:
        # training=True so layers run in training mode during the forward pass.
        outputs = model(x_train, training=True)
        batch_loss = loss_object(y_train, outputs)

    gradients = tape.gradient(batch_loss, model.trainable_variables)
    grads_and_vars = zip(gradients, model.trainable_variables)
    optimizer.apply_gradients(grads_and_vars)

    # Fold this batch into the running epoch metrics.
    train_loss(batch_loss)
    train_accuracy(y_train, outputs)


def validation_step(model, x_val, y_val):
    """Evaluate one validation batch and update the validation metrics.

    No gradients are computed and no weights are changed; the batch loss and
    predictions are folded into the module-level ``val_loss`` and
    ``val_accuracy`` metrics.

    Args:
        model: Callable model to evaluate.
        x_val: Batch of model inputs.
        y_val: Batch of targets passed to ``loss_object``.

    Returns:
        None. Results are accumulated in the module-level metric objects.
    """
    # Request inference mode explicitly, mirroring the explicit training=True
    # in train_step, so layers that behave differently during training do not
    # depend on an implicit default here.
    predictions = model(x_val, training=False)
    loss = loss_object(y_val, predictions)

    val_loss(loss)
    val_accuracy(y_val, predictions)


In [9]:
# Timestamp (YYYYMMDD-HHMMSS) used to give every run its own log directory.
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
# Separate sub-directories for the training and validation scalars.
train_log_dir = f"logs/custom_train_tb/{current_time}/train"
test_log_dir = f"logs/custom_train_tb/{current_time}/validation"
# Summary writers used by the custom training loop. Note that the validation
# writer and its directory variable are named "test_*".
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)


In [None]:
# Custom training loop: one pass over train_ds and val_ds per epoch, with the
# epoch metrics written to TensorBoard and forwarded to the Keras callbacks.
#
# The callbacks are driven through a CallbackList so that each of them is
# attached to the model and receives on_train_begin before its first
# on_epoch_end; ModelCheckpoint and EarlyStopping rely on both.
callback_list = tf.keras.callbacks.CallbackList(callbacks, model=model)
# EarlyStopping signals through this flag; start from a clean state.
model.stop_training = False
callback_list.on_train_begin()

for epoch in range(EPOCHS):
    callback_list.on_epoch_begin(epoch)

    # Metrics accumulate across batches, so clear them at the start of an epoch.
    train_loss.reset_states()
    val_loss.reset_states()
    train_accuracy.reset_states()
    val_accuracy.reset_states()

    for x_train, y_train in train_ds:
        train_step(model, optimizer, x_train, y_train)
    with train_summary_writer.as_default():
        tf.summary.scalar("loss", train_loss.result(), step=epoch)
        tf.summary.scalar("accuracy", train_accuracy.result(), step=epoch)

    for x_val, y_val in val_ds:
        validation_step(model, x_val, y_val)
    with test_summary_writer.as_default():
        tf.summary.scalar("loss", val_loss.result(), step=epoch)
        tf.summary.scalar("accuracy", val_accuracy.result(), step=epoch)

    template = "Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}"
    print(
        template.format(
            epoch + 1,
            train_loss.result(),
            train_accuracy.result() * 100,
            val_loss.result(),
            val_accuracy.result() * 100,
        )
    )

    # Notify the callbacks once per epoch (inside the loop), using the same
    # zero-based epoch index as the summaries above. Values are converted to
    # plain floats so the callbacks can compare them directly.
    callback_list.on_epoch_end(
        epoch,
        {
            "loss": float(train_loss.result()),
            "accuracy": float(train_accuracy.result()),
            "val_loss": float(val_loss.result()),
            "val_accuracy": float(val_accuracy.result()),
        },
    )

    # Honour EarlyStopping's request to stop.
    if model.stop_training:
        break

callback_list.on_train_end()


### Show Predictions

In [None]:
def decode_batch_predictions(pred):
    """Decode a batch of model outputs into a list of text strings.

    Args:
        pred: Batch of model outputs. ``pred.shape[0]`` is used as the batch
            size and ``pred.shape[1]`` as the sequence length given to the CTC
            decoder (presumably shaped (batch, time_steps, classes) -- confirm).

    Returns:
        list[str]: One decoded string per batch element.
    """
    # Every sequence in the batch is decoded over its full length.
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search. For complex tasks, you can use beam search.
    # The decoder is reached through tf.keras because the bare name `keras`
    # is never imported in this notebook.
    results = tf.keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[
        0
    ][0][:, : load_data.max_len]
    # Iterate over the results and get back the text.
    output_text = []
    for res in results:
        # Drop the -1 entries before mapping the indices back to characters.
        res = tf.gather(res, tf.where(tf.math.not_equal(res, -1)))
        res = tf.strings.reduce_join(tokenizer.num_to_char(res)).numpy().decode("utf-8")
        output_text.append(res)
    return output_text


def plot_evaluation(name, dir_path, save_fig):
    """Plot predictions for the first validation batch on a 4x4 grid.

    Takes one batch from ``val_ds``, runs ``prediction_model`` on its
    ``"image"`` entry, decodes the outputs and shows up to 16 images, each
    titled with its predicted text.

    Args:
        name: Prefix of the output file name (``<name>_result.png``).
        dir_path: Directory the figure is written to when ``save_fig`` is set.
        save_fig: When truthy, save the figure before showing it.
    """
    for sample_batch in val_ds.take(1):
        images = sample_batch["image"]
        _, axes = plt.subplots(4, 4, figsize=(32, 8))

        raw_outputs = prediction_model.predict(images)
        decoded = decode_batch_predictions(raw_outputs)

        # The grid has 16 cells; never index past the batch size.
        shown = min(16, BATCH_SIZE)
        for idx in range(shown):
            # Mirror and transpose the stored image, then rescale it to
            # 8-bit values and keep the first channel for display.
            picture = tf.image.flip_left_right(images[idx])
            picture = tf.transpose(picture, perm=[1, 0, 2])
            picture = (picture * 255.0).numpy().clip(0, 255).astype(np.uint8)
            picture = picture[:, :, 0]

            caption = f"Prediction: {decoded[idx]}"
            row, col = divmod(idx, 4)
            panel = axes[row, col]
            panel.imshow(picture, cmap="gray")
            panel.set_title(caption)
            panel.axis("off")
    if save_fig:
        plt.savefig(os.path.join(dir_path, name + "_result.png"))

    plt.show()


### Launch Tensorboard

In [None]:
%tensorboard --logdir logs/custom_train_tb

### Save Model

In [None]:
# Suffix appended to MODEL_NAME to form the weights file name when saving.
weights_keras_string = "_weights.keras"


def create_dir(path_to_dir):
    """Create ``path_to_dir`` (including missing parents) unless it exists.

    Args:
        path_to_dir: Path of the directory to create.

    Returns:
        None. Nothing is done when the path already exists.
    """
    # Guard clause: anything already present at the path is left untouched.
    if os.path.exists(path_to_dir):
        return
    os.makedirs(path_to_dir)


In [None]:
# Persist the trained model and its companions under
# MODEL_DIR_NAME/MODEL_NAME: the full model, the weights, the architecture as
# JSON, and the pickled decoding metadata.

# create_dir is a no-op when the directory already exists.
create_dir(MODEL_DIR_NAME)

model_path = os.path.join(MODEL_DIR_NAME, f"{MODEL_NAME}")
# NOTE(review): the files below are written inside model_path, so model.save
# is presumably expected to create it as a directory -- confirm.
model.save(model_path)

weights_file = os.path.join(model_path, f"{MODEL_NAME}{weights_keras_string}")
model.save_weights(
    weights_file,
    overwrite=True,
    save_format=None,
    options=None,
)

# Architecture only, serialised as JSON.
json_string = model.to_json()
architecture_file = os.path.join(model_path, f"{MODEL_NAME}.json")
with open(architecture_file, "w") as json_handle:
    json_handle.write(json_string)

# Maximum label length and character set from load_data, stored alongside the
# model.
data_to_save = (load_data.max_len, load_data.characters)
chars_file = os.path.join(model_path, "handwriting_chars.pkl")
with open(chars_file, "wb") as pickle_handle:
    pickle.dump(data_to_save, pickle_handle)
