In [1]:
import os 

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['TF_USE_LEGACY_KERAS'] = '1'

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras.layers import Input, Dense, Dropout, Concatenate, GlobalAveragePooling2D
from tensorflow.keras.applications import ResNet50

from sklearn.preprocessing import MinMaxScaler

import pandas as pd
import numpy as np

from transformers import AutoTokenizer, TFAutoModel


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# The tokenizer and the text encoder are loaded from the same checkpoint so
# that the token ids produced by one are the ids the other was trained on.
TEXT_CHECKPOINT = 'sentence-transformers/all-MiniLM-L6-v2'
tokenizer = AutoTokenizer.from_pretrained(TEXT_CHECKPOINT)
text_embedding_model = TFAutoModel.from_pretrained(TEXT_CHECKPOINT)

# Image backbone: ImageNet-initialised ResNet50 without its classification
# head, taking 224x224 RGB inputs.
img_embedding_model = ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(224, 224, 3),
)

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFBertModel: ['embeddings.position_ids']
- This IS expected if you are initializing TFBertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
All the weights of TFBertModel were initialized from the PyTorch model.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions without further training.


In [3]:
def get_img_text_model(img_embedding_model, text_embedding_model, num_classes,
                       num_digits=7, digit_vocab_size=12, dropout_rate=0.3):
    """Build the multi-output image + OCR-text model.

    The model concatenates a mean-pooled text embedding with a globally
    average-pooled image embedding and feeds the result to two independent
    dense branches: one shared trunk with ``num_digits`` softmax heads (one per
    character position of the encoded number) and one branch with a single
    softmax head for the unit.

    Args:
        img_embedding_model: Image backbone applied to a ``(224, 224, 3)``
            input; its output is reduced with ``GlobalAveragePooling2D``.
        text_embedding_model: Text encoder called as
            ``model(input_ids, attention_mask=mask)``. Only element ``[0]`` of
            its output is used. NOTE(review): this is presumably the per-token
            hidden states of shape (batch, seq, dim) -- confirm.
        num_classes: Number of classes of the ``"units"`` head.
        num_digits: Number of positional heads, named ``numerical_output_0`` ..
            ``numerical_output_{num_digits - 1}``. Defaults to 7.
        digit_vocab_size: Number of classes of every positional head.
            Defaults to 12.
        dropout_rate: Dropout rate used in both dense branches. Defaults to 0.3.

    Returns:
        A ``tf.keras.Model`` with inputs ``[image, input_ids, attention_mask]``
        and outputs ``[numerical_output_0, ..., numerical_output_{n-1}, units]``
        (in that order).

    Raises:
        ValueError: If ``num_digits`` or ``digit_vocab_size`` is smaller than 1.
    """
    if num_digits < 1 or digit_vocab_size < 1:
        raise ValueError("num_digits and digit_vocab_size must both be >= 1")

    text_inputs = Input(shape=(None,), dtype=tf.int32)
    attention_mask = Input(shape=(None,), dtype=tf.int32)
    image_inputs = Input(shape=(224, 224, 3), name="image_input")

    text_embeddings = text_embedding_model(text_inputs, attention_mask=attention_mask)[0]
    image_embeddings = img_embedding_model(image_inputs)
    image_embeddings = GlobalAveragePooling2D()(image_embeddings)

    # Masked mean pooling over the sequence axis: padded positions (mask == 0)
    # contribute nothing to the sum, and the divisor is clipped away from zero
    # so an all-padding row cannot divide by zero.
    mask = tf.cast(tf.expand_dims(attention_mask, axis=-1), tf.float32)
    text_embeddings = tf.reduce_sum(text_embeddings * mask, axis=1) / tf.clip_by_value(
        tf.reduce_sum(mask, axis=1), clip_value_min=1e-9, clip_value_max=tf.float32.max)

    pooled_embeddings = Concatenate()([text_embeddings, image_embeddings])

    # Shared trunk for the positional (digit) heads.
    x = Dense(256, activation='relu')(pooled_embeddings)
    x = Dropout(dropout_rate)(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(dropout_rate)(x)
    x = Dense(64, activation='relu')(x)

    # One softmax head per character position. The layer names are part of the
    # training contract: labels and losses are keyed by these names.
    numerical_outputs = [
        Dense(digit_vocab_size, activation='softmax', name=f"numerical_output_{i}")(x)
        for i in range(num_digits)
    ]

    # Separate branch for the unit head; it starts again from the pooled
    # embeddings rather than from the digit trunk.
    y = Dense(64, activation='relu')(pooled_embeddings)
    y = Dropout(dropout_rate)(y)
    y = Dense(32, activation='relu')(y)
    units_output = Dense(num_classes, activation='softmax', name="units")(y)

    return tf.keras.Model(
        inputs=[image_inputs, text_inputs, attention_mask],
        outputs=numerical_outputs + [units_output],
    )

In [4]:
# from tensorflow.keras.utils import plot_model
# model.summary()
# plot_model(model, to_file='model_architecture.png', show_shapes=True, show_layer_names=True)

In [5]:
# model_url = "https://www.kaggle.com/models/spsayakpaul/vision-transformer/TensorFlow2/vit-b8-fe/1"
# model = hub.load(model_url)
# tf.saved_model.save(model, "vit_model")
# vit_model = tf.saved_model.load("vit_model")

In [6]:
img_dir = "dataset/"
data = pd.read_csv("data/height_with_ocr.csv")

image_paths = data["id"].values
# pandas reads empty CSV fields as NaN (a float). Replace missing OCR results
# with an empty string and force everything to `str` so the tokenizer only
# ever receives text.
ocr_text = data["ocr_text"].fillna("").astype(str).values.tolist()
units = data["unit"].values
numeric_value = data['numeric_value']

# Pad every row to the longest sequence in the dataset (capped at 128 tokens)
# and return TensorFlow tensors.
tokenized_input = tokenizer(ocr_text, padding=True, truncation=True, max_length=128, return_tensors='tf')

reverse_mapping = {i: str(i) for i in range(0, 10)}
reverse_mapping[10] = '.'
reverse_mapping[11] = ''

mapping = {str(i): i for i in range(0, 10)}
mapping['.'] = 10
mapping[' '] = 11

def numeric_to_digit_sequence(numeric_value):
    # Convert the numeric values to string tensors
    string_value = str(numeric_value)
    if(len(string_value) > 7):
        string_value = string_value[:7]
    # Initialize a list to store the encoded sequences
    encoded_sequence = []
    
    for char in string_value:
        # Encode the character as an integer value
        encoded_value = mapping[char]
        encoded_sequence.append(encoded_value)
    
    # Pad the sequence with the value representing "no value" if length is less than 7
    while len(encoded_sequence) < 7:
        encoded_sequence.append(11)
    return encoded_sequence

def digit_sequence_to_numeric(encoded_sequence):
    # Initialize a list to store the characters for the current sequence
    char_list = []
    
    # Iterate over each encoded value in the sequence
    for encoded_value in encoded_sequence:
        # Convert the encoded value back to the corresponding character
        char = reverse_mapping[encoded_value]
        char_list.append(char)
    
    # Join the characters to form the original string representation of the number
    number_str = ''.join(char_list).strip()
    
    # Convert the string back to a numeric value
    return number_str

# Encode every target value as a fixed-length row of symbol indices and stack
# the rows into a 2-D integer array.
numeric_value = np.array(numeric_value.apply(numeric_to_digit_sequence).tolist())
print("numeric_value shape:", numeric_value.shape)

# Learn the unit vocabulary from the data, then turn each unit string into its
# integer id; the trailing axis lets it be concatenated with the digit columns.
string_lookup = tf.keras.layers.StringLookup()
string_lookup.adapt(units)

units_number = tf.expand_dims(string_lookup(units), axis=-1)
print("units_number shape:", units_number.shape)

# One label row per example: the digit columns followed by the unit id.
output = tf.concat([numeric_value, units_number], axis=-1)
print("output shape:", output.shape)

numeric_value shape: (21756, 7)
units_number shape: (21756, 1)
output shape: (21756, 8)


In [7]:
def load_dataset(path, token, output, img_dir='dataset/', num_digits=7):
    """Turn one (image id, tokenized text, label row) triple into model inputs and targets.

    Args:
        path: Image identifier; the file read is ``img_dir + str(path) + '.jpg'``.
        token: Mapping that contains ``'input_ids'`` and ``'attention_mask'``
            for this example.
        output: Label row whose first ``num_digits`` entries are the per-position
            digit labels and whose entry at index ``num_digits`` is the unit label.
        img_dir: Directory prefix (including the trailing separator) of the
            image files. Defaults to ``'dataset/'``.
        num_digits: Number of positional digit labels in ``output``. Defaults to 7.

    Returns:
        ``((image, input_ids, attention_mask), labels)`` where ``image`` is the
        decoded JPEG resized to 224x224 and divided by 255, and ``labels`` is a
        dict keyed by the model's output layer names (``"units"`` and
        ``"numerical_output_<i>"``).
    """
    path = tf.strings.as_string(path)
    path = tf.strings.join([img_dir, path, '.jpg'])

    img = tf.io.read_file(path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, (224, 224))
    # NOTE(review): the image backbone is a keras.applications ResNet50 with
    # ImageNet weights, which is normally paired with
    # `tf.keras.applications.resnet50.preprocess_input` rather than [0, 1]
    # scaling -- confirm. The inference cell applies the same /255 scaling, so
    # any change has to be made in both places.
    img = img / 255.0

    # Targets are keyed by output-layer name so Keras can match each label to
    # its head regardless of output order.
    labels = {"units": output[num_digits]}
    labels.update({f"numerical_output_{i}": output[i] for i in range(num_digits)})

    return (img, token['input_ids'], token['attention_mask']), labels


dataset = tf.data.Dataset.from_tensor_slices((image_paths, tokenized_input, output))

# Shuffle ONCE, before splitting, and freeze that order. With the default
# `reshuffle_each_iteration=True` the order changes on every epoch, so
# `take`/`skip` would cut a different train/validation split each time and
# validation examples would leak into training. Shuffling here, before `map`,
# also means the buffer holds only ids/tokens/labels instead of every decoded
# image.
dataset = dataset.shuffle(buffer_size=len(image_paths), seed=42,
                          reshuffle_each_iteration=False)

train_size = int(0.9 * len(image_paths))
val_size = len(image_paths) - train_size

train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size)

BATCH_SIZE = 32

# Training split: reshuffle the (still lightweight) elements every epoch, then
# load images in parallel, batch and prefetch.
train_dataset = (
    train_dataset
    .shuffle(buffer_size=train_size)
    .map(load_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Validation split: fixed order, no shuffling needed.
val_dataset = (
    val_dataset
    .map(load_dataset, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [8]:
# Build the image + text model. The "units" head gets one class per entry of
# the StringLookup vocabulary (`string_lookup.vocabulary_size()`).
model = get_img_text_model(img_embedding_model, text_embedding_model, string_lookup.vocabulary_size())

In [9]:
# Number of batches in one epoch. The training dataset is batched without
# `drop_remainder`, so the final partial batch counts too: round up instead of
# down, otherwise the batch-based checkpoint frequency drifts away from the
# epoch boundaries.
steps_per_epoch = (train_size + BATCH_SIZE - 1) // BATCH_SIZE

# Step schedule for the LearningRateScheduler callback
def lr_decay(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    The current rate ``lr`` is multiplied by 0.1 on every 10th epoch
    (10, 20, 30, ...); on all other epochs, including epoch 0, it is returned
    unchanged.
    """
    decay_step = 10
    decay_rate = 0.1
    at_decay_boundary = epoch != 0 and epoch % decay_step == 0
    return lr * decay_rate if at_decay_boundary else lr

# Training callbacks (all four are handed to model.fit)

# Stop once validation loss has not improved for 5 epochs and roll the model
# back to the weights of its best epoch.
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=5, restore_best_weights=True)

# Multiply the learning rate by 0.2 after 3 epochs without validation-loss
# improvement, never going below 1e-4.
reduce_lr_callback = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001)

# Step decay defined by `lr_decay`.
lr_scheduler_callback = tf.keras.callbacks.LearningRateScheduler(lr_decay)

# Periodic checkpoint. An integer `save_freq` is counted in batches, so two
# epochs' worth of batches is passed.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath='height_2.keras', save_freq=steps_per_epoch * 2)

In [10]:
# Every head is a softmax classifier trained against integer class labels, so
# all of them use sparse categorical cross-entropy. Keys are output-layer names.
losses = {"units": "sparse_categorical_crossentropy"}
losses.update(
    {f"numerical_output_{i}": "sparse_categorical_crossentropy" for i in range(7)}
)

model.compile(optimizer='adam', loss=losses, metrics=['accuracy'])

In [11]:
# Train on the batched training split, with the validation split passed as
# `validation_data`. `epochs=100` is an upper bound: the EarlyStopping callback
# in the list below can end training earlier.
model.fit(train_dataset,
          validation_data=val_dataset, 
          epochs=100,
          callbacks=[checkpoint_callback, lr_scheduler_callback, early_stopping_callback, reduce_lr_callback])

Epoch 1/100


I0000 00:00:1726329881.498402  238357 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


Epoch 2/100



Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100


<tf_keras.src.callbacks.History at 0x727c982986a0>

In [12]:
import matplotlib.pyplot as plt

# Per-epoch metrics recorded by the last `model.fit` call.
training_history = model.history.history

fig, ax = plt.subplots()
ax.plot(training_history['loss'], label='training loss')
# Validation loss is only recorded when `fit` was given validation data.
if 'val_loss' in training_history:
    ax.plot(training_history['val_loss'], label='validation loss')
ax.set(title='Training history', xlabel='epoch', ylabel='total loss')
ax.legend();

ModuleNotFoundError: No module named 'matplotlib'

In [None]:
# Row of the dataset to run a single prediction on (named so it does not
# shadow the builtin `id`).
sample_index = 10

# Tokenizer output for this row, converted to int32 tensors with a leading
# batch dimension of 1.
encoding = tokenized_input[sample_index]
input_ids = tf.expand_dims(tf.convert_to_tensor(encoding.ids, dtype=tf.int32), axis=0)
attention_mask = tf.expand_dims(tf.convert_to_tensor(encoding.attention_mask, dtype=tf.int32), axis=0)

# Read and preprocess the image exactly as `load_dataset` does for training.
img_path = f'dataset/{image_paths[sample_index]}.jpg'
img = tf.io.read_file(img_path)
img = tf.image.decode_jpeg(img, channels=3)
img = tf.image.resize(img, (224, 224))
img = img / 255.0  # scale to [0, 1]
img = tf.expand_dims(img, axis=0)

# The model returns one array per output head, in the order it was built with:
# all positional digit heads first, the "units" head last.
predictions = model.predict([img, input_ids, attention_mask])
digit_heads, unit_head = predictions[:-1], predictions[-1]

# Each digit head is a softmax over the symbol vocabulary: take the most likely
# class per position and decode the resulting index sequence back to text.
predicted_digits = [int(np.argmax(head[0])) for head in digit_heads]
predicted_value = digit_sequence_to_numeric(predicted_digits)

# Class indices of the unit head are positions in the StringLookup vocabulary.
predicted_unit_index = int(np.argmax(unit_head[0]))
predicted_unit = string_lookup.get_vocabulary()[predicted_unit_index]

print("Image ID:", image_paths[sample_index])
print("Predicted Unit:", predicted_unit)
print("Predicted Numerical Value:", predicted_value)