# Import Required Libraries

In [2]:
import os
import numpy as np
from tensorflow.keras.applications import VGG16
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.layers import Flatten
from tqdm import tqdm
import re
from collections import Counter
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout, Add
from tensorflow.keras.callbacks import EarlyStopping
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from pycocoevalcap.cider.cider import Cider
from tensorflow.keras.layers import RepeatVector
from tensorflow.keras.utils import to_categorical
import tensorflow as tf
from sklearn.model_selection import train_test_split
import cv2
import time
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import ModelCheckpoint
import cv2

In [8]:
model = VGG16(weights='imagenet', include_top=False)  # Exclude the dense layers

In [9]:
def preprocess_image(image_path):
    image = load_img(image_path, target_size=(224, 224))  # Resize image
    image = img_to_array(image)  # Convert to array
    image = preprocess_input(image)  # Apply VGG16 preprocessing
    image = np.expand_dims(image, axis=0)  # Add batch dimension
    return image

In [10]:
def extract_features(image_dir):
    features = {}
    for img_name in tqdm(os.listdir(image_dir)):
        img_path = os.path.join(image_dir, img_name)
        if img_name.lower().endswith(('png', 'jpg', 'jpeg')):
            image = preprocess_image(img_path)
            feature = model.predict(image, verbose=0)
            features[img_name] = feature.flatten()
    return features

In [13]:
image_dir = '/content/Images/Images'  # Replace with actual path
features = extract_features(image_dir)
# Save features as a numpy file
np.save('/content/image_features.npy', features)
print("Features saved successfully.")

100%|██████████| 8091/8091 [10:21<00:00, 13.01it/s]


Features saved successfully.


In [14]:
features = np.load('/content/image_features.npy', allow_pickle=True).item()

In [15]:
def clean_caption(caption):
    caption = caption.lower()  # Convert to lowercase
    caption = re.sub(r'[^a-z\s]', '', caption)  # Remove special characters
    caption = re.sub(r'\s+', ' ', caption).strip()  # Remove extra spaces
    return caption

In [16]:
def load_and_clean_captions(captions_file):
    captions_dict = {}
    with open(captions_file, 'r') as file:
        next(file)  # Skip the header line
        for line in file:
            img_name, caption = line.strip().split(',', 1)
            clean_cap = clean_caption(caption)
            if img_name not in captions_dict:
                captions_dict[img_name] = []
            captions_dict[img_name].append(clean_cap)
    return captions_dict

In [17]:
captions_file = '/content/drive/MyDrive/DL Lab 6&7/captions.txt'
captions = load_and_clean_captions(captions_file)

In [18]:
def build_vocabulary(captions_dict, threshold=5):
    all_words = []
    for caption_list in captions_dict.values():
        for caption in caption_list:
            all_words.extend(caption.split())
    word_counts = Counter(all_words)
    # Filter words by threshold
    vocab = {word for word, count in word_counts.items() if count >= threshold}
    return vocab

In [19]:
vocab = build_vocabulary(captions, threshold=5)
print(f"Vocabulary size: {len(vocab)}")

Vocabulary size: 2984


In [20]:
def create_mappings(vocab):
    vocab = sorted(vocab)  # Sort for consistent indexing
    word2idx = {word: idx+1 for idx, word in enumerate(vocab)}
    word2idx['<start>'] = len(word2idx) + 1
    word2idx['<end>'] = len(word2idx) + 1
    idx2word = {idx: word for word, idx in word2idx.items()}
    return word2idx, idx2word

In [21]:
word2idx, idx2word = create_mappings(vocab)

In [22]:
def captions_to_sequences(captions_dict, word2idx):
    sequences = {}
    for img_name, caption_list in captions_dict.items():
        sequences[img_name] = []
        for caption in caption_list:
            seq = [word2idx['<start>']]
            seq.extend([word2idx[word] for word in caption.split() if word in word2idx])
            seq.append(word2idx['<end>'])
            sequences[img_name].append(seq)
    return sequences

In [23]:
sequences = captions_to_sequences(captions, word2idx)

In [24]:
# Split into train, and test
image_names = list(features.keys())
train_imgs, test_imgs = train_test_split(image_names, test_size=0.15, random_state=42)

print(f"Train: {len(train_imgs)}, Test: {len(test_imgs)}")

Train: 6877, Test: 1214


In [25]:
train_features = {img: features[img] for img in train_imgs if img in features}
test_features = {img: features[img] for img in test_imgs if img in features}

train_captions = {img: sequences[img] for img in train_imgs if img in sequences}
test_captions = {img: sequences[img] for img in test_imgs if img in sequences}

In [26]:
def build_caption_model(vocab_size, max_caption_length, embedding_dim, feature_vector_dim):
    # Image feature vector input
    image_input = Input(shape=(feature_vector_dim,), name="image_input")
    image_dense = Dense(embedding_dim, activation='relu', name="image_dense")(image_input)
    image_dropout = Dropout(0.5, name="image_dropout")(image_dense)  # Dropout added here

    # Repeat image features to match the sequence length of text input
    image_repeated = RepeatVector(max_caption_length, name="image_repeat")(image_dropout)

    # Text input
    text_input = Input(shape=(max_caption_length,), name="text_input")
    text_embedding = Embedding(vocab_size, embedding_dim, mask_zero=True, name="text_embedding")(text_input)
    text_lstm = LSTM(256, return_sequences=True, dropout=0.5, name="text_lstm")(text_embedding)  # Dropout in LSTM

    # Combine image and text features
    combined = Add(name="add_features")([image_repeated, text_lstm])
    combined_lstm = LSTM(256, return_sequences=False, dropout=0.5, name="combined_lstm")(combined)  # Dropout in LSTM

    # Dense output layer to predict the next word
    output = Dense(vocab_size, activation='softmax', name="output")(combined_lstm)

    # Define the model
    model = Model(inputs=[image_input, text_input], outputs=output, name="caption_generator")
    return model

In [27]:
vocab_size = len(word2idx) + 1  # Add 1 for padding
embedding_dim = 256
feature_vector_dim = 25088  # Update to the flattened size
max_caption_length = 20  # Define based on your dataset

model = build_caption_model(vocab_size, max_caption_length, embedding_dim, feature_vector_dim)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

In [28]:
def data_generator_tf(features, sequences, max_caption_length, vocab_size):
    def generator():
        for img_name, caption_list in sequences.items():
            if img_name not in features:
                continue
            for caption in caption_list:
                for i in range(1, len(caption)):
                    input_seq = caption[:i]
                    target_word = caption[i]

                    # Ensure sequence length doesn't exceed max_caption_length
                    if len(input_seq) > max_caption_length:
                        input_seq = input_seq[:max_caption_length]

                    # Pad sequence to exactly max_caption_length
                    input_seq = np.pad(
                        input_seq,
                        (0, max_caption_length - len(input_seq)),
                        mode='constant'
                    )

                    # Yield data as tuples
                    yield (features[img_name], input_seq), to_categorical(target_word, num_classes=vocab_size)

    # Define output types and shapes
    output_signature = (
        (tf.TensorSpec(shape=(25088,), dtype=tf.float32),  # Feature vector shape
         tf.TensorSpec(shape=(max_caption_length,), dtype=tf.int32)),  # Caption sequence shape
        tf.TensorSpec(shape=(vocab_size,), dtype=tf.float32)  # Target word shape
    )

    return tf.data.Dataset.from_generator(generator, output_signature=output_signature)


In [29]:
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,  # Stop if val_loss doesn't improve for 5 epochs
    restore_best_weights=True
)

In [30]:
batch_size = 64

# Total number of caption data points
total_data_points = sum(len(caption) - 1 for caption_list in train_captions.values() for caption in caption_list)
# Compute steps per epoch
steps_per_epoch = total_data_points // batch_size
validation_steps = steps_per_epoch // 10  # Use 10% of steps for validation

# Create datasets for training and validation
train_dataset = data_generator_tf(train_features, train_captions, max_caption_length, vocab_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)
validation_dataset = data_generator_tf(train_features, train_captions, max_caption_length, vocab_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)

checkpoint = ModelCheckpoint(
    filepath='/content/checkpoints/best_model.keras',
    save_best_only=True,
    monitor='val_accuracy',
    mode='max',
    verbose=1
)

model.fit(
    train_dataset,
    epochs=50,
    steps_per_epoch=steps_per_epoch,  # Specify steps per epoch
    validation_data=validation_dataset,
    validation_steps=validation_steps,  # Specify validation steps
    callbacks=[early_stopping, checkpoint]
)


Epoch 1/50
[1m6204/6205[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 40ms/step - accuracy: 0.1299 - loss: 5.5145
Epoch 1: val_accuracy improved from -inf to 0.13627, saving model to /content/checkpoints/best_model.keras
[1m6205/6205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m273s[0m 43ms/step - accuracy: 0.1299 - loss: 5.5145 - val_accuracy: 0.1363 - val_loss: 5.3414
Epoch 2/50
[1m   1/6205[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:32[0m 25ms/step - accuracy: 0.0938 - loss: 5.3668

  self.gen.throw(typ, value, traceback)



Epoch 2: val_accuracy did not improve from 0.13627
[1m6205/6205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 3ms/step - accuracy: 0.0938 - loss: 5.3668 - val_accuracy: 0.1342 - val_loss: 5.4053
Epoch 3/50
[1m6204/6205[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 39ms/step - accuracy: 0.1334 - loss: 5.4553
Epoch 3: val_accuracy improved from 0.13627 to 0.13647, saving model to /content/checkpoints/best_model.keras
[1m6205/6205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m262s[0m 42ms/step - accuracy: 0.1334 - loss: 5.4553 - val_accuracy: 0.1365 - val_loss: 5.3770
Epoch 4/50
[1m   1/6205[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:33[0m 25ms/step - accuracy: 0.0938 - loss: 5.5471
Epoch 4: val_accuracy did not improve from 0.13647
[1m6205/6205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 3ms/step - accuracy: 0.0938 - loss: 5.5471 - val_accuracy: 0.1299 - val_loss: 5.4385
Epoch 5/50
[1m6204/6205[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 39ms/

<keras.src.callbacks.history.History at 0x7dbb15290250>

In [31]:
# Load the best model after training
best_model = load_model('/content/checkpoints/best_model.keras')

In [36]:
def evaluate_model(model, features, sequences, idx2word, max_caption_length):
    smoothing_function = SmoothingFunction().method4
    bleu_scores = []
    cider_scorer = Cider()

    references = []
    hypotheses = []

    for img_name, caption_list in sequences.items():
        if img_name not in features:
            continue

        # Get the feature for the image
        image_feature = features[img_name]
        image_feature = np.expand_dims(image_feature, axis=0)  # Add batch dimension

        # Generate a caption
        generated_caption = []
        input_seq = np.zeros((1, max_caption_length))  # Initialize input sequence
        input_seq[0, 0] = word2idx['<start>']  # Start with the <start> token

        for i in range(max_caption_length - 1):
            predictions = model.predict([image_feature, input_seq], verbose=0)
            next_word_idx = np.argmax(predictions[0])  # Get index of the predicted word
            if next_word_idx == word2idx['<end>']:
                break
            generated_caption.append(idx2word[next_word_idx])
            input_seq[0, i + 1] = next_word_idx  # Update input sequence with predicted word

        # Join the predicted words to form a sentence
        generated_sentence = ' '.join(generated_caption)

        # Convert integer sequences in caption_list to words
        references.append([[idx2word[idx] for idx in caption if idx in idx2word] for caption in caption_list])
        hypotheses.append(generated_sentence.split())

        # Compute BLEU score for the generated sentence
        bleu_score = sentence_bleu(references[-1], hypotheses[-1], smoothing_function=smoothing_function)
        bleu_scores.append(bleu_score)

    # Compute CIDEr score
    cider_score, _ = cider_scorer.compute_score(references, hypotheses)

    print(f"Average BLEU Score: {np.mean(bleu_scores):.4f}")
    print(f"CIDEr Score: {cider_score:.4f}")
    return np.mean(bleu_scores), cider_score

In [None]:
# Evaluate the model
average_bleu, cider = evaluate_model(model, test_features, test_captions, idx2word, max_caption_length)

In [3]:
def generate_caption(image, model, idx2word, word2idx, max_caption_length):
    """Generate a caption for an input image."""
    feature = model.predict(image)  # Extract image features
    feature = feature.flatten()
    feature = np.expand_dims(feature, axis=0)

    # Start generating the caption
    caption = []
    input_seq = np.zeros((1, max_caption_length))
    input_seq[0, 0] = word2idx['<start>']

    for i in range(max_caption_length - 1):
        predictions = model.predict([feature, input_seq], verbose=0)
        next_word_idx = np.argmax(predictions[0])
        if next_word_idx == word2idx['<end>']:
            break
        caption.append(idx2word[next_word_idx])
        input_seq[0, i + 1] = next_word_idx

    return ' '.join(caption)

# Start camera capture
cap = cv2.VideoCapture(0)  # Use 0 for the default webcam
if not cap.isOpened():
    print("Cannot open camera")
    exit()

print("Starting camera. Press 'q' to exit.")

while True:
    ret, frame = cap.read()  # Capture a frame
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        break

    # Preprocess the frame
    resized_frame = cv2.resize(frame, (224, 224))
    processed_frame = preprocess_image(resized_frame)  # Use the defined preprocessing function

    # Generate caption
    caption = generate_caption(processed_frame, model, idx2word, word2idx, max_caption_length)

    # Display frame with caption
    cv2.putText(frame, caption, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
    cv2.imshow('Live Captioning', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):  # Exit on pressing 'q'
        break

cap.release()
cv2.destroyAllWindows()

Cannot open camera
Starting camera. Press 'q' to exit.
Can't receive frame (stream end?). Exiting ...


In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
with open('model.tflite', 'wb') as f:
    f.write(tflite_model)