# Data Collection

In [3]:
"""
YouTube Engagement Snapshot Collector
-------------------------------------
Run once per day.

First run (day 0):
  - Collect videos published in the last 24 hours (UTC) for SEARCH_QUERIES

Subsequent runs (days 1..7):
  - Append one engagement snapshot per day (views/likes/comments/timestamp)
    for the original set of videos, up to 7 days since publish
"""

import os
import time
from datetime import datetime, timedelta, timezone

import pandas as pd
from googleapiclient.discovery import build


# SECURITY: the API key is read from the environment so the credential never
# lives in source control or in exported notebooks. Any key that was previously
# committed in this file should be treated as leaked and revoked in the
# Google Cloud console.
API_KEY = os.environ.get("YOUTUBE_API_KEY")
if not API_KEY:
    raise RuntimeError(
        "Set the YOUTUBE_API_KEY environment variable to a YouTube Data API v3 key."
    )

# Shared YouTube Data API v3 client used by every fetch helper below.
youtube = build('youtube', 'v3', developerKey=API_KEY)

# Search terms used on the first run to build the seed set of videos;
# each term is searched separately and the resulting IDs are unioned.
SEARCH_QUERIES = [
    "unboxing", "sponsored", "review", "haul",
    "TikTok made me buy it", "giveaway", "product launch", "viral ad"
]

# CSV holding the seed videos (metadata plus seed-time video/channel stats).
VIDEO_FILE = "youtube_videos_16.csv"
# CSV holding one engagement row per (video_id, snapshot_day).
SNAPSHOT_FILE = "engagement_snapshots_16.csv"

# Max days we keep taking snapshots for a video after publish
# (the window check is inclusive, so days 0 through SNAPSHOT_DAYS qualify).
SNAPSHOT_DAYS = 7

# API safety: small delay (in seconds) between calls to reduce rate-limit risk
API_DELAY_SECS = 0.1


def get_today_bounds():
    """
    Compute the trailing 24-hour window ending at the current UTC instant.

    Returns:
        A ``(start, end)`` tuple of ISO-8601 strings, where ``end`` is "now"
        in UTC and ``start`` is exactly 24 hours earlier.
    """
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(hours=24)
    return window_start.isoformat(), window_end.isoformat()


def fetch_new_videos(query, published_after, published_before):
    """
    Collect the IDs of videos matching ``query`` published inside a time window.

    Pages through ``search.list`` (50 results per page, newest first) until the
    API stops returning a ``nextPageToken``.

    Args:
        query: Search term passed as ``q``.
        published_after: Lower bound of the window, passed as ``publishedAfter``.
        published_before: Upper bound of the window, passed as ``publishedBefore``.

    Returns:
        A list of unique video IDs (order is not meaningful).
    """
    found = set()
    page_token = None

    while True:
        page = youtube.search().list(
            q=query,
            type="video",
            part="id",
            maxResults=50,
            publishedAfter=published_after,
            publishedBefore=published_before,
            order="date",
            pageToken=page_token,
        ).execute()

        found.update(entry["id"]["videoId"] for entry in page.get("items", []))

        page_token = page.get("nextPageToken")
        if not page_token:
            # Last page reached.
            return list(found)

        # Pause only between pages, never after the final one.
        time.sleep(API_DELAY_SECS)


def fetch_channel_stats(channel_ids):
    """
    Fetch channel-level statistics (subscriberCount, videoCount) ONCE for the seed.

    Channels are requested in batches of 50 IDs per ``channels.list`` call, with
    a short pause after each call.

    Args:
        channel_ids: Iterable of channel ID strings.

    Returns:
        DataFrame with columns ``channel_id``, ``channel_subscriber_count`` and
        ``channel_video_count`` (one row per channel, values as returned by the
        API). The columns are present even when no rows were returned, so the
        result can always be merged on ``channel_id``.
    """
    columns = ["channel_id", "channel_subscriber_count", "channel_video_count"]

    # Accept any iterable (e.g. a set) while keeping the slicing below valid.
    channel_ids = list(channel_ids)

    rows = []
    for i in range(0, len(channel_ids), 50):
        chunk = channel_ids[i : i + 50]
        resp = youtube.channels().list(
            part="statistics",
            id=",".join(chunk),
            maxResults=50,
        ).execute()

        for item in resp.get("items", []):
            # "statistics" may be absent or null; treat both as "no stats".
            stats = item.get("statistics", {}) or {}
            rows.append(
                {
                    "channel_id": item["id"],
                    "channel_subscriber_count": stats.get("subscriberCount"),
                    "channel_video_count": stats.get("videoCount"),
                }
            )
        time.sleep(API_DELAY_SECS)

    # Passing explicit columns guarantees the schema even for zero rows;
    # without it an empty result has no "channel_id" column and a later
    # merge(on="channel_id") raises KeyError.
    df = pd.DataFrame(rows, columns=columns)
    return df.drop_duplicates(subset=["channel_id"])


def fetch_video_details(video_ids):
    """
    Build the seed table for a list of video IDs.

    Videos are requested in batches of 50 via ``videos.list`` (snippet,
    statistics, contentDetails). The resulting rows are de-duplicated by
    ``video_id`` and left-joined with per-channel statistics from
    ``fetch_channel_stats``.

    Args:
        video_ids: List of video ID strings.

    Returns:
        DataFrame with one row per video (metadata, seed-time view/like/comment
        counts as returned by the API, and channel stats when any channel ID
        was seen). An empty DataFrame is returned when the API yields no items.
    """
    records = []
    seen_channels = set()

    for start in range(0, len(video_ids), 50):
        batch = video_ids[start : start + 50]
        payload = youtube.videos().list(
            part="snippet,statistics,contentDetails",
            id=",".join(batch),
        ).execute()

        for entry in payload.get("items", []):
            video_id = entry["id"]
            # Each part may be missing or null; normalise to an empty dict.
            snippet = entry.get("snippet", {}) or {}
            statistics = entry.get("statistics", {}) or {}
            content_details = entry.get("contentDetails", {}) or {}

            published = pd.to_datetime(snippet.get("publishedAt"))

            channel_id = snippet.get("channelId")
            if channel_id:
                seen_channels.add(channel_id)

            tag_list = snippet.get("tags")
            joined_tags = "|".join(tag_list) if tag_list else ""

            record = {
                "video_id": video_id,
                "title": snippet.get("title"),
                "description": snippet.get("description"),
                "tags": joined_tags,
                "category_id": snippet.get("categoryId"),
                "duration": content_details.get("duration"),
                "publish_time": published,
                "channel_id": channel_id,
                "channel_title": snippet.get("channelTitle"),
                # Seed-time stats are kept as the raw API values here;
                # numeric coercion is left to the caller.
                "view_count": statistics.get("viewCount"),
                "like_count": statistics.get("likeCount"),
                "comment_count": statistics.get("commentCount"),
            }
            records.append(record)

        time.sleep(API_DELAY_SECS)

    videos_df = pd.DataFrame(records)
    if videos_df.empty:
        return videos_df

    videos_df = videos_df.drop_duplicates(subset=["video_id"])

    # Channel stats are fetched once for all channels seen, then joined in.
    if seen_channels:
        channel_stats = fetch_channel_stats(list(seen_channels))
        videos_df = videos_df.merge(channel_stats, on="channel_id", how="left")

    return videos_df


def load_csv_or_empty(path, parse_dates=None):
    """
    Read a CSV into a DataFrame, or return an empty DataFrame if it is absent.

    Args:
        path: Location of the CSV file.
        parse_dates: Forwarded to ``pandas.read_csv`` as ``parse_dates``.

    Returns:
        The parsed DataFrame when ``path`` exists, otherwise ``pd.DataFrame()``.
    """
    if not os.path.exists(path):
        return pd.DataFrame()
    return pd.read_csv(path, parse_dates=parse_dates)


def coerce_stats_to_int(df, cols):
    """
    Convert the named columns of ``df`` to integers in place.

    The YouTube API delivers counts as strings. Values that cannot be parsed
    as numbers (including missing values) become 0. Names in ``cols`` that are
    not columns of ``df`` are ignored.

    Args:
        df: DataFrame to modify.
        cols: Iterable of column names to coerce.

    Returns:
        The same ``df`` object, after modification.
    """
    present = [name for name in cols if name in df.columns]
    for name in present:
        numeric = pd.to_numeric(df[name], errors="coerce")
        df[name] = numeric.fillna(0).astype(int)
    return df


def update_snapshots(videos_df, snapshots_df):
    """
    Append today's engagement snapshot for every seed video still in its window.

    A video is "active" when the whole number of days since its publish time is
    between 0 and SNAPSHOT_DAYS (inclusive). That day count is used as the
    ``snapshot_day`` key, and a video is skipped when a row with the same
    ``(video_id, snapshot_day)`` already exists in ``snapshots_df``.

    Statistics are requested in batches of up to 50 IDs per ``videos.list``
    call (the same batching used for the seed fetch) rather than one call per
    video. Neither input DataFrame is modified.

    Args:
        videos_df: Seed videos; must contain ``video_id`` and ``publish_time``.
        snapshots_df: Previously stored snapshots (may be empty).

    Returns:
        ``(snapshots, added)`` where ``snapshots`` is the snapshot table
        including any new rows and ``added`` is the number of rows appended.
        When nothing is added, ``snapshots_df`` is returned unchanged.
    """
    if videos_df.empty:
        print("No videos to snapshot.")
        return snapshots_df, 0

    now = datetime.now(timezone.utc)

    # Derived values live in local Series so the caller's frame is left untouched.
    publish_time = pd.to_datetime(videos_df["publish_time"], utc=True)
    days_since_publish = (now - publish_time).dt.days

    # Only videos within the allowed snapshot window (unparseable dates -> NaN
    # -> excluded by between()).
    in_window = days_since_publish.between(0, SNAPSHOT_DAYS)
    if not in_window.any():
        print("No active videos within snapshot window.")
        return snapshots_df, 0

    # Existing (video_id, snapshot_day) keys, used to avoid duplicate snapshots.
    existing_pairs = set()
    if not snapshots_df.empty:
        # Rows with a missing key cannot match anything and would break astype(int).
        keyed = snapshots_df.dropna(subset=["video_id", "snapshot_day"])
        existing_pairs = set(
            zip(keyed["video_id"].astype(str), keyed["snapshot_day"].astype(int))
        )

    # video_id -> snapshot_day for every video that still needs today's row.
    pending = {}
    active_ids = videos_df["video_id"][in_window].astype(str)
    active_days = days_since_publish[in_window].astype(int)
    for video_id, snapshot_day in zip(active_ids, active_days):
        if (video_id, snapshot_day) not in existing_pairs:
            pending.setdefault(video_id, snapshot_day)

    timestamp = now.isoformat()
    pending_ids = list(pending)
    new_rows = []

    for start in range(0, len(pending_ids), 50):
        chunk = pending_ids[start : start + 50]

        # Best-effort: a failed request skips this batch but not the rest of the run.
        try:
            resp = youtube.videos().list(
                part="statistics", id=",".join(chunk)
            ).execute()
        except Exception as e:
            for video_id in chunk:
                print(f"❌ Error fetching stats for video_id={video_id}: {e}")
            continue

        stats_by_id = {
            item.get("id"): item.get("statistics", {}) or {}
            for item in resp.get("items", [])
        }

        for video_id in chunk:
            if video_id not in stats_by_id:
                # The API omits IDs it cannot return (e.g. removed or private videos).
                print(f" No stats returned for video_id={video_id}")
                continue

            stats = stats_by_id[video_id]
            new_rows.append(
                {
                    "video_id": video_id,
                    "snapshot_day": pending[video_id],
                    "view_count": stats.get("viewCount"),
                    "like_count": stats.get("likeCount"),
                    "comment_count": stats.get("commentCount"),
                    "timestamp": timestamp,
                }
            )

        time.sleep(API_DELAY_SECS)

    if not new_rows:
        print("ℹ️ No new snapshots to add today.")
        return snapshots_df, 0

    updates_df = coerce_stats_to_int(
        pd.DataFrame(new_rows), ["view_count", "like_count", "comment_count"]
    )

    if snapshots_df.empty:
        # Nothing stored yet: the new rows are the whole table. This avoids
        # concatenating with an empty frame, which pandas has deprecated.
        return updates_df, len(updates_df)

    combined = pd.concat([snapshots_df, updates_df], ignore_index=True)

    # Deduplicate safety: if a key somehow appears twice, keep the newest row.
    combined = combined.drop_duplicates(
        subset=["video_id", "snapshot_day"], keep="last"
    )
    return combined, len(updates_df)


def main():
    """
    Run one daily collection pass.

    If VIDEO_FILE has no rows yet, search every term in SEARCH_QUERIES over the
    last 24 hours, fetch details for the videos found, and write them to
    VIDEO_FILE (returning early when the search finds nothing). In all other
    cases the stored seed set is reused. Afterwards, today's snapshots are
    collected and SNAPSHOT_FILE is rewritten only if at least one row was added.
    """
    # Load or initialize storage
    videos_df = load_csv_or_empty(VIDEO_FILE, parse_dates=["publish_time"])
    snapshots_df = load_csv_or_empty(SNAPSHOT_FILE)

    if not videos_df.empty:
        print("ℹ️ Seed videos already exist. Skipping new video collection.")
    else:
        # ----------------------- day 0: seed collection -----------------------
        print("🔍 First run detected: collecting videos from the last 24 hours...")
        window_start, window_end = get_today_bounds()

        seed_ids = set()
        for query in SEARCH_QUERIES:
            seed_ids.update(fetch_new_videos(query, window_start, window_end))
        seed_ids = list(seed_ids)
        print(f"✅ Found {len(seed_ids)} new video(s) to seed.")

        if not seed_ids:
            print(" No videos found for seeding. Exiting.")
            return

        # Video and channel counts arrive as strings; make them integers
        # before the seed set is written out.
        numeric_columns = [
            "view_count",
            "like_count",
            "comment_count",
            "channel_subscriber_count",
            "channel_video_count",
        ]
        videos_df = coerce_stats_to_int(fetch_video_details(seed_ids), numeric_columns)

        videos_df.to_csv(VIDEO_FILE, index=False)
        print(f" Saved {len(videos_df)} video(s) to {VIDEO_FILE}")

    # ----------------------- days 0..7: snapshots only -----------------------
    print(" Updating engagement snapshots (one per video per day, up to 7 days)...")
    snapshots_df, added = update_snapshots(videos_df, snapshots_df)

    if added <= 0:
        print("No snapshots added today.")
    else:
        snapshots_df.to_csv(SNAPSHOT_FILE, index=False)
        print(f"Added {added} snapshot(s). Saved to {SNAPSHOT_FILE}")

    print(" Done")


# Run the daily collection only when executed as a script, not when imported.
if __name__ == "__main__":
    main()


ℹ️ Seed videos already exist. Skipping new video collection.
📊 Updating engagement snapshots (one per video per day, up to 7 days)...
⚠️ No stats returned for video_id=HY1Rd-B3y-Q
⚠️ No stats returned for video_id=P8aqzsVTAeQ
⚠️ No stats returned for video_id=ol30GkzEnjA
⚠️ No stats returned for video_id=8fzzEXyJJDg
⚠️ No stats returned for video_id=aSNb7DdI39g
⚠️ No stats returned for video_id=6RAIxSCHBrs
⚠️ No stats returned for video_id=My2Bzh6rZUM
⚠️ No stats returned for video_id=6tyCVWdxx6Q
⚠️ No stats returned for video_id=wLIIagoWGb8
⚠️ No stats returned for video_id=r789iCf6NR0
⚠️ No stats returned for video_id=DqrTg6KVCE8
⚠️ No stats returned for video_id=1eQUW8g8PgQ
⚠️ No stats returned for video_id=USDqFyn8UPA
⚠️ No stats returned for video_id=qiH0YbSWNGg
⚠️ No stats returned for video_id=_1YZvGPgLFE
⚠️ No stats returned for video_id=DmOkTjAp79I
⚠️ No stats returned for video_id=UZND5UyN4DU
⚠️ No stats returned for video_id=WvKR0Sdo4pc
⚠️ No stats returned for video_id=qc9x