#1 Environment preparation

In [1]:
#external dependencies
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import math
import os
import urllib
import zipfile
from pathlib import Path
from PIL import Image

In [2]:
np.random.seed(42)

Windows (local) settings

In [3]:
#Edit this to point to project root
sep = "\\"
home = f"B:{sep}Dev{sep}GitHub{sep}University{sep}ThesisMSc{sep}StolenVehicleDetector{sep}Machine Learning{sep}OCR"
data_dir = home + f"{sep}data"
model_dir = home + f"{sep}model"

Linux (colab) settings

In [4]:
sep = "/"
#Edit this to point to project root
home = f"{sep}content{sep}OCR"
data_dir = home + f"{sep}data"
model_dir = home + f"{sep}model"

In [4]:
#Setup project directories
if not os.path.exists(home):
    os.makedirs(home)

if not os.path.exists(data_dir):
    os.makedirs(data_dir)

if not os.path.exists(model_dir):
    os.makedirs(model_dir)

#2 Dataset

##Classes

###Wrappers

In [5]:
class AbstractDatasetWrapper:
    '''Abstract dataset wrapper class
    '''
    dataset_name = ""
    source_url: str = ""

    def __init__(self, data_dir: str) -> None:
        self.data_dir = data_dir
        self.dataset_dir = self.data_dir + "/" + self.dataset_name
        self.images = []
        self.labels = []
        self.unique_characters: set = set()
        self.label_max_length: int = 0

    def __download_dataset(self):
        pass

    def __extract_dataset(self):
        pass

    def prepare(self):
        '''Prepare the whole dataset before working with it
        '''
        pass

    def show_info(self):
        '''Show dataset properties
        '''
        print("Number of images: ", len(self.images))
        print("Number of labels: ", len(self.labels))
        print("Longest label: ", self.label_max_length)
        print("Number of unique characters: ", len(self.unique_characters))
        print("Characters present: ", self.unique_characters)


class CaptchaDatasetWrapper(AbstractDatasetWrapper):
    '''The captcha dataset wrapper class
    '''

    dataset_name = "captcha_images_v2"
    source_url: str = "https://github.com/AakashKumarNain/CaptchaCracker/raw/master/captcha_images_v2.zip"

    def __init__(self, data_dir: str) -> None:
        super().__init__(data_dir)
        self.download_file_name = self.dataset_dir + ".zip"

    def __download_dataset(self):
        download_file_path = Path(self.download_file_name)
        if not download_file_path.exists():
            #Not yet downloaded
            urllib.request.urlretrieve(self.source_url, download_file_path)

    def __extract_dataset(self):
        download_file_path = Path(self.download_file_name)
        if download_file_path.exists():
            #if the target data directory is empty
            if not os.listdir(self.data_dir):
                with zipfile.ZipFile(self.download_file_name, 'r') as zip_ref:
                    zip_ref.extractall(path = self.data_dir)

    def prepare(self):
        '''Prepare the whole dataset before working with itP
        '''
        #Download & extract dataset if it has not been yet
        self.__download_dataset()
        #self.__extract_dataset()

        dataset_path = Path(self.dataset_dir)
        #Get list of the images
        self.images = sorted(list(map(str, list(dataset_path.glob("*.png")))))
        #Labels of images: image names minus ".png"
        self.labels = [img.split(os.path.sep)[-1].split(".png")[0] for img in self.images]
        #Set of distinct characters in the labels
        self.unique_characters = set(char for label in self.labels for char in label)
        #Compute the longest label in the dataset
        self.label_max_length = max([len(label) for label in self.labels])


###Controller

In [28]:
class DatasetController:
    '''Dataset controller class (to split & transform data)
    '''
    def __init__(self, data_wrapper: AbstractDatasetWrapper, img_n: int, channels: int):
        self.data_wrapper = data_wrapper
        # Mapping characters to integers
        self.char_to_num = layers.experimental.preprocessing.StringLookup(
            vocabulary=list(self.data_wrapper.unique_characters), num_oov_indices=0, mask_token=None)
        # Mapping integers back to original characters
        self.num_to_char = layers.experimental.preprocessing.StringLookup(
            vocabulary=self.char_to_num.get_vocabulary(), mask_token=None, invert=True)
        #Required image dimensions
        self.img_n = img_n
        self.channels = channels
        
        #Subsets
        self.train_dataset = []
        self.validation_dataset = []
        #Additional generated dataset info
        self.batch_size = 0
        self.train_ratio = 0

    def split_data(self, batch_size, train_ratio=0.9, shuffle=True):
        self.batch_size = batch_size
        self.train_ratio = train_ratio
        #Get the total size of the input dataset
        size = len(self.data_wrapper.images)
        #Make an indices array and shuffle it, if required
        indices = np.arange(size)
        if shuffle:
            np.random.shuffle(indices)
        #Get the size of training samples
        train_samples = int(size * train_ratio)

        #Split data into training and validation sets
        images = np.array(self.data_wrapper.images)
        labels = np.array(self.data_wrapper.labels)

        x_train, y_train = images[indices[:train_samples]], labels[indices[:train_samples]]
        x_val, y_val = images[indices[train_samples:]], labels[indices[train_samples:]]

        train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
        train_dataset = (
            train_dataset.map(
                self.encode_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
            .batch(self.batch_size)
            .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
        )

        validation_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
        validation_dataset = (
            validation_dataset.map(
                self.encode_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE)
            .batch(self.batch_size)
            .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
        )

        self.train_dataset = train_dataset
        self.validation_dataset = validation_dataset

        return train_dataset, validation_dataset

    def encode_single_sample(self, img_path, label):
        print(img_path)
        #Read the image
        image = tf.io.read_file(img_path)
        print(image)
        #Decode and convert to the appropriate channels
        image = tf.io.decode_png(image, channels=self.channels)
        #Convert to float32 and normalize to the [0, 1) range
        image = tf.image.convert_image_dtype(image, tf.float32)
        #Resize to the desired size
        image = tf.image.resize_with_pad(image, self.img_n, self.img_n)
        #Map the label characters to numbers
        label = self.char_to_num(tf.strings.unicode_split(label, input_encoding="UTF-8"))
        #Return a dict as the model is expecting two inputs
        return {"image": image, "label": label}

    def encode_single_sample2(self, img_path, label):
        #Read the image
        image = Image.open(img_path)

        if(self.channels == 1):
            #Convert to grayscale
            image = image.convert('gray')

        #Convert to float32 and normalize to the [0, 1) range
        image = tf.image.convert_image_dtype(image, tf.float32)
        #Resize to the desired size
        image = tf.image.resize_with_pad(image, self.img_n, self.img_n)
        #Map the label characters to numbers
        label = self.char_to_num(tf.strings.unicode_split(label, input_encoding="UTF-8"))
        #Return a dict as the model is expecting two inputs
        return {"image": image, "label": label}

    def show_train_samples(self):
        self.__show_batch_sample(self.train_dataset)

    def show_validation_samples(self):
        self.__show_batch_sample(self.validation_dataset)

    def __show_batch_sample(self, dataset):
        _, ax = plt.subplots(math.ceil(self.batch_size/4), 4, figsize=(12, 3*(self.batch_size/4)))

        for batch in dataset.take(1):
            images = batch["image"]
            labels = batch["label"]
            for i in range(self.batch_size):
                sample = self.encode_single_sample(images[i], labels[i])
                img = (sample["image"] * 255).numpy().astype("uint8")
                label = tf.strings.reduce_join(self.num_to_char(sample["label"])).numpy().decode("utf-8")
                ax[i // 4, i % 4].imshow(img)
                ax[i // 4, i % 4].set_title(label)
                ax[i // 4, i % 4].axis("off")
            plt.show()

    def show_inference_results(self, model):
        #Check results on validation samples
        for batch in self.validation_dataset.take(1):
            batch_images = batch["image"]
            batch_labels = batch["label"]

            predictions = model.predict(batch_images)
            predicted_texts = self.decode_batch_predictions(predictions)

            original_texts = []
            for label in batch_labels:
                label = tf.strings.reduce_join(self.num_to_char(label)).numpy().decode("utf-8")
                original_texts.append(label)

            _, ax = plt.subplots(math.ceil(self.batch_size/4), 4, figsize=(12, 3*(self.batch_size/4)))
            for i in range(len(predicted_texts)):
                img = (batch_images[i] * 255).numpy().astype(np.uint8)
                title = f"Pred: {predicted_texts[i]}"
                ax[i // 4, i % 4].imshow(img)
                ax[i // 4, i % 4].set_title(title)
                ax[i // 4, i % 4].axis("off")

            plt.show()

    def decode_batch_predictions(self, predictions):
        '''A utility function to decode the output of the network
        '''
        input_length = np.ones(predictions.shape[0]) * predictions.shape[1]
        #Greedy search is used
        #For complex tasks where language models count, beam search can be used
        results = keras.backend.ctc_decode(predictions, input_length=input_length, greedy=True)[0][0][:, :self.label_length]
        raw_output = []
        #Iterate over the results and get back the text
        output_text = []
        for res in results:
            raw_output.append(res)
            res = self.num_to_char(res).numpy()
            output_text.append(res)
            
        return output_text

##Dataset creation

In [29]:
data_wrapper = CaptchaDatasetWrapper(data_dir)
data_wrapper.prepare()
data_wrapper.show_info()

Number of images:  16
Number of labels:  16
Longest label:  2
Number of unique characters:  11
Characters present:  {'a', '1', '3', '9', '6', '4', '8', '0', '5', '2', '7'}


In [30]:
#Required input image dimensions (N x N images)
img_n = 200
channels = 3

In [31]:
#Batch size for training and validation
batch_size = 8
#number of characters in the dataset
num_characters = len(data_wrapper.unique_characters)
#Training set ratio of all the images
train_ratio = 0.5
shuffle = True

In [32]:
dataset_controller = DatasetController(data_wrapper, img_n, channels)
#Split data into training and validation sets
train_dataset, validation_dataset = dataset_controller.split_data(batch_size=batch_size, train_ratio=train_ratio, shuffle=shuffle)

Tensor("args_0:0", shape=(), dtype=string)
Tensor("ReadFile:0", shape=(), dtype=string)
Tensor("args_0:0", shape=(), dtype=string)
Tensor("ReadFile:0", shape=(), dtype=string)


In [None]:
print("train samples:")
dataset_controller.show_train_samples()

In [None]:
print("validation samples:")
dataset_controller.show_validation_samples()

#3 Algorithm

##Classes

###CTC layer

In [None]:
class CTCLayer(layers.Layer):
    
    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        #Compute the training time loss
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        #Add it to the parent layer
        self.add_loss(loss)

        #At test time, just return the computed predictions
        return y_pred

## Model Zoo

### OCR model v1

In [None]:
def ocr_model_v1(img_n: int, channels: int, num_characters: int, optimizer):
    #Factor by which the image is going to be downsampled by the convolutional blocks.
    #Two convolutional blocks are used; each block has a pooling layer which downsample the features by a factor of 2.
    #Thus, the total downsampling factor is (2x2)=4.
    downsample_factor = 4
    #Model inputs
    input_image = layers.Input(shape=(img_n, img_n, channels), name="image", dtype="float32")
    labels = layers.Input(name="label", shape=(None,), dtype="float32")

    #1st conv block
    x = layers.Conv2D(32, (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv1",
    )(input_image)
    #1st max pooling
    x = layers.MaxPooling2D((2, 2), name="MaxPool1")(x)

    #2nd conv block
    x = layers.Conv2D(64, (3, 3),
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
        name="Conv2",
    )(x)
    #2nd max pooling
    x = layers.MaxPooling2D((2, 2), name="MaxPool2")(x)

    # Two max pool have been used with pool size and strides 2.
    # This way, the downsampled feature maps are 4x smaller. 
    # The number of filters in the last layer is 64. 
    # Reshape accordingly before passing the features to the RNN part.
    new_shape = ((img_n // downsample_factor), (img_n // downsample_factor) * 64)
    x = layers.Reshape(target_shape=new_shape, name="reshape")(x)
    x = layers.Dense(64, activation="relu", name="Dense1")(x)
    x = layers.Dropout(0.2)(x)

    # RNNs
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True, dropout=0.25))(x)
    x = layers.Bidirectional(layers.LSTM(64, return_sequences=True, dropout=0.25))(x)

    # Output layer
    # +1 is for the empty character
    x = layers.Dense(num_characters + 1, activation="softmax", name="Dense2")(x)

    # Add CTC layer for calculating CTC loss at each step
    output = CTCLayer(name="CTCloss")(labels, x)

    # Define the model
    model = keras.models.Model(
        inputs=[input_image, labels], outputs=output, name="ocr_model_v1")
    # Optimizer
    optimizer = optimizer
    # Compile the model
    model.compile(optimizer=optimizer)
    #Return the model to use
    return model

##Model instantiation

In [None]:
# Create the model
optimizer = keras.optimizers.Adam()
model = ocr_model_v1(img_n, channels, num_characters, optimizer)
model.summary()

Model: "ocr_model_v1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
image (InputLayer)              [(None, 200, 200, 1) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 200, 200, 32) 320         image[0][0]                      
__________________________________________________________________________________________________
MaxPool1 (MaxPooling2D)         (None, 100, 100, 32) 0           Conv1[0][0]                      
__________________________________________________________________________________________________
Conv2 (Conv2D)                  (None, 100, 100, 64) 18496       MaxPool1[0][0]                   
_______________________________________________________________________________________

#4 Training

##Define training properties

In [None]:
epochs = 50
early_stopping_patience = 5

#Early stopping configuration
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=early_stopping_patience, restore_best_weights=True
)

##Training

In [None]:
#Train the model
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[early_stopping],
)

##Create inference model

In [None]:
#Get the pure prediction model: extract layers till the output layer
prediction_model = keras.models.Model(
    model.get_layer(name="image").input, model.get_layer(name="Dense2").output)
prediction_model.summary()

Model: "model_5"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
image (InputLayer)           [(None, 200, 200, 1)]     0         
_________________________________________________________________
Conv1 (Conv2D)               (None, 200, 200, 32)      320       
_________________________________________________________________
MaxPool1 (MaxPooling2D)      (None, 100, 100, 32)      0         
_________________________________________________________________
Conv2 (Conv2D)               (None, 100, 100, 64)      18496     
_________________________________________________________________
MaxPool2 (MaxPooling2D)      (None, 50, 50, 64)        0         
_________________________________________________________________
reshape (Reshape)            (None, 50, 3200)          0         
_________________________________________________________________
Dense1 (Dense)               (None, 50, 64)            2048

##Inspect the model

In [None]:
# A utility function to decode the output of the network
def decode_batch_predictions(pred):
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Use greedy search. For complex tasks, you can use beam search
    results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0][:, :num_characters]
    raw_output = []
    # Iterate over the results and get back the text
    output_text = []
    for res in results:
      raw_output.append(res)
      res = tf.strings.reduce_join(model_utils.num_to_char(res)).numpy().decode("utf-8")
      output_text.append(res)
    return output_text


#  Check results on some validation samples
for batch in validation_dataset.take(1):
    batch_images = batch["image"]
    batch_labels = batch["label"]

    preds = prediction_model.predict(batch_images)
    pred_texts = decode_batch_predictions(preds)

    orig_texts = []
    for label in batch_labels:
        label = tf.strings.reduce_join(model_utils.num_to_char(label)).numpy().decode("utf-8")
        orig_texts.append(label)

    _, ax = plt.subplots(16, 1, figsize=(30, 15))
    for i in range(len(pred_texts)):
        img = (batch_images[i, :, :, 0] * 255).numpy().astype(np.uint8)
        title = f"Pred: {pred_texts[i]}"
        ax[i % 16].imshow(img, cmap="gray")
        ax[i % 16].set_title(title)
        ax[i % 16].axis("off")
plt.show()

#5 Conversion

In [None]:
converter = ModelConverter()
#Convert the model
tflite_model = converter.KerasToTFLite(model)
#Save the model
converter.saveModel(tflite_model, model_dir + "\\" + model.name)



INFO:tensorflow:Assets written to: /tmp/tmp8x2qpl3o/assets


INFO:tensorflow:Assets written to: /tmp/tmp8x2qpl3o/assets


In [None]:
#Load the TFLite model
tflite_model = converter.loadTFLite(model_dir + "\\quant_model.tflite")

#Get input and output tensors
input_details = tflite_model.get_input_details()
output_details = tflite_model.get_output_details()
print(input_details)
print(output_details)

[{'name': 'serving_default_image:0', 'index': 0, 'shape': array([  1, 200,  50,   1], dtype=int32), 'shape_signature': array([ -1, 200,  50,   1], dtype=int32), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}]
[{'name': 'StatefulPartitionedCall:0', 'index': 126, 'shape': array([ 1,  1, 20], dtype=int32), 'shape_signature': array([-1, -1, 20], dtype=int32), 'dtype': <class 'numpy.float32'>, 'quantization': (0.0, 0), 'quantization_parameters': {'scales': array([], dtype=float32), 'zero_points': array([], dtype=int32), 'quantized_dimension': 0}, 'sparsity_parameters': {}}]
