In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file in it.
for folder, _, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(folder, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Extracting dataset
# Unpack words.tgz from the read-only input directory. tar runs with the
# verbose flag (`v`), so the listing it prints is wiped with clear_output()
# once extraction has finished.
!tar zxvf ../input/hand-written-text-detection-dataset/words.tgz
from IPython.display import clear_output
clear_output()

In [3]:
# Unpack ascii.tgz (presumably the source of the words.txt annotation file
# read below -- confirm against the archive listing).
!tar zxvf ../input/hand-written-text-detection-dataset/ascii.tgz

In [4]:
# Show the first 20 lines of the annotation file to inspect its layout.
!head -20 words.txt

# Import Libraries


In [5]:
import numpy as np
import cv2 
from tensorflow import keras
import os
from tensorflow.keras.layers.experimental.preprocessing import StringLookup
import tensorflow as tf

# Fix the NumPy and TensorFlow global seeds so that random operations
# (e.g. the np.random.shuffle of the annotation lines) repeat between runs.
np.random.seed(42)
tf.random.set_seed(42)

In [6]:
# Collect every usable annotation line from words.txt.
# Lines starting with "#" are comments; the second space-separated field is a
# status flag and lines flagged "err" are discarded.
words_list = []

# The context manager guarantees the file handle is closed after reading.
with open("words.txt", "r") as words_file:
    words = words_file.readlines()

for line in words:
    if line[0] == "#":
        continue
    fields = line.split(" ")
    # Blank or single-token lines have no status field; skip them instead of
    # failing with an IndexError.
    if len(fields) < 2:
        continue
    if fields[1] != "err":
        words_list.append(line)

In [7]:
# check length of words list
# (number of annotation lines kept after dropping comments and "err" entries)
len(words_list)

In [8]:
# shuffling words
# In-place shuffle of the annotation lines; it draws from NumPy's global
# generator, which was seeded with 42 in the imports cell.
np.random.shuffle(words_list)

In [9]:
# Split the dataset into train, validation and test subsets in a 90:5:5 ratio.

# First 90% of the lines are for training; the remainder is held out.
split_index = int(0.9*len(words_list))
train_samples, holdout_samples = words_list[:split_index], words_list[split_index:]

# The held-out part is halved: first half validation, second half test.
val_split_idx = int(0.5*len(holdout_samples))
val_samples, test_samples = holdout_samples[:val_split_idx], holdout_samples[val_split_idx:]


# Size of the train, test and validation subsets.
print('Total Training Samples:' + str(len(train_samples)))
print('Total Test Samples:' + str(len(test_samples)))
print('Total Validation Samples:' + str(len(val_samples)))

# Data Pipeline (Image Paths)

In [10]:
base_path = os.getcwd()


def get_image_path_labels(samples, root=None):
  """Resolve annotation lines to image paths, keeping only usable images.

  Each annotation line starts with an image id of the form
  ``part1-part2-part3...``; the matching image is looked up at
  ``<root>/part1/part1-part2/<image id>.png``.

  Args:
    samples: Iterable of raw annotation lines (a trailing newline is fine).
    root: Directory holding the extracted image folders. Defaults to the
      module-level ``base_path``.

  Returns:
    A ``(paths, corrected_samples)`` tuple of equal-length lists: the paths of
    images that exist and are non-empty, and the corresponding annotation
    lines truncated at their first newline.
  """
  if root is None:
    root = base_path

  paths = []
  corrected_samples = []

  for file_line in samples:
    image_name = file_line.strip().split(" ")[0]
    name_parts = image_name.split("-")
    # A well-formed id has at least two dash-separated parts; skip anything
    # else (e.g. a blank line) rather than failing with an IndexError.
    if len(name_parts) < 2:
      continue

    part_1, part_2 = name_parts[0], name_parts[1]
    img_path = os.path.join(root, part_1, part_1 + "-" + part_2, image_name + ".png")

    # os.path.getsize raises for a missing file, so test for existence first;
    # zero-byte images are skipped as well.
    if os.path.isfile(img_path) and os.path.getsize(img_path):
      paths.append(img_path)
      corrected_samples.append(file_line.split("\n")[0])

  return paths, corrected_samples

# Resolve image paths and surviving annotation lines for each split
# (evaluated in the order train, test, validation).
(train_img_paths, train_labels), (test_img_paths, test_labels), (val_img_paths, val_labels) = map(
    get_image_path_labels, (train_samples, test_samples, val_samples)
)

In [11]:
# Ground-truth labels: the transcription is the last space-separated field of
# each training annotation line.
train_labels_cleaned = [entry.split(" ")[-1].strip() for entry in train_labels]

# Character vocabulary (sorted) and the longest transcription, both derived
# from the training labels only.
characters = sorted(set("".join(train_labels_cleaned)))
max_length = max(map(len, train_labels_cleaned), default=0)


# Maximum Length
print("Maximum Length:", max_length)
print("vocabulary size:",len(characters))

train_labels_cleaned[:10]

In [12]:
# validation and test dataset clean

def clean_labels(labels):
  """Return the transcription of every annotation line.

  The transcription is the last space-separated field of the line with
  surrounding whitespace stripped.
  """
  return [entry.split(" ")[-1].strip() for entry in labels]

# Extract the transcriptions for the validation and test splits.
validation_labels_cleaned, test_labels_cleaned = map(clean_labels, (val_labels, test_labels))

In [13]:
AUTOTUNE = tf.data.AUTOTUNE

# Forward table: character -> integer index, built from the training vocabulary.
char_to_num = StringLookup(mask_token=None, vocabulary=list(characters))

# Inverse table: integer index -> original character, sharing the forward
# table's vocabulary so the two stay aligned.
num_to_char = StringLookup(
    invert=True,
    mask_token=None,
    vocabulary=char_to_num.get_vocabulary(),
)

In [14]:
# image pre processing (resizing)
def img_resize(image,img_size):
  """Fit an image into a fixed canvas without distorting it.

  The image is resized with its aspect ratio preserved so it fits inside
  ``img_size``, padded with ``tf.pad``'s default fill up to exactly that size,
  then has its first two axes swapped and is flipped left-to-right.

  Args:
    image: Image tensor; the padding spec and transpose assume three axes
      (height, width, channels).
    img_size: ``(width, height)`` of the target canvas.

  Returns:
    The padded, transposed and flipped image tensor.
  """
  target_w, target_h = img_size
  resized = tf.image.resize(image, size=(target_h, target_w), preserve_aspect_ratio=True)

  # Amount of padding still needed along each spatial axis.
  resized_shape = tf.shape(resized)
  extra_rows = target_h - resized_shape[0]
  extra_cols = target_w - resized_shape[1]

  # Split the padding evenly; when the amount is odd the extra pixel goes to
  # the top / left side.
  rows_below = extra_rows // 2
  rows_above = extra_rows - rows_below
  cols_right = extra_cols // 2
  cols_left = extra_cols - cols_right

  padded = tf.pad(
      resized,
      paddings=[
          [rows_above, rows_below],
          [cols_left, cols_right],
          [0, 0],
      ],
  )

  swapped = tf.transpose(padded, perm=[1, 0, 2])
  return tf.image.flip_left_right(swapped)

In [15]:
# Number of samples per batch, and the value used to right-pad encoded labels.
batch_size, padding_token = 64, 99
# Target canvas size (in pixels) that every image is resized and padded to.
image_width, image_height = 128, 32


def process_img(image_path, img_size = (image_width,image_height)):
  """Load one PNG as a single-channel float image on a fixed-size canvas.

  Reads the file, decodes it with one channel, passes it through
  ``img_resize`` and divides the float32 result by 255.0.
  """
  raw_bytes = tf.io.read_file(image_path)
  decoded = tf.image.decode_png(raw_bytes, 1)
  resized = img_resize(decoded, img_size)
  return tf.cast(resized, tf.float32)/255.0

def vectorize_label(label):
  """Encode a transcription as integer indices, right-padded to ``max_length``.

  The string is split into UTF-8 characters, mapped through ``char_to_num``
  and padded at the end with ``padding_token``.
  """
  chars = tf.strings.unicode_split(label, input_encoding = "UTF-8")
  encoded = char_to_num(chars)
  missing = max_length - tf.shape(encoded)[0]
  return tf.pad(encoded, paddings = [[0, missing]], constant_values = padding_token)

def process_img_labels(image_path,label):
  """Build one example: the processed image and its encoded label, keyed by name."""
  return {"image": process_img(image_path), "label": vectorize_label(label)}

def prepare_dataset(image_path,label):
  """Create a batched, cached and prefetched dataset of image/label examples.

  Args:
    image_path: Sequence of image file paths.
    label: Sequence of transcriptions, aligned with ``image_path``.

  Returns:
    A ``tf.data.Dataset`` yielding dicts with ``"image"`` and ``"label"``
    entries in batches of ``batch_size``.
  """
  pairs = tf.data.Dataset.from_tensor_slices((image_path, label))
  examples = pairs.map(process_img_labels, num_parallel_calls=AUTOTUNE)
  batched = examples.batch(batch_size)
  return batched.cache().prefetch(AUTOTUNE)

In [16]:
# Build the input pipelines for the three splits (train, validation, test).
train_dataset, val_dataset, test_dataset = (
    prepare_dataset(split_paths, split_labels)
    for split_paths, split_labels in (
        (train_img_paths, train_labels_cleaned),
        (val_img_paths, validation_labels_cleaned),
        (test_img_paths, test_labels_cleaned),
    )
)

In [17]:
# visualizing train dataset
import matplotlib.pyplot as plt
# Show up to 16 images from the first training batch together with their labels.
for data in train_dataset.take(1):
    images, labels = data["image"], data["label"]

    fig, ax = plt.subplots(4, 4, figsize=(15, 8))

    # A batch may hold fewer than 16 samples; never index past its end.
    n_shown = min(16, int(images.shape[0]))

    for i in range(16):
        axis = ax[i // 4, i % 4]
        axis.axis("off")
        if i >= n_shown:
            continue

        # Undo the flip + transpose applied in img_resize so the text reads
        # normally, then convert back to 8-bit grayscale.
        img = images[i]
        img = tf.image.flip_left_right(img)
        img = tf.transpose(img, perm=[1, 0, 2])
        img = (img * 255.0).numpy().clip(0, 255).astype(np.uint8)
        img = img[:, :, 0]

        # Gather indices where label != padding_token.
        label = labels[i]
        indices = tf.gather(label, tf.where(tf.math.not_equal(label, padding_token)))
        # Convert to string.
        label = tf.strings.reduce_join(num_to_char(indices))
        label = label.numpy().decode("utf-8")

        axis.imshow(img, cmap="gray")
        axis.set_title(label)


# The figure is this cell's output, so it must not be cleared after showing.
plt.show()

In [18]:
# Let us create Model
# ----- CTC loss ---------------
@tf.keras.utils.register_keras_serializable()
class CTCLayer(keras.layers.Layer):
  """Pass-through layer that attaches a CTC loss to the model.

  ``call`` computes ``keras.backend.ctc_batch_cost`` between the labels and the
  predictions, registers it with ``add_loss`` and returns the predictions
  unchanged.
  """

  def __init__(self, name = None, **kwargs):
    # Forward `name` to the base class: it used to be captured by this
    # signature and dropped, so CTCLayer(name="ctc_loss") lost its name.
    # The base initialiser also runs before any attribute is set.
    super(CTCLayer, self).__init__(name=name, **kwargs)
    self.loss_fun = keras.backend.ctc_batch_cost

  def get_config(self):
    """Return the base layer config; this layer adds no constructor arguments."""
    return super().get_config()

  def call(self, y_true , y_pred):
    """Add the CTC loss for this batch and return ``y_pred`` untouched.

    Args:
      y_true: Label tensor; axis 0 is taken as the batch and axis 1 as the
        label length.
      y_pred: Prediction tensor; axis 1 is taken as the input (time) length.
    """
    batch_len = tf.cast(tf.shape(y_true)[0],dtype = "int64")

    input_length = tf.cast(tf.shape(y_pred)[1],dtype = "int64")
    label_length = tf.cast(tf.shape(y_true)[1],dtype = "int64")

    # Every sample in the batch is given the same lengths: the full width of
    # y_pred's axis 1 and the full width of y_true's axis 1, respectively.
    input_length = input_length * tf.ones(shape = (batch_len , 1) , dtype = "int64")
    label_length = label_length * tf.ones(shape = (batch_len , 1) , dtype = "int64")

    loss = self.loss_fun(y_true, y_pred, input_length, label_length )
    self.add_loss(loss)

    return y_pred

def build_model():
  """Build and compile the handwriting recogniser.

  Architecture: two Conv2D + MaxPool2D blocks, a reshape into a sequence, a
  Dense + Dropout projection, two bidirectional LSTMs, a softmax Dense layer
  and a final CTCLayer that attaches the loss. The model takes the "image" and
  "label" inputs and is compiled with a default Adam optimizer.

  Returns:
    The compiled ``keras.models.Model``.
  """
  # Inputs: the image, and the label sequence (needed by the CTC layer only).
  input_img = keras.Input(shape = (image_width, image_height, 1), name="image")
  labels = keras.layers.Input(name = "label",shape = (None,))

  # Two convolution blocks, each followed by 2x2 max pooling.
  X = input_img
  for conv_name, pool_name, filters in (("conv1", "pool1", 32), ("conv2", "pool2", 64)):
    X = keras.layers.Conv2D(
        filters,
        (3,3),
        activation = "relu",
        kernel_initializer = "he_normal",
        padding = "same",
        name = conv_name,
    )(X)
    X = keras.layers.MaxPool2D(pool_size = (2,2), name = pool_name)(X)

  # Two 2x2 poolings shrink each spatial axis by 4x and the last convolution
  # has 64 filters, so the feature map is flattened into a sequence of
  # image_width // 4 steps before it is fed to the recurrent layers.
  new_shape = ((image_width // 4), (image_height // 4) * 64)
  X = keras.layers.Reshape(target_shape = new_shape, name = "reshape")(X)

  # Per-step projection with dropout.
  X = keras.layers.Dense(64,activation = "relu", name = "Dense1")(X)
  X = keras.layers.Dropout(0.2)(X)

  # Two stacked bidirectional LSTM layers.
  for _ in range(2):
    X = keras.layers.Bidirectional(
        keras.layers.LSTM(128,return_sequences= True,dropout=0.25)
    )(X)

  # Per-step class distribution: vocabulary size plus two extra classes.
  X = keras.layers.Dense(
      len(char_to_num.get_vocabulary())+2,
      activation = "softmax",
      name = "Dense2"
  )(X)

  # The CTC layer registers the loss and passes the predictions through.
  output = CTCLayer(name = "ctc_loss")(labels,X)

  model = keras.models.Model(
      inputs = [input_img, labels],
      outputs = output,
      name = "Hand_Written_Text_Recognizer"
  )

  # No loss is passed to compile: it is added inside CTCLayer via add_loss.
  model.compile(optimizer = keras.optimizers.Adam())

  return model


# call model function
# Build the model here to print its layer summary; the training cell further
# down calls build_model() again and trains that fresh instance.
model = build_model()
model.summary()

In [25]:
# Evaluation metric: edit distance is a widely used evaluation metric for OCR-based applications.
# It is reported by a callback during model training.


# Collect the validation batches once so the callback can reuse them each epoch.
validation_images = []
validation_labels = []

for batch in val_dataset:
  validation_images.append(batch["image"])
  validation_labels.append(batch["label"])
# Clear any output produced while iterating over the dataset.
from IPython.display import clear_output
clear_output()

In [20]:
def _dense_to_sparse_ignoring(dense, ignore_value):
    """Convert `dense` to an int64 SparseTensor, dropping entries equal to `ignore_value`."""
    dense = tf.cast(dense, tf.int64)
    keep_idx = tf.where(tf.math.not_equal(dense, ignore_value))
    return tf.sparse.SparseTensor(
        indices=keep_idx,
        values=tf.gather_nd(dense, keep_idx),
        dense_shape=tf.shape(dense, out_type=tf.int64),
    )


def calculate_edit_distance(labels, predictions):
    """Mean (unnormalised) edit distance between decoded predictions and labels.

    Args:
        labels: Batch of encoded labels, right-padded with ``padding_token``.
        predictions: Batch of per-step class probabilities from the model.

    Returns:
        A scalar tensor: the average edit distance over the batch.
    """
    # Drop only the padding. tf.sparse.from_dense would instead drop zeros,
    # keeping the padding tokens as symbols and discarding the valid index 0.
    sparse_labels = _dense_to_sparse_ignoring(labels, padding_token)

    # Greedy CTC decoding; every sequence uses the full number of time steps.
    input_len = np.ones(predictions.shape[0]) * predictions.shape[1]
    predictions_decoded = keras.backend.ctc_decode(
        predictions, input_length=input_len, greedy=True
    )[0][0][:, :max_length]
    # The decoder fills unused positions with -1; those are not symbols either.
    sparse_predictions = _dense_to_sparse_ignoring(predictions_decoded, -1)

    # Compute individual edit distances and average them out.
    edit_distances = tf.edit_distance(
        sparse_predictions, sparse_labels, normalize=False
    )
    return tf.reduce_mean(edit_distances)


class EditDistanceCallback(keras.callbacks.Callback):
    """Print the mean edit distance on the cached validation batches after each epoch."""

    def __init__(self, pred_model):
        """Store the inference model used to produce predictions."""
        super().__init__()
        self.prediction_model = pred_model

    def on_epoch_end(self, epoch, logs=None):
        """Predict every cached validation batch and report the average distance."""
        edit_distances = [
            calculate_edit_distance(
                batch_labels, self.prediction_model.predict(batch_images)
            ).numpy()
            for batch_images, batch_labels in zip(validation_images, validation_labels)
        ]

        print(
            f" Mean edit distance for epoch {epoch + 1}: {np.mean(edit_distances):.4f}"
        )

# Training
Now we are ready to kick off model training.

In [21]:
epochs = 20  # To get good results this should be at least 50.

# Build a fresh model, plus an inference-only view of it that maps the image
# input straight to the softmax output (skipping the CTC loss layer).
model = build_model()
prediction_model = keras.models.Model(
    model.get_layer(name="image").input, model.get_layer(name="Dense2").output
)
edit_distance_callback = EditDistanceCallback(prediction_model)

# Train the model. The datasets are already batched by prepare_dataset, so no
# batch_size is passed to fit (Keras documents that it must not be specified
# for dataset inputs).
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    callbacks=[edit_distance_callback],
)

In [22]:
# Save the trained model to a .h5 file in the current working directory.
model.save("./Hand_Written_text_detection.h5")

In [23]:
# Reload the saved file to confirm it deserialises. NOTE(review): the
# prediction cell below still uses `prediction_model`, not `new_model`.
new_model = tf.keras.models.load_model('./Hand_Written_text_detection.h5')

In [24]:
import matplotlib.pyplot as plt
def decode_batch_pred(pred):
  """Greedy-decode a batch of model outputs into text.

  Args:
    pred: Batch of per-step class probabilities; axis 0 is the batch and
      axis 1 the time steps.

  Returns:
    A list with one decoded string per batch element.
  """
  seq_lengths = np.ones(pred.shape[0])*pred.shape[1]
  decoded = tf.keras.backend.ctc_decode(
      pred , input_length = seq_lengths , greedy = True
  )[0][0][:,:max_length]

  def to_text(sequence):
    # Positions equal to -1 are the decoder's filler, not characters.
    kept = tf.gather(sequence , tf.where(tf.math.not_equal(sequence, -1)))
    return tf.strings.reduce_join(num_to_char(kept)).numpy().decode("utf-8")

  return [to_text(sequence) for sequence in decoded]

# Predict the first test batch and show 16 images with their decoded text.
for batch in test_dataset.take(1):
  batch_images, batch_label = batch["image"], batch["label"]
  _,ax = plt.subplots(4, 4, figsize=(15,10))

  preds = prediction_model.predict(batch_images)
  pred_texts = decode_batch_pred(preds)

  for i in range(16):
    cell = ax[i // 4, i % 4]

    # Undo the flip + transpose applied during preprocessing, then convert
    # back to an 8-bit single-channel picture.
    image = tf.transpose(tf.image.flip_left_right(batch_images[i]), perm=[1, 0, 2])
    image = (image * 255.0).numpy().clip(0, 255).astype(np.uint8)[:,:,0]

    title = f"Prediction: {pred_texts[i]}"
    cell.imshow(image, cmap="gray")
    cell.set_title(title)
    cell.axis("off")

plt.show()