# In [None]:
import re
import csv
import json
import time
import hashlib
import logging
import argparse
from pathlib import Path
from urllib.parse import urljoin, urlparse

import requests
import requests_cache
import pandas as pd
from bs4 import BeautifulSoup
from dateutil import parser as dateparser
from tqdm import tqdm

# Entry point for sitemap traversal, and the site prefix used both to resolve
# relative links and to reject off-site URLs.
SITEMAP_INDEX = "https://www.wired.com/sitemap.xml"
BASE = "https://www.wired.com"
# Sent as the User-Agent header on every request made through the session.
USER_AGENT = "Mozilla/5.0 (compatible; WiredRiskStudyBot/1.0; +https://wired.com/)"

# Inclusive range of years an article (or URL) must fall in to be kept.
START_YEAR = 2014
END_YEAR = 2024

# polite rate limit between network requests
# (seconds passed to time.sleep; skipped for responses served from cache)
REQUEST_INTERVAL = 1.0

# length filter after cleaning
# (inclusive bounds on the character count of the cleaned article text)
MIN_LEN = 300
MAX_LEN = 100_000

# log setup
# NOTE: basicConfig runs at import time and configures the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger("wired_crawler")

def session_with_cache(cache_path: Path) -> requests.Session:
    """Create a requests session with transparent SQLite-backed caching.

    A dedicated ``requests_cache.CachedSession`` is built instead of calling
    ``requests_cache.install_cache``: the latter patches ``requests`` for the
    whole process, whereas this keeps caching scoped to the returned session.

    Args:
        cache_path: Passed (as a string) to requests-cache as ``cache_name``;
            how it maps to a file on disk is decided by the SQLite backend.

    Returns:
        A session that caches GET and HEAD responses and sends ``USER_AGENT``
        as its User-Agent header.
    """
    session = requests_cache.CachedSession(
        cache_name=str(cache_path),
        backend="sqlite",
        # expire_after=None => persistent cache until manually removed
        expire_after=None,
        allowable_methods=("GET", "HEAD"),
        # hand back a stale cached copy rather than failing when a refresh errors
        stale_if_error=True,
    )
    session.headers.update({"User-Agent": USER_AGENT})
    return session


def sleep_politely(resp):
    """Pause for REQUEST_INTERVAL seconds unless ``resp`` was served from cache.

    A response without a ``from_cache`` attribute is treated as a real
    network hit.
    """
    served_from_cache = getattr(resp, "from_cache", False)
    if served_from_cache:
        return
    time.sleep(REQUEST_INTERVAL)


def get_soup(session, url, **kwargs) -> BeautifulSoup | None:
    """Fetch ``url`` and parse the response body with the ``lxml`` parser.

    Args:
        session: Object exposing a requests-style ``get`` method.
        url: Address to fetch.
        **kwargs: Forwarded to ``session.get``. ``timeout`` defaults to 30
            seconds but may be overridden by the caller.

    Returns:
        The parsed document, or None when the request raises, parsing raises,
        or the status code is not 200. Failures are logged, never raised.
    """
    # Use setdefault rather than a hard-coded keyword: passing timeout twice
    # would raise TypeError, which the handler below would swallow on every call.
    kwargs.setdefault("timeout", 30)
    try:
        resp = session.get(url, **kwargs)
        sleep_politely(resp)
        if resp.status_code != 200:
            logger.warning(f"Skipping {url}: HTTP {resp.status_code}")
            return None
        return BeautifulSoup(resp.text, "lxml")
    except Exception as e:
        # Deliberately broad: one bad URL must not abort a long crawl.
        logger.warning(f"Failed to GET {url}: {e}")
        return None


def parse_iso_date(s: str) -> str | None:
    """Parse date string into ISO YYYY-MM-DD."""
    if not s:
        return None
    try:
        dt = dateparser.parse(s)
        if not dt:
            return None
        return dt.date().isoformat()
    except Exception:
        return None


def within_year_range(iso_date: str) -> bool:
    """Report whether the first four characters of ``iso_date`` are a year
    inside the inclusive range START_YEAR..END_YEAR.

    Empty, too-short, or non-numeric input yields False.
    """
    too_short = not iso_date or len(iso_date) < 4
    if too_short:
        return False
    try:
        year = int(iso_date[:4])
        in_range = START_YEAR <= year <= END_YEAR
    except Exception:
        in_range = False
    return in_range


def clean_text(raw_html: str) -> str:
    """Strip http(s) URLs, squeeze whitespace runs to one space, lower-case.

    Despite the parameter name, no HTML markup is removed here; the input is
    treated as plain text. Falsy input yields an empty string.
    """
    if not raw_html:
        return ""

    # Each URL becomes a single space so neighbouring words stay separated.
    without_urls = re.sub(r"http[s]?://\S+", " ", raw_html)
    squeezed = re.sub(r"\s+", " ", without_urls).strip()

    # lower-case for modeling consistency
    return squeezed.lower()


def dedup_key(title: str, text: str) -> str:
    """Return a SHA-1 hex digest identifying a (title, text) pair.

    Both parts are stripped (None counts as empty) and joined with ``||``
    before hashing.
    """
    parts = [(title or "").strip(), (text or "").strip()]
    digest = hashlib.sha1("||".join(parts).encode("utf-8", errors="ignore"))
    return digest.hexdigest()


# Sitemap Collection

def parse_sitemap_xml(session, url: str) -> list[dict]:
    """
    Fetch a sitemap or sitemap index and list its entries as
        {"loc": <url>, "lastmod": <stripped lastmod text or None>}

    Entries from <sitemap> elements come first, followed by entries from
    <url> elements. Returns an empty list when the document cannot be fetched.
    """
    soup = get_soup(session, url)
    if soup is None:
        return []

    entries = []
    # "sitemap" children belong to index files, "url" children to leaf files;
    # both carry the same <loc>/<lastmod> pair, so one loop serves both.
    for tag_name in ("sitemap", "url"):
        for node in soup.find_all(tag_name):
            loc_tag = node.find("loc")
            if not (loc_tag and loc_tag.text):
                continue
            lastmod_tag = node.find("lastmod")
            entries.append({
                "loc": loc_tag.text.strip(),
                "lastmod": lastmod_tag.text.strip() if lastmod_tag else None,
            })

    return entries


def _looks_like_sitemap(loc: str) -> bool:
    """Return True when the URL's path ends in ``.xml`` (query string ignored)."""
    try:
        path = urlparse(loc).path
    except ValueError:
        # Malformed URL: fall back to testing the raw string.
        path = loc
    return path.endswith(".xml")


def _sitemap_entry_year_ok(loc: str, lastmod) -> bool:
    """Decide whether a sitemap URL entry may fall inside START_YEAR..END_YEAR.

    A ``/20YY/`` segment in the URL path decides on its own. Otherwise the
    entry's ``lastmod`` is used, and an entry with no usable date is kept.
    Any unexpected error also keeps the entry (best effort).
    """
    try:
        m = re.search(r"/(20\d{2})/", urlparse(loc).path)
        if m:
            return START_YEAR <= int(m.group(1)) <= END_YEAR
        iso = parse_iso_date(lastmod)
        return within_year_range(iso) if iso else True
    except Exception:
        return True


def collect_article_urls_from_sitemaps(session) -> set[str]:
    """
    Traverse sitemap index; collect article URLs (filter to 2014–2024 by URL-year or lastmod).

    Starting at SITEMAP_INDEX, every entry whose path ends in ``.xml`` is
    treated as a nested sitemap and fetched in turn (each sitemap at most
    once). Every other entry is kept when it starts with BASE and passes the
    year check.
    """
    logger.info("Collecting URLs from sitemap index...")
    urls = set()
    queue = [SITEMAP_INDEX]
    seen_sitemaps = set()

    # The number of sitemaps is unknown up front, so the total grows as we go.
    pbar = tqdm(total=0, desc="Sitemaps", unit="smap")
    try:
        while queue:
            smap = queue.pop()
            if smap in seen_sitemaps:
                continue
            seen_sitemaps.add(smap)

            entries = parse_sitemap_xml(session, smap)
            pbar.total = pbar.total + 1
            pbar.update(1)

            # classify entries
            for e in entries:
                loc = e.get("loc")
                if not loc:
                    continue
                if _looks_like_sitemap(loc):
                    queue.append(loc)
                elif loc.startswith(BASE) and _sitemap_entry_year_ok(loc, e.get("lastmod")):
                    urls.add(loc)
    finally:
        # Release the progress bar even if a fetch or parse step raises.
        pbar.close()

    logger.info(f"Sitemap URL count (pre-unique): {len(urls)}")
    return urls


# Archive Discovery

def discover_monthly_archive_urls() -> list[str]:
    """
    List one ``BASE/YYYY/MM/`` URL per month from START_YEAR through END_YEAR
    (inclusive), in chronological order. These are probe targets only; nothing
    here checks that the pages exist.
    Example: https://www.wired.com/2016/05/
    """
    return [
        f"{BASE}/{year:04d}/{month:02d}/"
        for year in range(START_YEAR, END_YEAR + 1)
        for month in range(1, 13)
    ]


def extract_links_from_archive_page(session, url: str) -> set[str]:
    """
    Fetch a year/month page and return the on-site links that look like articles.

    A link qualifies when it starts with BASE (root-relative hrefs are resolved
    against BASE first) and contains a /story|article|gallery|video/ segment or
    a /YYYY/MM/DD/ date path. Returns an empty set when the page cannot be fetched.
    """
    soup = get_soup(session, url)
    if soup is None:
        return set()

    found = set()
    for anchor in soup.find_all("a", href=True):
        target = anchor["href"]
        if target.startswith("/"):
            target = urljoin(BASE, target)
        if not target.startswith(BASE):
            continue

        has_section = re.search(r"/(story|article|gallery|video)/", target)
        if has_section or re.search(r"/\d{4}/\d{2}/\d{2}/", target):
            found.add(target)
    return found


def collect_article_urls_from_archives(session) -> set[str]:
    """Probe every monthly archive page and return the union of article links found."""
    logger.info("Collecting URLs from year/month archives...")
    collected = set()
    month_pages = tqdm(discover_monthly_archive_urls(), desc="Archives", unit="month")
    for page_url in month_pages:
        collected |= extract_links_from_archive_page(session, page_url)
    logger.info(f"Archive URL count (pre-unique): {len(collected)}")
    return collected


# Article Parsing

# Lower-cased schema.org types accepted as "this node describes an article".
_JSON_LD_ARTICLE_TYPES = frozenset({"article", "newsarticle", "blogposting"})


def _iter_json_ld_nodes(payload):
    """Yield each dict in a decoded JSON-LD payload, in document order.

    Handles a single object, a list of objects, and objects nested under an
    ``@graph`` key.
    """
    stack = [payload]
    while stack:
        node = stack.pop()
        if isinstance(node, list):
            # reversed so the first list element is popped (and yielded) first
            stack.extend(reversed(node))
        elif isinstance(node, dict):
            yield node
            graph = node.get("@graph")
            if isinstance(graph, (list, dict)):
                stack.append(graph)


def _is_article_node(node: dict) -> bool:
    """Return True when the node's ``@type`` (string or list) names an article type."""
    declared = node.get("@type")
    if isinstance(declared, str):
        declared = [declared]
    elif not isinstance(declared, list):
        return False
    return any(
        isinstance(t, str) and t.lower() in _JSON_LD_ARTICLE_TYPES
        for t in declared
    )


def extract_from_json_ld(soup: "BeautifulSoup") -> dict:
    """
    Pull title, date, and body text from embedded JSON-LD article nodes.

    Scans every ``<script>`` whose ``type`` contains ``ld+json``. For each of
    ``title`` (headline), ``date`` (datePublished, else dateCreated) and
    ``text`` (articleBody), the first non-blank string found wins. A key is
    only present in the result when such a value exists; missing values are
    never stored as None, so callers can safely fill gaps from other sources.
    Script tags whose content is not valid JSON are skipped.
    """
    data = {}
    for tag in soup.find_all("script", type=lambda t: t and "ld+json" in t):
        try:
            payload = json.loads(tag.string or "")
        except (TypeError, ValueError):
            # empty or malformed script content
            continue

        for node in _iter_json_ld_nodes(payload):
            if not _is_article_node(node):
                continue
            candidates = {
                "title": node.get("headline"),
                "date": node.get("datePublished") or node.get("dateCreated"),
                "text": node.get("articleBody"),
            }
            for key, value in candidates.items():
                if key not in data and isinstance(value, str) and value.strip():
                    data[key] = value
    return data


def _first_selector_value(soup, candidates):
    """Return the first non-empty value from ``(css_selector, getter)`` pairs, else None."""
    for selector, getter in candidates:
        element = soup.select_one(selector)
        if element is None:
            continue
        value = getter(element)
        if value:
            return value
    return None


def _leaf_block_texts(container) -> list[str]:
    """Collect text from the innermost <p>/<div> elements under ``container``.

    find_all walks every descendant, so a wrapper element and the blocks
    inside it would each contribute the same text. Elements that contain
    another <p>/<div> are therefore skipped and only their leaves are emitted.
    Text sitting directly inside such a wrapper (outside any leaf) is dropped.
    """
    texts = []
    for block in container.find_all(["p", "div"]):
        if block.find(["p", "div"]) is not None:
            continue
        text = block.get_text(" ", strip=True)
        if text:
            texts.append(text)
    return texts


def extract_via_selectors(soup: BeautifulSoup) -> dict:
    """
    Fallback extraction using common selectors.

    Returns a dict holding whichever of ``title``, ``date`` and ``text`` could
    be found; keys with no value are omitted.

    Side effect: <script>, <style> and <noscript> elements are removed from
    ``soup``, so anything that needs them must run before this function.
    """
    data = {}

    # title candidates, in priority order
    title = _first_selector_value(soup, [
        ('meta[property="og:title"]', lambda t: t.get("content")),
        ("h1", lambda t: t.get_text(" ", strip=True)),
        ('meta[name="twitter:title"]', lambda t: t.get("content")),
        ("title", lambda t: t.get_text(" ", strip=True)),
    ])
    if title:
        data["title"] = title

    # date candidates, in priority order
    date_str = _first_selector_value(soup, [
        ("time", lambda t: t.get("datetime") or t.get_text(" ", strip=True)),
        ('meta[property="article:published_time"]', lambda t: t.get("content")),
        ('meta[name="date"]', lambda t: t.get("content")),
        ('meta[name="pubdate"]', lambda t: t.get("content")),
    ])
    if date_str:
        data["date"] = date_str

    # body: drop non-content elements before collecting text
    for bad in soup(["script", "style", "noscript"]):
        bad.extract()

    # try article tag first, otherwise fall back to every <p> on the page
    article = soup.find("article")
    if article:
        paragraphs = _leaf_block_texts(article)
    else:
        paragraphs = [
            p.get_text(" ", strip=True)
            for p in soup.find_all("p")
            if p.get_text(strip=True)
        ]
    body = "\n".join(paragraphs)
    if body:
        data["text"] = body

    return data


def parse_article(session, url: str) -> dict | None:
    """Fetch one article page and return its cleaned record.

    JSON-LD metadata is preferred; selector-based extraction fills any field
    that JSON-LD left missing, empty, or non-string. The date falls back to a
    ``/YYYY/MM/DD/`` segment in the URL path when no extracted date parses.

    Returns:
        ``{"title", "date", "text", "url"}`` with ``date`` as ``YYYY-MM-DD``
        and ``text`` cleaned, or None when the page cannot be fetched, a field
        is missing, the cleaned text length is outside MIN_LEN..MAX_LEN, or
        the date is outside the configured year range.
    """
    soup = get_soup(session, url)
    if soup is None:
        return None

    # Prefer JSON-LD. It must be read first: the selector pass removes
    # <script> elements from the soup.
    primary = extract_from_json_ld(soup)
    fallback = extract_via_selectors(soup)
    sources = (primary, fallback)

    def pick(key: str) -> str:
        """First non-blank string for ``key`` across sources, stripped; else ''."""
        for source in sources:
            value = source.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
        return ""

    title = pick("title")
    text_raw = pick("text")

    # date normalization: take the first extracted date that actually parses
    iso_date = None
    for source in sources:
        candidate = source.get("date")
        if isinstance(candidate, str):
            iso_date = parse_iso_date(candidate.strip())
            if iso_date:
                break

    # if still None, try date guess from URL; routing it through
    # parse_iso_date rejects impossible dates such as month 13
    if not iso_date:
        m = re.search(r"/(20\d{2})/(\d{2})/(\d{2})/", urlparse(url).path)
        if m:
            iso_date = parse_iso_date("-".join(m.groups()))

    if not title or not iso_date or not text_raw:
        return None

    # cleaning
    text_clean = clean_text(text_raw)

    # filter by length
    if not (MIN_LEN <= len(text_clean) <= MAX_LEN):
        return None

    # filter by year range
    if not within_year_range(iso_date):
        return None

    return {
        "title": title,
        "date": iso_date,
        "text": text_clean,
        "url": url,
    }


# Pipeline

def collect_all_urls(session) -> set[str]:
    """Merge sitemap and archive discoveries into one coarse-filtered URL set.

    URLs must start with BASE. A URL whose path contains a ``/20YY/`` segment
    is kept only when that year lies in START_YEAR..END_YEAR; URLs without
    such a segment are kept unconditionally.
    """
    sitemap_urls = collect_article_urls_from_sitemaps(session)
    archive_urls = collect_article_urls_from_archives(session)

    # only keep wired.com
    on_site = {u for u in sitemap_urls | archive_urls if u.startswith(BASE)}

    def year_acceptable(candidate: str) -> bool:
        hit = re.search(r"/(20\d{2})/", urlparse(candidate).path)
        return hit is None or START_YEAR <= int(hit.group(1)) <= END_YEAR

    filtered = {u for u in on_site if year_acceptable(u)}
    logger.info(f"Total candidate URLs after coarse filtering: {len(filtered)}")
    return filtered


def crawl_and_clean(session, urls: set[str], out_csv: Path):
    """Parse every URL, drop duplicates, and write the surviving records to CSV.

    Args:
        session: HTTP session handed to ``parse_article``.
        urls: Candidate article URLs; visited in sorted order.
        out_csv: Destination CSV path. Its parent directory is created up
            front if missing. No file is written when nothing survives.

    Records are de-duplicated on ``dedup_key(title, text)``; the first
    occurrence in sorted-URL order is kept.
    """
    out_csv = Path(out_csv)
    # Create the destination directory before crawling so a bad output path
    # fails immediately instead of after the whole crawl has finished.
    out_csv.parent.mkdir(parents=True, exist_ok=True)

    seen = set()
    records = []
    dup = 0
    skipped = 0

    for url in tqdm(sorted(urls), desc="Crawling articles", unit="url"):
        art = parse_article(session, url)
        if not art:
            skipped += 1
            continue
        key = dedup_key(art["title"], art["text"])
        if key in seen:
            dup += 1
            continue
        seen.add(key)
        records.append(art)

    logger.info(f"Parsed: kept={len(records)}, dup_skipped={dup}, invalid/skipped={skipped}")

    if not records:
        logger.warning("No records to save.")
        return

    df = pd.DataFrame.from_records(records, columns=["title", "date", "text", "url"])
    # final sanity
    df = df.dropna(subset=["title", "date", "text"])
    # ensure string types
    for col in ["title", "date", "text", "url"]:
        df[col] = df[col].astype(str)

    # save CSV
    df.to_csv(out_csv, index=False, quoting=csv.QUOTE_MINIMAL)
    logger.info(f"Saved CSV: {out_csv}  (rows={len(df)})")


# Main

def main():
    """Command-line entry point: parse options, collect URLs, crawl, write CSV."""
    cli = argparse.ArgumentParser(description="Wired 2014–2024 crawler & cleaner")
    cli.add_argument(
        "--out",
        type=Path,
        default=Path("wired_2014_2024_cleaned.csv"),
        help="Output CSV path",
    )
    cli.add_argument(
        "--cache",
        type=Path,
        default=Path("./http_cache"),
        help="HTTP cache directory (requests-cache sqlite)",
    )
    options = cli.parse_args()

    # Make sure the directory that will hold the cache exists.
    options.cache.parent.mkdir(parents=True, exist_ok=True)

    http = session_with_cache(options.cache)
    candidates = collect_all_urls(http)
    crawl_and_clean(http, candidates, options.out)


if __name__ == "__main__":
    main()
