# Base definitions

In [2]:
import posixpath

# Root path for the project
ROOT_DIR = 'A:/IVADL'

# V3C1 path
V3C1_DIR = posixpath.join(ROOT_DIR, "V3C1-100")

In [3]:
import os

def get_all_videos(base_dir):
    """
    Get all directories under the given base directory.
    
    :param base_dir: The base directory to search under.
    :return: A list of all directories under the base directory.
    """
    all_dirs = []
    for root, dirs, files in os.walk(base_dir):
        for dir_name in dirs:
            all_dirs.append(dir_name)
        break
    return all_dirs


# Detect shot boundaries with Transnetv2
https://github.com/soCzech/TransNetV2/blob/master/inference/transnetv2.py

### Detect and generate scenes

In [86]:
from transnetv2 import TransNetV2
import os
import numpy as np
from PIL import Image

def make_prodictions(video_name):
    video_dir = posixpath.join(V3C1_DIR, video_name)

    # Check if the scenes file already exists
    scenes_full_path = posixpath.join(video_dir, f"{video_name}_scenes.txt")
    if posixpath.isfile(scenes_full_path):
        print(f"The scenes file {scenes_full_path} already exists. Skipping prediction.")
        return

    video_mp4_path = posixpath.join(video_dir, f"{video_name}.mp4")
    video_mov_path = posixpath.join(video_dir, f"{video_name}.mov")
    if posixpath.isfile(video_mp4_path):
        video_full_path = video_mp4_path
    elif posixpath.isfile(video_mov_path):
        video_full_path = video_mov_path
    else:
        raise FileNotFoundError(f"Neither .mp4 nor .mov video file exists for {video_name}.")

    # Predict the scenes in the video
    try:
        video_frames, single_frame_p, all_frame_p = model.predict_video(video_full_path)
    except Exception as e:
        raise RuntimeError(f"Error predicting video: {e}")

    # Convert predictions to a list of scenes
    list_of_scenes = model.predictions_to_scenes(predictions=all_frame_p)

    # Visualize the predictions on the video frames
    pil_image = model.visualize_predictions(
        frames=video_frames, predictions=(single_frame_p, all_frame_p)
    )

    # Save the visualization image (optional)
    visualization_path = posixpath.join(video_dir, f"{video_name}_visualization.png")
    pil_image.save(visualization_path)

    # Write the list of scenes to a file
    np.savetxt(scenes_full_path, list_of_scenes, fmt="%d")

# Specify the paths for weights and video file
weights_dir = posixpath.join(ROOT_DIR, "transnetv2-weights")

# Initialize the TransNetV2 model with the weights directory
model = TransNetV2(weights_dir)

videos = get_all_videos(V3C1_DIR)
for video in videos:
    make_prodictions(video)


The scenes file A:/IVADL/V3C1-100/00100/00100_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00101/00101_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00102/00102_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00103/00103_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00104/00104_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00105/00105_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00106/00106_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00107/00107_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00108/00108_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00109/00109_scenes.txt already exists. Skipping prediction.
The scenes file A:/IVADL/V3C1-100/00110/00110_scenes.txt alr

### Generate Json file of the scenes
Add additional intervals if the scene is longer than 10 seconds

In [107]:
import json
import numpy as np
from datetime import timedelta
import subprocess
import re
import posixpath

def get_fps(video_path):
    """
    Get the frames per second (fps) of a video using ffmpeg.
    
    :param video_path: The path to the video file.
    :return: The fps as a float.
    """
    command = [
        'ffmpeg',
        '-i', video_path,
        '-hide_banner'
    ]

    result = subprocess.run(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE)

    # Decode with 'utf-8' and ignore errors
    output = result.stderr.decode('utf-8', errors='ignore')

    # Extract fps using regular expression
    fps_match = re.search(r'(\d+(\.\d+)?) fps', output)
    if fps_match:
        fps = float(fps_match.group(1))
        return fps
    else:
        raise ValueError("FPS not found in ffmpeg output")

def frames_to_timecode(frame_number, fps):
    """Convert frame number to timecode."""
    seconds = frame_number / fps
    timecode = str(timedelta(seconds=seconds)).split(".")[0]
    milliseconds = int((seconds - int(seconds)) * 1000)
    return f"{timecode}.{milliseconds:03d}"

def format_scenes_to_json(video_name, min_seconds=0):
    video_dir = posixpath.join(V3C1_DIR, video_name)
    scenes_full_path = posixpath.join(video_dir, f"{video_name}_scenes.txt")

    if not posixpath.isfile(scenes_full_path):
        raise FileNotFoundError(f"The scenes file {scenes_full_path} does not exist.")

    video_mp4_path = posixpath.join(video_dir, f"{video_name}.mp4")
    video_mov_path = posixpath.join(video_dir, f"{video_name}.mov")
    if posixpath.isfile(video_mp4_path):
        video_full_path = video_mp4_path
    elif posixpath.isfile(video_mov_path):
        video_full_path = video_mov_path
    else:
        raise FileNotFoundError(f"Neither .mp4 nor .mov video file exists for {video_name}.")

    fps = get_fps(video_full_path)
    min_frames = int(min_seconds * fps) if min_seconds > 0 else 0

    # Load the scenes from the file
    scenes = np.loadtxt(scenes_full_path, dtype=int)
    
    # Ensure scenes are a two-dimensional array
    if scenes.ndim == 1:
        scenes = scenes.reshape(1, -1)

    # Process the intervals
    scene_intervals = []
    additional_intervals = []
    for start_frame, end_frame in scenes:
        # Add 5 additional time steps for the start. Otherwise it can be a problem with switching scenes
        scene_intervals.append(start_frame + 5 if start_frame > 0 else 0)

    # Add the last end_frame to scene_intervals
    scene_intervals.append(scenes[-1, 1])

    if min_frames > 0:
        prev_interval = 0
        for interval in scene_intervals:
            diff = interval - prev_interval

            if diff > min_frames * 1.5:
                # Number of intervals needed
                num_intervals = (diff // min_frames) + 1
                # Step size for even distribution
                step = diff // num_intervals
                additional_intervals.extend(prev_interval + i * step for i in range(1, num_intervals))

            prev_interval = interval

    intervals = sorted(scene_intervals + additional_intervals)
    
    # Prepare the JSON structure
    scenes_json = []

    for index, start_frame in enumerate(intervals[:-1]):
        end_frame = intervals[index + 1] - 1
        scene_data = {
            "frame": f"{video_name}_frame{index+1:04d}.jpg",
            "starting_frame": int(start_frame),
            "ending_frame": int(end_frame),
            "starting_time": frames_to_timecode(start_frame, fps),
            "ending_time": frames_to_timecode(end_frame, fps)
        }
        scenes_json.append(scene_data)

    # Convert the list to a JSON string
    scenes_json_str = json.dumps(scenes_json, indent=2)
    
    # Save the JSON to a file
    json_full_path = posixpath.join(video_dir, f"{video_name}_scenes.json")
    with open(json_full_path, 'w') as json_file:
        json_file.write(scenes_json_str)
    
    return

videos = get_all_videos(V3C1_DIR)
for video in videos:
    format_scenes_to_json(video, min_seconds=10)


### Generate keyframe files with FFMPEG

In [110]:
import ffmpeg
import posixpath

def extract_keyframes(video_name):
    video_dir = posixpath.join(V3C1_DIR, video_name)
    scenes_full_path = posixpath.join(video_dir, f"{video_name}_scenes.json")

    if not posixpath.isfile(scenes_full_path):
        raise FileNotFoundError(f"The scenes file {scenes_full_path} does not exist.")

    video_mp4_path = posixpath.join(video_dir, f"{video_name}.mp4")
    video_mov_path = posixpath.join(video_dir, f"{video_name}.mov")
    if posixpath.isfile(video_mp4_path):
        video_full_path = video_mp4_path
    elif posixpath.isfile(video_mov_path):
        video_full_path = video_mov_path
    else:
        raise FileNotFoundError(f"Neither .mp4 nor .mov video file exists for {video_name}.")

    with open(scenes_full_path, 'r') as file:
        scenes = json.load(file)

    # Process the scenes
    scene_intervals = []
    for scene in scenes:
        start_frame = scene['starting_frame']
        scene_intervals.append(start_frame)

    intervals = sorted(scene_intervals)

    file_name = f'{video_dir}/{video_name}_frame%04d.jpg'
    select = '+'.join(f'eq(n,{interval})' for interval in intervals)

    try:
        ffmpeg.input(video_full_path).filter('select', select).output(file_name, vsync='0').run(capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        print('stdout:', e.stdout.decode('utf8'))
        print('stderr:', e.stderr.decode('utf8'))

videos = get_all_videos(V3C1_DIR)
for video in videos:
    extract_keyframes(video)

### Check that all frames in the scene files have been exported

In [133]:
def process_video(video_name):
    video_dir = posixpath.join(V3C1_DIR, video_name)
    scenes_full_path = posixpath.join(video_dir, f"{video_name}_scenes.json")

    if not posixpath.isfile(scenes_full_path):
        raise FileNotFoundError(f"The scenes file {scenes_full_path} does not exist.")

    with open(scenes_full_path, 'r') as file:
        scenes = json.load(file)

    # Process the scenes
    for scene in scenes:
        keyframe = scene['frame']
        image_full_path = posixpath.join(video_dir, keyframe)

        if not posixpath.isfile(image_full_path):
            raise FileNotFoundError(f"The frame file {keyframe} does not exist.")

videos = get_all_videos(V3C1_DIR)
for video in videos:
    process_video(video)


# Generate descriptions with CLIP

In [140]:
import torch
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
import json

def generate_image_category(image_path):
    # Load and preprocess the image
    image = Image.open(image_path).convert("RGB")
    # image = image.resize((224, 224))    # Resize to a standard size
    inputs = processor(images=image, return_tensors="pt")

    # Generate description
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=100)
    
    # Decode the output description
    description = processor.decode(outputs[0], skip_special_tokens=True)
    return description

def process_video(video_name):
    video_dir = posixpath.join(V3C1_DIR, video_name)
    scenes_full_path = posixpath.join(video_dir, f"{video_name}_scenes.json")
    output_json_path = posixpath.join(video_dir, f"{video}_scenes_descriptions.json")

    if posixpath.isfile(output_json_path):
        print(f"The description file {output_json_path} already exists. Skipping prediction.")
        return

    if not posixpath.isfile(scenes_full_path):
        raise FileNotFoundError(f"The scenes file {scenes_full_path} does not exist.")

    print(f"Generating descriptions for the video {video_name}")

    with open(scenes_full_path, 'r') as file:
        scenes = json.load(file)

    # Process the scenes
    for scene in scenes:
        keyframe = scene['frame']
        # print(f"   Processing keyframe {keyframe}")
        image_full_path = posixpath.join(video_dir, keyframe)
        description = generate_image_category(image_full_path)
        scene['description'] = description

    with open(output_json_path, 'w') as json_file:
        json.dump(scenes, json_file, indent=4)

# Load the BLIP model and processor
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")

videos = get_all_videos(V3C1_DIR)
for video in videos:
    process_video(video)


The description file A:/IVADL/V3C1-100/00100/00100_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00101/00101_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00102/00102_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00103/00103_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00104/00104_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00105/00105_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00106/00106_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00107/00107_scenes_descriptions.json already exists. Skipping prediction.
The description file A:/IVADL/V3C1-100/00108/00108_scenes_descriptions.json already exists. Skip

# Generate categories with YOLO

In [15]:
from ultralytics import YOLO
from PIL import Image
import json

def generate_image_category(image_path):
    # Load and resize the image
    with Image.open(image_path) as img:
        resized_img = img.resize((224, 224))  # Resize the image to 224x224

    # Use the model to make predictions
    results = model.predict(resized_img)

    # Extract the predictions
    output = results[0].verbose()
    print(output)
    quit

    # Get the first prediction and split the name and value
    top_prediction = output.split(", ")[0]
    name, value = top_prediction.split(" ")

    # Store the category only if value >= 0.10
    category = name if float(value) >= 0.10 else ""
    return category

def process_video(video_name):
    video_dir = posixpath.join(V3C1_DIR, video_name)
    descriptions_json_path = posixpath.join(video_dir, f"{video}_scenes_descriptions.json")
    categories_full_path = posixpath.join(video_dir, f"{video_name}_scenes_descriptions_categories.json")

    if posixpath.isfile(categories_full_path):
        print(f"The category file {categories_full_path} already exists. Skipping prediction.")
        return

    if not posixpath.isfile(descriptions_json_path):
        raise FileNotFoundError(f"The scenes file {descriptions_json_path} does not exist.")

    with open(descriptions_json_path, 'r') as file:
        scenes = json.load(file)

    # Process the scenes
    for scene in scenes:
        keyframe = scene['frame']
        image_full_path = posixpath.join(video_dir, keyframe)
        category = generate_image_category(image_full_path)
        scene['category'] = category

    with open(categories_full_path, 'w') as json_file:
        json.dump(scenes, json_file, indent=4)

# Load a pretrained model
model = YOLO("yolov8m-cls.pt")

videos = get_all_videos(V3C1_DIR)
for video in videos:
    process_video(video)



0: 224x224 matchstick 0.02, envelope 0.02, nematode 0.02, hook 0.02, velvet 0.02, 37.0ms
Speed: 2.0ms preprocess, 37.0ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 224)
matchstick 0.02, envelope 0.02, nematode 0.02, hook 0.02, velvet 0.02, 

0: 224x224 toaster 0.32, organ 0.19, theater_curtain 0.07, tripod 0.04, upright 0.03, 35.0ms
Speed: 2.0ms preprocess, 35.0ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 224)
toaster 0.32, organ 0.19, theater_curtain 0.07, tripod 0.04, upright 0.03, 

0: 224x224 organ 0.37, theater_curtain 0.19, chime 0.08, toaster 0.06, upright 0.03, 35.0ms
Speed: 2.0ms preprocess, 35.0ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 224)
organ 0.37, theater_curtain 0.19, chime 0.08, toaster 0.06, upright 0.03, 

0: 224x224 chain 0.10, spotlight 0.08, lampshade 0.07, digital_clock 0.06, strainer 0.05, 34.0ms
Speed: 2.0ms preprocess, 34.0ms inference, 0.0ms postprocess per image at shape (1, 3, 224, 224)
chain 0.10, sp

# Convert videos for preview with FFMPEG

In [16]:
import subprocess

def transcode_video(input_file, output_file, width=None, height=None, bitrate=None):
    # Create the base ffmpeg command
    cmd = ['ffmpeg', '-i', input_file]
    
    # Add video scaling if width and height are provided
    if width and height:
        cmd.extend(['-vf', f'scale={width}:{height}'])
    
    # Add video bitrate if provided
    if bitrate:
        cmd.extend(['-b:v', bitrate])
    
    # Add the output file
    cmd.append(output_file)
    
    # Execute the command
    try:
        subprocess.run(cmd, check=True)
        print(f"Transcoding completed successfully. Output file: {output_file}")
    except subprocess.CalledProcessError as e:
        print(f"An error occurred: {e}")




video_name = "00100"
video_dir = posixpath.join(V3C1_DIR, video_name)

video_mp4_path = posixpath.join(video_dir, f"{video_name}.mp4")
video_mov_path = posixpath.join(video_dir, f"{video_name}.mov")
if posixpath.isfile(video_mp4_path):
    input_video = video_mp4_path
elif posixpath.isfile(video_mov_path):
    input_video = video_mov_path
else:
    raise FileNotFoundError(f"Neither .mp4 nor .mov video file exists for {video_name}.")

output_video = posixpath.join(video_dir, f"{video_name}_preview.mp4")
new_width = 640
new_height = 360
# new_bitrate = '500k'
transcode_video(input_video, output_video, new_width, new_height)

# videos = get_all_videos(V3C1_DIR)
# for video in videos:
#     transcode_video(input_video, output_video, new_width, new_height)


Transcoding completed successfully. Output file: A:/IVADL/V3C1-100/00100/00100_preview.mp4
