<a href="https://colab.research.google.com/github/gu-ma/hgk-ml-workshop/blob/main/notebooks/process_videos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Process a folder with video files

Assuming we now have a folder with all our videos, we will use a script to:

* Extract the scenes
* Save a short video + thumbnails for each scene

Run this cell and restart the runtime ⬇️

In [None]:
# Install the libraries this notebook needs (scene detection, ffmpeg bindings, image captioning)
! pip install scenedetect
! pip install ffmpeg-python
! pip install salesforce-lavis

## Connect Google Drive

In [None]:
# Mount Google Drive under /content/drive so the cells below can read the
# source videos from it and copy the results back to it.
from google.colab import drive
drive.mount('/content/drive')

## Helper functions

In [None]:
# Attempted fix for a problem with shell commands: replace the private
# `_locale._getdefaultlocale` hook so it always reports en_US / utf8.
# NOTE: according to the original author this workaround doesn't work.
# https://stackoverflow.com/questions/31469707/changing-the-locale-preferred-encoding-in-python-3-in-windows
import _locale
_locale._getdefaultlocale = (lambda *args: ['en_US', 'utf8'])

# Import + Preload
import re
import os
import glob
import shutil
import itertools
from pathlib import Path

import pandas as pd

import torch
from PIL import Image
from lavis.models import load_model_and_preprocess

from scenedetect import open_video, ContentDetector, SceneManager, StatsManager

from scenedetect.scene_manager import (
    write_scene_list_html,
    write_scene_list,
    save_images,
)

import ffmpeg

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Load the BLIP caption base model (checkpoint finetuned on the MSCOCO
# captioning dataset) in eval mode, together with its image preprocessors.
# The third returned value is not used in this notebook.
model, vis_processors, _ = load_model_and_preprocess(
    name="blip_caption",
    model_type="base_coco",
    is_eval=True,
    device=device,
)


def video_extract_scenes(
    video_path,
    output_dir,
    threshold=30.0,
    min_scene_len=15,
    frames_per_scene=3,
    generate_captions=True,
):
    """Detect the scenes of a video and export images, an HTML preview and a CSV.

    Args:
        video_path: Path of the video file to analyse.
        output_dir: Directory that receives the scene images, the
            ``<video name>_preview.html`` summary and the
            ``<video name>_stats.csv`` scene list.
        threshold: Passed to ``ContentDetector``.
        min_scene_len: Passed to ``ContentDetector``.
        frames_per_scene: Number of images saved for each scene.
        generate_captions: When true, the first image of every scene is
            captioned with the module-level ``model`` and the two captions
            are appended to the CSV as ``caption1`` / ``caption2``.

    Returns:
        A tuple ``(scene_list, image_filenames, captions)``. ``captions``
        holds one ``model.generate`` result per scene and is empty when
        ``generate_captions`` is false.
    """
    name = Path(video_path).stem
    target_dir = Path(output_dir)

    # Open the video and build the stats manager
    stream = open_video(video_path)
    stats = StatsManager()

    # Scene manager backed by the stats manager, using content detection
    manager = SceneManager(stats)
    manager.add_detector(
        ContentDetector(threshold=threshold, min_scene_len=min_scene_len)
    )

    # Downscale frames before processing to speed detection up
    manager.auto_downscale = True

    # Run the detection and collect the resulting scenes
    manager.detect_scenes(video=stream, show_progress=True)
    scene_list = manager.get_scene_list()

    # Export the images of every scene
    image_filenames = save_images(
        scene_list=scene_list,
        video=stream,
        num_images=frames_per_scene,
        output_dir=output_dir,
        show_progress=True,
    )

    # Export the HTML summary
    write_scene_list_html(
        output_html_filename=str(target_dir / f"{name}_preview.html"),
        scene_list=scene_list,
        image_filenames=image_filenames,
        image_width=320,
        image_height=240,
    )

    # Report every scene and, if requested, caption its first image
    captions = []
    paired = zip(scene_list, image_filenames.items())
    for number, (scene, (_, scene_files)) in enumerate(paired, start=1):
        begin = scene[0]
        finish = scene[1]
        print(
            'Scene %2d: Start %s / Frame %d, End %s / Frame %d'
            % (
                number,
                begin.get_timecode(),
                begin.get_frames(),
                finish.get_timecode(),
                finish.get_frames(),
            )
        )

        if not generate_captions:
            continue

        # Load the first image of the scene and turn it into a model input
        image_fp = os.path.join(output_dir, scene_files[0])
        rgb_image = Image.open(image_fp).convert("RGB")
        model_input = vis_processors["eval"](rgb_image).unsqueeze(0).to(device)

        # Sample two captions per image
        caption = model.generate(
            {"image": model_input}, use_nucleus_sampling=True, num_captions=2
        )
        captions.append(caption)

        print(f'Caption: {caption}')

    # Write the scene list as CSV
    csv_path = str(target_dir / f'{name}_stats.csv')
    with open(csv_path, "w") as f:
        write_scene_list(
            output_csv_file=f, scene_list=scene_list, include_cut_list=False
        )

    # Re-open the CSV and add the caption columns next to the scene columns
    if generate_captions:
        scenes_pd = pd.read_csv(csv_path)
        captions_pd = pd.DataFrame(captions, columns=['caption1', 'caption2'])
        pd.concat([scenes_pd, captions_pd], axis=1).to_csv(csv_path)

    return scene_list, image_filenames, captions


def rename_file(file):
    """Return the path of *file* with a sanitised file name.

    Every run of characters in the stem that is not an ASCII letter, a digit
    or an underscore is replaced by a single ``-``. The parent directory and
    the suffix are kept as they are. Nothing is renamed on disk; the caller
    decides what to do with the returned ``Path``.
    """
    path = Path(file)
    safe_stem = re.sub("[^A-Za-z0-9_]+", "-", path.stem)
    return path.parent / (safe_stem + path.suffix)


def _has_audio_stream(path):
    """Return True when ffprobe reports at least one audio stream in *path*."""
    try:
        info = ffmpeg.probe(path)
    except (ffmpeg.Error, OSError):
        # Probing failed: fall back to a video-only clip rather than crash
        return False
    return any(
        stream.get("codec_type") == "audio" for stream in info.get("streams", [])
    )


def trim_video(input_path, output_path, start=30, end=60):
    """Write the ``start``..``end`` span of a video to ``output_path``.

    The video is re-encoded with libx264 (crf 24). When the input has an
    audio stream, the matching audio span is trimmed as well and encoded as
    AAC; inputs without audio produce a video-only file.

    Args:
        input_path: Path of the source video.
        output_path: Path of the clip to write (overwritten if it exists).
        start: Start of the span, in seconds.
        end: End of the span, in seconds.
    """
    input_stream = ffmpeg.input(input_path)

    # Trim the video and reset its timestamps so the clip starts at 0
    vid = (
        input_stream
        .filter('trim', start=start, end=end)
        .setpts('PTS-STARTPTS')
    )
    streams = [vid]

    # Filtered streams cannot be stream-copied, so every output stream gets
    # an explicit encoder instead of "copy".
    options = {"c:v": "libx264", "crf": "24"}

    if _has_audio_stream(input_path):
        # Same span for the audio track, timestamps reset the same way
        aud = (
            input_stream.audio
            .filter_('atrim', start=start, end=end)
            .filter_('asetpts', 'PTS-STARTPTS')
        )
        streams.append(aud)
        options["c:a"] = "aac"

    output = ffmpeg.output(*streams, output_path, **options)
    output.run(quiet=False, overwrite_output=True)



## Video Processing

In [None]:
import re
import shutil

# @markdown Path to the source directory on Google Drive. Right-click your directory, choose "Copy path", then paste it below
gdrive_input_dir = "/content/drive/MyDrive/AI/hgk_workshop/playlist01"  # @param { type:'string' }

# @markdown Minimum scene length (in __frames__)
min_scene_len = 30  # @param { type:"slider", min:1, max:120, step:1 }

# @markdown Number of images to save per scene
frames_per_scene = 1  # @param { type: "integer" }

# @markdown Higher threshold = fewer scenes detected
threshold = 30  # @param { type:"slider", min:10, max:300, step:1 }

# @markdown Generate captions
generate_captions = True  # @param { type:"boolean"}

# @markdown Create a short video for each scene
create_videos = True  # @param { type:"boolean"}

# @markdown Test only on the first video
test = True  # @param { type:"boolean" }

# @markdown Copy all files to Google Drive when done
copy_to_gdrive = True  # @param { type:"boolean" }

# Local working directories: videos are copied from Google Drive into
# input_dir and every result is written to output_dir.
input_dir = "/content/input"
output_dir = "/content/output"

# The results go next to the source folder on Google Drive, in a folder
# named "<source folder>_output". normpath drops a trailing slash, which
# would otherwise leave the folder name empty.
(gdrive_path, gdrive_folder) = os.path.split(os.path.normpath(gdrive_input_dir))
gdrive_folder = f'{gdrive_folder}_output'

gdrive_output_dir = Path(gdrive_path) / Path(gdrive_folder)

# Create the local output and input directories if they do not exist
os.makedirs(output_dir, exist_ok=True)
os.makedirs(input_dir, exist_ok=True)

# Copy all files locally
shutil.copytree(gdrive_input_dir, input_dir, dirs_exist_ok=True)

# List all files in the input directory. The pattern also matches
# directories whose name contains a dot, so keep regular files only.
dir_path = f'{input_dir}/**/*.*'
files = sorted(
    path for path in glob.glob(dir_path, recursive=True) if os.path.isfile(path)
)

# In test mode only the first video is renamed and processed
if test:
    files = files[:1]

# Process each file
for file in files:

    # Give the file a name made of letters, digits, "_" and "-" only
    new_filename = rename_file(file)
    os.rename(file, new_filename)
    file = str(new_filename)

    print(f'---\n{file}\n---\nExtracting scenes\n---')

    # Extract scenes
    [scene_list, image_filenames, captions] = video_extract_scenes(
        video_path=file,
        output_dir=output_dir,
        threshold=threshold,
        min_scene_len=min_scene_len,
        frames_per_scene=frames_per_scene,
        generate_captions=generate_captions,
    )

    # Create one short video per scene, named after the scene's first image
    if create_videos:
        print('---\nCreating videos\n---')
        fp_in = file
        for scene, scene_images in zip(scene_list, image_filenames.values()):
            # Drop the image extension, whatever its length
            fp_img = os.path.splitext(str(scene_images[0]))[0]
            fp_out = f"{output_dir}/{fp_img}.mp4"
            start_time = float(scene[0].get_seconds())
            end_time = float(scene[1].get_seconds())
            print(fp_in, fp_out, start_time, end_time)
            # Skip clips that already exist from a previous run
            if not os.path.exists(fp_out):
                trim_video(fp_in, fp_out, start_time, end_time)

if copy_to_gdrive:
    # Create the Google Drive output dir if it does not exist
    os.makedirs(gdrive_output_dir, exist_ok=True)

    # Copy the processed files to gdrive
    shutil.copytree(output_dir, gdrive_output_dir, dirs_exist_ok=True)

## Make a zip file

You can right click the file on the left to download it ⬅️

In [None]:
# Create a ZIP file with the content of the local output directory.
# base_name has no directory part, so the archive ("<gdrive_folder>.zip")
# is written to the current working directory.
shutil.make_archive(base_name=str(gdrive_folder), format='zip', root_dir=output_dir)

## Run this cell to delete the output folder

In [None]:
# Delete the local output directory and everything inside it
shutil.rmtree(output_dir)