<a href="https://colab.research.google.com/github/Raniamea/arabic-video-summarisation/blob/main/02_scenedetect.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# --- Imports
import os, json, random, re
import numpy as np
import torch
import cv2
from PIL import Image

from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector

# --- Reproducibility (deterministic decoding across runs/devices)
random.seed(0); np.random.seed(0)
torch.manual_seed(0)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(0)

ModuleNotFoundError: No module named 'scenedetect'

In [1]:
from google.colab import drive
import os

# Unmount first
!fusermount -u /content/drive || echo "Already unmounted"

# Delete the mount folder entirely
!rm -rf /content/drive

# Now mount again
from google.colab import drive
drive.mount('/content/drive')

fusermount: failed to unmount /content/drive: No such file or directory
Already unmounted
Mounted at /content/drive


In [None]:
# --- Paths & params
base_path = "/content/drive/MyDrive/ArabicVideoSummariser"
videos_path = os.path.join(base_path, "videos")
captions_path = os.path.join(base_path, "captions")
keyframes_path = os.path.join(base_path, "keyframes")
transcripts_path = os.path.join(base_path, "transcripts")

os.makedirs(captions_path, exist_ok=True)
os.makedirs(keyframes_path, exist_ok=True)

# Read params.json (must contain: {"video_file": "..."} )
param_path = os.path.join(base_path, "params.json")
with open(param_path, "r", encoding="utf-8") as f:
    params = json.load(f)

#video_filename = params.get("video_file")
video_filename="PaperMaking.mp4"
assert video_filename, "params.json must include 'video_file'."
video_path = os.path.join(videos_path, video_filename)
assert os.path.exists(video_path), f"Video not found: {video_path}"

video_name = os.path.splitext(video_filename)[0]
keyframe_dir = os.path.join(keyframes_path, video_name)
os.makedirs(keyframe_dir, exist_ok=True)

captions_json_path = os.path.join(captions_path, f"{video_name}.json")
# Arabic transcript with timecodes produced by your ASR stage:
transcript_tc_path = os.path.join(transcripts_path, f"{video_name}_ar_with_timecodes.txt")

print(f"🎥 Processing video file: {video_filename}")

In [None]:
# Load models (device, BLIP-2, Marian MT, CLIP, mSBERT)
from transformers import AutoProcessor, Blip2ForConditionalGeneration
from transformers import MarianMTModel, MarianTokenizer
from transformers import CLIPProcessor, CLIPModel
from sentence_transformers import SentenceTransformer, util

# Device
device = "cuda" if torch.cuda.is_available() else "cpu"
print("🖥️ Device:", device)

# BLIP-2 (EN captioning)
caption_processor = AutoProcessor.from_pretrained("Salesforce/blip2-opt-2.7b", use_fast=False)
caption_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=torch.float16 if device == "cuda" else torch.float32
).to(device)

# Marian MT (EN -> AR)
translator_tokenizer = MarianTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-ar")
translator_model = MarianMTModel.from_pretrained("Helsinki-NLP/opus-mt-en-ar").to(device)

# CLIP for image<->EN text grounding
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)

# Multilingual SBERT for AR caption <-> AR transcript similarity
sbert = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", device=device)

print("✅ Models loaded.")


In [None]:
#Helper functions (frames, generation, scoring, transcript context)
timecode_re = re.compile(r"\[(\d+(?:\.\d+)?)\s*-\s*(\d+(?:\.\d+)?)\]\s+(.*)")

def get_transcript_context_ar(scene_time: float, window_s: float = 10.0) -> str:
    """Return Arabic transcript text around scene_time ± window_s (concatenated)."""
    if not os.path.exists(transcript_tc_path):
        return ""
    ctx = []
    start_win, end_win = scene_time - window_s, scene_time + window_s
    with open(transcript_tc_path, encoding="utf-8") as f:
        for line in f:
            m = timecode_re.match(line.strip())
            if not m:
                continue
            s, e, text = float(m.group(1)), float(m.group(2)), m.group(3)
            if not (e < start_win or s > end_win):
                ctx.append(text)
    return " ".join(ctx)[:1500]  # keep short for embeddings

def grab_frames_around(cap: cv2.VideoCapture, fps: float, base_time_s: float, offsets=(0.0, 0.2, 0.4)):
    """Return list of PIL images at base_time_s + offsets (seconds)."""
    images = []
    for off in offsets:
        t = max(0.0, base_time_s + off)
        frame_idx = int(t * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
        ok, fr = cap.read()
        if ok:
            images.append(Image.fromarray(cv2.cvtColor(fr, cv2.COLOR_BGR2RGB)))
    return images

@torch.no_grad()
def blip2_nbest_en(images, beams=5, returns=3):
    """Generate N-best English captions across multiple images."""
    cands = []
    for img in images:
        inputs = caption_processor(images=[img], return_tensors="pt", padding=True)\
                 .to(device, torch.float16 if device=="cuda" else torch.float32)
        ids = caption_model.generate(
            **inputs,
            do_sample=False,                # deterministic
            num_beams=beams,
            num_return_sequences=returns,
            length_penalty=1.0,
            repetition_penalty=1.05,
            max_new_tokens=50,
        )
        for seq in ids:
            txt = caption_processor.decode(seq, skip_special_tokens=True).strip()
            cands.append(txt)
    # de-duplicate while preserving order
    seen, uniq = set(), []
    for t in cands:
        if t not in seen:
            seen.add(t)
            uniq.append(t)
    return uniq

@torch.no_grad()
def mt_nbest_ar(en_list, beams=5, returns=3):
    """Translate each EN candidate to K-best AR; return unique (en, ar) pairs."""
    pairs, seen = [], set()
    for en in en_list:
        ti = translator_tokenizer([en], return_tensors="pt", padding=True).to(device)
        out = translator_model.generate(
            **ti,
            do_sample=False,               # deterministic
            num_beams=beams,
            num_return_sequences=returns,
            max_new_tokens=80,
        )
        for seq in out:
            ar = translator_tokenizer.decode(seq, skip_special_tokens=True).strip()
            key = (en, ar)
            if key not in seen:
                seen.add(key)
                pairs.append(key)
    return pairs

@torch.no_grad()
def clip_image_text_score(image_pil: Image.Image, text_en: str) -> float:
    """Cosine similarity between image and EN caption using CLIP."""
    inputs = clip_processor(text=[text_en], images=image_pil, return_tensors="pt", padding=True).to(device)
    outs = clip_model(**inputs)
    img = outs.image_embeds / outs.image_embeds.norm(dim=-1, keepdim=True)
    txt = outs.text_embeds / outs.text_embeds.norm(dim=-1, keepdim=True)
    return float((img @ txt.T).squeeze().detach().cpu())

@torch.no_grad()
def sbert_sim_ar(ar_caption: str, ar_context: str) -> float:
    """Cosine similarity between AR caption and AR transcript context."""
    if not ar_context:
        return 0.0
    embs = sbert.encode([ar_caption, ar_context], convert_to_tensor=True, normalize_embeddings=True)
    return float(util.cos_sim(embs[0], embs[1]).item())


In [None]:
# Scene detection (PySceneDetect) & video prep
# Detect scenes
scene_manager = SceneManager()
scene_manager.add_detector(ContentDetector(threshold=30.0))

video = open_video(video_path)
scene_manager.detect_scenes(video)
scene_list = scene_manager.get_scene_list()

# VideoCapture for frame access
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0

print(f"🎬 Detected {len(scene_list)} scenes, FPS={fps:.2f}")


In [None]:
#Caption each scene (multi-frame, N-best, re-rank), save JSON + keyframes

# Weights for final scoring: visual grounding (CLIP) vs transcript consistency (SBERT)
alpha, beta = 0.5, 0.5

captions = {}

for i, (start, _) in enumerate(scene_list):
    scene_t = start.get_seconds()

    # 1) Multi-frame sampling around boundary
    images = grab_frames_around(cap, fps, scene_t, offsets=(0.0, 0.2, 0.4))
    if not images:
        print(f"⚠️ Scene {i:03} has no decodable frames; skipping.")
        continue

    # Save representative keyframe (first sampled image)
    frame_name = f"scene_{i:03}.jpg"
    frame_path = os.path.join(keyframe_dir, frame_name)
    cv2.imwrite(frame_path, cv2.cvtColor(np.array(images[0]), cv2.COLOR_RGB2BGR))

    # 2) English N-best captions (BLIP-2 across frames)
    en_cands = blip2_nbest_en(images, beams=5, returns=3)
    if not en_cands:
        print(f"⚠️ No EN candidates for scene {i:03}; skipping.")
        continue

    # 3) Arabic N-best translations (Marian)
    pair_cands = mt_nbest_ar(en_cands, beams=5, returns=3)
    if not pair_cands:
        print(f"⚠️ No AR candidates for scene {i:03}; skipping.")
        continue

    # 4) Scores
    # Visual score = average CLIP score over sampled frames for each EN candidate
    en_vis = {}
    for en in set([p[0] for p in pair_cands]):
        en_vis[en] = float(np.mean([clip_image_text_score(img, en) for img in images]))

    # Transcript context around scene time (Arabic)
    ar_ctx = get_transcript_context_ar(scene_t)

    # 5) Joint re-ranking
    best, best_score = None, -1e9
    for en, ar in pair_cands:
        vscore = en_vis[en]
        tscore = sbert_sim_ar(ar, ar_ctx)
        score = alpha * vscore + beta * tscore
        if score > best_score:
            best_score = score
            best = {"english": en, "arabic": ar, "v": vscore, "t": tscore, "score": score}

    # 6) Save best candidate
    captions[frame_name] = {
        "scene_time": round(scene_t, 2),
        "english": best["english"],
        "arabic": best["arabic"],
        "scores": {"visual": round(best["v"], 4), "transcript": round(best["t"], 4)}
    }

    print(f"✓ {frame_name} @ {scene_t:.2f}s | EN*: {best['english']} | AR*: {best['arabic']} "
          f"| V={best['v']:.3f} T={best['t']:.3f}")

cap.release()

# Save JSON
with open(captions_json_path, "w", encoding="utf-8") as f:
    json.dump(captions, f, ensure_ascii=False, indent=2)

print(f"✅ Captions saved to: {captions_json_path}")
print(f"🖼️ Keyframes dir: {keyframe_dir}")
