In [1]:
# Install dependencies
!pip install tensorflow opencv-python-headless --quiet


In [2]:
# Create directories
import os

os.makedirs("data/videos", exist_ok=True)
os.makedirs("data/captions", exist_ok=True)


In [3]:
# Upload the four MSVD caption files
from google.colab import files
print("Upload `map.txt`, `train.txt`")
uploaded = files.upload()

# Save them into data/captions
import shutil
for fn in uploaded.keys():
    shutil.move(fn, f"data/captions/{fn}")


Upload `map.txt`, `train.txt`


Saving map.txt to map.txt
Saving train.txt to train.txt


In [4]:
# Upload videos (batch upload recommended)
print("Upload your video clips...")
uploaded_videos = files.upload()

for fn in uploaded_videos.keys():
    shutil.move(fn, f"data/videos/{fn}")


Upload your video clips...


Saving -wa0umYJVGg_23_41.avi to -wa0umYJVGg_23_41.avi
Saving -wa0umYJVGg_100_115.avi to -wa0umYJVGg_100_115.avi
Saving -wa0umYJVGg_117_123.avi to -wa0umYJVGg_117_123.avi
Saving -wa0umYJVGg_139_157.avi to -wa0umYJVGg_139_157.avi
Saving -wa0umYJVGg_168_176.avi to -wa0umYJVGg_168_176.avi
Saving -wa0umYJVGg_271_276.avi to -wa0umYJVGg_271_276.avi
Saving -wa0umYJVGg_286_290.avi to -wa0umYJVGg_286_290.avi
Saving 05gNigkqfNU_24_32.avi to 05gNigkqfNU_24_32.avi
Saving 05gNigkqfNU_25_34.avi to 05gNigkqfNU_25_34.avi
Saving 05gNigkqfNU_78_84.avi to 05gNigkqfNU_78_84.avi
Saving 05gNigkqfNU_11_23.avi to 05gNigkqfNU_11_23.avi
Saving -_hbPLsZvvo_5_8.avi to -_hbPLsZvvo_5_8.avi
Saving -_hbPLsZvvo_18_25.avi to -_hbPLsZvvo_18_25.avi
Saving -_hbPLsZvvo_19_25.avi to -_hbPLsZvvo_19_25.avi
Saving -_hbPLsZvvo_19_26.avi to -_hbPLsZvvo_19_26.avi
Saving -_hbPLsZvvo_43_55.avi to -_hbPLsZvvo_43_55.avi
Saving -_hbPLsZvvo_49_55.avi to -_hbPLsZvvo_49_55.avi
Saving -_hbPLsZvvo_172_179.avi to -_hbPLsZvvo_172_179.avi
Savi

In [5]:

def map_captions(map_file, train_file, output_file):
    """Maps video IDs in train.txt to original video names using map.txt.

    Args:
        map_file: Path to the map.txt file.
        train_file: Path to the train.txt file.
        output_file: Path to the output file.
    """
    video_map = {}
    with open(map_file, 'r') as f:
        for line in f:
            video_id, video_name = line.strip().split()
            video_map[video_id] = video_name

    with open(train_file, 'r') as f_in, open(output_file, 'w') as f_out:
        for line in f_in:
            parts = line.strip().split()
            video_id = parts[0]
            caption = " ".join(parts[1:])  # Combine caption parts

            if video_id in video_map:
                f_out.write(f"{video_map[video_id]} {caption}\n")
            else:
                print(f"Warning: Video ID '{video_id}' not found in map.txt")


# Example usage:
map_captions("data/captions/map.txt", "data/captions/train.txt", "data/captions/mapped_train.txt")


CAPTION DICTIONARY

In [6]:
import json

def read_captions_from_file(file_path):
    captions_dict = {}

    with open(file_path, 'r') as file:
        for line in file:
            parts = line.strip().split(' ', 1)  # Split into 2 parts: video ID and caption
            if len(parts) == 2:
                video_id, caption = parts
                # If the video ID already exists, append the caption to the list
                if video_id in captions_dict:
                    captions_dict[video_id].append(caption)
                else:
                    captions_dict[video_id] = [caption]

    return captions_dict

# Example usage:
file_path = "/content/data/captions/mapped_train.txt"  # Replace with the actual path to your text file
captions_dict = read_captions_from_file(file_path)

# Save the dictionary to a JSON file
output_file = "captions_dict.json"  # Specify the output file path
with open(output_file, 'w') as json_file:
    json.dump(captions_dict, json_file, indent=4)

print(f"Captions saved to {output_file}")


Captions saved to captions_dict.json


FEATURE EXTRACTION

In [9]:
import os
import cv2
import numpy as np
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import img_to_array

# Load pre-trained CNN (VGG16)
base_model = VGG16(weights="imagenet")
model = Model(inputs=base_model.input, outputs=base_model.get_layer("fc2").output)

# Frame extraction
def extract_frames(video_path, num_frames=10):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
    frames = []

    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if ret:
            frame = cv2.resize(frame, (224, 224))
            frame = img_to_array(frame)
            frame = preprocess_input(frame)
            frames.append(frame)
    cap.release()
    return np.array(frames)

# Feature extractor
def extract_video_features(video_path):
    frames = extract_frames(video_path)
    if len(frames) == 0:
        return None
    features = model.predict(frames, verbose=0)
    return np.mean(features, axis=0)

# Paths
input_video_folder = "/content/data/videos"
output_feature_folder = "/content/data/features"
os.makedirs(output_feature_folder, exist_ok=True)

# Loop through videos
for video_file in os.listdir(input_video_folder):
    if video_file.endswith(".mp4"):
        video_path = os.path.join(input_video_folder, video_file)
        video_id = os.path.splitext(video_file)[0]
        print(f"Extracting from {video_file}...")
        features = extract_video_features(video_path)
        if features is not None:
            np.save(os.path.join(output_feature_folder, f"{video_id}.npy"), features)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5
[1m553467096/553467096[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 0us/step


In [11]:
# Extract features for all captioned videos
video_features = {}
missing_files = []

for video_file in captions_dict.keys():
    path = f"data/videos/{video_file}.avi"  # Ensure extension matches your dataset
    print(f"Extracting features from {video_file}...")
    feats = extract_video_features(path)
    if feats is not None:
        video_features[video_file] = feats
    else:
        print(f"Failed to extract features for {video_file}")
        missing_files.append(video_file)

print(f"Extracted features for {len(video_features)} videos")

Extracting features from -_hbPLsZvvo_172_179...
Extracting features from -_hbPLsZvvo_18_25...
Extracting features from -_hbPLsZvvo_19_25...
Extracting features from -_hbPLsZvvo_19_26...
Extracting features from -_hbPLsZvvo_211_219...
Extracting features from -_hbPLsZvvo_269_275...
Extracting features from -_hbPLsZvvo_288_305...
Extracting features from -_hbPLsZvvo_323_328...
Extracting features from -_hbPLsZvvo_43_55...
Extracting features from -_hbPLsZvvo_49_55...
Extracting features from -_hbPLsZvvo_5_8...
Extracting features from -wa0umYJVGg_100_115...
Extracting features from -wa0umYJVGg_117_123...
Extracting features from -wa0umYJVGg_139_157...
Extracting features from -wa0umYJVGg_168_176...
Extracting features from -wa0umYJVGg_23_41...
Extracting features from -wa0umYJVGg_271_276...
Extracting features from -wa0umYJVGg_286_290...
Extracting features from 05gNigkqfNU_11_23...
Extracting features from 05gNigkqfNU_24_32...
Extracting features from 05gNigkqfNU_25_34...
Extracting fea

In [20]:
# Save features dictionary for later use
with open("video_features.pkl", "wb") as f:
    pickle.dump(video_features, f)

print("Saved features to video_features.pkl")

Saved features to video_features.pkl


TOKENIZE

In [17]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Prepare all captions in one list
all_captions = []
for caps in captions_dict.values():
    for cap in caps:
        all_captions.append(f"<start> {cap} <end>")  # Add tokens

# Fit tokenizer
tokenizer = Tokenizer(num_words=5000, oov_token="<unk>")
tokenizer.fit_on_texts(all_captions)

vocab_size = len(tokenizer.word_index) + 1
print("Vocab size:", vocab_size)

# Max caption length
max_length = max(len(caption.split()) for caption in all_captions)
print("Max caption length:", max_length)


Vocab size: 858
Max caption length: 34


In [19]:
# Clean and tokenize captions
def clean_caption(caption):
    caption = caption.lower().strip()
    return f"startseq {caption} endseq"

all_captions = []
for caps in captions.values():
    for cap in caps:
        all_captions.append(clean_caption(cap))

tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
vocab_size = len(tokenizer.word_index) + 1
max_length = max(len(c.split()) for c in all_captions)


TRAINING SEQUENCE

In [23]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pickle

# Clean and add special tokens
def clean_caption(caption):
    caption = caption.lower().strip()
    return f"<start> {caption} <end>"

# Create training sequences
def create_sequences(tokenizer, max_length, descriptions, features, vocab_size):
    X1, X2, y = [], [], []
    for key, caps in descriptions.items():
        if key not in features:
            continue
        feature = features[key]
        for cap in caps:
            cap = clean_caption(cap)
            seq = tokenizer.texts_to_sequences([cap])[0]
            for i in range(1, len(seq)):
                in_seq, out_seq = seq[:i], seq[i]
                in_seq = pad_sequences([in_seq], maxlen=max_length, padding='post')[0]
                out_seq = tf.keras.utils.to_categorical([out_seq], num_classes=vocab_size)[0]
                X1.append(feature)
                X2.append(in_seq)
                y.append(out_seq)
    return np.array(X1), np.array(X2), np.array(y)

# Define the model
def define_model(vocab_size, max_length):
    # Feature extractor (video)
    inputs1 = Input(shape=(4096,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation="relu")(fe1)

    # Sequence processor (caption)
    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=False)(inputs2)  # mask_zero=False for compatibility
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    # Decoder (fusion)
    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation="relu")(decoder1)
    outputs = Dense(vocab_size, activation="softmax")(decoder2)

    # Final model
    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss="categorical_crossentropy", optimizer="adam")
    return model


In [25]:
model = define_model(vocab_size, max_length)
model.summary()

In [26]:
# Train the model
model.fit([X1, X2], y, epochs=10, batch_size=64)
model.save("video_caption_model.h5")


Epoch 1/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m64s[0m 274ms/step - loss: 4.7238
Epoch 2/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 271ms/step - loss: 3.3455
Epoch 3/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m59s[0m 265ms/step - loss: 2.8560
Epoch 4/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 267ms/step - loss: 2.6090
Epoch 5/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m58s[0m 263ms/step - loss: 2.4532
Epoch 6/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m83s[0m 269ms/step - loss: 2.3592
Epoch 7/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 266ms/step - loss: 2.2366
Epoch 8/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m81s[0m 264ms/step - loss: 2.2212
Epoch 9/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m61s[0m 275ms/step - loss: 2.1172
Epoch 10/10
[1m221/221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[



caption prediction

greedy search

In [27]:
# 🧾 Caption prediction
def generate_caption(model, tokenizer, photo, max_length):
    in_text = "startseq"
    for _ in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([photo.reshape((1,4096)), sequence], verbose=0)
        yhat = np.argmax(yhat)
        word = tokenizer.index_word.get(yhat)
        if word is None:
            break
        in_text += " " + word
        if word == "endseq":
            break
    return in_text.replace("startseq", "").replace("endseq", "").strip()


In [40]:
from tensorflow.keras.models import load_model

# Load the trained model
model = load_model("video_caption_model.h5")

# Load tokenizer and video features (if needed again)
with open("video_features.pkl", "rb") as f:
    video_features = pickle.load(f)

# Let's choose a random video to test
import random
sample_video = random.choice(list(video_features.keys()))
print(f" Testing video: {sample_video}")

# Extract feature
feature = video_features[sample_video]

# Generate caption
predicted_caption = generate_caption(model, tokenizer, feature, max_length)
print(f" Generated caption: {predicted_caption}")

# Show reference captions (ground truth)
print("\n Ground truth captions:")
for cap in captions[sample_video]:
    print("-", cap)




 Testing video: -_hbPLsZvvo_5_8
 Generated caption: dog dog dog dog dog dog dog dog dog dog dog dog dog dog dog dog dog dog dog is is is barking end end end end barking end barking end end end barking

 Ground truth captions:
- a dog is chewing
- a dog appears to be talking next to a woman cook
- a dog is barking
- a dog is barking
- a dog is barking
- a dog is barking
- a dog is chewing food
- a dog is barking
- a dog is eating
- a dog barks
- a dog is chewing something
- a dog is barking
- a dog is barking
- a dog is chewing on food
- the dog happily ate the sushi
- the dog is eating
- a dog barks
- the dog ate the sushi
- a dog is barking
- a puppy is barking
- a dog barking and cooking with her master in the kitchen
- a women cooked a tasty food with her dog
- a dog is craying
- learn to make an easy japanese lunch bento
- a dog is chewing
- a woman making a bento
- a dog is eating
- a dog is barking
- the dog is sitting
- a lady is cooking with dog
- a dog is barking
- a cooking s

beam search

In [38]:
def generate_caption_beam(model, tokenizer, video_feat, max_length, beam_width=3):
    start = ["startseq"]
    sequences = [(start, 0.0)]  # (caption so far, log probability)

    for _ in range(max_length):
        all_candidates = []
        for seq, score in sequences:
            if seq[-1] == "endseq":
                all_candidates.append((seq, score))
                continue
            # Convert to sequence
            sequence = tokenizer.texts_to_sequences([" ".join(seq)])[0]
            sequence = pad_sequences([sequence], maxlen=max_length, padding='post')
            yhat = model.predict([video_feat.reshape((1, 4096)), sequence], verbose=0)
            # Get top candidates
            top_indices = np.argsort(yhat[0])[-beam_width:]
            for idx in top_indices:
                word = tokenizer.index_word.get(idx)
                if word:
                    candidate = seq + [word]
                    log_prob = np.log(yhat[0][idx] + 1e-10)
                    all_candidates.append((candidate, score + log_prob))
        # Order by score
        ordered = sorted(all_candidates, key=lambda tup: tup[1], reverse=True)
        sequences = ordered[:beam_width]

    final_caption = sequences[0][0]
    return " ".join(final_caption).replace("startseq", "").replace("endseq", "").strip()


In [42]:
from tensorflow.keras.models import load_model

# Load the trained model
model = load_model("video_caption_model.h5")

# Load tokenizer and video features (if needed again)
with open("video_features.pkl", "rb") as f:
    video_features = pickle.load(f)

# Let's choose a random video to test
import random
sample_video = random.choice(list(video_features.keys()))
print(f" Testing video: {sample_video}")

# Extract feature
feature = video_features[sample_video]

# Generate caption
predicted_caption = generate_caption_beam(model, tokenizer, feature, max_length)
print(f" Generated caption: {predicted_caption}")

# Show reference captions (ground truth)
print("\n Ground truth captions:")
for cap in captions[sample_video]:
    print("-", cap)




 Testing video: 05gNigkqfNU_25_34
 Generated caption: one is cutting potatoes end salad end end salad end end end end salad end end end end end end end end end end end end end end end end end end end end

 Ground truth captions:
- a person cutting up potatoes
- a chef cuts a potato
- a person cuts potato wedges
- a person is chopping potato slices with a knife
- a person is chopping potatoes
- a person is slicing some potato
- a potato is being cut
- a woman chops strips of raw potato into cubes with a knife
- a woman is chopping a potato
- potatoes are being chopped
- someone chopped up a potatoe
- someone dicing potatoes
- someone is chopping potatoes
- someone is chopping up the potatoe
- someone is dicing potatoes
- the person is dicing potatoes
- slicing egg
- teaching how to make country potato salad
- a guy slicimg the eggs
- a cooker is cutting a potato
- some one is slicing potatoes
- a man is cutting potatto
- potatoes are being cut the proper way
- a man is cuting some potat

evaluation

In [32]:
import nltk
nltk.download('wordnet')
nltk.download('punkt')


[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [35]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score

def evaluate_caption(reference_captions, predicted_caption):
    """Compares a predicted caption against references using BLEU and METEOR."""
    smooth = SmoothingFunction().method4

    # Tokenize
    references = [ref.split() for ref in reference_captions]
    candidate = predicted_caption.split()

    bleu1 = sentence_bleu(references, candidate, weights=(1.0, 0, 0, 0), smoothing_function=smooth)
    bleu2 = sentence_bleu(references, candidate, weights=(0.5, 0.5, 0, 0), smoothing_function=smooth)

    print(f"BLEU-1: {bleu1:.4f}")
    print(f"BLEU-2: {bleu2:.4f}")

xtras

split stuff

In [None]:
caption_dict = {}

with open("data/captions/mapped_train.txt", "r") as f:
    for line in f:
        filename, caption = line.strip().split(" ", 1)  # split only once
        if filename not in caption_dict:
            caption_dict[filename] = []
        caption_dict[filename].append(caption)

print(f"Loaded {len(caption_dict)} videos with captions.")


feature extraction

In [None]:
# 🖼️ Extract features from videos (frame sampling)
import cv2
import numpy as np
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import img_to_array
import pickle

# Load pre-trained VGG16 model
base_model = VGG16(weights="imagenet")
cnn_model = Model(inputs=base_model.input, outputs=base_model.get_layer("fc2").output)

def extract_frames(video_path, num_frames=10):
    frames = []
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        return np.array([])
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_idxs = np.linspace(0, total_frames - 1, num_frames, dtype=int)
    for idx in frame_idxs:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if ret:
            frame = cv2.resize(frame, (224, 224))
            frame = img_to_array(frame)
            frame = preprocess_input(frame)
            frames.append(frame)
    cap.release()
    return np.array(frames)

def extract_video_features(video_path):
    frames = extract_frames(video_path)
    if frames.size == 0:
        return None
    features = cnn_model.predict(frames, verbose=0)
    return np.mean(features, axis=0)


training seq

In [None]:
import numpy as np

def create_sequences(caption_dict, feature_path, tokenizer, max_length):
    X1, X2, y = [], [], []

    for video_name, captions in caption_dict.items():
        try:
            feature = np.load(f"{feature_path}/{video_name}.npy")
        except:
            continue  # skip missing videos

        for caption in captions:
            seq = tokenizer.texts_to_sequences([f"<start> {caption} <end>"])[0]
            for i in range(1, len(seq)):
                in_seq, out_seq = seq[:i], seq[i]
                in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                out_seq = np.zeros(vocab_size)
                out_seq[out_seq := out_seq] = 1.0  # one-hot encode

                X1.append(feature)       # video features
                X2.append(in_seq)        # input caption so far
                y.append(out_seq)        # next word to predict

    return np.array(X1), np.array(X2), np.array(y)

# Use it:
feature_path = r"C:\Users\sweth\Downloads\archive (1).zip\MSVD\features"
X1, X2, y = create_sequences(caption_dict, feature_path, tokenizer, max_length)


In [None]:
# Create training sequences
def create_sequences(tokenizer, max_length, descriptions, features):
    X1, X2, y = [], [], []
    for key, caps in descriptions.items():
        if key not in features:
            continue
        feature = features[key]
        for cap in caps:
            cap = clean_caption(cap)
            seq = tokenizer.texts_to_sequences([cap])[0]
            for i in range(1, len(seq)):
                in_seq, out_seq = seq[:i], seq[i]
                # pad sequences to the right instead of left
                in_seq = pad_sequences([in_seq], maxlen=max_length, padding='post')[0]
                out_seq = tf.keras.utils.to_categorical([out_seq], num_classes=vocab_size)[0]
                X1.append(feature)
                X2.append(in_seq)
                y.append(out_seq)
    return np.array(X1), np.array(X2), np.array(y)

X1, X2, y = create_sequences(tokenizer, max_length, captions, video_features)

# Define the model
def define_model(vocab_size, max_length):
    inputs1 = Input(shape=(4096,))
    fe1 = Dropout(0.5)(inputs1)
    fe2 = Dense(256, activation="relu")(fe1)

    inputs2 = Input(shape=(max_length,))
    se1 = Embedding(vocab_size, 256, mask_zero=False)(inputs2) # Set mask_zero to False
    se2 = Dropout(0.5)(se1)
    se3 = LSTM(256)(se2)

    decoder1 = add([fe2, se3])
    decoder2 = Dense(256, activation="relu")(decoder1)
    outputs = Dense(vocab_size, activation="softmax")(decoder2)

    model = Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(loss="categorical_crossentropy", optimizer="adam")
    return model

model = define_model(vocab_size, max_length)
model.summary()