In [None]:
# Install/upgrade the `captcha` package; the setup cell imports ImageCaptcha from it
# to render the synthetic images used for training and benchmarking.
!pip3 install -U captcha

## Setup

In [None]:
from captcha.image import ImageCaptcha
import cv2
import numpy as np
import os
import pandas as pd
import random
import string
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import time

# captcha label setup

# Alphabet known to the model. Index 0 is the space character, which is also
# what `encode_label` pads short labels with, so 0 doubles as "end of text".
symbols = ' ' + ''.join(sorted(string.ascii_uppercase + string.ascii_lowercase + string.digits + '#%-:<>[]{}'))

captcha_lens = [1, 2, 3, 4, 5, 6]  # text lengths sampled by the data generator
captcha_max_len = 8                # every encoded label is padded to this width

# captcha image setup

img_height, img_width = 64, 128

# utility functions


def decode_label(s):
    """Turn a sequence of symbol indices back into text.

    Decoding stops at the first index that is 0 (the space/padding symbol) or
    negative (the -1 filler produced by CTC decoding), or at the end of the
    sequence when no such terminator is present.

    Args:
        s: iterable of integer-like symbol indices (ints, numpy ints, ...).

    Returns:
        The decoded string (possibly empty).

    Raises:
        IndexError: if an index is >= len(symbols).
    """
    chars = []
    for index in s:
        index = int(index)
        if index <= 0:
            # 0 = padding/space, negative = decoder filler: nothing meaningful follows.
            break
        chars.append(symbols[index])
    return ''.join(chars)


def encode_label(s):
    """Encode text as a list of symbol indices padded with spaces (index 0).

    The text is left-justified to `captcha_max_len` characters; text that is
    already longer is not truncated. Characters that are not in `symbols` are
    encoded as -1 (the result of `str.find`).

    Args:
        s: the captcha text.

    Returns:
        A list of ints, one per character of the padded text.
    """
    return [symbols.find(ch) for ch in s.ljust(captcha_max_len)]

## Sample Generation

In [None]:
class DataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that renders random captcha batches and caches them.

    Each batch is a dict with two entries:

    * ``"image"``: float32 array of shape (len_batch, img_width, img_height, 1)
      holding grayscale pixels scaled to [0, 1];
    * ``"label"``: int64 array of shape (len_batch, max_len_label) holding the
      output of ``encode_label`` for the rendered text.

    A batch is generated the first time its index is requested and reused
    afterwards until ``reset`` is called.

    Args:
        len_batch: number of samples per batch.
        n_batch: number of batches per epoch.
        img_height, img_width: size of the rendered captcha images.
        symbols: alphabet string; its first character is excluded from the
            generated text (only ``symbols[1:]`` is sampled).
        encode_label: callable mapping a text to a sequence of
            ``max_len_label`` integer indices.
        max_len_label: width of the label array.
        lens_label: candidate text lengths; one is drawn per sample.
    """

    def __init__(self, len_batch, n_batch, img_height, img_width, symbols, encode_label, max_len_label, lens_label):
        super().__init__()

        self.len_batch = len_batch
        self.n_batch = n_batch

        self.height, self.width = img_height, img_width

        self.symbols = symbols
        self.n_symbols = len(symbols)
        self.encode_label = encode_label

        self.max_len_label = max_len_label
        self.lens_label = lens_label

        self.generator = ImageCaptcha(img_width, img_height)

        self.cache = [None] * n_batch

    def __len__(self):
        """Number of batches per epoch."""
        return self.n_batch

    def __getitem__(self, idx):
        """Return batch `idx`, generating and caching it on first access."""
        cached = self.cache[idx]
        if cached is not None:
            return cached

        inputs = np.zeros((self.len_batch, self.width, self.height, 1), dtype=np.float32)
        labels = np.zeros((self.len_batch, self.max_len_label,), dtype=np.int64)

        # symbols[0] is skipped so it never appears inside a generated text.
        avail_symbols = self.symbols[1:]

        for i in range(self.len_batch):
            text = ''.join([random.choice(avail_symbols) for _ in range(random.choice(self.lens_label))])

            img = self.generator.generate_image(text)
            # The generator hands back a PIL image, whose channel order is RGB,
            # so RGB2GRAY is the conversion that matches the grayscale values
            # cv2.imread(..., IMREAD_GRAYSCALE) yields for saved captcha files.
            # NOTE(review): weights trained with the previous BGR2GRAY
            # conversion saw slightly different intensities; retrain or
            # re-benchmark after this change.
            img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2GRAY)

            # img = cv2.GaussianBlur(img, (3, 3), 0)
            # _, img = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

            # kernel = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3))
            # img = cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel)

            img = img / 255.0
            img = np.expand_dims(img, axis=-1)   # (height, width, 1)
            img = img.transpose(1, 0, 2)         # (width, height, 1): width becomes the leading (sequence) axis
            inputs[i] = img

            # Use the encoder handed to the constructor, not the module-level one.
            labels[i] = self.encode_label(text)

        self.cache[idx] = {
            "image": inputs,
            "label": labels,
        }

        return self.cache[idx]

    def reset(self):
        """Drop every cached batch so that fresh samples are generated."""
        self.cache = [None] * self.n_batch

    def on_epoch_end(self):
        """Shuffle the order of the cached batches between epochs."""
        random.shuffle(self.cache)

def print_example_dataset_row():
    """Generate one batch of four captchas and show each image with its label.

    Uses Colab's `cv2_imshow` patch for display, so it only works inside
    Google Colab.
    """
    from google.colab.patches import cv2_imshow

    generator_args = dict(
        len_batch=4,
        n_batch=1,
        img_width=img_width,
        img_height=img_height,
        symbols=symbols,
        encode_label=encode_label,
        max_len_label=captcha_max_len,
        lens_label=captcha_lens,
    )
    sample = DataGenerator(**generator_args)[0]

    for picture, encoded in zip(sample['image'], sample['label']):
        # Undo the generator's transpose and [0, 1] scaling before display.
        cv2_imshow(picture.transpose(1, 0, 2) * 255.0)
        print(decode_label([int(value) for value in encoded]))


print_example_dataset_row()


## Model

In [None]:
class CTCLayer(layers.Layer):
    """Layer that computes the CTC batch cost and registers it as a model loss.

    The layer's output is the per-sample loss tensor itself.
    """

    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Compute the CTC cost of `y_pred` against `y_true`.

        Every sample is given the full time dimension of `y_pred` as its input
        length and the full width of `y_true` as its label length, i.e. any
        padding inside `y_true` is treated as part of the label.
        """
        n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
        time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_width = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # One length entry per sample, shaped (batch, 1) as ctc_batch_cost expects.
        per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

        ctc_cost = self.loss_fn(y_true, y_pred, time_steps * per_sample, label_width * per_sample)
        self.add_loss(ctc_cost)

        return ctc_cost


def build_model(len_label_max, n_symbols):
    """Build the CNN + bidirectional-GRU captcha recogniser.

    Args:
        len_label_max: width of the (padded) label input.
        n_symbols: alphabet size; the softmax has ``n_symbols + 1`` outputs.

    Returns:
        ``(model, predict_model)``: `model` takes the "image" and "label"
        inputs, outputs the CTC loss and is compiled with Adam;
        `predict_model` maps "image" to the per-time-step softmax and shares
        all layers with `model`.
    """
    image_in = layers.Input(shape=(img_width, img_height, 1), name="image", dtype="float32")
    label_in = layers.Input(name="label", shape=(len_label_max,), dtype="int64")

    def conv_bn_relu(tensor, filters, kernel, name, init):
        """Conv2D (same padding) -> BatchNormalization -> ReLU."""
        tensor = layers.Conv2D(filters, kernel, padding='same', name=name, kernel_initializer=init)(tensor)
        tensor = layers.BatchNormalization()(tensor)
        return layers.Activation('relu')(tensor)

    # cnn module
    #
    # One row per convolution: (filters, kernel, name, initializer, pooling).
    # `pooling` is (pool_size, name) for the max-pool that follows, or None.
    # conv6 uses 'glorot_uniform', the Keras default it gets when no
    # initializer is specified.
    cnn_plan = [
        (64, (3, 3), 'conv1', 'he_normal', ((2, 2), 'max1')),
        (128, (3, 3), 'conv2', 'he_normal', ((2, 2), 'max2')),
        (256, (3, 3), 'conv3', 'he_normal', None),
        (256, (3, 3), 'conv4', 'he_normal', ((1, 2), 'max3')),
        (512, (3, 3), 'conv5', 'he_normal', None),
        (512, (3, 3), 'conv6', 'glorot_uniform', ((1, 2), 'max4')),
        (512, (2, 2), 'con7', 'he_normal', None),
    ]

    features = image_in
    for filters, kernel, conv_name, init, pooling in cnn_plan:
        features = conv_bn_relu(features, filters, kernel, conv_name, init)
        if pooling is not None:
            pool_size, pool_name = pooling
            features = layers.MaxPooling2D(pool_size=pool_size, name=pool_name)(features)

    cnn = keras.Model(image_in, features, name='cnn')
    sequence = cnn(image_in)

    # bridging: keep axis 1 as the time axis and flatten the remaining two axes

    conv_shape = sequence.get_shape()
    sequence = layers.Reshape((int(conv_shape[1]), int(conv_shape[3] * conv_shape[2])))(sequence)

    sequence = layers.Dense(32)(sequence)
    sequence = layers.BatchNormalization()(sequence)
    sequence = layers.Activation('relu')(sequence)
    sequence = layers.Dropout(0.25)(sequence)

    # rnn module

    rnn_size = 128

    for _ in range(2):
        sequence = layers.Bidirectional(
            layers.GRU(rnn_size, kernel_initializer='he_normal', return_sequences=True)
        )(sequence)

    sequence = layers.Dropout(0.25)(sequence)
    # One extra class on top of the alphabet for the CTC blank.
    probabilities = layers.Dense(n_symbols + 1, kernel_initializer='he_normal', activation='softmax')(sequence)

    # prediction model

    predict_model = keras.Model(image_in, probabilities)

    # ctc loss

    loss_out = CTCLayer(name='ctc_loss')(label_in, probabilities)

    # training model (the loss is added inside CTCLayer, so compile() needs no loss)

    model = keras.Model(inputs=[image_in, label_in], outputs=loss_out)
    model.compile(optimizer=keras.optimizers.Adam())

    return model, predict_model


model, predict_model = build_model(len_label_max=captcha_max_len, n_symbols=len(symbols))

In [None]:
# model.summary()
# predict_model.summary()

## Train

In [None]:
from pathlib import Path

# File the training cell's ModelCheckpoint callback writes to, and from which
# weights are loaded again later in the notebook.
checkpoint = Path('/content/drive/MyDrive/CS7NS1/ctc.h5')
# File the training model is saved to when training is interrupted from the keyboard.
rescue = Path('/content/drive/MyDrive/CS7NS1/ctc.rescue.h5')

In [None]:
# Resume from earlier training: load the stored weights into the training
# model when the checkpoint file exists; otherwise keep the fresh weights.
if checkpoint.is_file():
    model.load_weights(checkpoint)
    print('Loaded checkpoint from %s' % (checkpoint.name))

In [None]:
import multiprocessing

# Stop once the validation loss has not improved for 5 epochs (restoring the
# best weights) and keep the best model seen so far on disk.
model_callbacks = [
    keras.callbacks.EarlyStopping(monitor='val_loss',
                                  patience=5,
                                  restore_best_weights=True),
    keras.callbacks.ModelCheckpoint(checkpoint, save_best_only=True),
]

# Default to the CPU; switch to the first GPU when TensorFlow reports one.
device = tf.device('/cpu:0')

device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
    print('Using GPU at: %s' % (device_name))
    device = tf.device(device_name)

training_dataset = DataGenerator(len_batch=32, n_batch=256,
                                 img_width=img_width, img_height=img_height,
                                 symbols=symbols,
                                 encode_label=encode_label,
                                 max_len_label=captcha_max_len,
                                 lens_label=captcha_lens)

validation_dataset = DataGenerator(len_batch=32, n_batch=16,
                                   img_width=img_width, img_height=img_height,
                                   symbols=symbols,
                                   encode_label=encode_label,
                                   max_len_label=captcha_max_len,
                                   lens_label=captcha_lens)

with device:
    # Up to 5 training rounds; each round clears the generator caches so it
    # trains (with early stopping) on freshly generated captchas.
    for _ in range(5):
        training_dataset.reset()
        validation_dataset.reset()

        try:
            model.fit(training_dataset,
                      validation_data=validation_dataset,
                      epochs=100,
                      # model_callbacks is already a list; pass it as-is
                      # instead of wrapping it in a second list.
                      callbacks=model_callbacks,
                      workers=multiprocessing.cpu_count())
        except KeyboardInterrupt:
            # Keep whatever has been learned so far before leaving the loop.
            model.save(rescue)
            print('Stopped by keyboard interrupt')
            break


## Benchmark

In [None]:
def decode_prediction(y_pred):
    """Decode a batch of softmax outputs into captcha strings.

    Runs CTC beam-search decoding over the whole time axis, keeps at most
    `captcha_max_len` symbols per sample and converts them to text.

    Args:
        y_pred: array of shape (batch, time_steps, classes) as produced by the
            prediction model.

    Returns:
        A list with one decoded string per sample.
    """
    input_length = np.ones(y_pred.shape[0]) * y_pred.shape[1]
    decoded = keras.backend.ctc_decode(y_pred, input_length, greedy=False)[0][0][:, :captcha_max_len]

    texts = []
    for row in np.asarray(decoded):
        indices = []
        for value in row:
            value = int(value)
            # 0 is the space/padding symbol; ctc_decode fills unused positions
            # with -1. Either one marks the end of the recognised text.
            if value <= 0:
                break
            indices.append(value)
        # Append an explicit terminator so decoding also works for rows that
        # contain no padding symbol at all.
        texts.append(decode_label(indices + [0]))
    return texts

def benchmark():
    """Measure exact-match accuracy of `predict_model` on 1000 fresh captchas.

    Prints every mismatch as ``index: [len] truth  [len] prediction`` followed
    by the number of correctly predicted samples.
    """
    prediction_model = predict_model

    dataset = DataGenerator(len_batch=1000, n_batch=1,
                            img_width=img_width, img_height=img_height,
                            symbols=symbols,
                            encode_label=encode_label,
                            max_len_label=captcha_max_len,
                            lens_label=captcha_lens)

    batch = dataset[0]

    # Shape follows the configured image size rather than literal 128/64.
    x = batch['image'].reshape((-1, img_width, img_height, 1))

    # The trailing 0 guarantees a terminator even for labels without padding.
    y_true = [decode_label(list(y) + [0]) for y in batch['label']]

    y_pred = decode_prediction(prediction_model.predict(x))

    correct = 0
    for i, (true, pred) in enumerate(zip(y_true, y_pred)):
        if true == pred:
            correct = correct + 1
        else:
            print('%d:\t[%d] %s\t[%d] %s' % (i, len(true), true.ljust(captcha_max_len), len(pred), pred.ljust(captcha_max_len)))
    print('%d of %d are correctly predicted' % (correct, len(x)))


benchmark()

## Inference

In [None]:
# Remove the image folder and CSV files left by an earlier run, unpack
# images.tar, then list the working directory.
# NOTE(review): images.csv is deleted here and read by the prediction cells —
# presumably it is restored from images.tar; confirm.
!rm -r images images.csv output.csv
!tar xf images.tar
!ls -ahl

In [None]:
def batch_predict(model):
    """Predict the text of every image listed in ./images.csv with a Keras model.

    Image files are read from ./images, one prediction is made per file, and
    the (filename, result) pairs are written to ./output.csv sorted by
    filename, without header. Progress is printed every 100 images.

    Args:
        model: Keras model mapping a (1, img_width, img_height, 1) image to
            per-time-step softmax scores.

    Raises:
        FileNotFoundError: if a listed image cannot be read.
    """
    parent_path = os.path.abspath('./images')

    df = pd.read_csv('./images.csv', header=None, index_col=False, names=['filename'])[['filename']]
    df['result'] = ''

    start_time = time.time()
    for idx, filename in df['filename'].items():
        filename = os.path.join(parent_path, filename)

        x = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)
        if x is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError('could not read image: %s' % filename)

        x = x.astype(np.float32) / 255.0   # (height, width), scaled to [0, 1]
        x = np.expand_dims(x, axis=-1)     # (height, width, 1)
        x = x.transpose(1, 0, 2)           # (width, height, 1)
        x = np.expand_dims(x, axis=0)      # (1, width, height, 1)

        y_pred = decode_prediction(model.predict(x))[0]

        # Write into the frame itself; rows handed out by iterrows() are
        # copies, so assigning to them may never reach `df`.
        df.at[idx, 'result'] = y_pred

        if idx % 100 == 0:
            print('predicted %d of %d images in %f seconds' % (idx, df.shape[0], time.time() - start_time))

    print('predicted %d images in %f seconds' % (df.shape[0], time.time() - start_time))

    # sort_values returns a new frame; keep it so the CSV is actually sorted.
    df = df.sort_values(by=['filename'], ascending=True)
    df.to_csv('./output.csv', columns=['filename', 'result'], header=False, index=False)


batch_predict(predict_model)

## TFLite

In [None]:
# Destination of the converted model. The conversion cell writes the bytes
# returned by the TFLite converter to this path, and the TFLite interpreter
# loads it from here (the file holds a TFLite model despite the .h5 suffix).
lite_path = Path('/content/drive/MyDrive/CS7NS1/ctc-lite.h5')

In [None]:
def convert_to_tflite(model, quantization='dr'):
    """Convert a Keras model to a serialized TFLite model.

    Args:
        model: the Keras model to convert.
        quantization: accepted for interface compatibility; the conversion
            does not consult it, so no quantization option is applied.

    Returns:
        The converter's output (the serialized model).
    """
    return tf.lite.TFLiteConverter.from_keras_model(model).convert()


# Bring the prediction model up to date with the stored checkpoint before
# converting it.
predict_model.compile(optimizer=keras.optimizers.Adam())
predict_model.load_weights(checkpoint)

lite_model = convert_to_tflite(predict_model)

with open(lite_path, 'wb') as lite_file:
    lite_file.write(lite_model)

In [None]:
# Install/upgrade pyctcdecode.
# NOTE(review): no visible cell imports pyctcdecode (the TFLite cells use the
# hand-written decode_ctc_lite) — confirm whether this install is still needed.
!pip3 install -U pyctcdecode

In [None]:
def decode_ctc_lite(logits, symbols):
    """Greedy CTC decoding of a single (time_steps, classes) score matrix.

    Takes the best class at every time step, drops classes whose index is not
    below ``len(symbols)`` (the blank) and collapses runs of the same class
    into one entry.

    Args:
        logits: 2-D array of per-time-step class scores.
        symbols: the alphabet; only its length is used.

    Returns:
        List of the remaining class indices, in time order.
    """
    best = logits.argmax(axis=1)
    n_real = len(symbols)
    return [
        cls
        for step, cls in enumerate(best)
        # keep a class only when it is a real symbol and differs from the
        # class chosen at the previous time step
        if cls < n_real and (step == 0 or cls != best[step - 1])
    ]

def benchmark_lite():
    """Measure exact-match accuracy of the TFLite model on 1000 fresh captchas.

    Loads the model stored at `lite_path`, runs it one image at a time and
    prints every mismatch as ``index: [len] truth  [len] prediction`` followed
    by the number of correctly predicted samples.
    """
    interpreter = tf.lite.Interpreter(model_path=str(lite_path))
    interpreter.allocate_tensors()

    dataset = DataGenerator(len_batch=1000, n_batch=1,
                            img_width=img_width, img_height=img_height,
                            symbols=symbols,
                            encode_label=encode_label,
                            max_len_label=captcha_max_len,
                            lens_label=captcha_lens)

    batch = dataset[0]
    batch_size = len(batch['image'])

    input_index = interpreter.get_input_details()[0]['index']
    output_index = interpreter.get_output_details()[0]['index']

    correct = 0
    for i in range(batch_size):
        # Single-sample batch shaped from the configured image size rather
        # than literal 128/64.
        x = batch['image'][i].reshape((-1, img_width, img_height, 1))

        # The trailing 0 guarantees a terminator for decode_label even when
        # the sequence itself contains no padding symbol.
        y_true = decode_label(list(batch['label'][i]) + [0])

        interpreter.set_tensor(input_index, x)
        interpreter.invoke()

        y_pred = interpreter.get_tensor(output_index)
        y_pred = decode_label(decode_ctc_lite(y_pred[0], symbols) + [0])

        if y_true == y_pred:
            correct = correct + 1
        else:
            print('%d:\t[%d] %s\t[%d] %s' % (i,
                                             len(y_true),
                                             y_true.ljust(captcha_max_len),
                                             len(y_pred),
                                             y_pred.ljust(captcha_max_len)))

    print('%d of %d are correctly predicted' % (correct, batch_size))


benchmark_lite()

In [None]:
def batch_predict_lite():
    """Predict the text of every image listed in ./images.csv with the TFLite model.

    Image files are read from ./images, the model at `lite_path` is invoked
    once per file, and the (filename, result) pairs are written to
    ./output_lite.csv sorted by filename, without header. Progress is printed
    every 100 images.

    Raises:
        FileNotFoundError: if a listed image cannot be read.
    """
    # lite model setup

    interpreter = tf.lite.Interpreter(model_path=str(lite_path))
    interpreter.allocate_tensors()

    input_index = interpreter.get_input_details()[0]['index']
    output_index = interpreter.get_output_details()[0]['index']

    # read inference list

    parent_path = os.path.abspath('./images')

    df = pd.read_csv('./images.csv', header=None, index_col=False, names=['filename'])[['filename']]
    df['result'] = ''

    start_time = time.time()
    for idx, filename in df['filename'].items():
        filename = os.path.join(parent_path, filename)

        x = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)  # (height, width)
        if x is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError('could not read image: %s' % filename)

        x = np.array(x, dtype=np.float32) / 255.0       # float32 in [0, 1]
        x = np.expand_dims(x, axis=-1)                  # (height, width, 1)
        x = x.transpose(1, 0, 2)                        # (width, height, 1)
        x = np.expand_dims(x, axis=0)                   # (1, width, height, 1)

        interpreter.set_tensor(input_index, x)
        interpreter.invoke()
        y_pred = interpreter.get_tensor(output_index)

        # The trailing 0 guarantees a terminator for decode_label even when
        # the prediction contains no padding symbol.
        y_pred = decode_label(decode_ctc_lite(y_pred[0], symbols) + [0])

        # Write into the frame itself; rows handed out by iterrows() are
        # copies, so assigning to them may never reach `df`.
        df.at[idx, 'result'] = y_pred

        if idx % 100 == 0:
            print('predicted %d of %d images in %f seconds' % (idx, df.shape[0], time.time() - start_time))

    print('predicted %d images in %f seconds' % (df.shape[0], time.time() - start_time))

    # sort_values returns a new frame; keep it so the CSV is actually sorted.
    df = df.sort_values(by=['filename'], ascending=True)
    df.to_csv('./output_lite.csv', columns=['filename', 'result'], header=False, index=False)


batch_predict_lite()