In [1]:

# -*- coding: utf-8 -*-
"""
多线程批量获取 celebrity 的 Wikipedia pageviews（支持边跑边存 & 断点续跑）
输入：celebrity_name_all_unique.csv
输出：celebrity_popularity_wiki_pageviews.csv

依赖：pip install requests pandas
"""

import os
import csv
import time
import threading
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import requests

# ====== Configuration ======
# Input CSV; main() reads its "celebrity_name" column.
INPUT_CSV = "celebrity_name_all_unique.csv"
# Output CSV; rows are appended as tasks complete and re-read on restart to resume.
OUTPUT_CSV = "celebrity_popularity_wiki_pageviews.csv"

# Date range (YYYYMMDD) that worker() passes to get_pageviews() as start/end.
START = "20250101"
END = "20251231"

MAX_WORKERS = 12          # Number of worker threads; tune roughly between 8 and 24 depending on the network
REQUEST_TIMEOUT = 20      # Per-request timeout passed to Session.get (seconds)
MAX_RETRIES = 6           # Maximum attempts per request in _get()
SAVE_FSYNC_EVERY = 50     # fsync the output after every N written rows (0 disables fsync)
SKIP_IF_STATUS_OK = True  # Resume: skip only names whose earlier row has status == "ok" (failed rows are retried)
# ====================

# Default headers for every Session created by get_session().
# NOTE(review): this sends a desktop-browser User-Agent. Wikimedia's User-Agent
# policy asks automated clients to identify themselves with a descriptive UA
# including contact details; generic/browser UAs from scripts may be throttled
# or rejected (e.g. with 403). Consider "<tool-name>/<version> (<contact>)".
REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0",
    "Accept": "application/json",
}


# MediaWiki Action API endpoint for English Wikipedia (used for title search).
WIKI_API = "https://en.wikipedia.org/w/api.php"
# Wikimedia REST per-article pageviews URL template; placeholders are filled in get_pageviews().
PV_API_TMPL = (
    "https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/"
    "{project}/{access}/{agent}/{article}/{granularity}/{start}/{end}"
)

# --- Thread-local storage: get_session() keeps one requests.Session per thread here ---
_thread_local = threading.local()

def get_session() -> requests.Session:
    """Return the calling thread's ``requests.Session``, creating it lazily.

    The session is cached on ``_thread_local`` so each thread reuses its own
    connection pool. New sessions carry ``REQUEST_HEADERS`` and have
    ``trust_env`` disabled, so environment settings such as proxy variables
    and ``.netrc`` credentials are ignored.
    """
    cached = getattr(_thread_local, "session", None)
    if cached is not None:
        return cached

    fresh = requests.Session()
    fresh.trust_env = False
    fresh.headers.update(REQUEST_HEADERS)
    _thread_local.session = fresh
    return fresh

def _get(url: str, params=None, timeout=REQUEST_TIMEOUT, max_retries=MAX_RETRIES):
    """GET *url* with retries on transient failures.

    Retries on HTTP 429, 500, 502, 503, 504 and 403, and on any
    ``requests.RequestException`` (timeouts, connection errors, ...).
    Between attempts it sleeps for a numeric ``Retry-After`` header when the
    server sends one, otherwise for a linearly growing backoff
    (1.7 s, 3.4 s, 5.1 s, ...). No sleep happens after the final attempt.

    Args:
        url: Request URL.
        params: Optional query parameters, forwarded to ``Session.get``.
        timeout: Per-request timeout in seconds.
        max_retries: Total number of attempts.

    Returns:
        The first ``requests.Response`` whose status is not in the retryable
        set (it may still be an error such as 404 -- callers check it), or
        ``None`` if every attempt failed.
    """
    backoff = 1.7
    # NOTE(review): 403 is treated as transient here; from Wikimedia it can also
    # mean the request is rejected outright (e.g. User-Agent policy), in which
    # case retrying only burns time.
    retryable_statuses = (429, 500, 502, 503, 504, 403)
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        delay = backoff * (attempt + 1)
        try:
            r = get_session().get(url, params=params, timeout=timeout)
        except requests.RequestException:
            # Network hiccup: back off and try again (unless nothing is left).
            if not is_last_attempt:
                time.sleep(delay)
            continue
        if r.status_code not in retryable_statuses:
            return r
        if is_last_attempt:
            # Out of attempts: return immediately instead of sleeping for nothing.
            break
        # Respect Retry-After when it is given in seconds; an HTTP-date value
        # falls back to the linear backoff.
        ra = r.headers.get("Retry-After")
        if ra and ra.isdigit():
            delay = int(ra)
        time.sleep(delay)
    return None

def search_wiki_title(name: str) -> tuple[str | None, str]:
    """Map a person's name to the most likely English Wikipedia article title.

    Uses the MediaWiki ``list=search`` API and takes the single top hit.

    Args:
        name: Free-text name to search for.

    Returns:
        ``(title, "ok")`` on success;
        ``(None, "no_wiki_hit")`` when the search has no results or *name* is blank;
        ``(None, "err:request_failed")`` when every retry in ``_get`` failed;
        ``(None, "err:api_<code>")`` when the API answers with an ``error`` object;
        ``(None, "err:<ExceptionName>")`` on any other failure.
    """
    # A blank query can never match; skip the round-trip.
    if isinstance(name, str) and not name.strip():
        return None, "no_wiki_hit"
    params = {
        "action": "query",
        "list": "search",
        "srsearch": name,
        "format": "json",
        "formatversion": 2,
        "srlimit": 1
    }
    try:
        r = _get(WIKI_API, params=params)
        if r is None:
            return None, "err:request_failed"
        r.raise_for_status()
        data = r.json()
        # The Action API reports failures (bad parameters, rate limiting,
        # maxlag, ...) with HTTP 200 and an "error" object instead of "query";
        # without this check they were misreported as "no_wiki_hit".
        api_error = data.get("error")
        if api_error:
            code = api_error.get("code", "unknown") if isinstance(api_error, dict) else "unknown"
            return None, f"err:api_{code}"
        hits = data.get("query", {}).get("search", [])
        if not hits:
            return None, "no_wiki_hit"
        return hits[0]["title"], "ok"
    except Exception as e:
        return None, f"err:{type(e).__name__}"

def get_pageviews(title: str, start_yyyymmdd: str, end_yyyymmdd: str,
                  project="en.wikipedia", access="all-access", agent="all-agents",
                  granularity="daily") -> tuple[int, int, int]:
    """Fetch pageview statistics for one wiki article over ``[start, end]``.

    Spaces in *title* become underscores and the result is fully
    percent-encoded (``/`` included) before being placed into ``PV_API_TMPL``.
    The dates get an ``"00"`` hour suffix.

    Returns:
        ``(total, mean, peak)`` where ``total`` is the sum of the ``views``
        values returned, ``mean`` is ``total`` divided by the number of
        returned items (rounded), and ``peak`` is the largest single value.
        ``(0, 0, 0)`` on HTTP 404 or when no items come back.
        NOTE(review): the mean is over the items the API returned; if the API
        omits zero-view days, this overstates the calendar-day average.

    Raises:
        RuntimeError: ``"request_failed"`` when ``_get`` gave up.
        requests.HTTPError: for any other non-success status.
    """
    encoded_article = requests.utils.quote(title.replace(" ", "_"), safe="")
    endpoint = PV_API_TMPL.format(
        project=project,
        access=access,
        agent=agent,
        article=encoded_article,
        granularity=granularity,
        start=start_yyyymmdd + "00",
        end=end_yyyymmdd + "00",
    )

    resp = _get(endpoint)
    if resp is None:
        raise RuntimeError("request_failed")
    if resp.status_code == 404:
        # Unknown article (or no data at all) -> treat as zero traffic.
        return (0, 0, 0)
    resp.raise_for_status()

    payload_items = resp.json().get("items", [])
    counts = [entry.get("views", 0) for entry in payload_items]
    if not counts:
        return (0, 0, 0)

    total = int(sum(counts))
    average = int(round(total / len(counts)))
    highest = int(max(counts))
    return (total, average, highest)

def worker(name: str) -> list:
    """Process one name: resolve its Wikipedia title, then summarise pageviews.

    Returns one output row::

        [celebrity_name, wiki_title, views_total, views_daily_mean, views_peak, status]

    If no title is found, ``wiki_title`` is ``None``, the counts are 0 and
    ``status`` is whatever ``search_wiki_title`` reported. Exceptions from
    ``get_pageviews`` are caught and reported as ``"err:<ExceptionName>"``.
    """
    title, search_status = search_wiki_title(name)
    if title is not None:
        try:
            total, mean, peak = get_pageviews(title, START, END)
        except Exception as exc:
            return [name, title, 0, 0, 0, f"err:{type(exc).__name__}"]
        return [name, title, total, mean, peak, "ok"]
    return [name, None, 0, 0, 0, search_status]

def load_done(output_csv: str) -> set[str]:
    """断点续跑：读取已存在输出，得到 done 集合。"""
    done = set()
    if not Path(output_csv).exists():
        return done
    try:
        old = pd.read_csv(output_csv)
        if "celebrity_name" not in old.columns:
            return done
        if SKIP_IF_STATUS_OK and "status" in old.columns:
            old_ok = old[old["status"].astype(str) == "ok"]
            done = set(old_ok["celebrity_name"].astype(str).tolist())
        else:
            done = set(old["celebrity_name"].astype(str).tolist())
    except Exception:
        pass
    return done

def ensure_header(output_csv: str, header: list[str]):
    """Write *header* as the first CSV row when *output_csv* is missing or empty.

    A non-empty existing file is left untouched (its header is not checked).
    The file is written as UTF-8 with a BOM (``utf-8-sig``).
    """
    target = Path(output_csv)
    if target.exists() and target.stat().st_size > 0:
        return
    with open(output_csv, "w", newline="", encoding="utf-8-sig") as fh:
        csv.writer(fh).writerow(header)

def append_row(output_csv: str, row: list, fsync=False):
    """Append *row* as one CSV record to *output_csv* and flush it.

    The file is reopened for every call, so each row is written out as soon
    as it is produced. When *fsync* is true, ``os.fsync`` is additionally
    called on the file descriptor before closing.
    """
    with open(output_csv, "a", newline="", encoding="utf-8-sig") as handle:
        csv.writer(handle).writerow(row)
        handle.flush()
        if not fsync:
            return
        os.fsync(handle.fileno())

def main():
    """Fetch pageviews for every input name, appending rows as they complete.

    Reads names from INPUT_CSV and makes sure OUTPUT_CSV has a header. Names
    that load_done() reports as finished are skipped. worker() runs for the
    rest on a pool of MAX_WORKERS threads, and each result row is appended to
    OUTPUT_CSV as soon as its future completes, so an interrupted run can be
    resumed. Every SAVE_FSYNC_EVERY-th row is fsync'ed.

    NOTE(review): OUTPUT_CSV is append-only. With SKIP_IF_STATUS_OK, a name
    that failed in an earlier run is retried and gets a second row, so
    downstream consumers should keep e.g. the last row per celebrity_name.
    """
    # dtype=str + keep_default_na=False: with pandas' default NA parsing, empty
    # cells (and literal names such as "NA") become NaN, and astype(str) turned
    # them into the string "nan", which was then searched on Wikipedia.
    df = pd.read_csv(INPUT_CSV, dtype=str, keep_default_na=False)
    # Drop blank names and de-duplicate while keeping input order, so each
    # name is fetched (and written) only once per run.
    names = list(dict.fromkeys(n for n in df["celebrity_name"] if n.strip()))

    header = ["celebrity_name", "wiki_title",
              "views_total", "views_daily_mean", "views_peak",
              "status"]
    ensure_header(OUTPUT_CSV, header)

    done = load_done(OUTPUT_CSV)
    todo = [n for n in names if n not in done]
    # Count only input names that are done (the output may hold stale names).
    print(f"[info] total={len(names)}, done={len(names) - len(todo)}, todo={len(todo)}")

    write_count = 0

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        # Submit everything up front; results are consumed in completion order.
        future_to_name = {ex.submit(worker, n): n for n in todo}
        submitted = len(future_to_name)

        # Write each row as soon as its task finishes (incremental saving).
        for idx, fut in enumerate(as_completed(future_to_name), start=1):
            name = future_to_name[fut]
            try:
                row = fut.result()
            except Exception as e:
                # worker() already turns its own failures into status rows;
                # this is a last-resort guard so one task cannot abort the run.
                row = [name, None, 0, 0, 0, f"err:{type(e).__name__}"]

            write_count += 1
            do_fsync = SAVE_FSYNC_EVERY > 0 and write_count % SAVE_FSYNC_EVERY == 0
            append_row(OUTPUT_CSV, row, fsync=do_fsync)

            # Also report the last item so the final count is always printed,
            # even when the total is not a multiple of 50.
            if idx % 50 == 0 or idx == submitted:
                print(f"[info] finished {idx}/{submitted} (appended {write_count})")

    print("[info] Saved:", OUTPUT_CSV)


if __name__ == "__main__":
    main()

[info] total=408, done=0, todo=408
[info] finished 50/408 (appended 50)
[info] finished 100/408 (appended 100)
[info] finished 150/408 (appended 150)
[info] finished 200/408 (appended 200)
[info] finished 250/408 (appended 250)
[info] finished 300/408 (appended 300)
[info] finished 350/408 (appended 350)
[info] finished 400/408 (appended 400)
[info] Saved: celebrity_popularity_wiki_pageviews.csv
