# Handwriting Recognition

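This notebook trains a model for handwriting recognition on word images from the IAM handwriting word database. The network combines convolutional layers, bidirectional LSTM layers and a CTC loss. It is based on a corresponding model published on Kaggle and has been adapted and tuned for this specific example.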

In [None]:
# import packages
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import cv2
import numpy as np
import string
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
import tensorflow.keras.backend as K

from tensorflow import keras
from tensorflow.keras import layers

from tensorflow.keras.preprocessing.sequence import pad_sequences

from tensorflow.keras.layers import Dense, Reshape, BatchNormalization, Input, Conv2D, MaxPool2D, Lambda, Bidirectional
from tensorflow.compat.v1.keras.layers import CuDNNLSTM
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import *
from tensorflow.keras.utils import to_categorical, Sequence
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tqdm import tqdm
from collections import Counter
from PIL import Image
from itertools import groupby

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import TensorBoard
from nltk.translate.bleu_score import corpus_bleu

In [None]:
# Read the word index file and derive, for every usable entry, the
# transcription and the path of the matching word image.
file_path = "../input/iam-handwriting-word-database/words_new.txt"
with open(file_path, encoding="utf-8") as f:
    lines = f.readlines()

# NOTE(review): assumes the first 18 lines of the index file are header
# lines without label data -- confirm against the file.
label_raw = lines[18:]

image_texts = []
image_paths = []
default_path = "../input/iam-handwriting-word-database/iam_words/words/"
for label in label_raw:
    # Split each line only once; the fields are whitespace separated.
    fields = label.split()

    # Skip blank/truncated lines and entries whose status flag is not "ok".
    if len(fields) < 2 or fields[1] != "ok":
        continue

    # The word id looks like "<a>-<b>-...": images are stored under
    # "<a>/<a>-<b>/<word id>.png" below default_path.
    word_id = fields[0]
    id_parts = word_id.split("-")
    form_dir = id_parts[0]
    line_dir = form_dir + "-" + id_parts[1]

    # The transcription is the last field of the line.
    image_texts.append(fields[-1])
    image_paths.append(f"{default_path}{form_dir}/{line_dir}/{word_id}.png")

In [None]:
# define variables
# NOTE(review): both statements rebind a name to the object it already
# refers to, so this cell does not change any state.
image_texts=image_texts
image_paths=image_paths

In [None]:
# get paths of corrupt images
corrupt_images = []

for path in image_paths:
    try:
        # cv2.imread reports an unreadable file by returning None
        img = cv2.imread(path)
        if img is None:
            corrupt_images.append(path)
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    except cv2.error:
        # Only OpenCV failures mark an image as corrupt; anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate.
        corrupt_images.append(path)

In [None]:
# inspect the unreadable image paths together with their count
(corrupt_images, len(corrupt_images))

In [None]:
# delete corrupt images together with their transcriptions
# A set lookup keeps the filtering linear, and because nothing is looked up
# by index the cell can be re-run without raising ValueError.
_corrupt_paths = set(corrupt_images)
_kept_pairs = [
    (path, text)
    for path, text in zip(image_paths, image_texts)
    if path not in _corrupt_paths
]

# Slice assignment updates the existing list objects in place.
image_paths[:] = [path for path, _ in _kept_pairs]
image_texts[:] = [text for _, text in _kept_pairs]

In [None]:
# collect every distinct character that occurs in the transcriptions
vocab_full = set()
for text in image_texts:
    vocab_full.update(str(text))

print(sorted(vocab_full))
len(vocab_full)

In [None]:
# fold upper-case letters into lower case to make the vocabulary smaller
# dict.fromkeys keeps only the first occurrence of each lower-cased character
vocab = list(dict.fromkeys(letter.lower() for letter in vocab_full))

print(len(vocab))
vocab

In [None]:
# length of the longest transcription
max_label_len = max(map(len, map(str, image_texts)))
max_label_len

In [None]:
# sorting vocabulary
char_list = sorted(vocab)


# define label encoding
def encode_to_labels(txt):
    """Encode a word as a fixed-length sequence of character indices.

    Every character is lower-cased and replaced by its position in
    ``char_list``. Characters that are not part of ``char_list`` are printed
    and skipped. The result is padded at the end with ``len(char_list)`` up
    to ``max_label_len`` entries.

    Args:
        txt: The word to encode.

    Returns:
        The first (and only) row produced by ``pad_sequences``.
    """
    dig_lst = []

    for char in txt:
        try:
            dig_lst.append(char_list.index(char.lower()))
        except ValueError:
            # list.index raises ValueError for unknown characters; keep the
            # best-effort behaviour of reporting and skipping them.
            print(char)

    return pad_sequences([dig_lst], maxlen=max_label_len, padding='post', value=len(char_list))[0]

In [None]:
# encode and pad every transcription
padded_image_texts = [encode_to_labels(text) for text in image_texts]
padded_image_texts[0]

In [None]:
# split dataset into train (80 %), validation (10 %) and test (10 %) data
# The validation split is consumed by the validation dataset further below;
# without it that cell fails with a NameError.
_n_samples = len(image_paths)
_train_end = int(_n_samples * 0.80)
_val_end = int(_n_samples * 0.90)

train_image_paths = image_paths[:_train_end]
train_image_texts = padded_image_texts[:_train_end]

val_image_paths = image_paths[_train_end:_val_end]
val_image_texts = padded_image_texts[_train_end:_val_end]

test_image_paths = image_paths[_val_end:]
test_image_texts = padded_image_texts[_val_end:]

In [None]:
# define image preprocessing
def process_single_sample(img_path, label):
    """Load one word image and pair it with its label.

    Args:
        img_path: Path of the PNG file to read.
        label: Label belonging to the image; returned unchanged.

    Returns:
        A dict with the keys ``"image"`` (single-channel float32 image
        resized to 32 x 128) and ``"label"``.
    """
    # 1. Read image
    img = tf.io.read_file(img_path)

    # 2. Decode and convert to grayscale
    img = tf.io.decode_png(img, channels=1)

    # No random flips here: mirroring a word horizontally or vertically
    # changes what is written while the label stays the same, and this
    # function is also used for the validation data.

    # 3. Convert to float32 in [0, 1] range
    img = tf.image.convert_image_dtype(img, tf.float32)

    # 4. Resize to the desired size (height 32, width 128)
    img = tf.image.resize(img, [32, 128])

    return {"image": img, "label": label}

In [None]:
# define batch size
batch_size = 20


def _build_dataset(paths, labels, shuffle=False):
    """Build a batched, prefetched dataset of preprocessed samples.

    Args:
        paths: Image paths.
        labels: Encoded labels, aligned with ``paths``.
        shuffle: If True, shuffle the (path, label) pairs before the images
            are loaded.

    Returns:
        A ``tf.data.Dataset`` yielding batches of ``batch_size`` samples as
        produced by ``process_single_sample``.
    """
    dataset = tf.data.Dataset.from_tensor_slices((paths, labels))
    if shuffle:
        # Shuffling the lightweight (path, label) pairs before decoding
        # keeps the shuffle buffer small.
        dataset = dataset.shuffle(buffer_size=max(len(paths), 1))
    return (
        dataset.map(
            process_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE
        )
        .batch(batch_size)
        .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    )


# training dataset
# Keras ignores fit(shuffle=True) for tf.data inputs, so the training samples
# are shuffled inside the pipeline instead.
train_dataset = _build_dataset(train_image_paths, train_image_texts, shuffle=True)

# validation dataset
validation_dataset = _build_dataset(val_image_paths, val_image_texts)



In [None]:
# Mapping characters to integers
char_to_num = layers.experimental.preprocessing.StringLookup(
    vocabulary=char_list, num_oov_indices=0, mask_token=None
)

# Mapping integers back to original characters
num_to_char = layers.experimental.preprocessing.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True
)

train_data_fig, ax = plt.subplots(4, 4, figsize=(15, 10))
train_data_fig.suptitle('Training data', weight='bold', size=18)

# show some images and labels
for batch in train_dataset.take(1):
    images = batch["image"]
    labels = batch["label"]

    # The grid has 16 cells; never index past the end of a smaller batch.
    for i in range(min(16, int(images.shape[0]))):
        img = (images[i] * 255).numpy().astype("uint8")

        # The labels were encoded with char_list.index() and padded with
        # len(char_list), so they are decoded with char_list as well. The
        # index layout of the inverted lookup layer depends on its OOV/mask
        # configuration and need not match that encoding.
        label = "".join(
            char_list[int(char_ind)]
            for char_ind in labels[i].numpy()
            if char_ind != len(char_list)
        )

        ax[i // 4, i % 4].imshow(img[:, :, 0], cmap="gray")
        ax[i // 4, i % 4].set_title(label)
        ax[i // 4, i % 4].axis("off")

plt.show()

In [None]:
# define CTC layer
class CTCLayer(layers.Layer):
    """Layer that adds the CTC loss to the model and passes predictions through.

    Args:
        name: Optional layer name.
        **kwargs: Further standard ``Layer`` arguments (for example
            ``trainable`` or ``dtype``). Accepting them allows the layer to
            be re-created from its config when a saved model is loaded.
    """

    def __init__(self, name=None, **kwargs):

        super().__init__(name=name, **kwargs)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Register the CTC loss for the batch and return ``y_pred`` unchanged.

        Args:
            y_true: Batch of label sequences.
            y_pred: Batch of per-time-step predictions.

        Returns:
            ``y_pred``.
        """
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.

        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # Every sample gets the same lengths: the size of axis 1 of the
        # predictions and of the labels respectively.
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred

In [None]:
# define decoder for predictions
def ctc_decoder(predictions, vocabulary=None):
    """Greedily decode a batch of per-time-step class scores into strings.

    For every sample the highest-scoring class is taken at each time step,
    consecutive repeats are merged, and the blank class (the index equal to
    the vocabulary size) is dropped.

    Args:
        predictions: Array indexed as (sample, time step, class).
        vocabulary: Sequence mapping class index to character. Defaults to
            the module-level ``char_list``.

    Returns:
        A list with one decoded string per sample.
    """
    if vocabulary is None:
        vocabulary = char_list
    blank_index = len(vocabulary)

    pred_indices = np.argmax(predictions, axis=2)

    text_list = []
    for sequence in pred_indices:
        # merge repeats, then remove blanks
        merged = (key for key, _ in groupby(sequence))
        text_list.append(
            "".join(vocabulary[int(p)] for p in merged if p != blank_index)
        )

    return text_list

In [None]:
# figures created by PlotPredictions, in the order they were drawn
figures_list = []


class PlotPredictions(tf.keras.callbacks.Callback):
    """Callback that plots predictions for the first validation batch.

    Args:
        frequency: A plot is drawn at the end of every epoch whose index is
            a multiple of this value.
    """

    def __init__(self, frequency=1):
        super().__init__()
        self.frequency = frequency

        # Fetch the first validation batch once, so that images and labels
        # come from the same pass over the pipeline.
        first_batch = next(iter(validation_dataset.take(1).as_numpy_iterator()))
        self.batch_images = first_batch["image"]
        self.batch_labels = first_batch["label"]

    def plot_predictions(self, epoch):
        """Plot up to 16 stored images with their decoded predictions.

        Args:
            epoch: Epoch index shown in the figure title.
        """
        # Sub-model from the image input to the softmax layer named "dense"
        prediction_model = keras.models.Model(
            self.model.get_layer(name="image").input,
            self.model.get_layer(name="dense").output
        )

        preds = prediction_model.predict(self.batch_images)
        pred_texts = ctc_decoder(preds)

        fig, ax = plt.subplots(4, 4, figsize=(15, 5))
        fig.suptitle('Epoch: '+str(epoch), weight='bold', size=14)

        # The grid has 16 cells; never index past the end of a smaller batch.
        for i in range(min(16, len(pred_texts))):

            img = (self.batch_images[i, :, :, 0] * 255).astype(np.uint8)
            title = f"Prediction: {pred_texts[i]}"
            ax[i // 4, i % 4].imshow(img, cmap="gray")
            ax[i // 4, i % 4].set_title(title)
            ax[i // 4, i % 4].axis("off")

        plt.show()

        figures_list.append(fig)

    def on_epoch_end(self, epoch, logs=None):
        """Plot predictions when ``epoch`` is a multiple of ``frequency``."""
        if epoch % self.frequency == 0:
            self.plot_predictions(epoch)

In [None]:
# train model
epochs = 50

# input with shape of height=32 and width=128
inputs = Input(shape=(32, 128, 1), name="image")

# label input, consumed only by the CTC loss layer
labels = layers.Input(name="label", shape=(None,), dtype="float32")

conv_1 = Conv2D(32, (3,3), activation = "selu", padding='same')(inputs)
pool_1 = MaxPool2D(pool_size=(2, 2))(conv_1)

conv_2 = Conv2D(64, (3,3), activation = "selu", padding='same')(pool_1)
pool_2 = MaxPool2D(pool_size=(2, 2))(conv_2)

conv_3 = Conv2D(128, (3,3), activation = "selu", padding='same')(pool_2)
conv_4 = Conv2D(128, (3,3), activation = "selu", padding='same')(conv_3)

conv_5 = Conv2D(512, (3,3), activation = "selu", padding='same')(conv_4)
conv_6 = Conv2D(512, (3,3), activation = "selu", padding='same')(conv_5)
drop_out=tf.keras.layers.Dropout(0.2)(conv_6)
conv_7 = Conv2D(512, (3,3), activation = "selu", padding='same')(drop_out)
conv_8 = Conv2D(512, (3,3), activation = "selu", padding='same')(conv_7)

# pool only along the height so the width (time axis) is preserved
pool_4 = MaxPool2D(pool_size=(2, 1))(conv_8)

conv_9 = Conv2D(256, (3,3), activation = "selu", padding='same')(pool_4)

# Batch normalization layer
batch_norm_9 = BatchNormalization()(conv_9)
conv_10 = Conv2D(256, (3,3), activation = "selu", padding='same')(batch_norm_9)
batch_norm_10 = BatchNormalization()(conv_10)
pool_6 = MaxPool2D(pool_size=(2, 1))(batch_norm_10)
conv_11 = Conv2D(64, (2,2), activation = "selu")(pool_6)

# drop axis 1 (the height axis) so the LSTMs receive a sequence along the width
squeezed = Lambda(lambda x: K.squeeze(x, 1))(conv_11)

# bidirectional LSTM layers with 128, 512, 512, 512 and 128 units
blstm_1 = Bidirectional(CuDNNLSTM(128, return_sequences=True))(squeezed)
blstm_2 = Bidirectional(CuDNNLSTM(512, return_sequences=True))(blstm_1)
blstm_3 = Bidirectional(CuDNNLSTM(512, return_sequences=True))(blstm_2)
blstm_4 = Bidirectional(CuDNNLSTM(512, return_sequences=True))(blstm_3)
blstm_5 = Bidirectional(CuDNNLSTM(128, return_sequences=True))(blstm_4)
dense2=Dense(128,activation = 'relu')(blstm_5)

# one class per character plus one extra class for the CTC blank
softmax_output = Dense(len(char_list) + 1, activation = 'softmax', name="dense")(dense2)

output = CTCLayer(name="ctc_loss",)(labels, softmax_output)


# `learning_rate` replaces the deprecated `lr` argument
optimizer = Adam(learning_rate=0.0001, beta_1=0.9, beta_2=0.999, clipnorm=1.0)

#model to be used at training time
model = Model(inputs=[inputs, labels], outputs=output)

# The CTC loss is added inside CTCLayer via add_loss() and the datasets
# provide no separate targets, so no loss or metric is passed to compile().
model.compile(optimizer = optimizer)

model.summary()
file_path = "/xxx/xxx/C_LSTM_best_cs.hdf5"

# save best model according to val_loss
checkpoint = ModelCheckpoint(filepath=file_path,
                                monitor='val_loss',
                                verbose=1,
                                save_best_only=True,
                                mode='min')

# The checkpoint callback has to be handed to fit() to take effect.
# `shuffle` is not passed: Keras ignores it for tf.data inputs, so any
# shuffling has to happen in the dataset pipeline itself.
history = model.fit(train_dataset,
                        epochs = epochs,
                        validation_data=validation_dataset,
                        verbose = 1,
                        callbacks=[checkpoint])

In [None]:
# write the model in its state after training to disk
model.save(filepath="/xxx/xxx/C_LSTM_best_c1.hdf5")

In [None]:
# plot training and validation loss per epoch
for _series in ('loss', 'val_loss'):
    plt.plot(history.history[_series])

plt.title('model loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['train', 'valid'], loc='upper right')
plt.show()

In [None]:
# Build the inference model: from the "image" input up to the softmax
# layer named "dense", leaving out the CTC loss layer
prediction_model = keras.models.Model(
    inputs=model.get_layer(name="image").input,
    outputs=model.get_layer(name="dense").output,
)
prediction_model.summary()

In [None]:
# Let's check results on some validation samples
for batch in validation_dataset.take(1):

    batch_images = batch["image"]
    batch_labels = batch["label"]

    preds = prediction_model.predict(batch_images)
    pred_texts = ctc_decoder(preds)

    # The labels were encoded with char_list.index() and padded with
    # len(char_list), so they are decoded with char_list as well. The index
    # layout of the inverted lookup layer depends on its OOV/mask
    # configuration and need not match that encoding.
    orig_texts = []
    for label in batch_labels.numpy():
        orig_texts.append(
            "".join(
                char_list[int(char_ind)]
                for char_ind in label
                if char_ind != len(char_list)
            )
        )

    fig , ax = plt.subplots(4, 4, figsize=(15, 5))

    # The grid has 16 cells; never index past the end of a smaller batch.
    for i in range(min(16, len(pred_texts))):

        img = (batch_images[i, :, :, 0] * 255).numpy().astype(np.uint8)
        title = f"Prediction: {pred_texts[i]} / Original: {orig_texts[i]} "
        ax[i // 4, i % 4].imshow(img, cmap="gray")
        ax[i // 4, i % 4].set_title(title)
        ax[i // 4, i % 4].axis("off")

plt.show()

In [None]:
# predict the whole validation dataset and collect predicted / original texts
pred_texts_val = []
orig_texts_val = []
for batch in validation_dataset:

    batch_images = batch["image"]
    batch_labels = batch["label"]

    preds = prediction_model.predict(batch_images)
    pred_texts = ctc_decoder(preds)
    pred_texts_val.extend(pred_texts)

    # The labels were encoded with char_list.index() and padded with
    # len(char_list), so they are decoded with char_list as well. The index
    # layout of the inverted lookup layer depends on its OOV/mask
    # configuration and need not match that encoding.
    orig_texts_val.extend(
        "".join(
            char_list[int(char_ind)]
            for char_ind in label
            if char_ind != len(char_list)
        )
        for label in batch_labels.numpy()
    )

print(len(pred_texts_val))
print(len(orig_texts_val))

In [None]:
# calculate character-level BLEU score on the validation predictions
# corpus_bleu expects, per sample, a list of reference token lists and one
# hypothesis token list. Each word is therefore split into its characters;
# passing the raw strings would treat every single character of the original
# word as a separate reference.
_bleu_references = [[list(text)] for text in orig_texts_val]
_bleu_hypotheses = [list(text) for text in pred_texts_val]

score_val = corpus_bleu(_bleu_references, _bleu_hypotheses)
print(score_val)

In [None]:
# character-level BLEU with different n-gram weightings
# corpus_bleu expects, per sample, a list of reference token lists and one
# hypothesis token list, so each word is split into its characters first.
_refs_val = [[list(text)] for text in orig_texts_val]
_hyps_val = [list(text) for text in pred_texts_val]

print('BLEU-1: %f' % corpus_bleu(_refs_val, _hyps_val, weights=(1.0, 0, 0, 0)))
print('BLEU-2: %f' % corpus_bleu(_refs_val, _hyps_val, weights=(0.5, 0.5, 0, 0)))
print('BLEU-3: %f' % corpus_bleu(_refs_val, _hyps_val, weights=(0.3, 0.3, 0.3, 0)))
print('BLEU-4: %f' % corpus_bleu(_refs_val, _hyps_val, weights=(0.25, 0.25, 0.25, 0.25)))