In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Bidirectional, LSTM, Dense, Conv2D, MaxPooling2D, Dropout, Lambda
from tensorflow.keras.layers import Input, Activation, BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.models import load_model, Model
import tensorflow.keras.backend as K
from Levenshtein import distance as levenshtein_distance
from configs import Configs 
from data_processing import data_preparator, create_dataset
%run "tester_functions.ipynb"

In [2]:
# Reset Keras state, then ask TensorFlow to grow GPU memory on demand
# rather than reserving it all up front.
K.clear_session()
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    # With no GPU present the list is empty and this loop does nothing.
    for device in gpus:
        tf.config.experimental.set_memory_growth(device, True)
except RuntimeError as err:
    # Report the failure and carry on with the default allocation behaviour.
    print(err)


In [3]:
def character_error_rate(y_true, y_pred):
    """Mean character error rate (CER) over a batch of label-index sequences.

    Every row is converted to a string with ``chr`` and the Levenshtein
    distance between reference and prediction is divided by the reference
    length.

    Args:
        y_true: Batch of reference sequences in index form (not one-hot).
        y_pred: Batch of predicted sequences in index form (not one-hot).
            NOTE(review): the model built in this notebook ends in a softmax,
            so its raw output would have to be decoded to indices before it
            matches this contract - confirm what the caller passes.

    Returns:
        The mean per-sample CER. A sample whose reference is empty contributes
        0, and an empty batch yields 0.0 instead of NaN.
    """
    y_true = K.get_value(y_true)
    y_pred = K.get_value(y_pred)

    def _to_text(indices):
        # Only strictly positive indices are characters: 0 is treated as the
        # blank label and negative values are padding (the CTC loss in this
        # notebook pads labels with -1), which chr() would reject.
        return ''.join(chr(int(index)) for index in indices if index > 0)

    cer = []
    for true, pred in zip(y_true, y_pred):
        true_str = _to_text(true)
        pred_str = _to_text(pred)

        # Calculate CER using Levenshtein distance
        edit_distance = levenshtein_distance(true_str, pred_str)
        cer.append(edit_distance / len(true_str) if len(true_str) > 0 else 0)

    if not cer:
        # np.mean([]) would return NaN and emit a RuntimeWarning.
        return 0.0
    return np.mean(cer)

def word_error_rate(y_true, y_pred):
    """Mean word error rate (WER) over a batch of label-index sequences.

    Every row is converted to a string with ``chr``, split on whitespace, and
    the Levenshtein distance between the two word lists is divided by the
    number of reference words.

    Args:
        y_true: Batch of reference sequences in index form (not one-hot).
        y_pred: Batch of predicted sequences in index form (not one-hot).
            NOTE(review): the model built in this notebook ends in a softmax,
            so its raw output would have to be decoded to indices before it
            matches this contract - confirm what the caller passes.

    Returns:
        The mean per-sample WER. A sample whose reference has no words
        contributes 0, and an empty batch yields 0.0 instead of NaN.
    """
    # The reference must come from y_true; reading y_pred twice would compare
    # the prediction with itself and always report a WER of 0.
    y_true = K.get_value(y_true)
    y_pred = K.get_value(y_pred)

    def _to_words(indices):
        # Only strictly positive indices are characters: 0 is treated as the
        # blank label and negative values are padding (the CTC loss in this
        # notebook pads labels with -1), which chr() would reject.
        text = ''.join(chr(int(index)) for index in indices if index > 0)
        return text.split()

    wer = []
    for true, pred in zip(y_true, y_pred):
        true_words = _to_words(true)
        pred_words = _to_words(pred)

        # Edit distance over word lists rather than characters.
        # Assumes the installed Levenshtein package accepts arbitrary
        # sequences, not only strings - TODO confirm.
        edit_distance = levenshtein_distance(true_words, pred_words)
        wer.append(edit_distance / len(true_words) if len(true_words) > 0 else 0)

    if not wer:
        # np.mean([]) would return NaN and emit a RuntimeWarning.
        return 0.0
    return np.mean(wer)

In [4]:
# CTC loss function
def ctc_loss_lambda_func(y_true, y_pred):
    """CTC loss for a batch, usable as a Keras ``loss`` callable.

    Args:
        y_true: Label indices, indexed here as (batch, max_label_len), with
            unused positions filled with -1.
        y_pred: Per-timestep class probabilities, indexed here as
            (batch, time_steps, num_classes).

    Returns:
        The result of ``K.ctc_batch_cost`` for the batch.
    """
    # Read the number of time steps at run time: the static value is None when
    # the model is built for variable-width images.
    time_steps = K.shape(y_pred)[1]
    # K.ctc_batch_cost squeezes the last axis of both length tensors, so they
    # must have shape (batch, 1) rather than (batch,).
    input_length = K.ones_like(y_pred[:, :1, 0], dtype='int32') * time_steps
    # Real label length = number of entries that are not the -1 padding value.
    label_length = K.sum(K.cast(K.not_equal(y_true, -1), 'int32'), axis=-1, keepdims=True)
    return K.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [5]:
def f_map_to_seq(f_map):
    """Turn CNN feature maps into a sequence with one time step per column.

    Args:
        f_map: 4-D tensor laid out as (batch, height, width, channels).

    Returns:
        Tensor of shape (batch, width, height * channels) in which time step
        ``t`` holds every feature of image column ``t``.
    """
    static_shape = f_map.shape
    dynamic_shape = tf.shape(f_map)

    def _dim(axis):
        # Prefer the statically known size so the feature dimension stays
        # defined for the recurrent layers; fall back to the run-time value
        # for dimensions that are None (batch size, variable image width).
        known = static_shape[axis]
        return dynamic_shape[axis] if known is None else known

    batch_size, height, width, channels = (_dim(axis) for axis in range(4))

    # Put width ahead of height before flattening. Reshaping the
    # (batch, height, width, channels) layout directly would only reinterpret
    # the buffer and mix values from different columns into one time step.
    width_major = tf.transpose(f_map, perm=[0, 2, 1, 3])

    # Reshape into (batch_size, timesteps, features)
    sequence = tf.reshape(width_major, (batch_size, width, height * channels))

    return sequence


In [6]:
def build_CRNN_model(input_shape, num_classes):
    """Assemble the CRNN: a convolutional stack feeding two bidirectional LSTMs.

    Args:
        input_shape: Shape of one input image, passed straight to ``Input``.
        num_classes: Size of the per-timestep softmax output.

    Returns:
        An uncompiled Keras ``Model`` mapping images to per-timestep class
        probabilities.
    """
    inputs = Input(shape=input_shape)

    # CNN feature extractor: four conv -> batch-norm -> ReLU -> pool stages
    # that differ only in filter count. Pooling with (1, 2) halves the width
    # and leaves the height untouched.
    features = inputs
    for stage, filters in enumerate((64, 128, 256, 512), start=1):
        features = Conv2D(
            filters,
            (3, 3),
            padding='same',
            name=f'conv{stage}',
            kernel_initializer='he_normal',
        )(features)
        features = BatchNormalization()(features)
        features = Activation('relu')(features)
        features = MaxPooling2D(pool_size=(1, 2), name=f'max{stage}')(features)

    # Regularise the convolutional features before the recurrent part
    features = Dropout(0.3)(features)

    # Bridge from CNN to RNN: feature maps become a sequence
    steps = Lambda(f_map_to_seq)(features)

    # Two stacked bidirectional LSTMs with dropout in between
    steps = Bidirectional(
        LSTM(256, return_sequences=True, kernel_initializer='glorot_uniform')
    )(steps)
    steps = Dropout(0.3)(steps)
    steps = Bidirectional(
        LSTM(256, return_sequences=True, kernel_initializer='glorot_uniform')
    )(steps)

    # Per-timestep class distribution
    outputs = Dense(num_classes, activation='softmax')(steps)

    return Model(inputs=inputs, outputs=outputs)


In [7]:
c = Configs()
data_size = c.data_size
# Retrieve processed data that can be used for training
X, Y = data_preparator(c.image_paths, c.label_path, image_target_height = c.image_height, data_size = data_size, augmentation_probability = c.augmentation_probability )

# Split boundaries must be derived from the number of samples actually
# returned. Deriving them from the batch size would leave only a handful of
# examples for training/validation and push nearly everything into the test split.
num_samples = len(X)

# Training set: first 85 % of the samples
train_split = int(0.85 * num_samples)
X_train_split = X[:train_split]
Y_train_split = Y[:train_split]
# Cross validation set: next 7.5 %
CV_test_split = int(0.075 * num_samples)
X_cv_split = X[train_split: train_split + CV_test_split]
Y_cv_split = Y[train_split: train_split + CV_test_split]
# Testing set: whatever remains
X_test_split = X[train_split + CV_test_split:]
Y_test_split = Y[train_split + CV_test_split:]

ResourceExhaustedError: Exception encountered when calling layer "random_contrast" "                 f"(type RandomContrast).

{{function_node __wrapped__Maximum_device_/job:localhost/replica:0/task:0/device:GPU:0}} failed to allocate memory [Op:Maximum]

Call arguments received by layer "random_contrast" "                 f"(type RandomContrast):
  • inputs=tf.Tensor(shape=(1550, 2056, 1), dtype=uint8)
  • training=True

In [8]:
# Build the train / cross-validation / test datasets with create_dataset
# (presumably to cope with variable-size images and ground-truth labels -
# verify against data_processing.create_dataset).
batch_size = c.batch_size  # how many training examples go into one batch
buffer_size = 3000  # size of the shuffle buffer

# Only the training data is shuffled, so examples are seen in a more random order
train_dataset = create_dataset(X_train_split, Y_train_split, batch_size).shuffle(buffer_size=buffer_size)
# Cross-validation set
cv_dataset = create_dataset(X_cv_split, Y_cv_split, batch_size)
# Test set
test_dataset = create_dataset(X_test_split, Y_test_split, batch_size)

In [9]:
# Build the CRNN for single-channel images of fixed height and unspecified
# width, then get it ready for training
model = build_CRNN_model((c.image_height, None, 1), c.num_classes)
learn_rate = c.learning_rate

# Optimizer, CTC loss and the metrics tracked while training.
# NOTE(review): character_error_rate / word_error_rate call K.get_value on
# their inputs and expect index sequences, while the model emits softmax
# probabilities - confirm these metrics can run inside fit()/evaluate().
model.compile(
    optimizer=Adam(learning_rate=learn_rate),
    loss=ctc_loss_lambda_func,
    metrics=['accuracy', character_error_rate, word_error_rate],
)

# Keep only the model with the lowest validation loss on disk
checkpoint = ModelCheckpoint(
    'OCR model',
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)
# Stop once validation loss has not improved for 10 epochs and roll back to
# the best weights seen
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1,
)


In [None]:
# number of epochs for training
epochs = c.epoch_num

history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=cv_dataset,
    callbacks=[checkpoint, early_stopping],
    verbose=1
)

# Evaluate the model on the test set.
# evaluate() returns the loss followed by one value per compiled metric, so
# unpacking into exactly two names breaks as soon as more than one metric is
# tracked. Asking for a dict keeps this cell independent of the metric list.
test_results = model.evaluate(test_dataset, return_dict=True)
test_loss = test_results['loss']
test_accuracy = test_results.get('accuracy')
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
# Report any remaining metrics (e.g. the custom error rates) as well
for metric_name, metric_value in test_results.items():
    if metric_name not in ('loss', 'accuracy'):
        print(f"Test {metric_name}: {metric_value}")

In [None]:
# Save the trained model to disk so it can be loaded again later.
# NOTE(review): 'OCR model' is also the path the ModelCheckpoint callback
# writes its best-val_loss model to, so this call replaces that checkpoint
# with the model's current weights - confirm that is intended.
model.save('OCR model')