Feature extraction model

In [38]:
import tensorflow as tf
from keras import layers, models
from keras.applications import InceptionV3,EfficientNetB0 #type: ignore
import os
import numpy as np
import tensorflow as tf
from PIL import Image
from tqdm import tqdm
import json

def inception_block(x, filters):
    """Run four parallel convolution branches over ``x`` and merge them.

    The branches are: a 1x1 convolution; a 1x1 followed by a 3x3
    convolution; a 1x1 followed by a 5x5 convolution; and a 3x3 stride-1
    max-pool followed by a 1x1 convolution.  Every convolution produces
    ``filters`` channels with 'same' padding and a ReLU activation.

    Args:
        x: Input feature-map tensor.
        filters: Number of output channels of each convolution.

    Returns:
        The four branch outputs concatenated along the last axis and passed
        through batch normalisation.
    """
    def conv(kernel, tensor):
        # All convolutions in the block share everything except kernel size.
        return layers.Conv2D(filters, kernel, padding='same', activation='relu')(tensor)

    pointwise = conv((1, 1), x)
    medium = conv((3, 3), conv((1, 1), x))
    wide = conv((5, 5), conv((1, 1), x))
    # Stride-1 pooling with 'same' padding keeps the spatial size unchanged,
    # so the pooled branch can be concatenated with the others.
    pooled = conv((1, 1), layers.MaxPooling2D((3, 3), strides=(1, 1), padding='same')(x))

    merged = layers.concatenate([pointwise, medium, wide, pooled], axis=-1)
    return layers.BatchNormalization()(merged)

def mbconv_block(x, filters, kernel_size, strides=(1, 1), expand_ratio=6):
    """Expand -> depthwise -> project convolution block with optional skip.

    The input is widened by a 1x1 convolution to
    ``in_channels * expand_ratio`` channels, filtered by a depthwise
    convolution, then projected to ``filters`` channels by another 1x1
    convolution.  Each convolution is bias-free and followed by batch
    normalisation; the first two are also followed by ReLU.

    Args:
        x: Input feature-map tensor whose last dimension is statically known.
        filters: Number of output channels of the projection convolution.
        kernel_size: Kernel size of the depthwise convolution.
        strides: Strides of the depthwise convolution, either a single int
            or a pair.
        expand_ratio: Multiplier applied to the input channel count to get
            the width of the expansion convolution.

    Returns:
        The block output.  The input is added back (residual connection)
        only when the block neither downsamples nor changes channel count.
    """
    shortcut = x
    in_channels = x.shape[-1]

    # Keras accepts strides as an int, a list or a tuple.  Normalise before
    # comparing so that strides=1 or [1, 1] does not silently disable the
    # residual connection the way a bare `strides == (1, 1)` test would.
    if isinstance(strides, int):
        stride_pair = (strides, strides)
    else:
        stride_pair = tuple(strides)

    # Expansion
    x = layers.Conv2D(in_channels * expand_ratio, (1, 1), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    # Depthwise spatial filtering (this is where downsampling happens)
    x = layers.DepthwiseConv2D(kernel_size, strides=strides, padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    # Linear projection: no activation after the final batch norm
    x = layers.Conv2D(filters, (1, 1), padding='same', use_bias=False)(x)
    x = layers.BatchNormalization()(x)

    # Shapes only match for the addition when nothing was resized.
    if stride_pair == (1, 1) and in_channels == filters:
        x = layers.add([x, shortcut])
    return x

def efficientnet_encoder(input_shape=(224, 224, 3)):
    """Build the convolutional encoder as a Keras functional model.

    The network is a stride-2 3x3 stem convolution (32 filters, batch
    norm, ReLU), eight ``mbconv_block`` stages, and a global average pool.

    Args:
        input_shape: Shape of one input image, without the batch axis.

    Returns:
        A ``models.Model`` mapping an image batch to the pooled output of
        the last stage (1024 channels).
    """
    # (output filters, depthwise strides, expand ratio) per stage, in order.
    stage_specs = (
        (64, (1, 1), 1),
        (128, (2, 2), 6),
        (128, (1, 1), 6),
        (256, (2, 2), 6),
        (256, (1, 1), 6),
        (512, (2, 2), 6),
        (512, (1, 1), 6),
        (1024, (2, 2), 6),
    )

    inputs = layers.Input(shape=input_shape)

    stem = layers.Conv2D(
        32,
        (3, 3),
        strides=(2, 2),
        padding="same",
        use_bias=False,
        kernel_initializer='he_normal',
    )
    net = stem(inputs)
    net = layers.BatchNormalization()(net)
    net = layers.ReLU()(net)

    for stage_filters, stage_strides, stage_expand in stage_specs:
        net = mbconv_block(
            net,
            stage_filters,
            (3, 3),
            strides=stage_strides,
            expand_ratio=stage_expand,
        )

    pooled = layers.GlobalAveragePooling2D()(net)
    return models.Model(inputs, pooled)

def inception_decoder(input_tensor, num_classes=3, dense_units=512):
    """Build a classification head made of stacked inception blocks.

    Four ``inception_block`` stages (32, 64, 128 and 256 filters per
    branch) are followed by global average pooling, a ReLU dense layer,
    batch normalisation, dropout (rate 0.5) and a softmax output layer.

    Args:
        input_tensor: 4-D symbolic feature-map tensor the head is built on.
        num_classes: Number of units in the softmax output layer.
        dense_units: Width of the hidden dense layer.  The original code
            omitted this required ``Dense`` argument, which raises a
            ``TypeError`` as soon as the function is called.

    Returns:
        A single ``models.Model`` from ``input_tensor`` to the class
        probabilities.  (A stray trailing comma used to wrap it in a
        1-tuple, which callers could not invoke as a model.)
    """
    x = input_tensor
    for block_filters in (32, 64, 128, 256):
        x = inception_block(x, block_filters)

    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(dense_units, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(num_classes, activation='softmax')(x)
    return models.Model(input_tensor, x)

def custom_model(input_shape=(224, 224, 3), num_classes=3):
    """Assemble and compile the encoder + inception-head classifier.

    The pooled encoder output is reshaped to a 1x1x1024 feature map,
    passed through a 1x1 ReLU convolution and batch normalisation, and then
    fed to the object returned by ``inception_decoder``.

    Args:
        input_shape: Shape of one input image, without the batch axis.
        num_classes: Forwarded to ``inception_decoder``.

    Returns:
        A model from the encoder input to the decoder output, compiled with
        Adam (learning rate 1e-4), sparse categorical cross-entropy and an
        accuracy metric.
    """
    backbone = efficientnet_encoder(input_shape)
    pooled = backbone.output
    print("Encoder output shape:", pooled.shape)

    # Turn the flat pooled vector back into a (tiny) feature map so the
    # convolutional head can consume it.
    bridge = layers.Reshape((1, 1, 1024))(pooled)
    bridge = layers.Conv2D(1024, (1, 1), activation='relu')(bridge)
    bridge = layers.BatchNormalization()(bridge)

    decoder = inception_decoder(input_tensor=bridge, num_classes=num_classes)
    predictions = decoder(bridge)

    classifier = models.Model(inputs=backbone.input, outputs=predictions)
    adam = tf.keras.optimizers.Adam(learning_rate=0.0001)
    classifier.compile(
        optimizer=adam,
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

In [None]:
# Image directories and the annotation file that describes them.
train_dir = "data/resized_train2017"
val_dir = "data/resized_val2017"
json_path = "your_annotations_file.json"  # Replace with actual path

# Parse the whole annotation file up front.
with open(json_path, "r") as f:
    annotations = json.load(f)

# Index the records by file name: {filename: (class_id, bbox)}.
# Each record must provide the "filename", "class_id" and "bbox" keys.
annotation_dict = dict(
    (record["filename"], (record["class_id"], record["bbox"]))
    for record in annotations
)

# Function to load image, label and bbox
def load_image_and_label(image_path):
    """Load one JPEG and look up its annotation.

    Intended to be used as a ``tf.data.Dataset.map`` function.

    Args:
        image_path: Scalar string tensor holding the path of a JPEG file.
            Its last path component is the key into ``annotation_dict``.

    Returns:
        ``(image, (label, bbox))`` where ``image`` is a float32 RGB tensor
        scaled to [0, 1] and resized to 128x128, ``label`` is an int32
        scalar and ``bbox`` is a float32 tensor of shape ``[4]``.

    Raises:
        KeyError: (at iteration time, surfaced by TensorFlow) if the file
            name has no entry in ``annotation_dict``.
    """
    filename = tf.strings.split(image_path, os.sep)[-1]

    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)  # normalize [0, 1]
    image = tf.image.resize(image, [128, 128])

    def _lookup(name):
        # Runs eagerly in Python: the dict cannot be indexed by a tensor.
        return annotation_dict[name.numpy().decode()]

    label, bbox = tf.py_function(
        func=_lookup,
        inp=[filename],
        Tout=(tf.int32, tf.float32),
    )
    # py_function outputs carry no static shape.  Pin both of them (the
    # original pinned only bbox) so batching and Keras losses see known
    # ranks.  Assumes class_id is a single integer per image, as the
    # "{filename: (label, [x, y, w, h])}" layout suggests — TODO confirm.
    label.set_shape([])
    bbox.set_shape([4])
    return image, (label, bbox)

# Create datasets
def create_dataset(image_dir, batch_size=32, shuffle=True):
    """Build a batched ``tf.data`` pipeline over the ``*.jpg`` files in a directory.

    Args:
        image_dir: Directory to glob for ``*.jpg`` files.
        batch_size: Number of examples per batch.
        shuffle: When true, the file listing is shuffled and the decoded
            examples are additionally shuffled with a 500-element buffer.

    Returns:
        A prefetching dataset whose elements are batches of whatever
        ``load_image_and_label`` returns.
    """
    pattern = os.path.join(image_dir, "*.jpg")
    files = tf.data.Dataset.list_files(pattern, shuffle=shuffle)
    examples = files.map(load_image_and_label, num_parallel_calls=tf.data.AUTOTUNE)

    if shuffle:
        examples = examples.shuffle(500)

    batches = examples.batch(batch_size)
    return batches.prefetch(tf.data.AUTOTUNE)

# Build the training (shuffled) and validation (unshuffled) pipelines with
# the default batch size.
# NOTE(review): `create_dataset` is defined a second time further down in
# this file with a different signature (captions_dict, vectorizer, ...);
# these calls only work while the directory-based definition is the one in
# scope.
train_ds = create_dataset(train_dir)
val_ds = create_dataset(val_dir, shuffle=False)

In [40]:
def extract_features(images, cnn_model=None):
    """Run images through the CNN and return an intermediate activation.

    Args:
        images: Batch of images accepted by ``Model.predict``.
        cnn_model: Optional, already built (and typically trained) model to
            take the features from.  When omitted a new ``custom_model`` is
            constructed, so its weights are freshly initialised.  For a 4-D
            NumPy batch the new model's input shape is taken from the
            images themselves rather than the 224x224 default, because the
            image loaders in this file produce 128x128 inputs.

    Returns:
        The ``predict`` output of a sub-model ending at the layer with
        index ``-4`` of ``cnn_model``.
    """
    if cnn_model is None:
        if isinstance(images, np.ndarray) and images.ndim == 4:
            # (batch, height, width, channels) -> per-image shape
            cnn_model = custom_model(input_shape=tuple(images.shape[1:]))
        else:
            cnn_model = custom_model()

    feature_model = tf.keras.Model(
        inputs=cnn_model.input,
        outputs=cnn_model.get_layer(index=-4).output,
    )
    return feature_model.predict(images)

In [41]:
class CaptionGenerator(tf.keras.Model):
    """GRU caption decoder that is primed with an image feature vector.

    The image features are inserted as an extra first timestep in front of
    the embedded caption tokens; the GRU output at every timestep is mapped
    to vocabulary logits by two dense layers.

    Args:
        embedding_dim: Size of the token embeddings.
        units: Width of the GRU and of the first dense layer.
        vocab_size: Number of tokens; size of the embedding table and of
            the output logits.
    """

    def __init__(self, embedding_dim, units, vocab_size):
        super().__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)

    def call(self, x, features=None):
        """Compute vocabulary logits for every timestep.

        Args:
            x: Integer token ids of shape (batch, time), or a 2-element
                list/tuple ``[token_ids, features]``.  The second form is
                what Keras passes when the model is trained with
                ``fit([cap_inputs, features], ...)``; the original
                two-positional-argument signature could not be driven
                that way.
            features: Image features of shape (batch, dim).  May be omitted
                only when ``x`` is the 2-element form.

        Returns:
            Logits of shape (batch, time + 1, vocab_size).

        Raises:
            ValueError: If ``features`` is omitted and ``x`` is not a
                2-element list or tuple.
        """
        if features is None:
            if not isinstance(x, (list, tuple)) or len(x) != 2:
                raise ValueError(
                    "CaptionGenerator expects call(tokens, features) or "
                    "call([tokens, features])"
                )
            x, features = x

        x = self.embedding(x)
        # Prepend the image as timestep 0.
        # NOTE(review): this requires the feature width to equal
        # embedding_dim, and makes the output one step longer than the
        # token input — confirm the targets used for training are aligned
        # with that.
        x = tf.concat([tf.expand_dims(features, 1), x], axis=-2)
        x, _ = self.gru(x)
        x = self.fc1(x)
        return self.fc2(x)

In [42]:
def rnn_decoder(vocab_size, max_caption_length, embedding_dim=256, feature_dim=512, units=512):
    """Build an LSTM caption decoder as a Keras functional model.

    The image feature vector is projected by a ReLU dense layer, repeated
    once per caption position and concatenated with the (zero-masked) token
    embeddings before going through the LSTM.

    Args:
        vocab_size: Size of the embedding table and of the softmax output.
        max_caption_length: Number of token positions per caption.
        embedding_dim: Size of the token embeddings.
        feature_dim: Length of the image feature vector input.
        units: Width of the feature projection and of the LSTM.

    Returns:
        A ``models.Model`` taking ``[image_features, caption_tokens]`` and
        producing a per-position softmax over the vocabulary.
    """
    # Image branch: one projected vector, tiled across every time step.
    features_in = layers.Input(shape=(feature_dim,))
    projected = layers.Dense(units, activation='relu')(features_in)
    tiled = layers.RepeatVector(max_caption_length)(projected)

    # Text branch: token ids -> embeddings, with id 0 treated as padding.
    tokens_in = layers.Input(shape=(max_caption_length,))
    embedded = layers.Embedding(vocab_size, embedding_dim, mask_zero=True)(tokens_in)

    fused = layers.concatenate([tiled, embedded])
    hidden = layers.LSTM(units, return_sequences=True)(fused)
    word_head = layers.TimeDistributed(layers.Dense(vocab_size, activation='softmax'))
    word_probs = word_head(hidden)

    return models.Model(inputs=[features_in, tokens_in], outputs=word_probs)

In [43]:
# Wire an image encoder and the LSTM caption decoder into one end-to-end
# model: (128x128 RGB image, 158 caption token ids) -> per-position word
# probabilities.
image_input = layers.Input(shape=(128, 128, 3))
caption_input = layers.Input(shape=(158,))
# NOTE(review): `custom_cnn_encoder` is not defined anywhere in the visible
# part of this file — confirm it exists elsewhere, or whether
# `efficientnet_encoder` was intended.
cnn_encoder = custom_cnn_encoder(input_shape=(128, 128, 3))
image_features = cnn_encoder(image_input)
# Vocabulary of 10000 tokens, captions of 158 positions.  rnn_decoder is
# left at its default feature_dim=512, so the encoder output must be a
# 512-long vector per image.
decoder = rnn_decoder(10000, 158)
caption_output = decoder([image_features, caption_input])
full_model = models.Model(inputs=[image_input, caption_input], outputs=caption_output)
full_model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')

In [44]:
# Directory that image file names are joined onto when loading images.
IMG_DIR = 'images/' 
# Caption file location; not referenced by the code visible in this file.
CAPTIONS_FILE = 'captions.txt'  
# Default output sequence length of the text vectorizer.
MAX_LENGTH = 30
# Default maximum number of tokens kept by the text vectorizer.
VOCAB_SIZE = 10000  

def load_captions_json(json_path):
    """Load captions from a JSON file and wrap each in start/end markers.

    Two file layouts are accepted:

    * a list of records, each with an ``'output'`` entry holding the
      captions — the records are updated in place and the list is returned;
    * a mapping ``{image_name: captions}`` — a new dict of the same shape
      is returned.  This is the layout the ``.values()`` / ``.items()``
      consumers in this file expect; the original loader crashed on it.

    In both layouts ``captions`` may be a list of strings or a single
    string (treated as a one-element list instead of being split into
    characters).

    Args:
        json_path: Path of the JSON file, read as UTF-8.

    Returns:
        The loaded structure with every caption rewritten as
        ``"<start> {caption} <end>"``.
    """
    def _wrap(captions):
        if isinstance(captions, str):
            captions = [captions]
        return [f"<start> {cap} <end>" for cap in captions]

    with open(json_path, 'r', encoding='utf-8') as f:
        captions_dict = json.load(f)

    if isinstance(captions_dict, dict):
        return {name: _wrap(caps) for name, caps in captions_dict.items()}

    for img in captions_dict:
        img['output'] = _wrap(img['output'])
    return captions_dict

def build_text_vectorizer(captions_dict, vocab_size=VOCAB_SIZE, max_length=MAX_LENGTH):
    """Create a ``TextVectorization`` layer fitted on every caption.

    Args:
        captions_dict: Mapping whose values are iterables of caption strings.
        vocab_size: Maximum number of tokens kept in the vocabulary.
        max_length: Fixed length of every vectorised sequence.

    Returns:
        The adapted ``TextVectorization`` layer (integer output mode).
    """
    corpus = [
        caption
        for caption_group in captions_dict.values()
        for caption in caption_group
    ]

    # NOTE(review): this standardisation removes punctuation, so markers
    # such as "<start>" are presumably reduced to plain words — confirm
    # against the Keras documentation.
    text_vectorizer = layers.TextVectorization(
        standardize='lower_and_strip_punctuation',
        max_tokens=vocab_size,
        output_sequence_length=max_length,
        output_mode='int',
    )

    caption_batches = tf.data.Dataset.from_tensor_slices(corpus).batch(64)
    text_vectorizer.adapt(caption_batches)
    return text_vectorizer

In [45]:
def load_image(img_path, target_size=(128, 128)):
    """Read an image file as a normalised RGB array.

    Args:
        img_path: Path of the image file.
        target_size: ``(width, height)`` the image is resized to.

    Returns:
        A float NumPy array of the resized RGB image with values divided
        by 255.
    """
    # Use the image as a context manager so the underlying file handle is
    # released deterministically even if conversion or resizing fails.
    with Image.open(img_path) as img:
        resized = img.convert('RGB').resize(target_size)
    return np.array(resized) / 255.0

def create_dataset(captions_dict, vectorizer, max_length=30):
    """Turn ``{image_name: captions}`` into aligned training arrays.

    Each caption yields one example: the image (loaded from ``IMG_DIR``),
    the vectorised caption without its last token as input, and without its
    first token as target.  Both sequences are zero-padded on the right to
    ``max_length - 1``.

    Args:
        captions_dict: Mapping from image file name to its captions.
        vectorizer: Callable turning a list of strings into token-id rows.
        max_length: Caption length used to size the padded sequences.

    Returns:
        A 4-tuple of NumPy arrays: images; int32 input sequences; int32
        target sequences with a trailing axis of size 1; and int32 rows
        holding each input sequence followed by its target sequence.
    """
    images, inputs, targets, joined = [], [], [], []
    padded_len = max_length - 1

    for img_name, captions in tqdm(captions_dict.items()):
        # Decode each image once and reuse it for all of its captions.
        pixels = load_image(os.path.join(IMG_DIR, img_name))

        for caption in captions:
            token_ids = vectorizer([caption])[0].numpy()

            # Shift by one position: the model sees tokens[:-1] and has to
            # predict tokens[1:].
            shifted_in = token_ids[:-1]
            shifted_out = token_ids[1:]
            shifted_in = np.pad(
                shifted_in, (0, padded_len - len(shifted_in)), constant_values=0
            )
            shifted_out = np.pad(
                shifted_out, (0, padded_len - len(shifted_out)), constant_values=0
            )

            images.append(pixels)
            inputs.append(shifted_in)
            targets.append(shifted_out)
            joined.append(np.concatenate([shifted_in, shifted_out]))

    image_array = np.array(images)
    input_array = np.array(inputs, dtype=np.int32)
    target_array = np.expand_dims(np.array(targets, dtype=np.int32), -1)
    joined_array = np.array(joined, dtype=np.int32)
    return image_array, input_array, target_array, joined_array

In [46]:
def train_pipeline(images, cap_inputs, cap_targets, vectorizer, batch_size=64, epochs=20):
    """Extract image features and fit a ``CaptionGenerator`` on them.

    Args:
        images: Image batch handed to ``extract_features``.
        cap_inputs: Caption input sequences.
        cap_targets: Caption target sequences.
        vectorizer: Fitted text vectorizer; only its vocabulary size is used.
        batch_size: Batch size for ``fit``.
        epochs: Number of training epochs.

    Returns:
        The fitted ``CaptionGenerator`` (embedding size 256, 512 units).
    """
    image_features = extract_features(images)

    vocabulary = vectorizer.get_vocabulary()
    captioner = CaptionGenerator(
        embedding_dim=256,
        units=512,
        vocab_size=len(vocabulary),
    )

    # The generator emits raw logits, hence from_logits=True.
    cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    captioner.compile(optimizer='adam', loss=cross_entropy, metrics=['accuracy'])

    captioner.fit(
        [cap_inputs, image_features],
        cap_targets,
        batch_size=batch_size,
        epochs=epochs,
    )
    return captioner

In [None]:
# Train the caption generator end to end.
# NOTE(review): `images`, `cap_inputs`, `cap_targets` and `vectorizer` are
# not assigned anywhere in the visible part of this file — presumably they
# come from build_text_vectorizer / create_dataset calls in another cell;
# confirm before running.
model = train_pipeline(images, cap_inputs, cap_targets, vectorizer, batch_size=64, epochs=20)

In [None]:
# Persist the trained caption generator to "final_model.keras" in the
# current working directory.
model.save("final_model.keras")