<a href="https://colab.research.google.com/github/BYU-Handwriting-Lab/GettingStarted/blob/master/notebooks/HWRActiveTransferLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Active Transfer Learning for Handwriting Recognition

This notebook combines active learning and transfer learning into a single
framework. The goal is to fit new handwriting datasets quickly.

In this notebook, we start the process of fitting the Washington dataset,
using pre-trained models that were trained on the IAM, Rimes, and Bentham
datasets.

## Dependencies

First, let's import our dependencies such as TensorFlow, NumPy, etc.

In [None]:
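import os

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras.layers as kl
from PIL import Image
from tensorflow.keras import Model
from tensorflow.keras import constraints as C
from tqdm import tqdm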

## Load the data

Here are a few functions to help us load our data into a format compatible with
TensorFlow.
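The image helpers in the next cell resize each line image to a fixed
`(height, width)` while preserving its aspect ratio, then pad the remainder. As
a rough illustration of that arithmetic only, here is a plain-Python sketch.
The name `fit_size` is hypothetical and is not part of the notebook; it just
mirrors the branch logic of `img_resize_with_pad`.

```python
def fit_size(img_hw, desired_hw):
    """Return (new_height, new_width, pad_top, pad_right) for an
    aspect-preserving fit of an image into the desired (height, width)."""
    img_ratio = img_hw[0] / img_hw[1]
    desired_ratio = desired_hw[0] / desired_hw[1]

    if img_ratio >= desired_ratio:
        # Image is relatively taller than the target: height is the limit
        new_h = desired_hw[0]
        new_w = int(desired_hw[0] // img_ratio)
    else:
        # Image is relatively wider than the target: width is the limit
        new_h = int(desired_hw[1] * img_ratio)
        new_w = desired_hw[1]

    # Padding is added on the top and on the right, as in the helpers
    return new_h, new_w, desired_hw[0] - new_h, desired_hw[1] - new_w


print(fit_size((128, 1024), (64, 1024)))
print(fit_size((32, 2048), (64, 1024)))
```

In the first call the height limits the fit, so the image is padded on the
right; in the second the width limits it, so the image is padded on top.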

In [None]:
# The default list of characters used in the recognition model
DEFAULT_CHARS = ' !"#$%&\'()*+,-./0123456789:;=?ABCDEFGHIJKLMNOPQRSTUVWXYZ[]_`abcdefghijklmnopqrstuvwxyz|~£§¨«¬\xad' \
                '°²´·º»¼½¾ÀÂÄÇÈÉÊÔÖÜßàáâäæçèéêëìîïñòóôöøùúûüÿłŒœΓΖΤάήαδεηικλμνξοπρτυχψωόώІ‒–—†‡‰‹›₂₤℔⅓⅔⅕⅖⅗⅘⅙⅚⅛∆∇∫≠□♀♂✓ｆ’í‘\\♪'
# The default list of non-punctuation characters needed for the word beam search decoding algorithm
DEFAULT_NON_PUNCTUATION = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyzÀÂÄÇÈÉÊÔÖÜßàáâäæçèéêëìîïñòóôöøùúûüÿ' \
                          'łŒœΓΖΤάήαδεηικλμνξοπρτυχψωόώІ'
# The default list of punctuation characters needed for the word beam search decoding algorithm
DEFAULT_PUNCTUATION = ' !"#$%&\'()*+,-./0123456789:;=?[]_`|~£§¨«¬°²´·º»¼½¾‒–—†‡‰‹›₂₤℔⅓⅔⅕⅖⅗⅘⅙⅚⅛∆∇∫≠□♀♂✓'


def str_charset_to_lists(charset):
    """
    Turns string containing all desired characters into list of chars and indices. This is required for mapping
    between integer and char representations for use in the recognition model.

    :param charset: charset as string of chars to be represented in model.
    """
    chars = list(charset)
    indices = list(range(1, len(chars) + 1))
    return chars, indices


def get_char2idx(charset):
    """
    A tensorflow lookup table is created and returned which allows us to encode word transcriptions on the fly
    in the tf.data api. A standard python dictionary won't work when tensorflow is running in graph mode. This
    function will return a lookup table to convert between chars and indices.

    :param charset: string containing all desired characters to be represented
    :return: A tensorflow lookup table to convert characters to integers
    """
    chars, indices = str_charset_to_lists(charset)

    char2idx = tf.lookup.StaticHashTable(
        tf.lookup.KeyValueTensorInitializer(
            keys=tf.constant(chars, dtype=tf.string),
            values=tf.constant(indices, dtype=tf.int32),
            key_dtype=tf.string,
            value_dtype=tf.int32
        ),
        default_value=0,
        name='char2idx_lookup'
    )

    return char2idx


def get_idx2char(charset):
    """
    A tensorflow lookup table is created and returned which allows us to encode word transcriptions on the fly
    in the tf.data api. A standard python dictionary won't work when tensorflow is running in graph mode. This
    function will return a lookup table to convert between indices and chars.

    :param charset: string containing all desired characters to be represented.
    :return: A tensorflow lookup table to convert integers to characters
    """
    chars, indices = str_charset_to_lists(charset)

    idx2char = tf.lookup.StaticHashTable(
        tf.lookup.KeyValueTensorInitializer(
            keys=tf.constant(indices, dtype=tf.int32),
            values=tf.constant(chars, dtype=tf.string),
            key_dtype=tf.int32,
            value_dtype=tf.string
        ),
        default_value='',
        name='idx2char_lookup'
    )

    return idx2char


def pad_or_truncate(t, sequence_size=128):
    """
    Pad or truncate a tensor to a fixed sequence length. Works for use in the tf.data api in graph mode.

    :param t: The tensor to pad or truncate
    :param sequence_size: The final sequence length of the tensor
    :return: The tensor padded with zeros or truncated to sequence_size
    """
    dim = tf.size(t)
    return tf.cond(tf.equal(dim, sequence_size), lambda: t,
                   lambda: tf.cond(tf.greater(dim, sequence_size), lambda: tf.slice(t, [0], [sequence_size]),
                                   lambda: tf.concat([t, tf.zeros(sequence_size - dim, dtype=tf.int32)], 0)))


def merge_repeating_values(t):
    """
    Merge repeating indices/characters in a tensor. Utilizes only tf.* functions which makes it
    usable in graph mode.

    :param t: The tensor to have repeated indices/characters merged
    :return: A new tensor with repeating values merged
    """
    t2 = tf.roll(tf.pad(t, [[0, 1]], constant_values=-1), -1, 0)[:tf.size(t)]
    not_equal = tf.math.not_equal(t, t2)
    indices = tf.where(not_equal)
    return tf.reshape(tf.gather(t, indices), [-1])


def str_to_idxs(string, char2idx, sequence_size):
    """
    Perform the actual lookup to convert a string to its integer representation. This function also performs
    padding according to the given sequence size. Works for use in the tf.data api in graph mode.

    :param string: The string to be converted
    :param char2idx: The tf lookup table
    :param sequence_size: The final sequence length
    :return: The converted string now in its integer representation
    """
    idxs = tf.map_fn(lambda char: char2idx.lookup(char), tf.strings.unicode_split(string, 'UTF-8'), dtype=tf.int32)
    return pad_or_truncate(idxs, sequence_size=sequence_size)


def idxs_to_str(idxs, idx2char, merge_repeated=True):
    """
    Perform the actual lookup to convert an integer to its string representation.
    Works for use in the tf.data api in graph mode.

    :param idxs: The idxs to be converted
    :param idx2char: The tf lookup table
    :param merge_repeated: Bool indicating whether or not to merge repeating values in the idx tensor
    :return: The converted idxs now in its string representation
    """
    if merge_repeated:
        idxs = merge_repeating_values(idxs)

    string = tf.map_fn(lambda idx: idx2char.lookup(idx), idxs, dtype=tf.string)
    string = tf.strings.reduce_join(string)
    return tf.strings.strip(string)


def str_to_idxs_batch(batch, char2idx, sequence_size=128):
    """
    Perform the same function as str_to_idxs, except a batch of strings are given as input

    :param batch: A batch of strings as tensor, list, or numpy array
    :param char2idx: The tf lookup table
    :param sequence_size: The final sequence length of each string
    :return: The converted strings now in its integer representation
    """
    return tf.map_fn(lambda string: str_to_idxs(string, char2idx, sequence_size=sequence_size), batch,
                     dtype=tf.int32)


def idxs_to_str_batch(batch, idx2char, merge_repeated=True):
    """
    Perform the same function as idxs_to_str, except a batch of idxs are given as input

    :param batch: A batch of idxs as tensor, list, or numpy array
    :param idx2char: The tf lookup table
    :param merge_repeated: Bool indicating whether or not to merge repeating values in the idx tensor
    :return: The converted idxs now in its string representation
    """
    return tf.map_fn(lambda idxs: idxs_to_str(idxs, idx2char, merge_repeated=merge_repeated), batch,
                     dtype=tf.string)

def img_resize_with_pad(img_tensor, desired_size, pad_value=255):
  """
  The standard tf.image.resize_with_pad function does not allow for specifying the pad value,
  so we create a function with that capability here. Aspect ratio will be preserved.

  :param img_tensor: The image tensor to be resized and padded
  :param desired_size: The desired size (height, width)
  :param pad_value: The value to pad the tensor with
  """
  img_size = tf.shape(img_tensor)

  img_ratio = img_size[0] / img_size[1]
  desired_ratio = desired_size[0] / desired_size[1]

  if img_ratio >= desired_ratio:
      # Solve by height
      new_height = desired_size[0]
      new_width = int(desired_size[0] // img_ratio)
  else:
      # Solve by width
      new_height = int(desired_size[1] * img_ratio)
      new_width = desired_size[1]

  resized_img = tf.image.resize(img_tensor, (new_height, new_width), method=tf.image.ResizeMethod.BICUBIC)

  pad_height = desired_size[0] - new_height
  pad_width = desired_size[1] - new_width

  img_padded = tf.pad(resized_img, [[pad_height, 0], [0, pad_width], [0, 0]], constant_values=pad_value)

  return img_padded


def img_resize_with_pad_numpy(img, desired_size, pad_value=255):
  """
  Same as img_resize_with_pad, except pillow and numpy are used to resize and pad the image
  compared to the default tensorflow image operations.

  :param img: The pillow image to be resized and padded
  :param desired_size: The desired size (height, width)
  :param pad_value: The value to pad the tensor with
  """
  img_size = np.array(img).shape

  img_ratio = img_size[0] / img_size[1]
  desired_ratio = desired_size[0] / desired_size[1]

  if img_ratio >= desired_ratio:  # Solve by height
    new_height = desired_size[0]
    new_width = int(desired_size[0] // img_ratio)
  else:  # Solve by width
    new_height = int(desired_size[1] * img_ratio)
    new_width = desired_size[1]

  img = np.array(img.resize((new_width, new_height)))

  border_top = desired_size[0] - new_height
  border_right = desired_size[1] - new_width

  img = np.pad(img, [(border_top, 0), (0, border_right)], mode='constant', constant_values=pad_value)

  return img


def read_and_encode_image(img_path, img_size=(64, 1024)):
  """
  Used by both encode_img_and_transcription (training) and encode_img_with_name (inference). This method
  simply loads the image given a file path and performs the necessary encoding/resizing/transposing that
  is necessary for use on the recognition model.

  :param img_path: The path to the desired image
  :param img_size: The size of the image after resizing/padding
  :return: The encoded image in its tensor/integer representation
  """
  img_bytes = tf.io.read_file(img_path)
  img = tf.image.decode_image(img_bytes, channels=1, expand_animations=False)
  img = img_resize_with_pad(img, img_size)
  img = tf.image.per_image_standardization(img)

  return img


def read_and_encode_image_pillow(img_path, img_size=(64, 1024)):
  """
  Same as read_and_encode_image function except using pillow to load the image rather than
  default tensorflow image operations

  :param img_path: The path to the desired image
  :param img_size: The size of the image after resizing/padding
  :return: The encoded image in its tensor/integer representation
  """
  img = Image.open(img_path.numpy())
  # img = img.convert('RGB')
  img = img_resize_with_pad_numpy(img, img_size.numpy())
  img = tf.constant(img, dtype=tf.float32)

  return img


def encode_img_and_transcription(img_path, transcription, char2idx, sequence_size=128, img_size: tuple = (64, 1024)):
  """
  The actual function to map image paths and string transcriptions to its tensor/integer representation.

  :param img_path: The path to the desired image
  :param transcription: The transcription of the image in string form
  :param char2idx: The tf lookup table
  :param sequence_size: The final sequence length for transcriptions
  :param img_size: The size of the image after resizing/padding
  :return: The image and transcription in their tensor/integer representations.
  """
  img = tf.py_function(read_and_encode_image_pillow, [img_path, img_size], [tf.float32])
  img = tf.transpose(img, [2, 1, 0])
  line = str_to_idxs(transcription, char2idx, sequence_size)
  return img, line


def encode_img_with_name(img_path, img_size=(64, 1024)):
    """
    Used to map img_paths to encoded images for inference. Returned is the encoded image and image name.

    :param img_path: The file path to the image
    :param img_size: The size of the image after resizing/padding
    :return: The encoded image and image path
    """
    img = read_and_encode_image(img_path, img_size)
    return img, img_path


def get_dataset_size(csv_path):
    """
    The tf.data api has a hard time producing the dataset size. The cardinality() method often
    returns unknown even with the CsvDataset. This function uses pandas to get the length.

    :param csv_path: The path to csv containing information about the dataset
    :return: The size of the dataset
    """
    return len(pd.read_csv(csv_path, sep='\t', header=None, names=['img_path', 'transcription']))


def get_encoded_dataset_from_csv(csv_path, char2idx, max_seq_size, img_size):
    """
    Using the tf.data api, load the desired csv with img_path and transcription data, encode the images and
    transcriptions for use on the recognition model and return the desired tf dataset.

    :param csv_path: The path to the tab delimited csv file containing | Image Path | Transcription |
    :param char2idx: The tf lookup table to map characters to their respective integer representation
    :param max_seq_size: The final sequence length for transcriptions
    :param img_size: The size of the image after resizing/padding (height, width).
    :return: The tf dataset containing encoded images and their respective transcriptions
    """
    path_sep = os.path.sep
    path_prefix = tf.strings.join(csv_path.split('/')[:-1], path_sep)
    return tf.data.experimental.CsvDataset(csv_path, ['img', 'trans'], field_delim='\t', use_quote_delim=False).map(
        lambda img_path, transcription: encode_img_and_transcription(
            tf.strings.join([path_prefix, tf.strings.reduce_join(tf.strings.split(img_path, '/'), separator=path_sep)],
                            separator=path_sep),
            transcription, char2idx, max_seq_size, img_size),
        num_parallel_calls=tf.data.experimental.AUTOTUNE)


def get_encoded_inference_dataset_from_img_path(img_path, img_size):
    """
    Using the tf.data api, load all images from the desired path and return a dataset containing encoded images
    and their file paths.

    :param img_path: The path to the directory containing images
    :param img_size: The size of the image after resizing/padding (height, width)
    :return: The tf dataset containing encoded images and their respective file paths
    """
    return tf.data.Dataset.list_files(img_path + '/*', shuffle=False).map(
        lambda path: encode_img_with_name(path, img_size),
        num_parallel_calls=tf.data.experimental.AUTOTUNE)
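To make the index conventions of the text helpers concrete, here is a
plain-Python sketch that mirrors `str_charset_to_lists`, `pad_or_truncate`,
and `merge_repeating_values`. Characters get 1-based indices, so 0 is left for
unknown characters and padding. The toy charset and the names `encode`,
`merge_repeats`, and `decode` are illustrative assumptions, not part of the
notebook.

```python
TOY_CHARSET = ' abc'  # hypothetical charset, far smaller than DEFAULT_CHARS


def encode(text, charset, sequence_size):
    """Map chars to 1-based indices, then pad with 0 or truncate."""
    char2idx = {c: i for i, c in enumerate(charset, start=1)}
    idxs = [char2idx.get(c, 0) for c in text]  # unknown chars map to 0
    idxs = idxs[:sequence_size]
    return idxs + [0] * (sequence_size - len(idxs))


def merge_repeats(idxs):
    """Collapse runs of equal values into one, like merge_repeating_values."""
    last = len(idxs) - 1
    return [v for i, v in enumerate(idxs) if i == last or v != idxs[i + 1]]


def decode(idxs, charset, merge_repeated=True):
    """Map indices back to chars; index 0 decodes to the empty string."""
    idx2char = {i: c for i, c in enumerate(charset, start=1)}
    if merge_repeated:
        idxs = merge_repeats(idxs)
    return ''.join(idx2char.get(i, '') for i in idxs).strip()


print(encode('cab', TOY_CHARSET, 5))
print(decode([4, 4, 2, 2, 3, 0, 0], TOY_CHARSET))
```

Merging repeats suits frame-wise model output, where one character usually
spans several time steps. Applied to a ground-truth label it would also
collapse genuine double letters, which is why `merge_repeated` is a switch.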

## Build the Model

Here is the code we need to build the model for training.

In [None]:
class FullGatedConv2D(kl.Conv2D):
    def __init__(self, filters, **kwargs):
        super(FullGatedConv2D, self).__init__(filters=filters * 2, **kwargs)
        self.nb_filters = filters

    def call(self, inputs):
        output = super(FullGatedConv2D, self).call(inputs)
        linear = kl.Activation("linear")(output[:, :, :, :self.nb_filters])
        sigmoid = kl.Activation("sigmoid")(output[:, :, :, self.nb_filters:])

        return kl.Multiply()([linear, sigmoid])

    def compute_output_shape(self, input_shape):
        output_shape = super(FullGatedConv2D, self).compute_output_shape(input_shape)
        return tuple(output_shape[:3]) + (self.nb_filters,)

    def get_config(self):
        config = super(FullGatedConv2D, self).get_config()
        # Store the un-doubled filter count so from_config() can rebuild the layer
        config['filters'] = self.nb_filters
        return config


class Recognizer(Model):
  def __init__(self, sequence_size=128, vocabulary_size=197):
    super(Recognizer, self).__init__(name='flor_recognizer')

    self.conv1 = tf.keras.Sequential(name='conv1')
    self.conv1.add(kl.Conv2D(filters=16, kernel_size=(3,3), strides=(2,2), padding="same", kernel_initializer="he_uniform"))
    self.conv1.add(kl.PReLU(shared_axes=[1,2]))
    self.conv1.add(kl.BatchNormalization(renorm=True))
    self.conv1.add(FullGatedConv2D(filters=16, kernel_size=(3,3), padding="same"))
    
    self.conv2 = tf.keras.Sequential(name='conv2')
    self.conv2.add(kl.Conv2D(filters=32, kernel_size=(3,3), strides=(1,1), padding="same", kernel_initializer="he_uniform"))
    self.conv2.add(kl.PReLU(shared_axes=[1,2]))
    self.conv2.add(kl.BatchNormalization(renorm=True))
    self.conv2.add(FullGatedConv2D(filters=32, kernel_size=(3,3), padding="same"))

    self.conv3 = tf.keras.Sequential(name='conv3')
    self.conv3.add(kl.Conv2D(filters=64, kernel_size=(2,4), strides=(2,4), padding="same", kernel_initializer="he_uniform"))
    self.conv3.add(kl.PReLU(shared_axes=[1,2]))
    self.conv3.add(kl.BatchNormalization(renorm=True))
    self.conv3.add(FullGatedConv2D(filters=64, kernel_size=(3,3), padding="same", kernel_constraint=C.MaxNorm(4, [0,1,2])))
    self.dropout1 = kl.Dropout(rate=0.3, name='dropout1')

    self.conv4 = tf.keras.Sequential(name='conv4')
    self.conv4.add(kl.Conv2D(filters=128, kernel_size=(3,3), strides=(1,1), padding="same", kernel_initializer="he_uniform"))
    self.conv4.add(kl.PReLU(shared_axes=[1,2]))
    self.conv4.add(kl.BatchNormalization(renorm=True))
    self.conv4.add(FullGatedConv2D(filters=128, kernel_size=(3,3), padding="same", kernel_constraint=C.MaxNorm(4, [0,1,2])))
    self.dropout2 = kl.Dropout(rate=0.3, name='dropout2')

    self.conv5 = tf.keras.Sequential(name='conv5')
    self.conv5.add(kl.Conv2D(filters=256, kernel_size=(2,4), strides=(2,4), padding="same", kernel_initializer="he_uniform"))
    self.conv5.add(kl.PReLU(shared_axes=[1,2]))
    self.conv5.add(kl.BatchNormalization(renorm=True))
    self.conv5.add(FullGatedConv2D(filters=256, kernel_size=(3,3), padding="same", kernel_constraint=C.MaxNorm(4, [0,1,2])))
    self.dropout3 = kl.Dropout(rate=0.3, name='dropout3')

    self.conv6 = tf.keras.Sequential(name='conv6')
    self.conv6.add(kl.Conv2D(filters=512, kernel_size=(3,3), strides=(1,1), padding="same", kernel_initializer="he_uniform"))
    self.conv6.add(kl.PReLU(shared_axes=[1,2]))
    self.conv6.add(kl.BatchNormalization(renorm=True))
    
    self.mp = kl.MaxPooling2D(pool_size=(1,2), strides=(1,2), padding="valid", name='mp')

    self.gru1 = tf.keras.Sequential(name='gru1')
    self.gru1.add(kl.Bidirectional(kl.GRU(units=256, return_sequences=True, dropout=0.5)))
    self.gru1.add(kl.Dense(units=512))
    self.gru1.add(kl.PReLU())

    self.gru2 = tf.keras.Sequential(name='gru2')
    self.gru2.add(kl.Bidirectional(kl.GRU(units=256, return_sequences=True, dropout=0.5)))
    self.gru2.add(kl.Dense(units=vocabulary_size))
    
  def call(self, x, training=False):
    # CNN
    out = self.conv1(x)
    out = self.conv2(out)
    out = self.conv3(out)
    out = self.dropout1(out, training=training)
    out = self.conv4(out)
    out = self.dropout2(out, training=training)
    out = self.conv5(out)
    out = self.dropout3(out, training=training)
    out = self.conv6(out)

    # MaxPool and Reshape
    out = self.mp(out)
    # out = tf.squeeze(out)
    out = tf.reshape(out, (-1, out.shape[1], out.shape[2] * out.shape[3]))

    # RNN
    out = self.gru1(out)
    out = self.gru2(out)

    return out
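It can help to trace how the convolutional stack turns an image into a
sequence for the GRU layers. The sketch below does only the shape arithmetic,
in plain Python. It assumes the input layout is `(batch, width, height, 1)`,
which is what the transpose in `encode_img_and_transcription` produces for the
default `img_size` of `(64, 1024)`. The function names are hypothetical. The
`FullGatedConv2D` layers are left out because they use stride 1 with `same`
padding and keep the channel count of the preceding convolution.

```python
import math


def same_out(size, stride):
    """Output length of a 'same'-padded convolution: ceil(size / stride)."""
    return math.ceil(size / stride)


def valid_pool_out(size, pool, stride):
    """Output length of a 'valid' pooling window."""
    return (size - pool) // stride + 1


def trace_shapes(width=1024, height=64):
    """Return (time_steps, features) seen by the first GRU block."""
    # (strides, filters) of conv1..conv6 as set in Recognizer.__init__
    conv_layers = [((2, 2), 16), ((1, 1), 32), ((2, 4), 64),
                   ((1, 1), 128), ((2, 4), 256), ((1, 1), 512)]
    w, h, c = width, height, 1
    for (stride_w, stride_h), filters in conv_layers:
        w, h, c = same_out(w, stride_w), same_out(h, stride_h), filters

    # MaxPooling2D(pool_size=(1, 2), strides=(1, 2), padding="valid")
    w, h = valid_pool_out(w, 1, 1), valid_pool_out(h, 2, 2)

    # The reshape in call() merges the last two axes into one feature axis
    return w, h * c


print(trace_shapes())  # (128, 512)
```

With the default image size, the model therefore emits 128 time steps, which
matches the default `sequence_size` of 128 used for the transcriptions.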

## Choose a Pre-trained Model

Out of all of our pre-trained models, let's find the best model to start with.

In [None]:
# First, load in all of our pre-trained models
iam_model = Recognizer()
iam_model.load_weights()

rimes_model = Recognizer()
rimes_model.load_weights()

bentham_model = Recognizer()
bentham_model.load_weights()

models = [iam_model, rimes_model, bentham_model]

# Take a subset of the dataset to use for testing
dataset = get_inference_dataset_from_...

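# Metrics to help us select the best model
mean_confidence = tf.keras.metrics.Mean(name="confidence")
best_model = None
best_score = float('-inf')

# Iterate over each of the models
for model in models:
    mean_confidence.reset_states()

    # Iterate over the subset of the dataset
    for img, img_name in dataset:
        output = model(img)
        # predict_with_confidence is not defined in this notebook; it is
        # assumed to return a prediction and a confidence score
        prediction, confidence = predict_with_confidence(output)
        mean_confidence.update_state(confidence)

    # Keep the model with the highest mean confidence on the subset
    score = float(mean_confidence.result())
    if score > best_score:
        best_model = model
        best_score = score

print('Best Model:', best_model.name)
print('Confidence Score:', best_score)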
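The cell above relies on `predict_with_confidence`, which this notebook does
not define. One plausible way to score a line, offered purely as an
assumption, is greedy decoding: take the most probable class at each time
step and average those probabilities. The plain-Python sketch below shows the
idea on nested lists of logits. `softmax` and `greedy_confidence` are
hypothetical names, and the real helper may work quite differently.

```python
import math


def softmax(logits):
    """Numerically stable softmax over one time step."""
    peak = max(logits)
    exps = [math.exp(v - peak) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]


def greedy_confidence(logits_per_step):
    """Return (best class per step, mean of the per-step max probabilities)."""
    best, probs = [], []
    for logits in logits_per_step:
        p = softmax(logits)
        k = max(range(len(p)), key=p.__getitem__)
        best.append(k)
        probs.append(p[k])
    return best, sum(probs) / len(probs)


# A sharply peaked step and a completely flat step
classes, confidence = greedy_confidence([[0.0, 10.0, 0.0], [0.0, 0.0, 0.0]])
print(classes, round(confidence, 3))
```

A score like this lies between `1 / vocabulary_size` and 1, so higher values
mean the model is more certain about the line.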

In [None]:
# Run through the dataset and see if there are any lines where we are highly
# confident in our predictions
model = best_model
confidence_threshold = .4

training_set = []
labeling_set = []

for img, img_name in dataset:
    output = model(img)
    prediction, confidence = predict_with_confidence(output)

    if confidence > confidence_threshold:
        training_set.append([img, img_name])
    else:
        labeling_set.append([img, img_name])
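The split above is the active-learning step: confident lines can be used for
training, and the rest need a human label. A common refinement, shown here
only as an assumption about where the notebook is heading, is to order the
uncertain lines from least to most confident so that annotators see the most
informative ones first. The function name and the sample data are
hypothetical.

```python
def split_by_confidence(scored_items, threshold=0.4):
    """Split (name, confidence) pairs into confident and uncertain lists.

    The uncertain list is sorted so the least confident items come first.
    """
    confident, uncertain = [], []
    for name, confidence in scored_items:
        if confidence > threshold:
            confident.append((name, confidence))
        else:
            uncertain.append((name, confidence))
    uncertain.sort(key=lambda item: item[1])
    return confident, uncertain


# Hypothetical line names and scores, for illustration only
scores = [('line_01', 0.82), ('line_02', 0.35), ('line_03', 0.10), ('line_04', 0.40)]
to_train, to_label = split_by_confidence(scores)
print(to_train)
print(to_label)
```

An item whose score equals the threshold goes to the labeling list, matching
the strict `>` comparison used in the cell above.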