In [1]:
# Print the interpreter path backing this kernel so that package installs in the
# cells below can be checked against the environment the notebook actually runs in.
import sys
print(sys.executable)


c:\Users\JENNIFER\AppData\Local\Programs\Python\Python313\python.exe


In [2]:
pip install openai selenium webdriver-manager python-dotenv beautifulsoup4


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
!pip install opencv-python-headless pillow numpy requests python-dotenv
# optional (better object detection): ultralytics (YOLOv8)
!pip install ultralytics





[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [4]:
pip install mutagen

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


plan_pexels_enriched_with_audio.py

- Uses OpenAI to split a brief into structured shots
- Uses Pexels API to fetch videos & images for each shot
- Uses Freesound API to fetch audio previews matching queries
- Classifies discovered assets (image/video/audio) with lightweight heuristics
- Produces plan_pexels_enriched_with_audio.json (shots + asset_manifest with classification)

Requirements:
- Set OPENAI_API_KEY, PEXELS_API_KEY, and FREESOUND_API_KEY in a .env file or as environment variables.
- pip install openai requests python-dotenv pillow opencv-python-headless numpy ultralytics mutagen

In [5]:
import os
import time
import json
import io
from typing import List, Dict, Any, Optional
from urllib.parse import quote_plus
from dotenv import load_dotenv
import requests
from PIL import Image
import numpy as np
import cv2
from mutagen import File as MutagenFile  # for audio metadata

# --- OpenAI modern client ---
from openai import OpenAI

# Try to import ultralytics YOLO (optional).
# _YOLO_AVAILABLE records whether the import succeeded; the detection helpers
# further down check it and fall back to Haar-cascade face detection when False.
# Any exception (not only ImportError) is treated as "not available".
try:
    from ultralytics import YOLO
    _YOLO_AVAILABLE = True
except Exception:
    _YOLO_AVAILABLE = False

# Load variables from a local .env file (if present) into the process environment,
# then read the three API keys the pipeline needs.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
FREESOUND_API_KEY = os.getenv("FREESOUND_API_KEY")

# Fail fast: every key is required, so stop here with a clear message if one is missing.
if not OPENAI_API_KEY:
    raise RuntimeError("Please set OPENAI_API_KEY in your environment or .env file")
if not PEXELS_API_KEY:
    raise RuntimeError("Please set PEXELS_API_KEY in your environment or .env file")
if not FREESOUND_API_KEY:
    raise RuntimeError("Please set FREESOUND_API_KEY in your environment or .env file")

# Shared OpenAI client and the chat model name (overridable via the OPENAI_MODEL env var).
client = OpenAI(api_key=OPENAI_API_KEY)
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")  # change if you prefer another model

# Pexels API endpoints; the API key is sent as the Authorization header value.
PEXELS_PHOTO_SEARCH = "https://api.pexels.com/v1/search"
PEXELS_VIDEO_SEARCH = "https://api.pexels.com/videos/search"
PEXELS_HEADERS = {"Authorization": PEXELS_API_KEY}

# Freesound endpoints; the key is sent as "Token <key>" in the Authorization header.
FREESOUND_SEARCH = "https://freesound.org/apiv2/search/text/"
FREESOUND_HEADERS = {"Authorization": f"Token {FREESOUND_API_KEY}"}

In [6]:
# Tunable limits for the pipeline. These are used as default argument values by
# the fetch helpers defined below, so change them before running those cells.
MAX_VIDEOS_PER_QUERY = 3   # default number of Pexels videos requested per query
MAX_IMAGES_PER_QUERY = 3   # default number of Pexels photos requested per query
MAX_AUDIOS_PER_QUERY = 3   # default number of Freesound results requested per query
MAX_QUERIES_PER_SHOT = 3   # default cap on search queries derived from one shot
MAX_SHOTS = 6              # upper bound on shots requested from the model by the pipeline

# ---------------------
# 1) OpenAI shot-splitting (structured JSON)
# ---------------------
def ask_model_for_shots(prompt_text: str, min_shots: int = 1, max_shots: int = 4) -> List[Dict[str, Any]]:
    """
    Use OpenAI to split a brief into structured shots.

    Args:
        prompt_text: The short video brief to convert.
        min_shots: Minimum number of shots requested in the system prompt.
        max_shots: Maximum number of shots requested in the system prompt.

    Returns:
        The parsed JSON array of shot dicts. The prompt asks for the keys id,
        text_description, suggested_duration_seconds, keywords, style_tokens and
        negative_filters, but the returned objects are not validated against it.

    Raises:
        RuntimeError: If the model output is not valid JSON or is not an array
            (a top-level object of the form {"shots": [...]} is unwrapped).
    """
    system = (
        "You are an assistant that MUST return ONLY valid JSON (no commentary). "
        f"Given a short video brief, return an array of between {min_shots} and {max_shots} shots. "
        "Each shot object must include the following keys: "
        '"id" (string), "text_description" (string), "suggested_duration_seconds" (integer), '
        '"keywords" (array of strings), "style_tokens" (array of strings), "negative_filters" (array of strings).'
    )
    user = f"Convert this brief into shots: \"{prompt_text}\". Return JSON only."

    resp = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=0.0,
        # Scale the completion budget with the number of shots: a fixed 400-token
        # cap truncates the JSON (and breaks parsing) once several shots are requested.
        max_tokens=max(400, 200 * max_shots),
    )

    # The modern client returns a message object; its text lives on `.content`
    # (which may be None), so no subscript/dict fallbacks are needed.
    content = resp.choices[0].message.content
    text = content.strip() if content else ""

    # Strip a Markdown code fence if the model wrapped the JSON in one. The
    # closing line is removed only when it really is a fence, so an unterminated
    # fence does not cost us the last line of JSON.
    if text.startswith("```"):
        lines = text.splitlines()[1:]
        if lines and lines[-1].strip().startswith("```"):
            lines = lines[:-1]
        text = "\n".join(lines).strip()

    try:
        shots = json.loads(text)
    except Exception as e:
        raise RuntimeError(f"Failed to parse JSON from model output: {e}\n---\n{text}") from e

    # Tolerate the common {"shots": [...]} wrapper instead of rejecting it.
    if isinstance(shots, dict) and isinstance(shots.get("shots"), list):
        shots = shots["shots"]

    if not isinstance(shots, list):
        raise RuntimeError("Model did not return a JSON array of shots.")
    return shots


In [7]:
# ---------------------
# 2) Query expansion
# ---------------------
def expand_queries_for_shot(shot: Dict[str, Any], max_queries: int = MAX_QUERIES_PER_SHOT) -> List[str]:
    """
    Derive a small, de-duplicated list of search queries from one shot.

    Candidates, in priority order: the full text description, all keywords
    joined, the first two keywords individually, and the description combined
    with the first two style tokens. Whitespace is collapsed before comparing.

    Args:
        shot: Shot dict; reads "text_description", "keywords" and "style_tokens".
            Missing or null values are treated as empty.
        max_queries: Maximum number of queries to return; values <= 0 yield [].

    Returns:
        Up to max_queries unique, non-empty query strings in priority order.
    """
    if max_queries <= 0:
        return []

    # The model may emit null or non-string values; normalise them up front so
    # .strip() and " ".join() cannot raise.
    text = str(shot.get("text_description") or "").strip()
    keywords = [str(k) for k in (shot.get("keywords") or []) if k is not None]
    style_tokens = [str(s) for s in (shot.get("style_tokens") or []) if s is not None]

    candidates = []
    if text:
        candidates.append(text)
    if keywords:
        candidates.append(" ".join(keywords))
        candidates.extend(keywords[:2])
    if style_tokens and text:
        candidates.append(text + " " + " ".join(style_tokens[:2]))

    seen = set()
    final = []
    for q in candidates:
        qclean = " ".join(q.split())
        if qclean and qclean not in seen:
            seen.add(qclean)
            final.append(qclean)
            if len(final) >= max_queries:
                break
    return final

In [8]:
# ---------------------
# 3) Pexels API helpers (fetch)
# ---------------------
def pexels_search_photos(query: str, per_page: int = MAX_IMAGES_PER_QUERY) -> Dict[str, Any]:
    """
    Call the Pexels photo search endpoint and return the decoded JSON body.

    Sends `query` and `per_page` as query-string parameters with the shared
    Pexels headers and a 12-second timeout. `raise_for_status()` propagates
    HTTP error responses to the caller.
    """
    response = requests.get(
        PEXELS_PHOTO_SEARCH,
        params={"query": query, "per_page": per_page},
        headers=PEXELS_HEADERS,
        timeout=12,
    )
    response.raise_for_status()
    return response.json()

def pexels_search_videos(query: str, per_page: int = MAX_VIDEOS_PER_QUERY) -> Dict[str, Any]:
    """
    Call the Pexels video search endpoint and return the decoded JSON body.

    Sends `query` and `per_page` as query-string parameters with the shared
    Pexels headers and a 12-second timeout. `raise_for_status()` propagates
    HTTP error responses to the caller.
    """
    response = requests.get(
        PEXELS_VIDEO_SEARCH,
        params={"query": query, "per_page": per_page},
        headers=PEXELS_HEADERS,
        timeout=12,
    )
    response.raise_for_status()
    return response.json()

def normalize_photo_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert one raw Pexels photo result into the pipeline's asset dict.

    Args:
        item: A single entry from the photo search response.

    Returns:
        Dict with id ("photo_<id>"), type "image", url (the "original" rendition,
        falling back to "large", or None when neither exists), width, height,
        photographer, provider, license, and the untouched raw item under "meta".
    """
    # `src` may be missing or explicitly null; treat both as "no renditions"
    # instead of raising, matching the `or {}` guards used by the other normalizers.
    src = item.get("src") or {}
    return {
        "id": f"photo_{item.get('id')}",
        "type": "image",
        "url": src.get("original") or src.get("large"),
        "width": item.get("width"),
        "height": item.get("height"),
        "photographer": item.get("photographer"),
        "provider": "pexels",
        "license": "pexels",
        "meta": item,
    }

def normalize_video_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert one raw Pexels video result into the pipeline's asset dict.

    The rendition with the largest (width, fps) is selected from "video_files";
    ties keep the first entry in the provider's order. When there are no
    renditions, the item's page "url" is used and width/height are None.

    Args:
        item: A single entry from the video search response.

    Returns:
        Dict with id ("video_<id>"), type "video", url, duration, width, height,
        provider, license, and the untouched raw item under "meta".
    """
    files = item.get("video_files") or []

    def _rank(f: Dict[str, Any]):
        # Renditions can carry explicit nulls for width/fps; `.get(key, 0)` would
        # return None for those and make the comparison raise TypeError.
        return (f.get("width") or 0, f.get("fps") or 0)

    chosen = max(files, key=_rank) if files else None
    return {
        "id": f"video_{item.get('id')}",
        "type": "video",
        "url": chosen.get("link") if chosen else (item.get("url")),
        "duration": item.get("duration"),
        "width": chosen.get("width") if chosen else None,
        "height": chosen.get("height") if chosen else None,
        "provider": "pexels",
        "license": "pexels",
        "meta": item,
    }

def fetch_pexels_for_query(query: str, top_k_v: int = MAX_VIDEOS_PER_QUERY, top_k_i: int = MAX_IMAGES_PER_QUERY) -> Dict[str, List[Dict[str,Any]]]:
    """
    Search Pexels for videos and photos matching `query` and normalize the hits.

    Each search is best-effort: if it fails, a warning is printed and whatever
    was collected for that media type so far is kept.

    Returns:
        {"videos": [...], "images": [...]} with at most top_k_v / top_k_i entries.
    """
    found = {"videos": [], "images": []}

    try:
        video_payload = pexels_search_videos(query, per_page=top_k_v)
        for raw_video in video_payload.get("videos", [])[:top_k_v]:
            found["videos"].append(normalize_video_item(raw_video))
    except Exception as e:
        print(f"[warning] video search failed for query '{query}': {e}")

    try:
        photo_payload = pexels_search_photos(query, per_page=top_k_i)
        for raw_photo in photo_payload.get("photos", [])[:top_k_i]:
            found["images"].append(normalize_photo_item(raw_photo))
    except Exception as e:
        print(f"[warning] photo search failed for query '{query}': {e}")

    return found


In [9]:
# ---------------------
# 4) Freesound API helpers (audio)
# ---------------------
def freesound_search(query: str, page_size: int = MAX_AUDIOS_PER_QUERY) -> Dict[str, Any]:
    """
    Run a Freesound text search and return the decoded JSON response.

    Only the fields used downstream are requested (id, name, previews, duration,
    username, tags, license). Matching items are under the response's "results"
    key. HTTP error responses are raised by `raise_for_status()`.
    """
    response = requests.get(
        FREESOUND_SEARCH,
        params={
            "query": query,
            "page_size": page_size,
            "fields": "id,name,previews,duration,username,tags,license",
        },
        headers=FREESOUND_HEADERS,
        timeout=12,
    )
    response.raise_for_status()
    return response.json()

def normalize_freesound_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Map a Freesound search result onto the pipeline's asset dict.

    The preview URL is chosen by preference: high-quality MP3, then high-quality
    OGG, then low-quality MP3. The raw item is kept under "meta".
    """
    previews = item.get("previews", {}) or {}

    # Walk the preference list and stop at the first usable URL; if none is
    # usable, the value for the last key is kept as-is.
    preview_url = None
    for preview_key in ("preview-hq-mp3", "preview-hq-ogg", "preview-lq-mp3"):
        preview_url = previews.get(preview_key)
        if preview_url:
            break

    normalized = {
        "id": f"audio_fs_{item.get('id')}",
        "type": "audio",
        "url": preview_url,
    }
    normalized["duration"] = item.get("duration")
    normalized["title"] = item.get("name")
    normalized["uploader"] = item.get("username")
    normalized["tags"] = item.get("tags", [])
    normalized["provider"] = "freesound"
    normalized["license"] = item.get("license")
    normalized["meta"] = item
    return normalized

def fetch_freesound_for_query(query: str, top_k: int = MAX_AUDIOS_PER_QUERY) -> Dict[str, List[Dict[str,Any]]]:
    """
    Search Freesound for `query` and return normalized audio assets.

    Results without a preview URL are dropped. The search is best-effort: on any
    failure a warning is printed and the items gathered so far are returned.

    Returns:
        {"audios": [...]} with at most top_k entries.
    """
    found = {"audios": []}
    try:
        payload = freesound_search(query, page_size=top_k)
        for raw_sound in payload.get("results", [])[:top_k]:
            asset = normalize_freesound_item(raw_sound)
            # keep only sounds that actually expose a preview to download
            if not asset.get("url"):
                continue
            found["audios"].append(asset)
    except Exception as e:
        print(f"[warning] Freesound search failed for query '{query}': {e}")
    return found


In [10]:
# ---------------------
# 5) Classification helpers (AUDIO)
# For audio: we'll read metadata via mutagen and attempt a small RMS loudness estimate when possible
def classify_audio_item(audio_asset: Dict[str, Any]) -> Dict[str, Any]:
    """
    Classify an audio item using:
      - duration (provider value, or file metadata when the provider gave none
        or the whole preview was downloaded)
      - format/file type (from the URL extension, else the Content-Type header)
      - uploader, tags and license (copied from the provider)
      - approximate RMS loudness (best-effort, 16-bit WAV only)
      - simple mood tags derived from provider tags and duration

    Args:
        audio_asset: Asset dict; reads "url", "duration", "uploader", "tags", "license".

    Returns:
        A classification dict to attach to the manifest. "sample_rate" and
        "channels" are added only when the file metadata exposes them. Failures
        are recorded in "notes" rather than raised.
    """
    # Stdlib helpers are imported locally so this cell is self-contained.
    import tempfile
    import wave
    from urllib.parse import urlparse

    url = audio_asset.get("url")
    cls = {
        "duration_seconds": audio_asset.get("duration"),
        "file_format": None,
        "uploader": audio_asset.get("uploader"),
        "tags": audio_asset.get("tags", []),
        "license": audio_asset.get("license"),
        "loudness_rms": None,
        "mood_tags": [],
        "mood_confidence": 0.0,
        "notes": ""
    }

    if not url:
        cls["notes"] = "no_url"
        return cls

    # Infer the file format from the URL. The path component is checked as well
    # as the raw string so that query strings (e.g. signed URLs) do not hide the
    # extension and force an unnecessary HEAD request.
    lower = url.lower()
    try:
        url_path = urlparse(lower).path
    except ValueError:
        url_path = lower
    for ext, fmt in ((".mp3", "mp3"), (".ogg", "ogg"), (".oga", "ogg"), (".wav", "wav")):
        if lower.endswith(ext) or url_path.endswith(ext):
            cls["file_format"] = fmt
            break
    else:
        # No recognisable extension: fall back to the Content-Type header.
        try:
            head = requests.head(url, timeout=8, allow_redirects=True)
            ctype = head.headers.get("content-type", "")
            if "mpeg" in ctype or "mp3" in ctype:
                cls["file_format"] = "mp3"
            elif "wav" in ctype:
                cls["file_format"] = "wav"
            elif "ogg" in ctype:
                cls["file_format"] = "ogg"
        except Exception:
            # Best-effort only; an unknown format is acceptable.
            pass

    # Download the preview, capped at 3 MB.
    max_preview_bytes = 3 * 1024 * 1024
    try:
        audio_bytes = download_bytes(url, max_bytes=max_preview_bytes, timeout=10)
    except Exception:
        audio_bytes = None

    # Use mutagen to read metadata/duration if possible.
    if audio_bytes:
        # Hitting the cap means the file was probably cut short, so a duration
        # derived from it would be too small.
        possibly_truncated = len(audio_bytes) >= max_preview_bytes
        tmp_path = None
        try:
            # mutagen needs a real file; use a unique temp file instead of a
            # fixed name in the working directory (which collides between runs).
            suffix = "." + (cls["file_format"] or "bin")
            fd, tmp_path = tempfile.mkstemp(prefix="tmp_audio_preview_", suffix=suffix)
            with os.fdopen(fd, "wb") as f:
                f.write(audio_bytes)
            af = MutagenFile(tmp_path)
            if af is not None:
                info = af.info
                # Keep the provider's duration when the download was truncated.
                if hasattr(info, "length") and not (possibly_truncated and cls["duration_seconds"]):
                    cls["duration_seconds"] = float(info.length)
                if hasattr(info, "sample_rate"):
                    cls["sample_rate"] = int(info.sample_rate)
                if hasattr(info, "channels"):
                    cls["channels"] = int(info.channels)
            # Small RMS estimate over up to 44100 frames, for 16-bit PCM WAV only.
            if cls.get("file_format") == "wav":
                try:
                    with wave.open(tmp_path, 'rb') as w:
                        frames = w.readframes(min(44100, w.getnframes()))
                        if w.getsampwidth() == 2:
                            arr = np.frombuffer(frames, dtype="<i2").astype(np.float32)
                            if arr.size:
                                rms = float(np.sqrt(np.mean((arr / 32768.0) ** 2)))
                                cls["loudness_rms"] = round(rms, 5)
                except Exception:
                    # Loudness is optional; leave it as None on any decode problem.
                    pass
        except Exception as e:
            cls["notes"] += f"mutagen_failed:{e};"
        finally:
            if tmp_path is not None:
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass

    # Simple mood tags based on provider tags & duration heuristics.
    mood_tags = []
    reason = []
    tags = [str(t).lower() for t in (audio_asset.get("tags") or [])]
    if any(t in tags for t in ("ambient", "calm", "relax", "peaceful", "meditation")):
        mood_tags.append("calm")
        reason.append("ambient_tag")
    if any(t in tags for t in ("dramatic", "tension", "suspense", "intense")):
        mood_tags.append("tense")
        reason.append("dramatic_tag")
    if any(t in tags for t in ("happy","upbeat","bright")):
        mood_tags.append("uplifting")
        reason.append("happy_tag")
    # Duration-based heuristics: very short clips are treated as sound effects,
    # long untagged clips as background beds.
    dur = cls.get("duration_seconds")
    if dur:
        if dur < 2.0:
            mood_tags.append("sfx")
            reason.append("short_duration")
        elif dur >= 30 and not mood_tags:
            mood_tags.append("background")
            reason.append("long_duration")
    # Dedupe (order-preserving), keep at most three tags, and derive a confidence.
    mood_tags = list(dict.fromkeys(mood_tags))[:3]
    conf = 0.5 + min(0.3, 0.1 * len(mood_tags))
    cls["mood_tags"] = mood_tags
    cls["mood_confidence"] = round(min(0.99, conf), 2)
    if reason:
        cls["notes"] += "mood_reason:" + ";".join(reason)
    return cls

# ---------------------
# Utility: download bytes (shared)
# ---------------------
def download_bytes(url: str, max_bytes: Optional[int] = None, timeout: int = 10) -> Optional[bytes]:
    """
    Download `url` and return its body, or None on any failure.

    Args:
        url: Resource to fetch.
        max_bytes: When truthy, stream the body and return at most this many
            bytes; when None or 0, the whole body is returned.
        timeout: Timeout in seconds passed to requests.

    Returns:
        The (possibly truncated) body as bytes, or None if the request failed.
    """
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        # The context manager releases the connection even when the stream is
        # abandoned early because the byte cap was reached.
        with requests.get(url, stream=bool(max_bytes), timeout=timeout, headers=headers) as r:
            r.raise_for_status()
            if not max_bytes:
                return r.content
            buf = io.BytesIO()
            for chunk in r.iter_content(chunk_size=8192):
                if not chunk:
                    break
                buf.write(chunk)
                if buf.tell() >= max_bytes:
                    break
            # The last chunk may overshoot the cap; trim so max_bytes is a real limit.
            return buf.getvalue()[:max_bytes]
    except Exception:
        return None

In [11]:
# 6) Classification helpers (VIDEO/IMAGE)

_face_cascade = None
def _get_face_cascade():
    """
    Return the shared Haar frontal-face classifier, loading it on first use.

    Returns None when the cascade file could not be loaded, so callers that
    check `is None` can skip face detection. A failed load is not cached.
    """
    global _face_cascade
    if _face_cascade is None:
        cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
        candidate = cv2.CascadeClassifier(cascade_path)
        # The constructor does not raise on a missing/unreadable file; it yields
        # an empty classifier that would fail later in detectMultiScale.
        if candidate.empty():
            return None
        _face_cascade = candidate
    return _face_cascade

def download_bytes(url: str, max_bytes: Optional[int] = None, timeout: int = 12) -> Optional[bytes]:
    """
    Download `url` and return its body, or None on any failure.

    NOTE: this repeats the `download_bytes` defined in the audio-helpers cell
    and, because it runs later, is the definition in effect (default timeout
    12 s instead of 10 s).

    Args:
        url: Resource to fetch.
        max_bytes: When truthy, stream the body and return at most this many
            bytes; when None or 0, the whole body is returned.
        timeout: Timeout in seconds passed to requests.

    Returns:
        The (possibly truncated) body as bytes, or None if the request failed.
    """
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        # The context manager releases the connection even when the stream is
        # abandoned early because the byte cap was reached.
        with requests.get(url, stream=bool(max_bytes), timeout=timeout, headers=headers) as r:
            r.raise_for_status()
            if not max_bytes:
                return r.content
            buf = io.BytesIO()
            for chunk in r.iter_content(chunk_size=8192):
                if not chunk:
                    break
                buf.write(chunk)
                if buf.tell() >= max_bytes:
                    break
            # The last chunk may overshoot the cap; trim so max_bytes is a real limit.
            return buf.getvalue()[:max_bytes]
    except Exception:
        return None

def sample_frames_from_video(url_or_path: str, n_frames: int = 6) -> List[np.ndarray]:
    """
    Sample up to `n_frames` RGB frames spread evenly across a video.

    The source is first opened directly with OpenCV. If that fails, the file is
    downloaded to a temporary file and opened from disk. When the container
    reports no frame count, the first `n_frames` frames are read sequentially.

    Args:
        url_or_path: Video URL or local path.
        n_frames: Maximum number of frames to return.

    Returns:
        List of frames as RGB arrays; empty if the video could not be opened.
    """
    import tempfile

    tmp_path = None
    cap = cv2.VideoCapture(url_or_path)
    try:
        if not cap.isOpened():
            cap.release()
            try:
                data = download_bytes(url_or_path)
                if not data:
                    return []
                # Unique temp file (removed in `finally`) instead of a fixed
                # name left behind in the working directory.
                fd, tmp_path = tempfile.mkstemp(prefix="tmp_asset_video_", suffix=".bin")
                with os.fdopen(fd, "wb") as out_file:
                    out_file.write(data)
                cap = cv2.VideoCapture(tmp_path)
                if not cap.isOpened():
                    return []
            except Exception:
                return []

        frames = []
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
        if frame_count <= 0:
            # Unknown length: just take the leading frames.
            for _ in range(n_frames):
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            return frames

        indices = np.linspace(0, max(0, frame_count - 1), num=min(n_frames, frame_count), dtype=int)
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if not ret or frame is None:
                continue
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        return frames
    finally:
        # Release the capture before deleting the file: an open handle blocks
        # removal on Windows.
        cap.release()
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

def sample_frames_from_image(url_or_path: str) -> List[np.ndarray]:
    """
    Load an image and return it as a single-element list of RGB arrays.

    Args:
        url_or_path: Image URL, or path to an existing local file.

    Returns:
        [rgb_array] on success, [] if the image could not be fetched or decoded.
    """
    try:
        # Honour the "or_path" part of the parameter: local files are read
        # directly, since download_bytes only handles URLs.
        if os.path.isfile(url_or_path):
            with open(url_or_path, "rb") as fh:
                raw = fh.read()
        else:
            raw = download_bytes(url_or_path)
        if not raw:
            return []
        with Image.open(io.BytesIO(raw)) as im:
            return [np.array(im.convert("RGB"))]
    except Exception:
        return []

def dominant_colors_from_rgb_array(arr: np.ndarray, top_k: int = 3) -> List[str]:
    """
    Return up to `top_k` dominant colors of an RGB array as '#rrggbb' strings.

    The image is shrunk to 200x200 and quantized to an adaptive palette of
    `top_k` colors; palette entries are then ordered by pixel count, most
    frequent first. Any failure yields an empty list.
    """
    try:
        quantized = (
            Image.fromarray(arr)
            .resize((200, 200))
            .convert("P", palette=Image.ADAPTIVE, colors=top_k)
        )
        flat_palette = quantized.getpalette()
        # getcolors() yields (pixel_count, palette_index) pairs
        usage = sorted(quantized.getcolors(), reverse=True)
        hex_colors = []
        for _pixel_count, slot in usage[:top_k]:
            base = slot * 3
            red, green, blue = flat_palette[base:base + 3]
            hex_colors.append('#{:02x}{:02x}{:02x}'.format(red, green, blue))
        return hex_colors
    except Exception:
        return []

def compute_motion_intensity(frames: List[np.ndarray]) -> tuple[str, float]:
    """
    Estimate how much the picture changes between consecutive sampled frames.

    The metric is the mean absolute grayscale difference between each pair of
    neighbouring frames, averaged over all pairs.

    Args:
        frames: RGB frames in temporal order.

    Returns:
        (band, metric) where band is "none" for fewer than two frames, otherwise
        "low" (< 2.5), "medium" (< 8.0) or "high".
    """
    if not frames or len(frames) < 2:
        return ("none", 0.0)
    grays = [cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY).astype(np.float32) for frame in frames]
    mags = [np.abs(cur - prev).mean() for prev, cur in zip(grays, grays[1:])]
    mean_mag = float(np.mean(mags)) if mags else 0.0
    if mean_mag < 2.5:
        band = "low"
    elif mean_mag < 8.0:
        band = "medium"
    else:
        band = "high"
    return (band, mean_mag)

def estimate_camera_move(frames: List[np.ndarray]) -> tuple[str, float]:
    """
    Guess the camera movement from dense optical flow between the first and
    last sampled frame.

    Args:
        frames: RGB frames in temporal order.

    Returns:
        (label, average_flow_magnitude). The label is "static" when the average
        magnitude is below 0.5, "pan" when horizontal flow dominates, "tilt"
        when vertical flow dominates, and "dolly_or_forward" otherwise.
        ("unknown", 0.0) is returned for fewer than two frames or on any error.
    """
    if not frames or len(frames) < 2:
        return ("unknown", 0.0)
    try:
        first_gray = cv2.cvtColor(frames[0], cv2.COLOR_RGB2GRAY)
        last_gray = cv2.cvtColor(frames[-1], cv2.COLOR_RGB2GRAY)
        flow = cv2.calcOpticalFlowFarneback(first_gray, last_gray, None,
                                            pyr_scale=0.5, levels=3, winsize=15,
                                            iterations=3, poly_n=5, poly_sigma=1.2, flags=0)
        flow_x, flow_y = flow[..., 0], flow[..., 1]
        mag, _ang = cv2.cartToPolar(flow_x, flow_y)
        avg_mag = float(np.mean(mag))
        avg_dx = float(np.mean(flow_x))
        avg_dy = float(np.mean(flow_y))
        if avg_mag < 0.5:
            return ("static", avg_mag)
        # A direction "dominates" when it exceeds the other by more than 20%.
        if abs(avg_dx) > abs(avg_dy) * 1.2:
            return ("pan", avg_mag)
        if abs(avg_dy) > abs(avg_dx) * 1.2:
            return ("tilt", avg_mag)
        return ("dolly_or_forward", avg_mag)
    except Exception:
        return ("unknown", 0.0)

# YOLO wrapper
_yolo_model = None
def _get_yolo_model():
    """
    Return the shared YOLO model, loading "yolov8n.pt" on first use.

    Returns None when the ultralytics import at the top of the notebook failed.
    """
    global _yolo_model
    if _YOLO_AVAILABLE:
        if _yolo_model is None:
            _yolo_model = YOLO("yolov8n.pt")
        return _yolo_model
    return None

def detect_with_yolo_on_frames(frames: List[np.ndarray]) -> Dict[str, Any]:
    """
    Run YOLO object detection on the first sampled frame.

    Args:
        frames: RGB frames; only frames[0] is analysed.

    Returns:
        {"objects": [{"label", "conf"}, ...], "contains_people": bool,
        "person_count": int}. An empty result is returned when YOLO is
        unavailable, there are no frames, or prediction fails.
    """
    model = _get_yolo_model()
    if model is None or not frames:
        return {"objects": [], "contains_people": False, "person_count": 0}
    try:
        # The frame samplers produce RGB arrays, but Ultralytics expects numpy
        # input in BGR channel order; convert back to avoid swapped colours.
        bgr_frame = cv2.cvtColor(frames[0], cv2.COLOR_RGB2BGR)
        results = model.predict(source=bgr_frame, imgsz=640, conf=0.25, verbose=False)
        dets = []
        person_count = 0
        if results:
            boxes = getattr(results[0], "boxes", None)
            for b in (boxes if boxes is not None else []):
                try:
                    cls_id = int(b.cls[0])
                    label = str(model.names.get(cls_id, str(cls_id))) if hasattr(model, "names") else str(cls_id)
                    conf = float(b.conf[0])
                except Exception:
                    # Skip boxes whose class/confidence cannot be read.
                    continue
                dets.append({"label": label, "conf": conf})
                if label.lower() in ("person", "people"):
                    person_count += 1
        return {"objects": dets, "contains_people": person_count > 0, "person_count": person_count}
    except Exception:
        return {"objects": [], "contains_people": False, "person_count": 0}

def detect_faces_on_frames(frames: List[np.ndarray]) -> Dict[str, Any]:
    """
    Detect frontal faces with the Haar cascade (fallback when YOLO is unavailable).

    Args:
        frames: RGB frames to scan.

    Returns:
        {"contains_people": bool, "person_count": int, "objects": []}, where
        person_count is the largest number of faces found in any single frame.
    """
    face_cascade = _get_face_cascade()
    if face_cascade is None or not frames:
        return {"contains_people": False, "person_count": 0, "objects": []}
    max_faces = 0
    for frame in frames:
        try:
            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30,30))
            # Summing across frames would count the same person once per sampled
            # frame; the per-frame maximum is the better head-count estimate.
            max_faces = max(max_faces, len(faces))
        except Exception:
            continue
    return {"contains_people": (max_faces > 0), "person_count": max_faces, "objects": []}

def infer_mood_tags(dominant_colors: List[str], motion_band: str, contains_people: bool, objects: List[Dict]) -> tuple[List[str], float, str]:
    """
    Derive coarse mood tags from palette warmth, motion band and detected content.

    Args:
        dominant_colors: '#rrggbb' strings; only the first is used, to compute a
            warmth value (red minus blue, scaled to [-1, 1]).
        motion_band: "none", "low", "medium", or anything else (treated as high).
        contains_people: Whether people were detected.
        objects: Detection dicts; the "label" of each is read if present.

    Returns:
        (tags, confidence, reason): at most three unique tags, a confidence in
        [0.5, 0.99] rounded to two decimals, and a ';'-joined explanation.
    """
    tags = []
    reason_terms = []

    # Palette warmth: positive means the dominant color is redder than it is blue.
    temp = 0
    if dominant_colors:
        try:
            hex_value = dominant_colors[0].lstrip('#')
            r, g, b = (int(hex_value[i:i+2], 16) for i in (0, 2, 4))
            temp = (r - b) / 255.0
        except Exception:
            temp = 0
    is_warm = temp > 0.05

    if motion_band in ("none", "low"):
        if is_warm:
            tags.extend(["calm", "nostalgic"])
            reason_terms.append("warm_palette+low_motion")
        else:
            tags.extend(["calm"])
            reason_terms.append("cool+low_motion")
    elif motion_band == "medium":
        if is_warm:
            tags.extend(["gentle", "pleasant"])
            reason_terms.append("warm+medium_motion")
        else:
            tags.extend(["moving", "reflective"])
            reason_terms.append("cool+medium_motion")
    else:
        tags.extend(["energetic", "urgent"])
        reason_terms.append("high_motion")

    # Detections without a "label" are tolerated rather than raising KeyError.
    obj_labels = [str(o.get("label", "")).lower() for o in objects] if objects else []

    if contains_people and "calm" in tags:
        tags.append("intimate")
        reason_terms.append("people+calm")
    if any(x in obj_labels for x in ("ocean", "beach", "shore", "wave", "sea", "kite")):
        if "calm" in tags or is_warm:
            # Seaside/kite imagery overrides the tags gathered so far.
            tags = ["calm", "poignant"]
            reason_terms.append("ocean+kite+warm")

    tags = list(dict.fromkeys(tags))[:3]

    # Confidence grows with the number of signals that were actually available.
    score = 0.5
    if dominant_colors:
        score += 0.15
    if motion_band != "none":
        score += 0.15
    if contains_people or objects:
        score += 0.15
    score = min(0.99, score)

    reason = ";".join(reason_terms) if reason_terms else ""
    return tags, round(score, 2), reason

def classify_asset(url_or_path: str, asset_type: str = None, sample_frames_count: int = 6) -> Dict[str, Any]:
    """
    Classify a video or image asset with lightweight visual heuristics.

    NOTE: the next notebook cell defines `classify_asset` again (adding an audio
    route). Once that cell has run, this definition is shadowed and no longer used.

    Args:
        url_or_path: Asset URL or path.
        asset_type: "video" or "image"; when None it is inferred from the file
            extension, falling back to probing with cv2.VideoCapture.
        sample_frames_count: Number of frames to sample from a video.

    Returns:
        {"url", "type", "classification"}; individual analysis failures are
        appended to classification["notes"] instead of being raised.
    """
    # Infer the asset type when the caller did not supply one.
    if asset_type is None:
        l = url_or_path.lower()
        if any(l.endswith(x) for x in (".mp4",".mov",".webm",".mkv",".avi")):
            asset_type = "video"
        elif any(l.endswith(x) for x in (".jpg",".jpeg",".png",".webp")):
            asset_type = "image"
        else:
            # Unknown extension: treat as video if OpenCV can open it, else image.
            cap = cv2.VideoCapture(url_or_path)
            if cap and cap.isOpened():
                asset_type = "video"; cap.release()
            else:
                asset_type = "image"
    # Videos are sampled at several positions; an image yields a single frame.
    frames = sample_frames_from_video(url_or_path, n_frames=sample_frames_count) if asset_type=="video" else sample_frames_from_image(url_or_path)
    # Defaults reported when a given analysis step cannot run.
    classification = {
        "contains_people": False, "person_count": 0, "objects": [],
        "motion_intensity": "none", "motion_metric": 0.0,
        "camera_move": "unknown", "camera_move_metric": 0.0,
        "dominant_colors": [], "mood_tags": [], "mood_confidence": 0.0, "mood_reason": "", "notes": ""
    }
    # Nothing to analyse: return the defaults with an explanatory note.
    if not frames:
        classification["notes"]="no_frames_obtained"
        return {"url":url_or_path, "type":asset_type, "classification":classification}
    # Dominant colors come from the first frame only.
    try:
        dom = dominant_colors_from_rgb_array(frames[0], top_k=3)
        classification["dominant_colors"]=dom
    except Exception:
        classification["dominant_colors"]=[]
    # Object/people detection: YOLO when available, otherwise Haar face detection.
    if _YOLO_AVAILABLE:
        try:
            det = detect_with_yolo_on_frames(frames)
            classification["objects"]=det.get("objects",[])
            classification["contains_people"]=bool(det.get("contains_people",False))
            classification["person_count"]=int(det.get("person_count",0))
        except Exception as e:
            classification["notes"]+=f"yolo_failed:{e};"
    else:
        try:
            det = detect_faces_on_frames(frames)
            classification["contains_people"]=det.get("contains_people",False)
            classification["person_count"]=det.get("person_count",0)
            classification["objects"]=[]
        except Exception as e:
            classification["notes"]+=f"face_failed:{e};"
    # Frame-to-frame motion intensity.
    try:
        band, metric = compute_motion_intensity(frames)
        classification["motion_intensity"]=band; classification["motion_metric"]=round(metric,3)
    except Exception as e:
        classification["notes"]+=f"motion_failed:{e};"
    # Camera movement estimate from optical flow.
    try:
        cam, cam_metric = estimate_camera_move(frames)
        classification["camera_move"]=cam; classification["camera_move_metric"]=round(cam_metric,3)
    except Exception as e:
        classification["notes"]+=f"camera_move_failed:{e};"
    # Mood tags are derived from the signals gathered above.
    try:
        tags, conf, reason = infer_mood_tags(classification["dominant_colors"], classification["motion_intensity"], classification["contains_people"], classification["objects"])
        classification["mood_tags"]=tags; classification["mood_confidence"]=conf; classification["mood_reason"]=reason
    except Exception as e:
        classification["notes"]+=f"mood_failed:{e};"
    return {"url":url_or_path, "type":asset_type, "classification":classification}



In [12]:
def classify_asset(url_or_path: str, asset_type: str = None, sample_frames_count: int = 6) -> Dict[str, Any]:
    """
    A consolidated classify_asset that routes to video/image or audio classification.

    Args:
        url_or_path: Asset URL or path.
        asset_type: "video", "image" or "audio". When None, the type is inferred
            from the file extension (ignoring any query string or fragment),
            falling back to probing with cv2.VideoCapture.
        sample_frames_count: Number of frames to sample from a video.

    Returns:
        {"url", "type", "classification"}. For audio, the classification comes
        from classify_audio_item, called with no provider metadata. For
        video/image, individual analysis failures are appended to
        classification["notes"] instead of being raised.
    """
    if asset_type is None:
        full_lower = url_or_path.lower()
        # Also match on the part before any query string/fragment so that signed
        # or parameterised URLs are still recognised by extension.
        path_lower = full_lower.split("?", 1)[0].split("#", 1)[0]

        def _has_ext(extensions):
            return path_lower.endswith(extensions) or full_lower.endswith(extensions)

        if _has_ext((".mp4", ".mov", ".webm", ".mkv", ".avi")):
            asset_type = "video"
        elif _has_ext((".jpg", ".jpeg", ".png", ".webp")):
            asset_type = "image"
        elif _has_ext((".mp3", ".ogg", ".oga", ".wav")):
            # Without this branch, audio files were probed as video and ended
            # up misclassified as images with no frames.
            asset_type = "audio"
        else:
            # Unknown extension: treat as video if OpenCV can open it, else image.
            probe = cv2.VideoCapture(url_or_path)
            try:
                asset_type = "video" if probe.isOpened() else "image"
            finally:
                probe.release()

    # Audio has its own classifier; no provider metadata is available here.
    if asset_type == "audio":
        return {"url": url_or_path, "type": "audio", "classification": classify_audio_item({"url": url_or_path, "tags": [], "uploader": None, "duration": None, "license": None})}

    if asset_type == "video":
        frames = sample_frames_from_video(url_or_path, n_frames=sample_frames_count)
    else:
        frames = sample_frames_from_image(url_or_path)

    # Defaults reported when a given analysis step cannot run.
    classification = {
        "contains_people": False, "person_count": 0, "objects": [],
        "motion_intensity": "none", "motion_metric": 0.0,
        "camera_move": "unknown", "camera_move_metric": 0.0,
        "dominant_colors": [], "mood_tags": [], "mood_confidence": 0.0, "mood_reason": "", "notes": ""
    }

    if not frames:
        classification["notes"] = "no_frames_obtained"
        return {"url": url_or_path, "type": asset_type, "classification": classification}

    # dominant colors (first frame only)
    try:
        classification["dominant_colors"] = dominant_colors_from_rgb_array(frames[0], top_k=3)
    except Exception:
        classification["dominant_colors"] = []

    # detection: YOLO if available else Haar faces
    if _YOLO_AVAILABLE:
        try:
            det = detect_with_yolo_on_frames(frames)
            classification["objects"] = det.get("objects", [])
            classification["contains_people"] = bool(det.get("contains_people", False))
            classification["person_count"] = int(det.get("person_count", 0))
        except Exception as e:
            classification["notes"] += f"yolo_failed:{e};"
    else:
        try:
            det = detect_faces_on_frames(frames)
            classification["contains_people"] = det.get("contains_people", False)
            classification["person_count"] = det.get("person_count", 0)
            classification["objects"] = []
        except Exception as e:
            classification["notes"] += f"face_failed:{e};"

    # motion
    try:
        band, metric = compute_motion_intensity(frames)
        classification["motion_intensity"] = band
        classification["motion_metric"] = round(metric, 3)
    except Exception as e:
        classification["notes"] += f"motion_failed:{e};"

    # camera move
    try:
        cam, cam_metric = estimate_camera_move(frames)
        classification["camera_move"] = cam
        classification["camera_move_metric"] = round(cam_metric, 3)
    except Exception as e:
        classification["notes"] += f"camera_move_failed:{e};"

    # mood tags (reuse infer_mood_tags)
    try:
        tags, conf, reason = infer_mood_tags(classification["dominant_colors"], classification["motion_intensity"], classification["contains_people"], classification["objects"])
        classification["mood_tags"] = tags
        classification["mood_confidence"] = conf
        classification["mood_reason"] = reason
    except Exception as e:
        classification["notes"] += f"mood_failed:{e};"

    return {"url": url_or_path, "type": asset_type, "classification": classification}


In [13]:
# ---------------------
# 7) Pipeline: fetch (pexels + freesound) + classify + assemble plan
# ---------------------
def _classify_visual_cached(cache: Dict[Any, Any], url: str, kind: str, sample_frames_count: int) -> Any:
    """Return the classification of a Pexels image/video, classifying each URL at most once.

    `classify_asset` is only invoked on a cache miss, so an asset that is
    returned for several shots is not classified again for every shot.
    The cached object is returned as-is (not copied).
    """
    key = (kind, url)
    if key not in cache:
        result = classify_asset(url, asset_type=kind, sample_frames_count=sample_frames_count)
        cache[key] = result["classification"]
    return cache[key]


def _record_assets(manifest: Dict[str, Any], shot_assets: Dict[str, List[Dict[str, Any]]],
                   seen_urls: set, kind: str, items: List[Dict[str, Any]], classify) -> None:
    """Classify the not-yet-seen items of one kind and register them in the manifest and the shot.

    Args:
        manifest: the plan-wide ``asset_manifest`` dict (mutated in place).
        shot_assets: the current shot's ``assets`` dict (mutated in place).
        seen_urls: URLs of this kind already attached to the current shot.
        kind: ``"video"``, ``"image"`` or ``"audio"``; also the asset-id prefix.
        items: fetched items; each is read via ``.get("url")`` / ``.get("meta")``.
        classify: callable ``(item, url) -> classification``.
    """
    bucket = shot_assets[f"{kind}s"]
    for item in items:
        url = item.get("url")
        # Skip items without a URL and URLs already attached to this shot.
        if not url or url in seen_urls:
            continue
        seen_urls.add(url)
        classification = classify(item, url)
        # Every manifest entry gets a unique running number shared across all
        # kinds, so the next number is always the current manifest size + 1.
        asset_id = f"{kind}_{len(manifest) + 1}"
        manifest[asset_id] = {"url": url, "type": kind, "classification": classification, "meta": item.get("meta")}
        bucket.append({"asset_id": asset_id, "url": url, "classification": classification})


def build_plan_fetch_and_classify(prompt_text: str, query_pause_seconds: float = 0.25) -> Dict[str, Any]:
    """Split a brief into shots, fetch candidate assets for each shot and classify them.

    For every shot returned by `ask_model_for_shots`, each query from
    `expand_queries_for_shot` is sent to Pexels (videos and images) and to
    Freesound (audio). Every new asset is classified, stored in the plan-wide
    manifest under an id of the form ``<kind>_<n>`` and referenced from the shot.

    De-duplication is per shot and per kind: a URL is attached to a given shot
    only once, but the same URL may appear under several shots (with distinct
    manifest ids). Image/video classification results are cached per URL for
    the duration of the call, so such repeats are not classified again.

    Args:
        prompt_text: the creative brief to plan for.
        query_pause_seconds: pause after each query (presumably to stay under
            the APIs' rate limits -- confirm); values <= 0 disable the pause.

    Returns:
        ``{"prompt": ..., "shots": [...], "asset_manifest": {...}}``.
    """
    shots = ask_model_for_shots(prompt_text, min_shots=1, max_shots=MAX_SHOTS)
    plan = {"prompt": prompt_text, "shots": [], "asset_manifest": {}}
    manifest = plan["asset_manifest"]
    visual_cache: Dict[Any, Any] = {}

    def classify_video(item, url):
        return _classify_visual_cached(visual_cache, url, "video", 4)

    def classify_image(item, url):
        return _classify_visual_cached(visual_cache, url, "image", 1)

    def classify_audio(item, url):
        # Audio is classified from the metadata already present in the item.
        return classify_audio_item(item)

    for shot in shots:
        queries = expand_queries_for_shot(shot)
        shot_entry = {
            "id": shot.get("id"),
            "text_description": shot.get("text_description"),
            "suggested_duration_seconds": shot.get("suggested_duration_seconds"),
            "queries_used": queries,
            "assets": {"videos": [], "images": [], "audios": []}
        }
        assets = shot_entry["assets"]
        # Set membership replaces rebuilding a URL list for every candidate.
        seen = {"video": set(), "image": set(), "audio": set()}

        for q in queries:
            print(f"[INFO] Fetching Pexels for shot {shot.get('id')} query: {q}")
            found_media = fetch_pexels_for_query(q)
            _record_assets(manifest, assets, seen["video"], "video", found_media.get("videos", []), classify_video)
            _record_assets(manifest, assets, seen["image"], "image", found_media.get("images", []), classify_image)

            # fetch & classify freesound audios for the same query
            print(f"[INFO] Fetching Freesound for shot {shot.get('id')} query: {q}")
            found_audio = fetch_freesound_for_query(q)
            _record_assets(manifest, assets, seen["audio"], "audio", found_audio.get("audios", []), classify_audio)

            if query_pause_seconds > 0:
                time.sleep(query_pause_seconds)
        plan["shots"].append(shot_entry)
    return plan



In [None]:
# ---------------------
# 7) Run & save
# ---------------------
# Alternative brief to try: "Sunset over the ocean - calm"

if __name__ == "__main__":
    # Fixed typo ("fightun-"): the brief is passed verbatim to the planning
    # step, so a misspelling here ends up in the generated search queries.
    brief = "a dog and a cat fighting - energetic, fast-paced"
    print("[RUN] Building plan, fetching Pexels & Freesound, classifying assets for:", brief)
    plan = build_plan_fetch_and_classify(brief)

    # Persist the full plan (shots + asset manifest) next to the notebook.
    out_path = "plan_pexels_enriched_with_audio.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2)
    print("Saved", out_path)

    # Print a compact summary instead of the whole plan.
    summary = {
        "prompt": plan["prompt"],
        "shots_count": len(plan["shots"]),
        "assets_count": len(plan["asset_manifest"]),
    }
    print(json.dumps(summary, indent=2))


[RUN] Building plan, fetching Pexels & Freesound, classifying assets for: Bustling city with vehicles and people - energetic, fast-paced


[INFO] Fetching Pexels for shot shot_1 query: Aerial view of a busy intersection with cars and buses moving rapidly.
[INFO] Fetching Freesound for shot shot_1 query: Aerial view of a busy intersection with cars and buses moving rapidly.
[INFO] Fetching Pexels for shot shot_1 query: city traffic bustling aerial
[INFO] Fetching Freesound for shot shot_1 query: city traffic bustling aerial
[INFO] Fetching Pexels for shot shot_1 query: city
[INFO] Fetching Freesound for shot shot_1 query: city
[INFO] Fetching Pexels for shot shot_2 query: Close-up of pedestrians crossing the street, showcasing diverse groups of people.
[INFO] Fetching Freesound for shot shot_2 query: Close-up of pedestrians crossing the street, showcasing diverse groups of people.
[INFO] Fetching Pexels for shot shot_2 query: people pedestrians crosswalk diversity
[INFO] Fetching Freesound for shot shot_2 query: people pedestrians crosswalk diversity
[INFO] Fetching Pexels for shot shot_2 query: people
[INFO] Fetching Free