
# Segment 3 — Core AI Components (CPU-only, v3 with PySceneDetect)
This compact lab covers:
1) Extract frames → 2) Image tagging → 3) Face detection → 4) **Shot detection (PySceneDetect)** → 5) **Shot→Scene merge**  
6) NER → 7) Summarization → 8) Simple RAG → 9) Export `metadata.json` (+ optional scenes CSV & thumbnails)

> Tip: Use **very short** videos (≤ 20–30 s) to keep CPU runtimes manageable.


## 0) Setup (install once)

In [None]:

# CPU wheels & libs (run on Colab/local; safe to re-run)
!pip -q install transformers==4.43.3 sentence-transformers==3.0.1 torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
!pip -q install opencv-python==4.10.0.84 numpy==1.26.4 matplotlib==3.8.4 pillow==10.3.0 scikit-learn==1.5.1
!pip -q install scenedetect==0.6.2


## 1) Imports & Config

In [None]:

import os, io, csv, json
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt

import torch
from transformers import pipeline, AutoImageProcessor, AutoModelForImageClassification
from sentence_transformers import SentenceTransformer, util

# PySceneDetect
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector, AdaptiveDetector

# Seed both the NumPy and PyTorch random generators with the same fixed value.
np.random.seed(42)
torch.manual_seed(42)

# Report the library versions actually loaded in this runtime.
print("OpenCV:", cv2.__version__)
print("Torch:", torch.__version__)


## 2) Extract frames (every N seconds)

In [None]:

#@title Provide a short video path and sampling rate
# The trailing `#@param` annotations are Colab form markers; outside Colab they
# are ordinary comments and these are plain module-level settings.
# Path of the input video; reused by the extraction, shot-detection and merge cells.
video_path = "/content/sample.mp4"  #@param {type:"string"}
# Sampling interval in seconds, passed to extract_frames below.
every_s = 2.0                       #@param {type:"number"}
# Directory that receives the extracted JPEG frames.
frames_dir = "/content/frames_s3_v3"   #@param {type:"string"}

def extract_frames(video_path, every_s=2.0, out_dir="/content/frames_s3_v3"):
    """Save roughly one frame every ``every_s`` seconds of a video as JPEG files.

    Args:
        video_path: Path of the video to open with OpenCV.
        every_s: Sampling interval in seconds. A frame is saved as soon as the
            reported position reaches the next sampling time.
        out_dir: Directory that receives the JPEGs (created if missing).

    Returns:
        A list of ``(timestamp_seconds, jpeg_path)`` tuples in playback order,
        containing only frames that were actually written to disk.

    Raises:
        FileNotFoundError: If OpenCV cannot open ``video_path``.
    """
    os.makedirs(out_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise FileNotFoundError(f"Cannot open {video_path}")
    frames = []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        print(f"FPS={fps:.2f}  Frames={total}")
        next_t = 0.0
        while True:
            # Position is queried before read(), so it should describe the frame
            # about to be decoded (backend-dependent; confirm for exotic codecs).
            pos_s = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
            ok, frame = cap.read()
            if not ok:
                break
            if pos_s < next_t:
                continue
            # Millisecond-resolution names: whole-second names collide (and
            # silently overwrite each other) whenever every_s is below 1 second.
            pos_ms = int(round(pos_s * 1000.0))
            out = os.path.join(out_dir, f"frame_{pos_ms:07d}.jpg")
            # cv2.imwrite reports failure through its return value, not an exception.
            if cv2.imwrite(out, frame):
                frames.append((pos_s, out))
            else:
                print("Failed to write:", out)
            next_t += every_s
    finally:
        # Release the capture even if decoding or writing raises.
        cap.release()
    print("Saved frames:", len(frames))
    return frames

# Run the extraction; on any failure fall back to an empty frame list so the
# following cells can still execute.
try:
    frames = extract_frames(video_path, every_s=every_s, out_dir=frames_dir)
except Exception as err:
    frames = []
    print("Upload a short video and update video_path. Error:", err)


## 3) Image tagging (ViT, CPU)

In [None]:

# Load the image processor and the classification model once; classify_image
# reuses both for every frame.
processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224")
vision_model = AutoModelForImageClassification.from_pretrained("google/vit-base-patch16-224")

def classify_image(path):
    """Return the model's top-1 label for the image stored at ``path``."""
    rgb = Image.open(path).convert("RGB")
    batch = processor(images=rgb, return_tensors="pt")
    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        outputs = vision_model(**batch)
    best_idx = outputs.logits.argmax(-1).item()
    return vision_model.config.id2label[best_idx]

# Tag at most the first 8 extracted frames; a failure on one frame is reported
# and does not stop the loop.
frame_tags = []
for ts, img_path in (frames or [])[:8]:
    try:
        label = classify_image(img_path)
        frame_tags.append({"time_s": round(ts, 2), "path": img_path, "label": label})
        print(f"{ts:6.2f}s :: {os.path.basename(img_path)} :: {label}")
    except Exception as err:
        print("Failed:", img_path, err)
if not frames:
    print("No frames yet — run extraction above.")


## 4) Face detection (Haar cascade, CPU)

In [None]:

# Frontal-face Haar cascade file located through cv2.data.haarcascades.
haar = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
face_cascade = cv2.CascadeClassifier(haar)

def detect_faces(path):
    """Count the faces the Haar cascade finds in the image at ``path``.

    Args:
        path: Path of an image file readable by ``cv2.imread``.

    Returns:
        The number of detections (an ``int``).

    Raises:
        FileNotFoundError: If OpenCV cannot read the image.
    """
    img = cv2.imread(path)
    if img is None:
        # cv2.imread returns None for a missing/unreadable file instead of
        # raising; fail here with a clear message rather than inside cvtColor.
        raise FileNotFoundError(f"Cannot read image {path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(
        gray, scaleFactor=1.1, minNeighbors=5, minSize=(40, 40)
    )
    return len(faces)

# Count faces on at most the first 8 extracted frames. As in the tagging cell,
# a frame that cannot be processed is reported and skipped instead of aborting
# the whole loop.
face_counts = []
for t, p in (frames[:8] if frames else []):
    try:
        n = detect_faces(p)
    except (OSError, cv2.error) as e:
        print("Failed:", p, e)
        continue
    face_counts.append({"time_s": round(t, 2), "path": p, "faces": n})
    print(f"{t:6.2f}s :: faces={n}")


## 5) Shot detection (PySceneDetect)

In [None]:

def detect_shots_pyscenedetect(video_path, method="content", threshold=27.0, min_scene_len=15):
    """Detect shots in a video with PySceneDetect.

    Args:
        video_path: Path of the video to analyse.
        method: ``"adaptive"`` selects ``AdaptiveDetector``; any other value
            selects ``ContentDetector``.
        threshold: Threshold for ``ContentDetector``. Not used when
            ``method == "adaptive"``, which runs with a fixed
            ``adaptive_threshold`` of 3.0.
        min_scene_len: Forwarded to the detector's ``min_scene_len``
            (presumably a frame count — confirm against the PySceneDetect docs).

    Returns:
        A list of dicts with ``start``, ``end`` and ``duration`` in seconds,
        one per detected shot. A video without any detected cut yields a single
        shot covering the whole video.
    """
    video = open_video(video_path)
    sm = SceneManager()
    if method == "adaptive":
        detector = AdaptiveDetector(adaptive_threshold=3.0, min_scene_len=min_scene_len)
    else:
        detector = ContentDetector(threshold=threshold, min_scene_len=min_scene_len)
    sm.add_detector(detector)
    sm.detect_scenes(video)
    # By default get_scene_list() returns an empty list when no cut was found,
    # which would make a single-shot clip look like it has no shots at all.
    scene_list = sm.get_scene_list(start_in_scene=True)
    shots = []
    for start, end in scene_list:
        start_s = start.get_seconds()
        end_s = end.get_seconds()
        shots.append({"start": start_s, "end": end_s, "duration": end_s - start_s})
    return shots

# Detect shots with the content detector; on failure continue with an empty list.
try:
    shots = detect_shots_pyscenedetect(video_path, "content", 27.0, 15)
    print("Shots detected:", len(shots))
    print(shots[:5])
except Exception as err:
    shots = []
    print("Shot detection failed:", err)


## 6) Merge shots → scenes (HSV hist + min duration)

In [None]:

def hsv_hist_at_time(cap, t_sec, bins=32):
    """Return a flattened, normalized 2-D HSV histogram of the frame at ``t_sec``.

    Seeks ``cap`` to ``t_sec`` seconds (negative times are clamped to 0) and
    reads one frame. Returns ``None`` when no frame could be read.
    """
    seek_ms = max(t_sec, 0) * 1000.0
    cap.set(cv2.CAP_PROP_POS_MSEC, seek_ms)
    grabbed, bgr = cap.read()
    if not grabbed:
        return None
    hsv_img = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)
    # Histogram over channels 0 and 1 with `bins` buckets each,
    # using value ranges [0, 180) and [0, 256).
    channels = [0, 1]
    sizes = [bins, bins]
    ranges = [0, 180, 0, 256]
    hist = cv2.calcHist([hsv_img], channels, None, sizes, ranges)
    normalized = cv2.normalize(hist, hist)
    return normalized.flatten()

def merge_shots_to_scenes(video_path, shots, min_scene_dur=3.0, max_hist_dist=0.20, bins=32):
    """Greedily merge consecutive shots into scenes.

    Walking the shots in order, the next shot is folded into the current scene
    when either the current scene is shorter than ``min_scene_dur`` seconds, or
    the HSV histograms sampled at the midpoints of the current scene and the
    next shot have a Bhattacharyya distance below ``max_hist_dist``.

    Args:
        video_path: Video the shots were detected in (used to sample frames).
        shots: Sequence of dicts with ``start``, ``end`` and ``duration`` keys.
        min_scene_dur: Scenes shorter than this always absorb the next shot.
        max_hist_dist: Histogram distance below which two neighbours are merged.
        bins: Histogram bins per channel, forwarded to ``hsv_hist_at_time``.

    Returns:
        A new list of scene dicts with the same keys; the input dicts are copied,
        not modified. Empty when ``shots`` is empty.
    """
    if not shots:
        return []
    merged = [dict(shots[0])]
    cap = cv2.VideoCapture(video_path)
    try:
        for nxt in shots[1:]:
            cur = merged[-1]
            # A too-short scene is merged unconditionally, so skip the two
            # seek-and-decode operations in that case.
            should_merge = cur["duration"] < min_scene_dur
            if not should_merge:
                t1 = (cur["start"] + cur["end"]) / 2.0
                t2 = (nxt["start"] + nxt["end"]) / 2.0
                h1 = hsv_hist_at_time(cap, t1, bins=bins)
                h2 = hsv_hist_at_time(cap, t2, bins=bins)
                if h1 is None or h2 is None:
                    # No frame available: treat the shots as maximally different.
                    dist = 1.0
                else:
                    dist = cv2.compareHist(h1, h2, cv2.HISTCMP_BHATTACHARYYA)
                should_merge = dist < max_hist_dist
            if should_merge:
                cur["end"] = nxt["end"]
                cur["duration"] = cur["end"] - cur["start"]
            else:
                merged.append(dict(nxt))
    finally:
        # Release the capture even if sampling or comparison raises.
        cap.release()
    return merged

# Merge the detected shots into scenes; on failure continue with an empty list.
try:
    scenes = merge_shots_to_scenes(video_path, shots, 3.0, 0.20)
    print("Scenes (merged):", len(scenes))
    print(scenes[:5])
except Exception as err:
    scenes = []
    print("Scene merge failed:", err)


## 7) (Optional) Export scenes CSV & thumbnails

In [None]:

def export_scenes_csv_and_thumbs(video_path, scenes, csv_path="/content/scenes_s3_v3.csv", thumb_dir="/content/scene_thumbs_s3_v3"):
    """Write a scene table as CSV and save one midpoint thumbnail per scene.

    Args:
        video_path: Video the scenes refer to (used to grab thumbnail frames).
        scenes: Sequence of dicts with ``start``, ``end`` and ``duration`` keys.
        csv_path: Destination CSV file; its parent directory is created if needed.
        thumb_dir: Directory that receives ``scene_NNN.jpg`` files (created if needed).

    Returns:
        ``(csv_path, thumbs)`` where ``thumbs`` lists the thumbnail paths that
        were actually written, or ``(None, None)`` when ``scenes`` is empty.
    """
    if not scenes:
        print("No scenes to export.")
        return None, None
    os.makedirs(thumb_dir, exist_ok=True)
    csv_dir = os.path.dirname(csv_path)
    if csv_dir:
        os.makedirs(csv_dir, exist_ok=True)
    # CSV
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(["scene_idx", "start_s", "end_s", "duration_s"])
        w.writerows(
            [i, round(sc["start"], 2), round(sc["end"], 2), round(sc["duration"], 2)]
            for i, sc in enumerate(scenes)
        )
    # Thumbnails (midpoint frame)
    cap = cv2.VideoCapture(video_path)
    thumbs = []
    try:
        for i, sc in enumerate(scenes):
            t_mid = (sc["start"] + sc["end"]) / 2.0
            cap.set(cv2.CAP_PROP_POS_MSEC, t_mid * 1000.0)
            ok, frame = cap.read()
            if not ok:
                continue
            outp = os.path.join(thumb_dir, f"scene_{i:03d}.jpg")
            # cv2.imwrite reports failure through its return value; only list
            # thumbnails that really exist on disk.
            if cv2.imwrite(outp, frame):
                thumbs.append(outp)
    finally:
        # Release the capture even if seeking or writing raises.
        cap.release()
    print("Saved CSV:", csv_path)
    print("Saved thumbs:", len(thumbs), "->", thumb_dir)
    return csv_path, thumbs

# Uncomment to export after scenes are computed:
# csv_path, thumbs = export_scenes_csv_and_thumbs(video_path, scenes)


## 8) NER & Summarization

In [None]:

# NER: run the token-classification pipeline over the sample transcript and
# keep only the entity group, surface text and score of each aggregated span.
ner = pipeline(
    "ner",
    model="dslim/bert-base-NER",
    aggregation_strategy="simple",
    device=-1,
)
transcript_text = "Lewis Hamilton spoke to Mercedes engineers during the Monaco Grand Prix. The FIA issued a penalty."
entities = [
    {"type": span["entity_group"], "text": span["word"], "score": float(span["score"])}
    for span in ner(transcript_text)
]
print("Entities:", entities[:6])

# Summarization: deterministic decoding (do_sample=False) of the same transcript.
from transformers import pipeline as hf_pipeline
summarizer = hf_pipeline(
    "summarization",
    model="sshleifer/distilbart-cnn-12-6",
    device=-1,
)
summary = summarizer(
    transcript_text,
    max_length=60,
    min_length=20,
    do_sample=False,
)[0]["summary_text"]
print("Summary:", summary)


## 9) Simple RAG (retrieve + template)

In [None]:

# Sentence embedder and the tiny in-memory corpus that retrieve() searches.
embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device="cpu")
chunks = [
    "[00:00-00:10] Lewis Hamilton discussed tire strategy.",
    "[00:10-00:20] Safety car was deployed after a collision.",
    "[00:20-00:30] FIA announced stricter track limits."
]
# Corpus embeddings are computed once, normalized, and reused for every query.
emb = embedder.encode(chunks, convert_to_tensor=True, normalize_embeddings=True)

def retrieve(query, top_k=2):
    """Return the ``top_k`` chunks most similar to ``query``.

    Args:
        query: Free-text question.
        top_k: Maximum number of chunks to return. Values larger than the
            corpus are clamped to the corpus size; non-positive values yield
            an empty list.

    Returns:
        A list of ``(chunk_text, cosine_similarity)`` tuples, best match first.
    """
    # torch.topk raises when k exceeds the number of scores, so clamp first.
    k = min(top_k, len(chunks))
    if k <= 0:
        return []
    q = embedder.encode([query], convert_to_tensor=True, normalize_embeddings=True)
    scores = util.cos_sim(q, emb)[0]
    top = torch.topk(scores, k=k)
    return [(chunks[i], float(s)) for i, s in zip(top.indices.tolist(), top.values.tolist())]

def rag_answer(query):
    """Format a templated answer for ``query`` from the two best-matching chunks.

    The output lists the retrieved chunks as evidence and repeats the top chunk
    as the answer; if nothing was retrieved the answer is a fixed fallback text.
    """
    hits = retrieve(query, 2)
    evidence_lines = [f"- {text}" for text, _score in hits]
    evidence = "\n".join(evidence_lines)
    if hits:
        answer = hits[0][0]
    else:
        answer = 'Insufficient evidence.'
    return f"Q: {query}\nEvidence:\n{evidence}\nA: {answer}"

print(rag_answer("Why was the safety car deployed?"))


## 10) Export `metadata.json`

In [None]:

# Gather the results of the previous cells into one dict, write it to disk as
# indented JSON, then print the first 1200 characters as a preview.
metadata = dict(
    frames=frame_tags,
    faces=face_counts,
    shots=shots,
    scenes=scenes,
    transcript=transcript_text,
    entities=entities,
    summary=summary,
    rag_example=rag_answer("Why was the safety car deployed?"),
)
out_path = "/content/segment3_v3_pyscenedetect_metadata.json"
with open(out_path, "w") as f:
    json.dump(metadata, f, indent=2)
print("Saved:", out_path)
preview = json.dumps(metadata, indent=2)[:1200]
print(preview, "...")
