In [None]:
import keras
from tensorflow.keras.layers import Lambda, Dense, Bidirectional, GRU, Flatten, TimeDistributed, Permute, Activation, Input
from tensorflow.keras.layers import LSTM, Reshape, Conv2D, MaxPooling2D, BatchNormalization, ZeroPadding2D
from tensorflow.keras import backend as K
import numpy as np
import os
import tensorflow as tf

# Utils

In [None]:
# Load the character dictionary (one entry per line). `np.str` was only an
# alias of the builtin `str` and has been removed from recent NumPy releases,
# so the builtin is used directly.
label_dict = np.loadtxt('/content/dictionary.txt', dtype=str)
print(label_dict)
print(len(label_dict))
# One class more than the dictionary has entries.
# NOTE(review): presumably the extra index is the CTC blank that Keras' CTC
# ops place last — confirm.
num_classes = len(label_dict) + 1

['blank' '，' '的' ... '柒' '¥' '：']
5996


# Model

In [None]:
def feature_extractor(input):
    """Build the convolutional front-end of the CRNN.

    Seven Conv -> BatchNorm -> ReLU stages interleaved with max-pooling. The
    first two pools halve both height and width; the remaining three halve the
    height only, so the width is reduced by a factor of 4 overall.

    Args:
        input: Keras image tensor. Shape comments below assume a channels-last
            (32, W, 1) image, e.g. W = 280 as in ``build_model``'s default.

    Returns:
        Output tensor of the final pooling layer (``conv_output``); for a
        32-pixel-high input this is 1 * W/4 * 512.
    """
    # Use the tf.keras namespace throughout so initializers and layers come
    # from the same Keras implementation.
    initializer = tf.keras.initializers.he_normal()

    def conv_bn_relu(x, index, filters, kernel_size=(3, 3), **conv_kwargs):
        """Apply conv2d_<index> -> BN_<index> -> relu_<index> to ``x``."""
        x = Conv2D(filters, kernel_size, strides=(1, 1), padding="same",
                   kernel_initializer=initializer, use_bias=True,
                   name='conv2d_{}'.format(index), **conv_kwargs)(x)
        x = BatchNormalization(name="BN_{}".format(index))(x)
        return Activation("relu", name="relu_{}".format(index))(x)

    x = conv_bn_relu(input, 1, 64)                                   # 32 * W * 64
    x = MaxPooling2D(pool_size=(2, 2), strides=2, padding='valid',
                     name='maxpl_1')(x)                              # 16 * W/2 * 64

    x = conv_bn_relu(x, 2, 128)                                      # 16 * W/2 * 128
    x = MaxPooling2D(pool_size=(2, 2), strides=2, padding='valid',
                     name='maxpl_2')(x)                              # 8 * W/4 * 128

    x = conv_bn_relu(x, 3, 256)                                      # 8 * W/4 * 256
    x = conv_bn_relu(x, 4, 256)                                      # 8 * W/4 * 256
    # From here on only the height is pooled; the width (time axis) is kept.
    x = MaxPooling2D(pool_size=(2, 1), strides=(2, 1), name='maxpl_3')(x)  # 4 * W/4 * 256

    x = conv_bn_relu(x, 5, 512)                                      # 4 * W/4 * 512
    x = conv_bn_relu(x, 6, 512)                                      # 4 * W/4 * 512
    x = MaxPooling2D(pool_size=(2, 1), strides=(2, 1), name='maxpl_4')(x)  # 2 * W/4 * 512

    # conv2d_7 applies ReLU inside the convolution *and* again after BN_7.
    # NOTE(review): redundant-looking, but kept as-is so the layer stack stays
    # identical to the one the pretrained weights are loaded into.
    x = conv_bn_relu(x, 7, 512, kernel_size=(2, 2), activation='relu')  # 2 * W/4 * 512
    conv_output = MaxPooling2D(pool_size=(2, 1), name="conv_output")(x)  # 1 * W/4 * 512

    return conv_output

In [None]:
def rnn(input):
    """Build the recurrent part of the CRNN on top of the CNN feature map.

    Args:
        input: Keras tensor from ``feature_extractor``.
            Assumes shape (batch, 1, T, 512) — TODO confirm for other heights.

    Returns:
        Output tensor of the second bidirectional LSTM, one feature vector per
        time step (``return_sequences=True``).
    """
    # Same Keras namespace as the layers imported from tensorflow.keras.
    initializer = tf.keras.initializers.he_normal()

    # Move the width axis first so it becomes the time axis: (1, T, 512) -> (T, 512, 1)
    x = Permute((2, 3, 1), name='permute')(input)
    # Flatten each time step into a single feature vector: (T, 512)
    rnn_input = TimeDistributed(Flatten(), name='for_flatten_by_time')(x)

    # RNN part
    # merge_mode='sum' adds the forward and backward outputs, so the feature
    # size stays at the LSTM width: (T, 256)
    y = Bidirectional(LSTM(256, kernel_initializer=initializer, return_sequences=True),
                      merge_mode='sum', name='LSTM_1')(rnn_input)
    y = BatchNormalization(name='BN_8')(y)

    # No merge_mode given here, so Bidirectional uses its default merge.
    rnn_output = Bidirectional(LSTM(256, kernel_initializer=initializer,
                                return_sequences=True), name='LSTM_2')(y)

    return rnn_output

In [None]:
def build_model(input_shape=(32, 280, 1), num_classes=6004, max_label_len=23, is_training=True):
    """Assemble the CRNN: CNN feature extractor -> BiLSTM stack -> softmax.

    Args:
        input_shape: Shape of one input image, passed to ``Input``.
        num_classes: Size of the per-time-step softmax output.
        max_label_len: Accepted for interface compatibility; not used here.
        is_training: Accepted for interface compatibility; not used here.

    Returns:
        A Keras ``Model`` mapping images to per-time-step class probabilities
        (the ``y_pred`` layer).
    """
    cnn_inputs = Input(shape=input_shape, name='image_input')
    cnn_output = feature_extractor(cnn_inputs)
    rnn_output = rnn(cnn_output)
    y_pred = Dense(num_classes, activation='softmax', name='y_pred')(rnn_output)
    # Build the Model from tf.keras, the same namespace the layers come from.
    base_model = tf.keras.models.Model(inputs=cnn_inputs, outputs=y_pred)
    return base_model

In [None]:
# Model to be fine-tuned; its output layer is sized from the loaded dictionary
# (num_classes = len(label_dict) + 1).
model = build_model(num_classes=num_classes)
# model.summary()

In [None]:
! wget https://github.com/Liumihan/CRNN_kreas/raw/master/trained_weights/300wbest_vgg_blstm_ctc_best_weight.h5

--2021-12-24 03:30:24--  https://github.com/Liumihan/CRNN_kreas/raw/master/trained_weights/300wbest_vgg_blstm_ctc_best_weight.h5
Resolving github.com (github.com)... 192.30.255.112
Connecting to github.com (github.com)|192.30.255.112|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/Liumihan/CRNN_kreas/master/trained_weights/300wbest_vgg_blstm_ctc_best_weight.h5 [following]
--2021-12-24 03:30:24--  https://raw.githubusercontent.com/Liumihan/CRNN_kreas/master/trained_weights/300wbest_vgg_blstm_ctc_best_weight.h5
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45118132 (43M) [application/octet-stream]
Saving to: ‘300wbest_vgg_blstm_ctc_best_weight.h5’


2021-12-24 03:30:26 (281 MB/s) - ‘300wbest_vg

In [None]:
# Rebuild the same architecture with a 5991-class output layer and load the
# downloaded checkpoint into it.
org_model = build_model(num_classes=5991)
org_model.load_weights('/content/300wbest_vgg_blstm_ctc_best_weight.h5')

In [None]:
# Copy the pretrained weights into every layer except the last one ('y_pred'):
# its output size differs between the two models (5991 vs. num_classes), so it
# keeps its fresh initialization. Slicing with [:-1] replaces the hard-coded
# layer count of 32.
for target_layer, source_layer in zip(model.layers[:-1], org_model.layers[:-1]):
    target_layer.set_weights(source_layer.get_weights())

# Data

In [None]:
!unzip contract_data.zip

In [None]:
import cv2
def load_images(image_path, image_size):
    """Load one image as a resized, single-channel float32 array.

    The file is read as raw bytes with NumPy and decoded in memory, then
    converted from BGR to grayscale and resized.

    Args:
        image_path: Path of the image file.
        image_size: ``(height, width)`` of the returned image.

    Returns:
        ``np.float32`` array of shape ``(height, width)``; pixel values are
        not rescaled here.

    Raises:
        ValueError: If the file content cannot be decoded as an image.
    """
    raw_bytes = np.fromfile(image_path, dtype=np.uint8)
    image = cv2.imdecode(raw_bytes, cv2.IMREAD_COLOR)
    # imdecode signals failure by returning None; fail here with a clear
    # message instead of an opaque OpenCV assertion inside cvtColor.
    if image is None:
        raise ValueError('Could not decode image file: {}'.format(image_path))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # cv2.resize expects (width, height), the reverse of image_size.
    image = cv2.resize(image, (image_size[1], image_size[0]))
    image = image.astype(np.float32)
    return image


In [None]:
from tensorflow.keras.utils import Sequence
class BatchGenerator(Sequence):
    """Keras ``Sequence`` yielding (images, labels) batches for the OCR model.

    Each entry of ``contents`` is one text line of the form
    ``"<image file> <id> <id> ..."``: an image file name followed by
    space-separated integer character ids.

    Args:
        contents: List of annotation lines.
        batch_size: Number of samples per batch.
        img_size: ``(height, width)`` every image is resized to.
        down_sample_size: Factor by which the network shrinks the image width;
            ``width // down_sample_size`` is stored as the prediction length.
        test: If False, two extra columns (label length, prediction length)
            are appended to every label row, which is the layout
            ``sparse_labels`` unpacks for the CTC loss. If True, only the
            padded labels are returned.
        start: Number of leading entries of ``contents`` to skip.
        image_dir: Directory the image file names are resolved against.
    """

    def __init__(self, contents, batch_size,
                 img_size, down_sample_size, test=True, start=0,
                 image_dir='/content/contract_data/'):
        super(BatchGenerator, self).__init__()
        self.contents = contents[start:]
        self.batch_size = batch_size
        self.img_h, self.img_w = img_size
        self.test = test
        self.image_dir = image_dir
        self.single_pred_len = int(self.img_w // down_sample_size)

        # total number of unique characters (kept as an attribute; not read
        # anywhere in this class)
        self.num_chars = 6004
        # Count batches/images on the sliced list so that `start > 0` does not
        # advertise batches that do not exist. A trailing partial batch is
        # dropped by the floor division.
        self.epoch_size = int(len(self.contents) // batch_size)
        self.image_amount = len(self.contents)

    def __len__(self):
        """Denotes the number of batches per epoch
        :return: number of batches per epoch """
        return self.epoch_size

    def __getitem__(self, index):
        """Generate one batch of data.

        Returns:
            Tuple ``(batch_images, batch_labels)``: images scaled to [-1, 1]
            with shape (batch, img_h, img_w, 1), and a float64 label matrix in
            which shorter labels are padded with the module-level
            ``num_classes`` value.

        Raises:
            IndexError: If ``index`` selects no samples.
        """
        batch_lines = self.contents[index * self.batch_size:(index + 1) * self.batch_size]
        if not batch_lines:
            raise IndexError('batch index {} is out of range'.format(index))

        # Split every line once: first field is the file name, the rest are ids.
        samples = [line.replace('\n', '').split(' ') for line in batch_lines]
        max_word_len_batch = max(len(fields) - 1 for fields in samples)

        batch_images = []
        batch_labels = []
        for fields in samples:
            image_path = os.path.join(self.image_dir, fields[0])
            batch_images.append(load_images(image_path, image_size=(self.img_h, self.img_w)))

            label_str = fields[1:]
            label = [int(l_str) for l_str in label_str]
            # Pad to the longest label in the batch. The pad value is the
            # global `num_classes`, the same value the `accuracy` metric
            # substitutes for undecoded (-1) positions.
            label.extend([num_classes] * (max_word_len_batch - len(label_str)))

            if not self.test:
                # Trailing columns consumed by sparse_labels():
                # [..., label_length, prediction_length]
                label.append(len(label_str))
                label.append(self.single_pred_len)
            batch_labels.append(label)

        # Map pixel values from [0, 255] to [-1, 1] and add the channel axis.
        batch_images = np.array(batch_images, dtype=np.float64) / 255. * 2 - 1
        batch_images = np.expand_dims(batch_images, axis=-1)
        batch_labels = np.array(batch_labels, dtype=np.float64)

        return batch_images, batch_labels

In [None]:
with open('/content/correct_labels.txt', 'r', encoding='utf-8') as file:
    dataset = file.readlines()

# First 90 % of the lines for training, the remaining 10 % for validation.
split_index = int(len(dataset) * 0.9)
train_dataset = dataset[:split_index]
validation_dataset = dataset[split_index:]

print(len(train_dataset))
print(len(validation_dataset))

# BatchGenerator has no `validation` argument; the switch that appends the
# label/prediction length columns needed by the CTC loss is `test=False`.
train_data = BatchGenerator(contents=train_dataset, batch_size=32,
                img_size=(32, 280), down_sample_size=4,
                test=False)

# Validate on the held-out split, not on the training lines.
validation_data = BatchGenerator(contents=validation_dataset, batch_size=32,
                img_size=(32, 280), down_sample_size=4,
                test=False)

1846
206


# Loss

In [None]:
def sparse_labels(y_true):
    """Unpack a packed target tensor into labels and the two length columns.

    ``y_true`` is indexed as a 2-D tensor whose last two columns hold the
    label length and the logit (prediction) length; all preceding columns are
    the label ids.

    Returns:
        Tuple ``(labels, label_length, logit_length)``, all cast to int32.
        The two length tensors get a trailing axis of size 1.
    """
    def _as_int_column(column):
        # Cast one column to int32 and restore the trailing axis.
        return tf.expand_dims(tf.cast(column, dtype=tf.int32), axis=-1)

    labels = tf.cast(y_true[:, :-2], dtype=tf.int32)
    return labels, _as_int_column(y_true[:, -2]), _as_int_column(y_true[:, -1])

# for i in range(1):
#     x, y = train_data.__getitem__(i)
#     y = tf.convert_to_tensor(y)
#     labels, label_len, logit_len = sparse_labels(y)
#     print(labels[:2])
#     print(label_len[:2])
#     print(logit_len[:2])

In [None]:
class MyCTCLoss(tf.keras.losses.Loss):
    """CTC loss for targets packed as ``[labels..., label_length, logit_length]``."""

    def call(self, y_true, y_pred):
        """Return the batch-mean CTC cost of ``y_pred`` against ``y_true``.

        ``y_true`` is unpacked with ``sparse_labels``; ``y_pred`` is passed to
        ``ctc_batch_cost`` unchanged.
        """
        labels, label_length, logit_length = sparse_labels(y_true)

        # Positional order of ctc_batch_cost:
        # (y_true, y_pred, input_length, label_length).
        per_sample_cost = tf.keras.backend.ctc_batch_cost(
            labels, y_pred, logit_length, label_length)

        return tf.reduce_mean(per_sample_cost)


# Metrics

In [None]:
def accuracy(y_true, y_pred):
    """Element-wise accuracy of the greedy CTC decoding against the labels.

    The decoded sequences are cut to the label width, positions the decoder
    left empty (-1) are replaced by the global ``num_classes``, and the result
    is compared position by position with the labels.

    Returns:
        Result tensor of a fresh ``tf.keras.metrics.Accuracy`` for this batch.
    """
    labels, _, logit_length = sparse_labels(y_true)

    # ctc_decode needs one length per sample. squeeze() collapses a
    # single-sample batch to a scalar, which is wrapped back into a list.
    seq_lengths = tf.keras.backend.get_value(tf.squeeze(logit_length))
    if np.ndim(seq_lengths) == 0:
        seq_lengths = [seq_lengths]

    decoded, _ = keras.backend.ctc_decode(y_pred, seq_lengths, greedy=True)
    pred_label = np.array(tf.keras.backend.get_value(decoded[0]))
    pred_label = pred_label[:, :labels.shape[1]]
    # Replace the decoder's -1 filler with num_classes before comparing.
    pred_label[pred_label == -1] = num_classes

    metric = tf.keras.metrics.Accuracy()
    metric.update_state(pred_label, labels)
    return metric.result()



# Compile and Train

In [None]:
# Adam with its default settings and the custom CTC loss defined above.
optimizer = tf.keras.optimizers.Adam()
loss = MyCTCLoss()
# NOTE(review): run_eagerly=True is presumably required because the `accuracy`
# metric converts tensors to NumPy arrays (get_value / np indexing) — confirm.
model.compile(optimizer=optimizer, loss=loss, run_eagerly=True, metrics=[accuracy])

In [None]:
# Train for 10 epochs; steps_per_epoch is the generator's own batch count
# (train_data.epoch_size).
model.fit(train_data,
    steps_per_epoch=train_data.epoch_size,
    epochs=10, 
    validation_data=validation_data)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f2dc6db1a10>

In [None]:
# Persist only the weights of the fine-tuned model (no architecture or optimizer state).
model.save_weights('/content/test_weights.h5')

# Predict

In [None]:
# Fresh model with the same output size, restored from the weights saved above;
# the prediction helpers below read this global.
model_pred = build_model(num_classes=num_classes)
model_pred.load_weights('/content/test_weights.h5')

In [None]:
from matplotlib import pyplot as plt
test_image_path = ['/content/contract_data/gz_image000_000_010.png', '/content/contract_data/gz_image000_000_011.png',
                   '/content/contract_data/gz_image000_000_012.png', '/content/contract_data/gz_image000_000_013.png']

# Reuse load_images() so inference goes through the same decode / grayscale /
# resize steps as the training batches instead of a second hand-written copy
# (a missing file then fails inside np.fromfile with a clear error rather than
# as an OpenCV assertion on a None image).
image_list = []
for path in test_image_path:
    test_image = load_images(path, image_size=(32, 280))
    # Same [-1, 1] scaling the batch generator applies.
    test_image = test_image / 255. * 2.0 - 1.0
    test_image = np.expand_dims(test_image, axis=-1)
    image_list.append(test_image)


In [None]:
def predict_list_images(image_list):
    """Run the global ``model_pred`` on preprocessed images and print the text.

    Args:
        image_list: Sequence of equally sized image arrays; the second axis of
            each image is used as its width.

    Prints one decoded string per image; returns nothing.
    """
    batch = np.array(image_list)
    prob_matrix = model_pred.predict(batch)

    # Every sample gets the same sequence length: image width // 4.
    time_steps = int(batch[0].shape[1] // 4)
    y_pred_len = np.full((len(batch), ), time_steps)

    decoded, _ = keras.backend.ctc_decode(prob_matrix, y_pred_len, greedy=True)
    for sequence in keras.backend.get_value(decoded[0]):
        # -1 marks positions the decoder left empty; skip them.
        print(''.join(label_dict[label] for label in sequence if label != -1))


In [None]:
# Decode the four sample crops in one batch.
predict_list_images(image_list)

人民币¥11,442,186.
14（大写
人民币壹仟壹佰肆
拾肆万贰仟壹佰捌


In [None]:
def predict_single_image(path):
    """Read one image file, run the global ``model_pred`` on it and print the text.

    Args:
        path: Path of the image file to recognise.

    Raises:
        ValueError: If OpenCV cannot load an image from ``path``.
    """
    test_image = cv2.imread(path)
    # cv2.imread returns None instead of raising when the file is missing or
    # cannot be decoded; report that directly.
    if test_image is None:
        raise ValueError('Could not read image file: {}'.format(path))
    test_image = cv2.cvtColor(test_image, cv2.COLOR_BGR2GRAY)
    test_image = cv2.resize(test_image, (280, 32))
    # Scale pixels to [-1, 1], then add the channel and batch axes.
    test_image = test_image / 255. * 2.0 - 1.0
    test_image = np.expand_dims(test_image, axis=-1)
    test_image = np.expand_dims(test_image, axis=0)

    prob_matrix = model_pred.predict(test_image)
    # Sequence length handed to the decoder: image width // 4.
    y_pred_len = np.full((len(test_image), ), int(test_image[0].shape[1] // 4))

    y_pred_list, _ = keras.backend.ctc_decode(prob_matrix, y_pred_len, greedy=True)
    pred_label_tensor = y_pred_list[0]
    predictions = keras.backend.get_value(pred_label_tensor)

    # -1 marks positions the decoder left empty; skip them.
    char = [label_dict[label] for label in predictions[0] if label != -1]

    print(''.join(char))
# Decode the first sample crop on its own.
predict_single_image(test_image_path[0])

人民币¥11,442,186.
