In [1]:
import os
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

In [3]:
IMAGE_SIZE = (128, 32)
BATCH_SIZE = 64
EPOCHS = 50
PADDING_TOKEN = 99

In [5]:
DATA_INPUT_PATH = "./words.txt"

images_path = []
labels = []

def preprocess_dataset():
    characters = set()
    max_len = 0
    with open(DATA_INPUT_PATH, 'r') as file:
        lines = file.readlines()

        for line_number, line in enumerate(lines):
            # Skip comments and empty lines
            if line.startswith('#') or line.strip() == '':
                continue

            # Split the line and extract information
            parts = line.strip().split()

            # Continue with the rest of the code
            word_id = parts[0]

            first_folder = word_id.split("-")[0]
            second_folder = first_folder + '-' + word_id.split("-")[1]
            # Construct the image filename
            image_filename = f"{word_id}.png"
            image_path = os.path.join('words', first_folder, second_folder, image_filename)

            # Check if the image file exists
            if os.path.isfile(image_path) and os.path.getsize(image_path):

                images_path.append(image_path)

                # Extract labels
                label = parts[-1].strip()
                for char in label:
                    characters.add(char)

                max_len = max(max_len, len(label))
                labels.append(label)

    characters = sorted(list(characters))
    characters = sorted(list(characters))

    print('characters: ', characters)
    print('max_len: ', max_len)
    # Mapping characters to integers.
    char_to_num = tf.keras.layers.StringLookup(
        vocabulary=list(characters), mask_token=None)

    # Mapping integers back to original characters.
    num_to_char = tf.keras.layers.StringLookup(
        vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True
    )
    return characters, char_to_num, num_to_char, max_len
    
characters, char_to_num, num_to_char, max_len = preprocess_dataset()

characters:  ['!', '"', '#', '&', "'", '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
max_len:  53


In [7]:
def distortion_free_resize(image, img_size):
    w, h = img_size
    image = tf.image.resize(image, size=(h, w), preserve_aspect_ratio=True)

    # Check tha amount of padding needed to be done.
    pad_height = h - tf.shape(image)[0]
    pad_width = w - tf.shape(image)[1]

    # Only necessary if you want to do same amount of padding on both sides.
    if pad_height % 2 != 0:
        height = pad_height // 2
        pad_height_top = height + 1
        pad_height_bottom = height
    else:
        pad_height_top = pad_height_bottom = pad_height // 2

    if pad_width % 2 != 0:
        width = pad_width // 2
        pad_width_left = width + 1
        pad_width_right = width
    else:
        pad_width_left = pad_width_right = pad_width // 2

    image = tf.pad(
        image,
        paddings=[
            [pad_height_top, pad_height_bottom],
            [pad_width_left, pad_width_right],
            [0, 0],
        ],
    )

    image = tf.transpose(image, perm=[1, 0, 2])
    image = tf.image.flip_left_right(image)
    return image

In [9]:
def preprocess_image(image_path, img_size):
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, 1)
    image = distortion_free_resize(image, img_size)
    image = tf.cast(image, tf.float32) / 255.0
    return image

def vectorize_label(label):
    label = char_to_num(tf.strings.unicode_split(
        label, input_encoding="UTF-8"))
    length = tf.shape(label)[0]
    pad_amount = max_len - length
    label = tf.pad(label, paddings=[[0, pad_amount]],
                   constant_values=PADDING_TOKEN)
    return label

In [11]:
def process_images_labels(image_path, label):
    image = preprocess_image(image_path, IMAGE_SIZE)
    label = vectorize_label(label)
    return {"image": image, "label": label}

def prepare_dataset(image_paths, labels):
    AUTOTUNE = tf.data.AUTOTUNE
    print('len(image_paths): ', len(image_paths))
    print('len(labels): ', len(labels))
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels)).map(
        process_images_labels, num_parallel_calls=AUTOTUNE
    )
    return dataset.batch(BATCH_SIZE).cache().prefetch(AUTOTUNE)

In [13]:
def split_dataset():
    # Split the data into training, validation, and test sets using train_test_split
    train_images, test_images, train_labels, test_labels = train_test_split(
        images_path, labels, test_size=0.2, random_state=42
    )

    # Further split the test set into validation and final test sets
    val_images, test_images, val_labels, test_labels = train_test_split(
        test_images, test_labels, test_size=0.5, random_state=42
    )

    train_set = prepare_dataset(train_images, train_labels)
    val_set = prepare_dataset(val_images, val_labels)
    test_set = prepare_dataset(test_images, test_labels)
    
    return train_set, val_set, test_set

train_set, val_set, test_set = split_dataset()


len(image_paths):  92254
len(labels):  92254
len(image_paths):  11532
len(labels):  11532
len(image_paths):  11532
len(labels):  11532


In [15]:
import Levenshtein as lev

In [16]:
validation_images = []
validation_labels = []

for batch in val_set:
    validation_images.append(batch["image"])
    validation_labels.append(batch["label"])

In [19]:
class CTCLayer(tf.keras.layers.Layer):
  def __init__(self, name=None):
    super().__init__(name=name)
    self.loss_fn = tf.keras.backend.ctc_batch_cost

  def call(self,y_true, y_pred):
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_length = tf.cast(tf.shape(y_pred)[1], dtype = "int64")
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    label_length = label_length * tf.ones(shape=(batch_len,1), dtype="int64")
    loss = self.loss_fn(y_true, y_pred, input_length, label_length)
    self.add_loss(loss)

    return y_pred



def build_model():
  # Inputs to the model. These are used when the input and output have different structures.
  input_img = tf.keras.Input(shape=(IMAGE_SIZE[0], IMAGE_SIZE[1],1), name = "image")
  labels = tf.keras.layers.Input(name="label", shape=(None,))

  # first conv block.
  x = tf.keras.layers.Conv2D(32, (3,3), activation="relu", kernel_initializer="he_normal", padding="same", name="Conv1")(input_img)
  x = tf.keras.layers.MaxPooling2D((2,1), name="pool1")(x)

  #Second conv block.
  x = tf.keras.layers.Conv2D(64, (3,3), activation = "relu", kernel_initializer="he_normal", padding="same",name="Conv2")(x)
  x = tf.keras.layers.MaxPooling2D((1,1), name = "pool2")(x)

  # Here we have used two max pool with pool size and strides 2.
  # Hence, downsampled feature maps are 4x smaller.
  # The number of filter in the last layer is 64 .
  # Reshape accordingly before passing the output to the RNN part of the model.

  #If the CNN output is of shape (batch_size, new_width, new_height, num_channels), the reshaping converts it to (batch_size, new_width, new_height * num_channels).
  #This is necessary to convert the output of the CNN into a format suitable for the subsequent Dense layer and RNN.
  #primary purpose of Dropout is to prevent overfitting and generalize during the training of the neural network.
  #new_shape = ((IMAGE_SIZE[0] // 4), (IMAGE_SIZE[1] // 4)* 64)
  #new_shape = (32, 32 * 64)
  new_shape = (IMAGE_SIZE[0], (IMAGE_SIZE[1] // 2) * 64)
  x = tf.keras.layers.Reshape(target_shape = new_shape, name="reshape")(x)
  x = tf.keras.layers.Dense(64, activation= "relu", name="dense1")(x)
  x = tf.keras.layers.Dropout(0.2)(x)

  #RNNs
  x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(128, return_sequences=True, dropout = 0.25))(x)
  x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences = True, dropout = 0.25))(x)

  # +2 is to account for the two special tokens introduced by the CTC loss.
  # The recommendation is from https://git.io/J0eXP.
  x = tf.keras.layers.Dense(len(char_to_num.get_vocabulary()) + 2, activation = "softmax", name="dense2")(x)

  #Add CTC layer for calculating CTC loss at each step.
  output = CTCLayer(name="ctc_loss")(labels, x)

  #Define the model.
  model = tf.keras.models.Model(
      inputs = [input_img, labels], outputs = output, name="handwriting_recognizer")
  #optimizer
  opt = tf.keras.optimizers.Adam()
  #compile the model and return
  model.compile(optimizer = opt)
  return model


#Get model
model = build_model()
model.summary()


Model: "handwriting_recognizer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 image (InputLayer)             [(None, 128, 32, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 Conv1 (Conv2D)                 (None, 128, 32, 32)  320         ['image[0][0]']                  
                                                                                                  
 pool1 (MaxPooling2D)           (None, 64, 32, 32)   0           ['Conv1[0][0]']                  
                                                                                                  
 Conv2 (Conv2D)                 (None, 64, 32, 64)   18496       ['pool1[0][0

In [21]:
def calculate_character_error_rate(labels, predictions):
    sparse_labels = tf.cast(tf.sparse.from_dense(labels), dtype=tf.int64)

    input_len = np.ones(predictions.shape[0]) * predictions.shape[1]
    predictions_decoded = tf.keras.backend.ctc_decode(predictions, input_length=input_len, greedy=True)[0][0]

    predictions_decoded = predictions_decoded.numpy()[:, :max_len]
    sparse_predictions = tf.cast(tf.sparse.from_dense(predictions_decoded), dtype=tf.int64)

    edit_distances = tf.edit_distance(
        sparse_predictions, sparse_labels, normalize=False
    )

    total_characters = tf.reduce_sum(tf.cast(tf.not_equal(labels, 0), dtype=tf.int32))

    return tf.reduce_sum(edit_distances) / tf.cast(total_characters, dtype=tf.float32)


def calculate_word_error_rate(reference, hypothesis):
    return lev.distance(reference.split(), hypothesis.split()) / len(reference.split())


In [23]:
class CharacterErrorRateCallback(tf.keras.callbacks.Callback):
    def __init__(self, pred_model):
        super().__init__()
        self.prediction_model = pred_model
        self.cer_per_epoch = []  # To store CER per 5 epochs
        self.wer_per_epoch = []  # To store WER per 5 epochs

    def on_epoch_end(self, epoch, logs=None):
        # Check if current epoch is a multiple of 5
        character_error_rates = []
        word_error_rates = []

        for i in range(len(validation_images)):
            labels = validation_labels[i]
            predictions = self.prediction_model.predict(validation_images[i])
            cer = calculate_character_error_rate(labels, predictions).numpy()
            character_error_rates.append(cer)

            labels_flat = labels.numpy().flatten() if isinstance(labels, tf.Tensor) else labels.flatten()
            labels_str = ''.join([chr(int(l)) for l in labels_flat if int(l) != 0])
            predictions_flat = np.argmax(predictions, axis=-1).flatten()
            predictions_str = ''.join([chr(int(p)) for p in predictions_flat if int(p) != 0])

            wer = calculate_word_error_rate(labels_str, predictions_str)
            word_error_rates.append(wer)

        # Store CER and WER for this epoch
        self.cer_per_epoch.append(np.mean(character_error_rates))
        self.wer_per_epoch.append(np.mean(word_error_rates))

        print(
            f"Epoch {epoch + 1}: Mean CER = {np.mean(character_error_rates):.4f}, Mean WER = {np.mean(word_error_rates):.4f}"
        )


In [25]:
from tensorflow import keras

In [27]:
gpus = tf.config.experimental.list_physical_devices('GPU')
print("GPUs available: ", gpus)

GPUs available:  [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [None]:
with tf.device('/GPU:0'): 
    epochs = 50
    
    model = build_model()
    input_layer = model.inputs[0]
    print(input_layer)
    output_layer = model.get_layer(name="dense2").output
    print(output_layer)
    prediction_model = keras.models.Model(inputs=input_layer, outputs=output_layer)
    
    print(prediction_model)
    
    # prediction_model = keras.models.Model(
       # model.get_layer(name="image").input, model.get_layer(name="dense2").output
    # )
    
    # edit_distance_callback = EditDistanceCallback(prediction_model)
    CER_callback = CharacterErrorRateCallback(prediction_model)
    # Train the model
    history = model.fit(
        train_set,
        validation_data = val_set,
        epochs = epochs,
        callbacks=[CER_callback]
    )

KerasTensor(type_spec=TensorSpec(shape=(None, 128, 32, 1), dtype=tf.float32, name='image'), name='image', description="created by layer 'image'")
KerasTensor(type_spec=TensorSpec(shape=(None, 128, 81), dtype=tf.float32, name=None), name='dense2/Softmax:0', description="created by layer 'dense2'")
<keras.engine.functional.Functional object at 0x0000018FA3A1B430>
Epoch 1/50
Epoch 1: Mean CER = 0.9990, Mean WER = 1.0000
Epoch 2/50
Epoch 2: Mean CER = 0.9880, Mean WER = 1.0485
Epoch 3/50
Epoch 3: Mean CER = 0.9839, Mean WER = 1.0351
Epoch 4/50
Epoch 4: Mean CER = 0.9782, Mean WER = 1.0932
Epoch 5/50
Epoch 5: Mean CER = 0.9615, Mean WER = 1.0795
Epoch 6/50
Epoch 6: Mean CER = 0.9523, Mean WER = 1.0777
Epoch 7/50
Epoch 7: Mean CER = 0.9469, Mean WER = 1.0917
Epoch 8/50
Epoch 8: Mean CER = 0.9441, Mean WER = 1.1012
Epoch 9/50
Epoch 9: Mean CER = 0.9415, Mean WER = 1.1256
Epoch 10/50
Epoch 10: Mean CER = 0.9407, Mean WER = 1.0824
Epoch 11/50
Epoch 11: Mean CER = 0.9402, Mean WER = 1.1059
Epoch

In [None]:
# Plot Training and Validation Loss
epochs_range = range(1, epochs + 1)

plt.figure(figsize=(14, 8))

# Training and Validation Loss
plt.subplot(2, 2, 1)
plt.plot(epochs_range, history.history['loss'], label='Training Loss')
plt.plot(epochs_range, history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()

# Character Error Rate (CER)
plt.subplot(2, 2, 2)
plt.plot(epochs_range, CER_callback.cer_per_epoch, label='Character Error Rate (CER)', color='orange')
plt.xlabel('Epochs')
plt.ylabel('CER')
plt.title('Character Error Rate per Epoch')
plt.legend()

# Word Error Rate (WER)
plt.subplot(2, 2, 3)
plt.plot(epochs_range, CER_callback.wer_per_epoch, label='Word Error Rate (WER)', color='green')
plt.xlabel('Epochs')
plt.ylabel('WER')
plt.title('Word Error Rate per Epoch')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
prediction_model.save('model_V149.keras')
prediction_model.save('model_V149.h5')