In [1]:
# Extract the formula-image archive stored on Google Drive into the Colab
# workspace under /content/datasets/.
# The '-q' flag makes unzip quiet (less verbose output).
# NOTE(review): assumes Drive is already mounted at /content/drive — no mount
# step is visible in this notebook; confirm before a fresh "Run all".
!unzip -q "/content/drive/MyDrive/formula_images.zip" -d "/content/datasets/"

In [3]:
import tensorflow as tf
import numpy as np
import os
import time
from tensorflow.keras.applications import ResNet50V2
from tensorflow.keras.applications.resnet_v2 import preprocess_input as resnet_preprocess_input

# --- 1. CONFIGURATION ---
BATCH_SIZE = 32           # examples per training/validation batch
BUFFER_SIZE = 1000        # shuffle buffer size for the training pipeline
EMBEDDING_DIM = 256       # token embedding width passed to the decoder
UNITS = 512               # decoder width (LSTM / attention / first dense layer)
IMG_SIZE = (224, 224)     # images are resized to this (height, width)
EPOCHS = 5                # epochs with the encoder frozen
FINE_TUNE_EPOCHS = 3      # additional epochs for fine-tuning
FINE_TUNE_AT = 140        # backbone layers before this index stay frozen

# --- 2. DATA LOADING ---
# Every split lives under one extracted dataset root.
DATASET_ROOT = '/content/datasets/formula_images'
TRAIN_IMG_DIR = f'{DATASET_ROOT}/train/images'
TRAIN_LABELS_FILE = f'{DATASET_ROOT}/train/labels.txt'
VALID_IMG_DIR = f'{DATASET_ROOT}/valid/images'
VALID_LABELS_FILE = f'{DATASET_ROOT}/valid/labels.txt'

def load_dataset_from_files(img_dir, labels_file):
    """Read a ``filename,caption`` labels file into parallel lists.

    Each non-blank line is split on its first comma: the left part is the
    image filename (joined onto ``img_dir``), the right part is the caption,
    which is wrapped in ``<start>`` / ``<end>`` marker tokens. Commas inside
    the caption are preserved.

    Lines without a comma or with an empty filename are skipped; the number
    of skipped lines is reported once so that a badly formatted labels file
    does not silently shrink the dataset.

    Args:
        img_dir: Directory that the filenames are relative to.
        labels_file: Path to the UTF-8 encoded labels file.

    Returns:
        A tuple ``(image_paths, captions)`` of equal-length lists.
    """
    image_paths, captions = [], []
    skipped = 0
    with open(labels_file, 'r', encoding='utf-8') as f:
        for line in f:
            record = line.strip()
            if not record:
                # Blank lines (e.g. a trailing newline) are not errors.
                continue
            filename, separator, caption = record.partition(',')
            filename = filename.strip()
            if not separator or not filename:
                skipped += 1
                continue
            image_paths.append(os.path.join(img_dir, filename))
            # Add start and end tokens
            captions.append(f'<start> {caption.strip()} <end>')
    if skipped:
        print(f"Warning: skipped {skipped} malformed line(s) in {labels_file}")
    return image_paths, captions

print("Loading dataset...")
train_image_paths, train_captions = load_dataset_from_files(TRAIN_IMG_DIR, TRAIN_LABELS_FILE)
valid_image_paths, valid_captions = load_dataset_from_files(VALID_IMG_DIR, VALID_LABELS_FILE)

# --- 3. TEXT PREPARATION ---
print("Tokenizing text...")
all_captions = train_captions + valid_captions
if not all_captions:
    # Fail with an actionable message instead of max()'s "empty sequence" error.
    raise ValueError(
        f"No captions were loaded from {TRAIN_LABELS_FILE} or {VALID_LABELS_FILE}"
    )

# Count tokens with str.split() (any whitespace run is one separator) so the
# length agrees with TextVectorization's whitespace splitting. split(" ")
# over-counts on doubled spaces and under-counts on tabs, and the latter would
# truncate the trailing <end> token.
max_length = max(len(caption.split()) for caption in all_captions)
tokenizer = tf.keras.layers.TextVectorization(
    standardize=None,   # keep tokens exactly as written in the labels file
    ragged=False,
    output_sequence_length=max_length
)
tokenizer.adapt(all_captions)
vocab_size = tokenizer.vocabulary_size()

# The first two vocabulary entries are dropped here and supplied again through
# the lookup's mask/OOV tokens, so ids from `tokenizer` map back to the same
# words in `index_to_word`.
full_vocabulary = tokenizer.get_vocabulary()
clean_vocabulary = full_vocabulary[2:]
index_to_word = tf.keras.layers.StringLookup(
    vocabulary=clean_vocabulary,
    invert=True,
    oov_token='[UNK]',
    mask_token=''
)
# Reuse the vocabulary list fetched above rather than rebuilding it twice.
start_token_index = full_vocabulary.index('<start>')
end_token_index = full_vocabulary.index('<end>')
print(f"Vocabulary Size: {vocab_size}")
print(f"Max Sequence Length: {max_length}")

# --- 4. CREATE TF.DATA PIPELINE ---
def load_and_preprocess_image(image_path):
    """Load one image file and turn it into a ResNetV2-ready tensor.

    The file is read, decoded to 3 channels, resized to ``IMG_SIZE`` and then
    passed through the ResNetV2 ``preprocess_input`` function.

    Args:
        image_path: Path of the image file (string or string tensor).

    Returns:
        The preprocessed image tensor.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.io.decode_png(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, IMG_SIZE)
    return resnet_preprocess_input(resized)

def map_func(img_path, caption):
    """Convert one (image path, caption string) pair into model inputs.

    Returns:
        A tuple of the preprocessed image tensor and the caption converted to
        token ids by the module-level ``tokenizer``.
    """
    return load_and_preprocess_image(img_path), tokenizer(caption)

# Training pipeline. The shuffle runs on the lightweight (path, caption)
# string pairs *before* images are decoded: the buffer can then span the whole
# training set (a uniform shuffle every epoch) without holding decoded float
# images in memory, which is what shuffling after `map` would do.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_image_paths, train_captions))
    .shuffle(max(BUFFER_SIZE, len(train_image_paths)))
    .map(map_func, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Validation pipeline: same preprocessing, no shuffling.
valid_dataset = (
    tf.data.Dataset.from_tensor_slices((valid_image_paths, valid_captions))
    .map(map_func, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

print("\nDataset pipelines created successfully!")

# --- 5. MODEL COMPONENTS ---
class CNN_Encoder(tf.keras.Model):
    """Image encoder built on an ImageNet-initialised ResNet50V2 backbone.

    The output of the backbone layer ``conv5_block3_out`` is reshaped so that
    all dimensions between the batch axis and the channel axis are flattened
    into a single axis, giving a sequence of feature vectors per image.
    """

    def __init__(self, trainable=False):
        """Build the backbone and the feature-extraction sub-model.

        Args:
            trainable: Value assigned to the backbone's ``trainable`` flag.
        """
        super().__init__()
        self.image_model = ResNet50V2(
            include_top=False,
            weights='imagenet',
            input_shape=(*IMG_SIZE, 3),
        )
        backbone = self.image_model
        backbone.trainable = trainable

        # Tap the backbone at conv5_block3_out and flatten everything except
        # the batch and channel axes.
        conv5_output = backbone.get_layer('conv5_block3_out').output
        num_channels = conv5_output.shape[-1]
        self.reshape = tf.keras.layers.Reshape((-1, num_channels))

        self.feature_extractor = tf.keras.Model(
            inputs=backbone.input,
            outputs=self.reshape(conv5_output),
        )

    def call(self, x, training=False):
        """Return the flattened feature sequence for a batch of images.

        ``training`` is forwarded unchanged to the feature extractor.
        """
        return self.feature_extractor(x, training=training)

    def unfreeze_top_layers(self, from_layer=140):
        """Make backbone layers from index ``from_layer`` onwards trainable.

        The whole backbone is first marked trainable, then every layer with an
        index below ``from_layer`` is frozen again.
        """
        backbone = self.image_model
        backbone.trainable = True
        for frozen_layer in backbone.layers[:from_layer]:
            frozen_layer.trainable = False
        print(f"Unfrozen layers from {from_layer} onwards")

class BahdanauAttention(tf.keras.layers.Layer):
    """Additive attention over a sequence of encoder feature vectors."""

    def __init__(self, units):
        """Create the three dense projections used to score the features.

        Args:
            units: Width of the two projections that are summed before tanh.
        """
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)  # projects the features
        self.W2 = tf.keras.layers.Dense(units)  # projects the hidden state
        self.V = tf.keras.layers.Dense(1)       # collapses each score to a scalar

    def call(self, features, hidden):
        """Compute the context vector and the attention weights.

        Args:
            features: Encoder output; presumably (batch, locations, channels)
                — confirm against the caller.
            hidden: Decoder state; an axis is inserted at position 1 so it
                broadcasts against ``features``.

        Returns:
            ``(context_vector, attention_weights)``; the weights are a softmax
            over axis 1 and the context vector is the weighted sum of
            ``features`` over that same axis.
        """
        query = tf.expand_dims(hidden, 1)
        energies = tf.nn.tanh(self.W1(features) + self.W2(query))
        attention_weights = tf.nn.softmax(self.V(energies), axis=1)
        weighted_features = attention_weights * features
        context_vector = tf.reduce_sum(weighted_features, axis=1)
        return context_vector, attention_weights

class RNN_Decoder(tf.keras.Model):
    """Attention-based LSTM decoder that emits one set of logits per call."""

    def __init__(self, embedding_dim, units, vocab_size):
        """Create the embedding, LSTM, output layers and attention module.

        Args:
            embedding_dim: Width of the token embedding.
            units: Width of the LSTM, the attention projections and ``fc1``.
            vocab_size: Number of token ids; also the width of the logits.
        """
        super().__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.lstm = tf.keras.layers.LSTM(
            units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )
        self.fc1 = tf.keras.layers.Dense(units)
        self.dropout = tf.keras.layers.Dropout(0.5)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(units)

    def call(self, x, features, hidden, training=False):
        """Run one decoding step.

        Args:
            x: Token ids fed to the embedding for this step.
            features: Encoder features attended over.
            hidden: ``[state_h, state_c]``; ``state_h`` drives the attention
                and the pair is the LSTM's initial state.
            training: Forwarded to the dropout layer.

        Returns:
            ``(logits, [state_h, state_c], attention_weights)``. The logits
            are computed after the time axis has been merged into the batch
            axis by the reshape below.
        """
        context_vector, attention_weights = self.attention(features, hidden[0])
        embedded = self.embedding(x)
        # Prepend the context vector to the embedding along the last axis.
        context_step = tf.expand_dims(context_vector, 1)
        lstm_input = tf.concat([context_step, embedded], axis=-1)
        output, state_h, state_c = self.lstm(lstm_input, initial_state=hidden)
        projected = self.fc1(output)
        flattened = tf.reshape(projected, (-1, projected.shape[2]))
        flattened = self.dropout(flattened, training=training)
        logits = self.fc2(flattened)
        return logits, [state_h, state_c], attention_weights

    def reset_state(self, batch_size):
        """Return a fresh all-zero ``[state_h, state_c]`` pair for the LSTM."""
        state_shape = (batch_size, self.units)
        return [tf.zeros(state_shape), tf.zeros(state_shape)]

# --- 6. INITIALIZE MODELS, OPTIMIZER, AND LOSS ---
# The encoder starts with its backbone frozen.
encoder = CNN_Encoder(trainable=False)
decoder = RNN_Decoder(embedding_dim=EMBEDDING_DIM, units=UNITS, vocab_size=vocab_size)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
# reduction='none' keeps one loss value per example so that loss_function can
# apply its padding mask before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    """Cross-entropy for one decoding step with padding positions zeroed.

    Args:
        real: Target token ids; id 0 is treated as padding.
        pred: Logits for the same positions.

    Returns:
        The mean of the per-example losses after entries whose target is 0
        have been multiplied by zero (the mean is taken over all entries,
        masked ones included).
    """
    per_example = loss_object(real, pred)
    is_real_token = tf.math.not_equal(real, 0)
    per_example = per_example * tf.cast(is_real_token, dtype=per_example.dtype)
    return tf.reduce_mean(per_example)

# --- 7. METRICS ---
# Running means of the per-batch losses; reset at the start of every epoch.
train_loss_metric, valid_loss_metric = (
    tf.keras.metrics.Mean(name=metric_name)
    for metric_name in ('train_loss', 'valid_loss')
)

# --- 8. CUSTOM TRAINING & VALIDATION LOOPS ---
@tf.function
def train_step(img_tensor, target):
    """Run one teacher-forced optimisation step on a batch.

    Args:
        img_tensor: Batch of preprocessed images.
        target: Batch of padded token-id sequences; column 0 is the start
            token and the remaining columns are predicted one at a time.

    Returns:
        The summed per-step loss divided by the sequence length.

    Note:
        ``encoder.trainable_variables`` is read while this function is being
        traced. If layers are unfrozen after the first call, a fresh trace is
        needed for the new variables to receive gradients.
    """
    batch_size = target.shape[0]
    seq_len = target.shape[1]

    loss = 0
    hidden = decoder.reset_state(batch_size=batch_size)
    dec_input = tf.expand_dims([start_token_index] * batch_size, 1)

    with tf.GradientTape() as tape:
        # Keep the pretrained backbone in inference mode even while its
        # weights are being fine-tuned: with training=True, unfrozen
        # BatchNormalization layers would overwrite their moving statistics
        # with batch statistics (the Keras fine-tuning guide advises against
        # this). Gradients still flow to every trainable weight.
        features = encoder(img_tensor, training=False)
        for i in range(1, seq_len):
            predictions, hidden, _ = decoder(dec_input, features, hidden, training=True)
            loss += loss_function(target[:, i], predictions)
            # Teacher forcing: the ground-truth token is the next input.
            dec_input = tf.expand_dims(target[:, i], 1)

    total_loss = loss / int(seq_len)
    trainable_variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return total_loss

@tf.function
def validation_step(img_tensor, target):
    """Compute the teacher-forced loss for one batch without updating weights.

    Args:
        img_tensor: Batch of preprocessed images.
        target: Batch of padded token-id sequences.

    Returns:
        The summed per-step loss divided by the sequence length.
    """
    batch_size, seq_len = target.shape[0], target.shape[1]

    features = encoder(img_tensor, training=False)
    hidden = decoder.reset_state(batch_size=batch_size)
    dec_input = tf.expand_dims([start_token_index] * batch_size, 1)

    loss = 0
    for step in range(1, seq_len):
        step_target = target[:, step]
        predictions, hidden, _ = decoder(dec_input, features, hidden, training=False)
        loss += loss_function(step_target, predictions)
        # Feed the ground-truth token as the next decoder input.
        dec_input = tf.expand_dims(step_target, 1)

    return loss / int(seq_len)

# --- 9. PREDICTION FUNCTION (GREEDY DECODING) ---
def evaluate(image_path, encoder_model, decoder_model, max_len=None):
    """Generate a caption for one image with greedy decoding.

    At every step the highest-scoring token is chosen and fed back as the
    next decoder input. Decoding stops when ``<end>`` is produced or after
    ``max_len`` steps.

    Args:
        image_path: Path of the image to caption.
        encoder_model: Encoder used to extract image features.
        decoder_model: Decoder providing ``reset_state`` and step-wise calls.
        max_len: Maximum number of decoding steps; defaults to the
            module-level ``max_length``.

    Returns:
        The predicted words joined by single spaces. ``<start>``, ``[UNK]``
        and the empty token are left out of the result.
    """
    step_limit = max_length if max_len is None else max_len
    ignored_words = ('<start>', '[UNK]', '')

    image = load_and_preprocess_image(image_path)
    features = encoder_model(tf.expand_dims(image, 0), training=False)
    hidden = decoder_model.reset_state(batch_size=1)
    dec_input = tf.expand_dims([start_token_index], 0)

    words = []
    for _ in range(step_limit):
        predictions, hidden, _attention = decoder_model(
            dec_input, features, hidden, training=False
        )
        predicted_id = tf.argmax(predictions[0]).numpy()
        word_bytes = index_to_word(tf.constant([predicted_id])).numpy()[0]
        predicted_word = word_bytes.decode('utf-8')

        if predicted_word == '<end>':
            break
        if predicted_word not in ignored_words:
            words.append(predicted_word)

        # The chosen token becomes the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)

    return " ".join(words)

# --- 10. MAIN EXECUTION BLOCK ---
if __name__ == "__main__":
    checkpoint_dir = './training_checkpoints'
    checkpoint = tf.train.Checkpoint(
        optimizer=optimizer,
        encoder=encoder,
        decoder=decoder
    )
    manager = tf.train.CheckpointManager(checkpoint, checkpoint_dir, max_to_keep=3)

    # NOTE(review): restoring only reloads weights/optimizer state; the epoch
    # counters below always start from zero, so a resumed run repeats both
    # phases in full.
    if manager.latest_checkpoint:
        checkpoint.restore(manager.latest_checkpoint)
        print(f'Restored from checkpoint: {manager.latest_checkpoint}')
    else:
        print('Initializing from scratch.')

    def run_training_phase(step_fn, num_epochs, ckpt_manager, label=''):
        """Train and validate for ``num_epochs`` epochs, checkpointing each one.

        Args:
            step_fn: Callable ``(img_tensor, target) -> batch loss`` that
                performs one optimisation step.
            num_epochs: Number of passes over ``train_dataset``.
            ckpt_manager: Checkpoint manager whose ``save()`` is called after
                every epoch.
            label: Prefix added to the progress messages.
        """
        for epoch in range(num_epochs):
            start = time.time()
            train_loss_metric.reset_state()
            valid_loss_metric.reset_state()

            for batch, (img_tensor, target) in enumerate(train_dataset):
                batch_loss = step_fn(img_tensor, target)
                train_loss_metric.update_state(batch_loss)

                if batch % 50 == 0:
                    print(f'{label}Epoch {epoch+1} Batch {batch} Loss {batch_loss:.4f}')

            for img_tensor, target in valid_dataset:
                batch_loss = validation_step(img_tensor, target)
                valid_loss_metric.update_state(batch_loss)

            epoch_train_loss = train_loss_metric.result()
            epoch_valid_loss = valid_loss_metric.result()

            print(f'{label}Epoch {epoch+1}/{num_epochs} | Train Loss: {epoch_train_loss:.6f} | Valid Loss: {epoch_valid_loss:.6f}')
            ckpt_manager.save()
            print(f'Checkpoint saved | Time: {time.time()-start:.2f} sec\n')

    # --- Phase 1: Train with frozen encoder ---
    print("\n=== PHASE 1: Training with frozen encoder ===")
    run_training_phase(train_step, EPOCHS, manager)

    # --- Phase 2: Fine-tune with unfrozen top layers ---
    print("\n=== PHASE 2: Fine-tuning with unfrozen encoder layers ===")
    encoder.unfreeze_top_layers(from_layer=FINE_TUNE_AT)

    # `train_step` was traced while the encoder was frozen, so its cached graph
    # only computes and applies gradients for the decoder; flipping `trainable`
    # does not trigger a retrace. The phase-1 optimizer was likewise built for
    # the decoder variables only. Use a fresh optimizer with the lower
    # fine-tuning learning rate and a fresh trace so the newly unfrozen encoder
    # weights are actually updated. Rebinding the module-level name is
    # intentional: `train_step` looks `optimizer` up when it is traced.
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)  # Lower learning rate for fine-tuning
    checkpoint.optimizer = optimizer  # checkpoints now track the new optimizer
    fine_tune_step = tf.function(train_step.python_function)

    run_training_phase(fine_tune_step, FINE_TUNE_EPOCHS, manager, label='Fine-tune ')

    print("Training complete.")

    # --- Test predictions ---
    print("\n=== Testing Predictions ===")
    test_images = valid_image_paths[:5]
    for img_path in test_images:
        prediction = evaluate(img_path, encoder, decoder)
        print(f"Image: {os.path.basename(img_path)}")
        print(f"Prediction: {prediction}\n")

Loading dataset...
Tokenizing text...
Vocabulary Size: 64456
Max Sequence Length: 14

Dataset pipelines created successfully!
Initializing from scratch.

=== PHASE 1: Training with frozen encoder ===
Epoch 1 Batch 0 Loss 3.4542
Epoch 1 Batch 50 Loss 1.4392
Epoch 1 Batch 100 Loss 1.3640
Epoch 1 Batch 150 Loss 1.0079
Epoch 1 Batch 200 Loss 1.2770
Epoch 1 Batch 250 Loss 1.0719
Epoch 1 Batch 300 Loss 1.0466
Epoch 1 Batch 350 Loss 0.9517
Epoch 1 Batch 400 Loss 1.0150
Epoch 1 Batch 450 Loss 0.9343
Epoch 1 Batch 500 Loss 1.1574
Epoch 1 Batch 550 Loss 1.0190
Epoch 1 Batch 600 Loss 0.9572
Epoch 1 Batch 650 Loss 0.9655
Epoch 1 Batch 700 Loss 1.0713
Epoch 1 Batch 750 Loss 0.9407
Epoch 1 Batch 800 Loss 0.9063
Epoch 1 Batch 850 Loss 0.9068
Epoch 1 Batch 900 Loss 1.0507
Epoch 1 Batch 950 Loss 1.0422
Epoch 1 Batch 1000 Loss 0.8699
Epoch 1 Batch 1050 Loss 1.0054
Epoch 1 Batch 1100 Loss 1.0414
Epoch 1 Batch 1150 Loss 0.9659
Epoch 1 Batch 1200 Loss 1.0291
Epoch 1 Batch 1250 Loss 0.8735
Epoch 1 Batch 130

In [4]:
# Run after the tokenizer has been adapted: export its vocabulary so the same
# token-to-id mapping can be rebuilt outside this notebook.
vocab = tokenizer.get_vocabulary()

# One token per line, in vocabulary (id) order.
with open('vocab.txt', 'w', encoding='utf-8') as f:
    f.writelines(f'{token}\n' for token in vocab)

# Trigger a browser download of vocab.txt (Colab only).
from google.colab import files
files.download('vocab.txt')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>