In [1]:
# import library
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import numpy as np
import pandas as pd 
import cv2 
import io
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Dense, Reshape, BatchNormalization, Input, Conv2D, MaxPool2D, Lambda, Bidirectional, Dropout, LSTM
# from tensorflow.compat.v1.keras.layers import CuDNNLSTM
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as K
import matplotlib.pyplot as plt
from itertools import groupby

In [2]:
# parameter
alphabets = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ "
max_str_len = 19  # max length of input labels
num_of_characters = len(alphabets) + 1  # +1 for ctc pseudo blank
num_of_timestamps = 64  # max length of predicted labels
default_path = "iam-handwriting-word-database/iam_words/"
# batch size
batch_size = 512

def label_to_num(txt):
    # encoding each output word into digits
    dig_lst = []
    
    for index, char in enumerate(txt):
        try:
            dig_lst.append(alphabets.index(char))
        except:
            print(char)
    
    return pad_sequences([dig_lst], maxlen=max_str_len, padding='post', value=len(alphabets))[0]

def ctc_decoder(predictions):
    '''
    input: given batch of predictions from text rec model
    output: return lists of raw extracted text

    '''
    text_list = []
    
    pred_indcies = np.argmax(predictions, axis=2)
    
    for i in range(pred_indcies.shape[0]):
        ans = ""
        
        ## merge repeats
        merged_list = [k for k,_ in groupby(pred_indcies[i])]
        
        ## remove blanks
        for p in merged_list:
            if p != len(alphabets):
                ans += alphabets[int(p)]
        
        text_list.append(ans)
        
    return text_list

def num_to_label(num):
    ret = ""
    for ch in num:
        if ch == -1:  # CTC Blank
            break
        else:
            ret += alphabets[ch]
    return ret
print(batch_size)

def process_single_sample(img_path, label):

    # 1. Read image
    img = tf.io.read_file(img_path)

    # 2. Decode and convert to grayscale
    img = tf.io.decode_png(img, channels=1)

    # 3. Convert to float32 in [0, 1] range
    img = tf.image.convert_image_dtype(img, tf.float32)

    # 4. Resize to the desired size
    img = tf.image.resize(img, [32, 128])
    
#     img = tf.transpose(img, perm=[1, 0, 2])
    return {"image": img, "label": label}

512


In [3]:
# load dataset
data = pd.read_excel('excel-for-iam-dataset/data.xlsx')
data = pd.DataFrame(data, columns = ['Fpath','Identify']).astype(str)

# remove some images in dataset with the NaNs label
data.dropna(axis=0, inplace=True)
print(data.shape)   

# separate data into train or valid
# Chọn ngẫu nhiên 90% các hàng cho phần train
train = data.sample(frac=0.9, random_state=42)
unique_train = train['Fpath'].unique()
# Chọn các hàng còn lại cho phần validation
valid = data.drop(train.index)
# view
print(train.shape)
print(valid.shape)
print(data.shape)

(115318, 2)
(103786, 2)
(11532, 2)
(115318, 2)


In [4]:
train = train[0:80000]
valid = valid[0:8000]
# reset index
train.reset_index(inplace=True, drop=True)
valid.reset_index(inplace=True, drop=True)

vocab = set("".join(map(str, valid['Identify'])))
print(sorted(vocab))
vocab = set("".join(map(str, train['Identify'])))
print(sorted(vocab))

[' ', '!', '#', '&', '(', ')', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
[' ', '!', '#', '&', '(', ')', '*', '+', ',', '-', '.', '/', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


In [5]:
# size of dataset
train_size = 80000
valid_size = 8000
print(valid_size)

8000


In [6]:
# create list for image data
train_x = []
valid_x = []

# i is params for getting image or label, count is params for loop 
# import image into train_x and valid_x
for i in range(valid_size):
    path= valid.loc[i, 'Fpath']
    img_dir = default_path + path    
    valid_x.append(img_dir)

for i in range(train_size):
    path= train.loc[i, 'Fpath']
    img_dir = default_path + path
    train_x.append(img_dir)

# label for valid data
valid_y = []
for i in range(valid_size):
    string = valid.loc[i, 'Identify']
    valid_y.append(label_to_num(string))

# label for train data
train_y = []
for i in range(train_size):
    string = train.loc[i, 'Identify']
    train_y.append(label_to_num(string))
    
print(len(valid_y))
print(len(train_y))

.
.
,
,
.
,
#
.
,
.
&
;
,
&
;
&
;
,
.
&
;
&
;
.
.
.
&
;
.
,
.
,
-
-
&
;
.
.
.
.
-
.
-
&
;
.
.
&
;
.
,
&
;
-
-
.
.
&
;
.
.
.
&
;
#
.
.
&
;
.
&
;
&
;
-
.
,
.
,
:
&
;
-
:
&
;
,
:
,
.
.
,
.
.
.
.
.
&
;
.
&
;
.
,
-
.
-
.
-
.
.
,
.
.
&
;
&
;
-
.
,
&
;
.
&
;
,
&
;
:
,
,
.
.
,
.
,
&
;
.
.
.
.
,
.
.
,
.
,
-
,
-
-
,
-
,
,
,
.
,
,
.
-
,
,
-
,
.
,
,
,
-
.
.
,
,
,
.
,
.
.
&
;
.
&
;
.
.
,
-
.
.
,
,
.
.
.
-
,
&
;
.
.
,
&
;
.
,
,
,
,
,
.
,
.
-
-
.
.
-
.
,
,
&
;
&
;
&
;
&
;
,
,
.
,
&
;
&
;
.
&
;
.
.
,
.
-
.
.
.
.
,
.
/
-
,
.
&
;
,
&
;
&
;
.
.
.
,
,
&
;
,
&
;
.
,
&
;
&
;
.
,
,
,
&
;
.
.
-
.
&
;
.
,
.
.
.
-
&
;
.
-
,
,
.
-
.
-
&
;
,
?
.
.
.
-
,
,
;
&
;
&
;
.
-
,
,
&
;
.
?
?
,
.
#
.
.
.
,
.
.
,
.
.
.
.
.
.
.
.
.
.
.
-
,
?
,
.
.
.
.
,
&
;
.
.
.
!
.
-
,
.
.
#
.
,
.
-
,
,
:
.
.
,
&
;
&
;
&
;
&
;
,
,
#
,
.
-
-
.
.
,
,
&
;
.
&
;
,
.
(
.
,
.
&
;
&
;
,
&
;
&
;
.
&
;
,
,
.
.
,
.
.
&
;
&
;
,
&
;
&
;
,
&
;
,
.
)
-
&
;
)
,
;
(
&
;
,
.
.
,
.
,
,
.
,
,
.
-
.
,
-
&
;
.
-
&
;
,
.
,
,
.
,
&
;
.
.
-
&
;
&
;
.
,
,
,
,
,
,


In [7]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y))

train_dataset = (
    train_dataset.map(
        process_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

valid_dataset = tf.data.Dataset.from_tensor_slices((valid_x, valid_y))
valid_dataset = (
    valid_dataset.map(
        process_single_sample, num_parallel_calls=tf.data.experimental.AUTOTUNE
    )
    .batch(batch_size)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

In [8]:
valid_y[0]

array([17, 10, 28, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63,
       63, 63])

In [9]:
class CTCLayer(layers.Layer):

    def __init__(self, name=None):

        super().__init__(name=name)
        self.loss_fn = K.ctc_batch_cost

    def call(self, y_true, y_pred):
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.

        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred

In [10]:
input_data = Input(shape=(32, 128, 1), name='image')
labels = layers.Input(name="label", shape=(None,), dtype="float32")

inner = Conv2D(32, (3, 3), padding='same', name='conv1', activation='selu')(input_data)
inner = MaxPool2D(pool_size=(2, 2), name='max1')(inner)

inner = Conv2D(64, (3, 3), padding='same', name='conv2', activation='selu')(inner)
inner = MaxPool2D(pool_size=(2, 2), name='max2')(inner)

inner = Conv2D(128, (3, 3), padding='same', name='conv3', activation='selu')(inner)
inner = Conv2D(128, (3, 3), padding='same', name='conv4', activation='selu')(inner)

inner = Conv2D(512, (3, 3), padding='same', name='conv5', activation='selu')(inner)
inner = Conv2D(512, (3, 3), padding='same', name='conv6', activation='selu')(inner)
inner = Dropout(0.2)(inner)

inner = Conv2D(512, (3, 3), padding='same', name='conv7', activation='selu')(inner)
inner = Conv2D(512, (3, 3), padding='same', name='conv8', activation='selu')(inner)
inner = MaxPool2D(pool_size=(2, 1), name='max8')(inner)

inner = Conv2D(256, (3, 3), padding='same', name='conv9',  activation='selu')(inner)
inner = BatchNormalization()(inner)
inner = Dropout(0.2)(inner)

inner = Conv2D(256, (3, 3), padding='same', name='conv10', activation='selu')(inner)
inner = BatchNormalization()(inner)
inner = MaxPool2D(pool_size=(2, 1), name='max10')(inner)
inner = Dropout(0.2)(inner)

inner = Conv2D(64, (2,2), name='conv11', activation='selu')(inner)
inner = Dropout(0.2)(inner)

# CNN to RNN
squeezed = Lambda(lambda x: K.squeeze(x, 1))(inner)
# RNN
inner = Bidirectional(LSTM(128, return_sequences=True), name='lstm1')(squeezed)
inner = Bidirectional(LSTM(512, return_sequences=True), name='lstm2')(inner)
inner = Bidirectional(LSTM(512, return_sequences=True), name='lstm3')(inner)
inner = Bidirectional(LSTM(512, return_sequences=True), name='lstm4')(inner)
inner = Bidirectional(LSTM(128, return_sequences=True), name='lstm5')(inner)
dense_= Dense(128,activation = 'relu')(inner)
# OUTPUT
y_pred = Dense(num_of_characters,activation = 'softmax', name='dense2')(dense_)
output = CTCLayer(name="ctc_loss",)(labels, y_pred)





In [11]:
# model for test
model = Model(inputs=input_data, outputs=y_pred)
model.summary()

# model for train
train_model = Model(inputs=[input_data, labels], outputs=output)
train_model.summary()

In [12]:
train_model.compile(optimizer=Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, clipnorm=1.0),
                    metrics=[tf.keras.metrics.Accuracy()])

filepath = "working/best_model.h5"

# function callback
checkpoint = ModelCheckpoint(filepath=filepath,
                             monitor= 'val_loss',
                             verbose=1, save_best_only=True, save_weights_only=True, mode='auto')

earlyStopping = EarlyStopping(monitor='val_loss', mode='auto', patience=15)

callbacks_list = [checkpoint, earlyStopping]

# method 1
history = train_model.fit(train_dataset,
                          epochs=150,
                          validation_data=valid_dataset,
                          verbose = 1,
                          shuffle=True,   
                          callbacks=callbacks_list)

# method 2
# history = train_model.fit(dataset_final, epochs=10, verbose=1)
model.save('working/my_model.h5')

ValueError: When using `save_weights_only=True` in `ModelCheckpoint`, the filepath provided must end in `.weights.h5` (Keras weights format). Received: filepath=working/best_model.h5

In [None]:
# show correct accuracy
model.load_weights('working/best_model.h5')
prediction = []
for batch in valid_dataset.as_numpy_iterator():
    preds = model.predict(batch)
    prediction.extend(ctc_decoder(preds))

y_true = valid.loc[0:valid_size, 'Identify']
correct_char = 0
total_char = 0
correct = 0
for i in range(valid_size):
    pr = prediction[i]
    tr = y_true[i]
    total_char += len(tr)

    for j in range(min(len(tr), len(pr))):
        if tr[j] == pr[j]:
            correct_char += 1

    if pr == tr:
        correct += 1

print('Correct characters predicted : %.2f%%' % (correct_char * 100 / total_char))
print('Correct words predicted      : %.2f%%' % (correct * 100 / valid_size))

In [None]:
model.load_weights('working/my_model.h5')
prediction = []
for batch in valid_dataset.as_numpy_iterator():
    preds = model.predict(batch)
    prediction.extend(ctc_decoder(preds))
y_true = valid.loc[0:valid_size, 'Identify']
correct_char = 0
total_char = 0
correct = 0
for i in range(valid_size):
    pr = prediction[i]
    tr = y_true[i]
    total_char += len(tr)

    for j in range(min(len(tr), len(pr))):
        if tr[j] == pr[j]:
            correct_char += 1

    if pr == tr:
        correct += 1

print('Correct characters predicted : %.2f%%' % (correct_char * 100 / total_char))
print('Correct words predicted      : %.2f%%' % (correct * 100 / valid_size))

In [None]:
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Training Accuracy vs Validation Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Training Loss vs Validation Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
import pickle

# Lưu lịch sử huấn luyện thành file
with open('working/history.pkl', 'wb') as f:
    pickle.dump(history.history, f)