In [None]:
# Install the third-party packages this notebook depends on (Colab shell escape).
# NOTE(review): versions are unpinned, so a fresh runtime may resolve different releases.
!pip install moviepy transformers yt_dlp wget pytube

# Mount Google Drive at /content/drive so later cells can read and write files there.
from google.colab import drive
drive.mount('/content/drive')

# Import necessary libraries
import os
import cv2
import numpy as np
import moviepy.editor as mp
import wget
import yt_dlp as youtube_dl
import tensorflow as tf
from google.colab import files
from transformers import AutoTokenizer, AutoModelForCausalLM
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input,
    ConvLSTM2D,
    BatchNormalization,
    Flatten,
    Conv2D,
    MaxPooling2D,
    GlobalAveragePooling2D,
    TimeDistributed,
    Embedding,
    LSTM,
    Dense,
    Dropout,
    RepeatVector,
    Add,
    Activation,
    Permute,
    Reshape,
    Softmax,
    Dot
)
import pickle
import glob
import zipfile


Download video and caption files from dataset and extract frames from video.

In [None]:
def download_dataset_files(video_urls_path="video_urls.txt", captions_path="captions.txt"):
    """
    Fetch the VideoStory46K URL list and caption list unless they are already on disk.

    Dataset page: https://ivi.fnwi.uva.nl/isis/mediamill/datasets/videostory.php

    Args:
        video_urls_path (str): Local destination of the video URL list.
        captions_path (str): Local destination of the (stemmed) caption list.

    Returns:
        None. Any download error is printed instead of raised.
    """
    # (remote location, local destination) pairs, fetched in this order.
    downloads = (
        (
            "https://isis-data.science.uva.nl/mediamill/videostory/content/datasets/VideoStory46K/urls.txt",
            video_urls_path,
        ),
        (
            "https://isis-data.science.uva.nl/mediamill/videostory/content/datasets/VideoStory46K/titles_stemmed.txt",
            captions_path,
        ),
    )

    try:
        for remote_link, local_path in downloads:
            # A file that is already present is never downloaded again.
            if os.path.exists(local_path):
                continue
            wget.download(remote_link, local_path)
    except Exception as e:
        print(f"Error downloading dataset files: {e}")

def load_data(video_urls_path="video_urls.txt", captions_path="captions.txt"):
    """
    Read the video URL file and the caption file into two lists of stripped lines.

    Args:
        video_urls_path (str): Text file with one video URL per line.
        captions_path (str): Text file with one caption per line.

    Returns:
        tuple[list[str], list[str]]: (video_urls, captions). Both lists are empty
        when either file cannot be read; the error is printed, not raised.
    """
    def _stripped_lines(path):
        # Surrounding whitespace (including the newline) is removed from every line.
        with open(path, 'r') as handle:
            return [raw_line.strip() for raw_line in handle.readlines()]

    try:
        urls = _stripped_lines(video_urls_path)
        caption_lines = _stripped_lines(captions_path)
    except FileNotFoundError as e:
        print(f"Error loading data: {e}")
        return [], []
    except Exception as e:
        print(f"Unexpected error loading data: {e}")
        return [], []
    return urls, caption_lines

# Step 1: Download the video temporarily using youtube-dl
def download_video_temp(url, temp_video_path="/content/temp_video.mp4"):
    """
    Download one video to a fixed temporary path.

    Args:
        url (str): Video URL handed to the downloader.
        temp_video_path (str): Where the mp4 is written.

    Returns:
        str | None: temp_video_path on success, None when the download fails
        or leaves no usable file behind.
    """
    ydl_opts = {
        'outtmpl': temp_video_path,
        'format': 'mp4',
        'quiet': True
    }
    try:
        # The output path is reused for every video. The downloader does not
        # overwrite an existing video file, so a leftover from an earlier call
        # would be returned as if it were this URL's video.
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)

        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            print(f"Downloading video from URL: {url}")
            ydl.download([url])

        # Same sanity check as video_upload: the file must exist and be non-empty.
        if not os.path.exists(temp_video_path) or os.path.getsize(temp_video_path) == 0:
            print("Downloaded video is invalid.")
            return None
        return temp_video_path
    except Exception as e:
        print(f"Failed to download video from URL: {url}. Error: {e}")
        return None

# Step 2: Extract frames from the video
def extract_frames_from_video(video_path, num_frames=10, target_size=(64, 64)):
    """
    Sample evenly spaced frames from a video file.

    Args:
        video_path (str): Path of the video to read.
        num_frames (int): Number of sampling positions.
        target_size (tuple[int, int]): Size passed to cv2.resize for every frame.

    Returns:
        np.ndarray: Stacked RGB frames (pixel values are not rescaled here).
        Positions that cannot be read are skipped, so fewer than num_frames
        frames may come back. An empty array is returned when the video cannot
        be opened or an error occurs.
    """
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f"Error extracting frames from video {video_path}: could not open video")
            return np.array([])

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # At least 1 so short videos still advance between samples.
        frame_step = max(1, total_frames // num_frames)
        frames = []

        for i in range(num_frames):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i * frame_step)
            ret, frame = cap.read()
            if ret:
                frame_resized = cv2.resize(frame, target_size)
                # OpenCV decodes to BGR; convert to RGB.
                frame_rgb = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB)
                frames.append(frame_rgb)

        return np.array(frames)
    except Exception as e:
        print(f"Error extracting frames from video {video_path}: {e}")
        return np.array([])
    finally:
        # Release the capture on every path, including errors.
        if cap is not None:
            cap.release()

def process_videos_and_captions(video_urls, captions, num_frames=10):
    """
    Download each video, sample its frames and pair them with the caption.

    Args:
        video_urls (list[str]): Video URLs.
        captions (list[str]): Captions, aligned with video_urls by position.
        num_frames (int): Number of frames requested per video.

    Returns:
        list[tuple[np.ndarray, str]]: (frames, caption) pairs for every video
        that could be downloaded and decoded. Failed videos are skipped.
    """
    processed_data = []

    for url, caption in zip(video_urls, captions):
        temp_video_path = None
        try:
            # Download video temporarily
            temp_video_path = download_video_temp(url)
            if temp_video_path is None:
                continue

            # Extract frames
            video_frames = extract_frames_from_video(temp_video_path, num_frames=num_frames)
            if video_frames.size == 0:
                print(f"Skipping video {url} as no frames were extracted.")
                continue

            # Store the result (frames + caption pair)
            processed_data.append((video_frames, caption))
        except Exception as e:
            print(f"Error processing video {url}: {e}")
        finally:
            # Always delete the temporary video, including on the skip and
            # error paths. The path is reused for the next URL, so a leftover
            # file could be mistaken for that URL's download.
            if temp_video_path is not None and os.path.exists(temp_video_path):
                try:
                    os.remove(temp_video_path)
                except OSError as e:
                    print(f"Error processing video {url}: {e}")

    return processed_data

def preprocess_captions(captions, max_caption_length):
    """
    Fit a word-level tokenizer on the captions and turn them into padded index sequences.

    Args:
        captions (list[str]): Caption strings.
        max_caption_length (int): Length every sequence is padded/truncated to.

    Returns:
        tuple: (tokenizer, vocab_size, padded_sequences). vocab_size is the
        number of known words plus one, because index 0 is reserved for
        padding. On any error the result is (None, 0, None) and the error is
        printed.
    """
    print("Preprocessing captions...")
    try:
        caption_tokenizer = Tokenizer(oov_token="<UNK>")
        caption_tokenizer.fit_on_texts(captions)

        # +1 keeps index 0 free for the padding value.
        n_words = len(caption_tokenizer.word_index) + 1

        encoded = caption_tokenizer.texts_to_sequences(captions)
        # Padding is appended after the words ("post").
        padded = pad_sequences(encoded, maxlen=max_caption_length, padding="post")
    except Exception as e:
        print(f"Error preprocessing captions: {e}")
        return None, 0, None
    return caption_tokenizer, n_words, padded

In [None]:
def batch_process_videos_and_captions(video_urls, captions, num_frames=10, batch_size = 50, batches = 50, start=3000):
    """
    Process the dataset in batches, pickling every batch and zipping the results.

    Args:
        video_urls (list[str]): All video URLs.
        captions (list[str]): Captions aligned with video_urls.
        num_frames (int): Frames to sample per video.
        batch_size (int): Number of videos per batch file.
        batches (int): Maximum number of batches to process in this call.
        start (int): Index of the last video that was already processed;
            processing resumes at start + 1. It is tracked manually, so update
            it after every run, otherwise data is duplicated.

    Returns:
        list: The (frames, caption) pairs of the last processed batch, or an
        empty list when there was nothing left to process.
    """
    current_batch = 0
    # Defined up front so the return below works when the loop never runs.
    batch_data = []
    print(start)
    for i in range(start+1, len(video_urls), batch_size):
        batch_urls = video_urls[i:i + batch_size]
        batch_captions = captions[i:i + batch_size]

        # Process batch (forward the requested frame count)
        batch_data = process_videos_and_captions(batch_urls, batch_captions, num_frames=num_frames)

        # Save intermediate results
        with open(f"processed_data_batch_{i//batch_size}.pkl", "wb") as f:
            pickle.dump(batch_data, f)
        print(f"Batch {i//batch_size} saved!")
        current_batch += 1
        if current_batch >= batches:
            break

    # zip batches into single zip file
    zip_batches()
    return batch_data

def load_batches():
    """
    Load every batch stored in processed_data.zip into one list.

    The archive is unpacked into a throw-away directory, so batch files that
    are still waiting in the working directory (not yet archived) are neither
    loaded nor deleted.

    Returns:
        list: All entries of all archived batches, in sorted file-name order.
        An empty list when the archive does not exist.
    """
    import tempfile  # local import: only needed here

    zip_filename = "processed_data.zip"
    if not os.path.exists(zip_filename):
        print("No ZIP file found. Processed data cannot be loaded.")
        return []

    processed_data = []
    with tempfile.TemporaryDirectory() as extract_dir:
        with zipfile.ZipFile(zip_filename, "r") as zipf:
            zipf.extractall(extract_dir)  # Extract all batch files

        pattern = os.path.join(extract_dir, "processed_data_batch_*.pkl")
        # Sorted so the order of the result does not depend on the filesystem.
        for file in sorted(glob.glob(pattern)):
            with open(file, "rb") as f:
                # NOTE: pickle can execute arbitrary code; only load archives
                # produced by this notebook.
                batch_data = pickle.load(f)
            processed_data.extend(batch_data)
        # The temporary directory (and the extracted files) is removed here.
    print("Processed data loaded successfully.")
    return processed_data

def zip_batches():
    """
    Move all processed_data_batch_*.pkl files of the working directory into processed_data.zip.

    Files whose name is already present in the archive are left untouched on
    disk (and reported), so existing archive entries are never duplicated.

    Returns:
        None.
    """
    batch_files = glob.glob("processed_data_batch_*.pkl")
    if not batch_files:
        print("No batch files to zip.")
        return

    zip_filename = "processed_data.zip"
    newly_archived = []
    with zipfile.ZipFile(zip_filename, "a") as zipf:  # Open in append mode
        already_archived = set(zipf.namelist())  # read once instead of per file
        for file in batch_files:
            if file in already_archived:  # Avoid duplicate entries
                print(f"{file} is already in {zip_filename}; leaving it on disk.")
                continue
            zipf.write(file)
            already_archived.add(file)
            newly_archived.append(file)

    # Delete the originals only after the archive has been closed (and thereby
    # fully written), so a failed write cannot lose data.
    for file in newly_archived:
        os.remove(file)
    print(f"All batch files added to {zip_filename} and deleted.")

def upload_to_drive(file_path, drive_folder="/content/drive/MyDrive/"):
    """
    Copies a file into the mounted Google Drive folder.

    Pending batch files are zipped first (zip_batches), then Drive is mounted
    and the file is copied.

    Args:
        file_path (str): Path to the file you want to upload.
        drive_folder (str): Google Drive folder where the file will be uploaded. Default is MyDrive.

    Returns:
        str: The link string printed for sharing, or an error message when
        file_path does not exist.
    """
    import shutil  # local import: only needed here

    zip_batches()
    # Mount Google Drive
    drive.mount('/content/drive')

    # Check if file exists
    if not os.path.exists(file_path):
        return f"Error: {file_path} does not exist."

    # Copy file to Google Drive. A Python copy instead of a shell command
    # keeps paths with spaces or shell metacharacters working.
    destination = os.path.join(drive_folder, os.path.basename(file_path))
    shutil.copy(file_path, destination)

    # Output Google Drive file path for sharing
    # NOTE(review): this string is built from the file name only, not from a
    # Drive file id, so it is presumably not a resolvable share link — confirm.
    shareable_path = f"https://drive.google.com/file/{os.path.basename(destination)}"
    print(f"File uploaded successfully to: {destination}")
    print(f"Share this file using this link: {shareable_path}")
    return shareable_path



In [None]:
def video_upload(url, temp_video_path="/content/temp_analysis.mp4"):
    """
    Download the video that should be captioned to a fixed temporary path.

    Args:
        url (str): Video URL handed to yt-dlp.
        temp_video_path (str): Where the mp4 is written.

    Returns:
        str | None: temp_video_path on success, None when the download fails
        or the resulting file is missing or empty.
    """
    try:
        from yt_dlp import YoutubeDL  # Use yt-dlp for better reliability

        # The output path is the same for every call. Remove a leftover from
        # a previous call: the downloader does not overwrite an existing video
        # file, so the old video would be returned for this URL.
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)

        ydl_opts = {
            'outtmpl': temp_video_path,
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
            'quiet': True
        }
        with YoutubeDL(ydl_opts) as ydl:
            print(f"Downloading video from URL: {url}")
            ydl.download([url])
        if not os.path.exists(temp_video_path) or os.path.getsize(temp_video_path) == 0:
            print("Downloaded video is invalid.")
            return None
        return temp_video_path
    except Exception as e:
        print(f"Failed to download video from URL: {url}. Error: {e}")
        return None

def video_split(video_path, seconds=8, overlap=2):
    """
    Split a video's timeline into overlapping (start, end) windows.

    Args:
        video_path (str): Path of the video to inspect.
        seconds (int | float): Length of every window in seconds.
        overlap (int | float): Seconds shared by two consecutive windows;
            must be smaller than `seconds`.

    Returns:
        list[tuple]: (start_time, end_time) pairs in seconds. The last window
        is cut at the end of the video. An empty list is returned when the
        arguments or the video are unusable; the error is printed.
    """
    cap = None
    try:
        # A non-positive step would never advance and loop forever.
        if seconds - overlap <= 0:
            raise ValueError("overlap must be smaller than seconds")

        cap = cv2.VideoCapture(video_path)
        # Truncated to int, the same way extract_frames_from_clips converts
        # seconds back to frame indices.
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps

        clips = []
        start_time = 0

        while start_time < duration:
            end_time = min(start_time + seconds, duration)
            clips.append((start_time, end_time))
            # Once a window reaches the end of the video, any further window
            # would lie completely inside it, so stop here.
            if end_time >= duration:
                break
            start_time += seconds - overlap

        return clips
    except Exception as e:
        print(f"Error splitting video: {e}")
        return []
    finally:
        # Release the capture on every path, including errors.
        if cap is not None:
            cap.release()

def extract_frames_from_clips(video_path, clips, num_frames=10, target_size=(64, 64)):
    """
    Extracts evenly spaced frames from each video clip.

    Args:
        video_path (str): Path of the video to read.
        clips (list[tuple]): (start_time, end_time) pairs in seconds.
        num_frames (int): Frames per clip in the result.
        target_size (tuple[int, int]): Size passed to cv2.resize for every frame.

    Returns:
        np.ndarray: float32 array with one entry per usable clip and
        num_frames RGB frames per clip, scaled to [0, 1]. Clips that yield
        fewer frames are padded by repeating their last frame; clips that
        yield no frame are dropped. An empty array is returned on error.
    """
    frames = []
    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        fps = int(cap.get(cv2.CAP_PROP_FPS))

        for start_time, end_time in clips:
            clip = []
            start_frame = int(start_time * fps)
            end_frame = int(end_time * fps)
            frame_step = max(1, (end_frame - start_frame) // num_frames)

            for i in range(num_frames):
                frame_idx = start_frame + (i * frame_step)
                if frame_idx > end_frame:
                    break
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                ret, frame = cap.read()
                if ret:
                    frame_resized = cv2.resize(frame, target_size)
                    # OpenCV decodes to BGR; convert to RGB.
                    frame_rgb = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2RGB)
                    clip.append(frame_rgb)
                else:
                    print(f"Failed to read frame at index {frame_idx}.")

            if not clip:
                print(f"Skipping clip {start_time}-{end_time}: no frames could be read.")
                continue
            # Every clip must hold exactly num_frames frames, otherwise the
            # clips cannot be stacked into one array. Short clips (typically
            # the tail of the video) are padded with their last frame.
            clip.extend([clip[-1]] * (num_frames - len(clip)))
            frames.append(np.array(clip))

        print(f"Extracted {len(frames)} clips from video.")
        return np.array(frames, dtype=np.float32) / 255.0  # Normalize to [0, 1]
    except Exception as e:
        print(f"Error extracting frames: {e}")
        return np.array([])
    finally:
        # Release the capture on every path, including errors.
        if cap is not None:
            cap.release()

def process_video(url):
    """
    Turn a video URL into model-ready clip frames.

    The video is downloaded (video_upload), cut into time windows
    (video_split) and sampled (extract_frames_from_clips).

    Args:
        url (str): URL of the video.

    Returns:
        np.ndarray | None: The frames returned by extract_frames_from_clips,
        or None when any stage fails (a message naming the stage is printed).
    """
    try:
        # Stage 1: download
        local_path = video_upload(url)
        if local_path:
            # Stage 2: time-based clips
            windows = video_split(local_path)
            if windows:
                # Stage 3: frames for every clip
                sampled = extract_frames_from_clips(local_path, windows)
                if sampled.size != 0:
                    return sampled
                print("Failed to extract frames.")
            else:
                print("Failed to split video into clips.")
        else:
            print("Failed to download video.")
        return None
    except Exception as e:
        print(f"Error processing video: {e}")
        return None

#test
#process_video("https://www.youtube.com/watch?v=zWH_9VRWn8Y")

In [None]:
def combine_processed_data(zip_path, output_path="processed_data.pkl"):
    """
    Merge all batch pickles of a ZIP archive into a single pickle file.

    Args:
        zip_path (str): Archive containing processed_data_batch_*.pkl files.
        output_path (str): Where the combined list is pickled.

    Returns:
        None. The combined list is written to output_path. List batches are
        concatenated, dict batches are appended as single entries, and any
        other payload is skipped with a message.
    """
    # Step 1: Unzip the file
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall("extracted_data")  # Extract to a folder called 'extracted_data'

    # Step 2: Get all files matching the pattern.
    # Sorted, because glob returns files in a filesystem-dependent order and
    # the combined data should come out the same on every run.
    batch_files = sorted(glob.glob("extracted_data/processed_data_batch_*.pkl"))
    print(f"Found {len(batch_files)} batch files to combine.")

    combined_data = []

    # Step 3: Load and combine the .pkl files
    for file in batch_files:
        with open(file, 'rb') as f:
            # NOTE: pickle can execute arbitrary code; only combine archives
            # produced by this notebook.
            data = pickle.load(f)
        if isinstance(data, list):
            combined_data.extend(data)  # Combine lists
        elif isinstance(data, dict):
            combined_data.append(data)  # Add dictionaries to a list
        else:
            print(f"Skipping {file}: Unsupported data type.")

    # Step 4: Save the combined data to a new .pkl file
    with open(output_path, 'wb') as f:
        pickle.dump(combined_data, f)

    print(f"Combined data saved to {output_path}.")

# Merge the batch archive stored on Drive into a single local processed_data.pkl.
# NOTE(review): the path is relative, so this presumably expects the working directory to be /content — confirm.
combine_processed_data("drive/MyDrive/processed_data.zip")

Found 59 batch files to combine.
Combined data saved to processed_data.pkl.


In [None]:
def video_encoder(input_shape):
    """
    Build the video encoder: two ConvLSTM2D blocks followed by per-frame Conv2D blocks.

    Args:
        input_shape (tuple): Shape of one video without the batch axis, as
            passed to keras Input.

    Returns:
        Model: Keras model mapping a video to one flat feature vector.
    """
    inputs = Input(shape=input_shape)
    features = inputs

    # Two ConvLSTM2D blocks (32, then 64 filters), each followed by batch
    # normalisation; the time axis is kept (return_sequences=True).
    for n_filters in (32, 64):
        features = ConvLSTM2D(
            filters=n_filters,
            kernel_size=(3, 3),
            padding='same',
            return_sequences=True,
            activation="relu",
        )(features)
        features = BatchNormalization()(features)

    # First per-frame Conv2D block: convolution, 2x2 pooling, batch normalisation.
    features = TimeDistributed(
        Conv2D(filters=128, kernel_size=(3, 3), padding='same', activation="relu")
    )(features)
    features = TimeDistributed(MaxPooling2D(pool_size=(2, 2)))(features)
    features = TimeDistributed(BatchNormalization())(features)

    # Second per-frame Conv2D block: convolution and 2x2 pooling.
    features = TimeDistributed(
        Conv2D(filters=128, kernel_size=(3, 3), padding='same', activation="relu")
    )(features)
    features = TimeDistributed(MaxPooling2D(pool_size=(2, 2)))(features)

    # Normalise, then flatten. No global pooling is applied: the remaining
    # time and spatial axes are flattened together with the 128 channels, so
    # the feature vector grows with the number of frames and the frame size.
    features = BatchNormalization()(features)
    features = Flatten()(features)

    return Model(inputs, features)


# Step 1: Load processed data from the .pkl file
def load_processed_data(file_path="processed_data.pkl"):
    """
    Unpickle the combined dataset and report how many entries it holds.

    Args:
        file_path (str): Pickle file to read.

    Returns:
        The unpickled object (its length is printed).
    """
    with open(file_path, 'rb') as source:
        entries = pickle.load(source)
    entry_count = len(entries)
    print(f"Loaded {entry_count} entries from {file_path}")
    return entries

# Step 1: Load processed data
combined_data = load_processed_data("processed_data.pkl")

# Extract video frames and captions
# The stored frames were never rescaled when they were extracted, while
# extract_frames_from_clips (used for inference) scales its frames to [0, 1].
# Apply the same scaling here so training and inference see the same range.
# NOTE(review): assumes the stored frames are 0-255 pixel values — confirm.
video_frames = np.array([item[0] for item in combined_data], dtype=np.float32) / 255.0  # Assuming item[0] contains video frame arrays
captions = [item[1] for item in combined_data]  # Assuming item[1] contains captions as strings

tokenizer = Tokenizer(oov_token="<UNK>")
tokenizer.fit_on_texts(captions)
word_index = tokenizer.word_index

# Define encoder
input_shape = (10, 64, 64, 3)
encoder = video_encoder(input_shape)
encoder.summary()
vocab_size = len(word_index) + 1  # Include padding (0) in vocab size
max_caption_length = 20

# Convert captions to padded sequences
sequences = tokenizer.texts_to_sequences(captions)
padded_captions = pad_sequences(sequences, maxlen=max_caption_length, padding="post")

# Prepare inputs and targets (targets are the inputs shifted by one position)
X_video = video_frames  # Video frame data
X_caption = padded_captions[:, :-1]  # Input captions (exclude the last token)
y_caption = padded_captions[:, 1:]  # Target captions (exclude the first token)

# Debugging: Verify shapes
print(f"Video frames shape: {X_video.shape}")  # Should match input_shape of video encoder
print(f"X_caption shape: {X_caption.shape}")   # Should match the decoder input length
print(f"y_caption shape: {y_caption.shape}")   # Should match decoder's output shape

# Example model adjustment (if input shape mismatch persists)
video_input = Input(shape=X_video.shape[1:])  # Shape of video frames
# X_caption is one token shorter than max_caption_length, so take the length
# from the data instead of hard-coding it.
caption_input = Input(shape=(X_caption.shape[1],))  # Match X_caption shape

# Define Embedding layer; the sequence length is taken from caption_input
caption_embedding = Embedding(input_dim=vocab_size, output_dim=256)(caption_input)
decoder_lstm = LSTM(256, return_sequences=True)(caption_embedding)
output = TimeDistributed(Dense(vocab_size, activation="softmax"))(decoder_lstm)


Loaded 1576 entries from processed_data.pkl





Video frames shape: (1576, 10, 64, 64, 3)
X_caption shape: (1576, 19)
y_caption shape: (1576, 19)


Building Decoder for text generation

In [None]:
def text_decoder(vocab_size, embedding_dim, max_caption_length):
    """
    Build a standalone caption decoder: Embedding -> LSTM -> Dropout -> softmax.

    Args:
        vocab_size (int): Number of word indices (size of the softmax output).
        embedding_dim (int): Size of the word embeddings.
        max_caption_length (int): Length of the input token sequence.

    Returns:
        Model: Keras model mapping a token sequence to per-position word
        probabilities.
    """
    token_ids = Input(shape=(max_caption_length,))

    # mask_zero=True treats index 0 as padding.
    hidden = Embedding(input_dim=vocab_size, output_dim=embedding_dim, mask_zero=True)(token_ids)
    hidden = LSTM(256, return_sequences=True, dropout=0.3, recurrent_dropout=0.3)(hidden)
    hidden = Dropout(0.3)(hidden)

    word_probabilities = Dense(vocab_size, activation='softmax')(hidden)
    return Model(token_ids, word_probabilities)

# Define decoder
# Builds the standalone caption decoder and prints its layer summary.
# NOTE(review): video_to_text_model builds its own decoder layers, so `decoder`
# looks unused by the combined model — confirm it is still needed.
embedding_dim = 256
decoder = text_decoder(vocab_size, embedding_dim, max_caption_length)
decoder.summary()


Combined Encoder and Decoder

In [None]:
def video_to_text_model(input_shape, vocab_size, embedding_dim, max_caption_length):
    """
    Build the combined model: the video encoder initialises an LSTM caption decoder.

    Args:
        input_shape (tuple): Shape of one video, forwarded to video_encoder.
        vocab_size (int): Number of word indices (size of the softmax output).
        embedding_dim (int): Size of the word embeddings.
        max_caption_length (int): Padded caption length; the decoder consumes
            max_caption_length - 1 tokens.

    Returns:
        Model: Keras model taking [video, caption tokens] and returning
        per-position word probabilities.
    """
    # Encoder: flat feature vector per video
    video_branch = video_encoder(input_shape)
    video_features = video_branch.output

    # Decoder input: caption tokens; index 0 is masked as padding
    decoder_input = Input(shape=(max_caption_length - 1,))
    token_embeddings = Embedding(
        input_dim=vocab_size, output_dim=embedding_dim, mask_zero=True
    )(decoder_input)

    # Two separate projections of the video features become the LSTM's
    # initial hidden state and cell state (in that order).
    initial_state = [Dense(256, activation='relu')(video_features) for _ in range(2)]

    sequence_output = LSTM(256, return_sequences=True)(token_embeddings, initial_state=initial_state)

    # Word distribution for every position of the caption
    word_probabilities = Dense(vocab_size, activation='softmax')(sequence_output)

    return Model([video_branch.input, decoder_input], word_probabilities)

# Build the combined encoder-decoder and compile it. The sparse categorical
# cross-entropy loss takes integer word indices as targets (y_caption), so
# the captions do not have to be one-hot encoded.
model = video_to_text_model(input_shape, vocab_size, embedding_dim, max_caption_length)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()


In [None]:

# Train the model
# The decoder is fed X_caption and trained to predict y_caption, i.e. the
# same captions shifted by one token.
# NOTE(review): no validation data or checkpointing is configured here, so
# progress is presumably lost if the runtime disconnects — confirm.
epochs = 30
batch_size = 16
model.fit([X_video, X_caption], y_caption, epochs=epochs, batch_size=batch_size)


Epoch 1/30
[1m99/99[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5243s[0m 53s/step - accuracy: 0.5556 - loss: 7.4519
Epoch 2/30
[1m37/99[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m54:39[0m 53s/step - accuracy: 0.6167 - loss: 6.7450

Training

Generate Captions/Alt text

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Input

def generate_caption(model, tokenizer, clip_frames, max_caption_length):
    """
    Generate a transcript for a video using the encoder-decoder model, processing clips individually.

    Every clip is decoded greedily: the most probable next word is appended
    until an end marker or the padding index is predicted, or the maximum
    length is reached.

    Args:
        model: Model whose predict([frames, decoder_input]) returns
            per-position word probabilities.
        tokenizer: Tokenizer providing word_index and index_word mappings.
        clip_frames: Iterable of per-clip frame arrays (without batch axis).
        max_caption_length (int): Padded caption length used for training.

    Returns:
        str: The captions of all clips joined by single spaces.
    """
    padding_index = 0  # captions are padded with 0, so 0 marks "no more words"

    # Default to 1 if <start> is not in the vocab
    start_token = tokenizer.word_index.get("<start>", 1)
    # Only stop on <end> when the vocabulary actually contains it; a fixed
    # fallback index would stop on whatever ordinary word has that index.
    end_token = tokenizer.word_index.get("<end>")

    # Initialize the complete transcript
    transcript = []

    # Process each set of frames (clip) in the array
    for clip_index, frames in enumerate(clip_frames):
        print(f"Processing clip {clip_index + 1}/{len(clip_frames)}...")

        # Add the batch dimension once per clip
        frames_batch = frames[np.newaxis, ...]

        # Initialize the decoder input with the start token
        decoder_input = np.zeros((1, max_caption_length - 1))
        decoder_input[0, 0] = start_token

        # Initialize generated caption for the clip
        generated_caption = []

        # Generate the caption word by word
        for i in range(1, max_caption_length - 1):
            # Predict the next word based on the input so far
            predictions = model.predict([frames_batch, decoder_input], verbose=0)

            # Select the word with the highest probability
            next_word_index = int(np.argmax(predictions[0, i - 1, :]))

            # Stop at the end of the caption (padding or explicit end marker)
            if next_word_index == padding_index or next_word_index == end_token:
                break

            # Append the predicted word to the caption
            generated_caption.append(tokenizer.index_word.get(next_word_index, "<UNK>"))

            # Update the decoder input with the predicted word
            decoder_input[0, i] = next_word_index

        # Join the words into a caption and append to the transcript
        transcript.append(" ".join(generated_caption))

    # Return the complete transcript as a single string
    return " ".join(transcript)

link = "https://www.youtube.com/watch?v=zWH_9VRWn8Y"
clip_frames = process_video(link)
if clip_frames is None:
    # process_video returns None when downloading, splitting or frame
    # extraction fails; report it instead of crashing on len(None).
    caption = None
    print(f"Could not process video: {link}")
else:
    print(len(clip_frames))
    print(clip_frames[0].shape)
    # Generate a caption for the unseen video
    caption = generate_caption(model, tokenizer, clip_frames, max_caption_length)
    print("Generated caption:", caption)


NameError: name 'process_video' is not defined

Generate caption on video from training set