In [None]:
from google.colab import drive
drive.mount('/content/drive')


In [None]:
!ls "/content/drive/MyDrive/Animal_Kingdom/action_recognition/dataset"

In [None]:
!mkdir -p /content/videos
!tar -xvzf "/content/drive/MyDrive/Animal_Kingdom/action_recognition/dataset/video.tar.gz" -C /content/videos

In [None]:
import os

video_dir = "/content/videos/video"
video_files = [f for f in os.listdir(video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]
print(f"Found {len(video_files)} video files.")
print(video_files[:5])  # show a few

In [None]:
import pandas as pd

metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
df = pd.read_excel(metadata_path)
print(df.head())

In [None]:
#Transformers for BLIP
!pip install transformers

#Torch for model and inference
!pip install torch torchvision torchaudio

#Excel file reading
!pip install openpyxl pandas

#Image and video processing
!pip install opencv-python pillow

In [None]:
import os, random, json, gc, cv2, torch
import pandas as pd
from pathlib import Path
from PIL import Image

from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration
)

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)


In [None]:
# BLIP‑2 via HF Transformers
blip_model_id = "Salesforce/blip2-opt-2.7b"   # you can swap to flan‑t5 variant if you prefer
blip_processor = Blip2Processor.from_pretrained(blip_model_id)
# If you're tight on VRAM, you can remove device_map and keep it on CPU. CUDA is faster.
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    blip_model_id,
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
    device_map="auto" if device == "cuda" else None
)
blip_model.eval()

@torch.inference_mode()
def generate_caption(image_path: str, max_new_tokens: int = 50) -> str:
    """Caption a single image file with the BLIP-2 model loaded in this cell.

    Args:
        image_path: Path of the image to caption.
        max_new_tokens: Upper bound on generated tokens passed to ``generate``.

    Returns:
        The first decoded caption, stripped of surrounding whitespace.
    """
    rgb_frame = Image.open(image_path).convert("RGB")
    model_inputs = blip_processor(images=rgb_frame, return_tensors="pt")

    if device == "cuda":
        # Move every tensor to the model's device; only floating-point
        # tensors are additionally cast to half precision.
        moved = {}
        for name, tensor in model_inputs.items():
            target_dtype = torch.float16 if tensor.dtype.is_floating_point else None
            moved[name] = tensor.to(blip_model.device, dtype=target_dtype)
        model_inputs = moved

    generated_ids = blip_model.generate(**model_inputs, max_new_tokens=max_new_tokens)
    decoded = blip_processor.batch_decode(generated_ids, skip_special_tokens=True)
    return decoded[0].strip()


In [None]:
bart_model_id = "facebook/bart-large-cnn"
bart_tokenizer = BartTokenizer.from_pretrained(bart_model_id)
bart_model = BartForConditionalGeneration.from_pretrained(bart_model_id).to(device)
bart_model.eval()

@torch.inference_mode()
def summarize_captions_bart(captions, max_len=60, min_len=15) -> str:
    """Condense per-frame captions into one summary using the BART model.

    Args:
        captions: Sequence of caption strings; they are joined with spaces.
        max_len: ``max_length`` passed to ``generate``.
        min_len: ``min_length`` passed to ``generate``.

    Returns:
        The decoded summary, stripped of surrounding whitespace.
    """
    instruction = (
        "Summarize the following wildlife video scenes with high detail and precision. "
        "Retain unique animal behaviors, actions, and surroundings: "
    )
    joined_captions = " ".join(captions)
    prompt = instruction + joined_captions

    # Input is truncated to 1024 tokens before being moved to the device.
    encoded = bart_tokenizer(
        [prompt], return_tensors="pt", max_length=1024, truncation=True
    ).to(device)

    generation_options = dict(
        max_length=max_len,
        min_length=min_len,
        num_beams=4,
        early_stopping=True,
        no_repeat_ngram_size=3,
        repetition_penalty=2.0,
    )
    summary_ids = bart_model.generate(encoded["input_ids"], **generation_options)
    return bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True).strip()


In [None]:
def extract_frames_4fps(video_path, output_folder):
    """Decode a video and save roughly four frames per second as JPEG files.

    Frames are written as ``frame_NNNNNN.jpg`` so that a lexicographic sort of
    the folder matches temporal order even for clips with more than 999
    sampled frames. Frame files left in ``output_folder`` by an earlier
    extraction are removed first, so a directory listing only ever reflects
    the video passed in this call.

    Args:
        video_path: Path to the video file (``str`` or ``Path``).
        output_folder: Directory that receives the frames; created if missing.

    Returns:
        Number of frames written.
    """
    os.makedirs(output_folder, exist_ok=True)
    for stale in os.listdir(output_folder):
        if stale.startswith("frame_") and stale.endswith(".jpg"):
            os.remove(os.path.join(output_folder, stale))

    cap = cv2.VideoCapture(str(video_path))
    saved = 0
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 0
        # Keep every `interval`-th frame; fall back to every frame when the
        # reported frame rate is missing or below 4 fps.
        interval = int(fps // 4) if fps and fps >= 4 else 1
        count = 0
        success, frame = cap.read()
        while success:
            if count % interval == 0:
                frame_path = os.path.join(output_folder, f"frame_{saved:06d}.jpg")
                cv2.imwrite(frame_path, frame)
                saved += 1
            success, frame = cap.read()
            count += 1
    finally:
        # Release the decoder even if reading or writing raised.
        cap.release()
    return saved


In [None]:
def semantic_correctness_score(caption, keywords):
    """Return the fraction of keywords that occur in the caption.

    Matching is a case-insensitive substring test. Entries that are not
    non-blank strings (for example NaN values coming from empty spreadsheet
    cells) are ignored instead of raising.

    Args:
        caption: Caption text; ``None`` is treated as an empty caption.
        keywords: Iterable of keyword strings; ``None`` is treated as empty.

    Returns:
        A float in [0, 1]; 0.0 when there are no usable keywords.
    """
    text = (caption or "").lower()
    usable = [
        kw.strip().lower()
        for kw in (keywords or [])
        if isinstance(kw, str) and kw.strip()
    ]
    if not usable:
        return 0.0
    hits = sum(1 for kw in usable if kw in text)
    return hits / len(usable)


In [None]:
# Inputs
import ast  # cell-local: literal_eval parses metadata cells without executing them

metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
video_dir = "/content/videos/video"
frames_root = "/content/frames"
save_csv = "/content/blip2_bart_results.csv"


def _literal_or_none(cell):
    """Return the Python literal encoded in a metadata cell, or None if it is not one."""
    try:
        return ast.literal_eval(cell)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None


# Load metadata: video_id | list_animal | list_animal_action
meta_df = pd.read_excel(metadata_path)
# Normalize into a dict of plain Python structures
metadata = {}
for _, r in meta_df.iterrows():
    vid = str(r["video_id"])

    # Animals: accept a list-like literal, a quoted string literal, or plain text.
    animals_raw = r.get("list_animal", [])
    if isinstance(animals_raw, str):
        parsed_animals = _literal_or_none(animals_raw)
        if isinstance(parsed_animals, (list, tuple)):
            animals = list(parsed_animals)
        elif isinstance(parsed_animals, str):
            animals = [parsed_animals]
        else:
            animals = [animals_raw]
    elif isinstance(animals_raw, (list, tuple)):
        animals = list(animals_raw)
    else:
        # Empty cells are read as NaN; they carry no keyword.
        animals = []
    animals = [a for a in animals if isinstance(a, str) and a.strip()]

    # Actions: expecting something like [(animal, action), ...]
    actions = []
    actions_raw = r.get("list_animal_action", "")
    if isinstance(actions_raw, str) and actions_raw.strip():
        parsed_actions = _literal_or_none(actions_raw)
        if isinstance(parsed_actions, (list, tuple)):
            actions = [
                str(pair[1])
                for pair in parsed_actions
                if isinstance(pair, (list, tuple)) and len(pair) == 2
            ]

    metadata[vid] = {"animals": animals, "actions": actions}

# Choose your set (here: first 20 mp4s just as a smoke test)
all_videos = sorted([f for f in os.listdir(video_dir) if f.endswith(".mp4")])[:20]

rows = []
for v in all_videos:
    video_id = Path(v).stem
    vpath = os.path.join(video_dir, v)
    fdir = os.path.join(frames_root, video_id)
    print(f"\nProcessing {video_id}")

    # 1) Frames
    _ = extract_frames_4fps(vpath, fdir)

    # 2) Caption each frame (you can subsample if many)
    frame_files = [os.path.join(fdir, f) for f in sorted(os.listdir(fdir)) if f.endswith(".jpg")]
    frame_captions = []
    for fimg in frame_files:
        try:
            frame_captions.append(generate_caption(fimg))
        except Exception as e:
            # be resilient; skip bad frames
            print(f"  warn: caption failed on {Path(fimg).name}: {e}")
            continue

    if not frame_captions:
        print("  warn: no captions, skipping")
        continue

    # 3) Summarize
    final_caption = summarize_captions_bart(frame_captions)

    # 4) Simple semantic score against metadata keywords
    meta = metadata.get(video_id, {"animals": [], "actions": []})
    keywords = [*(meta["animals"] or []), *(meta["actions"] or [])]
    sem_score = semantic_correctness_score(final_caption, keywords)

    rows.append({
        "video_id": video_id,
        "final_caption": final_caption,
        "semantic_correctness_percent": f"{sem_score*100:.1f}%",
        "num_frames": len(frame_files)
    })

# Save
out_df = pd.DataFrame(rows)
out_df.to_csv(save_csv, index=False)
print("\nSaved:", save_csv)


In [None]:
# ================================
# BLIP2 → BART scalable pipeline
# Prints final captions in Colab output
# Saves JSONL prompts as {video_id, prompt, semantic_correctness_percent}
# Also saves full CSV/JSONL with details
# ================================

# --- (If running in a fresh Colab) you may need installs ---
# !pip install git+https://github.com/salesforce/BLIP.git transformers timm accelerate opencv-python pandas -q

import os, cv2, math, json, random, ast, gc, time, re
from pathlib import Path
from typing import List, Dict, Any, Tuple

import torch
import pandas as pd
from PIL import Image

from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration
)

# ================================
# 1) Config
# ================================
VIDEO_DIR = "/content/videos/video"               # directory with ~30k .mp4
METADATA_XLSX = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"

OUTPUT_DIR = "/content/ak_blip2_bart_results"
CSV_PATH = f"{OUTPUT_DIR}/results_1000.csv"           # detailed results
JSONL_PATH = f"{OUTPUT_DIR}/results_1000.jsonl"        # detailed results (JSONL)
PROMPTS_JSONL = f"{OUTPUT_DIR}/prompts_1000.jsonl"     # *** compact prompts JSONL ***
CHECKPOINT_PATH = f"{OUTPUT_DIR}/completed_ids.txt"
LOG_PATH = f"{OUTPUT_DIR}/run.log"

NUM_VIDEOS = 1000
RANDOM_SEED = 42
MAX_FRAMES_PER_VIDEO = 64
CAPTION_BATCH_SIZE = 8
TARGET_FPS = 4

# Generation knobs (speed vs quality)
BLIP2_MAX_NEW_TOKENS = 40
BLIP2_NUM_BEAMS = 1

BART_MAX_LEN = 60
BART_MIN_LEN = 15
BART_NUM_BEAMS = 1

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", DEVICE)

os.makedirs(OUTPUT_DIR, exist_ok=True)

def log(msg: str):
    """Append a timestamped copy of ``msg`` to ``LOG_PATH`` and echo it to stdout.

    Args:
        msg: Message text; written as ``[YYYY-MM-DD HH:MM:SS] msg``.
    """
    ts = time.strftime("%Y-%m-%d %H:%M:%S")
    # Log messages in this notebook contain non-ASCII characters ("…", "✓"),
    # so the encoding is pinned rather than left to the platform default.
    with open(LOG_PATH, "a", encoding="utf-8") as f:
        f.write(f"[{ts}] {msg}\n")
    print(msg)

# ================================
# 2) Load models once
# ================================
log("Loading BLIP2 + BART models…")
blip_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=torch.float16
)

bart_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
bart_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn").to(DEVICE)

# ================================
# 3) Helpers
# ================================

def list_mp4s(dir_path: str) -> List[str]:
    """Return the sorted names of directory entries ending in ``.mp4`` (any letter case)."""
    mp4_names = []
    for entry in os.listdir(dir_path):
        if entry.lower().endswith(".mp4"):
            mp4_names.append(entry)
    mp4_names.sort()
    return mp4_names


def read_completed_ids(path: str) -> set:
    """Load the checkpoint file into a set of video ids.

    Each non-blank line, stripped of surrounding whitespace, is one id.
    A missing file yields an empty set.
    """
    done = set()
    if os.path.exists(path):
        with open(path, "r") as checkpoint:
            for raw_line in checkpoint:
                video_id = raw_line.strip()
                if video_id:
                    done.add(video_id)
    return done


def append_completed_id(path: str, vid: str):
    """Append ``vid`` on its own line to the checkpoint file at ``path``."""
    with open(path, "a") as checkpoint:
        checkpoint.writelines((vid, "\n"))


def uniform_indices(total_frames: int, max_count: int) -> List[int]:
    """Pick at most ``max_count`` evenly spaced indices from ``range(total_frames)``.

    Args:
        total_frames: Number of available items.
        max_count: Maximum number of indices to return.

    Returns:
        All indices when ``total_frames <= max_count``; otherwise ``max_count``
        indices spaced ``total_frames / max_count`` apart (floored). An empty
        list when either argument is zero or negative.
    """
    # A zero budget previously divided by zero; treat it as "select nothing".
    if total_frames <= 0 or max_count <= 0:
        return []
    if total_frames <= max_count:
        return list(range(total_frames))
    step = total_frames / float(max_count)
    return [int(i * step) for i in range(max_count)]


def target_step_from_fps(fps: float, target_fps: float = TARGET_FPS) -> int:
    """Return how many source frames to advance per sampled frame.

    The stride is ``fps / target_fps`` rounded to the nearest integer and
    never less than 1. A non-positive ``fps`` yields 1.
    """
    if fps <= 0:
        return 1
    stride = int(round(fps / target_fps))
    return stride if stride >= 1 else 1


def sample_frame_indices(cap: cv2.VideoCapture, max_frames: int) -> List[int]:
    """Choose which frame indices of an opened capture to decode.

    When the capture reports a frame rate of at least ``TARGET_FPS``, frames
    are taken at the stride given by ``target_step_from_fps`` and, if that is
    still more than ``max_frames``, thinned uniformly. Otherwise indices are
    spread uniformly over the reported frame count.
    """
    frame_total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    native_fps = cap.get(cv2.CAP_PROP_FPS)

    # Frame rate missing or too low: spread the budget over the whole clip.
    if not (native_fps and native_fps >= TARGET_FPS):
        return uniform_indices(frame_total, max_frames)

    stride = target_step_from_fps(native_fps, TARGET_FPS)
    strided = list(range(0, frame_total, stride))
    if len(strided) <= max_frames:
        return strided[:max_frames]

    # Too many strided candidates: downsample uniformly over them.
    picks = uniform_indices(len(strided), max_frames)
    return [strided[p] for p in picks]


def frames_to_pil(cap: cv2.VideoCapture, indices: List[int]) -> List[Image.Image]:
    """Seek to each requested frame index and return the decoded frames as RGB PIL images.

    Indices whose read fails are skipped, so the result may be shorter than
    ``indices``.
    """
    decoded = []
    for frame_no in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
        grabbed, bgr = cap.read()
        if grabbed:
            # OpenCV delivers BGR; convert before handing the array to PIL.
            decoded.append(Image.fromarray(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)))
    return decoded


@torch.inference_mode()
def caption_images_blip2(images: List[Image.Image]) -> List[str]:
    """Caption images with BLIP-2 in batches of ``CAPTION_BATCH_SIZE``.

    Returns one stripped caption per input image, in input order.
    """
    all_captions = []
    for start in range(0, len(images), CAPTION_BATCH_SIZE):
        chunk = images[start:start + CAPTION_BATCH_SIZE]
        encoded = blip_processor(images=chunk, return_tensors="pt")
        encoded = encoded.to(blip_model.device, torch.float16)
        generated = blip_model.generate(
            **encoded,
            max_new_tokens=BLIP2_MAX_NEW_TOKENS,
            num_beams=BLIP2_NUM_BEAMS
        )
        for text in blip_processor.batch_decode(generated, skip_special_tokens=True):
            all_captions.append(text.strip())
        # Drop references to the batch tensors before asking CUDA to free its cache.
        del encoded, generated
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
    return all_captions


@torch.inference_mode()
def summarize_captions_bart(captions: List[str]) -> str:
    """Summarize space-joined frame captions with BART using the ``BART_*`` settings.

    Returns the decoded summary, stripped of surrounding whitespace.
    """
    instruction = (
        "Summarize the following wildlife video scenes with high detail and precision. "
        "Retain unique animal behaviors, actions, and surroundings: "
    )
    joined_captions = " ".join(captions)
    prompt = instruction + joined_captions
    encoded = bart_tokenizer(
        [prompt], return_tensors="pt", max_length=1024, truncation=True
    ).to(DEVICE)
    # NOTE: do not pass early_stopping/length_penalty (ignored in recent HF)
    generation_options = dict(
        max_length=BART_MAX_LEN,
        min_length=BART_MIN_LEN,
        num_beams=BART_NUM_BEAMS,
        no_repeat_ngram_size=3,
        repetition_penalty=2.0,
    )
    summary_ids = bart_model.generate(encoded["input_ids"], **generation_options)
    return bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True).strip()


# ---- Fuzzy-ish keyword matching to reduce false 0% ----
_non_alnum = re.compile(r"[^a-z0-9\s]")
_multispace = re.compile(r"\s+")

def _norm(s: str) -> str:
    s = s.lower()
    s = _non_alnum.sub(" ", s)
    s = _multispace.sub(" ", s).strip()
    # naive singularization
    if s.endswith("s") and len(s) > 3:
        s = s[:-1]
    return s


def semantic_correctness_score(caption: str, keywords: List[str]) -> Tuple[float | None, List[str], List[str]]:
    """Score a caption by whole-phrase keyword matches after normalization.

    Keywords that are not non-blank strings are ignored. Each remaining
    keyword is normalized with ``_norm`` and counts as matched when it (or its
    "chimp"/"croc" short form) appears space-delimited in the normalized
    caption.

    Returns:
        ``(score, matched, missing)`` where ``score`` is matched / usable
        keywords, or ``(None, [], [])`` when there are no usable keywords.
    """
    usable = [kw for kw in keywords if isinstance(kw, str) and kw.strip()]
    if not usable:
        return None, [], []  # N/A — no ground truth

    padded_caption = f" {_norm(caption)} "
    # a couple of common short forms
    short_forms = (("chimpanzee", "chimp"), ("crocodile", "croc"))

    matched, missing = [], []
    for kw in usable:
        base = _norm(kw)
        forms = {base}
        for long_name, short_name in short_forms:
            if long_name in base:
                forms.add(base.replace(long_name, short_name))
        found = any(f" {form} " in padded_caption for form in forms)
        if found:
            matched.append(kw)
        else:
            missing.append(kw)

    return len(matched) / len(usable), matched, missing


def safe_parse_list(value) -> List[str]:
    """Parse a metadata cell into a list of strings without executing code.

    Accepts an actual list/tuple or a string holding a list/tuple literal
    (e.g. ``"['lion', 'zebra']"``). Elements that are not strings are dropped
    so the result honours the ``List[str]`` contract callers rely on when
    they call ``.lower()`` on each entry.

    Returns:
        The string elements in order; ``[]`` for blank, unparsable, or
        non-sequence input.
    """
    if isinstance(value, str):
        if not value.strip():
            return []
        try:
            value = ast.literal_eval(value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return []
    if isinstance(value, (list, tuple)):
        return [item for item in value if isinstance(item, str)]
    return []


def safe_parse_actions(value) -> List[str]:
    """Extract action names from a string holding a list literal.

    For each element of the parsed list: a tuple/list with at least two items
    contributes ``str(item[1])``; a plain string contributes itself; anything
    else is skipped. Non-string, blank, unparsable, or non-list input gives
    an empty list.
    """
    if not (isinstance(value, str) and value.strip()):
        return []
    try:
        parsed = ast.literal_eval(value)
    except Exception:
        return []
    if not isinstance(parsed, list):
        return []

    collected = []
    for entry in parsed:
        if isinstance(entry, (tuple, list)):
            if len(entry) >= 2:
                collected.append(str(entry[1]))
        elif isinstance(entry, str):
            collected.append(entry)
    return collected


def write_jsonl(path: str, record: Dict[str, Any]):
    """Append ``record`` to ``path`` as one JSON line.

    The record is serialized with ``ensure_ascii=False``, so non-ASCII text
    is written verbatim; the file is therefore opened as UTF-8 explicitly
    instead of relying on the platform's default encoding.
    """
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

# ================================
# 4) Load metadata
# ================================
log("Loading metadata…")
meta_df = pd.read_excel(METADATA_XLSX)
if "video_id" not in meta_df.columns:
    raise ValueError("Expected 'video_id' column in metadata.")
# Index rows by video_id for per-video lookup; drop=False keeps the column too.
meta_df = meta_df.set_index("video_id", drop=False)

# ================================
# 5) Choose random 1000 (resumable)
# ================================
# list_mp4s returns a sorted list and the RNG is seeded just before sampling,
# so the same NUM_VIDEOS files are selected on every run while the directory
# contents are unchanged. That stable selection is what the resume logic
# below filters against.
all_files = list_mp4s(VIDEO_DIR)
if len(all_files) < NUM_VIDEOS:
    raise ValueError(f"Found only {len(all_files)} MP4s, need {NUM_VIDEOS}.")
random.seed(RANDOM_SEED)
sampled_files = random.sample(all_files, NUM_VIDEOS)

# Skip every sampled video whose id (file stem) is already in the checkpoint file.
# NOTE(review): len(completed) counts every id in the checkpoint, including any
# that are not part of the current sample.
completed = read_completed_ids(CHECKPOINT_PATH)
remaining = [f for f in sampled_files if Path(f).stem not in completed]
log(f"{len(completed)} already completed, {len(remaining)} remaining.")

# Write the CSV header once; the main loop appends rows with header=False,
# so the column order here must match the keys of the record it builds.
if not os.path.exists(CSV_PATH):
    pd.DataFrame(columns=[
        "video_id","final_caption","frame_captions",
        "keywords","matched_keywords","missing_keywords",
        "semantic_correctness_percent"
    ]).to_csv(CSV_PATH, index=False)

# ================================
# 6) Main loop — PRINT captions + SAVE prompts
# ================================
# For each remaining video: sample frames, caption them, summarize, score
# against metadata keywords, persist the results, then checkpoint the id.
for idx, video_file in enumerate(remaining, 1):
    video_id = Path(video_file).stem
    video_path = os.path.join(VIDEO_DIR, video_file)
    log(f"[{idx}/{len(remaining)}] Processing {video_id}")

    try:
        # NOTE(review): if anything below raises before cap.release() is
        # reached, this iteration never releases the capture explicitly.
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            log(f"  !! Could not open {video_path}, skipping.")
            # Skipped videos are still checkpointed, so a resumed run will not retry them.
            append_completed_id(CHECKPOINT_PATH, video_id)
            continue

        indices = sample_frame_indices(cap, MAX_FRAMES_PER_VIDEO)
        if not indices:
            log("  !! No frames found, skipping.")
            cap.release()
            append_completed_id(CHECKPOINT_PATH, video_id)
            continue

        pil_images = frames_to_pil(cap, indices)
        cap.release()
        if not pil_images:
            log("  !! Failed reading frames, skipping.")
            append_completed_id(CHECKPOINT_PATH, video_id)
            continue

        # ---- Caption frames (GPU batches) ----
        frame_captions = caption_images_blip2(pil_images)

        # ---- Summarize ----
        final_caption = summarize_captions_bart(frame_captions)

        # ---- Metadata ----
        animals, actions = [], []
        if video_id in meta_df.index:
            # NOTE(review): presumably video_id is unique in the metadata; with
            # duplicate index values .loc would return several rows — confirm.
            row = meta_df.loc[video_id]
            animals = safe_parse_list(row.get("list_animal", []))
            actions = safe_parse_actions(row.get("list_animal_action", ""))
        else:
            log(f"  !! No metadata for {video_id}")

        # Keywords are the lower-cased animal names followed by the action names.
        keywords = [a.lower() for a in animals] + [a.lower() for a in actions]
        score, matched, missing = semantic_correctness_score(final_caption, keywords)
        # A score of None means there were no usable keywords to compare against.
        score_fmt = "N/A" if score is None else f"{score*100:.1f}%"

        # ---- PRINT to Colab output (your request) ----
        print("\n>>>", video_id)
        print("Caption:", final_caption)
        if frame_captions:
            print("First 5 frame captions:")
            for c in frame_captions[:5]:
                print(" -", c)
        print("Semantic correctness:", score_fmt)
        if score is not None:
            print("Matched:", matched)
            print("Missing:", missing)

        # ---- Save detailed record ----
        # Key order matters: the CSV row is appended without a header.
        record = {
            "video_id": video_id,
            "final_caption": final_caption,
            "frame_captions": frame_captions,
            "keywords": keywords,
            "matched_keywords": matched,
            "missing_keywords": missing,
            "semantic_correctness_percent": score_fmt
        }
        write_jsonl(JSONL_PATH, record)
        pd.DataFrame([record]).to_csv(CSV_PATH, mode="a", header=False, index=False)

        # ---- Save compact prompts JSONL (your requested format) ----
        prompt_rec = {
            "video_id": video_id,
            "prompt": final_caption,
            "semantic_correctness_percent": score_fmt,
        }
        write_jsonl(PROMPTS_JSONL, prompt_rec)

        # Checkpoint only after all three outputs have been written.
        # NOTE(review): an interruption between the writes above and this line
        # would make a resumed run process the video again and append
        # duplicate rows.
        append_completed_id(CHECKPOINT_PATH, video_id)

        # ---- Housekeeping ----
        del pil_images, frame_captions
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        log(f"  ✓ Done {video_id} | score={score_fmt}")

    except Exception as e:
        log(f"  !! Error on {video_id}: {repr(e)}")
        # NOTE(review): a video that raised is also checkpointed, so a resumed
        # run will not retry it — confirm this is intended for transient
        # failures.
        append_completed_id(CHECKPOINT_PATH, video_id)
        continue

log("\nBLIP2 + BART 1000-video run complete.")
log(f"CSV (detailed): {CSV_PATH}")
log(f"JSONL (detailed): {JSONL_PATH}")
log(f"JSONL (prompts): {PROMPTS_JSONL}")
log(f"Completed IDs list: {CHECKPOINT_PATH}")


First 1000 videos (BLIP-2 + BART, 4 fps, with prompt)

In [None]:
# ================================
# 1. Install Dependencies
# ================================
!pip install git+https://github.com/salesforce/BLIP.git transformers timm accelerate opencv-python pandas -q

# ================================
# 2. Import Libraries
# ================================
import os, cv2, torch
import pandas as pd
from PIL import Image
from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration
)

# ================================
# 3. Device Setup
# ================================
device = "cuda" if torch.cuda.is_available() else "cpu"
print(" Using device:", device)

# ================================
# 4. BLIP2 Setup (Captioning)
# ================================
blip_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=torch.float16
)

def generate_caption(image_path):
    """Caption one image file with BLIP-2.

    Args:
        image_path: Path of the image to caption.

    Returns:
        The first decoded caption with surrounding whitespace removed, so
        stored frame captions carry no stray leading/trailing whitespace or
        newlines (matching the other ``generate_caption`` variants in this
        notebook).
    """
    image = Image.open(image_path).convert('RGB')
    inputs = blip_processor(images=image, return_tensors="pt").to(blip_model.device, torch.float16)
    outputs = blip_model.generate(**inputs, max_new_tokens=50)
    return blip_processor.batch_decode(outputs, skip_special_tokens=True)[0].strip()

# ================================
# 5. BART Setup (Summarization)
# ================================
bart_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
bart_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn").to(device)

def summarize_captions_bart(captions):
    """Summarize space-joined frame captions with BART.

    Args:
        captions: Sequence of caption strings.

    Returns:
        The decoded summary, stripped; an empty string when there is no
        non-blank caption. Without that guard the model would summarize the
        bare instruction text and return a caption unrelated to the video.
    """
    if not any(isinstance(c, str) and c.strip() for c in (captions or [])):
        return ""

    text_input = " ".join(captions)
    prompt = (
        "Summarize the following wildlife video scenes with high detail and precision. "
        "Retain unique animal behaviors, actions, and surroundings: " + text_input
    )
    inputs = bart_tokenizer([prompt], return_tensors="pt", max_length=1024, truncation=True).to(device)
    summary_ids = bart_model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_length=60,
        min_length=15,
        num_beams=4,
        early_stopping=True,
        no_repeat_ngram_size=3,
        repetition_penalty=2.0
    )
    return bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True).strip()


# ================================
# 6. Frame Extraction
# ================================
def extract_frames_4fps(video_path, output_folder):
    """Decode a video and save roughly four frames per second as JPEG files.

    Frames are written as ``frame_NNNNNN.jpg`` so a lexicographic sort of the
    folder matches temporal order even beyond 999 sampled frames. Frame files
    left in ``output_folder`` by an earlier extraction are removed first, so
    the caller's directory listing only reflects this video.

    Args:
        video_path: Path to the video file.
        output_folder: Directory that receives the frames; created if missing.

    Returns:
        Number of frames written.
    """
    os.makedirs(output_folder, exist_ok=True)
    for stale in os.listdir(output_folder):
        if stale.startswith("frame_") and stale.endswith(".jpg"):
            os.remove(os.path.join(output_folder, stale))

    vidcap = cv2.VideoCapture(video_path)
    saved = 0
    try:
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        interval = int(fps / 4) if fps >= 4 else 1  # Capture every 0.25 sec
        count = 0
        success, image = vidcap.read()
        while success:
            if count % interval == 0:
                frame_path = os.path.join(output_folder, f"frame_{saved:06d}.jpg")
                cv2.imwrite(frame_path, image)
                saved += 1
            success, image = vidcap.read()
            count += 1
    finally:
        # Release the decoder even if reading or writing raised.
        vidcap.release()
    return saved

# ================================
# 7. Metadata Matching Function
# ================================
def semantic_correctness_score(caption, keywords):
    """Split keywords by whether they occur in the caption (case-insensitive substring).

    Returns:
        ``(score, matched, missing)``; ``score`` is matched / total keywords,
        or 0 when ``keywords`` is empty.
    """
    haystack = caption.lower()
    matched, missing = [], []
    for kw in keywords:
        bucket = matched if kw.lower() in haystack else missing
        bucket.append(kw)
    score = len(matched) / len(keywords) if keywords else 0
    return score, matched, missing

# ================================
# 8. Load Metadata
# ================================
import ast  # cell-local: literal_eval parses metadata cells without executing them

metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
meta_df = pd.read_excel(metadata_path)
metadata_dict = {row["video_id"]: row for _, row in meta_df.iterrows()}


def _literal_or_none(cell):
    """Return the Python literal encoded in a metadata cell, or None if it is not one."""
    if not isinstance(cell, str) or not cell.strip():
        return None
    try:
        return ast.literal_eval(cell)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None


def _animal_names(cell):
    """Turn a ``list_animal`` cell into a list of name strings.

    The cell is normally a string such as "['lion', 'zebra']". Iterating that
    string directly would yield single characters, so it is parsed first.
    Plain text that is not a literal is kept as one name.
    """
    parsed = _literal_or_none(cell)
    if isinstance(parsed, (list, tuple)):
        return [a for a in parsed if isinstance(a, str) and a.strip()]
    if isinstance(parsed, str) and parsed.strip():
        return [parsed]
    if isinstance(cell, str) and cell.strip() and parsed is None:
        return [cell.strip()]
    return []


def _action_names(cell):
    """Turn a ``list_animal_action`` cell like "[('lion', 'running')]" into action strings."""
    parsed = _literal_or_none(cell)
    if not isinstance(parsed, (list, tuple)):
        return []
    return [
        str(pair[1])
        for pair in parsed
        if isinstance(pair, (list, tuple)) and len(pair) == 2
    ]


# ================================
# 9. Process Videos
# ================================
video_dir = "/content/videos/video/"
video_files = sorted([f for f in os.listdir(video_dir) if f.endswith(".mp4")])[:1000]

results = []

for video_file in video_files:
    video_id = os.path.splitext(video_file)[0]
    video_path = os.path.join(video_dir, video_file)
    frame_dir = f"/content/frames/{video_id}"

    print(f"\n Processing {video_id}")

    # Step 1: Extract frames
    extract_frames_4fps(video_path, frame_dir)

    # Step 2: Caption each frame
    frame_captions = []
    for fname in sorted(os.listdir(frame_dir)):
        if fname.endswith(".jpg"):
            path = os.path.join(frame_dir, fname)
            caption = generate_caption(path)
            frame_captions.append(caption)

    # Nothing to summarize (e.g. unreadable video): skip rather than let the
    # summarizer produce a caption from the instruction text alone.
    if not frame_captions:
        print("  warn: no captions, skipping")
        continue

    # Step 3: Summarize using BART
    final_caption = summarize_captions_bart(frame_captions)

    # Step 4: Metadata matching
    meta = metadata_dict.get(video_id, {})
    animals = _animal_names(meta.get("list_animal", ""))
    actions = _action_names(meta.get("list_animal_action", ""))

    keywords = [a.lower() for a in animals] + [a.lower() for a in actions]
    score, matched, missing = semantic_correctness_score(final_caption, keywords)

    results.append({
        "video_id": video_id,
        "final_caption": final_caption,
        "frame_captions": frame_captions,
        "keywords": keywords,
        "matched_keywords": matched,
        "missing_keywords": missing,
        "semantic_correctness_percent": f"{score*100:.1f}%"
    })

# ================================
# 10. Save Results
# ================================
results_df = pd.DataFrame(results)
results_df.to_csv("/content/blip2_bart_results.csv", index=False)

print("\n BLIP2 + BART Evaluation Complete! Results saved to:")
print(" /content/blip2_bart_results.csv")


In [None]:
import pandas as pd

# Load results
results_df = pd.read_csv("/content/blip2_bart_results.csv")

# Display key columns
print(results_df[["video_id", "semantic_correctness_percent"]])

Without prompt (BLIP-2 + BART, 4 fps)

In [None]:
# ================================
# 1. Install Dependencies
# ================================
!pip install -q git+https://github.com/salesforce/BLIP.git transformers timm accelerate opencv-python pandas

# ================================
# 2. Import Libraries
# ================================
import os, cv2, torch, ast
import pandas as pd
from PIL import Image
from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration
)

# ================================
# 3. Device Setup
# ================================
device = "cuda" if torch.cuda.is_available() else "cpu"
print(" Using device:", device)

# ================================
# 4. BLIP2 Setup (Captioning)
# ================================
blip_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",                # lets Accelerate place sharded weights
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)

@torch.inference_mode()
def generate_caption(image_path):
    image = Image.open(image_path).convert('RGB')
    # IMPORTANT: send tensors to the SAME device/dtype as the model
    inputs = blip_processor(images=image, return_tensors="pt")
    inputs = {k: v.to(device=blip_model.device, dtype=blip_model.dtype) for k, v in inputs.items()}
    outputs = blip_model.generate(**inputs, max_new_tokens=50)
    caption = blip_processor.batch_decode(outputs, skip_special_tokens=True)[0]
    return caption.strip()

# ================================
# 5. BART Setup (Summarization)
# ================================
bart_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
bart_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn").to(device)

@torch.inference_mode()
def summarize_captions_bart(captions, max_input_tokens=1024, max_summary_tokens=60):
    """
    Summarize ONLY the concatenated captions (no instruction/prompt string).
    """
    # Guard: if no captions were produced, return empty string
    if not captions:
        return ""

    # Join captions with newlines to give the encoder separable sentences
    text_input = "\n".join([c for c in captions if isinstance(c, str) and c.strip()])
    if not text_input.strip():
        return ""

    # Tokenize with truncation to BART's 1024-token encoder limit
    inputs = bart_tokenizer(
        text_input,
        return_tensors="pt",
        max_length=max_input_tokens,
        truncation=True
    ).to(device)

    summary_ids = bart_model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs.get("attention_mask", None),
        max_length=max_summary_tokens,
        min_length=15,
        num_beams=4,
        early_stopping=True,
        no_repeat_ngram_size=3,
        length_penalty=1.0
    )
    return bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True).strip()

# ================================
# 6. Frame Extraction
# ================================
def extract_frames_4fps(video_path, output_folder):
    """Decode a video and save roughly four frames per second as JPEG files.

    Frames are written as ``frame_NNNNNN.jpg`` so a lexicographic sort of the
    folder matches temporal order even beyond 999 sampled frames. Frame files
    left in ``output_folder`` by an earlier extraction are removed first, so
    an unreadable video leaves no stale frames behind to be captioned.

    Args:
        video_path: Path to the video file.
        output_folder: Directory that receives the frames; created if missing.

    Returns:
        Number of frames written; 0 when the video cannot be opened.
    """
    os.makedirs(output_folder, exist_ok=True)
    for stale in os.listdir(output_folder):
        if stale.startswith("frame_") and stale.endswith(".jpg"):
            os.remove(os.path.join(output_folder, stale))

    vidcap = cv2.VideoCapture(video_path)
    saved = 0
    try:
        if not vidcap.isOpened():
            print(f"  ! Could not open video: {video_path}")
            return 0

        fps = vidcap.get(cv2.CAP_PROP_FPS)
        # Capture roughly every 0.25 sec; if FPS invalid/low, fall back to interval 1
        try:
            interval = int(max(1, round((fps if fps and fps > 0 else 4) / 4)))
        except (ValueError, OverflowError):
            # round() rejects NaN / infinite frame rates.
            interval = 1

        count = 0
        success, image = vidcap.read()
        while success:
            if count % interval == 0:
                frame_path = os.path.join(output_folder, f"frame_{saved:06d}.jpg")
                cv2.imwrite(frame_path, image)
                saved += 1
            success, image = vidcap.read()
            count += 1
    finally:
        # Runs on every exit path, including the early return above.
        vidcap.release()
    return saved

# ================================
# 7. Metadata Matching Function
# ================================
def semantic_correctness_score(caption, keywords):
    """Score a caption by the fraction of metadata keywords it contains.

    Matching is a case-insensitive substring test. Non-string and blank
    keywords are ignored; a blank keyword would otherwise always "match"
    (the empty string is a substring of every caption) and inflate the score.

    Args:
        caption: Caption text (``None`` is treated as empty).
        keywords: Iterable of keyword strings (``None`` is treated as empty).

    Returns:
        ``(score, matched, missing)`` where ``score`` is in ``[0.0, 1.0]``
        (0.0 when there are no usable keywords) and ``matched`` / ``missing``
        are the lower-cased keywords in their input order.
    """
    caption_l = (caption or "").lower()
    kws = [kw.lower() for kw in (keywords or []) if isinstance(kw, str) and kw.strip()]
    matched, missing = [], []
    for kw in kws:
        (matched if kw in caption_l else missing).append(kw)
    score = (len(matched) / len(kws)) if kws else 0.0
    return score, matched, missing

# ================================
# 8. Load Metadata
# ================================
import ast  # literal_eval is needed below; re-importing is a no-op if the cell already imported it

metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
meta_df = pd.read_excel(metadata_path)

# Errors ast.literal_eval raises for text that is not a Python literal. Catching only
# these (rather than Exception) keeps unrelated failures visible instead of silently
# degrading every row to the comma-split fallback.
_LITERAL_ERRORS = (ValueError, SyntaxError, TypeError)

# Build a dict: video_id -> {"animals": [...], "actions": [...]}
metadata_dict = {}
for _, row in meta_df.iterrows():
    vid = row.get("video_id")
    animals_raw = row.get("list_animal", [])
    actions_raw = row.get("list_animal_action", "")

    # Parse animals: handle lists stored as strings like "['lion', 'zebra']"
    if isinstance(animals_raw, str):
        try:
            animals = ast.literal_eval(animals_raw)
            if isinstance(animals, tuple):
                # "('lion', 'zebra')" is a sequence of names, not a single name
                animals = list(animals)
            elif not isinstance(animals, list):
                animals = [str(animals)]
        except _LITERAL_ERRORS:
            # fall back: split on commas
            animals = [a.strip() for a in animals_raw.split(",") if a.strip()]
    elif isinstance(animals_raw, list):
        animals = animals_raw
    else:
        # e.g. an empty cell read as NaN
        animals = []

    # Parse actions: often like "[('lion','running'),('zebra','grazing')]"
    actions = []
    if isinstance(actions_raw, str) and actions_raw.strip():
        try:
            parsed = ast.literal_eval(actions_raw)
            # Accept list of (animal, action) pairs or list of plain action strings
            if isinstance(parsed, list):
                for item in parsed:
                    if isinstance(item, (list, tuple)) and len(item) >= 2:
                        actions.append(str(item[1]))
                    elif isinstance(item, str):
                        actions.append(item)
        except _LITERAL_ERRORS:
            # try simple comma split as a fallback
            actions = [a.strip() for a in actions_raw.split(",") if a.strip()]

    metadata_dict[vid] = {
        "animals": animals,
        "actions": actions
    }

# ================================
# 9. Process Videos
# ================================
# Driver: for each video, extract frames -> caption every frame -> summarize the
# captions -> score the summary against the metadata keywords. One result row is
# collected per video and all rows are written to CSV only after the loop ends.
video_dir = "/content/videos/video/"
# Only the first 1000 .mp4 files (sorted by file name) are processed.
video_files = sorted([f for f in os.listdir(video_dir) if f.endswith(".mp4")])[:1000]

results = []

for video_file in video_files:
    # The file name without its extension is used as the metadata lookup key.
    video_id = os.path.splitext(video_file)[0]
    video_path = os.path.join(video_dir, video_file)
    frame_dir = f"/content/frames/{video_id}"

    print(f"\n Processing {video_id}")

    # Step 1: Extract frames
    n_frames = extract_frames_4fps(video_path, frame_dir)
    if n_frames == 0:
        print("  ! No frames extracted; skipping.")
        continue

    # Step 2: Caption each frame
    frame_captions = []
    # Every .jpg present in frame_dir is captioned, including any left by an earlier run.
    # Order is lexicographic; frame names use 3-digit zero padding.
    frame_names = sorted([fn for fn in os.listdir(frame_dir) if fn.endswith(".jpg")])
    for idx, fname in enumerate(frame_names, start=1):
        path = os.path.join(frame_dir, fname)
        caption = generate_caption(path)
        frame_captions.append(caption)
        if idx % 20 == 0:
            print(f"  - Captioned {idx}/{len(frame_names)} frames")

    # Step 3: Summarize using BART (no instruction string!)
    final_caption = summarize_captions_bart(frame_captions)

    # Step 4: Metadata matching
    # Videos without a metadata entry get empty keyword lists (and therefore a 0.0 score).
    meta = metadata_dict.get(video_id, {"animals": [], "actions": []})
    animals = meta.get("animals", [])
    actions = meta.get("actions", [])
    keywords = [a for a in animals] + [a for a in actions]

    score, matched, missing = semantic_correctness_score(final_caption, keywords)

    results.append({
        "video_id": video_id,
        "final_caption": final_caption,
        "frame_captions": frame_captions,
        # NOTE(review): .lower() assumes every keyword is a str — confirm against the metadata parsing.
        "keywords": [k.lower() for k in keywords],
        "matched_keywords": matched,
        "missing_keywords": missing,
        "semantic_correctness_percent": f"{score*100:.1f}%"
    })

# ================================
# 10. Save Results
# ================================
# List-valued columns (frame_captions, keywords, ...) are written by to_csv as their string form.
results_df = pd.DataFrame(results)
out_path = "/content/blip2_bart_results.csv"
results_df.to_csv(out_path, index=False)

print("\n BLIP2 + BART Evaluation Complete! Results saved to:")
print(f" {out_path}")


FLAN-T5 summarizer + instruction prompt

In [None]:
# ================================
# 1) Install dependencies
# ================================
!pip install -q git+https://github.com/salesforce/BLIP.git transformers timm accelerate opencv-python pandas nltk

# ================================
# 2) Imports
# ================================
import os, re, cv2, ast, torch
import pandas as pd
from PIL import Image
from collections import Counter

from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration,
    T5Tokenizer, T5ForConditionalGeneration
)

# Lemmatization for robust scoring
import nltk
nltk.download('wordnet', quiet=True)
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

# ================================
# 3) Device
# ================================
# Device string used for the summarizer models and their tokenized inputs further down.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(" Using device:", device)

# ================================
# 4) BLIP2 (frame captioning)
# ================================
# Processor and model are loaded from the same checkpoint name.
blip_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
# Placement is delegated to device_map="auto" (no explicit .to(device) here);
# float16 is requested only when CUDA is available, float32 otherwise.
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=(torch.float16 if torch.cuda.is_available() else torch.float32)
)

@torch.inference_mode()
def generate_caption(image_path):
    """Caption a single image file with the module-level BLIP-2 model.

    Args:
        image_path: Path to an image file readable by PIL.

    Returns:
        The generated caption with surrounding whitespace stripped.
    """
    # The context manager closes the file handle once the pixels are converted.
    with Image.open(image_path) as img:
        image = img.convert('RGB')
    inputs = blip_processor(images=image, return_tensors="pt")
    # Move every tensor to the model's device, but cast only floating-point
    # tensors to the model dtype: integer tensors (e.g. token ids) must keep
    # their dtype or generation would receive corrupted inputs.
    inputs = {
        k: (v.to(device=blip_model.device, dtype=blip_model.dtype)
            if torch.is_floating_point(v)
            else v.to(device=blip_model.device))
        for k, v in inputs.items()
    }
    out = blip_model.generate(**inputs, max_new_tokens=50)
    cap = blip_processor.batch_decode(out, skip_special_tokens=True)[0]
    return cap.strip()

# ================================
# 5) Summarizers: FLAN (default) & BART
# ================================
# Both summarizers are loaded and moved to `device` here, regardless of which one
# the SUMMARIZER switch in the main loop selects.
flan_tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-large")
flan_model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large").to(device)

bart_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
bart_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn").to(device)

def curate_captions(caps, max_caps=80, min_chars=8):
    """Clean, deduplicate and down-sample a list of frame captions.

    Non-string entries and captions shorter than ``min_chars`` (after stripping)
    are dropped, exact repeats are removed keeping first occurrences, and if more
    than ``max_caps`` captions remain they are sampled at evenly spaced indices
    (first and last are always kept when ``max_caps >= 2``).

    Args:
        caps: Iterable of captions; ``None`` is treated as empty.
        max_caps: Maximum number of captions to return.
        min_chars: Minimum stripped length for a caption to be kept.

    Returns:
        List of curated captions in their original order.
    """
    cleaned = [c.strip() for c in (caps or []) if isinstance(c, str) and len(c.strip()) >= min_chars]
    uniq = list(dict.fromkeys(cleaned))  # order-preserving exact dedup
    if len(uniq) <= max_caps:
        return uniq
    if max_caps <= 0:
        return []
    if max_caps == 1:
        # The spacing formula below divides by (max_caps - 1); handle the
        # single-caption request explicitly instead of raising ZeroDivisionError.
        return uniq[:1]
    # uniform sampling by index
    last = len(uniq) - 1
    return [uniq[round(j * last / (max_caps - 1))] for j in range(max_caps)]

# --- Metadata keyword normalization (maps metadata → caption-friendly terms)
# Lower-cased metadata species name -> shorter term to look for in captions.
# Names that are not listed are left unchanged by normalize_meta_keywords.
SPECIES_MAP = {
    # birds / common wildlife seen in your misses
    "eurasian wren bird": "wren",
    "ardea alba egret": "egret",
    "grey heron": "heron",
    "black-winged stilt": "stilt",
    "yellowhammer": "bird",
    "tit bird": "tit",
    "mallard duck": "duck",
    "common crane": "crane",
    "black mamba": "snake",
    # mammals genericization
    "mongoose": "mongoose",
    # plurals often used by BLIP -> keep generic
}

# Lower-cased metadata action label -> wording to look for in captions.
# Several entries map a label to itself, which has the same effect as omitting them.
ACTION_MAP = {
    "keeping still": "standing still",
    "attending": "looking",
    "sensing": "looking",         # sometimes “sniffing”/“listening”, but “looking” is safer
    "chirping": "singing",
    "flapping": "flapping",
    "landing": "landing",
    "preening": "preening",
    "walking": "walking",
    "moving": "moving",
    "eating": "eating",
    "jumping": "jumping",
    "attacking": "attacking",
    "flying": "flying",
    "shaking head": "shaking head",
    "singing nightingale": "nightingale singing"
}

def normalize_meta_keywords(animals, actions):
    """Lower-case metadata keywords and translate them through the lookup tables.

    Each animal goes through ``SPECIES_MAP`` and each action through
    ``ACTION_MAP``; values without an entry are returned lower-cased as-is.
    ``None`` for either argument is treated as an empty list.

    Returns:
        ``(normalized_animals, normalized_actions)`` as two lists of strings.
    """
    def _translate(items, table):
        lowered = [str(item).lower() for item in (items or [])]
        return [table.get(key, key) for key in lowered]

    return _translate(animals, SPECIES_MAP), _translate(actions, ACTION_MAP)

def pick_keywords_to_encourage(frame_captions, meta_keywords, max_animals=5, max_actions=3):
    """Select metadata keywords that already occur in the frame captions.

    A keyword is kept only if its lower-cased form is a substring of the joined,
    lower-cased captions. Kept keywords are split with a crude heuristic:
    anything ending in "ing"/"ed" or containing a space counts as an action,
    everything else as an animal.

    Returns:
        Up to ``max_animals`` animal keywords followed by up to ``max_actions``
        action keywords, all lower-cased.
    """
    corpus = " ".join(frame_captions).lower()
    buckets = {"animal": [], "action": []}
    for keyword in meta_keywords:
        lowered = keyword.lower()
        if lowered not in corpus:
            continue
        # crude verb vs noun heuristic
        looks_like_action = lowered.endswith(("ing", "ed")) or " " in lowered
        buckets["action" if looks_like_action else "animal"].append(lowered)
    return buckets["animal"][:max_animals] + buckets["action"][:max_actions]

def build_force_ids(tokenizer, words):
    """Tokenize each word (without special tokens) into a list of token ids.

    Words that tokenize to nothing are skipped.

    Returns:
        A list of token-id lists, or ``None`` when no word produced any ids.
    """
    token_lists = [tokenizer(word, add_special_tokens=False).input_ids for word in words]
    non_empty = [ids for ids in token_lists if ids]
    return non_empty or None

@torch.inference_mode()
def summarize_with_flan(frame_captions, encourage_words=None, max_new_tokens=80):
    """Summarize frame captions into one sentence with the FLAN-T5 model.

    The captions are curated (at most 80), listed as "- " bullets under an
    instruction line, and optionally preceded by an "Ensure to mention" line
    built from ``encourage_words``. The prompt is truncated to 1024 tokens.

    Returns:
        The decoded summary, or "" when no caption survives curation.
    """
    caps = curate_captions(frame_captions, max_caps=80)
    if not caps:
        return ""

    # Instruction first, optional soft keyword hint second, bullet list last.
    pieces = ["Summarize the wildlife video in one detailed, factual sentence mentioning all animals and their actions.\n"]
    if encourage_words:
        pieces.append("Ensure to mention: " + ", ".join(encourage_words) + ".\n")
    pieces.append("\n".join(f"- {c}" for c in caps))
    prompt = "".join(pieces)

    enc = flan_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    # Keywords are encouraged through the prompt text only; no constrained decoding here.
    ids = flan_model.generate(
        **enc,
        max_new_tokens=max_new_tokens,
        min_length=20,
        num_beams=4,
        length_penalty=1.15,
        no_repeat_ngram_size=3,
        repetition_penalty=1.1,
        early_stopping=True,
    )
    return flan_tokenizer.decode(ids[0], skip_special_tokens=True).strip()

@torch.inference_mode()
def summarize_with_bart(frame_captions, encourage_words=None, max_length=80):
    """Summarize frame captions with the BART model (no instruction prompt).

    Curated captions (at most 80) are joined with newlines and truncated to
    1024 tokens. When ``encourage_words`` is given, their token ids are passed
    to ``generate`` as ``force_words_ids``.

    Returns:
        The decoded summary, or "" when no caption survives curation.
    """
    caps = curate_captions(frame_captions, max_caps=80)
    if not caps:
        return ""

    enc = bart_tokenizer("\n".join(caps), return_tensors="pt", truncation=True, max_length=1024).to(device)

    gen_kwargs = {
        "input_ids": enc["input_ids"],
        "attention_mask": enc.get("attention_mask", None),
        "max_length": max_length,
        "min_length": 20,
        "num_beams": 4,
        "length_penalty": 1.1,
        "no_repeat_ngram_size": 3,
        "repetition_penalty": 1.1,
        "early_stopping": True,
    }
    # Only force words that already appear in frame captions to avoid hallucinations
    forced = build_force_ids(bart_tokenizer, encourage_words) if encourage_words else None
    if forced:
        gen_kwargs["force_words_ids"] = forced

    out_ids = bart_model.generate(**gen_kwargs)
    return bart_tokenizer.decode(out_ids[0], skip_special_tokens=True).strip()

# ================================
# 6) Frame extraction (4 fps)
# ================================
def extract_frames_4fps(video_path, output_folder):
    """Sample roughly four frames per second from a video and save them as JPEGs.

    Frames are written to ``output_folder`` as ``frame_000.jpg``, ``frame_001.jpg``, ...
    in sampling order.

    Args:
        video_path: Path of the video file to decode.
        output_folder: Directory that receives the JPEGs (created if missing).

    Returns:
        Number of frames actually written to disk; 0 if the video cannot be opened.
    """
    os.makedirs(output_folder, exist_ok=True)
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        print(f"  ! Could not open video: {video_path}")
        vid.release()
        return 0

    try:
        fps = vid.get(cv2.CAP_PROP_FPS) or 0
        # ~ every 0.25s; an unusable FPS (0, NaN, inf) falls back to every frame
        try:
            interval = max(1, int(round(fps / 4))) if fps > 0 else 1
        except (TypeError, ValueError, OverflowError):
            interval = 1

        count, saved = 0, 0
        ok, frame = vid.read()
        while ok:
            if count % interval == 0:
                frame_path = os.path.join(output_folder, f"frame_{saved:03d}.jpg")
                # imwrite signals failure through its return value; count only real writes.
                if cv2.imwrite(frame_path, frame):
                    saved += 1
                else:
                    print(f"  ! Could not write frame: {frame_path}")
            ok, frame = vid.read()
            count += 1
    finally:
        # Release the decoder even if reading or writing raised.
        vid.release()
    return saved

# ================================
# 7) Metadata loading & parsing
# ================================
# Spreadsheet on the mounted Drive; parse_metadata reads its
# video_id, list_animal and list_animal_action columns.
metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
meta_df = pd.read_excel(metadata_path)

# Errors ast.literal_eval raises for text that is not a Python literal. Catching only
# these (rather than Exception) keeps unrelated failures visible.
_LITERAL_ERRORS = (ValueError, SyntaxError, TypeError)


def _parse_animal_cell(raw):
    """Turn one ``list_animal`` cell into a list of animal names."""
    if isinstance(raw, str):
        try:
            value = ast.literal_eval(raw)
        except _LITERAL_ERRORS:
            # Not a Python literal: treat as comma-separated names.
            return [a.strip() for a in raw.split(",") if a.strip()]
        if isinstance(value, (list, tuple)):
            # "('lion', 'zebra')" is a sequence of names, not a single name.
            return list(value)
        return [str(value)]
    if isinstance(raw, list):
        return raw
    # e.g. an empty cell read as NaN
    return []


def _parse_action_cell(raw):
    """Turn one ``list_animal_action`` cell into a list of action labels."""
    if not (isinstance(raw, str) and raw.strip()):
        return []
    try:
        value = ast.literal_eval(raw)
    except _LITERAL_ERRORS:
        return [a.strip() for a in raw.split(",") if a.strip()]
    actions = []
    if isinstance(value, list):
        for it in value:
            if isinstance(it, (list, tuple)) and len(it) >= 2:
                # (animal, action, ...) entry: keep the action
                actions.append(str(it[1]))
            elif isinstance(it, str):
                actions.append(it)
    return actions


def parse_metadata(df):
    """Build ``{video_id: {"animals": [...], "actions": [...]}}`` from the metadata frame.

    Args:
        df: DataFrame with ``video_id``, ``list_animal`` and ``list_animal_action``
            columns; missing columns yield empty lists.

    Returns:
        Dict keyed by the ``video_id`` cell value. If a video_id repeats, the
        last row wins.
    """
    meta = {}
    for _, row in df.iterrows():
        meta[row.get("video_id")] = {
            "animals": _parse_animal_cell(row.get("list_animal", [])),
            "actions": _parse_action_cell(row.get("list_animal_action", "")),
        }
    return meta

# video_id -> {"animals": [...], "actions": [...]}, looked up per video in the main loop.
metadata_dict = parse_metadata(meta_df)

# ================================
# 8) Robust semantic scoring
# ================================
# Short or plural/gendered surface form -> canonical animal name. The scoring
# helpers add the canonical name alongside the original token.
ANIMAL_SYNONYMS = {
    "hippo":"hippopotamus", "gator":"alligator", "croc":"crocodile",
    "lioness":"lion", "cheetahs":"cheetah", "elephants":"elephant"
}

def normalize_tokens(text):
    """Return the set of word forms found in ``text``.

    For every alphabetic word (lower-cased) the set contains the word itself,
    its noun lemma, its verb lemma and its ``ANIMAL_SYNONYMS`` replacement
    (or the word again when it has none). ``None`` is treated as empty text.
    """
    forms = set()
    for word in re.findall(r"[a-zA-Z]+", (text or "").lower()):
        forms.add(word)
        forms.add(lemmatizer.lemmatize(word, pos='n'))
        forms.add(lemmatizer.lemmatize(word, pos='v'))
        forms.add(ANIMAL_SYNONYMS.get(word, word))
    return forms

def lemma_phrase_match(caption, phrase):
    """Check whether every word of ``phrase`` occurs in ``caption`` in some form.

    A phrase word counts as present when ANY of its variants (the word itself,
    its noun lemma, its verb lemma, or its ``ANIMAL_SYNONYMS`` replacement) is
    among the caption's normalized tokens. Requiring all variants at once, as
    the earlier subset test did, made this check stricter than an exact token
    match and defeated the lemmatization (e.g. "walking" never matched "walks").

    Args:
        caption: Caption text (``None`` is treated as empty).
        phrase: Keyword or multi-word phrase (``None`` is treated as empty).

    Returns:
        True if all phrase words are matched; False otherwise, including when
        the phrase contains no alphabetic words.
    """
    toks = re.findall(r"[a-zA-Z]+", (phrase or "").lower())
    if not toks:
        return False
    cap_lemmas = normalize_tokens(caption)
    for t in toks:
        variants = {
            t,
            lemmatizer.lemmatize(t, pos='n'),
            lemmatizer.lemmatize(t, pos='v'),
            ANIMAL_SYNONYMS.get(t, t),
        }
        if cap_lemmas.isdisjoint(variants):
            return False
    return True

def semantic_scores(caption, animals, actions):
    """Score a caption against metadata keywords, strictly and lemma-aware.

    The raw animals/actions are first normalized with
    ``normalize_meta_keywords``. The strict score is the share of keywords that
    are case-insensitive substrings of the caption; the lemma score is the
    share accepted by ``lemma_phrase_match``.

    Returns:
        ``(strict_score, strict_matched, strict_missing,
        lemma_score, lemma_matched, lemma_missing,
        animals_normalized, actions_normalized)``. Both scores are 0.0 and the
        match lists empty when there are no keywords.
    """
    # Normalize metadata to caption-friendly forms first
    animals_n, actions_n = normalize_meta_keywords(animals or [], actions or [])
    keywords = [k for k in (*animals_n, *actions_n) if isinstance(k, str) and k.strip()]
    if not keywords:
        return 0.0, [], [], 0.0, [], [], animals_n, actions_n

    total = len(keywords)
    lowered_caption = (caption or "").lower()

    # Strict pass: plain substring containment.
    strict_matched = [kw for kw in keywords if kw.lower() in lowered_caption]
    strict_missing = [kw for kw in keywords if kw.lower() not in lowered_caption]
    strict_score = len(strict_matched) / total

    # Lemma pass: one lemma_phrase_match call per keyword, in keyword order.
    lemma_hits = [lemma_phrase_match(caption, kw) for kw in keywords]
    lemma_matched = [kw for kw, hit in zip(keywords, lemma_hits) if hit]
    lemma_missing = [kw for kw, hit in zip(keywords, lemma_hits) if not hit]
    lemma_score = len(lemma_matched) / total

    return (
        strict_score, strict_matched, strict_missing,
        lemma_score, lemma_matched, lemma_missing,
        animals_n, actions_n,
    )

# ================================
# 9) Main loop
# ================================
# Driver: for each video, extract frames -> caption every frame -> summarize with the
# selected summarizer -> score against normalized metadata keywords. Rows are
# collected in `results` and written to CSV only after the loop finishes.
video_dir = "/content/videos/video/"
# Only the first 1000 .mp4 files (sorted by file name) are processed.
video_files = sorted([f for f in os.listdir(video_dir) if f.endswith(".mp4")])[:1000]

SUMMARIZER = "flan"         # "flan" or "bart"
USE_KEYWORD_STEERING = True # softly encourage only safe keywords

results = []

for video_file in video_files:
    # The file name without its extension is used as the metadata lookup key.
    video_id = os.path.splitext(video_file)[0]
    video_path = os.path.join(video_dir, video_file)
    frame_dir = f"/content/frames/{video_id}"

    print(f"\n Processing {video_id}")

    # Step 1: Extract frames
    n_frames = extract_frames_4fps(video_path, frame_dir)
    if n_frames == 0:
        print("  ! No frames extracted; skipping.")
        continue

    # Step 2: Caption each frame
    frame_captions = []
    # Every .jpg present in frame_dir is captioned, including any left by an earlier run.
    frame_names = sorted([fn for fn in os.listdir(frame_dir) if fn.endswith(".jpg")])
    for idx, fname in enumerate(frame_names, start=1):
        path = os.path.join(frame_dir, fname)
        caption = generate_caption(path)
        frame_captions.append(caption)
        if idx % 20 == 0:
            print(f"  - Captioned {idx}/{len(frame_names)} frames")

    # Step 3: Summarize
    # Videos without a metadata entry get empty keyword lists.
    meta = metadata_dict.get(video_id, {"animals": [], "actions": []})
    animals_raw, actions_raw = meta.get("animals", []), meta.get("actions", [])
    # normalize metadata to caption-friendly forms
    animals_n, actions_n = normalize_meta_keywords(animals_raw, actions_raw)
    meta_keywords_norm = [*animals_n, *actions_n]

    # Steering uses only keywords that already occur in the raw frame captions.
    encourage = pick_keywords_to_encourage(frame_captions, meta_keywords_norm) if USE_KEYWORD_STEERING else None

    # Any SUMMARIZER value other than "flan" (case-insensitive) selects BART.
    if SUMMARIZER.lower() == "flan":
        final_caption = summarize_with_flan(frame_captions, encourage_words=encourage, max_new_tokens=80)
    else:
        final_caption = summarize_with_bart(frame_captions, encourage_words=encourage, max_length=80)

    # Step 4: Scoring (strict + lemma-aware)
    # semantic_scores receives the RAW lists and normalizes them again itself;
    # animals_n / actions_n are rebound to the values it returns.
    s_score, s_match, s_miss, l_score, l_match, l_miss, animals_n, actions_n = \
        semantic_scores(final_caption, animals_raw, actions_raw)

    results.append({
        "video_id": video_id,
        "final_caption": final_caption,
        # Curated again here for the report; the summarizers curate internally.
        "frame_captions_curated": curate_captions(frame_captions, max_caps=80),
        "keywords_normalized": [*animals_n, *actions_n],
        "strict_matched": s_match,
        "strict_missing": s_miss,
        "strict_semantic_percent": f"{s_score*100:.1f}%",
        "lemma_matched": l_match,
        "lemma_missing": l_miss,
        "lemma_semantic_percent": f"{l_score*100:.1f}%"
    })

# ================================
# 10) Save results
# ================================
# List-valued columns are written by to_csv as their string form.
out_path = "/content/blip2_summary_results.csv"
pd.DataFrame(results).to_csv(out_path, index=False)

print("\n Done. Results saved to:")
print(f" {out_path}")


First 5 videos + FLAN-T5 + Robust semantic score

In [None]:
# ================================
# Process ONLY the first 5 videos with robust scoring
# ================================
!pip install -q git+https://github.com/salesforce/BLIP.git transformers timm accelerate opencv-python pandas nltk

import os, re, cv2, ast, torch
import pandas as pd
from PIL import Image
from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    T5Tokenizer, T5ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration
)

# --- Lemmatizer for robust scoring
import nltk
nltk.download('wordnet', quiet=True)
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

# Device string used for the summarizer models and their tokenized inputs further down.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(" Using device:", device)

# ---------- BLIP2 (frame captioning)
# Processor and model are loaded from the same checkpoint name.
blip_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
# Placement is delegated to device_map="auto" (no explicit .to(device) here);
# float16 is requested only when CUDA is available, float32 otherwise.
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=(torch.float16 if torch.cuda.is_available() else torch.float32)
)

@torch.inference_mode()
def generate_caption(image_path):
    """Caption a single image file with the module-level BLIP-2 model.

    Args:
        image_path: Path to an image file readable by PIL.

    Returns:
        The generated caption with surrounding whitespace stripped.
    """
    # The context manager closes the file handle once the pixels are converted.
    with Image.open(image_path) as img:
        image = img.convert('RGB')
    inputs = blip_processor(images=image, return_tensors="pt")
    # Move every tensor to the model's device, but cast only floating-point
    # tensors to the model dtype: integer tensors (e.g. token ids) must keep
    # their dtype or generation would receive corrupted inputs.
    inputs = {
        k: (v.to(device=blip_model.device, dtype=blip_model.dtype)
            if torch.is_floating_point(v)
            else v.to(device=blip_model.device))
        for k, v in inputs.items()
    }
    out = blip_model.generate(**inputs, max_new_tokens=50)
    return blip_processor.batch_decode(out, skip_special_tokens=True)[0].strip()

# ---------- Summarizers
# Both summarizers are loaded and moved to `device` here, regardless of which one
# the SUMMARIZER switch in the main section selects.
flan_tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-large")
flan_model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large").to(device)

bart_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
bart_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn").to(device)

def curate_captions(caps, max_caps=80, min_chars=8):
    """Clean, deduplicate and down-sample a list of frame captions.

    Non-string entries and captions shorter than ``min_chars`` (after stripping)
    are dropped, exact repeats are removed keeping first occurrences, and if more
    than ``max_caps`` captions remain they are sampled at evenly spaced indices
    (first and last are always kept when ``max_caps >= 2``).

    Args:
        caps: Iterable of captions; ``None`` is treated as empty.
        max_caps: Maximum number of captions to return.
        min_chars: Minimum stripped length for a caption to be kept.

    Returns:
        List of curated captions in their original order.
    """
    kept = [c.strip() for c in (caps or []) if isinstance(c, str) and len(c.strip()) >= min_chars]
    uniq = list(dict.fromkeys(kept))  # order-preserving exact dedup
    if len(uniq) <= max_caps:
        return uniq
    if max_caps <= 0:
        return []
    if max_caps == 1:
        # The spacing formula below divides by (max_caps - 1); handle the
        # single-caption request explicitly instead of raising ZeroDivisionError.
        return uniq[:1]
    span = len(uniq) - 1
    return [uniq[round(j * span / (max_caps - 1))] for j in range(max_caps)]

def extract_frames_4fps(video_path, output_folder):
    """Sample roughly four frames per second from a video and save them as JPEGs.

    Frames are written to ``output_folder`` as ``frame_000.jpg``, ``frame_001.jpg``, ...
    in sampling order.

    Args:
        video_path: Path of the video file to decode.
        output_folder: Directory that receives the JPEGs (created if missing).

    Returns:
        Number of frames actually written to disk; 0 if the video cannot be opened.
    """
    os.makedirs(output_folder, exist_ok=True)
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        print(f"  ! Could not open {video_path}")
        vid.release()
        return 0

    try:
        fps = vid.get(cv2.CAP_PROP_FPS) or 0
        # Keep every `interval`-th frame (~0.25 s apart); an unusable FPS
        # (0, NaN, inf) falls back to keeping every frame.
        try:
            interval = max(1, int(round(fps / 4))) if fps > 0 else 1
        except (TypeError, ValueError, OverflowError):
            interval = 1

        count, saved = 0, 0
        ok, frame = vid.read()
        while ok:
            if count % interval == 0:
                frame_path = os.path.join(output_folder, f"frame_{saved:03d}.jpg")
                # imwrite signals failure through its return value; count only real writes.
                if cv2.imwrite(frame_path, frame):
                    saved += 1
                else:
                    print(f"  ! Could not write frame: {frame_path}")
            ok, frame = vid.read()
            count += 1
    finally:
        # Release the decoder even if reading or writing raised.
        vid.release()
    return saved

# ---------- Metadata loading & normalization
# Spreadsheet on the mounted Drive; parse_metadata reads its
# video_id, list_animal and list_animal_action columns.
metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
meta_df = pd.read_excel(metadata_path)

# Errors ast.literal_eval raises for text that is not a Python literal. Catching only
# these (rather than Exception) keeps unrelated failures visible.
_LITERAL_ERRORS = (ValueError, SyntaxError, TypeError)


def _animals_from_cell(raw):
    """Turn one ``list_animal`` cell into a list of animal names."""
    if isinstance(raw, list):
        return raw
    if not isinstance(raw, str):
        # e.g. an empty cell read as NaN
        return []
    try:
        value = ast.literal_eval(raw)
    except _LITERAL_ERRORS:
        # Not a Python literal: treat as comma-separated names.
        return [a.strip() for a in raw.split(",") if a.strip()]
    if isinstance(value, (list, tuple)):
        # "('lion', 'zebra')" is a sequence of names, not a single name.
        return list(value)
    return [str(value)]


def _actions_from_cell(raw):
    """Turn one ``list_animal_action`` cell into a list of action labels."""
    if not (isinstance(raw, str) and raw.strip()):
        return []
    try:
        value = ast.literal_eval(raw)
    except _LITERAL_ERRORS:
        return [a.strip() for a in raw.split(",") if a.strip()]
    actions = []
    if isinstance(value, list):
        for it in value:
            if isinstance(it, (list, tuple)) and len(it) >= 2:
                # (animal, action, ...) entry: keep the action
                actions.append(str(it[1]))
            elif isinstance(it, str):
                actions.append(it)
    return actions


def parse_metadata(df):
    """Build ``{video_id: {"animals": [...], "actions": [...]}}`` from the metadata frame.

    Args:
        df: DataFrame with ``video_id``, ``list_animal`` and ``list_animal_action``
            columns; missing columns yield empty lists.

    Returns:
        Dict keyed by the ``video_id`` cell value. If a video_id repeats, the
        last row wins.
    """
    meta = {}
    for _, row in df.iterrows():
        meta[row.get("video_id")] = {
            "animals": _animals_from_cell(row.get("list_animal", [])),
            "actions": _actions_from_cell(row.get("list_animal_action", "")),
        }
    return meta

# video_id -> {"animals": [...], "actions": [...]}, looked up per video in the main section.
metadata_dict = parse_metadata(meta_df)

# Map metadata -> caption-friendly forms
# Keys are lower-cased metadata names; names that are not listed are left
# unchanged by normalize_meta_keywords.
SPECIES_MAP = {
    "eurasian wren bird": "wren",
    "ardea alba egret": "egret",
    "grey heron": "heron",
    "black-winged stilt": "stilt",
    "yellowhammer": "bird",
    "tit bird": "tit",
    "mallard duck": "duck",
    "common crane": "crane",
    "black mamba": "snake",
    "mongoose": "mongoose",
}
# Several entries map a label to itself, which has the same effect as omitting them.
ACTION_MAP = {
    "keeping still": "standing still",
    "attending": "looking",
    "sensing": "looking",
    "chirping": "singing",
    "singing nightingale": "nightingale singing",
    "flapping": "flapping",
    "landing": "landing",
    "preening": "preening",
    "walking": "walking",
    "moving": "moving",
    "eating": "eating",
    "jumping": "jumping",
    "attacking": "attacking",
    "flying": "flying",
    "shaking head": "shaking head",
}

def normalize_meta_keywords(animals, actions):
    """Lower-case metadata keywords and translate them through the lookup tables.

    Each animal goes through ``SPECIES_MAP`` and each action through
    ``ACTION_MAP``; values without an entry are returned lower-cased as-is.
    ``None`` for either argument is treated as an empty list.

    Returns:
        ``(normalized_animals, normalized_actions)`` as two lists of strings.
    """
    def _translate(items, table):
        result = []
        for item in (items or []):
            key = str(item).lower()
            result.append(table[key] if key in table else key)
        return result

    normalized_animals = _translate(animals, SPECIES_MAP)
    normalized_actions = _translate(actions, ACTION_MAP)
    return normalized_animals, normalized_actions

# ---------- Summarization
def encourage_words(frame_captions, meta_keywords, max_animals=5, max_actions=3):
    """Select metadata keywords that already occur in the frame captions.

    A keyword is kept only if its lower-cased form is a substring of the joined,
    lower-cased captions. Kept keywords ending in "ing"/"ed" or containing a
    space are treated as actions, all others as animals.

    Returns:
        Up to ``max_animals`` animal keywords followed by up to ``max_actions``
        action keywords, all lower-cased.
    """
    corpus = " ".join(frame_captions).lower()
    present = [kw.lower() for kw in meta_keywords if kw.lower() in corpus]

    def _is_action(word):
        return word.endswith(("ing", "ed")) or " " in word

    animal_hits = [word for word in present if not _is_action(word)]
    action_hits = [word for word in present if _is_action(word)]
    return animal_hits[:max_animals] + action_hits[:max_actions]

@torch.inference_mode()
def summarize_with_flan(frame_captions, encourage=None, max_new_tokens=80):
    """Summarize frame captions into one sentence with the FLAN-T5 model.

    The captions are curated (at most 80), listed as "- " bullets under an
    instruction line, and optionally preceded by an "Ensure to mention" line
    built from ``encourage``. The prompt is truncated to 1024 tokens.

    Returns:
        The decoded summary, or "" when no caption survives curation.
    """
    caps = curate_captions(frame_captions, max_caps=80)
    if not caps:
        return ""

    # Instruction first, optional keyword hint second, bullet list last.
    pieces = ["Summarize the wildlife video in one detailed, factual sentence mentioning all animals and their actions.\n"]
    if encourage:
        pieces.append("Ensure to mention: " + ", ".join(encourage) + ".\n")
    pieces.append("\n".join(f"- {c}" for c in caps))
    prompt = "".join(pieces)

    enc = flan_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    generation_options = dict(
        max_new_tokens=max_new_tokens,
        min_length=20,
        num_beams=4,
        length_penalty=1.15,
        no_repeat_ngram_size=3,
        repetition_penalty=1.1,
        early_stopping=True,
    )
    ids = flan_model.generate(**enc, **generation_options)
    return flan_tokenizer.decode(ids[0], skip_special_tokens=True).strip()

# ---------- Robust scoring (hypernyms + action inference + partial match)
# Animal keyword -> terms that count as a mention of it in a caption (the keyword
# itself plus more general names). Keywords without an entry are matched on
# their own name only by animal_match_with_backoff.
ANIMAL_BACKOFF = {
    "nightingale": ["nightingale", "bird", "songbird"],
    "wren": ["wren", "bird"],
    "egret": ["egret", "heron", "bird"],
    "heron": ["heron", "bird"],
    "stilt": ["stilt", "wader", "shorebird", "bird"],
    "crane": ["crane", "bird"],
    "duck": ["duck", "waterfowl", "bird"],
    "tit": ["tit", "bird"],
    "snake": ["snake", "serpent", "reptile"],
    "mongoose": ["mongoose", "mammal", "animal"],
    "bird": ["bird"],
    "animal": ["animal"],
}

# (trigger phrases, action labels inferred when any trigger is found)
_ACTION_CUES = (
    (("singing", "chirping", "trilling", "calling", "vocalizing", "song"),
     ("singing", "chirping", "calling")),
    (("beak open", "mouth open", "open beak", "open mouth"),
     ("singing", "calling", "chirping")),
    (("keeping still", "standing still", "motionless", "perched", "sitting", "resting"),
     ("keeping still", "standing still")),
    (("walking",), ("walking",)),
    (("running",), ("running",)),
    (("flying", "in flight", "takes off", "taking off", "soars", "soaring"), ("flying",)),
    (("landing", "lands", "touches down"), ("landing",)),
    (("flapping", "flaps"), ("flapping",)),
    (("preening", "grooming feathers"), ("preening",)),
    (("eating", "feeding", "chewing", "grazing", "pecking"), ("eating",)),
    (("shaking head", "head shake"), ("shaking head",)),
    (("looking", "watching", "peering", "gazing", "staring"), ("looking",)),
    (("attacks", "attack", "attacking", "strikes", "strike"), ("attacking",)),
    (("jumping", "leaping", "hopping"), ("jumping",)),
    (("moving",), ("moving",)),
)


def infer_actions_from_caption(text):
    """Infer action labels from trigger words or phrases in a caption.

    A trigger must start at a word boundary, so it no longer fires inside an
    unrelated word ("lands" in "grasslands", "eating" in "creating", "hopping"
    in "shopping"). Suffixes are still allowed, so "attack" matches "attacked".

    Args:
        text: Caption text (``None`` is treated as empty). Matching is
            case-insensitive.

    Returns:
        Set of inferred action labels (possibly empty).
    """
    t = (text or "").lower()
    inferred = set()
    for cues, actions in _ACTION_CUES:
        if any(re.search(r"\b" + re.escape(cue), t) for cue in cues):
            inferred.update(actions)
    return inferred

def normalize_lemmas(text):
    """Return the noun and verb lemmas of every alphabetic word in ``text``.

    The text is lower-cased first; ``None`` is treated as empty. The raw words
    themselves are not included unless they equal one of their lemmas.
    """
    lemmas = set()
    for word in re.findall(r"[a-zA-Z]+", (text or "").lower()):
        lemmas.add(lemmatizer.lemmatize(word, pos='n'))
        lemmas.add(lemmatizer.lemmatize(word, pos='v'))
    return lemmas

def partial_phrase_match(caption, phrase, threshold=0.6):
    """Check whether enough of a phrase's lemmas occur among the caption's lemmas.

    The phrase's noun and verb lemmas are pooled into one set; the result is
    True when the share of that set found in the caption's lemma set is at
    least ``threshold``. A phrase without alphabetic words never matches.
    """
    caption_lemmas = normalize_lemmas(caption)
    tokens = re.findall(r"[a-zA-Z]+", (phrase or "").lower())
    if not tokens:
        return False
    phrase_lemmas = set()
    for tok in tokens:
        phrase_lemmas.add(lemmatizer.lemmatize(tok, pos='n'))
        phrase_lemmas.add(lemmatizer.lemmatize(tok, pos='v'))
    # NOTE(review): the ratio is taken over lemma variants, not over phrase
    # words, so a one-word phrase whose noun and verb lemmas differ scores 0.5
    # when only one of them is present — confirm this is intended.
    hits = len(phrase_lemmas & caption_lemmas)
    return hits / len(phrase_lemmas) >= threshold

def animal_match_with_backoff(caption, animal_kw):
    """Check whether a caption mentions an animal, allowing more general names.

    The keyword's ``ANIMAL_BACKOFF`` candidates (or just the keyword when it has
    no entry) are tested as case-insensitive substrings of the caption; if none
    is found, ``partial_phrase_match`` is tried with a 0.5 threshold.
    """
    lowered_caption = (caption or "").lower()
    key = animal_kw.lower()
    for candidate in ANIMAL_BACKOFF.get(key, [key]):
        if candidate in lowered_caption:
            return True
    return partial_phrase_match(caption, key, threshold=0.5)

def score_semantics(caption, animals_norm, actions_norm):
    """Score a caption against normalized keywords, strictly and robustly.

    Strict: share of keywords that are case-insensitive substrings of the
    caption. Robust: animals are checked with ``animal_match_with_backoff``;
    all other keywords pass when they are among the actions inferred from the
    caption or when ``partial_phrase_match`` accepts them at threshold 0.6.

    Returns:
        ``(strict_score, strict_matched, strict_missing,
        robust_score, robust_matched, robust_missing)``. Both scores are
        ``None`` (reported as NA by the caller) when there are no keywords.
    """
    animal_kws = animals_norm or []
    all_kws = [*animal_kws, *(actions_norm or [])]
    # Return NA if no keywords
    if not all_kws:
        return None, [], [], None, [], []

    total = len(all_kws)
    cap_l = (caption or "").lower()

    # Strict (legacy)
    s_match = [kw for kw in all_kws if kw.lower() in cap_l]
    s_miss = [kw for kw in all_kws if kw.lower() not in cap_l]
    s_score = len(s_match) / total

    # Robust
    inferred = infer_actions_from_caption(caption)

    def _robust_hit(kw):
        k = kw.lower()
        # A keyword counts as an animal when its lower-cased form is in the animal list.
        if k in animal_kws:
            return animal_match_with_backoff(caption, k)
        return (k in inferred) or partial_phrase_match(caption, k, threshold=0.6)

    r_match, r_miss = [], []
    for kw in all_kws:
        if _robust_hit(kw):
            r_match.append(kw)
        else:
            r_miss.append(kw)
    r_score = len(r_match) / total
    return s_score, s_match, s_miss, r_score, r_match, r_miss

# ---------- Main: only first 5 videos
# Driver for a quick check: runs the full pipeline (frames -> captions -> summary ->
# strict/robust scores) on the first 5 .mp4 files only, saves one CSV row per video
# and prints a short per-video summary.
video_dir = "/content/videos/video/"
video_files = sorted([f for f in os.listdir(video_dir) if f.endswith(".mp4")])[:5]

SUMMARIZER = "flan"      # "flan" (recommended) or "bart"
USE_STEERING = True

rows = []
for vf in video_files:
    # The file name without its extension is used as the metadata lookup key.
    vid = os.path.splitext(vf)[0]
    vpath = os.path.join(video_dir, vf)
    fdir = f"/content/frames/{vid}"

    print(f"\n Processing {vid}")
    n = extract_frames_4fps(vpath, fdir)
    if n == 0:
        print("  ! Skipping (no frames).")
        continue

    # Captions
    caps = []
    # Every .jpg present in fdir is captioned, including any left by an earlier run.
    frames = sorted([fn for fn in os.listdir(fdir) if fn.endswith(".jpg")])
    for i, fn in enumerate(frames, 1):
        caps.append(generate_caption(os.path.join(fdir, fn)))
        if i % 20 == 0:
            print(f"  - Captioned {i}/{len(frames)} frames")
    # Curated once here; steering, summarization and the report all use caps_cur.
    caps_cur = curate_captions(caps, max_caps=80)

    # Metadata
    # Videos without a metadata entry get empty keyword lists (scores are then reported as NA).
    meta = metadata_dict.get(vid, {"animals": [], "actions": []})
    animals_raw, actions_raw = meta.get("animals", []), meta.get("actions", [])
    animals_n, actions_n = normalize_meta_keywords(animals_raw, actions_raw)
    kws_norm = [*animals_n, *actions_n]

    # Summarize
    encouragement = encourage_words(caps_cur, kws_norm) if USE_STEERING else None
    # Any SUMMARIZER value other than exactly "flan" takes the BART branch.
    if SUMMARIZER == "flan":
        final_caption = summarize_with_flan(caps_cur, encourage=encouragement, max_new_tokens=80)
    else:
        # Optional: swap in BART if you want to compare
        # The BART branch does not use `encouragement`.
        enc = bart_tokenizer("\n".join(caps_cur), return_tensors="pt", truncation=True, max_length=1024).to(device)
        out_ids = bart_model.generate(
            input_ids=enc["input_ids"], attention_mask=enc.get("attention_mask", None),
            max_length=80, min_length=20, num_beams=4, length_penalty=1.1,
            no_repeat_ngram_size=3, repetition_penalty=1.1, early_stopping=True
        )
        final_caption = bart_tokenizer.decode(out_ids[0], skip_special_tokens=True).strip()

    # Scores
    s_score, s_match, s_miss, r_score, r_match, r_miss = score_semantics(final_caption, animals_n, actions_n)

    rows.append({
        "video_id": vid,
        "final_caption": final_caption,
        "frame_captions_curated": caps_cur,
        "keywords_normalized": kws_norm,
        "strict_matched": s_match, "strict_missing": s_miss,
        "robust_matched": r_match, "robust_missing": r_miss,
        # score_semantics returns None scores when a video has no keywords.
        "strict_semantic_percent": "NA" if s_score is None else f"{s_score*100:.1f}%",
        "robust_semantic_percent": "NA" if r_score is None else f"{r_score*100:.1f}%"
    })

# Save & print a tiny summary
out_path = "/content/blip2_summary_results_first5.csv"
pd.DataFrame(rows).to_csv(out_path, index=False)
print("\nSaved:", out_path)

for r in rows:
    print(f"\n[{r['video_id']}]")
    print("Final:", r["final_caption"])
    print("Strict:", r["strict_semantic_percent"], " | Robust:", r["robust_semantic_percent"])
    if r["robust_missing"]:
        # Show at most five of the keywords the robust scorer did not find.
        print("Robust missing (few):", r["robust_missing"][:5])

In [None]:
First 1000 videos (FLAN-T5)

In [None]:
# ================================
# 1) Install
# ================================
# !pip install -q git+https://github.com/salesforce/BLIP.git transformers timm accelerate opencv-python pandas nltk

# ================================
# 2) Imports
# ================================
import os, re, cv2, ast, torch
import pandas as pd
from PIL import Image
from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration,
    T5Tokenizer, T5ForConditionalGeneration
)

# Lemmatization for robust scoring
import nltk
nltk.download('wordnet', quiet=True)
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

# ================================
# 3) Device
# ================================
# Use the GPU when torch can see one; the summarizer models below are moved to this device.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(" Using device:", device)

# ================================
# 4) BLIP2 (frame captioning)
# ================================
# Processor and model are loaded from the same checkpoint id.
blip_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
# Weights are requested in float16 when CUDA is available and float32 otherwise.
# NOTE(review): device_map="auto" is passed unconditionally here, whereas the first BLIP-2 cell of
# this notebook only passes it when CUDA is available — confirm the CPU-only path is intended.
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=(torch.float16 if torch.cuda.is_available() else torch.float32)
)

@torch.inference_mode()
def generate_caption(image_path, max_new_tokens=50):
    """Caption one image file with the module-level BLIP-2 model.

    Args:
        image_path: Path of the image to caption.
        max_new_tokens: Upper bound on generated tokens (default 50, the value
            this cell always used).

    Returns:
        The decoded caption with special tokens removed and whitespace stripped.
    """
    # convert() returns an independent in-memory image, so the file handle can be
    # closed right away instead of waiting for garbage collection.
    with Image.open(image_path) as img:
        image = img.convert('RGB')
    inputs = blip_processor(images=image, return_tensors="pt")
    # Only floating-point tensors are cast to the model dtype. Integer tensors
    # (should the processor return any) must keep their dtype and only change device.
    inputs = {
        k: (v.to(device=blip_model.device, dtype=blip_model.dtype)
            if torch.is_floating_point(v) else v.to(device=blip_model.device))
        for k, v in inputs.items()
    }
    out = blip_model.generate(**inputs, max_new_tokens=max_new_tokens)
    return blip_processor.batch_decode(out, skip_special_tokens=True)[0].strip()

# ================================
# 5) Summarizers: FLAN (default) & BART
# ================================
# Both summarizers are instantiated and moved to `device` up front.
# NOTE(review): the main loop selects only one of them via SUMMARIZER, so the other one occupies
# device memory without being used — consider loading only the selected model.
flan_tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-large")
flan_model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large").to(device)

bart_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
bart_model = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn").to(device)

def curate_captions(caps, max_caps=80, min_chars=8):
    """Clean, de-duplicate and evenly subsample frame captions.

    Args:
        caps: Iterable of captions; non-string entries are ignored. ``None`` is
            treated as an empty list.
        max_caps: Maximum number of captions to return.
        min_chars: Captions shorter than this (after stripping) are dropped.

    Returns:
        Captions in first-seen order. When more than ``max_caps`` unique captions
        remain, ``max_caps`` of them are picked at evenly spaced positions,
        always including the first and the last one.
    """
    seen, uniq = set(), []
    for c in (caps or []):
        if not isinstance(c, str):
            continue
        c = c.strip()
        if len(c) < min_chars or c in seen:
            continue
        seen.add(c)
        uniq.append(c)
    if len(uniq) <= max_caps:
        return uniq
    if max_caps <= 0:
        return []
    if max_caps == 1:
        # The spacing formula below divides by (max_caps - 1); with a single slot
        # just keep the first caption instead of raising ZeroDivisionError.
        return [uniq[0]]
    last = len(uniq) - 1
    return [uniq[round(j * last / (max_caps - 1))] for j in range(max_caps)]

# --- Normalize metadata to caption-friendly forms
# Lower-cased dataset animal label -> shorter term used as the keyword for that animal.
# normalize_meta_keywords leaves labels without an entry here unchanged (lower-cased only).
SPECIES_MAP = {
    "eurasian wren bird": "wren",
    "ardea alba egret": "egret",
    "grey heron": "heron",
    "black-winged stilt": "stilt",
    "yellowhammer": "bird",
    "tit bird": "tit",
    "mallard duck": "duck",
    "common crane": "crane",
    "black mamba": "snake",
    "mongoose": "mongoose",
}
# Lower-cased dataset action label -> wording used as the keyword for that action.
# Several entries map a label to itself; they change nothing because unmapped labels
# are also passed through as-is.
ACTION_MAP = {
    "keeping still": "standing still",
    "attending": "looking",
    "sensing": "looking",
    "chirping": "singing",
    "flapping": "flapping",
    "landing": "landing",
    "preening": "preening",
    "walking": "walking",
    "moving": "moving",
    "eating": "eating",
    "jumping": "jumping",
    "attacking": "attacking",
    "flying": "flying",
    "shaking head": "shaking head",
    "singing nightingale": "nightingale singing",
}

def normalize_meta_keywords(animals, actions):
    """Lower-case metadata labels and translate them through the lookup tables.

    Each label is stringified and lower-cased, then replaced by its SPECIES_MAP
    (animals) or ACTION_MAP (actions) entry when one exists; otherwise the
    lower-cased label is kept. ``None`` or empty inputs yield empty lists.

    Returns:
        A ``(animals, actions)`` tuple of lists.
    """
    def norm_list(labels, table):
        lowered = (str(label).lower() for label in (labels or []))
        return [table.get(name, name) for name in lowered]

    animals_out = norm_list(animals, SPECIES_MAP)
    actions_out = norm_list(actions, ACTION_MAP)
    return animals_out, actions_out

# ---- Steering: now with fallback to metadata if nothing found in captions
def pick_keywords_to_encourage(frame_captions, meta_keywords, max_animals=8, max_actions=6):
    """Choose the metadata keywords the summarizer should be nudged to mention.

    Keywords that literally occur in the joined frame captions are preferred. If
    none occurs, all metadata keywords are used as a fallback. A keyword counts
    as an action when it ends in "ing"/"ed" or contains a space, otherwise as an
    animal (a heuristic, not a real part-of-speech check).

    Args:
        frame_captions: Iterable of caption strings.
        meta_keywords: Iterable of normalized metadata keywords.
        max_animals: Cap on the number of animal keywords returned.
        max_actions: Cap on the number of action keywords returned.

    Returns:
        Lower-cased keywords without duplicates: animals first, then actions.
    """
    def is_action(k):
        return k.endswith(("ing", "ed")) or " " in k

    text = " ".join(frame_captions).lower()

    # Lower-case and de-duplicate once so both paths below see the same keywords;
    # repeated metadata entries would otherwise be repeated in the prompt.
    keywords = []
    for kw in (meta_keywords or []):
        k = str(kw).lower()
        if k and k not in keywords:
            keywords.append(k)

    observed = [k for k in keywords if k in text]
    # Fallback: if nothing surfaced from captions, nudge a small slice of trusted metadata.
    pool = observed if observed else keywords

    # One classifier for both paths, so a keyword can never land in both lists.
    animals = [k for k in pool if not is_action(k)]
    actions = [k for k in pool if is_action(k)]
    return animals[:max_animals] + actions[:max_actions]

@torch.inference_mode()
def summarize_with_flan(frame_captions, encourage_words=None, max_new_tokens=80):
    """Condense frame captions into one sentence with the FLAN-T5 summarizer.

    The captions are curated (at most 80), listed as bullet lines under a fixed
    instruction, optionally preceded by an "Ensure to mention" line built from
    ``encourage_words``, and decoded with 4-beam search.

    Returns:
        The stripped summary, or "" when no caption survives curation.
    """
    curated = curate_captions(frame_captions, max_caps=80)
    if not curated:
        return ""

    pieces = ["Summarize the wildlife video in one detailed, factual sentence mentioning all animals and their actions.\n"]
    if encourage_words:
        pieces.append("Ensure to mention: " + ", ".join(encourage_words) + ".\n")
    pieces.append("\n".join(f"- {c}" for c in curated))
    prompt = "".join(pieces)

    enc = flan_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    decoding = dict(
        max_new_tokens=max_new_tokens,
        min_length=20,
        num_beams=4,
        length_penalty=1.15,
        no_repeat_ngram_size=3,
        repetition_penalty=1.1,
        early_stopping=True,
    )
    ids = flan_model.generate(**enc, **decoding)
    summary = flan_tokenizer.decode(ids[0], skip_special_tokens=True)
    return summary.strip()

@torch.inference_mode()
def summarize_with_bart(frame_captions, encourage_words=None, max_length=80):
    """Condense frame captions into a short summary with the BART summarizer.

    Args:
        frame_captions: Per-frame captions; they are curated (at most 80) and
            joined with newlines to form the input document.
        encourage_words: Optional keywords; up to eight are passed to
            ``generate`` as ``force_words_ids``.
        max_length: Maximum length of the generated sequence, in tokens.

    Returns:
        The stripped summary, or "" when no caption survives curation.
    """
    caps = curate_captions(frame_captions, max_caps=80)
    if not caps: return ""
    text = "\n".join(caps)
    enc = bart_tokenizer(text, return_tensors="pt", truncation=True, max_length=1024).to(device)
    gen_kwargs = dict(
        input_ids=enc["input_ids"], attention_mask=enc.get("attention_mask", None),
        max_length=max_length, min_length=20, num_beams=4,
        length_penalty=1.1, no_repeat_ngram_size=3, repetition_penalty=1.1,
        early_stopping=True
    )
    # Optional: force a few trusted keywords
    if encourage_words:
        fw = []
        for w in encourage_words[:8]:
            w = str(w).strip()
            if not w:
                continue
            # BART uses a byte-level BPE in which a word in the middle of a sentence
            # is encoded together with its leading space. Tokenizing the bare word
            # yields the "glued to the previous word" ids instead, so prepend a space.
            ids = bart_tokenizer(" " + w, add_special_tokens=False).input_ids
            if ids and ids not in fw:
                fw.append(ids)
        # NOTE(review): support for force_words_ids depends on the installed
        # transformers version — confirm it is still accepted by generate().
        if fw: gen_kwargs["force_words_ids"] = fw
    out = bart_model.generate(**gen_kwargs)
    return bart_tokenizer.decode(out[0], skip_special_tokens=True).strip()

# ================================
# 6) Frame extraction (4 fps)
# ================================
def extract_frames_4fps(video_path, output_folder):
    """Write roughly four frames per second of a video as JPEG files.

    Frames are saved as ``frame_000.jpg``, ``frame_001.jpg``, ... inside
    ``output_folder`` (created if missing). Files already present in the folder
    are not removed.

    Args:
        video_path: Path of the video to read.
        output_folder: Directory receiving the JPEG frames.

    Returns:
        Number of frames actually written; 0 if the video could not be opened.
    """
    os.makedirs(output_folder, exist_ok=True)
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        vid.release()
        print(f"  ! Could not open video: {video_path}")
        return 0

    saved, failed = 0, 0
    try:
        fps = vid.get(cv2.CAP_PROP_FPS) or 0
        # Keep every `interval`-th frame; fall back to every frame when the
        # container reports no usable frame rate.
        interval = int(max(1, round((fps if fps > 0 else 4)/4)))  # ~ every 0.25s
        count = 0
        ok, frame = vid.read()
        while ok:
            if count % interval == 0:
                target = os.path.join(output_folder, f"frame_{saved:03d}.jpg")
                # imwrite reports failure through its return value; only count
                # frames that really reached the disk so indices stay contiguous.
                if cv2.imwrite(target, frame):
                    saved += 1
                else:
                    failed += 1
            ok, frame = vid.read(); count += 1
    finally:
        # Release the capture even if reading or writing raised.
        vid.release()
    if failed:
        print(f"  ! {failed} frame(s) could not be written to {output_folder}")
    return saved

# ================================
# 7) Metadata loading & parsing
# ================================
# Spreadsheet with the per-video annotations; parse_metadata reads its "video_id",
# "list_animal" and "list_animal_action" columns.
metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
meta_df = pd.read_excel(metadata_path)

def parse_metadata(df):
    """Turn the metadata sheet into ``{video_id: {"animals": [...], "actions": [...]}}``.

    Animal cells: a string is evaluated as a Python literal (a non-list result is
    wrapped as ``[str(value)]``); if that fails it is split on commas. A list is
    used as-is and anything else yields ``[]``.

    Action cells: a non-blank string is evaluated as a Python literal. From a list
    result, the second element of every list/tuple item with at least two elements
    and every plain string item are kept. If evaluation fails the string is split
    on commas. Anything else yields ``[]``.
    """
    def split_on_commas(raw):
        return [part.strip() for part in raw.split(",") if part.strip()]

    def read_animals(raw):
        if isinstance(raw, list):
            return raw
        if not isinstance(raw, str):
            return []
        try:
            value = ast.literal_eval(raw)
        except Exception:
            return split_on_commas(raw)
        return value if isinstance(value, list) else [str(value)]

    def read_actions(raw):
        if not (isinstance(raw, str) and raw.strip()):
            return []
        try:
            value = ast.literal_eval(raw)
        except Exception:
            return split_on_commas(raw)
        found = []
        if isinstance(value, list):
            for item in value:
                if isinstance(item, (list, tuple)) and len(item) >= 2:
                    found.append(str(item[1]))
                elif isinstance(item, str):
                    found.append(item)
        return found

    meta = {}
    for _, row in df.iterrows():
        meta[row.get("video_id")] = {
            "animals": read_animals(row.get("list_animal", [])),
            "actions": read_actions(row.get("list_animal_action", "")),
        }
    return meta

# Build the video_id -> {"animals", "actions"} lookup once; the main loop reads it per video.
metadata_dict = parse_metadata(meta_df)

# ================================
# 8) Robust semantic scoring (short & general)
# ================================
# Small hypernym/backoff to avoid brittle species matches
# Keys are lower-case animal keywords; each value lists the terms that
# animal_match_with_backoff accepts in a caption as evidence for that animal.
# Animals without an entry are matched by their own keyword only.
ANIMAL_BACKOFF = {
    "nightingale": ["nightingale", "bird", "songbird"],
    "wren": ["wren", "bird"],
    "egret": ["egret", "heron", "bird"],
    "heron": ["heron", "bird"],
    "stilt": ["stilt", "wader", "shorebird", "bird"],
    "crane": ["crane", "bird"],
    "duck": ["duck", "waterfowl", "bird"],
    "tit": ["tit", "bird"],
    "snake": ["snake", "serpent", "reptile"],
    "mongoose": ["mongoose", "mammal", "animal"],
    "bird": ["bird"],
    "animal": ["animal"],
}

def normalize_lemmas(text):
    """Return the set of noun and verb lemmas of the alphabetic words in ``text``.

    ``None`` or empty text yields an empty set. Each lower-cased word contributes
    both its noun lemma and its verb lemma.
    """
    lemmas = set()
    for word in re.findall(r"[a-zA-Z]+", (text or "").lower()):
        lemmas.add(lemmatizer.lemmatize(word, pos='n'))
        lemmas.add(lemmatizer.lemmatize(word, pos='v'))
    return lemmas

def partial_phrase_match(caption, phrase, thr=0.6):
    """Check whether enough words of ``phrase`` occur in ``caption`` by lemma.

    A phrase word counts as present when its noun lemma or its verb lemma is among
    the caption lemmas. The score is the fraction of phrase words present.

    The ratio is taken over phrase words, not over the pooled noun/verb lemma
    variants. With pooled variants a single word whose lemma matched in only one
    form scored 0.5 (e.g. "walking" vs. "walks") and failed the default
    threshold, while a two-word phrase with one word present could score 0.67.

    Args:
        caption: Text to search; ``None`` is treated as empty.
        phrase: Keyword or phrase to look for.
        thr: Minimum fraction of phrase words that must be present.

    Returns:
        True when the fraction reaches ``thr``; False for a phrase without
        alphabetic words.
    """
    cap = normalize_lemmas(caption)
    toks = re.findall(r"[a-zA-Z]+", (phrase or "").lower())
    if not toks: return False
    hits = sum(
        1 for t in toks
        if lemmatizer.lemmatize(t, pos='n') in cap or lemmatizer.lemmatize(t, pos='v') in cap
    )
    return (hits / len(toks)) >= thr

def animal_match_with_backoff(caption, animal_kw):
    """Decide whether ``caption`` mentions ``animal_kw`` or an accepted stand-in.

    The accepted terms come from ANIMAL_BACKOFF (or just the keyword itself when
    it has no entry). A term must appear as a whole word, optionally with a
    plural "s"/"es", so "tit" is not found inside "sitting". If no term is found,
    a lemma-based partial match on the keyword (threshold 0.5) is tried.

    Args:
        caption: Caption text; ``None`` is treated as empty.
        animal_kw: Animal keyword.

    Returns:
        True when the animal counts as mentioned; False for an empty keyword.
    """
    cap = (caption or "").lower()
    key = (animal_kw or "").lower().strip()
    if not key:
        return False
    cand = ANIMAL_BACKOFF.get(key, [key])

    def has_word(term):
        # Letters directly before or after the term mean it is part of another word.
        pattern = r"(?<![a-z])" + re.escape(term) + r"(?:e?s)?(?![a-z])"
        return re.search(pattern, cap) is not None

    return any(has_word(c) for c in cand) or partial_phrase_match(caption, key, thr=0.5)

def _has_any(t, phrases): return any(p in t for p in phrases)

def infer_actions_from_caption(text):
    """Derive action labels from cue words and phrases found in a caption.

    Cues are matched as whole words/phrases (no letter directly before or after),
    not as raw substrings. Substring matching fired on unrelated words, e.g.
    "lands" inside "grasslands", "eating" inside "creating" or "song" inside
    "songbird".

    Args:
        text: Caption text; ``None`` is treated as empty.

    Returns:
        Set of inferred action labels (possibly empty).
    """
    t = (text or "").lower()

    def mentions(cues):
        return any(
            re.search(r"(?<![a-z])" + re.escape(cue) + r"(?![a-z])", t) is not None
            for cue in cues
        )

    inferred = set()
    if mentions(["singing", "chirping", "trilling", "calling", "vocalizing", "song"]):
        inferred.update(["singing", "chirping", "calling"])
    # An open beak/mouth is taken as a sign of vocalizing.
    if mentions(["beak open", "open beak", "mouth open", "open mouth"]):
        inferred.update(["singing", "calling", "chirping"])
    if mentions(["keeping still", "standing still", "motionless", "perched", "sitting", "resting", "standing"]):
        inferred.update(["keeping still", "standing still", "looking"])
    if mentions(["walking"]): inferred.add("walking")
    if mentions(["running"]): inferred.add("running")
    if mentions(["flying", "in flight", "soars", "soaring", "taking off", "takes off"]): inferred.add("flying")
    if mentions(["landing", "lands", "touches down"]): inferred.add("landing")
    if mentions(["flapping", "flaps"]): inferred.add("flapping")
    if mentions(["eating", "feeding", "chewing", "grazing", "pecking"]):
        inferred.add("eating")
        # Eating a specific object is additionally credited as biting/attacking.
        if mentions(["eating a", "eating the", "eating prey", "eating a snake", "eats a"]):
            inferred.update(["biting", "attacking"])
    if mentions(["biting", "bites", "bite"]): inferred.update(["biting", "attacking"])
    if mentions(["dead", "carcass", "lifeless"]): inferred.add("dying")
    if mentions(["shaking head", "head shake"]): inferred.add("shaking head")
    if mentions(["looking", "watching", "peering", "gazing", "staring"]): inferred.add("looking")
    if mentions(["moving"]): inferred.add("moving")
    return inferred

def robust_semantic_scores(caption, animals_norm, actions_norm):
    """
    Score how many metadata keywords a caption covers, strictly and robustly.

    Strict: the lower-cased keyword is a substring of the lower-cased caption.
    Robust: animal keywords go through animal_match_with_backoff; all other
    keywords count when inferred from the caption or when they partially match
    by lemma (threshold 0.6).

    Returns (strict%, strict_matched, strict_missing, robust%, robust_matched,
    robust_missing); percentages are strings such as "66.7%", or "NA" when
    there is no usable keyword.
    """
    animal_pool = animals_norm or []
    keywords = [
        k for k in [*animal_pool, *(actions_norm or [])]
        if isinstance(k, str) and k.strip()
    ]
    if not keywords:
        return "NA", [], [], "NA", [], []

    def as_percent(matched):
        return f"{(len(matched)/len(keywords))*100:.1f}%"

    cap_l = (caption or "").lower()

    # strict (for continuity)
    s_match = [kw for kw in keywords if kw.lower() in cap_l]
    s_miss = [kw for kw in keywords if kw.lower() not in cap_l]

    # robust
    inferred = infer_actions_from_caption(caption)

    def robustly_present(kw):
        k = kw.lower()
        if k in animal_pool:
            return animal_match_with_backoff(caption, k)
        return (k in inferred) or partial_phrase_match(caption, k, thr=0.6)

    r_match, r_miss = [], []
    for kw in keywords:
        bucket = r_match if robustly_present(kw) else r_miss
        bucket.append(kw)

    return as_percent(s_match), s_match, s_miss, as_percent(r_match), r_match, r_miss

# ================================
# 9) Main loop
# ================================
video_dir = "/content/videos/video/"
video_files = sorted([f for f in os.listdir(video_dir) if f.endswith(".mp4")])[:1000]

SUMMARIZER = "flan"         # "flan" (recommended) or "bart"
USE_KEYWORD_STEERING = True

results = []
for video_file in video_files:
    video_id = os.path.splitext(video_file)[0]
    video_path = os.path.join(video_dir, video_file)
    frame_dir = f"/content/frames/{video_id}"

    # A failure on one video must not abort the whole run and discard all
    # results gathered so far (they are only written after the loop).
    try:
        print(f"\n Processing {video_id}")
        n_frames = extract_frames_4fps(video_path, frame_dir)
        if n_frames == 0:
            print("  ! No frames extracted; skipping.")
            continue

        # captions
        # Rebuild the frame names from the returned count instead of listing the
        # directory: a listing also returns stale frames left by earlier runs, and
        # sorting names puts "frame_1000.jpg" before "frame_101.jpg".
        frame_names = [f"frame_{i:03d}.jpg" for i in range(n_frames)]
        frame_names = [fn for fn in frame_names if os.path.exists(os.path.join(frame_dir, fn))]
        if not frame_names:
            print("  ! No frame files found; skipping.")
            continue
        frame_captions = [generate_caption(os.path.join(frame_dir, fn)) for fn in frame_names]

        # metadata (normalize to caption-friendly forms)
        meta = metadata_dict.get(video_id, {"animals": [], "actions": []})
        animals_raw, actions_raw = meta.get("animals", []), meta.get("actions", [])
        animals_n, actions_n = normalize_meta_keywords(animals_raw, actions_raw)
        meta_keywords_norm = [*animals_n, *actions_n]

        encourage = pick_keywords_to_encourage(frame_captions, meta_keywords_norm) if USE_KEYWORD_STEERING else None

        # summary
        if SUMMARIZER.lower() == "flan":
            final_caption = summarize_with_flan(frame_captions, encourage_words=encourage, max_new_tokens=80)
        else:
            final_caption = summarize_with_bart(frame_captions, encourage_words=encourage, max_length=80)

        # scores
        strict_pct, strict_matched, strict_missing, robust_pct, robust_matched, robust_missing = \
            robust_semantic_scores(final_caption, animals_n, actions_n)

        results.append({
            "video_id": video_id,
            "final_caption": final_caption,
            "frame_captions_curated": curate_captions(frame_captions, max_caps=80),
            "keywords_normalized": meta_keywords_norm,
            "strict_matched": strict_matched,
            "strict_missing": strict_missing,
            "strict_semantic_percent": strict_pct,
            "robust_matched": robust_matched,
            "robust_missing": robust_missing,
            "robust_semantic_percent": robust_pct
        })

    except KeyboardInterrupt:
        # Stop early but still fall through to the save step below.
        print("\n!! Interrupted by user.")
        break
    except Exception as e:
        print(f"  !! Error on {video_id}: {e}. Skipping.")
        continue

# ================================
# 10) Save
# ================================
out_path = "/content/summary_results.csv"
pd.DataFrame(results).to_csv(out_path, index=False)
print("\n Done. Results saved to:", out_path)


In [None]:
import pandas as pd

# Point this to your output CSV (change if needed)
# This is the path the "First 1000" pipeline cell writes its results to.
PATH = "/content/summary_results.csv"

# NOTE(review): `df` is also the name of the metadata frame loaded near the top of the
# notebook; after this cell it refers to the results table instead.
df = pd.read_csv(PATH)

def to_num(series):
    """Convert a column of percent-like values to numbers.

    Values are stringified, stripped and freed of "%" signs; anything that still
    is not numeric (for example "NA") becomes NaN.
    """
    as_text = series.astype(str)
    without_percent = as_text.str.strip().str.replace("%", "", regex=False)
    return pd.to_numeric(without_percent, errors="coerce")

stats = {}

# Try robust first (recommended), then strict if present
for col in ["robust_semantic_percent", "strict_semantic_percent"]:
    if col not in df.columns:
        continue
    vals = to_num(df[col]).dropna()
    has_values = not vals.empty
    stats[col] = {
        "count": int(vals.count()),
        "mean": float(vals.mean()) if has_values else None,
        "median": float(vals.median()) if has_values else None,
        # A sample standard deviation needs at least two values.
        "std": float(vals.std(ddof=1)) if len(vals) > 1 else 0.0,
        "min": float(vals.min()) if has_values else None,
        "max": float(vals.max()) if has_values else None,
        "zero_count": int((vals == 0).sum()),
    }

# Floats are shown with two decimals, everything else verbatim.
for k, v in stats.items():
    print(f"\n{k}:")
    for kk, vv in v.items():
        print(f"  {kk}: {vv:.2f}" if isinstance(vv, float) else f"  {kk}: {vv}")

# Save a small summary CSV next to the results
summary_df = pd.DataFrame(stats).T
summary_path = PATH.replace(".csv", "_summary_stats.csv")
summary_df.to_csv(summary_path)
print("\nSaved summary to:", summary_path)


Random 1000 videos (FLAN-T5)

In [None]:
# ================================
# 1) Install
# ================================
!pip install -q git+https://github.com/salesforce/BLIP.git transformers timm accelerate opencv-python pandas nltk

# ================================
# 2) Imports
# ================================
import os, re, cv2, ast, math, random, gc, time, torch
import pandas as pd
from PIL import Image
from typing import List
from transformers import (
    Blip2Processor, Blip2ForConditionalGeneration,
    T5Tokenizer, T5ForConditionalGeneration,
    BartTokenizer, BartForConditionalGeneration
)

# Lemmatization for robust scoring
import nltk
nltk.download('wordnet', quiet=True)
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

# ================================
# 3) Device + perf knobs
# ================================
# Use the GPU when torch can see one; the summarizer models below are moved to this device.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(" Using device:", device)
# Enable the TF32 flags for CUDA matmul and cuDNN (the "perf knobs" of this section).
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# ================================
# 4) BLIP2 (batched frame captioning)
# ================================
# Processor and model are loaded from the same checkpoint id.
blip_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
# Weights are requested in float16 when CUDA is available and float32 otherwise.
# NOTE(review): device_map="auto" is passed unconditionally here, whereas the first BLIP-2 cell of
# this notebook only passes it when CUDA is available — confirm the CPU-only path is intended.
blip_model = Blip2ForConditionalGeneration.from_pretrained(
    "Salesforce/blip2-opt-2.7b",
    device_map="auto",
    torch_dtype=(torch.float16 if torch.cuda.is_available() else torch.float32)
)

@torch.inference_mode()
def batch_generate_captions(image_paths: List[str], batch_size: int = 4, max_new_tokens: int = 35) -> List[str]:
    """
    Batched BLIP2 captioning for speed. Greedy decoding (do_sample=False).

    Args:
        image_paths: Image files to caption, in output order.
        batch_size: Number of images per forward pass; must be at least 1.
        max_new_tokens: Upper bound on generated tokens per caption.

    Returns:
        One stripped caption per input path, in the same order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    from contextlib import nullcontext

    if batch_size < 1:
        # range() would raise for 0 and silently yield nothing for negatives.
        raise ValueError("batch_size must be >= 1")

    captions = []
    use_amp = (device == "cuda")
    for i in range(0, len(image_paths), batch_size):
        batch_paths = image_paths[i:i+batch_size]
        images = []
        try:
            for p in batch_paths:
                # convert() yields an independent image, so the source file can be
                # closed immediately rather than staying open until collected.
                with Image.open(p) as src:
                    images.append(src.convert("RGB"))

            # One code path for both devices: autocast on CUDA, a no-op context otherwise.
            amp_ctx = torch.autocast(device_type="cuda", dtype=blip_model.dtype) if use_amp else nullcontext()
            with amp_ctx:
                inputs = blip_processor(images=images, return_tensors="pt", padding=True)
                # Cast only floating-point tensors to the model dtype; integer
                # tensors (if any are returned) must keep their dtype.
                inputs = {
                    k: (v.to(device=blip_model.device, dtype=blip_model.dtype)
                        if torch.is_floating_point(v) else v.to(device=blip_model.device))
                    for k, v in inputs.items()
                }
                out = blip_model.generate(**inputs, max_new_tokens=max_new_tokens, do_sample=False)

            caps = blip_processor.batch_decode(out, skip_special_tokens=True)
            captions.extend([c.strip() for c in caps])
            del inputs, out
        finally:
            # Free the decoded images even when captioning this batch failed.
            for im in images: im.close()
        if device == "cuda":
            torch.cuda.synchronize()
        gc.collect()
    return captions

# ================================
# 5) Summarizers (FLAN default, BART optional)
# ================================
# Both summarizers are instantiated and moved to `device` up front.
# NOTE(review): the main loop selects only one of them via SUMMARIZER, so the other one occupies
# device memory without being used — consider loading only the selected model.
flan_tok = T5Tokenizer.from_pretrained("google/flan-t5-large")
flan = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large").to(device)

bart_tok = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
bart = BartForConditionalGeneration.from_pretrained("facebook/bart-large-cnn").to(device)

def curate_captions(caps, max_caps=80, min_chars=8):
    """De-dup + uniformly sample to avoid 1024-token truncation.

    Args:
        caps: Iterable of captions; non-string entries are ignored. ``None`` is
            treated as an empty list.
        max_caps: Maximum number of captions to return.
        min_chars: Captions shorter than this (after stripping) are dropped.

    Returns:
        Captions in first-seen order. When more than ``max_caps`` unique captions
        remain, ``max_caps`` of them are picked at evenly spaced positions,
        always including the first and the last one.
    """
    seen, uniq = set(), []
    for c in (caps or []):
        if not isinstance(c, str):
            continue
        c = c.strip()
        if len(c) < min_chars or c in seen:
            continue
        seen.add(c)
        uniq.append(c)
    if len(uniq) <= max_caps:
        return uniq
    if max_caps <= 0:
        return []
    if max_caps == 1:
        # The spacing formula below divides by (max_caps - 1); with a single slot
        # just keep the first caption instead of raising ZeroDivisionError.
        return [uniq[0]]
    last = len(uniq) - 1
    return [uniq[round(j * last / (max_caps - 1))] for j in range(max_caps)]

# Normalize metadata (dataset label -> caption-friendly)
# Lower-cased dataset animal label -> shorter term used as the keyword for that animal.
# normalize_meta_keywords leaves labels without an entry here unchanged (lower-cased only).
SPECIES_MAP = {
    "eurasian wren bird":"wren","ardea alba egret":"egret","grey heron":"heron",
    "black-winged stilt":"stilt","yellowhammer":"bird","tit bird":"tit",
    "mallard duck":"duck","common crane":"crane","black mamba":"snake","mongoose":"mongoose",
}
# Lower-cased dataset action label -> wording used as the keyword for that action.
# Several entries map a label to itself; they change nothing because unmapped labels
# are also passed through as-is.
ACTION_MAP = {
    "keeping still":"standing still","attending":"looking","sensing":"looking",
    "chirping":"singing","flapping":"flapping","landing":"landing","preening":"preening",
    "walking":"walking","moving":"moving","eating":"eating","jumping":"jumping",
    "attacking":"attacking","flying":"flying","shaking head":"shaking head",
    "singing nightingale":"nightingale singing"
}

def normalize_meta_keywords(animals, actions):
    """Lower-case metadata labels and translate them through the lookup tables.

    Each label is stringified and lower-cased, then replaced by its SPECIES_MAP
    (animals) or ACTION_MAP (actions) entry when one exists; otherwise the
    lower-cased label is kept. ``None`` or empty inputs yield empty lists.

    Returns:
        A ``(animals, actions)`` tuple of lists.
    """
    def norm(labels, table):
        lowered = (str(label).lower() for label in (labels or []))
        return [table.get(name, name) for name in lowered]

    animals_out = norm(animals, SPECIES_MAP)
    actions_out = norm(actions, ACTION_MAP)
    return animals_out, actions_out

def pick_keywords_to_encourage(frame_captions, meta_keywords, max_animals=8, max_actions=6):
    """
    Try to encourage words observed in captions; if none, fallback to a small slice of trusted metadata.

    A keyword counts as an action when it ends in "ing"/"ed" or contains a space,
    otherwise as an animal (a heuristic, not a real part-of-speech check).

    Args:
        frame_captions: Iterable of caption strings.
        meta_keywords: Iterable of normalized metadata keywords.
        max_animals: Cap on the number of animal keywords returned.
        max_actions: Cap on the number of action keywords returned.

    Returns:
        Lower-cased keywords without duplicates: animals first, then actions.
    """
    def is_action(k):
        return k.endswith(("ing", "ed")) or " " in k

    text = " ".join(frame_captions).lower()

    # Lower-case and de-duplicate once so both paths below see the same keywords;
    # repeated metadata entries would otherwise be repeated in the prompt.
    keywords = []
    for kw in (meta_keywords or []):
        k = str(kw).lower()
        if k and k not in keywords:
            keywords.append(k)

    observed = [k for k in keywords if k in text]
    pool = observed if observed else keywords

    # One classifier for both paths, so a keyword can never land in both lists.
    animals = [k for k in pool if not is_action(k)]
    actions = [k for k in pool if is_action(k)]
    return animals[:max_animals] + actions[:max_actions]

@torch.inference_mode()
def summarize_flan(frame_captions, encourage=None, max_new_tokens=60):
    """Condense frame captions into one sentence with the FLAN-T5 summarizer.

    The captions are curated (at most 80), listed as bullet lines under a fixed
    instruction, optionally preceded by an "Ensure to mention" line built from
    ``encourage``, and decoded without sampling using a single beam.

    Returns:
        The stripped summary, or "" when no caption survives curation.
    """
    curated = curate_captions(frame_captions, max_caps=80)
    if not curated:
        return ""

    pieces = ["Summarize the wildlife video in one detailed, factual sentence mentioning all animals and their actions.\n"]
    if encourage:
        pieces.append("Ensure to mention: " + ", ".join(encourage) + ".\n")
    pieces.append("\n".join(f"- {c}" for c in curated))
    prompt = "".join(pieces)

    enc = flan_tok(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    decoding = dict(
        max_new_tokens=max_new_tokens,
        min_length=18,
        num_beams=1,
        do_sample=False,
        no_repeat_ngram_size=3,
        length_penalty=1.0,
        early_stopping=True,
    )
    ids = flan.generate(**enc, **decoding)
    summary = flan_tok.decode(ids[0], skip_special_tokens=True)
    return summary.strip()

@torch.inference_mode()
def summarize_bart(frame_captions, encourage=None, max_length=60):
    """Condense frame captions into a short summary with the BART summarizer.

    Args:
        frame_captions: Per-frame captions; they are curated (at most 80) and
            joined with newlines to form the input document.
        encourage: Optional keywords; up to eight are passed to ``generate`` as
            ``force_words_ids``.
        max_length: Maximum length of the generated sequence, in tokens.

    Returns:
        The stripped summary, or "" when no caption survives curation.
    """
    caps = curate_captions(frame_captions, max_caps=80)
    if not caps: return ""
    text = "\n".join(caps)
    enc = bart_tok(text, return_tensors="pt", truncation=True, max_length=1024).to(device)
    gen_kwargs = dict(
        input_ids=enc["input_ids"], attention_mask=enc.get("attention_mask", None),
        max_length=max_length, min_length=18, num_beams=1, do_sample=False,
        no_repeat_ngram_size=3, length_penalty=1.0, early_stopping=True
    )
    if encourage:
        fw = []
        for w in encourage[:8]:
            w = str(w).strip()
            if not w:
                continue
            # BART uses a byte-level BPE in which a word in the middle of a sentence
            # is encoded together with its leading space. Tokenizing the bare word
            # yields the "glued to the previous word" ids instead, so prepend a space.
            ids = bart_tok(" " + w, add_special_tokens=False).input_ids
            if ids and ids not in fw:
                fw.append(ids)
        if fw:
            gen_kwargs["force_words_ids"] = fw
            # Forced words are implemented through constrained beam search, which
            # generate() rejects with a single beam; use the beam width of the
            # "First 1000" BART summarizer in that case.
            # NOTE(review): support for force_words_ids depends on the installed
            # transformers version — confirm it is still accepted by generate().
            gen_kwargs["num_beams"] = max(gen_kwargs["num_beams"], 4)
    out = bart.generate(**gen_kwargs)
    return bart_tok.decode(out[0], skip_special_tokens=True).strip()

# ================================
# 6) Frame extraction (cap frames for speed)
# ================================
def extract_frames_4fps(video_path, out_dir, max_frames=96):
    """Write roughly four frames per second of a video as JPEG files, up to a cap.

    Frames are saved as ``frame_000.jpg``, ``frame_001.jpg``, ... inside
    ``out_dir`` (created if missing). Files already present in the folder are
    not removed.

    Args:
        video_path: Path of the video to read.
        out_dir: Directory receiving the JPEG frames.
        max_frames: Stop after this many frames have been written.

    Returns:
        Number of frames actually written; 0 if the video could not be opened.
    """
    os.makedirs(out_dir, exist_ok=True)
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        vid.release()
        print(f"  ! Could not open {video_path}")
        return 0

    saved, failed = 0, 0
    try:
        fps = vid.get(cv2.CAP_PROP_FPS) or 0
        # Keep every `interval`-th frame; fall back to every frame when the
        # container reports no usable frame rate.
        interval = int(max(1, round((fps if fps > 0 else 4)/4)))  # ~ every 0.25s
        count = 0
        ok, frame = vid.read()
        while ok and saved < max_frames:
            if count % interval == 0:
                target = os.path.join(out_dir, f"frame_{saved:03d}.jpg")
                # imwrite reports failure through its return value; only count
                # frames that really reached the disk so indices stay contiguous.
                if cv2.imwrite(target, frame):
                    saved += 1
                else:
                    failed += 1
            ok, frame = vid.read(); count += 1
    finally:
        # Release the capture even if reading or writing raised.
        vid.release()
    if failed:
        print(f"  ! {failed} frame(s) could not be written to {out_dir}")
    return saved

# ================================
# 7) Metadata loading & parsing
# ================================
# Spreadsheet with the per-video annotations; parse_metadata reads its "video_id",
# "list_animal" and "list_animal_action" columns.
metadata_path = "/content/drive/MyDrive/Animal_Kingdom/action_recognition/AR_metadata.xlsx"
meta_df = pd.read_excel(metadata_path)

def parse_metadata(df):
    """Turn the metadata sheet into ``{video_id: {"animals": [...], "actions": [...]}}``.

    Animal cells: a string is evaluated as a Python literal (a non-list result is
    wrapped as ``[str(value)]``); if that fails it is split on commas. A list is
    used as-is and anything else yields ``[]``.

    Action cells: a non-blank string is evaluated as a Python literal. From a list
    result, the second element of every list/tuple item with at least two elements
    and every plain string item are kept. If evaluation fails the string is split
    on commas. Anything else yields ``[]``.
    """
    def split_on_commas(raw):
        return [part.strip() for part in raw.split(",") if part.strip()]

    def read_animals(raw):
        if isinstance(raw, list):
            return raw
        if not isinstance(raw, str):
            return []
        try:
            value = ast.literal_eval(raw)
        except Exception:
            return split_on_commas(raw)
        return value if isinstance(value, list) else [str(value)]

    def read_actions(raw):
        if not (isinstance(raw, str) and raw.strip()):
            return []
        try:
            value = ast.literal_eval(raw)
        except Exception:
            return split_on_commas(raw)
        found = []
        if isinstance(value, list):
            for item in value:
                if isinstance(item, (list, tuple)) and len(item) >= 2:
                    found.append(str(item[1]))
                elif isinstance(item, str):
                    found.append(item)
        return found

    meta = {}
    for _, row in df.iterrows():
        meta[row.get("video_id")] = {
            "animals": read_animals(row.get("list_animal", [])),
            "actions": read_actions(row.get("list_animal_action", "")),
        }
    return meta

# Build the video_id -> {"animals", "actions"} lookup once; the main loop reads it per video.
metadata = parse_metadata(meta_df)

# ================================
# 8) Robust semantic scoring (compact)
# ================================
# Keys are lower-case animal keywords; each value lists the terms that animal_ok
# accepts in a caption as evidence for that animal. Animals without an entry are
# matched by their own keyword only.
ANIMAL_BACKOFF = {
    "nightingale":["nightingale","bird","songbird"],
    "wren":["wren","bird"], "egret":["egret","heron","bird"], "heron":["heron","bird"],
    "stilt":["stilt","wader","shorebird","bird"], "crane":["crane","bird"],
    "duck":["duck","waterfowl","bird"], "tit":["tit","bird"],
    "snake":["snake","serpent","reptile"], "mongoose":["mongoose","mammal","animal"],
    "bird":["bird"], "animal":["animal"],
}

def normalize_lemmas(text):
    """Return the set of noun and verb lemmas of the alphabetic words in ``text``.

    ``None`` or empty text yields an empty set. Each lower-cased word contributes
    both its noun lemma and its verb lemma.
    """
    lemmas = set()
    for word in re.findall(r"[a-zA-Z]+", (text or "").lower()):
        lemmas.add(lemmatizer.lemmatize(word, pos='n'))
        lemmas.add(lemmatizer.lemmatize(word, pos='v'))
    return lemmas

def partial_match(caption, phrase, thr=0.6):
    """Check whether enough words of ``phrase`` occur in ``caption`` by lemma.

    A phrase word counts as present when its noun lemma or its verb lemma is among
    the caption lemmas. The score is the fraction of phrase words present.

    The ratio is taken over phrase words, not over the pooled noun/verb lemma
    variants. With pooled variants a single word whose lemma matched in only one
    form scored 0.5 (e.g. "walking" vs. "walks") and failed the default
    threshold, while a two-word phrase with one word present could score 0.67.

    Args:
        caption: Text to search; ``None`` is treated as empty.
        phrase: Keyword or phrase to look for.
        thr: Minimum fraction of phrase words that must be present.

    Returns:
        True when the fraction reaches ``thr``; False for a phrase without
        alphabetic words.
    """
    cap = normalize_lemmas(caption)
    toks = re.findall(r"[a-zA-Z]+", (phrase or "").lower())
    if not toks: return False
    hits = sum(
        1 for t in toks
        if lemmatizer.lemmatize(t, pos='n') in cap or lemmatizer.lemmatize(t, pos='v') in cap
    )
    return (hits / len(toks)) >= thr

def animal_ok(caption, key):
    """Decide whether ``caption`` mentions the animal ``key`` or an accepted stand-in.

    The accepted terms come from ANIMAL_BACKOFF (or just the keyword itself when
    it has no entry). A term must appear as a whole word, optionally with a
    plural "s"/"es", so "tit" is not found inside "sitting". If no term is found,
    a lemma-based partial match on the keyword (threshold 0.5) is tried.

    Args:
        caption: Caption text; ``None`` is treated as empty.
        key: Animal keyword; the ANIMAL_BACKOFF lookup uses it as given.

    Returns:
        True when the animal counts as mentioned; False for an empty keyword.
    """
    cap = (caption or "").lower()
    if not key or not str(key).strip():
        return False
    cand = ANIMAL_BACKOFF.get(key, [key])

    def has_word(term):
        # Letters directly before or after the term mean it is part of another word.
        pattern = r"(?<![a-z])" + re.escape(str(term).lower()) + r"(?:e?s)?(?![a-z])"
        return re.search(pattern, cap) is not None

    return any(has_word(c) for c in cand) or partial_match(caption, key, thr=0.5)

def infer_actions(text):
    """Derive action labels from cue words and phrases found in a caption.

    Cues are matched as whole words/phrases (no letter directly before or after),
    not as raw substrings. Substring matching fired on unrelated words, e.g.
    "lands" inside "grasslands", "eating" inside "creating" or "song" inside
    "songbird". The death cues ("dead", "carcass", "lifeless") are the same as in
    the "First 1000" cell, so a caption ending in "dead" is recognized here too.

    Args:
        text: Caption text; ``None`` is treated as empty.

    Returns:
        Set of inferred action labels (possibly empty).
    """
    t = (text or "").lower()
    inferred = set()

    def has(xs):
        return any(
            re.search(r"(?<![a-z])" + re.escape(x) + r"(?![a-z])", t) is not None
            for x in xs
        )

    if has(["singing","chirping","trilling","calling","vocalizing","song"]): inferred.update(["singing","chirping","calling"])
    # An open beak/mouth is taken as a sign of vocalizing.
    if has(["beak open","open beak","mouth open","open mouth"]): inferred.update(["singing","calling","chirping"])
    if has(["keeping still","standing still","motionless","perched","sitting","resting","standing"]): inferred.update(["keeping still","standing still","looking"])
    if has(["walking"]): inferred.add("walking")
    if has(["running"]): inferred.add("running")
    if has(["flying","in flight","soars","soaring","taking off","takes off"]): inferred.add("flying")
    if has(["landing","lands","touches down"]): inferred.add("landing")
    if has(["flapping","flaps"]): inferred.add("flapping")
    if has(["eating","feeding","chewing","grazing","pecking"]):
        inferred.add("eating")
        # Eating a specific object is additionally credited as biting/attacking.
        if has(["eating a","eating the","eating prey","eating a snake","eats a"]): inferred.update(["biting","attacking"])
    if has(["biting","bites","bite"]): inferred.update(["biting","attacking"])
    if has(["dead","carcass","lifeless"]): inferred.add("dying")
    if has(["shaking head","head shake"]): inferred.add("shaking head")
    if has(["looking","watching","peering","gazing","staring"]): inferred.add("looking")
    if has(["moving"]): inferred.add("moving")
    return inferred

def score_semantics(caption, animals_norm, actions_norm):
    """Score how many metadata keywords a caption covers, strictly and robustly.

    Strict: the lower-cased keyword is a substring of the lower-cased caption.
    Robust: animal keywords go through animal_ok; all other keywords count when
    inferred from the caption or when they partially match by lemma (threshold 0.6).

    Returns (strict%, strict_matched, strict_missing, robust%, robust_matched,
    robust_missing); percentages are strings such as "66.7%", or "NA" when
    there is no usable keyword.
    """
    animal_pool = animals_norm or []
    keywords = [
        k for k in [*animal_pool, *(actions_norm or [])]
        if isinstance(k, str) and k.strip()
    ]
    if not keywords:
        return "NA", [], [], "NA", [], []

    def as_percent(matched):
        return f"{(len(matched)/len(keywords))*100:.1f}%"

    cap_l = (caption or "").lower()

    # strict (substring)
    s_match = [kw for kw in keywords if kw.lower() in cap_l]
    s_miss = [kw for kw in keywords if kw.lower() not in cap_l]

    # robust (partial + inference + hypernym)
    inf = infer_actions(caption)

    def robustly_present(kw):
        k = kw.lower()
        if k in animal_pool:
            return animal_ok(caption, k)
        return (k in inf) or partial_match(caption, k, thr=0.6)

    r_match, r_miss = [], []
    for kw in keywords:
        bucket = r_match if robustly_present(kw) else r_miss
        bucket.append(kw)

    return as_percent(s_match), s_match, s_miss, as_percent(r_match), r_match, r_miss

# ================================
# 9) Random sampling of 1000 videos (reproducible)
# ================================
video_dir = "/content/videos/video/"
# os.listdir returns entries in arbitrary, filesystem-dependent order. Sort first so
# that the seeded sample below selects the same videos on every machine and run.
all_videos = sorted(f for f in os.listdir(video_dir) if f.endswith(".mp4"))

SAMPLE_SIZE = 1000
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
if len(all_videos) <= SAMPLE_SIZE:
    video_files = all_videos[:]  # take all if fewer than requested
else:
    video_files = random.sample(all_videos, SAMPLE_SIZE)

# Record the selection so the run can be audited or repeated on exactly these files.
pd.DataFrame({"video_file": video_files}).to_csv("/content/sampled_videos.csv", index=False)
print(f"Sampling {len(video_files)} videos (seed={RANDOM_SEED}). Saved list to /content/sampled_videos.csv.")

# ================================
# 10) Main (batched, capped, progress saved at the end)
# ================================
SUMMARIZER = "flan"           # "flan" (recommended) or "bart"
USE_KEYWORD_STEERING = True
MAX_FRAMES_PER_VIDEO = 96
BATCH_SIZE = 4
OUT_CSV = "/content/summary_results_r1000.csv"

rows = []
t_start = time.time()

for idx, vf in enumerate(video_files, 1):
    vid = os.path.splitext(vf)[0]
    vpath = os.path.join(video_dir, vf)
    fdir = f"/content/frames/{vid}"

    try:
        print(f"\n[{idx}/{len(video_files)}] Processing {vid}")
        n = extract_frames_4fps(vpath, fdir, max_frames=MAX_FRAMES_PER_VIDEO)
        if n == 0:
            print("  ! No frames extracted; skipping.")
            continue

        # Rebuild the frame names from the returned count instead of listing the
        # directory: the folder is shared with the "First 1000" run, so a listing
        # can return stale frames beyond MAX_FRAMES_PER_VIDEO.
        frame_names = [f"frame_{i:03d}.jpg" for i in range(n)]
        frame_paths = [os.path.join(fdir, fn) for fn in frame_names]
        frame_paths = [p for p in frame_paths if os.path.exists(p)]
        if not frame_paths:
            print("  ! No frame files found; skipping.")
            continue

        # Batched BLIP2
        frame_captions = batch_generate_captions(frame_paths, batch_size=BATCH_SIZE, max_new_tokens=35)

        # Metadata -> normalized keywords
        m = metadata.get(vid, {"animals": [], "actions": []})
        animals_raw, actions_raw = m.get("animals", []), m.get("actions", [])
        animals_n, actions_n = normalize_meta_keywords(animals_raw, actions_raw)
        meta_keywords_norm = [*animals_n, *actions_n]

        encourage = pick_keywords_to_encourage(frame_captions, meta_keywords_norm) if USE_KEYWORD_STEERING else None

        # Summarize (case-insensitive switch, as in the "First 1000" cell)
        if SUMMARIZER.lower() == "flan":
            final_caption = summarize_flan(frame_captions, encourage=encourage, max_new_tokens=60)
        else:
            final_caption = summarize_bart(frame_captions, encourage=encourage, max_length=60)

        # Scores
        s_pct, s_match, s_miss, r_pct, r_match, r_miss = score_semantics(final_caption, animals_n, actions_n)

        rows.append({
            "video_id": vid,
            "final_caption": final_caption,
            "frame_captions_curated": curate_captions(frame_captions, max_caps=80),
            "keywords_normalized": meta_keywords_norm,
            "strict_matched": s_match, "strict_missing": s_miss, "strict_semantic_percent": s_pct,
            "robust_matched": r_match, "robust_missing": r_miss, "robust_semantic_percent": r_pct
        })

    except KeyboardInterrupt:
        # Stop early but still fall through to the save step below.
        print("\n!! Interrupted by user.")
        break
    except Exception as e:
        # Best effort: one bad video must not end the whole run.
        print(f"  !! Error on {vid}: {e}. Skipping.")
        continue

# Save results
pd.DataFrame(rows).to_csv(OUT_CSV, index=False)
print(f"\nDone. Saved {len(rows)} rows to {OUT_CSV}. Total time: {time.time()-t_start:.1f}s")


In [None]:
import pandas as pd

# Path to your results CSV
# NOTE: `df` is a notebook-wide name (the metadata cell near the top of the notebook
# also assigns it); running this cell rebinds it to the results table.
csv_path = "/content/summary_results_r1000.csv"
df = pd.read_csv(csv_path)

def to_float(series):
    """Convert percent strings such as '73.5%' to floats.

    Each value is stringified, every '%' character is removed and surrounding
    whitespace is stripped before numeric parsing.

    Args:
        series: pandas Series of percent strings, numbers, or missing values.

    Returns:
        A float Series with the same index. Anything that cannot be parsed as a
        number (NaN, None, '', 'NA', or any other non-numeric token) becomes NaN.
    """
    cleaned = (
        series.astype(str)
        .str.replace("%", "", regex=False)
        .str.strip()
    )
    # errors="coerce" turns unparseable tokens into NaN instead of raising, which is
    # what "handle NA/empty" requires: astype(float) raised ValueError on '' and 'None'.
    # The trailing astype keeps the result float even when every value is integral.
    return pd.to_numeric(cleaned, errors="coerce").astype(float)

# Descriptive statistics per score column, keyed by the column name.
stats = {}

for col in ("strict_semantic_percent", "robust_semantic_percent"):
    # Columns absent from this results file are skipped.
    if col not in df.columns:
        continue
    vals = to_float(df[col]).dropna()
    # Nothing to summarise when no value in the column parsed to a number.
    if vals.empty:
        continue
    stats[col] = dict(
        count=int(vals.count()),
        mean=float(vals.mean()),
        median=float(vals.median()),
        std=float(vals.std(ddof=1)),  # sample standard deviation
        min=float(vals.min()),
        max=float(vals.max()),
        zero_count=int((vals == 0).sum()),
    )

# Print results: floats with two decimals, integer counts as-is.
for col, m in stats.items():
    print(f"\n{col}:")
    for k, v in m.items():
        if isinstance(v, float):
            print(f"  {k}: {v:.2f}")
        else:
            print(f"  {k}: {v}")


Visualizations

In [None]:
# --- Colab Visualization Script for Semantic Scoring (Sequential 1000 vs Random 1000) ---
# Input files (change these names if yours differ)
# Both are relative paths, so they are resolved against the kernel's current
# working directory when load_results() reads them.
SEQ_CSV = "summary_results (1).csv"     # first 1000, sequential
RAND_CSV = "summary_results_r1000.csv"  # random 1000

# 1) Imports
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# 2) Load CSVs
def load_results(path):
    """Read a results CSV and append numeric versions of the two score columns.

    Args:
        path: Anything ``pandas.read_csv`` accepts (file path or file-like object).

    Returns:
        The loaded DataFrame with two extra float columns,
        ``strict_semantic_percent_num`` and ``robust_semantic_percent_num``.
        A source column that is absent yields an all-NaN column; individual
        values that cannot be parsed as numbers become NaN.
    """
    df = pd.read_csv(path)

    def to_float_percent(colname):
        """Coerce percent strings like "50.0%" in ``df[colname]`` to floats."""
        # If missing, create NaNs so code still runs
        if colname not in df.columns:
            return pd.Series(np.nan, index=df.index, dtype=float)
        cleaned = (
            df[colname]
            .astype(str)
            .str.replace("%", "", regex=False)
            .str.strip()
        )
        # Coerce instead of astype(float): one stray token (e.g. "-" or a blank
        # string) should become NaN rather than abort the whole load with ValueError.
        return pd.to_numeric(cleaned, errors="coerce").astype(float)

    # Standardize columns (observed in your CSVs)
    df["strict_semantic_percent_num"] = to_float_percent("strict_semantic_percent")
    df["robust_semantic_percent_num"] = to_float_percent("robust_semantic_percent")
    return df

# Both result sets go through the same loader so their numeric columns match.
df_seq = load_results(path=SEQ_CSV)
df_rand = load_results(path=RAND_CSV)

# Quick sanity checks: row counts, then the first few parsed scores of each file.
print("Sequential CSV rows:", df_seq.shape[0])
print("Random CSV rows:", df_rand.shape[0])
print(
    "Seq strict/robust head:\n",
    df_seq.head()[["strict_semantic_percent_num", "robust_semantic_percent_num"]],
)
print(
    "Rand strict/robust head:\n",
    df_rand.head()[["strict_semantic_percent_num", "robust_semantic_percent_num"]],
)

# 3) Create output folder (no error if it already exists)
os.makedirs("figures", exist_ok=True)

# 4) Helper: summary stats table
def summary_stats(name, series):
    """Summarise one score column as a named pandas Series.

    Args:
        name: Name given to the returned Series (used as a column label when
            several summaries are concatenated side by side).
        series: Numeric Series; NaN entries are dropped before any statistic.

    Returns:
        Series indexed by count, mean, median, std (sample, ddof=1), min, max
        and zeros (number of entries equal to 0).
    """
    valid = series.dropna()
    metrics = {}
    metrics["count"] = len(valid)
    metrics["mean"] = valid.mean()
    metrics["median"] = valid.median()
    metrics["std"] = valid.std(ddof=1)
    metrics["min"] = valid.min()
    metrics["max"] = valid.max()
    metrics["zeros"] = (valid == 0).sum()
    return pd.Series(metrics, name=name)

# One summary column per (dataset, metric) pair, in a fixed left-to-right order.
stats = pd.concat(
    [
        summary_stats(column_label, scores_df[score_col])
        for column_label, scores_df, score_col in (
            ("seq_strict", df_seq, "strict_semantic_percent_num"),
            ("seq_robust", df_seq, "robust_semantic_percent_num"),
            ("rand_strict", df_rand, "strict_semantic_percent_num"),
            ("rand_robust", df_rand, "robust_semantic_percent_num"),
        )
    ],
    axis=1,
)
print("\n=== Summary Stats (percent) ===\n")
print(stats.round(2))

# 5) Histogram(s): strict vs robust for each dataset
def hist_two(series_a, series_b, labels, title, fname):
    """Overlay two score histograms on shared bins, save the figure, and show it.

    Args:
        series_a: First score Series; NaNs are dropped before plotting.
        series_b: Second score Series; NaNs are dropped before plotting.
        labels: Two legend labels, for series_a and series_b respectively.
        title: Figure title.
        fname: Output file name, written inside the "figures" directory.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    edges = np.linspace(0, 100, 21)  # 0..100 in 5% steps
    for position, scores in enumerate((series_a, series_b)):
        ax.hist(scores.dropna(), bins=edges, alpha=0.6, label=labels[position])
    ax.set_xlabel("Semantic Percent (%)")
    ax.set_ylabel("Count of Videos")
    ax.set_title(title)
    ax.legend()
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(os.path.join("figures", fname), dpi=200)
    plt.show()

# One strict-vs-robust histogram per dataset: Sequential 1000 first, then Random 1000.
for scores_df, legend_labels, plot_title, out_name in (
    (
        df_seq,
        ["Strict (Sequential 1000)", "Robust (Sequential 1000)"],
        "Sequential 1000: Strict vs Robust Semantic Score Distribution",
        "hist_seq_strict_vs_robust.png",
    ),
    (
        df_rand,
        ["Strict (Random 1000)", "Robust (Random 1000)"],
        "Random 1000: Strict vs Robust Semantic Score Distribution",
        "hist_rand_strict_vs_robust.png",
    ),
):
    hist_two(
        scores_df["strict_semantic_percent_num"],
        scores_df["robust_semantic_percent_num"],
        labels=legend_labels,
        title=plot_title,
        fname=out_name,
    )

# 6) Boxplots: compare sequential vs random for each metric
def boxplot_compare(seq_series, rand_series, title, ylabel, fname):
    """Draw side-by-side boxplots of one metric for the two datasets, save, and show.

    Args:
        seq_series: Scores from the sequential run; NaNs are dropped.
        rand_series: Scores from the random run; NaNs are dropped.
        title: Figure title.
        ylabel: Y-axis label.
        fname: Output file name, written inside the "figures" directory.
    """
    group_names = ["Sequential 1000", "Random 1000"]
    data = [seq_series.dropna(), rand_series.dropna()]

    fig, ax = plt.subplots(figsize=(8, 6))
    ax.boxplot(data, showmeans=True)
    # Tick labels are set separately instead of via boxplot(labels=...): that keyword
    # is deprecated since Matplotlib 3.9 (renamed tick_labels), while this form works
    # on both old and new versions. Boxes sit at x = 1..N by default.
    ax.set_xticks(range(1, len(group_names) + 1))
    ax.set_xticklabels(group_names)
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    ax.grid(True, axis="y", alpha=0.3)
    fig.tight_layout()
    fig.savefig(os.path.join("figures", fname), dpi=200)
    plt.show()

# One sequential-vs-random boxplot per metric: strict first, then robust.
for score_col, plot_title, axis_label, out_name in (
    (
        "strict_semantic_percent_num",
        "Strict Semantic Score: Sequential vs Random",
        "Strict Semantic Percent (%)",
        "boxplot_strict_seq_vs_rand.png",
    ),
    (
        "robust_semantic_percent_num",
        "Robust Semantic Score: Sequential vs Random",
        "Robust Semantic Percent (%)",
        "boxplot_robust_seq_vs_rand.png",
    ),
):
    boxplot_compare(
        df_seq[score_col],
        df_rand[score_col],
        title=plot_title,
        ylabel=axis_label,
        fname=out_name,
    )

# 7) Scatter: strict vs robust (per dataset)
def scatter_strict_vs_robust(df, title, fname):
    """Scatter each video's strict score against its robust score, save, and show.

    A dashed y = x line is drawn for reference and both axes are fixed to 0..100.

    Args:
        df: DataFrame holding "strict_semantic_percent_num" and
            "robust_semantic_percent_num" columns.
        title: Figure title.
        fname: Output file name, written inside the "figures" directory.
    """
    strict_scores = df["strict_semantic_percent_num"]
    robust_scores = df["robust_semantic_percent_num"]

    fig, ax = plt.subplots(figsize=(7, 7))
    ax.scatter(strict_scores, robust_scores, s=12, alpha=0.5)

    # y = x reference line
    axis_range = [0, 100]
    ax.plot(axis_range, axis_range, 'k--', linewidth=1)
    ax.set_xlim(axis_range)
    ax.set_ylim(axis_range)

    ax.set_xlabel("Strict Semantic Percent (%)")
    ax.set_ylabel("Robust Semantic Percent (%)")
    ax.set_title(title)
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(os.path.join("figures", fname), dpi=200)
    plt.show()

# One per-video scatter for each dataset: sequential first, then random.
for scores_df, plot_title, out_name in (
    (df_seq, "Sequential 1000: Strict vs Robust (per video)", "scatter_seq_strict_vs_robust.png"),
    (df_rand, "Random 1000: Strict vs Robust (per video)", "scatter_rand_strict_vs_robust.png"),
):
    scatter_strict_vs_robust(scores_df, plot_title, out_name)

# 8) Bar chart: zero-score counts
def zero_count(series):
    """Return, as a plain int, how many non-missing entries of ``series`` equal 0."""
    present = series.dropna()
    return int(present.eq(0).sum())

# Zero-score counts in bar order: sequential strict/robust, then random strict/robust.
labels = ["Seq Strict", "Seq Robust", "Rand Strict", "Rand Robust"]
zeros = [
    zero_count(scores_df[score_col])
    for scores_df in (df_seq, df_rand)
    for score_col in ("strict_semantic_percent_num", "robust_semantic_percent_num")
]
z_seq_strict, z_seq_robust, z_rand_strict, z_rand_robust = zeros

plt.figure(figsize=(8,6))
bars = plt.bar(labels, zeros)
plt.title("Zero-Score Counts: Strict vs Robust, Sequential vs Random")
plt.ylabel("Count of Videos with 0% Score")
plt.grid(True, axis="y", alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join("figures", "bar_zeros_strict_vs_robust.png"), dpi=200)
plt.show()

# Same numbers as plain text, one line per bar.
print("\nZero-score counts:")
for lab, z in zip(labels, zeros):
    print(f"  {lab}: {z}")

# 9) Save the summary stats to CSV for easy inclusion in the dissertation
stats.round(2).to_csv("figures/summary_stats_semantic_scores.csv")
print("\nSaved figures in ./figures and stats CSV at figures/summary_stats_semantic_scores.csv")


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Load your Exp-5 CSV and add a float copy of the percent column
# (values are stringified and every "%" removed before conversion).
df = pd.read_csv("blip2_bart_results.csv").assign(
    semantic_correctness_percent_num=lambda frame: (
        frame["semantic_correctness_percent"]
        .astype(str)
        .str.replace("%", "", regex=False)
        .astype(float)
    )
)

# Plot histogram
bins = np.linspace(0, 100, 21)  # 0 to 100 in 5% bins
fig, ax = plt.subplots(figsize=(8, 6))
ax.hist(
    df["semantic_correctness_percent_num"],
    bins=bins,
    color="skyblue",
    edgecolor="black",
    alpha=0.7,
)
ax.set_xlabel("Semantic Correctness Score (%)")
ax.set_ylabel("Number of Videos")
ax.set_title("Distribution of Semantic Scores (Exp-5: BLIP2+BART @ 4FPS+Prompt, N=1000)")
ax.grid(True, alpha=0.3)
fig.tight_layout()
plt.show()


In [None]:
import pandas as pd
import numpy as np

# Load CSV and add a float copy of the percent column
# (values are stringified and every "%" removed before conversion).
df = pd.read_csv("blip2_bart_results.csv").assign(
    semantic_correctness_percent_num=lambda frame: (
        frame["semantic_correctness_percent"]
        .astype(str)
        .str.replace("%", "", regex=False)
        .astype(float)
    )
)

# Define bins (0-100 in 10% intervals)
bins = np.arange(0, 110, 10)  # 0,10,20,...100
labels = [f"{lower}–{upper}%" for lower, upper in zip(bins[:-1], bins[1:])]

# Cut into bins. pd.cut's default intervals are closed on the right, so a score of
# exactly 10 is counted under "0–10%"; include_lowest also puts 0 in that first bin.
df["score_range"] = pd.cut(
    df["semantic_correctness_percent_num"],
    bins=bins,
    labels=labels,
    include_lowest=True,
)

# Count how many videos fall into each bin, listed in bin order
distribution = df["score_range"].value_counts().sort_index()

# Convert to DataFrame for clean table
dist_table = pd.DataFrame(
    {
        "Score Range": distribution.index,
        "Number of Videos": distribution.values,
    }
)

print(dist_table.to_string(index=False))
