In [None]:
import sys

import numpy as np
import tensorflow as tf
import tensorflow.data as tfd
import pandas as pd
import json
import os
import tensorflow as tf
import tensorflow.data as tfd
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model, load_model 
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from glob import glob
import matplotlib.pyplot as plt
import matplotlib.image as implt
from IPython.display import clear_output as cls

In [None]:
# Target image geometry; load_image resizes to (IMG_HEIGHT, IMG_WIDTH) and the
# model input is declared as (IMG_WIDTH, IMG_HEIGHT, 1).
IMG_WIDTH = 200
IMG_HEIGHT = 50
IMG_SIZE = (IMG_WIDTH, IMG_HEIGHT)
# Training hyper-parameters.
BATCH_SIZE = 8
EPOCHS = 150
# NOTE(review): the visible compile cell passes optimizer="adam" and does not
# read LEARNING_RATE — confirm whether it is meant to be used.
LEARNING_RATE = 1e-3
MODEL_NAME = "CharacterRecognition-Model"
# Nominal split sizes, expressed as a batch count times BATCH_SIZE.
# NOTE(review): the visible cells derive the splits by 80/20 slicing and do not
# read these three values — confirm they are still needed.
TRAIN_SIZE = BATCH_SIZE * 3000
VALID_SIZE = BATCH_SIZE * 1500
TEST_SIZE  = BATCH_SIZE * 300
# Passed as num_parallel_calls and prefetch buffer size in the dataset pipelines.
AUTOTUNE = tfd.AUTOTUNE

# Paths
# NOTE(review): add_file_path builds image paths from its own "../dataset/img/"
# literal rather than from img_path — keep the two in sync.
img_path = '../dataset/img'
transcriptions = '../dataset/transcriptions.json'

# Seed NumPy and TensorFlow with the same fixed value.
np.random.seed(2569)
tf.random.set_seed(2569)

In [None]:
# Load the json files and transform into dataframes

def json_to_csv(json_path):
    """Load a JSON file and return its contents as a pandas DataFrame.

    Despite the name, no CSV is produced: the parsed JSON is handed straight
    to ``pd.DataFrame``.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        pandas.DataFrame built from the parsed JSON data.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode explicitly as UTF-8 so non-ASCII characters in the file are read
    # the same way regardless of the platform's default locale encoding.
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    return pd.DataFrame(data)
 
# Parse the transcription file into a DataFrame; a later cell drops columns and adds file paths.
all_transcriptions = json_to_csv(transcriptions)   

# NOTE: despite the helper's name, nothing is written to CSV; the result is an in-memory DataFrame.


In [None]:
# Preview the first rows of the transcription table.
all_transcriptions.head()

In [None]:

def add_file_path(df, img_dir="../dataset/img/"):
    """Return a copy of ``df`` with a ``file_name`` column holding each image's path.

    Args:
        df: DataFrame with an ``img`` column of image file names (strings).
        img_dir: Prefix prepended verbatim to every file name, so it must end
            with a path separator. Defaults to the dataset image directory.

    Returns:
        A new DataFrame with the extra ``file_name`` column. ``df`` itself is
        left unmodified, so the caller's frame is never changed behind its back.
    """
    return df.assign(file_name=df['img'].map(lambda name: img_dir + name))

def drop_unnecessary_columns(df):
    """Return a copy of ``df`` without the ``decade_id`` and ``nameset`` columns.

    Args:
        df: DataFrame that may contain the two metadata columns.

    Returns:
        A new DataFrame with those columns removed; ``df`` is not modified.
    """
    # errors='ignore' keeps the call idempotent: re-running the notebook cell on
    # an already-trimmed frame no longer raises KeyError.
    return df.drop(columns=['decade_id', 'nameset'], errors='ignore')

def transform_pipeline(df):
    """Drop the unused metadata columns, then attach the image path column."""
    trimmed = drop_unnecessary_columns(df)
    return add_file_path(trimmed)

all_transcriptions = transform_pipeline(all_transcriptions)

def check_img_exists(df):
    """Return only the rows of ``df`` whose ``file_name`` exists on disk.

    Args:
        df: DataFrame with a ``file_name`` column of paths.

    Returns:
        The filtered DataFrame, with the same columns and the original index
        labels of the surviving rows.
    """
    # Cast to bool so the result is always treated as a row mask, including
    # for an empty frame where map() yields an object-dtype Series.
    exists_mask = df['file_name'].map(os.path.exists).astype(bool)
    return df[exists_mask]

# Keep only the transcriptions whose image is present on disk.
all_transcriptions = check_img_exists(all_transcriptions)

# Sequential split: the leading 80% of rows form the training pool, the rest is test.
length = len(all_transcriptions)
test_start = int(length * 0.8)
train_csv = all_transcriptions.iloc[:test_start]
test_csv = all_transcriptions.iloc[test_start:]

# The trailing 20% of the training pool becomes the validation split.
length = len(train_csv)
valid_start = int(length * 0.8)
valid_csv = train_csv.iloc[valid_start:]
train_csv = train_csv.iloc[:valid_start]

# Report the split sizes in train / validation / test order.
for split in (train_csv, valid_csv, test_csv):
    print(len(split['img']))

In [None]:
# Labels of each split as plain Python strings.
train_labels = [str(word) for word in train_csv["text"].to_numpy()]
val_labels = [str(word) for word in valid_csv["text"].to_numpy()]
test_labels = [str(word) for word in test_csv["text"].to_numpy()]

# Distinct characters appearing in each split.
unique_characters_train = set(char for word in train_labels for char in word)
unique_characters_test = set(char for word in test_labels for char in word)
unique_characters_val = set(char for word in val_labels for char in word)

# Vocabulary = every character seen in any split.
unique_characters = unique_characters_train.union(unique_characters_test).union(unique_characters_val)

# Number of classes (for labels) = number of distinct characters.
n_classes = len(unique_characters)

# Longest label across ALL splits. encode_single_sample pads every label up to
# this length, so a validation/test label longer than the longest training
# label would otherwise produce a negative pad size.
MAX_LABEL_LENGTH = max(map(len, train_labels + val_labels + test_labels))

In [None]:
# Display the number of distinct characters found across the splits.
n_classes

In [None]:
print(f"Number of unique characters: {n_classes}")
# Characters to spot-check against the collected vocabulary.
temp = ['a', 'b', '\u00e8', '\u00f2', '\u00e9', '\u2013']

# Report every probe character that the vocabulary lacks.
for char in temp:
    if char in unique_characters:
        continue
    print(f"Character {char} not found in the unique characters")

In [None]:
# Display the length that encoded labels are padded to.
MAX_LABEL_LENGTH

In [None]:
# Preview the first rows of the test split.
test_csv.head()

In [None]:
# Use the public Keras entry point (the same package the model layers come
# from) instead of the private keras.src module path.
from tensorflow.keras.layers import StringLookup

# Char to Num
# The vocabulary is sorted because iteration order of a set of strings can
# differ between interpreter runs; sorting keeps the character <-> index
# mapping stable across kernel restarts.
char_to_num = StringLookup(vocabulary=sorted(unique_characters), mask_token=None)
# Inverse mapping (index -> character) built from the exact same vocabulary.
num_to_char = StringLookup(vocabulary=char_to_num.get_vocabulary(), mask_token=None, invert=True)

In [None]:
def load_image(image_path):
    """
    Read an image file and return it as a float32 tensor laid out width-first.

    The file is decoded as a single-channel JPEG, converted to float32,
    resized to (IMG_HEIGHT, IMG_WIDTH) and finally transposed so that the
    first axis is the width: the result has shape (IMG_WIDTH, IMG_HEIGHT, 1).
    The return value is a TensorFlow tensor, not a NumPy array.
    """
    # Raw file bytes -> single-channel decoded image.
    raw_bytes = tf.io.read_file(image_path)
    grayscale = tf.image.decode_jpeg(contents=raw_bytes, channels=1)

    # Float conversion happens before the resize.
    as_float = tf.image.convert_image_dtype(image=grayscale, dtype=tf.float32)
    resized = tf.image.resize(images=as_float, size=(IMG_HEIGHT, IMG_WIDTH))

    # Swap height and width so the width becomes the leading axis.
    width_first = tf.transpose(resized, perm=[1, 0, 2])

    # The image was already converted to float32 above; the cast pins the dtype.
    return tf.cast(width_first, dtype=tf.float32)

In [None]:
# Image paths of the training split.
train_images = train_csv['file_name']
# NOTE(review): this rebinds train_labels — a list of str in the vocabulary
# cell — to a pandas Series; re-running cells out of order changes its type.
train_labels = train_csv['text']

In [None]:
def encode_single_sample(image_path, label:str):
    """Load one image and encode its label as a fixed-length index vector.

    Args:
        image_path: Path of the image file (a string or scalar string tensor).
        label: Transcription text (a string or scalar string tensor).

    Returns:
        dict with
          'image': the tensor produced by ``load_image``;
          'label': character indices from ``char_to_num``, right-padded with
                   ``n_classes + 1`` up to ``MAX_LABEL_LENGTH`` entries.

    Raises:
        ValueError: If ``image_path`` or ``label`` is None.
        tf.errors.InvalidArgumentError: If the label contains a character that
            maps to the out-of-vocabulary index, or is longer than
            ``MAX_LABEL_LENGTH``.
    """
    if image_path is None:
        print("Found None in image_path")
        raise ValueError("Image path is None")
    if label is None:
        print("Found None in label")
        raise ValueError("Label is None")

    # Get the image
    image = load_image(image_path)
    # Split the label into characters and map each one to its index.
    chars = tf.strings.unicode_split(label, input_encoding='UTF-8')
    vecs = char_to_num(chars)

    # This function is traced by Dataset.map, so per-sample validation has to
    # be expressed as TensorFlow assertions rather than a Python loop with
    # `raise`: these checks run for every element when the dataset is iterated.
    oov_index = char_to_num(tf.constant(["[UNK]"]))[0]
    label_length = tf.shape(vecs)[0]
    checks = [
        tf.debugging.assert_none_equal(
            vecs, oov_index, message="Found OOV token and label"
        ),
        # A longer label would make the pad size below negative.
        tf.debugging.assert_less_equal(
            label_length, MAX_LABEL_LENGTH,
            message="Label is longer than MAX_LABEL_LENGTH"
        ),
    ]

    # Pad label (only after the checks above have passed).
    with tf.control_dependencies(checks):
        pad_size = MAX_LABEL_LENGTH - label_length
        vecs = tf.pad(vecs, paddings = [[0, pad_size]], constant_values=n_classes+1)

    return {'image':image, 'label':vecs}

In [None]:
def _build_dataset(frame, shuffle=False):
    """Turn one split's file_name/text columns into a batched, prefetched dataset."""
    paths = np.array(frame['file_name'].to_list())
    texts = np.array(frame['text'].to_list())
    dataset = tf.data.Dataset.from_tensor_slices((paths, texts))
    if shuffle:
        # Shuffle the (path, text) pairs before any image is decoded.
        dataset = dataset.shuffle(1000)
    encoded = dataset.map(encode_single_sample, num_parallel_calls=AUTOTUNE)
    return encoded.batch(BATCH_SIZE).prefetch(AUTOTUNE)


# Training Data (the only split that is shuffled)
train_ds = _build_dataset(train_csv, shuffle=True)

# Validation data
valid_ds = _build_dataset(valid_csv)

# Testing data.
test_ds = _build_dataset(test_csv)

In [None]:
# Batch count times BATCH_SIZE. The datasets are batched without dropping the
# remainder, so the last batch may be smaller and these figures are upper bounds.
print(f"Training Data Size   : {tf.data.Dataset.cardinality(train_ds).numpy() * BATCH_SIZE}")
print(f"Validation Data Size : {tf.data.Dataset.cardinality(valid_ds).numpy() * BATCH_SIZE}")

In [None]:

# Index that char_to_num returns for the literal "[UNK]" string.
unknown_token = char_to_num([tf.constant("[UNK]")])

print("unknown", unknown_token)

# Walk every encoded test label and report each index that equals 0.
# (A check for the padding value n_classes+1 used to live here and is disabled.)
for v in test_ds:
    for c in v['label']:
        if c is None:
            print("Found None in label")
            raise ValueError("Found None in label")
        for t in c.numpy():
            if t != 0:
                continue
            print("Found 0")

In [None]:
# Check the data distribution: batch count times BATCH_SIZE per split.
# The datasets are batched without dropping the remainder, so the last batch
# may be smaller and these figures are upper bounds on the sample counts.
print(f"Training Data Size   : {tf.data.Dataset.cardinality(train_ds).numpy() * BATCH_SIZE}")
print(f"Validation Data Size : {tf.data.Dataset.cardinality(valid_ds).numpy() * BATCH_SIZE}")
print(f"Testing Data Size    : {tf.data.Dataset.cardinality(test_ds).numpy() * BATCH_SIZE}")



In [None]:
import matplotlib.pyplot as plt
from IPython.display import clear_output as cls


def show_images(data, GRID=(4, 4), FIGSIZE=(25, 8), cmap='binary_r', model=None, decode_pred=None):
    """Plot the first batch of ``data`` as a grid of images titled with their labels.

    Args:
        data: Iterable of batches; each batch is a mapping with 'image' and
            'label' entries.
        GRID: (rows, columns) of the subplot grid. Images beyond
            rows * columns are not drawn.
        FIGSIZE: Figure size forwarded to ``plt.figure``.
        cmap: Colormap forwarded to ``plt.imshow``.
        model: Optional model; when given together with ``decode_pred`` its
            prediction is added to each title.
        decode_pred: Optional callable that turns the output of
            ``model.predict`` into a sequence of strings.
    """
    # Plotting configurations
    plt.figure(figsize=FIGSIZE)
    n_rows, n_cols = GRID
    capacity = n_rows * n_cols

    # Only the first batch is visualised.
    batch = next(iter(data))
    images, labels = batch['image'], batch['label']

    for index, (image, label) in enumerate(zip(images, labels)):
        # plt.subplot rejects positions past the grid, so stop once it is full
        # (e.g. a batch of 8 images with a 2x2 grid).
        if index >= capacity:
            break

        # Decode the index vector back to text; "[UNK]" entries are blanked
        # out and surrounding whitespace removed.
        text_label = num_to_char(label)
        text_label = tf.strings.reduce_join(text_label).numpy().decode('UTF-8')
        text_label = text_label.replace("[UNK]", " ").strip()

        # Images are stored width-first; transpose back for display.
        plt.subplot(n_rows, n_cols, index+1)
        plt.imshow(tf.transpose(image, perm=[1,0,2]), cmap=cmap)
        plt.axis('off')

        if model is not None and decode_pred is not None:
            # Predict on a batch of one and show it next to the ground truth.
            pred = model.predict(tf.expand_dims(image, axis=0))
            pred = decode_pred(pred)[0]
            title = f"True : {text_label}\nPred : {pred}"
            plt.title(title)
        else:
            plt.title(text_label)

    # Clear earlier cell output (e.g. predict progress bars), then show the figure.
    cls()
    plt.show()

In [None]:
# Plot one training batch, each image titled with its decoded label.
show_images(data=train_ds, GRID=[4,4], FIGSIZE=(25, 8))

In [None]:
# Count the distinct image file names in the test split.
distinct_elements = test_csv['img'].nunique()
distinct_elements

In [None]:
class CTCLayer(Layer):
    """Pass-through layer that registers the CTC batch cost as a layer loss."""

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # Cost function applied in call().
        self.loss_function = tf.keras.backend.ctc_batch_cost

    def call(self, y_true, y_hat):
        """Add the CTC cost of (y_true, y_hat) as a loss and return y_hat unchanged.

        Every sample in the batch is given the same input length (axis 1 of
        y_hat) and the same label length (axis 1 of y_true).
        """
        true_shape = tf.shape(y_true)
        pred_shape = tf.shape(y_hat)

        # Batch size, taken from the labels.
        batch_len = tf.cast(true_shape[0], dtype="int64")

        # (batch, 1) column of ones used to broadcast the two scalar lengths.
        ones_column = tf.ones(shape=(batch_len, 1), dtype='int64')
        input_len = tf.cast(pred_shape[1], dtype='int64') * ones_column
        label_len = tf.cast(true_shape[1], dtype='int64') * ones_column

        self.add_loss(self.loss_function(y_true, y_hat, input_len, label_len))

        return y_hat
# from tensorflow.keras import layers, models
# import keras
# model = models.Sequential([
#     # First convolutional layer
#     layers.Conv2D(16, (3, 3), activation='relu', input_shape=(200, 50, 1)),
#     layers.MaxPooling2D((2, 2)),
#     
#     # Second convolutional layer
#     layers.Conv2D(32, (3, 3), activation='relu'),
#     layers.MaxPooling2D((2, 2)),
#     
#     # Third convolutional layer
#     layers.Conv2D(32, (3, 3), activation='relu'),
#     
#     # Fourth convolutional layer
#     layers.Conv2D(32, (3, 3), activation='relu'),
#     
#     # Flattening the 3D output to 1D before feeding it into the dense layer
#     layers.Flatten(),
#     
#     # Dense layers for classification
#     layers.Dense(64, activation='relu'),
#     layers.Dense(10, activation='softmax')  # For 10 classes
# ])
# 
# 
# model.compile(optimizer='adam', 
#               loss='sparse_categorical_crossentropy', 
#               metrics=['accuracy'])
# 
# BATCH_SIZE = 32
# 
# history = model.fit(
#     train_ds,
#     epochs=10,
#     batch_size=BATCH_SIZE,
#     validation_data=valid_ds)

In [None]:

# Input Layer: width-first single-channel images, matching load_image's output layout.
input_images = Input(shape=(IMG_WIDTH, IMG_HEIGHT, 1), name="image")

# Labels : These are added for the training purpose.
# NOTE(review): with the CTC layer below commented out, no visible layer
# consumes input_labels although it is still listed as a model input.
input_labels = Input(shape=(None, ), name="label")

### Convolutional layers
# layer 1 
conv_1 = Conv2D(64, 3, strides=1, padding="same", kernel_initializer="he_normal", activation="relu", name="conv_1")(input_images)
# layer 2
conv_2 = Conv2D(32, 3, strides=1, padding="same", kernel_initializer="he_normal", activation="relu", name="conv_2")(conv_1)
max_pool_1 = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(conv_2)
# layer 3
conv_3 = Conv2D(64, 3, strides=1, padding='same', activation='relu', kernel_initializer='he_normal', name="conv_3")(max_pool_1)
conv_4 = Conv2D(32, 3, strides=1, padding='same', activation='relu', kernel_initializer='he_normal', name="conv_4")(conv_3)
max_pool_2 = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(conv_4)



### Encoding 
# Two 2x2 poolings shrink each spatial axis by 4 and conv_4 has 32 filters, so
# the feature map is flattened to (IMG_WIDTH//4) steps of (IMG_HEIGHT//4)*32 features.
reshape = Reshape(target_shape=((IMG_WIDTH//4), (IMG_HEIGHT//4)*32), name="reshape_layer")(max_pool_2)
dense_encoding = Dense(64, kernel_initializer="he_normal", activation="relu", name="enconding_dense")(reshape)
dense_encoding_2 = Dense(64, kernel_initializer="he_normal", activation="relu", name="enconding_dense_2")(dense_encoding)
dropout = Dropout(0.4)(dense_encoding_2)

# Decoder: two stacked bidirectional LSTMs that keep the full sequence.
lstm_1 = Bidirectional(LSTM(128, return_sequences=True, dropout=0.25), name="bidirectional_lstm_1")(dropout)
lstm_2 = Bidirectional(LSTM(64, return_sequences=True, dropout=0.25), name="bidirectional_lstm_2")(lstm_1)

# Final Output layer: one unit per vocabulary entry plus one extra
# (presumably for the padding / CTC blank index n_classes+1 — TODO confirm).
output = Dense(len(char_to_num.get_vocabulary())+1, activation="softmax", name="output_dense")(lstm_2)

# Add the CTC loss 
# NOTE(review): the CTC layer is disabled, so the model carries no loss of its
# own; training relies entirely on the loss given to model.compile.
# ctc_loss_layer = CTCLayer()(input_labels, output) 
# Define the final model
model = Model(inputs=[input_images, input_labels], outputs=[output])

In [None]:
# Optional: render the model graph to an image file (disabled).
# tf.keras.utils.plot_model(
#     model,
#     to_file='model-graph.png'
# )

# Print the layer-by-layer summary of the model.
model.summary()

In [None]:
# NOTE(review): the datasets yield {'image', 'label'} dicts with no separate
# target and the CTC loss layer is commented out in the model cell — confirm
# that this sparse-categorical-crossentropy loss actually receives labels
# during fit(). The optimizer is given by name, so LEARNING_RATE is not applied.
model.compile(optimizer="adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=["accuracy"])


In [None]:
# Scan the training and validation datasets for None, stopping at the first hit.
# NOTE: each batch is a dict, so `None in data` tests the dict's keys.
for dataset, message in (
    (train_ds, "None values found in train_ds"),
    (valid_ds, "None values found in valid_ds"),
):
    for data in dataset:
        if None in data:
            print(message)
            break

In [None]:
def check_none_values(dataset):
    """Return True if any batch of ``dataset`` contains a None component.

    Args:
        dataset: Iterable of batches. A batch may be a dict (its values are
            inspected) or any other iterable (its items are inspected).

    Returns:
        True as soon as a None component is found, otherwise False. An empty
        dataset yields False.
    """
    for batch in dataset:
        # Iterating a dict yields its keys, which are never None here; look at
        # the values so the check covers the actual data.
        components = batch.values() if isinstance(batch, dict) else batch
        if any(component is None for component in components):
            return True
    return False

# Each check is a full pass over the dataset (every image is read and decoded),
# so scan each dataset once and reuse the result below.
train_has_none = check_none_values(train_ds)
valid_has_none = check_none_values(valid_ds)

# Check train_ds
if train_has_none:
    print("None values found in train_ds")

# Check valid_ds
if valid_has_none:
    print("None values found in valid_ds")

# If no None values are found, proceed with fitting the model
if not train_has_none and not valid_has_none:
    history = model.fit(train_ds, validation_data=valid_ds, epochs=EPOCHS)
else:
    print("Datasets contain None values. Please fix the data before training the model.")

# def filter_none_values(dataset):
#     filtered_dataset = []
#     for batch in dataset:
#         if None not in batch:
#             filtered_dataset.append(batch)
#     return filtered_dataset
# 
# # Filter train_ds and valid_ds
# train_ds = filter_none_values(train_ds)
# valid_ds = filter_none_values(valid_ds)
# 
# # # Proceed with fitting the model
# history = model.fit(train_ds, validation_data=valid_ds, epochs=EPOCHS, batch_size=BATCH_SIZE)



In [None]:
# Check the data distribution: batch count times BATCH_SIZE per split.
# The datasets are batched without dropping the remainder, so the last batch
# may be smaller and these figures are upper bounds on the sample counts.
print(f"Training Data Size   : {tf.data.Dataset.cardinality(train_ds).numpy() * BATCH_SIZE}")
print(f"Validation Data Size : {tf.data.Dataset.cardinality(valid_ds).numpy() * BATCH_SIZE}")
print(f"Testing Data Size    : {tf.data.Dataset.cardinality(test_ds).numpy() * BATCH_SIZE}")