In [2]:
import pandas as pd
# Basic
# Core AIDev tables, read directly from the Hugging Face dataset repository.
# pr_df supplies the `id` and `repo_url` columns that a later cell joins onto commit rows;
# repo_df is loaded here but not referenced by the cells visible in this notebook.
# NOTE(review): reading `hf://` paths presumably needs `huggingface_hub` installed and
# network access -- confirm for your environment.
pr_df = pd.read_parquet("hf://datasets/hao-li/AIDev/pull_request.parquet")
repo_df = pd.read_parquet("hf://datasets/hao-li/AIDev/repository.parquet")

# Commits
# Per-commit detail rows; a later cell filters them on `filename` / `commit_stats_total`
# and keys them by (`sha`, `pr_id`). The pr_commits table below is left disabled.
# pr_commits_df = pd.read_parquet("hf://datasets/hao-li/AIDev/pr_commits.parquet")
pr_commit_details_df = pd.read_parquet("hf://datasets/hao-li/AIDev/pr_commit_details.parquet")

# Related issues
# Optional tables, left disabled; uncomment to load them.
# related_issue_df = pd.read_parquet("hf://datasets/hao-li/AIDev/related_issue.parquet")
# issue_df = pd.read_parquet("hf://datasets/hao-li/AIDev/issue.parquet")


In [20]:
import os
import re
import time
import random
from typing import Optional

import requests
import pandas as pd

# One module-level Session shared by every GET issued in _request_with_rate_limit,
# so repeated calls reuse pooled connections instead of opening a new one each time.
_SESSION = requests.Session()

def _to_api_repo_url(repo_url: str) -> str:
    """
    Normalize a repo URL to GitHub API base:
    - 'https://api.github.com/repos/owner/repo' -> unchanged
    - 'https://github.com/owner/repo'           -> 'https://api.github.com/repos/owner/repo'
    """
    repo_url = str(repo_url).rstrip("/")
    if "api.github.com/repos/" in repo_url:
        return repo_url
    m = re.match(r"^https?://github\.com/([^/]+)/([^/]+)$", repo_url)
    if not m:
        raise ValueError(f"Unrecognized repo_url format: {repo_url}")
    owner, repo = m.group(1), m.group(2)
    return f"https://api.github.com/repos/{owner}/{repo}"

def _sleep_until(reset_epoch: int) -> None:
    # reset header is unix seconds; add a 1s cushion
    sleep_s = max(0, reset_epoch - int(time.time())) + 1
    if sleep_s > 0:
        print(f"Sleeping for {sleep_s/60} minutes until rate limit reset...")
        time.sleep(sleep_s)

def _request_with_rate_limit(url: str, headers: dict, timeout: int, max_retries: int = 5):
    """
    GET with:
      - X-RateLimit-* handling (sleep until reset when remaining==0)
      - 429 / Retry-After handling
      - transient 5xx / network errors with exponential backoff + jitter
      - returns Response on success, None on 404, raises on final failure
    """
    backoff = 1.0
    for attempt in range(max_retries):
        try:
            resp = _SESSION.get(url, headers=headers, timeout=timeout)
        except requests.RequestException:
            # network hiccup → backoff and retry
            time.sleep(backoff + random.random())
            backoff = min(backoff * 2, 30)
            continue

        # 404: missing commit (deleted fork, etc.)
        if resp.status_code == 404:
            return None

        # Hard rate limit: remaining==0 → sleep until reset, then retry
        if resp.status_code == 403:
            remaining = resp.headers.get("X-RateLimit-Remaining")
            reset = resp.headers.get("X-RateLimit-Reset")
            # Secondary rate limit sometimes has Retry-After
            retry_after = resp.headers.get("Retry-After")
            if remaining is not None and remaining.isdigit() and int(remaining) == 0 and reset and reset.isdigit():
                _sleep_until(int(reset))
                continue
            if retry_after and retry_after.isdigit():
                time.sleep(int(retry_after))
                continue
            # Generic 403 (e.g., perms): break and raise
            resp.raise_for_status()

        # Too Many Requests
        if resp.status_code == 429:
            retry_after = resp.headers.get("Retry-After")
            time.sleep(int(retry_after) if retry_after and retry_after.isdigit() else 60)
            continue

        # Transient server errors → backoff
        if resp.status_code in (502, 503, 504):
            time.sleep(backoff + random.random())
            backoff = min(backoff * 2, 30)
            continue

        # Success codes
        if 200 <= resp.status_code < 300:
            # Also check rate-limit headers and pause *after* a successful call
            remaining = resp.headers.get("X-RateLimit-Remaining")
            reset = resp.headers.get("X-RateLimit-Reset")
            if remaining is not None and remaining.isdigit() and int(remaining) == 0 and reset and reset.isdigit():
                # We consumed the last request in the window; sleep before returning
                _sleep_until(int(reset))
            return resp

        # Other errors → raise (will exit loop)
        resp.raise_for_status()

    # If we exit loop without returning/raising, raise generic error
    raise RuntimeError(f"Failed GET after {max_retries} attempts: {url}")

def fetch_commit_json(repo_url: str, sha: str, token: Optional[str] = None, timeout: int = 30) -> dict | None:
    """
    GET the commit JSON from GitHub.
    Returns parsed JSON dict, or None on 404.
    Raises for other HTTP errors after retries.
    """
    api_repo = _to_api_repo_url(repo_url)  # normalize to API base safely
    url = f"{api_repo}/commits/{sha}"
    headers = {
        "Accept": "application/vnd.github+json",
        "User-Agent": "pr-commit-details-fetch/1.1",
    }
    token = token or os.getenv("GITHUB_TOKEN")
    if token:
        headers["Authorization"] = f"Bearer {token}"

    resp = _request_with_rate_limit(url, headers=headers, timeout=timeout, max_retries=5)
    if resp is None:  # 404
        print(f"[404] Commit {sha} not found at {api_repo}", flush=True)
        return None
    return resp.json()

def fix_commits(broken_commit_details_pd: pd.DataFrame) -> pd.DataFrame:
    """
    Re-fetch each commit from GitHub and expand it into one row per changed file.

    For every input row the commit JSON is fetched via fetch_commit_json. Each
    entry of its "files" list becomes an output row that copies the input row
    and overwrites `filename`, `status`, `additions`, `deletions`, `changes`
    and `patch` with the values from GitHub.

    Rows are skipped (produce no output) when the fetch raises (the error is
    printed), when the commit is not found, or when the commit lists no files.

    Args:
        broken_commit_details_pd: Frame with at least the columns `sha` and
            `repo_url`. Column labels may be arbitrary (they do not need to be
            valid Python identifiers).

    Returns:
        A DataFrame with exactly the input's columns, in the same order. The
        per-file fields above are only kept if the input already has columns
        of those names.

    Raises:
        ValueError: if GITHUB_TOKEN is not set, or a required column is missing.
    """
    token = os.getenv("GITHUB_TOKEN")
    if not token:
        raise ValueError("Error: GITHUB_TOKEN not set in environment")

    # Validate required columns
    required = {"sha", "repo_url"}
    missing = required - set(broken_commit_details_pd.columns)
    if missing:
        raise ValueError(f"Input is missing required columns: {missing}")

    columns = list(broken_commit_details_pd.columns)
    fixed_rows = []
    # Plain tuples (name=None) zipped with the real labels: namedtuple rows
    # rename fields that are not valid identifiers, which would make a lookup
    # by column label fail.
    for values in broken_commit_details_pd.itertuples(index=False, name=None):
        base = dict(zip(columns, values))
        sha = str(base["sha"])
        repo_url = str(base["repo_url"])

        try:
            commit_json = fetch_commit_json(repo_url, sha, token=token)
        except Exception as e:
            # Log and continue; do not halt the whole batch
            print(f"[error] fetch_commit_json failed for {repo_url} {sha}: {e}", flush=True)
            continue

        if not commit_json:
            continue

        files = commit_json.get("files") or []
        if not files:
            # likely a merge commit; nothing to expand
            continue

        for f in files:
            rec = base.copy()
            rec["filename"] = f.get("filename")
            rec["status"] = f.get("status")
            # Missing / null counters are treated as zero.
            rec["additions"] = int(f.get("additions") or 0)
            rec["deletions"] = int(f.get("deletions") or 0)
            rec["changes"] = int(f.get("changes") or 0)
            rec["patch"] = f.get("patch")
            fixed_rows.append(rec)

    # columns= pins the output schema to the input's, even when no rows were produced.
    return pd.DataFrame(fixed_rows, columns=broken_commit_details_pd.columns)


In [28]:
from pathlib import Path

CSV_PATH = "fixed_commits.csv"   # output file; also serves as the resume log
BATCH_SIZE = 100                 # commits fetched between two writes to CSV_PATH

# One row per PR id so the many-to-one merge below cannot fan out.
pr_core = pr_df.loc[:, ["id", "repo_url"]].drop_duplicates("id")

# left-join by PR id → adds repo_url to each file row
pcd_with_repo = (
    pr_commit_details_df
    .merge(pr_core, left_on="pr_id", right_on="id", how="left", validate="many_to_one")
    .drop(columns=["id"])  # drop the duplicate PR id from pr_df
)

# Commits to repair: no filename recorded although the commit reports changes.
# One row per (sha, pr_id) is enough because each commit is expanded again on fetch.
todo = pcd_with_repo[pcd_with_repo["filename"].isna() & (pcd_with_repo["commit_stats_total"]>0)].drop_duplicates(subset=['sha', 'pr_id'])

if os.path.exists(CSV_PATH) and os.path.getsize(CSV_PATH) > 0:
    # Resume: only the two key columns are needed to know what is already done,
    # so skip parsing the remaining (patch-heavy) columns of the output file.
    # `done` therefore holds just `pr_id` and `sha`.
    done = pd.read_csv(CSV_PATH, usecols=["pr_id", "sha"])
    print(len(done))
    # Build a simple key for set-membership; normalize to string to avoid dtype mismatches
    done_keys = set(done["pr_id"].astype(str) + "|" + done["sha"].astype(str))
    todo_key  = (todo["pr_id"].astype(str) + "|" + todo["sha"].astype(str))
    todo      = todo.loc[~todo_key.isin(done_keys)].copy()

print(f"Unique commits to process: {len(todo):,}")

# 4) Helper to append without losing header on first write
def _append_csv(df: pd.DataFrame, path: str) -> None:
    exists = os.path.exists(path) and os.path.getsize(path) > 0
    df.to_csv(path, mode=("a" if exists else "w"), header=not exists, index=False)

# 5) Stream through in batches of BATCH_SIZE and save progress each time

# Rows without a usable repo_url can never be fetched. Filter them once up front
# (instead of inside every batch) so batches are full-sized and the progress
# counter can actually reach its total.
if "repo_url" in todo.columns:
    todo_valid = todo[todo["repo_url"].notna() & (todo["repo_url"].astype(str).str.len() > 0)]
else:
    todo_valid = todo

skipped = len(todo) - len(todo_valid)
if skipped:
    print(f"[skip] {skipped:,} commits have no usable repo_url")

processed = 0
for start in range(0, len(todo_valid), BATCH_SIZE):
    batch = todo_valid.iloc[start:start + BATCH_SIZE].copy()

    # Rate-limit–aware function that expands rows one-per-file
    fixed = fix_commits(batch)

    # Persist after every batch so an interrupted run loses at most one batch.
    if not fixed.empty:
        _append_csv(fixed, CSV_PATH)

    processed += len(batch)
    print(f"[progress] {processed:,}/{len(todo_valid):,} commits processed; wrote {len(fixed):,} rows to {CSV_PATH}")

# Only commits that produced rows end up in the CSV; commits for which
# fix_commits returned nothing leave no trace and are attempted again next run.
print("Done. Re-run any time; it will skip (pr_id, sha) pairs already in the CSV.")

181511
Unique commits to process: 0
Done. Re-run any time; it will skip (pr_id, sha) pairs already in the CSV.
