In [1]:
import json
import re
import tempfile
from pathlib import Path
from typing import NamedTuple

import scenedetect
import torch
import torchvision.transforms as transforms
import whisper
from PIL import Image
from tqdm.auto import tqdm
from transformers import AutoProcessor, MllamaForConditionalGeneration
from yt_dlp import YoutubeDL


class VideoSummarizerConfig(NamedTuple):
    """Immutable settings consumed by ``VideoSummarizer``."""

    # Directory holding the source video and every derived artifact
    # (scenes.json, keyframes/, transcript.json, summary.txt, ...).
    workdir: Path
    # Passed as ``threshold`` to ``scenedetect.ContentDetector``.
    content_detector_threshold: float = 27
    # Number of keyframe images saved for each detected scene.
    num_keyframes_per_scene: int = 1


class VideoSummarizer:

    @staticmethod
    def from_youtube_url(
        youtube_url: str, config: VideoSummarizerConfig
    ) -> "VideoSummarizer":
        """Download a YouTube video into the work directory and wrap it.

        Args:
            youtube_url: URL handed to ``YoutubeDL.download``.
            config: Summarizer configuration; ``config.workdir`` receives the
                downloaded file (named ``<title>.<ext>``).

        Returns:
            A ``VideoSummarizer`` whose recorded video path is the downloaded
            file's name, relative to the work directory.

        Raises:
            RuntimeError: If the download finished without the progress hook
                ever reporting a file name, so there is no video to summarize.
        """
        config.workdir.mkdir(exist_ok=True)

        video_path: Path | None = None

        def hook(info):
            # Called repeatedly by yt-dlp during a download; the last reported
            # file name wins.
            nonlocal video_path
            video_path = Path(info["filename"])

        with YoutubeDL(
            {
                "format": "best",
                "progress_hooks": [hook],
                "outtmpl": f"{str(config.workdir)}/%(title)s.%(ext)s",
            }
        ) as ydl:
            ydl.download([youtube_url])

        # Without this guard a silent "nothing downloaded" outcome would
        # surface as an opaque AttributeError on ``None.name`` below.
        if video_path is None:
            raise RuntimeError(
                f"No video file was downloaded from '{youtube_url}'"
            )

        # Only the file name is stored; get_video_path() re-joins it with workdir.
        return VideoSummarizer(config, Path(video_path.name))

    def __init__(
        self, config: VideoSummarizerConfig, video_path: Path | None = None
    ) -> None:
        self.config = config
        self.config.workdir.mkdir(exist_ok=True)

        if video_path is not None:
            (self.config.workdir / "video_path.txt").write_text(str(video_path))

    def get_video_path(self) -> Path:
        return self.config.workdir / Path(
            (self.config.workdir / "video_path.txt").read_text().strip()
        )

    def get_video_title(self) -> str:
        return self.get_video_path().stem

    def get_scene_data(self) -> dict:
        with (self.config.workdir / "scenes.json").open() as fp:
            return json.load(fp)

    def get_scene_data_with_labels(self) -> dict:
        with (self.config.workdir / "scenes_with_labels.json").open() as fp:
            return json.load(fp)

    def get_scene_data_with_descriptions(self) -> dict:
        with (self.config.workdir / "scenes_with_descriptions.json").open() as fp:
            return json.load(fp)

    def get_summary(self) -> str:
        return (self.config.workdir / "summary.txt").read_text()

    def detect_scenes(self) -> None:
        """Detect scenes, export clips and keyframes, and write ``scenes.json``.

        Steps:
          1. Run a ``ContentDetector`` over the video.
          2. Split the video into one clip per scene under ``scenes/``.
          3. Save ``num_keyframes_per_scene`` images per scene under
             ``keyframes/``.
          4. Write ``scenes.json`` with, per scene: number, start/end frame,
             start/end timecode, clip path and sorted keyframe list (paths are
             relative to the work directory).
        """
        workdir = self.config.workdir
        video_path = self.get_video_path()

        video = scenedetect.open_video(str(video_path))
        scene_manager = scenedetect.SceneManager()
        scene_manager.add_detector(
            scenedetect.ContentDetector(
                threshold=self.config.content_detector_threshold
            )
        )
        scene_manager.detect_scenes(video)

        # start_in_scene=True makes a video without any detected cut yield one
        # scene spanning the whole video instead of an empty list (which used
        # to crash on ``scene_list[0]`` below).
        scene_list = scene_manager.get_scene_list(start_in_scene=True)

        scenes_dir = workdir / "scenes"
        scenes_dir.mkdir(exist_ok=True)
        scenedetect.split_video_ffmpeg(str(video_path), scene_list, scenes_dir)

        keyframes_dir = workdir / "keyframes"
        keyframes_dir.mkdir(exist_ok=True)
        scenedetect.scene_manager.save_images(
            scene_list,
            video,
            self.config.num_keyframes_per_scene,
            output_dir=keyframes_dir,
        )

        scenes_json_data = {
            "title": self.get_video_title(),
            # Fall back to the stream's frame rate if the list is still empty.
            "framerate": (
                scene_list[0][0].get_framerate()
                if scene_list
                else video.frame_rate
            ),
            "scenes": [
                {
                    "scene_number": scene_number,
                    "start_frame": start.get_frames(),
                    "end_frame": end.get_frames(),
                    "start_time": start.get_timecode(),
                    "end_time": end.get_timecode(),
                    "keyframes": [],
                }
                for scene_number, (start, end) in enumerate(scene_list, start=1)
            ],
        }
        # Index scenes by number so each exported file is attached in O(1).
        scenes_by_number = {
            scene_data["scene_number"]: scene_data
            for scene_data in scenes_json_data["scenes"]
        }

        # Exported files are matched back to scenes through the scene number
        # embedded in their names.
        scene_file_pattern = re.compile(r".*-Scene-(\d+)\.mp4")
        keyframe_file_pattern = re.compile(r".*-Scene-(\d+)-(\d+)\.jpg")

        for scene_video_file in scenes_dir.iterdir():
            if not scene_video_file.is_file():
                continue
            scene_match = scene_file_pattern.fullmatch(scene_video_file.name)
            if scene_match is None:
                continue
            scene_data = scenes_by_number.get(int(scene_match.group(1)))
            if scene_data is None:
                continue
            scene_data["video_path"] = str(scene_video_file.relative_to(workdir))

        for keyframe_image_file in keyframes_dir.iterdir():
            if not keyframe_image_file.is_file():
                continue
            keyframe_match = keyframe_file_pattern.fullmatch(
                keyframe_image_file.name
            )
            if keyframe_match is None:
                continue
            scene_data = scenes_by_number.get(int(keyframe_match.group(1)))
            if scene_data is None:
                continue
            scene_data["keyframes"].append(
                {
                    "number": int(keyframe_match.group(2)),
                    "path": str(keyframe_image_file.relative_to(workdir)),
                }
            )

        # Directory iteration order is arbitrary; order keyframes by number.
        for scene_data in scenes_json_data["scenes"]:
            scene_data["keyframes"].sort(key=lambda keyframe: keyframe["number"])

        with (workdir / "scenes.json").open("w") as fp:
            json.dump(scenes_json_data, fp, indent=4)

    def add_classifier_labels(self) -> None:
        """Classify the shot type of every keyframe and write ``scenes_with_labels.json``.

        Reads ``scenes.json``, runs the classifier stored in
        ``./Pytorch_Classification_50ep.pt`` on each keyframe image, and adds a
        ``label`` entry (``name`` + ``description``) to every keyframe.
        """
        # Classifier output index -> (short name, human-readable description).
        LABEL_MAP = {
            0: (
                "CS",
                "Close-up shot (CS): A relatively small object, e.g., face, hand.",
            ),
            1: (
                "ECS",
                "Extreme close-up shot (ECS): Even a smaller part of an object, e.g., eyes.",
            ),
            2: ("FS", "Full shot (FS): Human body in full."),
            3: ("LS", "Long shot (LS): A long distance."),
            4: ("MS", "Medium shot (MS): Knees or waist up."),
        }

        # SECURITY NOTE: weights_only=False unpickles arbitrary Python objects.
        # Only load a checkpoint file that comes from a trusted source.
        model = torch.load(
            "./Pytorch_Classification_50ep.pt",
            map_location=torch.device("cpu"),
            weights_only=False,
        )
        model.eval()

        scene_json_data = self.get_scene_data()

        image_transform = transforms.Compose(
            [
                transforms.Resize((128, 128)),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            ]
        )
        for scene_data in scene_json_data["scenes"]:
            for keyframe_data in scene_data["keyframes"]:
                with Image.open(
                    self.config.workdir / keyframe_data["path"]
                ) as image:
                    # The 3-channel normalisation above requires RGB input.
                    batch = image_transform(image.convert("RGB")).unsqueeze(0)
                with torch.no_grad():
                    logits = model(batch)
                predicted_index = torch.max(logits, 1)[1].item()
                label_name, label_description = LABEL_MAP[predicted_index]
                # Assigned inside the keyframe loop so that EVERY keyframe gets
                # its own label (previously only the last one per scene did).
                keyframe_data["label"] = {
                    "name": label_name,
                    "description": label_description,
                }

        with (self.config.workdir / "scenes_with_labels.json").open("w") as fp:
            json.dump(scene_json_data, fp, indent=4)

    def add_keyframe_descriptions(self) -> None:
        """Describe every keyframe with a vision LLM and write ``scenes_with_descriptions.json``.

        Reads ``scenes_with_labels.json``, asks
        ``meta-llama/Llama-3.2-11B-Vision-Instruct`` for a short description of
        each keyframe image, and stores the answer under the keyframe's
        ``description`` key.
        """
        llama_model_id = "meta-llama/Llama-3.2-11B-Vision-Instruct"
        llama_model = MllamaForConditionalGeneration.from_pretrained(
            llama_model_id, device_map={"": 0}, torch_dtype=torch.bfloat16
        )
        llama_processor = AutoProcessor.from_pretrained(llama_model_id)

        chat_messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image"},
                    {
                        "type": "text",
                        "text": "Can you please describe this image in a few sentences?",
                    },
                ],
            }
        ]
        # The prompt is identical for every keyframe, so render it only once.
        prompt = llama_processor.apply_chat_template(
            chat_messages, add_generation_prompt=True
        )

        scene_json_data = self.get_scene_data_with_labels()

        for scene_data in tqdm(
            scene_json_data["scenes"], desc="Generating descriptions"
        ):
            for keyframe_data in scene_data["keyframes"]:
                with Image.open(
                    self.config.workdir / keyframe_data["path"]
                ) as image:
                    inputs = llama_processor(
                        image,
                        prompt,
                        add_special_tokens=False,
                        return_tensors="pt",
                    ).to(llama_model.device)
                output = llama_model.generate(**inputs, max_new_tokens=1024)
                # Keep only the newly generated tokens (drop the echoed prompt)
                # and strip special tokens so end-of-turn markers do not leak
                # into the stored description.
                prompt_length = inputs["input_ids"].shape[-1]
                response = llama_processor.decode(
                    output[0][prompt_length:], skip_special_tokens=True
                ).strip()
                keyframe_data["description"] = response

        with (self.config.workdir / "scenes_with_descriptions.json").open("w") as fp:
            json.dump(scene_json_data, fp, indent=4)

    def create_whisper_transcripts(self) -> None:
        """Transcribe the video with Whisper and write ``transcript.json``.

        The ``large-v3-turbo`` model is used with the language fixed to
        English; the raw transcription result is dumped as indented JSON.
        """
        model = whisper.load_model("large-v3-turbo")
        source_video = str(self.get_video_path())
        result = model.transcribe(source_video, language="English")

        transcript_file = self.config.workdir / "transcript.json"
        with transcript_file.open("w") as out:
            json.dump(result, out, indent=4)

    def create_summary(self) -> None:
        scene_json_data = self.get_scene_data_with_descriptions()
        with (self.config.workdir / "transcript.json").open() as fp:
            transcript_json_data = json.load(fp)

        scene_and_transcript_data = []
        for scene_data in scene_json_data["scenes"]:
            start_time = self.time_str_to_float(scene_data["start_time"])
            end_time = self.time_str_to_float(scene_data["end_time"])
            scene_and_transcript_data.append(
                (start_time, end_time, "1_scene", scene_data)
            )

        for segment_data in transcript_json_data["segments"]:
            start_time = segment_data["start"]
            end_time = segment_data["end"]
            scene_and_transcript_data.append(
                (start_time, end_time, "2_transcript", segment_data)
            )

        scene_and_transcript_data.sort(key=lambda x: x[0:3])

        summary = []
        summary.append(f"Video title: {scene_json_data['title']}")
        for start_time, end_time, data_type, data in scene_and_transcript_data:
            if data_type == "1_scene":
                summary.append("")
                summary.append(
                    f"Scene {data['scene_number']} from {self.float_to_time_str(start_time)} to {self.float_to_time_str(end_time)}"
                )
                for keyframe in data["keyframes"]:
                    summary.append(
                        f"    Keyframe shot: {keyframe["label"]["description"]}"
                    )
                    summary.append(f"    Keyframe description:")
                    for line in keyframe["description"].splitlines():
                        line = line.strip()
                        if line:
                            summary.append(f"        {line}")
                    summary.append(f"End of scene {data['scene_number']}")
            elif data_type == "2_transcript":
                summary.append("")
                summary.append(
                    f"Transcript from {self.float_to_time_str(start_time)} to {self.float_to_time_str(end_time)}"
                )
                for line in data["text"].splitlines():
                    line = line.strip()
                    if line:
                        summary.append(f"    {line}")
                summary.append(f"End of transcript")

        (self.config.workdir / "summary.txt").write_text("\n".join(summary))

    @staticmethod
    def time_str_to_float(time_str: str) -> float:
        time_parts = time_str.split(":")
        result = 0
        for part in time_parts:
            result = result * 60 + float(part)
        return result

    @staticmethod
    def float_to_time_str(time_float: float) -> str:
        components = []
        time_float = int(time_float)
        while time_float > 0:
            components.append(f"{time_float % 60:02d}")
            time_float //= 60
        components.reverse()
        while len(components) < 2:
            components.insert(0, "00")
        return ":".join(components)

In [2]:
# Scratch cell for running the pipeline by hand. The active line only binds a
# summarizer to the existing "demo-workdir" directory (no video path is
# recorded). Uncomment the from_youtube_url(...) call to download a video
# instead, and the stage calls below to (re)generate the artifacts in order.
# summarizer = VideoSummarizer.from_youtube_url(
#     "https://www.youtube.com/watch?v=cVkMnskciHU",
#     VideoSummarizerConfig(Path("demo-workdir")),
# )
summarizer = VideoSummarizer(VideoSummarizerConfig(Path("demo-workdir")))
# summarizer.detect_scenes()
# summarizer.add_classifier_labels()
# summarizer.add_keyframe_descriptions()
# summarizer.create_whisper_transcripts()
# summarizer.create_summary()

In [None]:
import gradio as gr
from openai import OpenAI

# Shared OpenAI client used by chat_fn for both completion requests.
# NOTE(review): presumably picks up credentials from the environment — confirm.
client = OpenAI()


def chat_fn(message, history):
    """Answer a question about the current video and append it to the chat.

    Two completions are requested: the first answers the question using the
    video summary as context, the second condenses that answer into a short,
    emoji-structured version, which is what gets shown to the user.

    Args:
        message: The user's new message.
        history: Chat history as a list of ``{"role", "content"}`` dicts; it is
            extended in place with the new user/assistant turn.

    Returns:
        A ``("", history)`` tuple: an empty string to clear the message box and
        the updated history.
    """
    # No summarizer (nothing processed/uploaded yet) or no summary file on disk:
    # reply with guidance instead of crashing the Gradio callback.
    summary_text = None
    if summarizer is not None:
        try:
            summary_text = summarizer.get_summary()
        except FileNotFoundError:
            summary_text = None
    if summary_text is None:
        history.append({"role": "user", "content": message})
        history.append(
            {
                "role": "assistant",
                "content": "No video summary is available yet. Please process a video or upload a summary text file first.",
            }
        )
        return "", history

    messages = [
        {
            "role": "system",
            "content": "You are a helpful assistant who answers questions about a video.",
        },
        {
            "role": "system",
            "content": "Here are the information about the video.\n\n"
            + summary_text,
        },
    ]
    messages.extend(
        {"role": history_item["role"], "content": history_item["content"]}
        for history_item in history
    )
    messages.append({"role": "user", "content": message})
    completion = client.chat.completions.create(model="gpt-4o-mini", messages=messages)
    # ``content`` may be None; normalise to an empty string.
    full_answer = completion.choices[0].message.content or ""

    messages = [
        {
            "role": "system",
            "content": 'You are a helpful assistant tasked with condensing long answers into short, readable summaries. The entire response should be wrapped in a Markdown code block.\nGiven a full answer, gives a one-sentence version of the answer and a longer summary broken into short paragraphs, each beginning with an emoji that reflects the topic of this paragraph. If you have a title for a paragraph, use HTML tags to make it eye-catching and having colors that match the emoji, e.g. `<span style="color: red; font-weight: bold;">Paragraph Title</span>` If you have keywords in the body text, use Markdown syntax to make them italic, e.g. `*some keywords*`.',
        },
        {
            "role": "user",
            "content": f"Question:\n{message.strip()}\nFull Answer:\n{full_answer}\nNow, please summarize the answer.",
        },
    ]
    completion = client.chat.completions.create(model="gpt-4o-mini", messages=messages)
    summarized_answer = completion.choices[0].message.content or ""

    # Strip code block: the model is told to wrap its reply in a fenced block,
    # so drop every fence line and keep the Markdown/HTML inside.
    summarized_answer = "\n".join(
        line for line in summarized_answer.splitlines() if "```" not in line
    )
    history.append({"role": "user", "content": message})
    history.append({"role": "assistant", "content": summarized_answer})
    return "", history


# Summarizer backing the chat; stays None until a video has been processed or
# a summary text file has been uploaded through the UI below.
summarizer: VideoSummarizer | None = None

# Gradio UI: URL input + processing button, summary download/upload buttons,
# and a chat panel wired to chat_fn.
with gr.Blocks() as demo:
    title = gr.Markdown("# Video Summarizer")
    youtube_url_input = gr.Text(value="", label="YouTube URL")
    process_video_button = gr.Button(value="Process Video")
    # Disabled until a summary exists (enabled by the callbacks below).
    download_button = gr.DownloadButton(label="Download Texts", interactive=False)
    upload_button = gr.UploadButton(
        label="Upload Texts", file_count="single", file_types=["text"]
    )
    chatbot = gr.Chatbot(type="messages", height="70vh")
    message_input = gr.Textbox(label="Message", placeholder="Type your message here")
    clear = gr.ClearButton([message_input, chatbot])
    # chat_fn returns ("", history): clears the textbox and refreshes the chat.
    message_input.submit(
        chat_fn, inputs=[message_input, chatbot], outputs=[message_input, chatbot]
    )

    def on_process_video_click(youtube_url: str) -> gr.DownloadButton:
        """Download the video, run the full pipeline, and enable the download button.

        Works in a fresh temporary directory and replaces the global
        ``summarizer``. Returns a download button pointing at ``summary.txt``.
        """
        global summarizer
        temp_workdir = Path(tempfile.mkdtemp())
        print(f"Creating temporary working directory at '{temp_workdir}'")
        summarizer = VideoSummarizer.from_youtube_url(
            youtube_url, VideoSummarizerConfig(temp_workdir)
        )
        # Each stage reads the artifact written by the previous one.
        summarizer.detect_scenes()
        summarizer.add_classifier_labels()
        summarizer.add_keyframe_descriptions()
        summarizer.create_whisper_transcripts()
        summarizer.create_summary()
        return gr.DownloadButton(
            label="Download Texts",
            value=summarizer.config.workdir / "summary.txt",
            interactive=True,
        )

    process_video_button.click(
        on_process_video_click, inputs=youtube_url_input, outputs=download_button
    )

    def on_upload_click(file) -> gr.DownloadButton:
        """Use an uploaded summary text file as the chat context.

        Copies the uploaded text to ``summary.txt`` in a fresh temporary
        directory and replaces the global ``summarizer`` with one bound to it.
        """
        global summarizer
        # NOTE(review): assumes ``file`` is a path-like value to the uploaded
        # file — confirm against the installed Gradio version.
        original_file_path = Path(file)
        temp_workdir = Path(tempfile.mkdtemp())
        print(f"Creating temporary working directory at '{temp_workdir}'")
        (temp_workdir / "summary.txt").write_text(original_file_path.read_text())
        summarizer = VideoSummarizer(VideoSummarizerConfig(temp_workdir))
        return gr.DownloadButton(
            label="Download Texts",
            value=summarizer.config.workdir / "summary.txt",
            interactive=True,
        )

    upload_button.upload(on_upload_click, inputs=upload_button, outputs=download_button)

demo.launch()