In [None]:
import os
import getpass

# Never hardcode the key in the notebook. Reuse the value already exported in
# the environment; otherwise prompt for it without echoing it into the output.
if not os.environ.get("YOUTUBE_API_KEY"):
    os.environ["YOUTUBE_API_KEY"] = getpass.getpass("YouTube Data API key: ")


In [None]:

import os, re, time, math, json, random, pathlib, datetime as dt
from typing import List, Dict, Tuple, Optional

import pandas as pd


# When True, save_df() tries Parquet first and falls back to CSV on failure.
USE_PARQUET = True


# The API key is read from the environment only; fail fast when it is absent.
API_KEY = os.environ.get("YOUTUBE_API_KEY")
if not API_KEY:
    raise RuntimeError("Missing YOUTUBE_API_KEY env var — please set it before running.")

# Import the Google client lazily so a missing dependency gives an actionable message.
try:
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
except Exception as e:
    raise RuntimeError("Please install google-api-python-client (pip install google-api-python-client).") from e

# Shared YouTube Data API v3 client used by every request helper in this notebook.
YOUTUBE = build("youtube", "v3", developerKey=API_KEY)


# All outputs live under this directory, which is created on first run.
DATA_DIR = pathlib.Path("./yt_climate_data")
DATA_DIR.mkdir(parents=True, exist_ok=True)


# JSON file read by load_progress() and written by save_progress().
PROGRESS_PATH = DATA_DIR / "progress.json"


def _sleep_backoff(attempt: int):
    
    delay = min(0.5 * (2 ** attempt) + random.random() * 0.25, 8.0)
    time.sleep(delay)

def _yt_call(method, **kwargs):
    
    attempts = 0
    while True:
        try:
            return method(**kwargs).execute()
        except HttpError as e:
            status = getattr(e, "status_code", None)
            
            if e.resp.status in (403, 429, 500, 503):
                attempts += 1
                if attempts > 6:
                    raise
                _sleep_backoff(attempts)
            else:
                raise


def load_progress() -> Dict[str, str]:
    """Load the progress mapping stored at ``PROGRESS_PATH``.

    Returns
    -------
    Dict[str, str]
        The parsed JSON object, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object. The
        last three cases print a warning instead of failing silently.
    """
    if not PROGRESS_PATH.exists():
        return {}
    try:
        data = json.loads(PROGRESS_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[warn] Could not read progress file {PROGRESS_PATH} ({e}). Starting from empty progress.")
        return {}
    if not isinstance(data, dict):
        # Callers use dict methods on the result; reject lists, strings, etc.
        print(f"[warn] Progress file {PROGRESS_PATH} does not contain a JSON object. Starting from empty progress.")
        return {}
    return data

def save_progress(prog: Dict[str, str]):
    """Write ``prog`` as pretty-printed JSON to ``PROGRESS_PATH``.

    The JSON is written to a sibling ``.json.tmp`` file first and then moved
    over the real file, so a half-written progress file is never left behind.
    """
    payload = json.dumps(prog, ensure_ascii=False, indent=2)
    scratch_path = PROGRESS_PATH.with_suffix(".json.tmp")
    scratch_path.write_text(payload, encoding="utf-8")
    scratch_path.replace(PROGRESS_PATH)


def save_df(df: pd.DataFrame, path: pathlib.Path):
    """Persist ``df`` at ``path``, preferring Parquet when ``USE_PARQUET`` is set.

    If Parquet is disabled or writing it fails (a warning is printed in the
    latter case), the frame is written as UTF-8 CSV next to ``path`` with a
    ``.csv`` suffix instead.
    """
    wrote_parquet = False
    if USE_PARQUET:
        try:
            df.to_parquet(path, index=False)
            wrote_parquet = True
        except Exception as e:
            print(f"[warn] Parquet failed ({e}). Falling back to CSV.")

    if not wrote_parquet:
        csv_path = path.with_suffix(".csv")
        df.to_csv(csv_path, index=False, encoding="utf-8")

def load_existing_ids(path: pathlib.Path) -> set:
    """Return the set of ``comment_id`` values (as strings) already stored.

    Reads the Parquet file at ``path`` when it exists; otherwise falls back to
    the sibling ``.csv`` file. Returns an empty set when neither file exists.
    """
    if path.exists():
        stored = pd.read_parquet(path, columns=["comment_id"])
    else:
        csv_path = path.with_suffix(".csv")
        if not csv_path.exists():
            return set()
        stored = pd.read_csv(csv_path, usecols=["comment_id"])
    return set(stored["comment_id"].astype(str))


def to_dt(s: Optional[str]) -> Optional[dt.datetime]:
    """Parse an ISO-8601 timestamp into a ``datetime``.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. Returns
    ``None`` for empty input or for anything that cannot be parsed.
    """
    if s:
        try:
            normalised = s.replace("Z", "+00:00")
            return dt.datetime.fromisoformat(normalised)
        except Exception:
            pass
    return None


In [None]:

from typing import Dict, List, Optional


# US cohort: display name -> YouTube channel ID.
CHANNELS_US: Dict[str, str] = {
    "CNN": "UCupvZG-5ko_eiXAupbDfxWw",
    "Fox News": "UCXIJgqnII2ZOINSWNOGFThA",
    "MSNBC": "UCaXkIU1QidjPwiAYu6GcHjg",
    "PBS NewsHour": "UC6ZFN9Tx6xh-skXCuRHCDpQ",
    "ABC News": "UCBi2mrWuNuyYy4gbM6fU18Q",   
    "CBS News": "UC8p1vwvWtl6T73JiExfWs1g",    
    "NBC News": "UCeY0bbntWzzVIaj2z3QigXg",
    "Bloomberg": "UCIALMKvObZNtJ6AmdCLP7Lg",
}

# EU cohort: display name -> YouTube channel ID.
CHANNELS_EU: Dict[str, str] = {
    "BBC News": "UC16niRr50-MSBwiO3YDb3RA",
    "Sky News": "UCoMdktPbSTixAyNGwb-UYkQ",
    "DW News": "UCknLrEdhRCp1aegoMqRaCZg",
    "euronews (English)": "UCSrZ3UV4jOidv8ppoVuvW9Q",
    "France 24 English": "UCQfwfsi5VrQ8yKZ-UWmAEFg",
    "The Guardian": "UCIRYBXDze5krPDzAEOxFGVA",
    "Financial Times": "UCoUxsWakJucWg46KW5RsvPw",  
    "Channel 4 News": "UCTrQ7HXWRRxr7OsOtodr2_w",   
}


# Channel ID -> cohort label ("US" or "EU"); if an ID appeared in both cohorts, "EU" would win.
REGION_LABELS = {
    **{cid: "US" for cid in CHANNELS_US.values()},
    **{cid: "EU" for cid in CHANNELS_EU.values()},
}


# Combined name -> channel ID mapping across both cohorts (EU entries override US ones on a name clash).
ALL_CHANNELS: Dict[str, str] = {**CHANNELS_US, **CHANNELS_EU}


def get_channel_countries(channel_ids: List[str]) -> Dict[str, Optional[str]]:
    """Look up the country reported by the API for each channel.

    Channels are requested in batches of 50 IDs. For every returned channel
    the country is taken from ``snippet.country`` and, when that is empty,
    from ``brandingSettings.channel.country``. Channels the API does not
    return are simply absent from the result.
    """
    countries: Dict[str, Optional[str]] = {}
    offset = 0
    while offset < len(channel_ids):
        id_batch = channel_ids[offset:offset + 50]
        offset += 50
        resp = _yt_call(
            YOUTUBE.channels().list,
            part="snippet,brandingSettings",
            id=",".join(id_batch)
        )
        for item in resp.get("items", []):
            channel_id = item["id"]
            snippet = item.get("snippet", {}) or {}
            country = snippet.get("country")
            if not country:
                # Fall back to the branding settings when the snippet has no country.
                branding = item.get("brandingSettings", {}).get("channel", {}) or {}
                country = branding.get("country")
            countries[channel_id] = country
    return countries

# Channel ID -> API-reported country for every cohort channel; this issues live
# channels.list requests (50 IDs per request) when the cell runs.
CHANNEL_COUNTRIES = get_channel_countries(list(REGION_LABELS.keys()))


In [None]:
import pandas as pd


# Cohort sizes at a glance.
print(f"US channels: {len(CHANNELS_US)} | EU channels: {len(CHANNELS_EU)} | Total: {len(ALL_CHANNELS)}")


# One row per channel: cohort name, ID, region label and the API-reported country.
meta_rows = []
for name, cid in ALL_CHANNELS.items():
    meta_rows.append({
        "channel_name": name,
        "channel_id": cid,
        "region_group": REGION_LABELS.get(cid, "UNK"),
        "api_country": CHANNEL_COUNTRIES.get(cid)  # None when the API returned no country or no channel
    })
df_channels = pd.DataFrame(meta_rows).sort_values(["region_group","channel_name"])
display(df_channels)


# Sanity check: every channel should have a region label.
unknown_regions = [r for r in meta_rows if r["region_group"] == "UNK"]
if unknown_regions:
    print("[warn] Some channels missing region labels:", [r["channel_name"] for r in unknown_regions])

# Informational only: list channels for which no country was found.
missing_country = [r["channel_name"] for r in meta_rows if r["api_country"] in (None, "")]
if missing_country:
    print("[note] These channels don’t expose a country in the API (normal):", missing_country)


In [None]:
import re
import pandas as pd
import datetime as dt


# Discovery window as YYYY-MM-DD strings; DATE_TO is today's date in UTC,
# evaluated once when this cell runs.
DATE_FROM = "2010-01-01"
DATE_TO   = dt.datetime.now(dt.timezone.utc).date().isoformat()


# Default number of search result pages fetched per keyword per channel.
KEYWORD_PAGES = 1   

CLIMATE_KEYWORDS = [
    "global warming",
    "climate change",
    "climate crisis",
    "climate emergency",
    "global heating",
    "climate heating",
    "environmental warming",
    "global temperature",
    "greenhouse effect",
    "greenhouse impact",
    "paris agreement",
    "paris climate accord",
    "carbon neutrality",
    "carbon emissions",
    "paris climate agreement"
]


_KEYWORDS_LC = [k.lower() for k in CLIMATE_KEYWORDS]
def is_climate_text(text: str | None) -> bool:
    if not text:
        return False
    t = text.lower()
    return any(k in t for k in _KEYWORDS_LC)


def _rfc3339_start(date_str: str) -> str:
    return f"{date_str}T00:00:00Z"

def _rfc3339_end(date_str: str) -> str:
    
    return f"{date_str}T23:59:59Z"


def get_uploads_playlist_id(channel_id: str) -> str | None:
    """Return the ID of the channel's uploads playlist, or None if the channel is not found.

    Issues one ``channels.list`` request with ``part="contentDetails"`` and
    reads ``contentDetails.relatedPlaylists.uploads`` from the first item.
    """
    resp = _yt_call(YOUTUBE.channels().list, part="contentDetails", id=channel_id)
    matches = resp.get("items", [])
    if matches:
        return matches[0]["contentDetails"]["relatedPlaylists"]["uploads"]
    return None

def list_channel_video_ids_with_dates(channel_id: str) -> list[tuple[str, str | None]]:
    """
    List every upload of a channel as (video_id, publishedAt) pairs.

    Walks the channel's uploads playlist page by page (50 items per page)
    until no next-page token is returned. The date is
    contentDetails.videoPublishedAt when present, else snippet.publishedAt.
    Items without a video ID are skipped. Returns [] when the channel has no
    uploads playlist.
    """
    playlist_id = get_uploads_playlist_id(channel_id)
    if not playlist_id:
        return []

    pairs = []
    next_token = None
    has_more = True
    while has_more:
        page = _yt_call(
            YOUTUBE.playlistItems().list,
            part="contentDetails,snippet",
            playlistId=playlist_id,
            maxResults=50,
            pageToken=next_token
        )
        for entry in page.get("items", []):
            details = entry.get("contentDetails", {}) or {}
            snippet = entry.get("snippet", {}) or {}
            video_id = details.get("videoId")
            if not video_id:
                continue
            published = details.get("videoPublishedAt") or snippet.get("publishedAt")
            pairs.append((video_id, published))
        next_token = page.get("nextPageToken")
        has_more = bool(next_token)
    return pairs

def batch_get_video_meta(video_ids: list[str]) -> list[dict]:
    """Fetch ``snippet`` and ``statistics`` for the given videos.

    IDs are sent in batches of 50 per ``videos.list`` request; the returned
    items are concatenated in request order.
    """
    collected = []
    id_batches = (video_ids[pos:pos + 50] for pos in range(0, len(video_ids), 50))
    for id_batch in id_batches:
        page = _yt_call(
            YOUTUBE.videos().list,
            part="snippet,statistics",
            id=",".join(id_batch)
        )
        collected += page.get("items", [])
    return collected


def _search_channel_for_keyword(
    channel_id: str,
    keyword: str,
    date_from: str,
    date_to: str,
    max_pages: int = 1,
    order: str = "date"
) -> list[dict]:
    """
    Search one channel for one keyword inside a date window.

    date_from / date_to are YYYY-MM-DD strings, expanded to start-of-day and
    end-of-day UTC timestamps. At least one page (up to 50 results) is always
    requested; paging stops after `max_pages` pages or when the API returns no
    further page token.

    Returns a list of {'videoId','publishedAt','title'} dicts (snippet-level),
    skipping results without a video ID.
    """
    window_start = _rfc3339_start(date_from)
    window_end = _rfc3339_end(date_to)

    hits = []
    next_token = None
    pages_done = 0
    while True:
        page = _yt_call(
            YOUTUBE.search().list,
            part="snippet",
            channelId=channel_id,
            q=keyword,
            type="video",
            maxResults=50,
            order=order,
            publishedAfter=window_start,
            publishedBefore=window_end,
            pageToken=next_token
        )
        pages_done += 1
        for entry in page.get("items", []):
            video_id = (entry.get("id", {}) or {}).get("videoId")
            if not video_id:
                continue
            snippet = entry.get("snippet", {}) or {}
            hits.append({
                "videoId": video_id,
                "publishedAt": snippet.get("publishedAt",""),
                "title": snippet.get("title","")
            })
        next_token = page.get("nextPageToken")
        if pages_done >= max_pages or not next_token:
            break
    return hits

def discover_climate_videos_for_channel(
    channel_id: str,
    cap_per_channel: int = 60,
    date_from: str | None = DATE_FROM,
    date_to: str | None = DATE_TO,
    verbose: bool | None = False,
    per_keyword_pages: int = KEYWORD_PAGES
) -> list[dict]:
    """
    Keyword-search based discovery of climate videos for one channel.

    Steps:
      - For each keyword, run channel-scoped search.list in [date_from, date_to]
      - Merge & dedupe video IDs (first hit per video wins)
      - Batch fetch snippet+statistics
      - Keep items whose title/desc/tags contain ANY keyword
      - Sort by publish date (desc) then comment count; cap to `cap_per_channel`

    Parameters:
      channel_id        -- channel to search.
      cap_per_channel   -- maximum number of videos returned.
      date_from/date_to -- YYYY-MM-DD window bounds. A missing lower bound
                           defaults to "2000-01-01" and a missing upper bound
                           to DATE_TO; each is defaulted independently.
      verbose           -- print candidate / kept counts.
      per_keyword_pages -- search result pages fetched per keyword.

    Returns a list of dicts with keys video_id, video_title,
    video_publish_date, video_comment_count, video_view_count.
    """
    # Default each bound on its own so that supplying only one of them does
    # not silently discard the bound the caller did provide.
    if not date_from:
        date_from = "2000-01-01"
    if not date_to:
        date_to = DATE_TO

    # video_id -> first search hit seen for it (dedupes across keywords).
    cand = {}
    for kw in CLIMATE_KEYWORDS:
        hits = _search_channel_for_keyword(channel_id, kw, date_from, date_to, max_pages=per_keyword_pages)
        for h in hits:
            cand.setdefault(h["videoId"], h)

    if verbose:
        print(f"[discover-simple] channel={channel_id}: candidates_from_search={len(cand)} "
              f"(keywords={len(CLIMATE_KEYWORDS)}, pages/kw={per_keyword_pages})")

    if not cand:
        return []

    ids = list(cand.keys())

    # Full metadata is needed because search results carry no tags or statistics.
    metas = batch_get_video_meta(ids)

    # Re-check the text locally: keep only videos that mention a keyword in
    # their title, description or tags.
    selected = []
    for it in metas:
        sn = it.get("snippet", {}) or {}
        title = sn.get("title","")
        desc  = sn.get("description","")
        tags  = " ".join(sn.get("tags", []) or [])
        if is_climate_text(title) or is_climate_text(desc) or is_climate_text(tags):
            stats = it.get("statistics", {}) or {}
            selected.append({
                "video_id": it["id"],
                "video_title": title,
                "video_publish_date": sn.get("publishedAt",""),
                "video_comment_count": int(stats.get("commentCount", 0) or 0),
                "video_view_count": int(stats.get("viewCount", 0) or 0),
            })

    # Newest first; ties on the timestamp string are broken by comment count.
    selected.sort(key=lambda r: (r["video_publish_date"], r["video_comment_count"]), reverse=True)
    out = selected[:cap_per_channel]

    if verbose:
        print(f"[discover-simple] channel={channel_id}: kept={len(out)} (from {len(selected)} filtered, "
              f"{len(cand)} raw), window={date_from}→{date_to}")

    return out


In [None]:
# Flag defined for later cells; nothing in this cell reads it.
VERBOSE_DISCOVERY = True
print("DATE_FROM → DATE_TO:", DATE_FROM, "→", DATE_TO)


# Quick sanity check of the keyword filter on hand-written titles.
tests = [
    "Climate change is accelerating; IPCC report 2023",
    "Global Warming hoax? debate",
    "We installed solar and heat pumps at home",
    "Sports highlights from last night"
]
for t in tests:
    print(f"{t!r} -> {is_climate_text(t)}")


import pandas as pd

def list_channel_video_ids_with_dates_head(channel_id: str, max_pages: int = 2):
    """List (video_id, publishedAt) pairs from the first pages of a channel's uploads.

    Same contract as the full upload listing, but stops after ``max_pages``
    pages of 50 items (at least one page is always requested). The date is
    ``contentDetails.videoPublishedAt`` when present, else
    ``snippet.publishedAt``. Items without a video ID are skipped. Returns
    ``[]`` when the channel has no uploads playlist.
    """
    uploads = get_uploads_playlist_id(channel_id)
    if not uploads:
        return []
    out, token, pages = [], None, 0
    while True:
        resp = _yt_call(
            YOUTUBE.playlistItems().list,
            # The snippet part is required for the publishedAt fallback below;
            # contentDetails itself carries no publishedAt field.
            part="contentDetails,snippet",
            playlistId=uploads,
            maxResults=50,
            pageToken=token
        )
        pages += 1
        for it in resp.get("items", []):
            cd = it.get("contentDetails", {}) or {}
            sn = it.get("snippet", {}) or {}
            vid = cd.get("videoId")
            if vid:
                out.append((vid, cd.get("videoPublishedAt") or sn.get("publishedAt")))
        token = resp.get("nextPageToken")
        if not token or pages >= max_pages:
            break
    return out

def quick_preview_one_channel(channel_name: str, head_pages: int = 2, cap_per_channel: int = 8):
    """Preview climate-related videos among a channel's most recent uploads.

    Parameters
    ----------
    channel_name : str
        Key into ``ALL_CHANNELS``.
    head_pages : int
        Number of uploads-playlist pages (50 items each) to scan.
    cap_per_channel : int
        Number of matching rows to display (the full frame is still returned).

    Returns
    -------
    pd.DataFrame
        One row per matching video with video_id, video_title and
        video_publish_date; empty when nothing matches.
    """
    cid = ALL_CHANNELS[channel_name]
    pairs = list_channel_video_ids_with_dates_head(cid, max_pages=head_pages)
    dfp = pd.DataFrame(pairs, columns=["video_id","video_publishedAt_raw"])
    dfp["video_publishedAt"] = pd.to_datetime(dfp["video_publishedAt_raw"], utc=True, errors="coerce")

    # date-window filter: DATE_TO parses to midnight at the START of that day,
    # so the upper bound is the exclusive start of the following day. Otherwise
    # every upload from the final day (normally today) would be dropped.
    window_start = pd.to_datetime(DATE_FROM, utc=True)
    window_end = pd.to_datetime(DATE_TO, utc=True) + pd.Timedelta(days=1)
    dfp = dfp[
        (dfp["video_publishedAt"] >= window_start)
        & (dfp["video_publishedAt"] < window_end)
    ]
    ids = dfp["video_id"].dropna().astype(str).tolist()
    metas = batch_get_video_meta(ids)

    rows = []
    for it in metas:
        sn = it.get("snippet", {}) or {}
        title = sn.get("title","")
        desc = sn.get("description","")
        tags = " ".join(sn.get("tags", []) or [])
        if is_climate_text(title) or is_climate_text(desc) or is_climate_text(tags):
            rows.append({
                "video_id": it["id"],
                "video_title": title,
                "video_publish_date": sn.get("publishedAt",""),
            })
    df = pd.DataFrame(rows)
    # uploads_seen counts uploads inside the date window, not all scanned uploads.
    print(f"[{channel_name}] pages_scanned={head_pages} | uploads_seen={len(dfp)} | climate_hits={len(df)}")
    display(df.head(cap_per_channel))
    return df


# Smoke test against the live API: scan 8 pages of CNN uploads and show up to 15 hits.
_ = quick_preview_one_channel("CNN", head_pages=8, cap_per_channel=15)



import pandas as pd

def _get_channel_titles(channel_ids):
    titles = {}
    for i in range(0, len(channel_ids), 50):
        resp = _yt_call(YOUTUBE.channels().list, part="snippet", id=",".join(channel_ids[i:i+50]))
        for it in resp.get("items", []):
            titles[it["id"]] = it.get("snippet", {}).get("title", "")
    return titles

def _list_upload_ids_head(channel_id: str, head_pages: int = 1):
    """Collect video IDs from the first pages of a channel's uploads playlist.

    Returns ``(False, [])`` when the channel has no uploads playlist, else
    ``(True, ids)`` where ``ids`` come from at most ``head_pages`` pages of 50
    items (at least one page is always requested).
    """
    playlist_id = get_uploads_playlist_id(channel_id)
    if not playlist_id:
        return False, []

    video_ids = []
    next_token = None
    pages_done = 0
    while True:
        page = _yt_call(
            YOUTUBE.playlistItems().list,
            part="contentDetails",
            playlistId=playlist_id,
            maxResults=50,
            pageToken=next_token
        )
        pages_done += 1
        for entry in page.get("items", []):
            video_id = (entry.get("contentDetails", {}) or {}).get("videoId")
            if video_id:
                video_ids.append(video_id)
        next_token = page.get("nextPageToken")
        if pages_done >= head_pages or not next_token:
            break
    return True, video_ids

def quick_all_channel_check(
    channels_map,
    region_labels,
    head_pages: int = 1,
    check_climate_head: bool = True
) -> pd.DataFrame:
    """Health-check every channel in a cohort and print/display a summary.

    Parameters
    ----------
    channels_map : mapping of display name -> channel ID.
    region_labels : mapping of channel ID -> region label ("UNK" if absent).
    head_pages : number of uploads-playlist pages (50 items each) sampled per channel.
    check_climate_head : when True, count how many of the sampled uploads
        match the climate keyword filter.

    Returns
    -------
    pd.DataFrame
        One row per channel with the API title, region, whether an uploads
        playlist was found, the number of sampled uploads and (optionally) the
        climate hit count. ``climate_hits_head`` is None when the check is off
        or the channel yielded no uploads.
    """
    cid_list = list(channels_map.values())
    api_titles = _get_channel_titles(cid_list)

    rows = []
    for name, cid in channels_map.items():
        uploads_ok, ids = _list_upload_ids_head(cid, head_pages=head_pages)
        uploads_seen = len(ids)

        climate_hits_head = None
        if check_climate_head and ids:
            hits = 0
            # Check every sampled upload, 50 IDs per request, so the count
            # really covers head_pages*50 uploads as the summary line states.
            for start in range(0, len(ids), 50):
                resp = _yt_call(YOUTUBE.videos().list, part="snippet", id=",".join(ids[start:start + 50]))
                for it in resp.get("items", []):
                    sn = it.get("snippet", {}) or {}
                    title = sn.get("title","")
                    desc  = sn.get("description","")
                    tags  = " ".join(sn.get("tags", []) or [])
                    if is_climate_text(title) or is_climate_text(desc) or is_climate_text(tags):
                        hits += 1
            climate_hits_head = hits

        rows.append({
            "channel_name_cohort": name,
            "channel_id": cid,
            "api_title": api_titles.get(cid, ""),
            "region_group": region_labels.get(cid, "UNK"),
            "uploads_playlist_ok": uploads_ok,
            "uploads_seen_head": uploads_seen,
            "climate_hits_head": climate_hits_head,
        })

    df = pd.DataFrame(rows).sort_values(["region_group","channel_name_cohort"]).reset_index(drop=True)

    # Channels without an uploads playlist most likely have a wrong ID.
    bad_uploads = df[~df["uploads_playlist_ok"]]
    print(f"Channels checked: {len(df)} | with uploads playlist: {len(df) - len(bad_uploads)} | missing uploads: {len(bad_uploads)}")
    if bad_uploads.shape[0]:
        print("!! These channels returned no uploads playlist (ID likely wrong):")
        display(bad_uploads[["channel_name_cohort","channel_id","region_group","api_title"]])

    if check_climate_head:
        # The column may hold None for channels without uploads; aggregate on
        # a numeric copy so totals and per-region sums treat those as 0.
        hits_numeric = pd.to_numeric(df["climate_hits_head"], errors="coerce").fillna(0).astype(int)
        total_hits = int(hits_numeric.sum())
        by_region = (df.assign(climate_hits_head=hits_numeric)
                       .groupby("region_group", as_index=False)["climate_hits_head"]
                       .sum()
                       .rename(columns={"climate_hits_head":"climate_hits_head_sum"}))
        print(f"\nHead-sample climate hits (approx, first {head_pages*50} uploads/channel): {total_hits}")
        display(by_region)

    print("\nPer-channel snapshot (head sample only):")
    display(df)
    return df


# Run the head-sample check for the full cohort, then for each region
# separately. Each call re-queries the live API.
print("== ALL CHANNELS — head_pages=1, climate check on ==")
df_s4_lite_all = quick_all_channel_check(ALL_CHANNELS, REGION_LABELS, head_pages=1, check_climate_head=True)

print("\n== US ONLY — head_pages=1 ==")
df_s4_lite_us = quick_all_channel_check(CHANNELS_US, REGION_LABELS, head_pages=1, check_climate_head=True)

print("\n== EU ONLY — head_pages=1 ==")
df_s4_lite_eu = quick_all_channel_check(CHANNELS_EU, REGION_LABELS, head_pages=1, check_climate_head=True)


In [None]:

from typing import Optional, List, Dict, Tuple
from googleapiclient.errors import HttpError
import datetime as dt



def _skip_reason_from_http_error(e: Exception) -> Optional[str]:
    """
    Extract a simple reason tag from HttpError content, if recognizable.
    """
    s = ""
    if hasattr(e, "content") and e.content:
        try:
            s = e.content.decode("utf-8", "ignore")
        except Exception:
            s = str(e)
    else:
        s = str(e)
    for key in (
        "commentsDisabled",
        "forbidden",
        "videoNotFound",
        "processingFailure",
        "insufficientPermissions",
        "quotaExceeded",
    ):
        if key in s:
            return key
    return None

def get_video_commentability(video_id: str, skip_if_zero: bool = False) -> Tuple[bool, str]:
    """
    Preflight check: is it worth trying to fetch comments for this video?

    Calls videos.list(part='status,statistics') once and applies, in order:
      - API error                        -> (False, <reason tag> or 'video_api_error')
      - video not returned               -> (False, 'video_not_found')
      - madeForKids / selfDeclaredMadeForKids -> (False, 'made_for_kids')
      - statistics.commentCount missing  -> (False, 'comments_disabled_or_hidden')
      - skip_if_zero and commentCount==0 -> (False, 'no_comments')
      - otherwise                        -> (True, 'ok')

    Returns: (is_commentable, reason)
    """
    try:
        resp = _yt_call(
            YOUTUBE.videos().list,
            part="status,statistics",
            id=video_id
        )
    except HttpError as e:
        return False, (_skip_reason_from_http_error(e) or "video_api_error")

    found = resp.get("items", [])
    if not found:
        return False, "video_not_found"

    video = found[0]
    status = video.get("status", {}) or {}
    stats = video.get("statistics", {}) or {}

    kids_flag = status.get("madeForKids") or status.get("selfDeclaredMadeForKids")
    if kids_flag:
        return False, "made_for_kids"

    if "commentCount" not in stats:
        return False, "comments_disabled_or_hidden"

    if skip_if_zero:
        comment_total = int(stats.get("commentCount", 0) or 0)
        if comment_total == 0:
            return False, "no_comments"

    return True, "ok"




def _comment_row(comment_id: str, parent_id: Optional[str], snippet: Dict) -> Dict:
    """Flatten one API comment snippet into the row schema returned by the crawl."""
    return {
        "comment_id": comment_id,
        "parent_id": parent_id,
        "comment_text": snippet.get("textDisplay") or snippet.get("textOriginal", ""),
        "like_count": int(snippet.get("likeCount", 0) or 0),
        "comment_published_at": snippet.get("publishedAt", ""),
        "author_channel_id": (snippet.get("authorChannelId") or {}).get("value"),
    }


def _is_at_or_before_cutoff(published: str, cutoff_dt) -> bool:
    """True when `published` parses to a time at or before `cutoff_dt` (False if either is missing)."""
    published_dt = to_dt(published)
    return bool(cutoff_dt and published_dt and published_dt <= cutoff_dt)


def _fetch_replies_for_thread(video_id: str, top_id: str, cutoff_dt) -> List[Dict]:
    """Page through all replies of one thread; stop quietly on a recognised skip reason."""
    replies: List[Dict] = []
    rp_token = None
    while True:
        try:
            rresp = _yt_call(
                YOUTUBE.comments().list,
                part="snippet",
                parentId=top_id,
                maxResults=100,
                textFormat="plainText",
                pageToken=rp_token
            )
        except HttpError as e:
            reason = _skip_reason_from_http_error(e)
            if reason:
                print(f"[skip] replies {video_id}/{top_id}: {reason}")
                break
            raise

        for r in rresp.get("items", []):
            rsn = r["snippet"]
            if _is_at_or_before_cutoff(rsn.get("publishedAt", ""), cutoff_dt):
                continue
            replies.append(_comment_row(r["id"], top_id, rsn))

        rp_token = rresp.get("nextPageToken")
        if not rp_token:
            break
    return replies


def fetch_comments_for_video(
    video_id: str,
    since_iso: Optional[str] = None,
    include_replies: bool = False,
    max_top_level: Optional[int] = None,
    order: str = "time",
    precheck: bool = True,
    skip_if_zero: bool = False,
) -> List[Dict]:
    """
    Fetch top-level comments (and optionally replies) for a video (newest-first by default).

    Parameters
    ----------
    video_id : str
        Target video ID.
    since_iso : ISO8601 or None
        If provided, keep comments with publishedAt > since_iso (used for incremental runs).
        With order='time', paging stops after the first page that contains no
        comment newer than the cutoff, instead of walking the whole history.
    include_replies : bool
        If True, fetch all replies for each kept top-level comment (adds quota).
    max_top_level : int or None
        Cap on number of top-level comments to fetch per video (replies not capped).
    order : str
        'time' (newest first) or 'relevance' (YouTube sorting). Defaults to 'time'.
    precheck : bool
        If True, preflight with videos.list(status,statistics) to skip known-bad videos early.
    skip_if_zero : bool
        With precheck, if True and commentCount==0, skip the video.

    Returns
    -------
    List[Dict]
        Each dict includes:
            comment_id, parent_id (None for top-level), comment_text, like_count,
            comment_published_at, author_channel_id
        On a recognised skip reason mid-crawl, the rows collected so far are returned.

    Raises
    ------
    HttpError
        For API errors that do not map to a known skip reason.
    """
    if precheck:
        ok, why = get_video_commentability(video_id, skip_if_zero=skip_if_zero)
        if not ok:
            print(f"[skip-pre] {video_id}: {why}")
            return []

    rows: List[Dict] = []
    page_token: Optional[str] = None
    fetched_top = 0
    cutoff_dt = to_dt(since_iso) if since_iso else None

    while True:
        try:
            resp = _yt_call(
                YOUTUBE.commentThreads().list,
                part="snippet",
                videoId=video_id,
                maxResults=100,
                order=order,
                textFormat="plainText",
                pageToken=page_token
            )
        except HttpError as e:
            reason = _skip_reason_from_http_error(e)
            if reason:
                print(f"[skip] threads {video_id}: {reason}")
                return rows
            raise

        items = resp.get("items", [])
        if not items:
            break

        page_had_new = False
        for th in items:
            top = th["snippet"]["topLevelComment"]
            csn = top["snippet"]
            top_id = top["id"]

            # Incremental runs: skip threads already covered by the cutoff
            # (their replies are skipped too).
            if _is_at_or_before_cutoff(csn.get("publishedAt", ""), cutoff_dt):
                continue

            page_had_new = True
            rows.append(_comment_row(top_id, None, csn))
            fetched_top += 1

            if include_replies and th["snippet"].get("totalReplyCount", 0) > 0:
                rows.extend(_fetch_replies_for_thread(video_id, top_id, cutoff_dt))

            if max_top_level and fetched_top >= max_top_level:
                break

        if max_top_level and fetched_top >= max_top_level:
            break

        # Newest-first ordering: once a whole page holds nothing newer than
        # the cutoff, later pages are older still, so stop spending quota.
        # A full page is required (not a single old comment) to tolerate
        # slight ordering irregularities.
        if cutoff_dt and order == "time" and not page_had_new:
            break

        page_token = resp.get("nextPageToken")
        if not page_token:
            break

    return rows




def quota_estimate_comments(num_videos: int, avg_top_level_per_video: int = 500, include_replies: bool = False) -> int:
    """
    Rough estimate of API units needed to crawl comments.

    Assumes one call per page of 100 top-level comments (at least one page per
    video); with replies enabled the total is scaled by 1.3 and truncated to an
    integer.
    """
    pages_per_video = max(1, (avg_top_level_per_video + 99) // 100)
    estimate = num_videos * pages_per_video
    return int(estimate * 1.3) if include_replies else estimate


In [None]:
import pandas as pd

# End-to-end smoke test against the live API: discover one climate video on a
# single channel and fetch up to 100 of its top-level comments.
test_channel = "CNN"  
test_cid = ALL_CHANNELS[test_channel]


one = discover_climate_videos_for_channel(test_cid, cap_per_channel=1, date_from=DATE_FROM, date_to=DATE_TO, verbose=True)
assert len(one) >= 1, "No climate video found in window for test channel."
vid = one[0]["video_id"]
print(f"Testing video: {vid} | {one[0]['video_title']}")

comments = fetch_comments_for_video(vid, since_iso=None, include_replies=False, max_top_level=100)
print(f"Fetched {len(comments)} comments (top-level)")

dfc = pd.DataFrame(comments)
if not dfc.empty:
    # Parse timestamps so the preview shows real datetimes (unparseable values become NaT).
    dfc["comment_published_at"] = pd.to_datetime(dfc["comment_published_at"], errors="coerce", utc=True)
    display(dfc.head(10)[["comment_id","comment_published_at","like_count","comment_text"]])
else:
    print("No comments returned (possible if comments are disabled or locked).")


In [None]:
# Ballpark quota cost for a 200-video crawl, without and with replies.
print("Est (units) for 500 top-level comments across 200 videos, replies off:",
      quota_estimate_comments(num_videos=200, avg_top_level_per_video=500, include_replies=False))
print("Est (units) with replies on (~30% overhead):",
      quota_estimate_comments(num_videos=200, avg_top_level_per_video=500, include_replies=True))


In [None]:
from typing import Dict, List, Optional
import pandas as pd
import datetime as dt
from collections import defaultdict


def discover_climate_by_year(channel_id: str, start_year=2010, end_year=2025, per_year_cap=6, verbose=False) -> List[dict]:
    """
    Time-balanced discovery: up to `per_year_cap` climate videos per calendar year.

    Lists all uploads of the channel, groups them by publish year and, for each
    year in [start_year, end_year], fetches metadata for that year's videos,
    keeps those whose title/description/tags pass is_climate_text, and retains
    the ones with the most comments. The combined result is sorted by publish
    date (ascending).
    """
    pairs = list_channel_video_ids_with_dates(channel_id)
    if not pairs:
        return []

    uploads = pd.DataFrame(pairs, columns=["video_id","video_publishedAt_raw"])
    uploads["dt"] = pd.to_datetime(uploads["video_publishedAt_raw"], utc=True, errors="coerce")
    uploads["year"] = uploads["dt"].dt.year

    picked: List[dict] = []
    for year in range(start_year, end_year + 1):
        in_year = uploads["year"] == year
        year_ids = uploads.loc[in_year, "video_id"].dropna().astype(str).tolist()
        if not year_ids:
            continue

        matches = []
        for item in batch_get_video_meta(year_ids):
            snippet = item.get("snippet", {}) or {}
            title = snippet.get("title","")
            desc = snippet.get("description","")
            tags = " ".join(snippet.get("tags", []) or [])
            if not (is_climate_text(title) or is_climate_text(desc) or is_climate_text(tags)):
                continue
            stats = item.get("statistics", {}) or {}
            matches.append({
                "video_id": item["id"],
                "video_title": title,
                "video_publish_date": snippet.get("publishedAt",""),
                "video_comment_count": int(stats.get("commentCount", 0) or 0),
                "video_view_count": int(stats.get("viewCount", 0) or 0),
                "year": year,
            })

        # Most-commented first, then cap the year.
        matches.sort(key=lambda rec: rec["video_comment_count"], reverse=True)
        kept = matches[:per_year_cap]
        picked.extend(kept)
        if verbose and matches:
            print(f"[per-year] channel={channel_id} year={year} matches={len(matches)} kept={len(kept)}")

    picked.sort(key=lambda rec: rec["video_publish_date"])
    return picked



def build_video_manifest(
    channels_map: Dict[str, str],
    region_labels: Dict[str, str],
    mode: str = "recent",
    cap_per_channel: int = 60,
    date_from: Optional[str] = DATE_FROM,
    date_to: Optional[str] = DATE_TO,
    # per-year options:
    start_year: int = 2010,
    end_year: int = 2025,
    per_year_cap: int = 6,
    verbose: bool = False,
) -> pd.DataFrame:
    """
    Returns a DataFrame of videos to crawl with channel + region metadata.
    Columns: [video_id, video_title, video_publish_date, video_comment_count, video_view_count,
              channel_name, channel_id, region_group]

    Parameters:
      channels_map    -- display name -> channel ID.
      region_labels   -- channel ID -> region label ("UNK" when absent).
      mode            -- "recent": keyword search within [date_from, date_to],
                         capped at cap_per_channel per channel;
                         "per_year": up to per_year_cap videos per year in
                         [start_year, end_year].
      verbose         -- print per-channel counts.

    A non-empty manifest is also saved under DATA_DIR as
    manifest_{mode}_{date_from}_{date_to}; the file name uses date_from/date_to
    in both modes.

    Raises ValueError when `mode` is not one of the two supported values.
    """
    # Validate up front so a bad mode fails even when channels_map is empty,
    # rather than silently returning an empty manifest.
    if mode not in ("recent", "per_year"):
        raise ValueError("mode must be 'recent' or 'per_year'.")

    discovered: List[dict] = []

    for name, cid in channels_map.items():
        if mode == "recent":
            vids = discover_climate_videos_for_channel(
                channel_id=cid,
                cap_per_channel=cap_per_channel,
                date_from=date_from,
                date_to=date_to,
                verbose=verbose
            )
        else:
            vids = discover_climate_by_year(
                channel_id=cid,
                start_year=start_year,
                end_year=end_year,
                per_year_cap=per_year_cap,
                verbose=verbose
            )

        for v in vids:
            discovered.append({
                "video_id": v["video_id"],
                "video_title": v["video_title"],
                "video_publish_date": v.get("video_publish_date",""),
                "video_comment_count": v.get("video_comment_count", 0),
                "video_view_count": v.get("video_view_count", 0),
                "channel_name": name,
                "channel_id": cid,
                "region_group": region_labels.get(cid, "UNK"),
            })

        if verbose:
            print(f"[manifest] {name} ({cid}) -> {len(vids)} videos")

    df = pd.DataFrame(discovered)

    # Only persist when something was discovered.
    if not df.empty:
        save_df(df, DATA_DIR / f"manifest_{mode}_{date_from}_{date_to}.parquet")
    return df



def run_crawl_from_manifest(
    manifest_df: pd.DataFrame,
    include_replies: bool = False,
    max_top_level_per_video: Optional[int] = None,
    out_basename: str = "climate_comments",
    ignore_progress: bool = False
):
    """
    Iterates over the manifest, fetches comments, writes to Parquet/CSV, and updates progress.json.

    Args:
        manifest_df: one row per video; must provide video_id, video_title,
            video_publish_date, channel_id, channel_name and region_group.
        include_replies: forwarded to fetch_comments_for_video.
        max_top_level_per_video: forwarded to fetch_comments_for_video as max_top_level.
        out_basename: output file stem inside DATA_DIR.
        ignore_progress: when True, progress.json is neither read nor written and
            every video is fetched with since_iso=None.

    Returns:
        None. Results are written to DATA_DIR / f"{out_basename}.parquet" via save_df.

    Newly fetched rows are merged with whatever was already stored under the same
    basename, so an incremental run extends the file instead of replacing it.
    progress.json is only advanced once the data write has succeeded (or when
    there is nothing to write), so a failed write cannot cause comments to be skipped
    on the next run.
    """
    if manifest_df.empty:
        print("Manifest is empty — nothing to crawl.")
        return

    progress = {} if ignore_progress else load_progress()
    all_rows: List[dict] = []

    def _checkpoint_progress():
        # Persist the per-video "newest comment seen" markers, unless disabled.
        if not ignore_progress:
            save_progress(progress)

    for _, row in manifest_df.iterrows():
        vid = row["video_id"]
        since_iso = None if ignore_progress else progress.get(vid)
        rows = fetch_comments_for_video(
            video_id=vid,
            since_iso=since_iso,
            include_replies=include_replies,
            max_top_level=max_top_level_per_video
        )
        if not rows:
            continue

        # Stamp every comment with the video/channel metadata from the manifest.
        video_meta = {
            "video_id": vid,
            "video_title": row["video_title"],
            "video_publish_date": row["video_publish_date"],
            "channel_id": row["channel_id"],
            "channel_name": row["channel_name"],
            "region_group": row["region_group"],
        }
        for r in rows:
            r.update(video_meta)

        # Remember the newest comment timestamp (UTC, trailing "Z") for this video.
        newest = max((to_dt(r["comment_published_at"]) for r in rows if r.get("comment_published_at")), default=None)
        if newest:
            progress[vid] = newest.astimezone(dt.timezone.utc).isoformat().replace("+00:00", "Z")

        all_rows.extend(rows)

    if not all_rows:
        _checkpoint_progress()
        print("No new comments fetched (up to date or empty).")
        return

    out_path = DATA_DIR / f"{out_basename}.parquet"
    df = pd.DataFrame(all_rows).drop_duplicates(subset=["comment_id"])
    for col in ("comment_published_at", "video_publish_date"):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce", utc=True)

    existing_ids = load_existing_ids(out_path)
    if existing_ids:
        before = len(df)
        df = df[~df["comment_id"].astype(str).isin(existing_ids)].copy()
        print(f"Deduped vs existing: {before} -> {len(df)} new rows")

    if df.empty:
        # Everything fetched is already on disk; it is safe to advance progress.
        _checkpoint_progress()
        print("No new rows to write after dedupe.")
        return

    # save_df replaces the target file, so the stored rows must be carried over
    # explicitly; otherwise each incremental run would discard earlier comments.
    existing_df = _load_existing_comments(out_path)
    if existing_df is None or existing_df.empty:
        to_write = df
    else:
        to_write = pd.concat([existing_df, df], ignore_index=True)

    save_df(to_write, out_path)
    _checkpoint_progress()
    print(f"Wrote {len(df)} rows to {out_path}")


def _load_existing_comments(out_path: pathlib.Path) -> Optional[pd.DataFrame]:
    """Read previously stored comments for out_path, or return None when nothing is stored.

    Looks for the Parquet file first and then for a sibling ".csv" file
    (presumably what save_df writes when Parquet is unavailable — the same
    lookup build_monthly_aggregates performs).
    """
    csv_path = out_path.with_suffix(".csv")
    if out_path.exists():
        existing = pd.read_parquet(out_path)
    elif csv_path.exists():
        existing = pd.read_csv(csv_path)
    else:
        return None
    # Bring timestamp columns to the same tz-aware dtype as freshly fetched rows.
    for col in ("comment_published_at", "video_publish_date"):
        if col in existing.columns:
            existing[col] = pd.to_datetime(existing[col], errors="coerce", utc=True)
    return existing



def build_monthly_aggregates(out_basename: str = "climate_comments"):
    """
    Roll stored comments up to one row per (region_group, calendar month).

    Reads DATA_DIR / f"{out_basename}.parquet", or the sibling ".csv" file when the
    Parquet file is absent. Prints a notice and returns None when neither exists.

    The aggregate has the columns region_group, month, comments, unique_videos,
    unique_channels, total_likes and month_start. It is saved through save_df as
    f"{out_basename}_monthly.parquet", its first 12 rows (sorted by region and
    month) are displayed, and the full frame is returned.
    """
    parquet_path = DATA_DIR / f"{out_basename}.parquet"
    csv_path = parquet_path.with_suffix(".csv")

    if parquet_path.exists():
        comments = pd.read_parquet(parquet_path)
    elif csv_path.exists():
        comments = pd.read_csv(csv_path, parse_dates=["comment_published_at", "video_publish_date"])
    else:
        print("No comment data found.")
        return None

    # Unparseable timestamps become NaT and are excluded from the aggregate below.
    published = pd.to_datetime(comments["comment_published_at"], utc=True, errors="coerce")
    comments["comment_published_at"] = published
    comments["month"] = published.dt.to_period("M")

    dated = comments.dropna(subset=["month"])
    per_region_month = dated.groupby(["region_group", "month"], as_index=False)
    agg = per_region_month.agg(
        comments=pd.NamedAgg(column="comment_id", aggfunc="count"),
        unique_videos=pd.NamedAgg(column="video_id", aggfunc="nunique"),
        unique_channels=pd.NamedAgg(column="channel_id", aggfunc="nunique"),
        total_likes=pd.NamedAgg(column="like_count", aggfunc="sum"),
    )
    # Timestamp of the first day of each month, alongside the Period column.
    agg["month_start"] = agg["month"].dt.to_timestamp()

    save_df(agg, DATA_DIR / f"{out_basename}_monthly.parquet")

    print("Monthly aggregate preview:")
    preview = agg.sort_values(["region_group", "month_start"]).head(12)
    display(preview)
    return agg


In [None]:
# Small test run for one region: build a manifest, then drop videos without comments.
REGION = "EU"          # "US" or "EU"; any other value selects ALL_CHANNELS
MODE   = "recent"      # "recent"; any other value uses the per-year discovery

CAP_PER_CHANNEL = 3    # passed as cap_per_channel in "recent" mode
DATE_FROM_TEST  = DATE_FROM
DATE_TO_TEST    = DATE_TO
START_YEAR      = 2010
END_YEAR        = 2025

# pick channels by region
CHMAP = {"US": CHANNELS_US, "EU": CHANNELS_EU}.get(REGION.upper(), ALL_CHANNELS)


if MODE == "recent":
    manifest = build_video_manifest(
        channels_map=CHMAP,
        region_labels=REGION_LABELS,
        mode="recent",
        cap_per_channel=CAP_PER_CHANNEL,
        date_from=DATE_FROM_TEST, date_to=DATE_TO_TEST,
        verbose=True
    )
else:
    manifest = build_video_manifest(
        channels_map=CHMAP,
        region_labels=REGION_LABELS,
        mode="per_year",
        start_year=START_YEAR, end_year=END_YEAR,
        per_year_cap=2,
        verbose=True
    )

print(f"Manifest size: {len(manifest)} videos")
if not manifest.empty:
    display(manifest.head())
    print("Videos by region:")
    display(manifest.groupby("region_group")["video_id"].count().rename("videos").to_frame())

    # Only filter when there is something to filter: a manifest built from zero
    # discovered videos has no columns, so indexing "video_comment_count" on it
    # would raise KeyError instead of reaching the crawl's empty-manifest message.
    manifest = manifest[manifest["video_comment_count"].fillna(0).astype(int) > 0].copy()
print("After filtering zero-comment videos:", len(manifest))


def _slug(s): 
    return str(s).replace(" ", "").replace(":", "").replace("-", "")
# Output stem: dated for "recent" runs, year-ranged for every other mode.
if MODE == "recent":
    OUT_BASENAME = f"climate_comments_{REGION.upper()}_{MODE}_{_slug(DATE_FROM_TEST)}_{_slug(DATE_TO_TEST)}"
else:
    OUT_BASENAME = f"climate_comments_{REGION.upper()}_{MODE}_{START_YEAR}_{END_YEAR}"


# Crawl top-level comments only, resuming from progress.json.
run_crawl_from_manifest(
    manifest_df=manifest,
    out_basename=OUT_BASENAME,
    include_replies=False,
    max_top_level_per_video=150,
    ignore_progress=False,
)

_ = build_monthly_aggregates(out_basename=OUT_BASENAME)


In [None]:
# Small test run for one region: build a manifest, then drop videos without comments.
REGION = "US"          # "US" or "EU"; any other value selects ALL_CHANNELS
MODE   = "recent"      # "recent"; any other value uses the per-year discovery

CAP_PER_CHANNEL = 3    # passed as cap_per_channel in "recent" mode
DATE_FROM_TEST  = DATE_FROM
DATE_TO_TEST    = DATE_TO
START_YEAR      = 2010
END_YEAR        = 2025


# pick channels by region
CHMAP = {"US": CHANNELS_US, "EU": CHANNELS_EU}.get(REGION.upper(), ALL_CHANNELS)


if MODE == "recent":
    manifest = build_video_manifest(
        channels_map=CHMAP,
        region_labels=REGION_LABELS,
        mode="recent",
        cap_per_channel=CAP_PER_CHANNEL,
        date_from=DATE_FROM_TEST, date_to=DATE_TO_TEST,
        verbose=True
    )
else:
    manifest = build_video_manifest(
        channels_map=CHMAP,
        region_labels=REGION_LABELS,
        mode="per_year",
        start_year=START_YEAR, end_year=END_YEAR,
        per_year_cap=2,
        verbose=True
    )

print(f"Manifest size: {len(manifest)} videos")
if not manifest.empty:
    display(manifest.head())
    print("Videos by region:")
    display(manifest.groupby("region_group")["video_id"].count().rename("videos").to_frame())

    # Only filter when there is something to filter: a manifest built from zero
    # discovered videos has no columns, so indexing "video_comment_count" on it
    # would raise KeyError instead of reaching the crawl's empty-manifest message.
    manifest = manifest[manifest["video_comment_count"].fillna(0).astype(int) > 0].copy()
print("After filtering zero-comment videos:", len(manifest))


def _slug(s): 
    return str(s).replace(" ", "").replace(":", "").replace("-", "")
# Output stem: dated for "recent" runs, year-ranged for every other mode.
if MODE == "recent":
    OUT_BASENAME = f"climate_comments_{REGION.upper()}_{MODE}_{_slug(DATE_FROM_TEST)}_{_slug(DATE_TO_TEST)}"
else:
    OUT_BASENAME = f"climate_comments_{REGION.upper()}_{MODE}_{START_YEAR}_{END_YEAR}"


# Crawl top-level comments only, resuming from progress.json.
run_crawl_from_manifest(
    manifest_df=manifest,
    out_basename=OUT_BASENAME,
    include_replies=False,
    max_top_level_per_video=150,
    ignore_progress=False,
)


_ = build_monthly_aggregates(out_basename=OUT_BASENAME)


In [None]:
#Comment collection by year
import pandas as pd
from datetime import datetime, timezone
import calendar

# Calendar year to collect, and its first/last day as YYYY-MM-DD strings.
YEAR = 2024
YEAR_FROM, YEAR_TO = f"{YEAR}-01-01", f"{YEAR}-12-31"

# Page limit handed to _search_channel_window; None walks every result page.
PER_KEYWORD_PAGES = None

def _rfc3339(dt: datetime) -> str:
    
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.isoformat().replace("+00:00", "Z")

def _quarter_slices(date_from: str, date_to: str):
    """Generate [start,end] RFC3339 strings per quarter covering [date_from, date_to]."""
    y1, m1, _ = map(int, date_from.split("-"))
    y2, m2, _ = map(int, date_to.split("-"))
    q_start_m = ((m1 - 1) // 3) * 3 + 1
    q_end_m   = ((m2 - 1) // 3) * 3 + 1
    y, m = y1, q_start_m
    while (y < y2) or (y == y2 and m <= q_end_m):
        m_end = m + 2
        last_day = calendar.monthrange(y, m_end)[1]
        start = datetime(y, m, 1, 0, 0, 0, tzinfo=timezone.utc)
        end   = datetime(y, m_end, last_day, 23, 59, 59, tzinfo=timezone.utc)
        
        start = max(start, datetime.fromisoformat(YEAR_FROM + "T00:00:00+00:00"))
        end   = min(end,   datetime.fromisoformat(YEAR_TO   + "T23:59:59+00:00"))
        yield _rfc3339(start), _rfc3339(end)
        m += 3
        if m > 12:
            m = 1
            y += 1

def _search_channel_window(channel_id: str, keyword: str, start_rfc: str, end_rfc: str,
                           per_keyword_pages: int | None) -> list[dict]:
    """
    Channel-scoped search in [start_rfc, end_rfc].
    Walks pages up to per_keyword_pages (or all pages if None). Returns [{'videoId','publishedAt','title'}...].
    """
    out, token, pages = [], None, 0
    while True:
        resp = _yt_call(
            YOUTUBE.search().list,
            part="snippet",
            channelId=channel_id,
            q=keyword,
            type="video",
            maxResults=50,
            order="date",
            publishedAfter=start_rfc,
            publishedBefore=end_rfc,
            pageToken=token
        )
        for it in resp.get("items", []):
            sid = it.get("id", {}) or {}
            vid = sid.get("videoId")
            sn  = it.get("snippet", {}) or {}
            if vid:
                out.append({
                    "videoId": vid,
                    "publishedAt": sn.get("publishedAt",""),
                    "title": sn.get("title","")
                })
        token = resp.get("nextPageToken"); pages += 1
        if not token:
            break
        if per_keyword_pages is not None and pages >= per_keyword_pages:
            break
    return out

def discover_climate_videos_for_channel_SEARCH_range(
    channel_id: str,
    date_from: str = YEAR_FROM,
    date_to: str   = YEAR_TO,
    verbose: bool = True
) -> list[dict]:
    """
    Robust discovery using search.list over quarterly slices to bypass uploads depth issues.
    Dedupes across keywords+slices, then verifies via videos.list and your is_climate_text().

    Args:
        channel_id: channel to search.
        date_from: first day of the window, "YYYY-MM-DD" (inclusive).
        date_to: last day of the window, "YYYY-MM-DD" (inclusive — videos published
            at any time during that day are kept).
        verbose: print candidate/kept counts.

    Returns:
        A list of dicts with video_id, video_title, video_publish_date,
        video_comment_count and video_view_count, sorted newest first (ties
        broken by comment count, highest first). Empty when nothing is found.
    """
    candidates = {}
    slices = list(_quarter_slices(date_from, date_to))
    for start_rfc, end_rfc in slices:
        for kw in CLIMATE_KEYWORDS:
            hits = _search_channel_window(channel_id, kw, start_rfc, end_rfc, PER_KEYWORD_PAGES)
            for h in hits:
                # First hit wins; later keywords/slices returning the same video are ignored.
                candidates.setdefault(h["videoId"], h)

    if verbose:
        print(f"[discover-search] channel={channel_id}: raw_candidates={len(candidates)} "
              f"(slices={len(slices)}, keywords={len(CLIMATE_KEYWORDS)})")

    if not candidates:
        return []

    metas = batch_get_video_meta(list(candidates.keys()))
    start = pd.to_datetime(date_from, utc=True)
    # date_to parses to midnight at the START of that day, while the search slices
    # run through 23:59:59 of it. Use the following midnight as an exclusive bound
    # so videos published during the final day are not thrown away.
    end_exclusive = pd.to_datetime(date_to, utc=True).normalize() + pd.Timedelta(days=1)

    rows = []
    for it in metas:
        sn = it.get("snippet", {}) or {}
        vid_dt = pd.to_datetime(sn.get("publishedAt", ""), utc=True, errors="coerce")
        if pd.isna(vid_dt) or not (start <= vid_dt < end_exclusive):
            continue
        title = sn.get("title","")
        desc  = sn.get("description","")
        tags  = " ".join(sn.get("tags", []) or [])
        # Re-check relevance on the full metadata; the search hit alone is not trusted.
        if is_climate_text(title) or is_climate_text(desc) or is_climate_text(tags):
            stats = it.get("statistics", {}) or {}
            rows.append({
                "video_id": it["id"],
                "video_title": title,
                "video_publish_date": sn.get("publishedAt",""),
                "video_comment_count": int(stats.get("commentCount", 0) or 0),
                "video_view_count": int(stats.get("viewCount", 0) or 0),
            })

    rows.sort(key=lambda r: (r["video_publish_date"], r["video_comment_count"]), reverse=True)
    if verbose:
        print(f"[discover-search] channel={channel_id}: kept={len(rows)} within window {date_from}→{date_to}")
    return rows

def build_manifest_one_year_searchonly(channels_map, region_labels,
                                       date_from=YEAR_FROM, date_to=YEAR_TO) -> pd.DataFrame:
    """
    Build a video manifest for all channels using the search-based discovery only.

    For every (name, channel_id) pair in channels_map, discovers videos in
    [date_from, date_to] and tags each one with channel_name, channel_id and
    region_group (looked up in region_labels, "UNK" when absent).

    A non-empty manifest is saved through save_df before being returned.
    """
    records = []
    for channel_name, channel_id in channels_map.items():
        channel_videos = discover_climate_videos_for_channel_SEARCH_range(
            channel_id, date_from, date_to, verbose=True
        )
        for video in channel_videos:
            video["channel_name"] = channel_name
            video["channel_id"] = channel_id
            video["region_group"] = region_labels.get(channel_id, "UNK")
        records.extend(channel_videos)

    df = pd.DataFrame(records)
    if df.empty:
        return df
    # NOTE(review): the file name uses the module-level YEAR, not date_from/date_to —
    # confirm that is intended when calling with a non-default window.
    save_df(df, DATA_DIR / f"manifest_{YEAR}_searchonly.parquet")
    return df

# Discover the year's videos across all channels, then crawl every comment for them.
manifest_year = build_manifest_one_year_searchonly(ALL_CHANNELS, REGION_LABELS, YEAR_FROM, YEAR_TO)
print(f"\n=== {YEAR} Manifest summary (search-only) ===")
if manifest_year.empty:
    print("No videos matched — consider adjusting keywords or increasing PER_KEYWORD_PAGES.")
else:
    print("Total videos:", len(manifest_year))
    display(manifest_year.groupby("region_group")["video_id"].count().rename("videos").to_frame())

    # Filter only a non-empty manifest: one built from zero videos has no columns,
    # so indexing "video_comment_count" on it would raise KeyError.
    manifest_year = manifest_year[manifest_year["video_comment_count"].fillna(0).astype(int) > 0].copy()
print("After filtering zero-comment videos:", len(manifest_year))

# ignore_progress=True: fetch each video from scratch, without reading or writing progress.json.
run_crawl_from_manifest(
    manifest_df=manifest_year,
    include_replies=False,
    max_top_level_per_video=10000,
    out_basename=f"climate_comments_{YEAR}_all",
    ignore_progress=True
)

_ = build_monthly_aggregates(out_basename=f"climate_comments_{YEAR}_all")
