In [1]:
import os
import json
import numpy as np
import math
from skimage.measure import block_reduce
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

def read_train_data():
    """Load the raw signals and their text labels and pack them into arrays.

    Reads two JSON files located under the module-level ``data_dir``: the one
    named by ``data_filename`` (mapping a file name to a list of samples) and
    the one named by ``labels_filename`` (mapping the same file names to a
    text label). The module-level ``FILTER_SIZE`` sets the down-sampling
    block length.

    Returns:
        tuple: ``(data, text_data)`` where ``data`` is a zero-padded 2-D float
        array with one row per file holding the block-averaged absolute
        signal, and ``text_data`` is an array of lower-cased labels, each
        right-padded with spaces to the length of the longest label.
    """
    signals_path = os.path.join(data_dir, data_filename)
    with open(signals_path) as signals_file:
        signals_by_file = json.load(signals_file)

    labels_path = os.path.join(data_dir, labels_filename)
    with open(labels_path) as labels_file:
        labels_by_file = json.load(labels_file)

    # Longest signal and longest label decide the fixed array dimensions.
    longest_signal = max(
        (len(samples) for samples in signals_by_file.values()), default=0
    )
    longest_label = max(
        (len(labels_by_file[name]) for name in signals_by_file), default=0
    )

    # Every FILTER_SIZE consecutive samples collapse into one averaged value,
    # so the widest row needs ceil(longest_signal / FILTER_SIZE) columns.
    reduced_width = math.ceil(longest_signal / FILTER_SIZE)
    data = np.zeros((len(signals_by_file), reduced_width))

    padded_labels = []
    for row, (name, samples) in enumerate(signals_by_file.items()):
        # Take magnitudes first so positive and negative samples do not
        # cancel each other out inside a block average.
        reduced = block_reduce(
            np.absolute(samples), block_size=(FILTER_SIZE,), func=np.mean
        )
        # Shorter signals only fill the leading columns; the rest stay zero.
        data[row, :len(reduced)] = reduced

        padded_labels.append(labels_by_file[name].ljust(longest_label).lower())

    return data, np.array(padded_labels)

In [None]:
from google.colab import drive

drive.mount('/content/gdrive')
%cd "gdrive/Shared drives/deep learning"
data_dir = "data"

In [None]:
## ----- READING DOWNSAMPLED DATA ----- ##

# The pre-computed signals and labels live together in a single JSON file.
data_filename = os.path.join(data_dir, "training_data_labels.json")
with open(data_filename) as f:
    data_dict = json.load(f)

# Add a trailing axis of length 1 to the signal array.
data = np.array(data_dict["training_data"])[..., np.newaxis]
text_data = np.array(data_dict["training_labels"])

# Show the shape and the first two rows of each array as a quick sanity check.
for heading, array in (("Data array shape: ", data), ("Text labels shape: ", text_data)):
    print(heading, array.shape, "\nFirst 2 rows:")
    print(array[:2], end='\n\n')

In [None]:
# Every character a label may contain; the position in this string is the
# character's class index.
alphabet = 'abcdefghijklmnopqrstuvwxyz0123456789,.?;:-_()=+@$!&/\'\" '
alphabet_size = len(alphabet)

# Define a mapping between characters and indices
char_to_int = {c: i for i, c in enumerate(alphabet)}
int_to_char = {i: c for i, c in enumerate(alphabet)}

num_rows = len(text_data)
max_text_length = max((len(d) for d in text_data), default=0)

# One-hot targets of shape (rows, characters, classes). Start from zeros so
# no cell is ever left holding uninitialised memory.
labels = np.zeros((num_rows, max_text_length, alphabet_size))
positions = np.arange(max_text_length)
known_chars = set(alphabet)

for i, d in enumerate(text_data):
  # Labels shorter than the longest one are right-padded with spaces so every
  # row fills the full (max_text_length, alphabet_size) slot.
  padded = str(d).ljust(max_text_length)

  # Fail with a message that names the offending characters instead of a
  # bare KeyError from the lookup table.
  unknown = sorted(set(padded) - known_chars)
  if unknown:
    raise ValueError(
        "Label %d contains characters outside the alphabet: %r" % (i, unknown)
    )

  # Target text data -> integer encodings
  integer_encodings = np.array([char_to_int[char] for char in padded], dtype=np.intp)

  # -> one-hot encodings: set exactly one class per character position
  labels[i, positions, integer_encodings] = 1.0

In [None]:
# Number of time steps per input sample; create_model() uses it as the first
# dimension of the "image" input shape.
# NOTE(review): presumably this has to equal data.shape[1] of the loaded
# signals -- confirm whenever the down-sampling changes.
img_width = 6613
class CTCLayer(layers.Layer):
    """Pass-through layer that attaches a CTC loss to the model.

    The layer receives the ground-truth labels and the network predictions,
    registers the CTC loss between them through ``add_loss`` and returns the
    predictions unchanged.
    """

    def __init__(self, name=None, **kwargs):
        """Create the layer.

        Args:
            name: Optional layer name.
            **kwargs: Standard ``keras.layers.Layer`` keyword arguments
                (for example ``trainable`` or ``dtype``). Accepting them lets
                Keras rebuild the layer from its saved configuration.
        """
        super().__init__(name=name, **kwargs)
        self.loss_fn = keras.backend.ctc_batch_cost

    def call(self, y_true, y_pred):
        """Register the CTC loss for this batch and return ``y_pred``.

        Args:
            y_true: Label tensor; its first axis is the batch and its second
                axis is used as the label length.
            y_pred: Prediction tensor; its second axis is used as the input
                (time-step) length.

        Returns:
            ``y_pred``, unchanged.
        """
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
        input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
        label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

        # The loss function takes one length per sample, so the scalar
        # lengths are broadcast to (batch, 1) columns.
        input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred

def create_model():
    """Build and compile the Conv1D + bidirectional-LSTM model with CTC loss.

    Reads the module-level ``img_width`` (time steps of the "image" input) and
    ``alphabet_size`` (the softmax layer has ``alphabet_size + 1`` units).

    Returns:
        A ``keras`` model named ``"ocr_model_v1"`` taking the inputs
        ``"image"`` and ``"label"``, compiled with an Adam optimizer. No loss
        is passed to ``compile``; the loss comes from the ``CTCLayer``.
    """
    label_input = layers.Input(name="label", shape=(None,), dtype="float32")
    signal_input = layers.Input(shape=(img_width, 1), name="image", dtype="float32")

    # Convolutional feature extractor: the first block has its own
    # initializer and no batch normalization, the remaining three share
    # one layout.
    features = layers.Conv1D(
        filters=64,
        kernel_size=7,
        activation="relu",
        kernel_initializer="he_normal",
        padding="same",
    )(signal_input)

    for n_filters, kernel_width in ((128, 7), (256, 9), (128, 7)):
        features = layers.Conv1D(
            filters=n_filters,
            kernel_size=kernel_width,
            activation="relu",
            padding="same",
        )(features)
        features = layers.BatchNormalization(trainable=True)(features)

    # Two stacked bidirectional LSTMs that keep the full output sequence.
    for lstm_name, n_units in (("bi_lstm1", 128), ("bi_lstm2", 64)):
        recurrent = layers.LSTM(units=n_units, return_sequences=True, dropout=0.25)
        features = layers.Bidirectional(recurrent, name=lstm_name)(features)

    # Per-time-step class probabilities, with one unit more than the alphabet.
    probabilities = layers.Dense(units=alphabet_size + 1, activation="softmax")(features)

    # The CTC layer registers the loss and hands the predictions through.
    output = CTCLayer(name="ctc_loss")(label_input, probabilities)

    model = keras.models.Model(
        inputs=[signal_input, label_input],
        outputs=output,
        name="ocr_model_v1",
    )
    model.compile(optimizer=keras.optimizers.Adam())
    return model

In [None]:
# Build the compiled model and print its layer-by-layer summary.
model = create_model()
model.summary()

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.model_selection import RepeatedKFold

random_state = 42
rkf = RepeatedKFold(n_splits=5, n_repeats=3, random_state=random_state)

# Signals and labels are indexed with the same row numbers below, so they
# must describe the same samples.
assert len(data) == len(labels), "data and labels must have the same number of rows"

# Only a single train/test partition is consumed downstream. Select it
# explicitly (the last generated fold) instead of relying on variables that
# leak out of a loop, and report its size rather than printing every index
# array of every fold.
fold_indices = list(rkf.split(data))
train, test = fold_indices[-1]
print(
    "Generated %d folds; using the last one (%d train rows, %d test rows)"
    % (len(fold_indices), len(train), len(test))
)

x_train, x_test, y_train, y_test = data[train], data[test], labels[train], labels[test]

In [None]:
batch_size = 64


def make_ctc_dataset(signals, targets, batch_size):
    """Build a batched ``tf.data.Dataset`` matching the model's named inputs.

    Args:
        signals: Array of input signals, one row per sample.
        targets: Labels, either one-hot encoded (samples, characters,
            classes) or already integer encoded (samples, characters).
        batch_size: Number of samples per batch.

    Returns:
        A dataset yielding ``{"image": ..., "label": ...}`` dictionaries.
    """
    targets = np.asarray(targets)
    # The model's "label" input is 2-D and the CTC loss works on class
    # indices, so a one-hot encoding is collapsed back to one index per
    # character.
    if targets.ndim == 3:
        targets = targets.argmax(axis=-1)

    # The model has two named inputs and gets its loss from the CTC layer,
    # so each element is a dict keyed by input name rather than an (x, y) pair.
    dataset = tf.data.Dataset.from_tensor_slices({"image": signals, "label": targets})

    # Without batching every element would be fed to the model as if a single
    # sample were a whole batch.
    return dataset.batch(batch_size)


train_dataset = make_ctc_dataset(x_train, y_train, batch_size)

validation_dataset = make_ctc_dataset(x_test, y_test, batch_size)


In [None]:
# Training schedule
epochs = 5
early_stopping_patience = 5

# Stop once the validation loss has not improved for
# `early_stopping_patience` epochs, keeping the best weights seen.
# NOTE(review): the patience equals the number of epochs, so training looks
# unable to stop early with these values -- confirm this is intended.
early_stopping = keras.callbacks.EarlyStopping(
    restore_best_weights=True,
    patience=early_stopping_patience,
    monitor="val_loss",
)
training_callbacks = [early_stopping]

# Train the model
history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=validation_dataset,
    callbacks=training_callbacks,
)