In [1]:
from tensorflow import keras

import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import os
import cv2

# Seed NumPy's and TensorFlow's global random generators with the same fixed value.
np.random.seed(42)
tf.random.set_seed(42)

In [2]:
def GenerateOffLine(words_path="./data/offline/words.txt"):
    """Load the word annotations and split them into train/test/validation lists.

    Lines starting with ``#`` are ignored, and only entries whose second
    space-separated field is exactly ``ok`` are kept. The kept lines are split
    in file order: the first 90% become the training set and the remaining
    lines are halved into validation (first half) and test (second half).

    Args:
        words_path: Path of the annotation file. Defaults to the location this
            notebook has always read from.

    Returns:
        A ``(train_samples, test_samples, validation_samples)`` tuple of lists
        holding the raw annotation lines (trailing newline included). Note the
        order: test comes before validation. All three lists are empty when no
        line qualifies.
    """
    with open(words_path) as f:
        raw_lines = f.readlines()

    ok_lines = []
    for raw in raw_lines:
        if raw.startswith("#"):
            continue
        fields = raw.split(' ')
        # Blank or truncated lines have no status field; skip them rather than
        # failing with an IndexError.
        if len(fields) > 1 and fields[1] == 'ok':
            ok_lines.append(raw)

    # Split once, after filtering is finished. Doing this inside the loop
    # re-sliced the list for every line and left the names unbound when the
    # file contained no usable line.
    idx = int(0.9 * len(ok_lines))
    train_samples = ok_lines[:idx]
    held_out = ok_lines[idx:]
    val_idx = int(0.5 * len(held_out))
    validation_samples = held_out[:val_idx]
    test_samples = held_out[val_idx:]

    return train_samples, test_samples, validation_samples

In [3]:
# Build the three splits; GenerateOffLine returns them in the order train, test, validation.
train_samples,test_samples,validation_samples = GenerateOffLine()

In [4]:
# Preview the first five raw annotation lines of the training split.
train_samples[:5]

['a01-000u-00-00 ok 154 408 768 27 51 AT A\n',
 'a01-000u-00-01 ok 154 507 766 213 48 NN MOVE\n',
 'a01-000u-00-02 ok 154 796 764 70 50 TO to\n',
 'a01-000u-00-03 ok 154 919 757 166 78 VB stop\n',
 'a01-000u-00-04 ok 154 1185 754 126 61 NPT Mr.\n']

In [5]:
import os

In [6]:
def get_samples(samples):
    """Turn annotation lines into parallel lists of image paths and labels.

    For each line the first space-separated field is the image id and the last
    field (up to its newline) is the label. The image path is built as
    ``./data/offline/iam/<p0>/<p0>-<p1>/<id>.png`` where ``p0`` and ``p1`` are
    the first two dash-separated parts of the id.

    Args:
        samples: Iterable of annotation lines as returned by ``GenerateOffLine``.

    Returns:
        ``(paths, labels)``: two lists of equal length. Entries whose image
        file is missing or has size zero are left out of both lists.
    """
    paths = []
    labels = []
    for sample in samples:
        fields = sample.split(" ")
        file = fields[0]
        label = fields[-1].split("\n")[0]
        id_parts = file.split("-")
        img_path = (
            "./data/offline/iam/" + id_parts[0] + "/"
            + id_parts[0] + "-" + id_parts[1] + "/" + file + ".png"
        )
        # Skip missing files as well as empty ones: getsize() alone raises on a
        # missing file instead of letting the sample be filtered out.
        if os.path.isfile(img_path) and os.path.getsize(img_path):
            paths.append(img_path)
            labels.append(label)

    return paths, labels

In [7]:
# Resolve image paths and text labels for each split.
train_img_paths, train_labels = get_samples(train_samples)
validation_img_paths, validation_labels = get_samples(validation_samples)
test_img_paths, test_labels = get_samples(test_samples)

In [8]:
# Preview the first five training image paths.
train_img_paths[:5]

['./data/offline/iam/a01/a01-000u/a01-000u-00-00.png',
 './data/offline/iam/a01/a01-000u/a01-000u-00-01.png',
 './data/offline/iam/a01/a01-000u/a01-000u-00-02.png',
 './data/offline/iam/a01/a01-000u/a01-000u-00-03.png',
 './data/offline/iam/a01/a01-000u/a01-000u-00-04.png']

In [9]:
# Preview the first five training labels.
train_labels[:5]

['A', 'MOVE', 'to', 'stop', 'Mr.']

In [10]:
# Collect the character vocabulary and the longest label length from the
# training labels.
max_len = 0
seen_chars = set()
for label in train_labels:
    seen_chars.update(label)
    max_len = max(max_len, len(label))

# Keep the vocabulary in sorted order. Iteration order of a set of strings
# changes between Python processes (string hashes are randomised), so a bare
# set would give each kernel session a different character -> id mapping and
# weights saved in one session would not line up with the vocabulary of the
# next one.
characters = sorted(seen_chars)

In [11]:
from tensorflow.keras.layers.experimental.preprocessing import StringLookup

In [12]:
# Shorthand for tf.data.AUTOTUNE; passed to map()/prefetch() when the datasets are built.
AUTOTUNE = tf.data.AUTOTUNE
# Forward lookup built from the collected characters: character -> integer id.
char_to_num = StringLookup(vocabulary=list(characters), mask_token=None)
# Inverse lookup (invert=True) over the same vocabulary: integer id -> character.
num_to_char = StringLookup(vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True)

In [13]:
def distortion_free_resize(image, img_size):
    """Fit ``image`` into a ``(w, h)`` box without distortion, then pad and rotate it.

    The image is resized with its aspect ratio preserved, padded on both sides
    of each spatial axis up to ``h`` x ``w`` using ``tf.pad``'s default fill,
    and finally has its first two axes swapped and is flipped left-to-right.

    Args:
        image: Image with three axes (the padding spec and the transpose below
            both assume exactly three).
        img_size: ``(w, h)`` pair giving the target width and height.

    Returns:
        The padded image after the axis swap and flip.
    """
    target_w, target_h = img_size
    image = tf.image.resize(image, size=(target_h, target_w), preserve_aspect_ratio=True)

    # Padding still needed along each spatial axis to fill the target box.
    resized_shape = tf.shape(image)
    pad_height = target_h - resized_shape[0]
    pad_width = target_w - resized_shape[1]

    # Split the padding across both sides; when the amount is odd the extra
    # pixel goes to the top / left.
    pad_height_bottom = pad_height // 2
    pad_height_top = pad_height - pad_height_bottom
    pad_width_right = pad_width // 2
    pad_width_left = pad_width - pad_width_right

    paddings = [
        [pad_height_top, pad_height_bottom],
        [pad_width_left, pad_width_right],
        [0, 0],
    ]
    image = tf.pad(image, paddings=paddings)

    # Swap the first two axes, then mirror the result.
    image = tf.transpose(image, perm=[1, 0, 2])
    return tf.image.flip_left_right(image)

In [14]:
# Batch size used by prepare_dataset when batching the tf.data pipelines.
batch_size = 64
# Value used to right-pad encoded labels up to max_len.
padding_token = 99
# Target (width, height) handed to distortion_free_resize; also the model input size.
image_width = 128
image_height = 32

In [15]:
from deslant_img import deslant_img

In [16]:
# Preprocess the training split into parallel lists of image tensors and
# padded label tensors.
prepro_img_train, prepro_label_train = [], []
for idx, img_path in enumerate(train_img_paths):
    # Progress marker every 1000 samples.
    if idx % 1000 == 0:
        print(idx)
    text = train_labels[idx]

    # Image: grayscale read -> deslant -> add channel axis -> resize/pad -> scale by 1/255.
    deslanted = deslant_img(cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)).img
    deslanted = np.reshape(deslanted, (deslanted.shape[0], deslanted.shape[1], 1))
    resized = distortion_free_resize(deslanted, img_size=(image_width, image_height))
    prepro_img_train.append(tf.cast(resized, tf.float32) / 255.)

    # Label: characters -> ids, right-padded with padding_token up to max_len.
    char_ids = char_to_num(tf.strings.unicode_split(text, input_encoding="UTF-8"))
    pad_amount = max_len - tf.shape(char_ids)[0]
    prepro_label_train.append(
        tf.pad(char_ids, paddings=[[0, pad_amount]], constant_values=padding_token)
    )

0
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
13000
14000
15000
16000
17000
18000
19000
20000
21000
22000
23000
24000
25000
26000
27000
28000
29000
30000
31000
32000
33000
34000
35000
36000
37000
38000
39000
40000
41000
42000
43000
44000
45000
46000
47000
48000
49000
50000
51000
52000
53000
54000
55000
56000
57000
58000
59000
60000
61000
62000
63000
64000
65000
66000
67000
68000
69000
70000
71000
72000
73000
74000
75000
76000
77000
78000
79000
80000
81000
82000
83000
84000
85000
86000


In [17]:
# Preprocess the validation split into parallel lists of image tensors and
# padded label tensors.
# NOTE(review): max_len was derived from the training labels only; a longer
# label here would make pad_amount negative - confirm that cannot happen.
prepro_img_valid, prepro_label_valid = [], []
for idx, img_path in enumerate(validation_img_paths):
    # Progress marker every 1000 samples.
    if idx % 1000 == 0:
        print(idx)
    text = validation_labels[idx]

    # Image: grayscale read -> deslant -> add channel axis -> resize/pad -> scale by 1/255.
    deslanted = deslant_img(cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)).img
    deslanted = np.reshape(deslanted, (deslanted.shape[0], deslanted.shape[1], 1))
    resized = distortion_free_resize(deslanted, img_size=(image_width, image_height))
    prepro_img_valid.append(tf.cast(resized, tf.float32) / 255.)

    # Label: characters -> ids, right-padded with padding_token up to max_len.
    char_ids = char_to_num(tf.strings.unicode_split(text, input_encoding="UTF-8"))
    pad_amount = max_len - tf.shape(char_ids)[0]
    prepro_label_valid.append(
        tf.pad(char_ids, paddings=[[0, pad_amount]], constant_values=padding_token)
    )

0
1000
2000
3000
4000


In [18]:
# Preprocess the test split into parallel lists of image tensors and padded
# label tensors.
# NOTE(review): max_len was derived from the training labels only; a longer
# label here would make pad_amount negative - confirm that cannot happen.
prepro_img_test, prepro_label_test = [], []
for idx, img_path in enumerate(test_img_paths):
    # Progress marker every 1000 samples.
    if idx % 1000 == 0:
        print(idx)
    text = test_labels[idx]

    # Image: grayscale read -> deslant -> add channel axis -> resize/pad -> scale by 1/255.
    deslanted = deslant_img(cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)).img
    deslanted = np.reshape(deslanted, (deslanted.shape[0], deslanted.shape[1], 1))
    resized = distortion_free_resize(deslanted, img_size=(image_width, image_height))
    prepro_img_test.append(tf.cast(resized, tf.float32) / 255.)

    # Label: characters -> ids, right-padded with padding_token up to max_len.
    char_ids = char_to_num(tf.strings.unicode_split(text, input_encoding="UTF-8"))
    pad_amount = max_len - tf.shape(char_ids)[0]
    prepro_label_test.append(
        tf.pad(char_ids, paddings=[[0, pad_amount]], constant_values=padding_token)
    )

0
1000
2000
3000
4000


In [19]:
# Sanity-check the shapes of one preprocessed image and its padded label.
print(prepro_img_test[0].shape)
print(prepro_label_test[0].shape)

(128, 32, 1)
(21,)


In [20]:
def get_image(image):
    """Return ``image`` unchanged (pass-through hook for the image side)."""
    return image


def get_label(label):
    """Return ``label`` unchanged (pass-through hook for the label side)."""
    return label


def get_dataset(image_path, label):
    """Pack one image/label pair into a dict keyed ``"xs"`` and ``"ys"``.

    Despite its name, ``image_path`` is forwarded as-is through ``get_image``;
    no file is read here.
    """
    sample = {"xs": get_image(image_path), "ys": get_label(label)}
    return sample


def prepare_dataset(image_paths, labels, shuffle=False):
    """Build a batched ``tf.data`` pipeline of ``{"xs": ..., "ys": ...}`` samples.

    Args:
        image_paths: Sequence of per-sample image inputs; sliced along its
            first axis and passed through ``get_dataset``.
        labels: Sequence of per-sample labels, same length as ``image_paths``.
        shuffle: When true, samples are reshuffled on every pass over the
            dataset before batching. Defaults to ``False``, which keeps the
            original fixed-order pipeline. Intended for the training split,
            because the ``shuffle`` argument of Keras ``Model.fit`` does not
            apply to ``tf.data`` inputs.

    Returns:
        A dataset yielding batches of ``batch_size`` samples with prefetching.
    """
    dataset = tf.data.Dataset.from_tensor_slices((image_paths, labels)).map(
        get_dataset, num_parallel_calls=AUTOTUNE
    )
    if shuffle:
        # Shuffle individual samples (not whole batches) with a buffer that
        # covers the full split. No cache() here: the shuffle buffer already
        # holds every element.
        dataset = dataset.shuffle(len(labels), reshuffle_each_iteration=True)
        return dataset.batch(batch_size).prefetch(AUTOTUNE)
    return dataset.batch(batch_size).cache().prefetch(AUTOTUNE)

In [21]:
# Wrap each preprocessed split in a batched tf.data pipeline.
train_tf = prepare_dataset(prepro_img_train, prepro_label_train)
validation_tf = prepare_dataset(prepro_img_valid, prepro_label_valid)
test_tf = prepare_dataset(prepro_img_test, prepro_label_test)

In [25]:
from keras.layers import Conv2D, MaxPooling2D, Dropout, Dense, Reshape,BatchNormalization, Activation, Input, Lambda
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint

In [26]:
class CTCLayer(keras.layers.Layer):
    """Layer that registers the CTC batch cost as a model loss and passes predictions through."""

    def __init__(self, name=None):
        super().__init__(name=name)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Add the CTC loss for ``(y_true, y_pred)`` and return ``y_pred`` unchanged."""
        true_shape = tf.shape(y_true)
        n_samples = tf.cast(true_shape[0], dtype="int64")
        n_time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        n_label_steps = tf.cast(true_shape[1], dtype="int64")

        # Every sample gets the same lengths: the full second dimension of
        # y_pred and of y_true respectively, as (batch, 1) int64 columns.
        # NOTE(review): because label_length is the full width of y_true, any
        # padding entries in y_true count as label positions - confirm intended.
        per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")
        input_length = n_time_steps * per_sample
        label_length = n_label_steps * per_sample

        self.add_loss(self.loss_fn(y_true, y_pred, input_length, label_length))
        return y_pred

In [27]:
class Offline_Model(object):
    """CNN + stacked bidirectional-LSTM word recogniser trained with a CTC loss.

    Attributes set in ``__init__``:
        model: training model taking the ``xs`` image input and the ``ys``
            label input and ending in the ``ctc_loss`` layer.
        pred_model: model from the ``xs`` input to the ``softmax`` layer output.
        history: value returned by the last completed ``fit`` call, or ``None``
            if ``fit`` has not completed yet.
    """

    def __init__(self,preload):
        """Build and compile the models; optionally load pretrained weights.

        Args:
            preload: When truthy, weights are loaded from the hard-coded
                ``./model/offline/...`` file after the model is compiled.
        """
        # Defined up front so get_history() works before (or without) a
        # completed fit() call.
        self.history = None
        self.model = self.get_model()
        self.pred_model = self.get_premodel("softmax")
        self.compile()

        if preload:
            self.pretrained = "./model/offline/offline_without_children_CNN2_batch64_blstm.h5"
            print("preloading model weights from " + self.pretrained)
            self.load_weights(file_name=self.pretrained)

    def get_premodel(self, layer_name):
        """Return a compiled model from the ``xs`` input to the output of ``layer_name``.

        The loss registered for ``layer_name`` simply returns ``y_pred``.
        """
        pre_model = Model(inputs=self.model.get_layer("xs").output,
                         outputs=self.model.get_layer(layer_name).output)

        optimizer = Adam(learning_rate=0.001)
        pre_model.compile(loss={layer_name: lambda y_true, y_pred: y_pred}, optimizer=optimizer)
        return pre_model

    def get_model(self):
        """Assemble the training model: two conv blocks, dense, three BiLSTMs, softmax, CTC layer."""
        input_shape = (image_width,image_height,1)
        inputs =  keras.Input(shape=input_shape, name="xs")
        labels =  keras.layers.Input(name="ys", shape=(None,))

        # NOTE(review): each Conv2D already applies ReLU (activation="relu")
        # before batch normalisation, and a second ReLU follows it. Kept as is
        # so the architecture and saved weights stay unchanged.
        conv2d_1 = Conv2D(32, (3, 3), activation="relu", kernel_initializer="he_normal",padding="same",name="Conv1",)(inputs)
        batch_1 = BatchNormalization()(conv2d_1)
        relu_1 = keras.layers.Activation('relu')(batch_1)
        pool_1 = MaxPooling2D((2, 2), name="pool1")(relu_1)

        conv2d_2 =  Conv2D(64, (3, 3), activation="relu", kernel_initializer="he_normal", padding="same", name="Conv2",)(pool_1)
        batch_2 = BatchNormalization()(conv2d_2)
        relu_2 = keras.layers.Activation('relu')(batch_2)
        pool_2 = keras.layers.MaxPooling2D((2, 2), name="pool2")(relu_2)

        # After the two 2x2 poolings both spatial sizes are divided by 4; the
        # second axis and the 64 channels of Conv2 are merged into one feature
        # axis so the first axis can serve as the sequence axis.
        new_shape = ((image_width // 4), (image_height // 4) * 64)
        reshape = Reshape(target_shape=new_shape, name="reshape")(pool_2)
        dense =  Dense(64, activation="relu", name="dense1")(reshape)
        dropout =  Dropout(0.2)(dense)

        blstm_1 =  keras.layers.Bidirectional(keras.layers.LSTM(128, return_sequences=True, dropout=0.25))(dropout)
        blstm_2 =  keras.layers.Bidirectional(keras.layers.LSTM(64, return_sequences=True, dropout=0.25))(blstm_1)
        blstm_3 =  keras.layers.Bidirectional(keras.layers.LSTM(64, return_sequences=True, dropout=0.25))(blstm_2)

        # Output width is the lookup vocabulary size plus two extra classes.
        dense_2 =  Dense(len(char_to_num.get_vocabulary()) + 2, name="dense2")(blstm_3)
        y_pred = Activation('softmax', name='softmax')(dense_2)

        output = CTCLayer(name="ctc_loss")(labels, y_pred)

        model = Model(inputs=[inputs, labels], outputs=output)
        return model

    def fit(self, train_seq, test_seq, epochs=100, earlystop=10):
        """Train the model with best-only checkpointing and early stopping.

        Args:
            train_seq: Training data passed to ``model.fit``.
            test_seq: Data passed as ``validation_data``.
            epochs: Maximum number of epochs.
            earlystop: Patience handed to the ``EarlyStopping`` callback.

        Returns:
            The object returned by ``model.fit`` (also stored as
            ``self.history``), so ``history = obj.fit(...)`` yields the
            training history instead of ``None``.
        """
        filepath="offline_without_children_CNN2_blstm.h5"
        early = tf.keras.callbacks.EarlyStopping(patience=earlystop)

        checkpoint = ModelCheckpoint(filepath=filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='auto')
        # NOTE(review): per the Keras Model.fit documentation, `shuffle` is
        # ignored for tf.data.Dataset inputs - confirm against the installed
        # version and shuffle inside the dataset if needed.
        self.history = self.model.fit(
            train_seq,
            validation_data=test_seq,
            shuffle=True,
            verbose=1,
            epochs=epochs,
            callbacks=[checkpoint, early]
        )
        return self.history

    def get_history(self):
        """Return the result of the last completed ``fit`` call (``None`` if there is none)."""
        return self.history

    def compile(self):
        """Compile the training model with a default ``Adam`` optimizer (the loss comes from the CTC layer)."""
        optimizer = Adam()
        self.model.compile(optimizer=optimizer)

    def save_weights(self, file_name=None):
        """Save the training model's weights to ``file_name``."""
        self.model.save_weights(file_name)

    def load_weights(self, file_name=None):
        """Load weights from ``file_name`` into the training model, then recompile it."""
        self.model.load_weights(file_name)
        self.compile()

    def predict(self,eval_data):
        """Return the training model's predictions for ``eval_data``."""
        pred = self.model.predict(eval_data)
        return pred

    def get_model_summary(self):
        """Call ``summary()`` on the training model and return its result."""
        return self.model.summary()

In [29]:
# Build a fresh model (no pretrained weights) and show its layer summary.
offline_model = Offline_Model(preload=False)
offline_model.get_model_summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 xs (InputLayer)                [(None, 128, 32, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 Conv1 (Conv2D)                 (None, 128, 32, 32)  320         ['xs[0][0]']                     
                                                                                                  
 batch_normalization_2 (BatchNo  (None, 128, 32, 32)  128        ['Conv1[0][0]']                  
 rmalization)                                                                                     
                                                                                            

In [30]:
# Train for up to 50 epochs, validating on validation_tf, with an early-stopping patience of 25.
history = offline_model.fit(train_tf, validation_tf, epochs=50,earlystop=25)

Epoch 1/50
Epoch 1: val_loss improved from inf to 10.34103, saving model to offline_without_children_CNN2_blstm.h5
Epoch 2/50
Epoch 2: val_loss improved from 10.34103 to 8.05530, saving model to offline_without_children_CNN2_blstm.h5
Epoch 3/50
Epoch 3: val_loss improved from 8.05530 to 6.36568, saving model to offline_without_children_CNN2_blstm.h5
Epoch 4/50
Epoch 4: val_loss improved from 6.36568 to 4.89658, saving model to offline_without_children_CNN2_blstm.h5
Epoch 5/50
Epoch 5: val_loss improved from 4.89658 to 3.87021, saving model to offline_without_children_CNN2_blstm.h5
Epoch 6/50
Epoch 6: val_loss improved from 3.87021 to 3.40487, saving model to offline_without_children_CNN2_blstm.h5
Epoch 7/50
Epoch 7: val_loss improved from 3.40487 to 3.06186, saving model to offline_without_children_CNN2_blstm.h5
Epoch 8/50
Epoch 8: val_loss improved from 3.06186 to 2.89447, saving model to offline_without_children_CNN2_blstm.h5
Epoch 9/50
Epoch 9: val_loss improved from 2.89447 to 2.74

KeyboardInterrupt: 

In [31]:
# Save the model's current weights to a file in the working directory.
offline_model.save_weights('offline_without_children_CNN2_batch64_blstm.h5')

In [34]:
def _decoded_to_text(decoded):
    """Convert a batch of decoded id sequences into a list of strings.

    Entries equal to -1 are dropped before the ids are mapped back to
    characters with ``num_to_char`` and joined.
    """
    texts = []
    for seq in decoded:
        seq = tf.gather(seq, tf.where(tf.math.not_equal(seq, -1)))
        texts.append(tf.strings.reduce_join(num_to_char(seq)).numpy().decode("utf-8"))
    return texts


def decode_batch_predictions(pred=None, top_n=1):
    """CTC-decode a batch of model outputs into text.

    Args:
        pred: Array with three axes whose first axis indexes samples and whose
            second axis is used as the sequence length of every sample.
        top_n: Number of beam-search paths to return.

    Returns:
        * ``top_n == 1``: a tuple ``(output_beam, output_greedy)`` holding one
          string per sample from beam search (width 25) and from greedy
          decoding respectively.
        * ``top_n > 1``: a list of ``top_n`` lists, one per beam-search path
          (best first), each holding one string per sample.

    Raises:
        ValueError: If ``top_n`` is smaller than 1 (this used to fall through
            and return ``None``).
    """
    if top_n < 1:
        raise ValueError("top_n must be >= 1, got %r" % (top_n,))

    # Every sample is decoded over the full second dimension of pred.
    input_len = np.ones(pred.shape[0]) * pred.shape[1]

    if top_n > 1:
        # Run beam search once and read all requested paths from it, instead
        # of repeating the identical search for every path. At least 5 paths
        # are requested, as before; the beam is widened when more than 25
        # paths are asked for so that top_paths never exceeds beam_width.
        n_paths = max(5, top_n)
        decoded_paths = keras.backend.ctc_decode(
            pred,
            input_length=input_len,
            greedy=False,
            beam_width=max(25, n_paths),
            top_paths=n_paths,
        )[0]
        return [_decoded_to_text(path[:, :max_len]) for path in decoded_paths[:top_n]]

    # top_n == 1: return both the best beam-search path and the greedy path.
    results_beam = keras.backend.ctc_decode(
        pred, input_length=input_len, greedy=False, beam_width=25, top_paths=1
    )[0][0][:, :max_len]
    output_beam = _decoded_to_text(results_beam)

    results_greedy = keras.backend.ctc_decode(
        pred, input_length=input_len, greedy=True
    )[0][0][:, :max_len]
    output_greedy = _decoded_to_text(results_greedy)

    return output_beam, output_greedy

In [36]:
# Run the model over the whole test dataset.
preds = offline_model.predict(test_tf)
# Take the first prediction and give it a leading batch axis of size 1.
beam = preds[0]
beam = np.reshape(beam,(1,beam.shape[0],beam.shape[1]))
# Decode that single sample asking for 5 beam-search paths.
beam_pred = decode_batch_predictions(pred=beam,top_n=5)
# Same element as beam_pred[0][0]: the first returned path's text for the single sample.
beam_pred[:3][0][0]

'sounded'

In [37]:
# Decode every test prediction with both beam search and greedy search.
pred_beam, pred_greedy = decode_batch_predictions(preds, top_n=1)

In [38]:
import Levenshtein  as lv
# Character-level score: 100 * (1 - mean lv.jaro similarity) between each
# beam-search prediction and its test label.
# NOTE(review): the printed label says "Jaro Winkler" but the call is lv.jaro,
# and the value is reported as "CER" - confirm which metric was intended.
total_jaro = 0
for i, predicted in enumerate(pred_beam):
    total_jaro += lv.jaro(predicted, test_labels[i])

mean_similarity = total_jaro / len(pred_beam)
cer = (1 - mean_similarity) * 100
print('Jaro Winkler Offline Model Beam Search CER:', round(cer,2))

# Word-level score: percentage of predictions that are not an exact match.
count = sum(1 for i, predicted in enumerate(pred_beam) if predicted == test_labels[i])
wer = (1 - count / len(test_labels)) * 100
print('Jaro Winkler Offline Model Beam Search WER:', round(wer,2))

Jaro Winkler Offline Model Beam Search CER: 12.78
Jaro Winkler Offline Model Beam Search WER: 40.75


In [39]:
# Character-level score: 100 * (1 - mean lv.jaro similarity) between each
# greedy prediction and its test label.
# NOTE(review): the printed label says "Jaro Winkler" but the call is lv.jaro,
# and the value is reported as "CER" - confirm which metric was intended.
total_jaro = 0
for i, predicted in enumerate(pred_greedy):
    total_jaro += lv.jaro(predicted, test_labels[i])

mean_similarity = total_jaro / len(pred_greedy)
cer = (1 - mean_similarity) * 100
print('Jaro Winkler Offline Model Greedy Search CER:', round(cer,2))

# Word-level score: percentage of predictions that are not an exact match.
count = sum(1 for i, predicted in enumerate(pred_greedy) if predicted == test_labels[i])
wer = (1 - count / len(test_labels)) * 100
print('Jaro Winkler Offline Model Greedy Search WER:', round(wer,2))

Jaro Winkler Offline Model Greedy Search CER: 12.5
Jaro Winkler Offline Model Greedy Search WER: 38.22
