In [None]:
!pip install scenedetect opencv-python moviepy transformers sentence-transformers git+https://github.com/openai/whisper.git
!apt install ffmpeg -y

In [None]:
import os
import cv2
import whisper
import numpy as np
import moviepy.editor as mp
import base64
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
from datetime import timedelta
from IPython.display import display, HTML

In [None]:
# Scene Detection
def detect_scenes(video_path, threshold=30.0):
    """Detect scene boundaries in a video with PySceneDetect's ContentDetector.

    Args:
        video_path: Path to the video file to analyse.
        threshold: Value forwarded to ``ContentDetector(threshold=...)``.

    Returns:
        The list produced by ``SceneManager.get_scene_list()``; callers in this
        notebook unpack each entry as a ``(start, end)`` pair. The list may be
        empty.
    """
    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold))
    try:
        # Called without arguments so the library picks the downscale factor.
        video_manager.set_downscale_factor()
        video_manager.start()
        scene_manager.detect_scenes(frame_source=video_manager)
        return scene_manager.get_scene_list()
    finally:
        # Always free the underlying video handle, even if detection fails.
        video_manager.release()

In [None]:
# Keyframe Extraction (optional: use histogram variance for best frame)
def extract_keyframes(video_path, scenes, output_dir="keyframes"):
    """Save the first frame of every scene as a JPEG and return the file paths.

    Args:
        video_path: Path to the video file.
        scenes: Iterable of ``(start, end)`` pairs; ``start`` must provide
            ``get_seconds()``.
        output_dir: Directory the images are written to (created if missing).

    Returns:
        List of paths of the images that were actually written, named
        ``{output_dir}/scene_{n}.jpg`` with ``n`` starting at 1. Scenes whose
        frame could not be read or written are skipped (with a warning), so
        the list can be shorter than ``scenes``.
    """
    os.makedirs(output_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    keyframes = []
    try:
        if not cap.isOpened():
            print(f"Warning: could not open video for keyframe extraction: {video_path}")
            return keyframes
        for idx, (start, _end) in enumerate(scenes):
            # Seek to the scene start (milliseconds) and grab the frame there.
            cap.set(cv2.CAP_PROP_POS_MSEC, start.get_seconds() * 1000)
            ret, frame = cap.read()
            if not ret:
                print(f"Warning: could not read a frame for scene {idx+1}; skipping keyframe.")
                continue
            filename = f"{output_dir}/scene_{idx+1}.jpg"
            # imwrite reports failure through its return value instead of raising.
            if cv2.imwrite(filename, frame):
                keyframes.append(filename)
            else:
                print(f"Warning: could not write keyframe {filename}; skipping.")
    finally:
        # Release the capture handle even if seeking/reading raised.
        cap.release()
    return keyframes

In [None]:
# Transcription using Whisper
def transcribe_audio(video_path):
    """Transcribe a media file with the Whisper "base" model.

    Returns a ``(text, segments)`` tuple taken from the ``"text"`` and
    ``"segments"`` entries of the transcription result.
    """
    transcription = whisper.load_model("base").transcribe(video_path)
    full_text = transcription["text"]
    timed_segments = transcription["segments"]
    return full_text, timed_segments

In [None]:
# Align transcript to scenes using timestamps
from whisper.utils import format_timestamp

def split_transcript_by_scenes_with_timestamps(segments, scenes):
    """Group transcript segments into one text chunk per scene.

    Each segment is assigned to the single scene that contains the midpoint of
    its ``[start, end]`` interval. Requiring the whole segment to lie inside a
    scene would silently drop every segment that straddles a scene cut.

    Args:
        segments: Iterable of dicts with ``'start'``, ``'end'`` (seconds) and
            ``'text'`` keys.
        scenes: Iterable of ``(start, end)`` pairs whose items provide
            ``get_seconds()``.

    Returns:
        A list with one string per scene (possibly empty), in scene order.
        Segments whose midpoint falls outside every scene are not included.
    """
    bounds = [(start.get_seconds(), end.get_seconds()) for start, end in scenes]
    buckets = [[] for _ in bounds]
    last = len(bounds) - 1
    for seg in segments:
        midpoint = (seg['start'] + seg['end']) / 2.0
        for idx, (start_time, end_time) in enumerate(bounds):
            # Half-open intervals keep a segment out of two adjacent scenes;
            # only the final scene also owns its exact end time.
            in_scene = start_time <= midpoint < end_time
            at_final_end = idx == last and midpoint == end_time
            if in_scene or at_final_end:
                text = seg['text'].strip()
                if text:
                    buckets[idx].append(text)
                break
    return [" ".join(texts) for texts in buckets]

In [None]:
# Shared summarization pipeline (facebook/bart-large-cnn), kept on the CPU via device=-1.
summarizer = pipeline(
    "summarization",
    model="facebook/bart-large-cnn",
    device=-1,
)

In [None]:
def _summary_length_bounds(word_count, min_ratio, max_ratio, max_cap, min_cap):
    """Return ``(min_length, max_length)`` generation bounds scaled to the input size."""
    max_len = min(max_cap, max(20, int(word_count * max_ratio)))
    min_len = min(min_cap, max(10, int(word_count * min_ratio)))
    # Unusual ratios could invert the bounds; never request min > max.
    return min(min_len, max_len), max_len


def _split_into_pieces(words, piece_size=250, min_tail=10):
    """Split a word list into pieces, folding a too-short tail into the previous piece."""
    pieces = [words[i:i + piece_size] for i in range(0, len(words), piece_size)]
    if len(pieces) > 1 and len(pieces[-1]) < min_tail:
        tail = pieces.pop()
        pieces[-1] = pieces[-1] + tail
    return pieces


def _summarize_text(text, word_count, min_ratio, max_ratio, max_cap, min_cap, fallback):
    """Summarize one text with the shared ``summarizer``; return ``fallback`` on failure."""
    min_len, max_len = _summary_length_bounds(word_count, min_ratio, max_ratio, max_cap, min_cap)
    try:
        return summarizer(text, max_length=max_len, min_length=min_len, do_sample=False)[0]['summary_text']
    except Exception as exc:
        # Best-effort: one failing chunk must not abort the whole run, but the
        # reason is reported instead of being swallowed silently.
        print(f"Warning: summarization failed ({exc!r}); using placeholder text.")
        return fallback


def summarize_chunks(text_chunks, min_ratio=0.25, max_ratio=0.5):
    """Summarize every text chunk with the module-level ``summarizer``.

    Args:
        text_chunks: Iterable of strings, one per scene.
        min_ratio: Fraction of the word count used for the minimum summary length.
        max_ratio: Fraction of the word count used for the maximum summary length.

    Returns:
        A list with exactly one entry per input chunk:

        * fewer than 10 words: the placeholder ``"Text too short to summarize."``;
        * more than 500 words: the chunk is cut into 250-word pieces, each piece
          is summarized (bounds capped at 100/30) and the results are joined
          with spaces. A final piece shorter than 10 words is merged into the
          previous piece rather than summarized on its own, and shorter final
          pieces get proportionally smaller length bounds;
        * otherwise: a single summary with bounds capped at 80/40.

        A summarizer failure yields a placeholder string for that chunk/piece.
    """
    summaries = []
    for chunk in text_chunks:
        words = chunk.split()
        word_count = len(words)
        if word_count < 10:
            summaries.append("Text too short to summarize.")
            continue

        if word_count > 500:
            partial_summaries = [
                _summarize_text(
                    " ".join(piece), len(piece), min_ratio, max_ratio,
                    max_cap=100, min_cap=30,
                    fallback="(partial) Could not summarize.",
                )
                for piece in _split_into_pieces(words)
            ]
            summaries.append(" ".join(partial_summaries))
        else:
            summaries.append(
                _summarize_text(
                    chunk, word_count, min_ratio, max_ratio,
                    max_cap=80, min_cap=40,
                    fallback="Could not summarize.",
                )
            )
    return summaries

In [None]:
# Merge similar summaries using cosine similarity
def merge_similar_summaries(summaries, threshold=0.9):
    """Drop summaries that are near-duplicates of an earlier summary.

    Summaries are embedded with the ``all-MiniLM-L6-v2`` sentence-transformer.
    Walking the list in order, each summary that has not been absorbed is kept,
    and every later summary whose cosine similarity to it is at least
    ``threshold`` is absorbed (discarded).

    Args:
        summaries: List of summary strings.
        threshold: Cosine-similarity cut-off for treating two summaries as
            duplicates.

    Returns:
        The kept summaries in their original order. The result can be shorter
        than the input, so it no longer lines up index-for-index with the
        scenes the summaries were produced from.
    """
    if not summaries:
        # Nothing to compare; also avoids loading the model for no work.
        return []

    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = model.encode(summaries, convert_to_tensor=True)
    # One pairwise similarity matrix instead of a separate call per pair.
    similarity = util.pytorch_cos_sim(embeddings, embeddings)

    merged = []
    absorbed = set()
    for i, summary in enumerate(summaries):
        if i in absorbed:
            continue
        merged.append(summary)
        for j in range(i + 1, len(summaries)):
            if j not in absorbed and float(similarity[i][j]) >= threshold:
                absorbed.add(j)
    return merged

In [None]:
# Generate HTML Output with Base64 images
def generate_html_output(scenes, keyframes, summaries, save_path="summary.html"):
    """Build the HTML report, save it, and display it inline.

    For every ``(scene, keyframe, summary)`` triple a card is rendered with the
    scene number, its start/end time (whole seconds), the keyframe embedded as
    a base64 JPEG data URI, and the summary text.

    Args:
        scenes: Sequence of ``(start, end)`` pairs whose items provide
            ``get_seconds()``.
        keyframes: Sequence of image file paths, one per scene.
        summaries: Sequence of summary strings, one per scene.
        save_path: File the HTML is written to (UTF-8).

    The three sequences are expected to have equal length; if they do not, a
    warning is printed and only the first ``min(len)`` entries are rendered.
    """
    # Imported locally under its own name because `html` is the page buffer below.
    from html import escape

    scenes, keyframes, summaries = list(scenes), list(keyframes), list(summaries)
    if not (len(scenes) == len(keyframes) == len(summaries)):
        print(
            "Warning: mismatched report inputs "
            f"({len(scenes)} scenes, {len(keyframes)} keyframes, {len(summaries)} summaries); "
            "extra entries are ignored."
        )

    parts = ["<h2>Video Scene Summarization</h2><br>"]
    for i, (scene, img_path, summary) in enumerate(zip(scenes, keyframes, summaries)):
        start_time = str(timedelta(seconds=int(scene[0].get_seconds())))
        end_time = str(timedelta(seconds=int(scene[1].get_seconds())))

        with open(img_path, "rb") as image_file:
            encoded = base64.b64encode(image_file.read()).decode('utf-8')
        img_tag = f'<img src="data:image/jpeg;base64,{encoded}" width="300"/>'

        # Summaries derive from the uploaded video's transcript, so they are
        # escaped before being placed into markup.
        safe_summary = escape(str(summary))

        parts.append(f"""
        <div style='border:1px solid #ccc; padding:10px; margin:10px'>
            <h4>Scene {i+1} | {start_time} - {end_time}</h4>
            {img_tag}<br>
            <b>Summary:</b> {safe_summary}
        </div>
        """)
    html = "".join(parts)
    with open(save_path, "w", encoding="utf-8") as f:
        f.write(html)
    display(HTML(html))
    print(f"Summary HTML saved to: {save_path}")

In [None]:
# Main Pipeline
def _kept_scene_indices(summaries, final_summaries):
    """Map each surviving summary back to the index of the scene it came from.

    ``final_summaries`` is expected to be an in-order subsequence of
    ``summaries``. Returns the matching indices, or ``None`` if no in-order
    match exists.
    """
    indices = []
    cursor = 0
    for kept in final_summaries:
        while cursor < len(summaries) and summaries[cursor] != kept:
            cursor += 1
        if cursor == len(summaries):
            return None
        indices.append(cursor)
        cursor += 1
    return indices


def run_pipeline(video_path):
    """Run the full scene-summarization pipeline on one video.

    Steps: detect scenes, extract a keyframe per scene, transcribe the audio,
    split the transcript per scene, summarize each chunk, drop near-duplicate
    summaries and render the HTML report (written to the report function's
    default path).

    Args:
        video_path: Path to the video file to process.
    """
    print("Detecting scenes...")
    scenes = detect_scenes(video_path)

    if not scenes:
        print("No scenes detected. Using full video as one chunk.")
        from scenedetect.frame_timecode import FrameTimecode
        clip = mp.VideoFileClip(video_path)
        try:
            duration = float(clip.duration)
            fps = float(clip.fps)
        finally:
            clip.close()
        # FrameTimecode needs the frame rate, and reads a float as seconds
        # (an int would be taken as a frame number).
        scenes = [(FrameTimecode(0.0, fps), FrameTimecode(duration, fps))]

    print("Extracting keyframes...")
    keyframes = extract_keyframes(video_path, scenes)

    print("Transcribing audio with Whisper...")
    _transcript, segments = transcribe_audio(video_path)

    print("Splitting transcript by scene using timestamps...")
    text_chunks = split_transcript_by_scenes_with_timestamps(segments, scenes)

    print("Summarizing scenes...")
    summaries = summarize_chunks(text_chunks)

    print("Merging redundant summaries...")
    final_summaries = merge_similar_summaries(summaries)

    # Merging can shorten the summary list; pair every surviving summary with
    # the scene/keyframe it was produced from instead of zipping by position.
    report_scenes, report_keyframes = scenes, keyframes
    kept = _kept_scene_indices(summaries, final_summaries)
    if kept is None:
        print("Warning: could not map merged summaries back to scenes; report rows may be misaligned.")
    else:
        report_scenes = [scenes[i] for i in kept]
        if len(keyframes) == len(scenes):
            report_keyframes = [keyframes[i] for i in kept]
        else:
            print("Warning: some keyframes are missing; report images may not match their scenes.")

    print("Generating HTML summary report...")
    generate_html_output(report_scenes, report_keyframes, final_summaries)
    print("Pipeline completed successfully.")

In [None]:
def upload_and_run():
    """Prompt for a video upload in Colab, run the pipeline, and download the report.

    Uses ``google.colab.files``. The first uploaded file is processed; if
    nothing is uploaded the function prints a message and returns without
    running the pipeline.
    """
    from google.colab import files
    uploaded = files.upload()
    if not uploaded:
        # Upload dialog was cancelled or returned no files.
        print("No file was uploaded; nothing to process.")
        return
    video_path = next(iter(uploaded))
    if len(uploaded) > 1:
        print(f"Multiple files uploaded; processing only: {video_path}")
    run_pipeline(video_path)
    files.download("summary.html")

In [None]:
# Entry point: asks for a video upload, runs the pipeline on it, then downloads summary.html.
upload_and_run()