<a href="https://colab.research.google.com/github/SridharaniKatipally/video-captioning-using-contextual-and-temporal-GANs/blob/main/videoCaptioningUsingContextuandAndTemporalGANs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import os
import pandas as pd
import cv2
import numpy as np
from keras.applications import ResNet50
from keras.models import Model, Sequential
from keras.layers import LSTM, Dense, Input
from keras.optimizers import Adam
from tqdm import tqdm
import zipfile


In [11]:
# Step 1: Unzip the Dataset
dataset_zip_path = '/content/dataset.zip'  # Update this path if needed
unzip_dir = '/content/dataset'
os.makedirs(unzip_dir, exist_ok=True)

with zipfile.ZipFile(dataset_zip_path, 'r') as zip_ref:
    zip_ref.extractall(unzip_dir)

print(f"Dataset extracted to {unzip_dir}")

Dataset extracted to /content/dataset


In [12]:

# Step 2: Move Videos to the Correct Directory
video_dir = '/content/videos'
os.makedirs(video_dir, exist_ok=True)

# Paths to uploaded individual video files
uploaded_videos = [
    '/content/A cat is sitting on a couch.mp4',
    '/content/A dog is playing with a ball.mp4',
    '/content/a person is riding a bicycle.mp4'
]


In [13]:
# Rename and move videos to the dataset directory
video_names = ["video1.mp4", "video2.mp4", "video3.mp4"]
for src, dest in zip(uploaded_videos, video_names):
    os.rename(src, os.path.join(video_dir, dest))

print("Videos moved to the video directory.")

Videos moved to the video directory.


In [14]:
# Step 3: Verify Files
annotations_path = os.path.join(unzip_dir, 'annotations.txt')
video_corpus_path = os.path.join(unzip_dir, 'video_corpus.csv')

print("Annotations file:", annotations_path)
print("Metadata file:", video_corpus_path)
print("Video files:", os.listdir(video_dir))

Annotations file: /content/dataset/annotations.txt
Metadata file: /content/dataset/video_corpus.csv
Video files: ['video2.mp4', 'video1.mp4', 'video3.mp4']


In [15]:
# Paths to extracted files
video_dir = os.path.join(unzip_dir, 'videos')
annotations_path = os.path.join(unzip_dir, 'annotations.txt')
metadata_path = os.path.join(unzip_dir, 'video_corpus.csv')

In [16]:
# Step 2: Load Metadata and Annotations
video_metadata = pd.read_csv(metadata_path)

In [17]:
# Parse annotations into a dictionary
annotation_dict = {}
with open(annotations_path, 'r') as f:
    for line in f:
        video_id, caption = line.split(' ', 1)
        annotation_dict.setdefault(video_id.strip(), []).append(caption.strip())

In [20]:
# Update paths in the metadata
video_metadata['Source'] = video_metadata['VideoID'].apply(lambda x: f"/content/videos/{x}")
print(video_metadata[['VideoID', 'Source']].head())


      VideoID                      Source
0  video1.mp4  /content/videos/video1.mp4
1  video2.mp4  /content/videos/video2.mp4
2  video3.mp4  /content/videos/video3.mp4


In [21]:
for _, row in video_metadata.iterrows():
    video_id = row['VideoID']
    video_path = row['Source']
    if not os.path.exists(video_path):
        print(f"Video file missing: {video_path}")
        continue
    output_dir = os.path.join(frames_dir, video_id.split('.')[0])
    extract_frames(video_path, output_dir)


Frames extracted and saved to /content/frames/video1
Frames extracted and saved to /content/frames/video2
Frames extracted and saved to /content/frames/video3


In [22]:
import numpy as np
from keras.applications import ResNet50
from keras.models import Model
import cv2

In [23]:
# Step 5: Define Contextual Feature Extraction Function
def extract_contextual_features(frame_dir):
    # Load ResNet50 pre-trained model
    base_model = ResNet50(weights='imagenet', include_top=False, pooling='avg')
    contextual_model = Model(inputs=base_model.input, outputs=base_model.output)

    features = []
    for frame in sorted(os.listdir(frame_dir)):
        img_path = os.path.join(frame_dir, frame)
        img = cv2.imread(img_path)
        if img is None:
            print(f"Error loading frame: {img_path}")
            continue
        img = cv2.resize(img, (224, 224))  # ResNet50 input size
        img = img / 255.0  # Normalize
        feature = contextual_model.predict(np.expand_dims(img, axis=0))
        features.append(feature.flatten())
    return np.array(features)

In [24]:
# Step 6: Extract Contextual Features for All Videos
features_dir = '/content/features'
os.makedirs(features_dir, exist_ok=True)

for video_id in os.listdir(frames_dir):
    frame_dir = os.path.join(frames_dir, video_id)
    contextual_features = extract_contextual_features(frame_dir)
    np.save(os.path.join(features_dir, f"{video_id}_features.npy"), contextual_features)
    print(f"Contextual features extracted and saved for {video_id}")

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m94765736/94765736[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 0us/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 174ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 176ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 281ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 177ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 173ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 182ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 175ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 186ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 174ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37

In [25]:
for feature_file in os.listdir(features_dir):
    features = np.load(os.path.join(features_dir, feature_file))
    print(f"{feature_file}: {features.shape}")


video2_features.npy: (113, 2048)
video1_features.npy: (173, 2048)
video3_features.npy: (106, 2048)


In [26]:
from keras.models import Sequential
from keras.layers import LSTM, Dense, Embedding, Input
from keras.optimizers import Adam

# Build the Temporal Model
def build_temporal_model():
    model = Sequential([
        Input(shape=(None, 2048)),  # Input: (sequence_length, feature_size)
        LSTM(256, return_sequences=True),
        LSTM(256),
        Dense(128, activation='relu'),
        Dense(1, activation='sigmoid')  # Output: Example binary classification (can modify for captioning)
    ])
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
    return model

temporal_model = build_temporal_model()
temporal_model.summary()


In [28]:
from keras.layers import GRU, Embedding

def build_generator(vocab_size=10000, max_caption_len=20):
    model = Sequential([
        Input(shape=(None, 2048)),  # Input: contextual features
        GRU(256, return_sequences=True),
        GRU(256),
        Dense(128, activation='relu'),
        Dense(vocab_size, activation='softmax')  # Output: vocabulary distribution
    ])
    return model

generator = build_generator()
generator.summary()
#Define the Generator


In [29]:
def build_discriminator():
    model = Sequential([
        Dense(256, activation='relu', input_dim=20),  # Input: caption embedding (sequence length=20)
        Dense(128, activation='relu'),
        Dense(1, activation='sigmoid')  # Output: real or fake
    ])
    return model

discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
discriminator.summary()
#Define the Discriminator

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [32]:
from keras.layers import Embedding, TimeDistributed

def build_generator(vocab_size=10000, embedding_dim=256, max_caption_len=20):
    model = Sequential([
        Input(shape=(None, 2048)),  # Contextual features
        GRU(256, return_sequences=True),
        GRU(256, return_sequences=True),
        TimeDistributed(Dense(embedding_dim, activation='relu')),  # Generate embeddings
        Dense(vocab_size, activation='softmax')  # Vocabulary distribution (optional)
    ])
    return model

generator = build_generator()
generator.summary()
#Updated Generator

In [33]:
from keras.layers import Flatten

def build_discriminator(embedding_dim=256, max_caption_len=20):
    model = Sequential([
        Input(shape=(max_caption_len, embedding_dim)),  # Sequence of embeddings
        Flatten(),  # Flatten the sequence for dense processing
        Dense(256, activation='relu'),
        Dense(128, activation='relu'),
        Dense(1, activation='sigmoid')  # Real or fake
    ])
    return model

discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
discriminator.summary()
#Updated Discriminator


In [36]:
from keras.layers import GRU, TimeDistributed, Dense

def build_generator(embedding_dim=256):
    model = Sequential([
        Input(shape=(None, 2048)),  # Contextual features
        GRU(256, return_sequences=True),
        GRU(256, return_sequences=True),
        TimeDistributed(Dense(embedding_dim, activation='relu'))  # Generate embeddings
    ])
    return model

generator = build_generator()
generator.summary()
#Updated Generator1:

In [37]:
from keras.models import Model
from keras.layers import Input
from keras.optimizers import Adam

def build_gan(generator, discriminator):
    # Freeze discriminator weights during generator training
    discriminator.trainable = False

    # Input: contextual features
    gan_input = Input(shape=(None, 2048))  # Contextual features

    # Generate embeddings using the generator
    generated_embeddings = generator(gan_input)

    # Evaluate the generated embeddings using the discriminator
    gan_output = discriminator(generated_embeddings)

    # Compile GAN Model
    gan = Model(inputs=gan_input, outputs=gan_output)
    gan.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.0001), metrics=['accuracy'])
    return gan

gan_model = build_gan(generator, discriminator)
gan_model.summary()
#Adjust GAN Integration

In [40]:
# Toggle trainability of the discriminator
def set_trainable(model, trainable):
    for layer in model.layers:
        layer.trainable = trainable


In [48]:
import numpy as np
import tensorflow as tf

# Placeholder Data (Replace with real contextual features and embeddings)
real_contextual_features = np.random.rand(10, 20, 2048).astype('float32')  # Contextual features (batch_size, seq_len, feature_dim)
real_captions = np.ones((10, 1)).astype('float32')  # Real captions labeled as 1
fake_labels = np.zeros((10, 1)).astype('float32')   # Fake captions labeled as 0
gan_labels = np.ones((10, 1)).astype('float32')     # GAN labels for generator training

# Convert placeholders to TensorFlow tensors
real_contextual_features = tf.convert_to_tensor(real_contextual_features)
real_captions = tf.convert_to_tensor(real_captions)
fake_labels = tf.convert_to_tensor(fake_labels)
gan_labels = tf.convert_to_tensor(gan_labels)

# Recompile the discriminator to ensure no inconsistencies
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])

# Training Loop
epochs = 10

for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")

    # Step 1: Generate Real Embeddings from the Generator
    try:
        real_embeddings = generator(real_contextual_features, training=False)  # Generate embeddings for real inputs
        print(f"Real embeddings shape: {real_embeddings.shape}")
    except Exception as e:
        print(f"Error generating embeddings: {e}")
        break

    # Ensure embeddings match the discriminator input shape
    if real_embeddings.shape[-1] != 256:
        print(f"Expected embeddings with last dimension 256, but got {real_embeddings.shape[-1]}")
        break

    # Step 2: Create Fake Embeddings
    try:
        fake_embeddings = tf.random.uniform(real_embeddings.shape, dtype=tf.float32)  # Generate fake embeddings
        print(f"Fake embeddings shape: {fake_embeddings.shape}")
    except Exception as e:
        print(f"Error generating fake embeddings: {e}")
        break

    # Step 3: Train Discriminator on Real and Fake Data
    set_trainable(discriminator, True)  # Enable discriminator training
    try:
        d_loss_real = discriminator.train_on_batch(tf.convert_to_tensor(real_embeddings), real_captions)
        print(f"Discriminator Real Loss: {d_loss_real}")
    except Exception as e:
        print(f"Error training discriminator on real embeddings: {e}")
        break

    try:
        d_loss_fake = discriminator.train_on_batch(tf.convert_to_tensor(fake_embeddings), fake_labels)
        print(f"Discriminator Fake Loss: {d_loss_fake}")
    except Exception as e:
        print(f"Error training discriminator on fake embeddings: {e}")
        break

    # Step 4: Train Generator (via GAN)
    set_trainable(discriminator, False)  # Freeze discriminator during GAN training
    try:
        g_loss = gan_model.train_on_batch(real_contextual_features, gan_labels)
        print(f"Generator Loss: {g_loss}")
    except Exception as e:
        print(f"Error training GAN: {e}")
        break



Epoch 1/10
Real embeddings shape: (10, 20, 256)
Fake embeddings shape: (10, 20, 256)
Discriminator Real Loss: [array(0.7139863, dtype=float32), array(0.2, dtype=float32)]
Discriminator Fake Loss: [array(0.6889399, dtype=float32), array(0.35, dtype=float32)]
Generator Loss: [array(0.6889399, dtype=float32), array(0.6889399, dtype=float32), array(0.2, dtype=float32), array(0.2, dtype=float32)]

Epoch 2/10
Real embeddings shape: (10, 20, 256)
Fake embeddings shape: (10, 20, 256)
Discriminator Real Loss: [array(0.6298826, dtype=float32), array(0.56666666, dtype=float32)]
Discriminator Fake Loss: [array(0.63386446, dtype=float32), array(0.575, dtype=float32)]
Generator Loss: [array(0.63386446, dtype=float32), array(0.63386446, dtype=float32), array(0.6, dtype=float32), array(0.6, dtype=float32)]

Epoch 3/10
Real embeddings shape: (10, 20, 256)
Fake embeddings shape: (10, 20, 256)
Discriminator Real Loss: [array(0.58507794, dtype=float32), array(0.66, dtype=float32)]
Discriminator Fake Loss

In [50]:
# Save models in the recommended Keras format
generator.save('/content/generator_model.keras')
discriminator.save('/content/discriminator_model.keras')
gan_model.save('/content/gan_model.keras')

print("Models saved in the native Keras format.")


Models saved in the native Keras format.


In [52]:
from keras.models import load_model
from keras.optimizers import Adam

# Load the models
loaded_generator = load_model('/content/generator_model.h5')
loaded_discriminator = load_model('/content/discriminator_model.h5')
loaded_gan = load_model('/content/gan_model.h5')

# Compile the models manually
loaded_discriminator.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
loaded_gan.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.0001), metrics=['accuracy'])

print("Models loaded and compiled successfully.")




Models loaded and compiled successfully.


In [53]:
# Assuming `new_video_features` is the extracted features of a new video
new_video_features = np.random.rand(1, 20, 2048).astype('float32')  # Replace with actual features
generated_embeddings = loaded_generator.predict(new_video_features)

print(f"Generated embeddings shape: {generated_embeddings.shape}")
# Use embeddings for further tasks (e.g., classification, caption generation)


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 741ms/step
Generated embeddings shape: (1, 20, 256)


In [54]:
# Example fake embeddings for evaluation
fake_embeddings = np.random.rand(1, 20, 256).astype('float32')  # Replace with actual data
fake_labels = np.zeros((1, 1))  # Fake data labeled as 0

loss, accuracy = loaded_discriminator.evaluate(fake_embeddings, fake_labels, verbose=0)
print(f"Discriminator Loss: {loss}, Accuracy: {accuracy}")


Discriminator Loss: 0.42174550890922546, Accuracy: 1.0


In [55]:
loaded_generator.save('/content/generator_model.keras', save_format='keras')
loaded_discriminator.save('/content/discriminator_model.keras', save_format='keras')
loaded_gan.save('/content/gan_model.keras', save_format='keras')

print("Models saved in .keras format.")




Models saved in .keras format.
