#### Imports

In [1]:
!pip install mltu

Collecting mltu
  Downloading mltu-1.2.5-py3-none-any.whl.metadata (3.4 kB)
Collecting qqdm==0.0.7 (from mltu)
  Downloading qqdm-0.0.7.tar.gz (5.3 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting onnxruntime>=1.15.0 (from mltu)
  Downloading onnxruntime-1.20.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting addict (from qqdm==0.0.7->mltu)
  Downloading addict-2.4.0-py3-none-any.whl.metadata (1.0 kB)
Collecting jupyter (from qqdm==0.0.7->mltu)
  Downloading jupyter-1.1.1-py2.py3-none-any.whl.metadata (2.0 kB)
Collecting coloredlogs (from onnxruntime>=1.15.0->mltu)
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting humanfriendly>=9.1 (from coloredlogs->onnxruntime>=1.15.0->mltu)
  Downloading humanfriendly-10.0-py2.py3-none-any.whl.metadata (9.2 kB)
Collecting jupyter-lsp>=2.0.0 (from jupyterlab->jupyter->qqdm==0.0.7->mltu)
  Downloading jupyter_lsp-2.2.5-py3-none-any.whl.metadata (1.8 kB)
Downloading m

In [2]:
# from datasets import load_dataset
from zipfile import ZipFile
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Input, Dense, Flatten, Bidirectional, LSTM, Reshape
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Lambda
import random
from PIL import Image
import os

#### Unzip step (the unzip cell went missing for an unknown reason)

#### Directories

In [None]:
## NOTE: remember to change these directories to match the dataset location on Kaggle.

# Working directory of the notebook; used as the base for generated artifacts.
ROOT_DIR = os.getcwd()
DATASET_DIR = '/kaggle/input/ocr-test'
# Each split sits in a doubly nested folder (e.g. train/train) below DATASET_DIR.
TRAIN_DIR = os.path.join(DATASET_DIR, "train", "train")
VAL_DIR = os.path.join(DATASET_DIR, "test", "test")
TRAIN_METADATA_PATH = os.path.join(TRAIN_DIR, "metadata.jsonl")
VAL_METADATA_PATH = os.path.join(VAL_DIR, "metadata.jsonl")
# Built with os.path.join rather than a literal ".\\" prefix: on POSIX systems a
# backslash is an ordinary file-name character, not a path separator.
TRAINED_MODEL_PATH = os.path.join(ROOT_DIR, "trained_models")

In [4]:
# Report how many directory entries each split folder contains
# (training first, then validation).
for split_dir in (TRAIN_DIR, VAL_DIR):
    entry_count = len(os.listdir(split_dir))
    print(entry_count)

33627
18705


#### Params

In [5]:
# --- Data pipeline settings ---
IMAGE_SIZE = (32, 128)   # (height, width) handed to tf.image.resize
BATCH_SIZE = 64          # samples per batch produced by the tf.data pipeline
BUFFER_SIZE = 1000       # NOTE(review): not referenced by the cells visible here
WORKERS = 10             # NOTE(review): not referenced by the cells visible here

# --- Optimizer settings ---
LEARNING_RATE = 0.0001   # learning rate given to the Adam optimizer

In [6]:
import json

def load_json_data(file_path):
    """Read a JSON Lines metadata file into two parallel lists.

    Args:
        file_path: Path to a ``.jsonl`` file in which every record holds a
            ``file_name`` and a ``text`` key.

    Returns:
        ``(image_data, text_data)``: two equally long lists, in file order,
        with the ``file_name`` and ``text`` values of every parsed record.

    Raises:
        KeyError: If a successfully parsed record lacks one of the two keys.
    """
    image_data = []
    text_data = []
    # Explicit UTF-8 so decoding does not depend on the platform's locale.
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            record_line = line.strip()
            if not record_line:
                # Blank lines (e.g. a trailing newline) are not malformed records.
                continue
            try:
                data = json.loads(record_line)
            except json.JSONDecodeError as e:
                # Best effort: report the malformed line and keep going.
                print(f"Error decoding JSON: {e}")
                continue
            # Read both fields before appending so the lists can never get
            # out of step with each other.
            file_name, text = data['file_name'], data['text']
            image_data.append(file_name)
            text_data.append(text)

    return image_data, text_data

#### Preprocessing Images

In [7]:
def preprocess_image(image_path, target_size):
    """Read an image file, decode it to 3 channels and resize it.

    Args:
        image_path: Path of the image file (Python string or scalar string tensor).
        target_size: ``(height, width)`` passed to ``tf.image.resize``.

    Returns:
        An ``int32`` tensor of shape ``(height, width, 3)``.
    """
    image = tf.io.read_file(image_path)
    # expand_animations=False makes GIF input decode to a single 3-D frame
    # instead of a 4-D stack, which would contradict the rank-3 shape below.
    image = tf.image.decode_image(image, channels=3, expand_animations=False)
    # decode_image cannot infer a static shape; resize needs at least the rank.
    image.set_shape([None, None, 3])
    image = tf.image.resize(image, target_size)
    # resize returns floats; the pipeline carries integer pixel values.
    image = tf.cast(image, tf.int32)
    return image

def load_images_as_tensor(data_json, image_size, subset):
    """Build a tf.data pipeline that yields preprocessed images.

    Args:
        data_json: Sequence of image file names, relative to the split folder.
        image_size: Target size forwarded to ``preprocess_image``.
        subset: ``"train"`` resolves names against ``TRAIN_DIR``; any other
            value resolves them against ``VAL_DIR``.

    Returns:
        A ``tf.data.Dataset`` producing one preprocessed image per file name.
    """
    base_dir = TRAIN_DIR if subset == "train" else VAL_DIR

    def _load(file_name):
        # Join directory and file name inside the graph with the OS separator.
        full_path = tf.strings.join([base_dir, file_name], separator=os.path.sep)
        return preprocess_image(full_path, image_size)

    file_names = tf.data.Dataset.from_tensor_slices(data_json)
    return file_names.map(_load, num_parallel_calls=tf.data.AUTOTUNE)

In [8]:
# Parse each split's metadata file into (file names, label texts).
train_image_data, train_text_data = load_json_data(file_path=TRAIN_METADATA_PATH)
val_image_data, val_text_data = load_json_data(file_path=VAL_METADATA_PATH)

#### Vectorizer Settings

In [9]:
char_set = ['[START]', '[END]'] + list('0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.,!?@$&()[]{}:;/- ')

# Measure the longest label over BOTH splits so no training label exceeds the
# vectorizer's output length.
max_len_text = max(train_text_data + val_text_data, key=len)
max_len = len(max_len_text)

# preprocess_text wraps every label in the literal '[START]' / '[END]' strings.
# With split='character' those markers are tokenized character by character,
# so they use up positions in the output sequence and must be budgeted for;
# otherwise the longest labels are cut off.
marker_len = len(char_set[0]) + len(char_set[1])
max_seq_len = max_len + marker_len

# +2 for the padding ('') and OOV ('[UNK]') entries TextVectorization prepends.
vocab_size = len(char_set) + 2

print(f"Text with maximum length: {max_len_text}")
print(f"Maximum raw text length: {max_len}")
print(f"Vectorized sequence length: {max_seq_len}")

# NOTE(review): with standardize="lower" and split='character', the uppercase
# letters and the multi-character marker entries in char_set are presumably
# never produced as tokens - TODO confirm whether they are intended.
vectorizer = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size,
    standardize="lower",
    split='character',
    output_sequence_length=max_seq_len,
    vocabulary=char_set
)

Text with maximum length: NO REFUNDS OR EXCHANGES WILL BE ENTERTAINED WITHOUT PROOF OF RECEIPT.
Maximum raw text length: 69


#### Preprocessing Texts

In [10]:
def preprocess_text(text, vectorizer):
    """Wrap a label in start/end markers and vectorize it.

    Args:
        text: Label string (or scalar string tensor).
        vectorizer: Callable text vectorizer applied to the wrapped label.

    Returns:
        Whatever ``vectorizer`` returns for ``'[START]' + text + '[END]'``.
    """
    # tf.strings.join concatenates without a separator by default.
    return vectorizer(tf.strings.join(['[START]', text, '[END]']))

def vector_to_text(vectorized_text, vectorizer):
    """Turn a sequence of token ids back into a string.

    Args:
        vectorized_text: Iterable of token ids; id 0 is skipped.
        vectorizer: Object whose ``get_vocabulary()`` maps ids to tokens.

    Returns:
        The tokens of all non-zero ids, concatenated in order.
    """
    lookup = vectorizer.get_vocabulary()
    pieces = []
    for token_id in vectorized_text:
        if token_id != 0:
            pieces.append(lookup[token_id])
    return ''.join(pieces)

def load_text_to_tensor(data_text_json, vectorizer):
    """Build a tf.data pipeline that yields vectorized labels.

    Args:
        data_text_json: Sequence of label strings.
        vectorizer: Vectorizer forwarded to ``preprocess_text``.

    Returns:
        A ``tf.data.Dataset`` producing one vectorized label per input string.
    """
    def _vectorize(text):
        return preprocess_text(text, vectorizer)

    labels = tf.data.Dataset.from_tensor_slices(data_text_json)
    return labels.map(_vectorize, num_parallel_calls=tf.data.AUTOTUNE)

#### Creating The Dataset

In [11]:
def load_dataset(data_path, subset, image_size, vectorizer, num_samples=None):
    """Build the batched (image, label) dataset for one split.

    Args:
        data_path: Path to the split's JSON Lines metadata file.
        subset: ``"train"`` or another value; forwarded to
            ``load_images_as_tensor`` to pick the image folder.
        image_size: Target image size forwarded to the image pipeline.
        vectorizer: Text vectorizer forwarded to the label pipeline.
        num_samples: Number of samples to keep; ``None`` keeps all of them.

    Returns:
        A ``tf.data.Dataset`` of ``(images, labels)`` batches of ``BATCH_SIZE``.
    """
    images, texts = load_json_data(data_path)
    images_t = load_images_as_tensor(images, image_size, subset)
    texts_t = load_text_to_tensor(texts, vectorizer)
    num_samples = len(images) if num_samples is None else num_samples

    # Dataset.shuffle needs a positive buffer, even for very small splits.
    buffer_size = max(1, int(0.1 * len(images)))
    dataset = (
        tf.data.Dataset.zip((images_t, texts_t))
        # First shuffle only decides WHICH samples survive take(); cache()
        # then stores that selection so images are decoded once.
        .shuffle(buffer_size=buffer_size)
        .take(num_samples)
        .cache()
        # Shuffling again AFTER the cache gives a new order every epoch;
        # anything shuffled before cache() is frozen once the cache is filled.
        .shuffle(buffer_size=buffer_size, reshuffle_each_iteration=True)
        .batch(batch_size=BATCH_SIZE)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )
    return dataset

In [12]:
# Small subsets of each split: 1000 training samples and 100 validation samples.
train_ds = load_dataset(
    TRAIN_METADATA_PATH, "train", IMAGE_SIZE, vectorizer, num_samples=1000
)
val_ds = load_dataset(
    VAL_METADATA_PATH, "val", IMAGE_SIZE, vectorizer, num_samples=100
)

In [None]:
# Sanity check: pull a single batch from the training dataset and show its shapes.
for image, text in train_ds.take(1):
    print(f"image shape: {image.shape}")
    # Second label of the batch (index 1), printed as its raw token-id tensor.
    print(text[1])
    print(f"text shape: {text.shape}")

In [None]:
import matplotlib.pyplot as plt

def plot_images_with_titles(dataset, num_images, vectorizer, title="Images", figsize=(30, 10)):
    """Show the first images of the first batch with their decoded labels.

    Args:
        dataset: Batched dataset yielding ``(images, labels)`` pairs.
        num_images: Number of subplot columns; at most this many images are drawn.
        vectorizer: Vectorizer used by ``vector_to_text`` to decode the labels.
        title: Figure-level title.
        figsize: Figure size forwarded to ``plt.subplots``.
    """
    # squeeze=False always returns a 2-D axes array, so num_images=1 works too.
    fig, axes = plt.subplots(1, num_images, figsize=figsize, squeeze=False)
    axes = axes[0]

    # Only the first batch is inspected.
    for image, text in dataset.take(1):
        shown = min(num_images, image.shape[0])
        for i in range(shown):
            axes[i].imshow(image[i].numpy().squeeze())
            axes[i].set_title(vector_to_text(text[i], vectorizer), fontsize=16)

    # Hide ticks and frames everywhere, including columns left empty because
    # the batch held fewer than num_images samples.
    for ax in axes:
        ax.axis('off')

    plt.suptitle(title, fontsize=32)
    plt.show()


# Preview five samples from each split, training first.
for split_ds, split_title in ((train_ds, "Training Images"), (val_ds, "Validation Images")):
    plot_images_with_titles(split_ds, num_images=5, vectorizer=vectorizer, title=split_title)

#### Creating Model

In [None]:
from tensorflow.keras import layers, Input, Model
from tensorflow.keras.layers import Bidirectional, LSTM, Dense, Lambda
from mltu.tensorflow.model_utils import residual_block
import tensorflow as tf

def load_crnn_model(input_dim, output_dim, activation='leaky_relu', dropout=0):
    """Build the CRNN: residual conv backbone -> bidirectional LSTM -> softmax.

    Args:
        input_dim: Shape of one input image, e.g. ``(height, width, channels)``.
        output_dim: Number of label classes; the output layer has
            ``output_dim + 1`` units (the extra unit is presumably the CTC
            blank - TODO confirm against the loss in use).
        activation: Activation forwarded to every ``residual_block``.
        dropout: Dropout rate forwarded to every ``residual_block``.

    Returns:
        An uncompiled ``Model`` mapping images to per-timestep class probabilities.
    """
    inputs = Input(shape=input_dim, name="input")
    # Scale pixel values into [0, 1] inside the model. A built-in Rescaling
    # layer is used instead of a Lambda wrapping a Python lambda, because the
    # full model is written to a .keras checkpoint and Python lambdas are not
    # safely deserializable from it.
    x = layers.Rescaling(1.0 / 255.0, name="rescale")(inputs)

    # Convolutional backbone: (filters, skip_conv, strides) per residual block.
    backbone_spec = [
        (16, True, 1),
        (16, True, 2),
        (16, False, 1),
        (32, True, 2),
        (32, False, 1),
        (64, True, 1),
        (64, False, 1),
    ]
    for filters, skip_conv, strides in backbone_spec:
        x = residual_block(
            x, filters, activation=activation, skip_conv=skip_conv, strides=strides, dropout=dropout
        )

    # Flatten the two spatial axes into one sequence axis for the RNN.
    reshaped_output = layers.Reshape((x.shape[1] * x.shape[2], x.shape[-1]))(x)

    # Bidirectional LSTM over the flattened feature sequence.
    blstm1 = Bidirectional(LSTM(64, return_sequences=True, dropout=0.2, recurrent_dropout=0.2))(reshaped_output)
    output = Dense(output_dim + 1, activation='softmax', name="output")(blstm1)

    return Model(inputs=inputs, outputs=output)


In [None]:
#resnet = ResNet50(include_top=False, weights="imagenet", input_shape=(256, 256, 3))

#resnet.summary()

#### Training the model

In [None]:
from mltu.tensorflow.losses import CTCloss
from mltu.tensorflow.metrics import CWERMetric
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard


# Derive the input shape from IMAGE_SIZE so the model always matches the
# size the data pipeline resizes images to (3 = RGB channels).
model = load_crnn_model((*IMAGE_SIZE, 3), vocab_size, dropout=0)
# NOTE(review): some Keras versions expose `name` as read-only - TODO confirm
# this assignment works on the installed version.
model.name = "CRNN_Model_control"

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=CTCloss(),
    metrics=[CWERMetric(padding_token=0)],
    run_eagerly=False
)

# All artifacts of this run (best checkpoint, TensorBoard logs) share one folder.
model_dir = os.path.join(TRAINED_MODEL_PATH, model.name)

# Every callback watches the validation CER, where lower is better, so the
# direction is spelled out as "min" for each of them.
earlystopper = EarlyStopping(monitor="val_CER", patience=10, verbose=1, mode='min')
checkpoint = ModelCheckpoint(os.path.join(model_dir, "weight.keras"), monitor="val_CER", verbose=1, save_best_only=True, mode="min")
tb_callback = TensorBoard(os.path.join(model_dir, "logs"), update_freq=10)
reduceLROnPlat = ReduceLROnPlateau(monitor="val_CER", factor=0.9, min_delta=1e-10, patience=5, verbose=1, mode="min")

model.summary()

In [None]:
# Train for up to 100 epochs; the callbacks handle early stopping,
# checkpointing, learning-rate reduction and TensorBoard logging.
training_callbacks = [earlystopper, checkpoint, reduceLROnPlat, tb_callback]
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=100,
    callbacks=training_callbacks,
)

In [None]:
# Character error rate per epoch, for training and validation.
train_cer = history.history['CER']
val_cer = history.history['val_CER']
epochs = list(range(1, 1 + len(train_cer)))

fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(epochs, train_cer, label='Training CER', marker='o')
ax.plot(epochs, val_cer, label='Validation CER', marker='o')
ax.set_title('Training and Validation CER')
ax.set_xlabel('Epochs')
ax.set_ylabel('CER')
ax.legend()
ax.grid()
plt.show()

In [None]:
# Loss per epoch, for training and validation.
train_loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = list(range(1, 1 + len(train_loss)))

fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(epochs, train_loss, label='Training Loss', marker='o')
ax.plot(epochs, val_loss, label='Validation Loss', marker='o')
ax.set_title('Training and Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
ax.grid()
plt.show()

In [None]:
# Run the trained model on one training image.
example_image = train_image_data[31]
image_path = os.path.join(TRAIN_DIR, example_image)
pre_image = preprocess_image(image_path, IMAGE_SIZE)
# No manual "/ 255" here: the model divides its input by 255 in its first
# layer and was trained on the raw 0-255 output of preprocess_image, so
# scaling again would feed it values 255x smaller than during training.
pre_image_expanded = tf.expand_dims(pre_image, axis=0)  # add the batch axis

text = model.predict(pre_image_expanded)

In [None]:
import numpy as np

def ctc_decode(predictions, char_set=None, blank_index=0):
    """Greedy (best-path) decoding of a CTC softmax output.

    Two class indices are treated as "no character" and dropped:
    ``blank_index`` and the last class index of ``predictions``.

    Args:
        predictions: 2-D array of shape ``(timesteps, num_classes)``.
        char_set: Optional list mapping class indices to characters.
        blank_index: Index of the first ignored class (default ``0``).

    Returns:
        The decoded string when ``char_set`` is given and non-empty,
        otherwise the list of decoded class indices.
    """
    predictions = np.asarray(predictions)
    # The last class of the softmax output is the second ignored index.
    last_blank_index = predictions.shape[1] - 1

    # Most likely class at every timestep.
    best_path = np.argmax(predictions, axis=-1)

    decoded = []
    previous = None
    for idx in best_path:
        idx = int(idx)
        # Collapse repeats against the PREVIOUS TIMESTEP, not the last emitted
        # symbol: "a, blank, a" must decode to "aa", while "a, a" is one "a".
        if idx != previous and idx != blank_index and idx != last_blank_index:
            decoded.append(idx)
        previous = idx

    # Map indices to characters when a character set is available.
    if char_set:
        return ''.join(char_set[i] for i in decoded)
    return decoded

# Show the original (un-resized) example image without axes.
img = Image.open(image_path)
fig, ax = plt.subplots()
ax.imshow(img)
ax.axis('off')
plt.show()

# Drop the batch axis, print the per-timestep argmax, then the decoded text.
prediction = np.squeeze(text)
print(np.argmax(prediction, axis=-1))
decoded_sequence = ctc_decode(prediction, vectorizer.get_vocabulary())
print(decoded_sequence)