In [5]:
#!/usr/bin/env python3
"""
LeetCode metadata fetcher -> CSV export (comprehensive fields).

- Tries /api/problems/all/ for list; falls back to GraphQL pagination.
- Queries detailed question fields (with similarQuestions requested as scalar),
  parses JSON-string fields like stats and similarQuestions.
- Best-effort scrapes /problems/<slug>/discuss/ for a short discussions preview.
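- Writes a CSV "output/leetcode_full_metadata.csv" with one row per problem.
  Nested fields (topic tags, stats_parsed, similarQuestions_parsed) are stored as
  JSON-encoded strings inside CSV fields; the discussions preview is summarised
  as a count plus the first post's title and URL.
- Also writes one "output/<slug>.json" file per problem containing the raw list
  entry, the detail response and the full discussions preview.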

Requirements:
    pip install requests beautifulsoup4 tqdm

Notes:
 - If you get blocked by Cloudflare, supply a logged-in cookie string in COOKIE_STRING
   (e.g. "LEETCODE_SESSION=...; csrftoken=...") or run via a headless browser.
 - Be polite with rate limits; you can increase sleeps if needed.
"""

import csv
import html
import json
import os
import re
import time
from datetime import datetime, timezone

import requests
from bs4 import BeautifulSoup
from tqdm import tqdm

# Root URL of the LeetCode site; the endpoint URLs below are built from it.
BASE = "https://leetcode.com"
# GraphQL endpoint that graphql_post() sends its queries to.
GRAPHQL_URL = BASE + "/graphql"
# REST endpoint tried first for the full problem list.
PROBLEMS_API = BASE + "/api/problems/all/"

# Optionally paste cookies here to improve access (LEETCODE_SESSION, csrftoken, ...)
COOKIE_STRING = ""  # e.g. "LEETCODE_SESSION=xxx; csrftoken=yyy;"

# Headers installed on the shared requests session by session_with_cookies(),
# so they are sent with every request made through that session.
DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (compatible; LeetCodeMetaFetcher/1.0)",
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": BASE + "/problems/",
    "Origin": BASE,
    "X-Requested-With": "XMLHttpRequest",
}

# GraphQL queries

# Paginated problem-list query. fetch_all_problem_list_via_graphql() supplies
# $categorySlug, $skip and $limit and keeps paging until `hasMore` is false.
PROBLEMLIST_QUERY = """
query problemsetQuestionList($categorySlug: String, $skip: Int, $limit: Int) {
  problemsetQuestionList(
    categorySlug: $categorySlug
    skip: $skip
    limit: $limit
  ) {
    hasMore
    total
    questions {
      acRate
      difficulty
      freqBar
      frontendQuestionId
      isFavor
      paidOnly
      status
      title
      titleSlug
      topicTags {
        name
        id
        slug
      }
    }
  }
}
"""

# Per-problem detail query, keyed by $titleSlug. `stats` and `similarQuestions`
# are requested without sub-selections; fetch_question_detail() passes both
# through try_parse_json_field() to decode them.
QUESTION_DETAIL_QUERY = """
query questionData($titleSlug: String!) {
  question(titleSlug: $titleSlug) {
    questionId
    title
    titleSlug
    content
    difficulty
    likes
    dislikes
    isPaidOnly
    sampleTestCase
    codeSnippets {
      lang
      langSlug
      code
    }
    topicTags {
      name
      slug
      translatedName
    }
    stats
    similarQuestions
  }
}
"""

def session_with_cookies():
    """Build the shared ``requests.Session`` used for every LeetCode request.

    The session carries ``DEFAULT_HEADERS``. When ``COOKIE_STRING`` is set,
    each ``name=value`` pair in it is installed as a cookie for
    ``leetcode.com``; otherwise a warm-up GET of the problems page is made so
    the server can hand out its public cookies. If a CSRF cookie ends up in the
    jar, it is mirrored into the ``x-csrftoken`` request header.

    Returns:
        The configured ``requests.Session``.
    """
    s = requests.Session()
    s.headers.update(DEFAULT_HEADERS)
    if COOKIE_STRING:
        for pair in COOKIE_STRING.split(";"):
            name, sep, value = pair.strip().partition("=")
            # Fragments without "=" (including empty ones) are not cookies.
            if sep:
                s.cookies.set(name.strip(), value.strip(), domain="leetcode.com")
    else:
        # initial GET to pick up public cookies like csrftoken
        try:
            s.get(BASE + "/problems/", timeout=15)
        except requests.RequestException as e:
            # Best-effort only: continue without cookies, but say so instead
            # of failing silently, since later requests may then be rejected.
            print(f"[WARN] could not pre-fetch public cookies from {BASE}: {e}")
    csrftoken = s.cookies.get("csrftoken") or s.cookies.get("CSRFToken") or s.cookies.get("csrf_token")
    if csrftoken:
        s.headers.update({"x-csrftoken": csrftoken})
    return s

def graphql_post(session, query, variables):
    """POST a GraphQL query to ``GRAPHQL_URL`` and return the decoded JSON body.

    Args:
        session: Object exposing a requests-style ``post`` method.
        query: GraphQL query text.
        variables: Variables sent alongside the query.

    Returns:
        Whatever ``response.json()`` yields for the reply.

    A reply whose status is not 200 has its first 800 body characters printed
    and then goes through ``raise_for_status()``.
    """
    response = session.post(
        GRAPHQL_URL,
        json={"query": query, "variables": variables},
        timeout=30,
    )
    if response.status_code != 200:
        print(f"[GraphQL] HTTP {response.status_code} -> {response.text[:800]}")
        response.raise_for_status()
    return response.json()

def get_problem_list_via_api(session):
    """Fetch the problem list from the REST endpoint ``PROBLEMS_API``.

    Entries found under ``stat_status_pairs`` are normalised to the key names
    used by the GraphQL list query so callers can treat both sources alike:

    * ``frontendQuestionId`` is the user-visible number
      (``frontend_question_id``), falling back to the internal ``question_id``
      only when the former is absent. The internal id is additionally exposed
      as ``questionId``.
    * ``difficulty`` is turned from a ``{"level": n}`` mapping into
      ``"Easy"`` / ``"Medium"`` / ``"Hard"``; other shapes pass through.
    * ``acRate`` is taken from ``ac_rate`` when present, otherwise computed as
      a percentage from ``total_acs`` / ``total_submitted``.

    Args:
        session: Object exposing a requests-style ``get`` method.

    Returns:
        A list of problem dicts, the raw ``questions`` list if the payload has
        that shape instead, or ``None`` when the endpoint could not be used
        (non-200 status, unexpected payload, or any exception).
    """
    difficulty_names = {1: "Easy", 2: "Medium", 3: "Hard"}
    try:
        r = session.get(PROBLEMS_API, timeout=20)
        if r.status_code != 200:
            print(f"[API] problems/all/ returned HTTP {r.status_code} - body: {r.text[:500]}")
            return None
        j = r.json()
        if not isinstance(j, dict):
            return None
        if "stat_status_pairs" in j:
            out = []
            for p in j["stat_status_pairs"]:
                stat = p.get("stat") or {}

                frontend_id = stat.get("frontend_question_id")
                if frontend_id is None:
                    frontend_id = stat.get("question_id")

                difficulty = p.get("difficulty")
                if isinstance(difficulty, dict):
                    level = difficulty.get("level")
                    difficulty = difficulty_names.get(level, level)

                ac_rate = p.get("ac_rate")
                if ac_rate is None:
                    accepted = stat.get("total_acs")
                    submitted = stat.get("total_submitted")
                    # Guard the division: skip when counts are missing,
                    # non-numeric, or there are no submissions yet.
                    if (
                        isinstance(accepted, (int, float))
                        and isinstance(submitted, (int, float))
                        and submitted
                    ):
                        ac_rate = 100.0 * accepted / submitted

                out.append({
                    "title": stat.get("question__title"),
                    "titleSlug": stat.get("question__title_slug"),
                    "frontendQuestionId": frontend_id,
                    "questionId": stat.get("question_id"),
                    "acRate": ac_rate,
                    "difficulty": difficulty,
                    "paidOnly": p.get("paid_only"),
                    "isFavor": p.get("is_favor"),
                    "status": p.get("status"),
                })
            return out
        if "questions" in j:
            return j["questions"]
    except Exception as e:
        # Deliberately broad: any failure here just triggers the caller's
        # GraphQL fallback.
        print("[API] exception while fetching problems API:", e)
    return None

def fetch_all_problem_list_via_graphql(session, batch_size=100):
    """Page through ``PROBLEMLIST_QUERY`` and return every question entry.

    Args:
        session: Session object handed to ``graphql_post``.
        batch_size: Value sent as the query's ``limit`` for each page.

    Returns:
        A list with the ``questions`` entries of all pages, in order.

    Raises:
        RuntimeError: If the response carries GraphQL ``errors`` or has no
            ``problemsetQuestionList`` payload.
    """
    all_questions = []
    skip = 0
    while True:
        vars_ = {"categorySlug": "", "skip": skip, "limit": batch_size}
        r = graphql_post(session, PROBLEMLIST_QUERY, vars_)
        if "errors" in r:
            raise RuntimeError("GraphQL error fetching list: " + str(r["errors"]))
        payload = (r.get("data") or {}).get("problemsetQuestionList")
        if not payload:
            raise RuntimeError("GraphQL list response has no problemsetQuestionList payload")
        questions = payload.get("questions") or []
        all_questions.extend(questions)
        print(f"  got {len(all_questions)} / {payload.get('total')}")
        # An empty page means no progress is possible; stop even if the server
        # still reports hasMore, otherwise this would loop forever.
        if not questions or not payload.get("hasMore"):
            break
        # Advance by what was actually returned so a server-side page cap
        # smaller than batch_size does not make us skip entries.
        skip += len(questions)
        time.sleep(0.4)
    return all_questions

def try_parse_json_field(maybe_json):
    """Decode a value that might be a JSON document stored as a string.

    Non-string values (``None``, dicts, lists, numbers, ...) come back
    untouched. A string is stripped and decoded with ``json.loads``; if that
    fails, a second attempt is made with single quotes swapped for double
    quotes. A blank string yields the stripped (empty) string, and a string
    that cannot be decoded either way is returned exactly as received.
    """
    if not isinstance(maybe_json, str):
        return maybe_json

    stripped = maybe_json.strip()
    if stripped == "":
        return stripped

    # Second candidate is a best-effort repair of single-quoted pseudo-JSON.
    for candidate in (stripped, stripped.replace("'", '"')):
        try:
            return json.loads(candidate)
        except Exception:
            continue
    return maybe_json

def fetch_question_detail(session, slug):
    """Run ``QUESTION_DETAIL_QUERY`` for one problem and return its data.

    Args:
        session: Session object handed to ``graphql_post``.
        slug: The problem's title slug, sent as ``titleSlug``.

    Returns:
        The ``question`` dict from the response, extended with
        ``stats_parsed`` and ``similarQuestions_parsed`` (the results of
        ``try_parse_json_field`` on ``stats`` / ``similarQuestions``).
        When the response contains ``errors`` or a null ``question``, a dict
        with a single ``_error`` key is returned instead.
    """
    response = graphql_post(session, QUESTION_DETAIL_QUERY, {"titleSlug": slug})
    if "errors" in response:
        return {"_error": response["errors"]}

    question = response["data"]["question"]
    if question is None:
        return {"_error": "no question data returned"}

    # Keep the raw values and add decoded copies under "<field>_parsed".
    for raw_key in ("stats", "similarQuestions"):
        question[raw_key + "_parsed"] = try_parse_json_field(question.get(raw_key))
    return question

def fetch_discussions_for_problem(session, slug, max_pages=1):
    """Scrape up to ``max_pages`` of a problem's discuss page, best-effort.

    Each recognised post becomes a dict with ``title``, ``url``, ``author``,
    ``votes`` and ``comment_count`` (any of which may be ``None``). If none of
    the post selectors match, up to ten links whose href contains both
    ``/problems/`` and ``/discuss/`` are recorded with only title and url
    filled in, and scraping stops. A non-200 response or an exception appends
    an ``_error`` entry carrying the page number and also stops scraping.

    Returns:
        The list of collected entries (possibly empty).
    """

    def first_match(node, *selectors):
        # Try selectors in order and return the first element found, else None.
        for selector in selectors:
            found = node.select_one(selector)
            if found:
                return found
        return None

    def text_or_none(tag):
        return tag.get_text(strip=True) if tag else None

    collected = []
    discuss_root = f"{BASE}/problems/{slug}/discuss/"
    for page in range(1, max_pages + 1):
        page_url = discuss_root if page == 1 else discuss_root + f"?page={page}"
        try:
            resp = session.get(page_url, timeout=20)
            if resp.status_code != 200:
                collected.append({"_error": f"status_code={resp.status_code}", "page": page})
                break

            soup = BeautifulSoup(resp.text, "html.parser")
            # Several alternative selectors, since the page layout may change.
            post_nodes = soup.select(".discuss-item, .question-list-item, .discuss-list li")
            if not post_nodes:
                # Fallback: harvest anything that looks like a discuss link.
                links = soup.select("a[href*='/problems/'][href*='/discuss/']")
                collected.extend(
                    {
                        "title": link.get_text(strip=True),
                        "url": link.get("href"),
                        "author": None,
                        "votes": None,
                        "comment_count": None,
                    }
                    for link in links[:10]
                )
                break

            for node in post_nodes:
                heading = first_match(node, "a.title", "a")
                href = heading["href"] if heading and heading.has_attr("href") else None
                collected.append({
                    "title": text_or_none(heading),
                    "url": href,
                    "author": text_or_none(first_match(node, ".author", ".user-info a")),
                    "votes": text_or_none(first_match(node, ".vote-count", ".vote")),
                    "comment_count": text_or_none(first_match(node, ".reply-count", ".comments")),
                })
            time.sleep(0.6)
        except Exception as exc:
            collected.append({"_error": f"exception: {str(exc)}", "page": page})
            break
    return collected

def text_preview(html_content, length=500):
    """Return a plain-text preview of an HTML fragment.

    Tags are removed with a simple regex, HTML entities (``&lt;``, ``&nbsp;``,
    ``&#39;`` ...) are decoded, and runs of whitespace are collapsed to single
    spaces.

    Args:
        html_content: HTML text; ``None`` or an empty string gives ``""``.
        length: Maximum number of characters kept before ``"..."`` is appended.

    Returns:
        The cleaned text, truncated to ``length`` characters plus ``"..."``
        when it is longer than that.
    """
    if not html_content:
        return ""
    # naive strip of HTML tags for preview
    text = re.sub(r"<[^<]+?>", "", html_content)
    # Decode entities only after the tags are gone, so an escaped "&lt;" in
    # the text is never mistaken for the start of a tag.
    text = html.unescape(text)
    # \s also matches the non-breaking spaces produced by "&nbsp;".
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) <= length:
        return text
    return text[:length] + "..."

def ensure_output_dir(path="output"):
    """Create ``path`` (with any missing parents) if needed and return it."""
    already_there = os.path.isdir(path)
    if not already_there:
        os.makedirs(path, exist_ok=True)
    return path

def write_csv(rows, out_path):
    """Write ``rows`` (dicts) to ``out_path`` as a fully quoted UTF-8 CSV.

    The column order is fixed by the list below; a header line is always
    written first. Keys missing from a row are left to ``csv.DictWriter``'s
    default handling.
    """
    columns = [
        # from the problem list
        "title",
        "slug",
        "frontendQuestionId",
        "acRate",
        "difficulty",
        "paidOnly_basic",
        "isFavor",
        "status",
        # from the detail query
        "likes",
        "dislikes",
        "isPaidOnly_detail",
        "sampleTestCase",
        "topicTags_json",
        "stats_parsed_json",
        "similarQuestions_parsed_json",
        "content_preview",
        # from the discussions scrape
        "num_discussions",
        "first_discussion_title",
        "first_discussion_url",
        "fetched_at",
    ]
    with open(out_path, "w", newline="", encoding="utf8") as handle:
        csv_writer = csv.DictWriter(handle, fieldnames=columns, quoting=csv.QUOTE_ALL)
        csv_writer.writeheader()
        csv_writer.writerows(rows)

def _difficulty_label(value):
    """Map a ``{"level": n}`` difficulty mapping to its name; pass other values through."""
    if isinstance(value, dict):
        level = value.get("level")
        return {1: "Easy", 2: "Medium", 3: "Hard"}.get(level, level)
    return value


def _build_csv_row(slug, basic, detail, discussions, fetched_at):
    """Assemble one CSV row from a list entry, its detail result and its discussions."""
    # A detail dict carrying "_error" holds no question data; treat it as empty
    # so every field falls back to the list entry (or None) in one place.
    d = detail if isinstance(detail, dict) and "_error" not in detail else {}
    # Error markers appended by the scraper are not discussions.
    posts = [x for x in discussions if isinstance(x, dict) and "_error" not in x]
    first_post = posts[0] if posts else {}
    return {
        "title": d.get("title") or basic.get("title"),
        "slug": slug,
        "frontendQuestionId": basic.get("frontendQuestionId") or basic.get("frontend_question_id") or None,
        "acRate": basic.get("acRate") or basic.get("ac_rate") or None,
        "difficulty": _difficulty_label(basic.get("difficulty")) or d.get("difficulty"),
        "paidOnly_basic": basic.get("paidOnly") if "paidOnly" in basic else basic.get("paid_only"),
        "isFavor": basic.get("isFavor"),
        "status": basic.get("status"),
        "likes": d.get("likes"),
        "dislikes": d.get("dislikes"),
        "isPaidOnly_detail": d.get("isPaidOnly"),
        "sampleTestCase": d.get("sampleTestCase"),
        "topicTags_json": json.dumps(d.get("topicTags") or basic.get("topicTags"), ensure_ascii=False),
        "stats_parsed_json": json.dumps(d.get("stats_parsed"), ensure_ascii=False),
        "similarQuestions_parsed_json": json.dumps(d.get("similarQuestions_parsed"), ensure_ascii=False),
        "content_preview": text_preview(d.get("content"), length=500),
        "num_discussions": len(posts),
        "first_discussion_title": first_post.get("title"),
        "first_discussion_url": first_post.get("url"),
        "fetched_at": fetched_at,
    }


def main():
    """Fetch metadata for every LeetCode problem and export it.

    Steps:
      1. Get the problem list from the REST endpoint, falling back to GraphQL
         pagination when that yields nothing.
      2. For each problem, fetch the detail record and a discussions preview,
         write ``output/<slug>.json`` as a backup, and build a CSV row.
      3. Write ``output/leetcode_full_metadata.csv``. This happens in a
         ``finally`` clause so rows gathered before an interruption or crash
         are still saved.

    ``fetched_at`` in every row and JSON file is the UTC time the run started.
    """
    out_dir = ensure_output_dir("output")
    session = session_with_cookies()

    # 1) get problem list
    problem_list = get_problem_list_via_api(session)
    if problem_list:
        print(f"[OK] got {len(problem_list)} problems from /api/problems/all/")
    else:
        print("[WARN] falling back to GraphQL pagination for problem list...")
        problem_list = fetch_all_problem_list_via_graphql(session, batch_size=100)

    print(f"Total problems to process: {len(problem_list)}")
    csv_rows = []
    csv_path = os.path.join(out_dir, "leetcode_full_metadata.csv")
    # Timezone-aware "now" (utcnow() is deprecated); tzinfo is dropped again so
    # the text keeps its established "<naive ISO>Z" shape.
    start_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    try:
        for p in tqdm(problem_list, desc="problems"):
            slug = p.get("titleSlug") or p.get("title_slug")
            if not slug:
                continue
            basic = p
            try:
                detail = fetch_question_detail(session, slug)
            except requests.HTTPError as e:
                print(f"[ERROR] GraphQL detail failed for {slug}: {e}")
                detail = {"_error": f"HTTPError: {e}"}
            except Exception as e:
                # One bad problem must not abort the whole run.
                detail = {"_error": str(e)}

            # fetch a small discussions preview (best-effort)
            try:
                discussions = fetch_discussions_for_problem(session, slug, max_pages=1)
            except Exception as e:
                discussions = [{"_error": str(e)}]

            csv_rows.append(_build_csv_row(slug, basic, detail, discussions, start_time))

            # write per-problem JSON for backup (useful if interrupted)
            with open(os.path.join(out_dir, f"{slug}.json"), "w", encoding="utf8") as f:
                json.dump({"basic": basic, "detail": detail, "discussions_preview": discussions, "fetched_at": start_time}, f, ensure_ascii=False, indent=2)

            # polite sleep
            time.sleep(0.35)
    finally:
        # Save whatever was collected, even on Ctrl-C or an unexpected error.
        write_csv(csv_rows, csv_path)

    print(f"Done. CSV saved to: {csv_path}")
    print("Per-problem JSON files are in the output/ folder too.")

# Run the export only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()

[OK] got 3758 problems from /api/problems/all/
Total problems to process: 3758


problems: 100%|██████████| 3758/3758 [3:04:00<00:00,  2.94s/it]  

Done. CSV saved to: output/leetcode_full_metadata.csv
Per-problem JSON files are in the output/ folder too.



