In [None]:
import os
import cv2
import json
import re
import gc
import torch
import numpy as np
from pathlib import Path
from PIL import Image
from tqdm import tqdm
from transformers import (
    AutoProcessor,
    AutoModelForVision2Seq,
    BitsAndBytesConfig
)
from sentence_transformers import SentenceTransformer

def check_device():
    """Return the torch device name to use: "cuda" if CUDA is available, else "cpu"."""
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"


# Resolved once at start-up and reused by the later cells.
DEVICE = check_device()

In [None]:
# Central settings shared by every stage of the notebook.
CONFIG = {
    # Input videos and the per-stage output directories.
    "VIDEO_ROOT": Path("data/LIRIS-ACCEDE-data/data"),
    "FRAME_OUTPUT_ROOT": Path("data/keyframes"),
    "DESC_OUTPUT_ROOT": Path("data/semantic/frame_description"),
    "FEATURE_OUTPUT_ROOT": Path("data/features_semantic"),
    # Keyframes sampled per video; also the number of embedding rows per video.
    "NUM_FRAMES": 4,
    # Hugging Face model identifiers.
    "LLAVA_MODEL_ID": "llava-hf/llava-v1.6-mistral-7b-hf",
    "EMBEDDING_MODEL_ID": "intfloat/e5-large",
    # Generation budget handed to the frame describer.
    "MAX_NEW_TOKENS": 256,
    # The description loop clears the CUDA cache every BATCH_SIZE videos.
    "BATCH_SIZE": 50
}

# Create every output directory up front so later stages can write freely.
for output_root in (
    CONFIG["FRAME_OUTPUT_ROOT"],
    CONFIG["DESC_OUTPUT_ROOT"],
    CONFIG["FEATURE_OUTPUT_ROOT"],
):
    output_root.mkdir(parents=True, exist_ok=True)

In [None]:
def extract_keyframes(video_path, save_dir, n=4):
    """Save ``n`` evenly spaced frames of a video as JPEG files.

    Frame positions are spread uniformly between the first and the last frame
    (``np.linspace``). The frame at the ``i``-th position is written to
    ``save_dir / "frame_<i>.jpg"``.

    Args:
        video_path: Path of the video to sample.
        save_dir: Directory that receives the JPEGs; created if missing.
        n: Number of frames to sample.

    Returns:
        A list with the paths of the frames that were actually written, or
        ``None`` when the video cannot be opened or reports fewer than ``n``
        frames. The list can be shorter than ``n`` when a frame cannot be
        decoded or written.
    """
    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)

    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            return None

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames < n:
            return None

        indices = np.linspace(0, total_frames - 1, n).astype(int)
        saved_paths = []

        for i, idx in enumerate(indices):
            # Hand OpenCV a plain Python int rather than a numpy scalar.
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if not ret:
                continue

            frame_path = save_dir / f"frame_{i}.jpg"
            # imwrite signals failure through its return value, not an exception;
            # only report frames that really reached the disk.
            if cv2.imwrite(str(frame_path), frame):
                saved_paths.append(frame_path)

        return saved_paths
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()

In [None]:
# Matches the three labelled sections requested by the prompt, in order.
_DESCRIPTION_PATTERN = re.compile(
    r"Expression:\s*(.*?)\s*Posture:\s*(.*?)\s*Environment:\s*(.*)",
    re.DOTALL | re.IGNORECASE,
)


def _parse_frame_description(raw_output):
    """Split the model's answer into expression / posture / environment fields."""
    match = _DESCRIPTION_PATTERN.search(raw_output)
    if match is None:
        # The model ignored the requested format: keep its whole answer as the
        # environment text and mark the two person-related fields as unknown.
        return {
            "expression": "Unknown",
            "posture": "Unknown",
            "environment": raw_output
        }

    expression, posture, environment = (part.strip() for part in match.groups())
    return {
        "expression": expression,
        "posture": posture,
        "environment": environment
    }


def describe_single_frame(image_path, processor, model, max_new_tokens=200):
    """Describe one keyframe with the vision-language model.

    Args:
        image_path: Path of the image file to describe.
        processor: Processor that builds the model inputs and decodes token ids.
        model: Model exposing ``generate`` and ``device``.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        A dict with the keys ``"expression"``, ``"posture"`` and
        ``"environment"``. When the answer does not follow the requested
        format, the first two are ``"Unknown"`` and ``"environment"`` holds the
        whole answer.
    """
    # Close the file handle as soon as the pixels are converted; convert()
    # returns an image that no longer depends on the open file.
    with Image.open(image_path) as source_image:
        image = source_image.convert("RGB")

    # NOTE(review): the llava-v1.6-mistral model card shows prompts wrapped as
    # "[INST] <image>\n... [/INST]"; this prompt has no such wrapper. Confirm
    # whether that is intentional before changing it, since doing so alters
    # every generated description.
    prompt = (
        "<image>\n"
        "Analyze this keyframe and provide descriptions for three specific dimensions.\n"
        "Strictly follow this output format:\n\n"
        "Expression: [Describe facial expression. If the face is too small, blurred, or NOT present, strictly output 'Unknown'.]\n"
        "Posture: [Describe body actions and gestures. If NO people are present, strictly output 'Unknown'.]\n"
        "Environment: [Describe the physical location and list key objects present. Do not describe lighting or colors.]\n"
    )

    inputs = processor(images=image, text=prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        # Greedy decoding. No temperature is passed: it only applies when
        # sampling, and combining it with do_sample=False makes transformers
        # warn (or reject the configuration, depending on the version).
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            pad_token_id=processor.tokenizer.eos_token_id,
        )

    # Drop the prompt tokens so that only the newly generated answer is decoded.
    input_token_len = inputs.input_ids.shape[1]
    generated_ids = output_ids[:, input_token_len:][0]
    raw_output = processor.decode(generated_ids, skip_special_tokens=True).strip()

    return _parse_frame_description(raw_output)

def _frame_sort_key(frame_path):
    """Natural-order sort key: digit runs in the file name compare as numbers."""
    # re.split with a capturing group alternates text, digits, text, ... so the
    # odd positions are always digit runs and types never mix at one position.
    tokens = re.split(r"(\d+)", frame_path.name)
    natural = [int(tok) if pos % 2 else tok for pos, tok in enumerate(tokens)]
    # The raw name breaks ties such as "frame_1" vs "frame_01" deterministically.
    return natural, frame_path.name


def process_video_frames(video_id, processor, model):
    """Describe every keyframe image stored for one video.

    Args:
        video_id: Name of the video's sub-directory under
            ``CONFIG["FRAME_OUTPUT_ROOT"]``.
        processor: Processor forwarded to ``describe_single_frame``.
        model: Model forwarded to ``describe_single_frame``.

    Returns:
        One dict per image (``.jpg``/``.jpeg``/``.png``) with the keys
        ``"frame"``, ``"expression"``, ``"posture"`` and ``"environment"``,
        ordered by frame number. An empty list when the video has no frame
        directory.
    """
    video_dir = CONFIG["FRAME_OUTPUT_ROOT"] / video_id
    # is_dir() rather than exists(): a stray file with that name cannot be listed.
    if not video_dir.is_dir():
        return []

    # Natural sort keeps frame_2 ahead of frame_10, so the temporal order
    # survives once more than ten frames are sampled per video.
    frame_paths = sorted(
        (
            p for p in video_dir.iterdir()
            if p.suffix.lower() in [".jpg", ".jpeg", ".png"]
        ),
        key=_frame_sort_key,
    )

    descriptions = []
    for frame_path in frame_paths:
        desc_dict = describe_single_frame(
            frame_path, processor, model, CONFIG["MAX_NEW_TOKENS"]
        )
        descriptions.append({
            "frame": frame_path.name,
            "expression": desc_dict["expression"],
            "posture": desc_dict["posture"],
            "environment": desc_dict["environment"]
        })

    return descriptions

In [None]:
def generate_visual_embedding(video_id, model_emb):
    """Embed the stored frame descriptions of one video.

    Reads ``CONFIG["DESC_OUTPUT_ROOT"] / "<video_id>.json"``, turns each frame
    entry into one sentence-like text, pads or truncates the list to exactly
    ``CONFIG["NUM_FRAMES"]`` texts, and encodes them with ``model_emb``.

    Args:
        video_id: Stem of the description file to read.
        model_emb: Encoder exposing ``encode(texts, convert_to_numpy=...,
            normalize_embeddings=...)``.

    Returns:
        Whatever ``model_emb.encode`` returns for the ``NUM_FRAMES`` texts, or
        ``None`` when the description file is missing, unreadable, not valid
        JSON, or not a list of objects.
    """
    json_path = CONFIG["DESC_OUTPUT_ROOT"] / f"{video_id}.json"
    if not json_path.exists():
        return None

    try:
        with open(json_path, "r", encoding="utf-8") as f:
            frames_data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return None

    # A structurally wrong file is treated like an unreadable one.
    if not isinstance(frames_data, list) or not all(
        isinstance(item, dict) for item in frames_data
    ):
        return None

    num_frames = CONFIG["NUM_FRAMES"]
    descriptions = [
        f"{item.get('expression', '')}. {item.get('posture', '')}. {item.get('environment', '')}."
        for item in frames_data[:num_frames]
    ]
    # Pad with empty texts so every video yields the same number of rows.
    descriptions += [""] * (num_frames - len(descriptions))

    # Each text is prefixed with "passage: " before encoding.
    formatted_inputs = [f"passage: {desc.strip()}" for desc in descriptions]

    embeddings = model_emb.encode(
        formatted_inputs,
        convert_to_numpy=True,
        normalize_embeddings=True
    )
    return embeddings

In [None]:
# Stage 1: sample keyframes for every video file found under VIDEO_ROOT.
video_files = [
    name for name in os.listdir(CONFIG["VIDEO_ROOT"])
    if name.lower().endswith((".mp4", ".mov", ".mkv", ".avi"))
]
video_files.sort()

for vid in tqdm(video_files):
    video_path = CONFIG["VIDEO_ROOT"] / vid
    video_id = video_path.stem
    frame_dir = CONFIG["FRAME_OUTPUT_ROOT"] / video_id

    # A video counts as done once its directory already holds enough JPEGs,
    # which makes the cell safe to re-run after an interruption.
    already_extracted = (
        frame_dir.is_dir()
        and len(list(frame_dir.glob("*.jpg"))) >= CONFIG["NUM_FRAMES"]
    )
    if not already_extracted:
        extract_keyframes(video_path, frame_dir, n=CONFIG["NUM_FRAMES"])

In [None]:
# Stage 2: describe the keyframes of every video with the vision-language model.

# Load the model 4-bit quantized (NF4, double quantization, float16 compute).
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)

processor = AutoProcessor.from_pretrained(CONFIG["LLAVA_MODEL_ID"])
model = AutoModelForVision2Seq.from_pretrained(
    CONFIG["LLAVA_MODEL_ID"],
    quantization_config=bnb_config,
    device_map="auto",
)

video_ids = sorted([p.name for p in CONFIG["FRAME_OUTPUT_ROOT"].iterdir() if p.is_dir()])
failed_videos = []

for i, vid in enumerate(tqdm(video_ids)):
    out_path = CONFIG["DESC_OUTPUT_ROOT"] / f"{vid}.json"

    # Existing output means the video was handled by an earlier run.
    if out_path.exists():
        continue

    try:
        descriptions = process_video_frames(vid, processor, model)
        if descriptions:
            # Write to a sibling temp file and rename it into place, so an
            # interrupted run never leaves a truncated JSON that the
            # exists() check above would treat as finished. The ".tmp"
            # suffix keeps it out of the later "*.json" glob.
            tmp_path = out_path.with_name(out_path.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(descriptions, f, ensure_ascii=False, indent=2)
            tmp_path.replace(out_path)
    except Exception as exc:
        # Still best-effort (one bad video must not stop the run), but the
        # failure is reported and remembered instead of vanishing silently.
        failed_videos.append(vid)
        tqdm.write(f"[describe] {vid} failed: {exc!r}")
        continue

    # Periodically hand cached GPU memory back to the driver.
    if i > 0 and i % CONFIG["BATCH_SIZE"] == 0:
        torch.cuda.empty_cache()

if failed_videos:
    print(f"{len(failed_videos)} video(s) could not be described: {failed_videos}")

# Free the model before the embedding model is loaded in the next cell.
del model
del processor
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Stage 3: turn every description file into one embedding array on disk.
model_emb = SentenceTransformer(CONFIG["EMBEDDING_MODEL_ID"], device=DEVICE)
json_files = sorted(CONFIG["DESC_OUTPUT_ROOT"].glob("*.json"))

for json_file in tqdm(json_files):
    video_id = json_file.stem
    save_path = CONFIG["FEATURE_OUTPUT_ROOT"] / f"{video_id}.npy"

    # Only compute features that are not on disk yet.
    if not save_path.exists():
        visual_emb = generate_visual_embedding(video_id, model_emb)

        # None means the description file could not be used; skip it.
        if visual_emb is not None:
            np.save(save_path, visual_emb)