# Phase 1 — Data Collection

**Goal:** Collect YouTube video metadata for a target UTC calendar day, with a widened publish window to improve recall.

**Inputs:** YouTube Data API v3 (search, videos, channels)

**Outputs (example):**
- `data/raw/2025-09-18/search/*.json` (optional raw snapshots)
- `data/processed/2025-09-18/videos.parquet`  (+ CSV if enabled)
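- `data/processed/2025-09-18/channels.parquet` (optional)
- Note: the run cell in this notebook writes raw snapshots directly under `data/raw/<run date, UTC>/` and the tidy table to `data/interim/labels_input_<timestamp>.parquet` (+ CSV if enabled).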

**Assumptions:**
- Timestamps are treated as **UTC**.
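- API key is read from `YOUTUBE_API_KEY` in a `.env` file; the config cell clears any value already set in the process environment before loading `.env`, so a key exported only in the shell is not picked up.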

In [26]:
# ===== 1) SETUP =====
# --- Standard library ---
import os
import sys
import time
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

# --- Third-party ---
import requests
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv, find_dotenv


In [27]:
# ===== 2) CONFIG =====
# Drop any YOUTUBE_API_KEY already present in this process, then load .env (override=True)
# so the key used below always comes from the .env file found from the working directory.
# NOTE(review): because of the pop, a key exported only in the shell (with no .env entry)
# is discarded and the assert below fails — confirm this is intended.
os.environ.pop("YOUTUBE_API_KEY", None)
load_dotenv(find_dotenv(usecwd=True), override=True)

API_KEY = os.getenv("YOUTUBE_API_KEY")
# Fail fast when the key is missing or blank.
assert API_KEY and API_KEY.strip(), "Missing YOUTUBE_API_KEY in environment or .env"
# Filtering strategy: relaxed, not strict. In practice, tight constraints (e.g., extremely narrow
# publish windows + strictly searching English) collapsed daily results to near zero for several
# categories/regions. Recall is prioritized to ensure enough data; cleaning happens downstream.

REGION_CODE = "US"          # regionCode sent to the API to bias toward English titles. NOTE(review): per the API docs this selects videos viewable in the region, not videos uploaded from it — confirm
RELEVANCE_LANGUAGE = None   # optional relevanceLanguage hint for search (not a strict filter); None means the parameter is not sent
DAYS_AGO = 3                 # target the UTC calendar day N days ago
WIDEN_HOURS = 12             # hours added on each side of the target day to broaden the publish window
SEARCH_UNITS = 9000          # budget for search.list (100 units per page)
SAVE_CSV = True              # also write CSV alongside Parquet
SAVE_RAW = True              # save raw JSON responses (good for debugging)
OUT_ROOT = "data"           # output root folder (created if missing)

# YouTube API base URL; endpoint names are appended to it by yt_get
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"

In [28]:
# ===== 3) HELPERS =====

def ensure_dir(p: str) -> None:
    """Create directory ``p`` (with any missing parents); do nothing if it already exists."""
    os.makedirs(name=p, exist_ok=True)

def ts_now_utc() -> str:
    """Return the current UTC time as a stamp of the form ``YYYY-mm-ddTHH-MM-SSZ``.

    The time fields are joined with hyphens, not colons; in this file the stamp
    is embedded in output file names.
    """
    return f"{datetime.now(timezone.utc):%Y-%m-%dT%H-%M-%SZ}"

def iso_day_bounds_utc(days_ago: int):
    """Return ``(publishedAfter, publishedBefore)`` for the UTC day ``days_ago`` days before now.

    Both values are RFC 3339 strings with a ``Z`` suffix: the first is 00:00:00
    and the second is 23:59:59 of that UTC calendar day.
    """
    shifted = datetime.now(timezone.utc) - timedelta(days=days_ago)
    # Snap to the first and last whole second of that calendar day (tzinfo stays UTC).
    day_open = shifted.replace(hour=0, minute=0, second=0, microsecond=0)
    day_close = day_open.replace(hour=23, minute=59, second=59)
    after_str = day_open.isoformat().replace("+00:00", "Z")
    before_str = day_close.isoformat().replace("+00:00", "Z")
    return after_str, before_str

def parse_iso8601_duration_to_seconds(iso_dur: str):
    """Convert an ISO-8601 duration from the API (e.g. ``'PT2H3M10S'``) into seconds.

    Handles the time part (``#H``, ``#M``, ``#S`` after ``T``) and, for videos of
    a day or longer, the date part (``#W``, ``#D`` before ``T``), e.g.
    ``'P1DT2H3M10S'``. Years and months are ignored because their length in
    seconds is not fixed.

    Returns:
        Total seconds as an int, or None when the input is empty, does not start
        with ``'P'``, or has no ``T`` section and no non-zero week/day value
        (for example ``'P0D'``, which carries no usable length).
    """
    if not iso_dur or not iso_dur.startswith("P"):
        return None

    # Split 'P<date>T<time>'; for the common 'PT...' form the date part is empty.
    date_part, has_time, time_part = iso_dur[1:].partition("T")
    sections = (
        (date_part, {"W": 7 * 86400, "D": 86400}),
        (time_part, {"H": 3600, "M": 60, "S": 1}),
    )

    total = 0
    for text, unit_seconds in sections:
        num = ""
        for ch in text:
            if ch.isdigit():
                num += ch
                continue
            # A designator closes the number collected so far; unknown ones are skipped.
            if num and ch in unit_seconds:
                total += int(num) * unit_seconds[ch]
            num = ""

    # Without a time section and without any week/day amount there is nothing to report.
    if not has_time and total == 0:
        return None
    return total

def safe_int(x):
    """Return ``int(x)``, or None when the conversion raises."""
    try:
        converted = int(x)
    except Exception:
        return None
    return converted

def chunked(lst, n):
    """Yield consecutive slices of ``lst`` holding at most ``n`` items each (for batching API calls)."""
    offsets = range(0, len(lst), n)
    yield from (lst[offset:offset + n] for offset in offsets)

def yt_get(endpoint: str, params: Dict[str, Any], api_key: str) -> Dict[str, Any]:
    """Issue a GET request against a YouTube Data API v3 endpoint and return the parsed JSON.

    The caller's ``params`` are left untouched; a copy with the API key added
    under ``key`` is sent as the query string. HTTP error statuses are raised
    via ``raise_for_status``, and a 30-second timeout keeps a call from hanging
    forever.

    NOTE(review): the key travels in the query string, so it may show up in
    error messages or logs that include the request URL — confirm before
    sharing notebook output.
    """
    query = {**params, "key": api_key}
    response = requests.get(f"{YOUTUBE_API_BASE}/{endpoint}", params=query, timeout=30)
    response.raise_for_status()
    return response.json()

In [29]:
# ===== 4) API WRAPPERS  =====

def fetch_categories(api_key: str, region_code: str, raw_dir: str):
    """Fetch the video categories for ``region_code`` and return them as a DataFrame.

    The result has one row per category with columns ``id``, ``title`` and
    ``assignable``. When ``SAVE_RAW`` is set, the raw response is also written
    to ``raw_dir`` as a timestamped JSON file for auditing.
    """
    payload = yt_get(
        "videoCategories",
        {"part": "snippet", "regionCode": region_code, "hl": "en_US"},
        api_key,
    )

    # Optional raw capture for reproducibility and auditing
    if SAVE_RAW:
        dump_name = f"categories_{region_code}_{ts_now_utc()}.json"
        with open(os.path.join(raw_dir, dump_name), "w", encoding="utf-8") as fh:
            json.dump(payload, fh, ensure_ascii=False, indent=2)

    # Flatten each category item into a plain record
    records = [
        {
            "id": entry.get("id"),
            "title": entry.get("snippet", {}).get("title"),
            "assignable": entry.get("snippet", {}).get("assignable", False),
        }
        for entry in payload.get("items", [])
    ]
    return pd.DataFrame(records)

def _search_pass_counted(api_key, params_base, pages, raw_dir, tag, page_pause_s=0.15):
    """Run one paginated search.list pass and report how many pages it requested.

    Args:
        api_key: YouTube Data API key.
        params_base: Query parameters shared by every page; copied per request.
        pages: Maximum number of pages to request in this pass.
        raw_dir: Directory for raw JSON snapshots (written only when SAVE_RAW is set).
        tag: Label embedded in the raw snapshot file names.
        page_pause_s: Seconds to sleep before requesting the next page.

    Returns:
        ``(video_ids, pages_fetched)``. ``pages_fetched`` counts the requests
        actually made, which is fewer than ``pages`` when the API stops
        returning a ``nextPageToken``.
    """
    ids = []
    fetched = 0
    next_page = None
    for i in range(1, pages + 1):
        p = dict(params_base)
        if next_page:
            p["pageToken"] = next_page
        data = yt_get("search", p, api_key)
        fetched += 1
        # Optional raw capture of each page
        if SAVE_RAW:
            raw_name = f"search_{tag}_p{i:03d}_{ts_now_utc()}.json"
            with open(os.path.join(raw_dir, raw_name), "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        # Keep only items that carry a video ID
        for it in data.get("items", []):
            video_id = (it.get("id") or {}).get("videoId")
            if video_id:
                ids.append(video_id)
        # Follow pagination until the API stops handing out tokens
        next_page = data.get("nextPageToken")
        if not next_page:
            break
        # Small pause to be polite and avoid triggering rate limits
        time.sleep(page_pause_s)
    return ids, fetched


def _search_pass(api_key, params_base, pages, raw_dir, tag, page_pause_s=0.15):
    """Perform a single pass of paginated search requests and return the video IDs found.

    Same as ``_search_pass_counted`` but returns only the list of IDs.
    """
    ids, _ = _search_pass_counted(api_key, params_base, pages, raw_dir, tag, page_pause_s)
    return ids


def search_all_categories_day(api_key: str, region_code: str, relevance_language: str,
                              published_after: str, published_before: str,
                              search_units_budget: int, raw_dir: str,
                              page_pause_s: float = 0.15):
    """Sweep search.list for videos published inside a time window, in up to three passes.

    Pass 1 searches with no query. If fewer than 200 IDs have been collected,
    pass 2 repeats the search with a handful of short seed queries, and if the
    total is still below 200, pass 3 searches once per assignable category.

    The budget is enforced: ``search_units_budget`` is converted to pages
    (100 units each, minimum one page), every request actually made is deducted,
    and no pass is started or extended once the pages run out.

    Args:
        api_key: YouTube Data API key.
        region_code: Value sent as ``regionCode``.
        relevance_language: Value sent as ``relevanceLanguage``; omitted when falsy (e.g. None).
        published_after: Lower bound sent as ``publishedAfter``.
        published_before: Upper bound sent as ``publishedBefore``.
        search_units_budget: Quota units available for search.list calls.
        raw_dir: Directory for raw JSON snapshots.
        page_pause_s: Seconds to sleep between consecutive pages of a pass.

    Returns:
        Sorted list of unique video IDs.
    """
    # Convert budget (units) to pages (100 units per page)
    total_pages = max(1, search_units_budget // 100)
    remaining = total_pages
    collected = set()

    # Base params (no q)
    base = {
        "part": "id",
        "type": "video",
        "regionCode": region_code,
        "publishedAfter": published_after,
        "publishedBefore": published_before,
        "order": "date",
        "maxResults": 50,
    }
    if relevance_language:  # only add if set
        base["relevanceLanguage"] = relevance_language

    def run_pass(params, pages, tag):
        """Run one pass capped at the pages still available and book what it used."""
        nonlocal remaining
        allowed = min(pages, remaining)
        if allowed <= 0:
            return
        ids, used = _search_pass_counted(api_key, params, allowed, raw_dir, tag, page_pause_s)
        remaining -= used
        collected.update(ids)

    # PASS 1 — no q
    run_pass(base, min(10, total_pages // 3 or 1), "p1_allcats_noq")

    # PASS 2 — light q-seeds to force results if pass 1 was thin
    if len(collected) < 200 and remaining > 0:
        q_seeds = ["a", "e", "i", "o", "u", "the", "and", "1", "2", "3"]
        per_seed = max(1, (total_pages // 3) // len(q_seeds))
        for q in q_seeds:
            if remaining <= 0:
                break
            run_pass({**base, "q": q}, per_seed, f"p2_seed_{q}")

    # PASS 3 — per-category fallback if still thin
    if len(collected) < 200 and remaining > 0:
        common_cat_ids = ["10", "24", "20", "27", "28", "17", "25", "26", "22"]
        try:
            cats = fetch_categories(api_key, region_code, raw_dir)
            cat_ids = [c for c in cats.loc[cats.assignable == True, "id"].tolist() if c]
        except Exception as e:
            # Best effort: keep going with a fixed list, but say why.
            print("Warning fetching categories; using common-category fallback:", e)
            cat_ids = []
        if not cat_ids:
            cat_ids = common_cat_ids
        # Spread the pages that are really left evenly across categories
        per_cat = max(1, remaining // len(cat_ids))
        for cid in cat_ids:
            if remaining <= 0:
                break
            run_pass({**base, "videoCategoryId": cid}, per_cat, f"p3_cat_{cid}")

    # Sorted so downstream batches are reproducible for the same set of IDs
    return sorted(collected)

def videos_details(api_key: str, video_ids: List[str], raw_dir: str) -> pd.DataFrame:
    """Fetch snippet, content details and statistics for ``video_ids`` via videos.list.

    IDs are sent in batches of 50. When ``SAVE_RAW`` is set, each raw response
    is written to ``raw_dir``. Returns one row per returned video with the
    identifiers, text fields, duration in seconds and the view/like/comment
    counts (None where a count is absent or not convertible to int).
    """
    records = []
    id_batches = list(chunked(video_ids, 50))
    progress = tqdm(id_batches, desc="videos.list batches", unit="batch")
    for batch_no, id_batch in enumerate(progress, start=1):
        payload = yt_get(
            "videos",
            {"part": "snippet,contentDetails,statistics", "id": ",".join(id_batch)},
            api_key,
        )

        # Optional raw capture per batch
        if SAVE_RAW:
            dump_name = f"videos_batch{batch_no:04d}_{ts_now_utc()}.json"
            with open(os.path.join(raw_dir, dump_name), "w", encoding="utf-8") as fh:
                json.dump(payload, fh, ensure_ascii=False, indent=2)

        for video in payload.get("items", []):
            # Each part may be missing or null; fall back to an empty dict
            snippet = video.get("snippet", {}) or {}
            stats = video.get("statistics", {}) or {}
            details = video.get("contentDetails", {}) or {}
            tag_list = snippet.get("tags") or []
            record = {
                "videoId": video.get("id"),
                "channelId": snippet.get("channelId"),
                "categoryId": snippet.get("categoryId"),
                "publishedAt": snippet.get("publishedAt"),
                "title": snippet.get("title"),
                "description": snippet.get("description"),
                "tags_str": ",".join(tag_list) if isinstance(tag_list, list) else None,
                "duration_sec": parse_iso8601_duration_to_seconds(details.get("duration")),
                "viewCount": safe_int(stats.get("viewCount")),
                "likeCount": safe_int(stats.get("likeCount")),
                "commentCount": safe_int(stats.get("commentCount")),
            }
            records.append(record)
    return pd.DataFrame(records)

def channels_stats(api_key: str, channel_ids: List[str], raw_dir: str) -> pd.DataFrame:
    """Fetch channel-level statistics via channels.list.

    Empty IDs are dropped and duplicates removed (first occurrence wins, order
    preserved) before requesting in batches of 50. When ``SAVE_RAW`` is set,
    each raw response is written to ``raw_dir``. Returns one row per returned
    channel with view, subscriber and video counts plus the hidden-subscriber
    flag (None when the API does not report it).
    """
    # dict keys keep insertion order, so this de-duplicates without reordering
    distinct_ids = list(dict.fromkeys(cid for cid in channel_ids if cid))
    records = []
    id_batches = list(chunked(distinct_ids, 50))
    progress = tqdm(id_batches, desc="channels.list batches", unit="batch")
    for batch_no, id_batch in enumerate(progress, start=1):
        payload = yt_get("channels", {"part": "statistics", "id": ",".join(id_batch)}, api_key)

        # Optional raw capture per batch
        if SAVE_RAW:
            dump_name = f"channels_batch{batch_no:04d}_{ts_now_utc()}.json"
            with open(os.path.join(raw_dir, dump_name), "w", encoding="utf-8") as fh:
                json.dump(payload, fh, ensure_ascii=False, indent=2)

        for channel in payload.get("items", []):
            stats = channel.get("statistics", {}) or {}
            hidden = stats.get("hiddenSubscriberCount")
            records.append({
                "channelId": channel.get("id"),
                "channel_viewCount": safe_int(stats.get("viewCount")),
                "channel_subscriberCount": safe_int(stats.get("subscriberCount")),
                "channel_videoCount": safe_int(stats.get("videoCount")),
                "channel_hiddenSubscriberCount": None if hidden is None else bool(hidden),
            })
    return pd.DataFrame(records)


In [None]:
# ===== 5) RUN COLLECTION  =====

# Raw snapshots are grouped by the UTC date of this run; tidy outputs go to the interim folder.
raw_day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
raw_dir = os.path.join(OUT_ROOT, "raw", raw_day)
interim_dir = os.path.join(OUT_ROOT, "interim")
ensure_dir(raw_dir)
ensure_dir(interim_dir)


# Parse a 'Z'-suffixed RFC 3339 string into a timezone-aware datetime.
def to_dt(s):
    return datetime.fromisoformat(s.replace("Z", "+00:00"))


# 1) Build the target time window: the UTC day DAYS_AGO days back, widened by WIDEN_HOURS on each side.
# approx_age_hours clusters around ~60–90 hours rather than exactly 72
# The day bounds are computed once so both ends refer to the same calendar day
# even when the run happens right at midnight UTC.
_day_start_iso, _day_end_iso = iso_day_bounds_utc(DAYS_AGO)
_start = to_dt(_day_start_iso) - timedelta(hours=WIDEN_HOURS)
_end = to_dt(_day_end_iso) + timedelta(hours=WIDEN_HOURS)
published_after = _start.isoformat().replace("+00:00", "Z")
published_before = _end.isoformat().replace("+00:00", "Z")
print("Target UTC window:", published_after, "→", published_before)

# Save categories reference (best effort; the result itself is not used here)
try:
    _ = fetch_categories(API_KEY, REGION_CODE, raw_dir)
except Exception as e:
    print("Warning fetching categories:", e)

# 2) Search sweep (one big run; all categories together)
video_ids = search_all_categories_day(
    api_key=API_KEY,
    region_code=REGION_CODE,
    relevance_language=RELEVANCE_LANGUAGE,
    published_after=published_after,
    published_before=published_before,
    search_units_budget=SEARCH_UNITS,
    raw_dir=raw_dir
)
print(f"Found {len(video_ids)} video IDs (after de-dupe)")
if not video_ids:
    raise SystemExit("No videos found. Consider increasing SEARCH_UNITS or adjusting DAYS_AGO.")

# Enrich each ID with details/stats (videos.list)
vids_df = videos_details(API_KEY, video_ids, raw_dir)
print("videos.list returned rows:", len(vids_df))
if vids_df.empty:
    raise SystemExit("No details returned by videos.list.")

# 3) Filter out Shorts
vids_df["duration_sec"] = pd.to_numeric(vids_df["duration_sec"], errors="coerce")
# Modification to 180 seconds from previous 60 seconds due to discovering newer time limit for shorts. Additional filtering is carried out in combine_and_dedupe.py
# Rows with an unknown duration (NaN) compare False here and are therefore kept.
vids_df["is_shorts"] = (vids_df["duration_sec"] <= 180)
vids_df = vids_df[~vids_df["is_shorts"]].copy()
if vids_df.empty:
    raise SystemExit("No videos left after filtering out Shorts.")

# 4) Snapshot timing & ~72h renames: record when we looked (snapshot_at_utc) and how old each video was then.
snapshot_at = datetime.now(timezone.utc)
vids_df["publishedAt_utc"] = pd.to_datetime(vids_df["publishedAt"], utc=True, errors="coerce")
vids_df["snapshot_at_utc"] = snapshot_at.isoformat().replace("+00:00", "Z")
vids_df["approx_age_hours"] = ((snapshot_at - vids_df["publishedAt_utc"]).dt.total_seconds() / 3600.0).round(2)
vids_df.rename(columns={
    "viewCount": "views_72h",
    "likeCount": "likes_72h",
    "commentCount": "comments_72h",
}, inplace=True)

# 5) Channel stats (best effort: a failure here must not lose the video data)
try:
    ch_df = channels_stats(API_KEY, vids_df["channelId"].dropna().tolist(), raw_dir)
    if ch_df is not None and not ch_df.empty:
        vids_df = vids_df.merge(ch_df, on="channelId", how="left")
except Exception as e:
    print("Warning fetching channel stats:", e)

# quick flag to analyze how often engagement stats are missing
vids_df["has_missing_stats"] = vids_df[["likes_72h", "comments_72h"]].isna().any(axis=1)

# 6) Select & save tidy columns
cols = [
    "videoId", "channelId", "categoryId",
    "publishedAt_utc", "snapshot_at_utc", "approx_age_hours",
    "title", "description", "tags_str",
    "duration_sec", "is_shorts", "has_missing_stats",
    "views_72h", "likes_72h", "comments_72h",
    "channel_subscriberCount", "channel_viewCount", "channel_videoCount", "channel_hiddenSubscriberCount",
]
# The channel_* columns only exist when the channel-stats step succeeded; reindex
# fills any missing column with NaN instead of raising KeyError, so the output
# schema is the same on every run.
missing_cols = [c for c in cols if c not in vids_df.columns]
if missing_cols:
    print("Warning: columns missing and written as empty:", ", ".join(missing_cols))
tidy = vids_df.reindex(columns=cols)

run_stamp = ts_now_utc()
out_parquet = os.path.join(interim_dir, f"labels_input_{run_stamp}.parquet")
tidy.to_parquet(out_parquet, index=False)
if SAVE_CSV:
    out_csv = os.path.join(interim_dir, f"labels_input_{run_stamp}.csv")
    tidy.to_csv(out_csv, index=False)

# text confirmation that files have been saved
print("\nSaved:")
print(" ", out_parquet)
if SAVE_CSV:
    print(" ", out_csv)
print(f"Rows: {len(tidy)} | Unique videos: {tidy['videoId'].nunique()}")


### API Quota Reference (rough)

- `search.list`: ~100 units **per page**
- `videos.list`: 1 unit per call (up to 50 ids)
- `channels.list`: 1 unit per call (up to 50 ids)

A `SEARCH_UNITS=9000` budget ≈ 90 pages of search results.