# 🎬 Trailer AI — Rewritten Notebook
End-to-end: CSV → Download (yt_dlp API) → Features → Quick Train → Evaluate → Trailer.


In [ ]:
%pip install -q yt-dlp opencv-python librosa==0.10.1 numpy==1.24.4 pandas tqdm scikit-learn joblib matplotlib python-dotenv


In [ ]:
import os, sys, re, json, shutil, subprocess
from dataclasses import dataclass, asdict
from typing import List, Tuple, Optional, Dict
import numpy as np, pandas as pd
import cv2, librosa, yt_dlp
import matplotlib; matplotlib.use('Agg')
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.linear_model import LogisticRegression
import joblib
# Root folder (under the current working directory) for everything the notebook writes.
WORKDIR = os.path.join(os.getcwd(), 'data')
# Create one sub-folder per pipeline stage up front so later cells can write freely.
for subdir in ('raw', 'features', 'models', 'out'):
    os.makedirs(os.path.join(WORKDIR, subdir), exist_ok=True)
print('Working dir:', WORKDIR)


In [ ]:
def basename_noext(p: str) -> str:
    """Return the last path component of *p* with its final extension removed."""
    filename = os.path.basename(p)
    stem, _extension = os.path.splitext(filename)
    return stem
def seconds_from_vtt_ts(ts: str) -> float:
    """Convert a subtitle timestamp to seconds.

    Accepts ``hh:mm:ss.ttt`` and the hour-less ``mm:ss.ttt`` form; a comma is
    also accepted as the decimal separator (``hh:mm:ss,ttt``).

    Raises:
        ValueError: if *ts* does not have two or three ``:``-separated fields
            or a field is not numeric.
    """
    # Only ':' separates fields; the fractional part must stay attached to the
    # seconds field, otherwise the milliseconds are lost.
    fields = ts.strip().replace(',', '.').split(':')
    if not 2 <= len(fields) <= 3:
        raise ValueError(f'Unrecognized timestamp: {ts!r}')
    hours = int(fields[0]) if len(fields) == 3 else 0
    minutes = int(fields[-2])
    seconds = float(fields[-1])
    return hours * 3600 + minutes * 60 + seconds
def parse_vtt(vtt_path: str):
    """Parse a VTT subtitle file into a list of ``(start, end, text)`` cues.

    Args:
        vtt_path: Path to the subtitle file. A falsy or non-existent path
            yields an empty list.

    Returns:
        A list of ``(start_seconds, end_seconds, text)`` tuples, one per block
        that contains a ``-->`` timing line with ``end > start``. ``text`` is
        the block's lines after the timing line joined with single spaces.
        Blocks whose timing line cannot be parsed are skipped.
    """
    if not vtt_path or not os.path.exists(vtt_path):
        return []

    entries = []

    def _flush(block):
        # Only the first timing line of a block is considered.
        for i, ln in enumerate(block):
            if '-->' not in ln:
                continue
            try:
                t1, t2 = [x.strip() for x in ln.split('-->')]
                s = seconds_from_vtt_ts(t1)
                # Cue settings (e.g. "align:start") may follow the end time.
                e = seconds_from_vtt_ts(t2.split(' ')[0])
            except (ValueError, IndexError):
                # Malformed timing line: skip this cue, keep parsing the rest.
                break
            if e > s:
                entries.append((s, e, ' '.join(block[i + 1:]).strip()))
            break

    block = []
    with open(vtt_path, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            line = line.rstrip('\n')
            if line.strip():
                block.append(line)
                continue
            # A blank line terminates the current block.
            if block:
                _flush(block)
            block = []
    # Files often end without a trailing blank line; do not drop the last cue.
    if block:
        _flush(block)
    return entries
def caption_overlap(captions, start, end):
    """Return the fraction of the window ``[start, end]`` covered by caption cues.

    Per-cue intersections are summed (overlapping cues count more than once)
    and the total is capped at the window length, so the result is at most 1.0.
    An empty caption list yields 0.0.
    """
    if not captions:
        return 0.0
    # Guard against zero-length windows when dividing.
    window = max(1e-6, end - start)
    covered = 0.0
    for cue_start, cue_end, _text in captions:
        covered += max(0.0, min(end, cue_end) - max(start, cue_start))
    return min(covered, window) / window
def caption_keyword_density(captions, start, end):
    """Return the approximate number of caption words per second in ``[start, end]``.

    Each cue contributes its word count (``\\w+`` matches on the lower-cased
    text) scaled by the share of the cue's duration that falls inside the
    window; the weighted total is divided by the window length.
    """
    window = max(1e-6, end - start)
    weighted_words = 0.0
    for cue_start, cue_end, text in captions:
        shared = max(0.0, min(end, cue_end) - max(start, cue_start))
        if shared <= 0:
            continue
        word_count = len(re.findall(r'\w+', text.lower()))
        # The small epsilon keeps the division safe for zero-length cues.
        weighted_words += word_count * (shared / (cue_end - cue_start + 1e-6))
    return float(weighted_words / window)
def normalize(vals):
    """Min-max scale *vals* to ``[0, 1]`` as a float32 array.

    An empty input is returned as an empty float32 array; an input whose
    values are (almost) all equal maps to all zeros.
    """
    arr = np.asarray(vals, dtype=np.float32)
    if arr.size == 0:
        return arr
    lowest = float(np.min(arr))
    highest = float(np.max(arr))
    if highest - lowest < 1e-12:
        # Constant input: avoid dividing by (nearly) zero.
        return np.zeros_like(arr)
    return (arr - lowest) / (highest - lowest)


In [ ]:
def download_video(url: str, raw_dir: str, write_subs: bool=False, cookies_path: Optional[str]=None,
                   sleep_requests: Optional[int]=10, max_sleep_interval: Optional[int]=20):
    """Download one video with the yt_dlp Python API and look for its English subtitles.

    Args:
        url: Video URL handed to ``YoutubeDL.extract_info``.
        raw_dir: Output folder (created if missing); files are named ``<id>.<ext>``.
        write_subs: When true, also request manual and automatic English
            subtitles in VTT format.
        cookies_path: Optional cookie file; used only if the path exists.
        sleep_requests: Seconds to sleep between extraction requests.
        max_sleep_interval: Upper bound (seconds) of the randomized sleep
            before the download starts.
        Throttling is applied only when both sleep arguments are truthy.

    Returns:
        ``(filepath, vtt_path)`` where ``vtt_path`` is ``None`` when neither
        ``<id>.en.vtt`` nor ``<id>.vtt`` exists in *raw_dir*.

    Raises:
        RuntimeError: if yt_dlp returns no info for *url*.
    """
    os.makedirs(raw_dir, exist_ok=True)
    ydl_opts = {
        'outtmpl': os.path.join(raw_dir, '%(id)s.%(ext)s'),
        'format': 'mp4/bestvideo[ext=mp4]+bestaudio[ext=m4a]/best',
        'merge_output_format': 'mp4',
        'noplaylist': True,
        'quiet': True,
        'retries': 3,
    }
    if write_subs:
        ydl_opts.update({
            'writesubtitles': True,
            'writeautomaticsub': True,
            'subtitleslangs': ['en'],
            'subtitlesformat': 'vtt',
        })
    if cookies_path and os.path.exists(cookies_path):
        ydl_opts['cookiefile'] = cookies_path
    if sleep_requests and max_sleep_interval:
        ydl_opts['sleep_interval_requests'] = sleep_requests
        # yt-dlp has no 'max_sleep_interval_requests' option (unknown keys are
        # silently ignored); the documented key is 'max_sleep_interval'.
        ydl_opts['max_sleep_interval'] = max_sleep_interval
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        if info is None:
            raise RuntimeError('yt_dlp failed to fetch info')
        downloads = info.get('requested_downloads')
        if downloads:
            filepath = downloads[0]['filepath']
        else:
            # Fall back to the path implied by the output template.
            filepath = os.path.join(raw_dir, f"{info.get('id')}.{info.get('ext', 'mp4')}")
    vid = os.path.splitext(os.path.basename(filepath))[0]
    vtt_path = None
    for cand in (os.path.join(raw_dir, f"{vid}.en.vtt"), os.path.join(raw_dir, f"{vid}.vtt")):
        if os.path.exists(cand):
            vtt_path = cand
            break
    return filepath, vtt_path


In [ ]:
from dataclasses import dataclass
from typing import List, Tuple
@dataclass
class Chunk:
    # One candidate trailer segment of a video, plus the features computed for it.
    video_id: str
    # Segment boundaries in seconds.
    start: float
    end: float
    # Feature values filled in by compute_features_for_video; all default to 0.0.
    motion: float = 0.0
    audio: float = 0.0
    cap_overlap: float = 0.0
    kw_density: float = 0.0
    # Ranking score used when selecting segments.
    score: float = 0.0
def sample_video_histograms(mp4_path: str, fps_sample: float = 2.0):
    """Sample frames from a video and measure colour-histogram change between samples.

    Every ``round(fps / fps_sample)``-th frame is decoded (the video's FPS
    falls back to 30.0 when the container reports 0). For each sampled frame a
    64-bin histogram per HSV channel is built, concatenated and normalized to
    sum to 1; the difference value is the L1 distance to the previous sampled
    histogram (0.0 for the first sample).

    Returns:
        ``(ts, diffs)``: float32 arrays of sample timestamps in seconds and
        the matching histogram differences.

    Raises:
        RuntimeError: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(mp4_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f'Cannot open video: {mp4_path}')
    ts_list, diffs = [], []
    prev_hist = None
    # Release the capture handle even if decoding or histogramming raises.
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        step = max(1, int(round(fps / fps_sample)))
        frame_idx = 0
        # grab() advances without decoding; retrieve() decodes only sampled frames.
        while cap.grab():
            if frame_idx % step == 0:
                ret, frame = cap.retrieve()
                if not ret:
                    break
                ts = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
                hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
                h0 = cv2.calcHist([hsv], [0], None, [64], [0, 180])
                h1 = cv2.calcHist([hsv], [1], None, [64], [0, 256])
                h2 = cv2.calcHist([hsv], [2], None, [64], [0, 256])
                hist = np.concatenate([h0.flatten(), h1.flatten(), h2.flatten()]).astype(np.float32)
                hist /= (np.sum(hist) + 1e-6)
                diffs.append(0.0 if prev_hist is None else float(np.sum(np.abs(hist - prev_hist))))
                prev_hist = hist
                ts_list.append(ts)
            frame_idx += 1
    finally:
        cap.release()
    return np.array(ts_list, dtype=np.float32), np.array(diffs, dtype=np.float32)
def detect_scenes(ts, diffs, thresh: float = 0.55):
    """Split the sampled timeline into scenes at large histogram jumps.

    A cut is placed at every sample index ``i >= 1`` whose ``diffs[i]`` exceeds
    *thresh*; the first and last samples always act as boundaries.

    Returns:
        A list of ``(start, end)`` second pairs with ``end > start``. With no
        samples at all the single placeholder ``[(0.0, 0.0)]`` is returned.
    """
    if len(ts) == 0:
        return [(0.0, 0.0)]
    jump_indices = [i for i in range(1, len(diffs)) if diffs[i] > thresh]
    cut_indices = [0] + jump_indices + [len(ts) - 1]
    scenes = []
    for left, right in zip(cut_indices, cut_indices[1:]):
        begin = float(ts[left])
        finish = float(ts[right])
        # Zero-length spans (e.g. a cut on the last sample) are dropped.
        if finish > begin:
            scenes.append((begin, finish))
    return scenes
def make_chunks(bounds: List[Tuple[float,float]], min_len=2.0, max_len=6.0, video_id='vid'):
    """Cut each scene into candidate windows.

    Within a scene, windows start every *min_len* seconds and run for up to
    *max_len* seconds (clipped at the scene end), so neighbouring windows may
    overlap. A scene shorter than *min_len* produces no windows.

    Returns:
        A list of ``Chunk`` objects tagged with *video_id*.
    """
    windows = []
    for scene_start, scene_end in bounds:
        position = scene_start
        while position + min_len <= scene_end:
            stop = min(position + max_len, scene_end)
            windows.append(Chunk(video_id=video_id, start=position, end=stop))
            position += min_len
    return windows
def avg_motion(diffs, ts, start, end):
    """Return the mean of *diffs* over samples with ``start <= ts <= end`` (0.0 if none)."""
    inside = (ts >= start) & (ts <= end)
    if not np.any(inside):
        return 0.0
    return float(np.mean(diffs[inside]))
def audio_rms(mp4_path: str, start: float, end: float):
    """Return the RMS amplitude of the audio between *start* and *end* seconds.

    Best effort: a non-positive duration, an empty decode, or any error while
    loading the audio yields 0.0.
    """
    span = max(0.0, end - start)
    if span <= 0.0:
        return 0.0
    try:
        samples, _rate = librosa.load(mp4_path, sr=None, offset=max(0.0, start), duration=span)
        if samples.size > 0:
            return float(np.sqrt(np.mean(samples ** 2)))
        return 0.0
    except Exception:
        # Deliberate fallback: a missing or undecodable audio track scores as silence.
        return 0.0
def compute_features_for_video(mp4_path: str, vtt_path: Optional[str], min_seg: float, max_seg: float, scene_thresh: float):
    """Build scored candidate chunks for one video.

    Steps: sample colour histograms, detect scenes, cut scenes into chunks,
    then fill each chunk's motion, audio and caption features. The heuristic
    score is ``0.4*motion + 0.4*audio + 0.2*text`` on min-max normalized
    features, where text is the mean of caption overlap and keyword density.

    Raises:
        RuntimeError: if no frame could be sampled from the video.
    """
    video_id = basename_noext(mp4_path)
    captions = parse_vtt(vtt_path) if vtt_path else []
    ts, diffs = sample_video_histograms(mp4_path, fps_sample=2.0)
    if len(ts) == 0:
        raise RuntimeError('Failed to sample frames.')
    bounds = detect_scenes(ts, diffs, thresh=scene_thresh)
    if not bounds:
        # No usable scene was found: treat the whole sampled span as one scene.
        bounds = [(0.0, float(ts[-1]))]
    chunks = make_chunks(bounds, min_len=min_seg, max_len=max_seg, video_id=video_id)
    for chunk in tqdm(chunks, desc=f'Features {video_id}'):
        chunk.motion = avg_motion(diffs, ts, chunk.start, chunk.end)
        chunk.audio = audio_rms(mp4_path, chunk.start, chunk.end)
        if captions:
            chunk.cap_overlap = caption_overlap(captions, chunk.start, chunk.end)
            chunk.kw_density = caption_keyword_density(captions, chunk.start, chunk.end)
        else:
            chunk.cap_overlap = 0.0
            chunk.kw_density = 0.0
    motion_n = normalize([chunk.motion for chunk in chunks])
    audio_n = normalize([chunk.audio for chunk in chunks])
    text_n = normalize([0.5*chunk.cap_overlap+0.5*chunk.kw_density for chunk in chunks])
    for chunk, m_val, a_val, t_val in zip(chunks, motion_n, audio_n, text_n):
        chunk.score = float(0.4*m_val+0.4*a_val+0.2*t_val)
    return chunks
def greedy_select(chunks, target_len, min_gap):
    """Greedily pick the highest-scoring chunks until about *target_len* seconds are collected.

    Candidates are visited in descending ``score`` order. A candidate is
    skipped when it starts within *min_gap* seconds of an already chosen
    chunk's start, when its time span intersects an already chosen chunk
    (so the same footage is never used twice), or when adding it would push
    the total past ``target_len + 2.0`` seconds. Selection stops once the
    total reaches 98% of *target_len*.

    Args:
        chunks: Objects exposing ``start``, ``end`` and ``score`` attributes.
        target_len: Desired total duration in seconds.
        min_gap: Minimum distance in seconds between the starts of chosen chunks.

    Returns:
        The chosen chunks, in the order they were picked (best score first).
    """
    chosen = []
    total = 0.0
    for cand in sorted(chunks, key=lambda x: x.score, reverse=True):
        if total >= target_len * 0.98:
            break
        # Chunk windows overlap by construction, so comparing start times
        # alone lets near-duplicate footage through; also reject any
        # candidate whose interval intersects a chosen one.
        clashes = any(
            abs(cand.start - picked.start) < min_gap
            or (cand.start < picked.end and picked.start < cand.end)
            for picked in chosen
        )
        if clashes:
            continue
        dur = cand.end - cand.start
        if total + dur > target_len + 2.0:
            continue
        chosen.append(cand)
        total += dur
    return chosen
def render_trailer(mp4_path, chunks, out_mp4, target_len, min_seg):
    """Select the best chunks and render them into one trailer file with ffmpeg.

    Each selected chunk is re-encoded (libx264 / AAC) into a temporary part
    file, then the parts are joined with ffmpeg's concat demuxer using stream
    copy. The temporary ``_tmp`` folder next to *out_mp4* is removed whether
    or not rendering succeeds.

    Raises:
        RuntimeError: if no chunk is selected, ffmpeg cannot be launched, or
            cutting a part fails.
        subprocess.CalledProcessError: if the final concat step fails.
    """
    selected = greedy_select(chunks, target_len=target_len, min_gap=min_seg/2.0)
    if not selected:
        raise RuntimeError('No chunks selected for trailer.')
    # Creating the temp folder also creates the output folder (its parent).
    tmp = os.path.join(os.path.dirname(out_mp4), '_tmp')
    os.makedirs(tmp, exist_ok=True)
    try:
        parts = []
        for i, c in enumerate(selected):
            part = os.path.join(tmp, f'part_{i:03d}.mp4')
            cmd = ['ffmpeg', '-y', '-ss', f'{c.start:.3f}', '-to', f'{c.end:.3f}', '-i', mp4_path,
                   '-c:v', 'libx264', '-preset', 'veryfast', '-crf', '23', '-c:a', 'aac', '-b:a', '128k', part]
            try:
                subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except FileNotFoundError as e:
                raise RuntimeError('ffmpeg is required on PATH. Please install ffmpeg.') from e
            except OSError as e:
                raise RuntimeError(f'Could not launch ffmpeg: {e}') from e
            except subprocess.CalledProcessError as e:
                # ffmpeg ran but failed: surface the tail of its log rather
                # than a misleading "not installed" message.
                detail = e.stderr.decode('utf-8', errors='ignore')[-500:] if e.stderr else ''
                raise RuntimeError(f'ffmpeg failed cutting {c.start:.3f}-{c.end:.3f}s: {detail}') from e
            parts.append(part)
        fl = os.path.join(tmp, 'files.txt')
        with open(fl, 'w', encoding='utf-8') as f:
            for p in parts:
                # Inside the concat list's single-quoted strings a literal
                # quote is written as '\''.
                escaped = os.path.abspath(p).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        subprocess.run(['ffmpeg', '-y', '-safe', '0', '-f', 'concat', '-i', fl, '-c', 'copy', out_mp4], check=True)
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
    print('🎬 Trailer saved:', out_mp4)


In [ ]:
# Load the list of YouTube video IDs; seed the CSV with one sample ID on first run.
csv_path = os.path.join(WORKDIR, 'video_ids.csv')
if not os.path.exists(csv_path):
    pd.DataFrame({'video_id': ['dQw4w9WgXcQ']}).to_csv(csv_path, index=False)
df = pd.read_csv(csv_path)
if 'video_id' not in df.columns:
    raise ValueError("CSV must have a 'video_id' column.")
# Missing IDs are dropped; the rest are turned into watch URLs.
urls = [
    f"https://www.youtube.com/watch?v={vid}"
    for vid in df['video_id'].dropna().astype(str).tolist()
]
print(f'✅ Loaded {len(urls)} video(s):')
for u in urls:
    print('  ', u)


In [ ]:
# Chunking and scene-detection settings shared by the extraction loop.
min_seg, max_seg, scene_thresh = 2.0, 6.0, 0.55
raw_dir = os.path.join(WORKDIR, 'raw')
features_dir = os.path.join(WORKDIR, 'features')
WRITE_SUBS = False
COOKIES_PATH = None
# Download each video and write one feature CSV per video. A failure on one
# URL is reported and the loop moves on to the next.
for url in tqdm(urls):
    try:
        mp4, vtt = download_video(url, raw_dir, write_subs=WRITE_SUBS, cookies_path=COOKIES_PATH)
        chunks = compute_features_for_video(mp4, vtt, min_seg, max_seg, scene_thresh)
        out_csv = os.path.join(features_dir, f"{basename_noext(mp4)}.csv")
        feature_rows = [asdict(c) for c in chunks]
        pd.DataFrame(feature_rows).to_csv(out_csv, index=False)
        print('✅ Saved features →', out_csv)
    except Exception as e:
        print('⚠️ Error on URL:', url, '→', e)


In [ ]:
def label_chunks_from_annotations(df: pd.DataFrame, ann_csv: str) -> pd.DataFrame:
    """Return a copy of *df* with a binary ``label`` column derived from annotations.

    A chunk (columns ``video_id``, ``start``, ``end``) is labelled 1 when its
    IoU with at least one annotated interval of the same video is >= 0.5,
    otherwise 0.

    Raises:
        RuntimeError: if the annotation CSV lacks ``video_id``, ``start_sec``
            or ``end_sec``.
    """
    ann = pd.read_csv(ann_csv)
    required = {'video_id', 'start_sec', 'end_sec'}
    if not required.issubset(ann.columns):
        raise RuntimeError('annotations.csv must have columns: video_id,start_sec,end_sec')

    def _iou(a_start, a_end, b_start, b_end):
        # Intersection-over-union of two 1-D intervals.
        inter = max(0.0, min(a_end, b_end) - max(a_start, b_start))
        union = (a_end - a_start) + (b_end - b_start) - inter
        if union > 0:
            return inter / union
        return 0.0

    def _is_highlight(row):
        same_video = ann[ann.video_id == row['video_id']]
        return any(
            _iou(row['start'], row['end'], float(a['start_sec']), float(a['end_sec'])) >= 0.5
            for _, a in same_video.iterrows()
        )

    labelled = df.copy()
    labelled['label'] = [1 if _is_highlight(row) else 0 for _, row in df.iterrows()]
    return labelled
def train_quick_model(features_dir: str, ann_csv: str, model_path: str):
    """Fit a class-balanced logistic regression on all feature CSVs and save it with joblib.

    Features are ``motion``, ``audio``, ``cap_overlap`` and ``kw_density``;
    targets come from ``label_chunks_from_annotations``.

    Raises:
        RuntimeError: if *features_dir* contains no ``.csv`` file.
    """
    csv_names = [name for name in os.listdir(features_dir) if name.endswith('.csv')]
    if not csv_names:
        raise RuntimeError('No feature CSVs found. Run extraction first.')
    frames = [pd.read_csv(os.path.join(features_dir, name)) for name in csv_names]
    table = label_chunks_from_annotations(pd.concat(frames, ignore_index=True), ann_csv)
    feat_cols = ['motion', 'audio', 'cap_overlap', 'kw_density']
    targets = table['label'].values.astype(np.int32)
    model = LogisticRegression(max_iter=1000, class_weight='balanced')
    model.fit(table[feat_cols].values, targets)
    os.makedirs(os.path.dirname(model_path), exist_ok=True)
    joblib.dump(model, model_path)
    print('✅ Quick model saved to', model_path)
# Create a placeholder annotations file on first run: a header plus, if any
# video has been downloaded, one 10-16 s highlight for the first video ID.
ann_csv = os.path.join(WORKDIR, 'annotations.csv')
if not os.path.exists(ann_csv):
    with open(ann_csv, 'w') as f:
        f.write('video_id,start_sec,end_sec\n')
        vids = sorted(basename_noext(name) for name in os.listdir(raw_dir) if name.endswith('.mp4'))
        for first_vid in vids[:1]:
            f.write(f"{first_vid},10,16\n")
model_path = os.path.join(WORKDIR, 'models', 'ranking_model.pkl')
train_quick_model(features_dir, ann_csv, model_path)


In [ ]:
def _rank_metrics_per_video(labels, scores, ks=(5,10)):
    """Compute ranking metrics for one video's chunks.

    Chunks are ranked by descending *scores*. For every requested ``k`` the
    result holds ``P@k``, ``R@k``, ``F1@k`` and ``NDCG@k``, plus a single
    ``MAP`` entry (average precision over the full ranking).

    Keys always carry the requested ``k``. When the video has fewer than ``k``
    chunks the metrics are computed over the chunks that exist, but the key
    stays ``...@k`` so callers can index results uniformly.

    Args:
        labels: Binary relevance labels (1 = relevant), one per chunk.
        scores: Predicted scores, same length as *labels*.
        ks: Cut-off depths.

    Returns:
        A dict mapping metric name to a Python float.
    """
    labels = np.asarray(labels, dtype=np.int32)
    scores = np.asarray(scores, dtype=np.float32)
    # A stable sort keeps tied scores in input order, so results are reproducible.
    order = np.argsort(-scores, kind='stable')
    ranked = labels[order]
    npos = int(ranked.sum())
    ideal = np.sort(labels)[::-1]
    res = {}
    for k in ks:
        # Effective depth is clamped to the number of chunks; the key is not.
        depth = int(min(k, len(ranked)))
        topk = ranked[:depth]
        tp = int(topk.sum())
        prec = tp / max(1, depth)
        rec = tp / npos if npos > 0 else 0.0
        f1 = 0.0 if (prec + rec) == 0 else 2 * prec * rec / (prec + rec)
        discounts = np.log2(np.arange(2, depth + 2))
        dcg = float(((2 ** topk - 1) / discounts).sum())
        idcg = float(((2 ** ideal[:depth] - 1) / discounts).sum())
        ndcg = dcg / idcg if idcg > 0 else 0.0
        res[f'P@{k}'] = prec
        res[f'R@{k}'] = rec
        res[f'F1@{k}'] = f1
        res[f'NDCG@{k}'] = ndcg
    ap = 0.0
    if npos > 0:
        hits = 0
        precs = []
        for rank, rel in enumerate(ranked, start=1):
            if rel == 1:
                hits += 1
                precs.append(hits / rank)
        ap = float(np.mean(precs)) if precs else 0.0
    res['MAP'] = ap
    return res
def evaluate_model(features_dir, annotations_csv, model_path, out_csv, out_png, ks=(5,10)):
    """Score every chunk with the saved model and report per-video ranking metrics.

    Writes one row of metrics per video to *out_csv*, saves a bar chart of
    the macro-averaged metrics to *out_png*, and returns the averages as a
    dict keyed by metric name (``P@k``, ``R@k``, ``F1@k``, ``NDCG@k``, ``MAP``).

    Raises:
        RuntimeError: if *features_dir* contains no ``.csv`` file.
    """
    csv_paths = [os.path.join(features_dir, f) for f in os.listdir(features_dir) if f.endswith('.csv')]
    if not csv_paths:
        raise RuntimeError('No feature CSVs found.')
    df = pd.concat([pd.read_csv(path) for path in csv_paths], ignore_index=True)
    df = label_chunks_from_annotations(df, annotations_csv)
    feat_cols = ['motion', 'audio', 'cap_overlap', 'kw_density']
    clf = joblib.load(model_path)

    # Prefer the positive-class probability; fall back to hard predictions.
    scores = None
    if hasattr(clf, 'predict_proba'):
        try:
            proba = clf.predict_proba(df[feat_cols].values)
            if proba.ndim == 2 and proba.shape[1] >= 2:
                scores = proba[:, 1]
            else:
                scores = proba.ravel()
        except Exception:
            scores = None
    if scores is None:
        scores = clf.predict(df[feat_cols].values)
    df['pred'] = scores

    # Metric names in reporting order: all P@k, then R@k, F1@k, NDCG@k, then MAP.
    metric_names = [f'{prefix}@{k}' for prefix in ('P', 'R', 'F1', 'NDCG') for k in ks] + ['MAP']
    agg = {name: 0.0 for name in metric_names}
    rows = []
    for vid in sorted(df['video_id'].unique().tolist()):
        sub = df[df.video_id == vid]
        per_video = _rank_metrics_per_video(sub['label'].values, sub['pred'].values, ks=ks)
        rows.append({'video_id': vid, **per_video})
        for name in metric_names:
            agg[name] += per_video[name]
    n = len(rows)
    if n > 0:
        # Macro average: every video counts equally.
        for name in metric_names:
            agg[name] /= n

    os.makedirs(os.path.dirname(out_csv), exist_ok=True)
    pd.DataFrame(rows).to_csv(out_csv, index=False)

    values = [agg[name] for name in metric_names]
    plt.figure(figsize=(10, 4))
    plt.bar(metric_names, values)
    plt.ylim(0, 1.0)
    plt.title('Ranking Model Evaluation')
    plt.ylabel('Score')
    plt.tight_layout()
    os.makedirs(os.path.dirname(out_png), exist_ok=True)
    plt.savefig(out_png)
    plt.close()
    return agg
# Evaluate the saved model; the report CSV and chart are written next to it.
eval_csv = os.path.join(WORKDIR, 'models', 'eval_report.csv')
eval_png = os.path.join(WORKDIR, 'models', 'eval_report.png')
metrics = evaluate_model(features_dir, ann_csv, model_path, eval_csv, eval_png, ks=(5,10))
# Last expression of the cell, so the notebook displays the averaged metrics.
metrics


In [ ]:
# Render a trailer for the downloaded video whose file name sorts last.
mp4 = sorted(name for name in os.listdir(raw_dir) if name.endswith('.mp4'))[-1]
mp4_path = os.path.join(raw_dir, mp4)
# Recompute features without captions, then re-score the chunks with the model.
chunks = compute_features_for_video(mp4_path, None, 2.0, 6.0, 0.55)
clf = joblib.load(model_path)
Xf = np.array([[c.motion, c.audio, c.cap_overlap, c.kw_density] for c in chunks], dtype=np.float32)
scores = None
if hasattr(clf, 'predict_proba'):
    try:
        proba = clf.predict_proba(Xf)
        if proba.ndim == 2 and proba.shape[1] >= 2:
            scores = proba[:, 1]
        else:
            scores = proba.ravel()
    except Exception:
        scores = None
if scores is None:
    scores = clf.predict(Xf)
# Min-max scale the model scores; the epsilon avoids dividing by zero when all are equal.
mn = float(np.min(scores))
mx = float(np.max(scores))
p = (scores - mn) / (mx - mn + 1e-9)
for c, scaled in zip(chunks, p):
    c.score = float(scaled)
out_mp4 = os.path.join(WORKDIR, 'out', f"{basename_noext(mp4_path)}_trailer.mp4")
render_trailer(mp4_path, chunks, out_mp4, 45, 2.0)
# Last expression of the cell, so the notebook shows the output path.
out_mp4


In [ ]:
from IPython.display import Image, Video, display
# Show the evaluation bar chart written by the evaluation cell.
report_png = os.path.join(WORKDIR, 'models', 'eval_report.png')
display(Image(report_png))
# Last expression of the cell: embed the rendered trailer in the notebook output.
Video(out_mp4, embed=True)
