In [7]:
!pip install pandas python-dotenv
!python -m pip install google-genai==1.53.0
import json
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
import pandas as pd
from google import genai
from google.genai import types
import os
from dotenv import load_dotenv, find_dotenv
import time
load_dotenv(find_dotenv()) 


1063.51s - pydevd: Sending message related to process being replaced timed-out after 5 seconds
1069.25s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


True

In [8]:
# JSON schema passed to Gemini as `response_schema`; the model's output is
# constrained to this shape.
#
# The enums must contain every value the system prompt instructs the model to
# emit: the prompt tells it to return validation_type = "none" when no
# validation is present and lists "code_diff" as a possible evidence source, so
# both values are included here. Without them the constrained output could not
# express "no validation" at all.
schema = {
    "type": "object",
    "properties": {
        "validation_present": {
            "type": "boolean"
        },
        "evidence_sources": {
            "type": "array",
            "items": {
                "type": "string",
                "enum": ["pipeline", "description", "comments", "code_diff"]
            }
        },
        "validation_type": {
            "type": "string",
            "enum": [
                "benchmark",
                "profiling",
                "static-analysis",
                "anecdotal",
                "none",
            ]
        },
        "validation_description": {
            "type": "string"
        },
        "pipeline_signal": {
            "type": "string"
        },
        "description_signal": {
            "type": "string"
        },
        "comment_signal": {
            "type": "string"
        }
    },
    "required": [
        "validation_present",
        "evidence_sources",
        "validation_type",
        "validation_description",
        "pipeline_signal",
        "description_signal",
        "comment_signal"
    ]
}

In [9]:
# Number of Gemini requests attempted so far in this kernel session. run_llm
# increments it on every call and uses it to pick the API key.
counter = 0

# Number of requests sent with GEMINI_API_KEY before run_llm switches to
# GEMINI_API_KEY_2.
PRIMARY_KEY_REQUEST_LIMIT = 205


def run_llm(prompt: str,user_prompt: str) -> str:
    """Send one classification request to Gemini and return the raw response text.

    Args:
        prompt: System instruction given to the model.
        user_prompt: Request contents (the PR-specific prompt).

    Returns:
        ``response.text`` of the generate_content call. The request asks for
        ``application/json`` constrained by the module-level ``schema``.

    Raises:
        Whatever the google-genai client raises; errors are not handled here so
        that callers can log or retry them.

    Side effects:
        Increments the module-level ``counter``. The first
        ``PRIMARY_KEY_REQUEST_LIMIT`` attempts use ``GEMINI_API_KEY``; later
        attempts use ``GEMINI_API_KEY_2``. Failed attempts are counted too.
    """
    global counter

    model = "gemini-3-pro-preview"
    print("Running GEMINI, model:", model)

    # Previously `counter` was never incremented, so the secondary key was
    # never used. Only the client that is actually needed is constructed.
    if counter >= PRIMARY_KEY_REQUEST_LIMIT:
        api_key = os.getenv("GEMINI_API_KEY_2")
    else:
        api_key = os.getenv("GEMINI_API_KEY")
    client = genai.Client(api_key=api_key)

    config = types.GenerateContentConfig(
        temperature=0.0,
        response_mime_type="application/json",
        response_schema=schema,
        thinking_config=types.ThinkingConfig(
            thinking_level=types.ThinkingLevel.HIGH
        ),
        system_instruction=prompt,
    )

    # Count the attempt before sending it so the key switch still happens when
    # requests fail.
    counter += 1
    response = client.models.generate_content(
        model=model,
        contents=user_prompt,
        config=config,
    )
    return response.text
        
   

In [10]:
def find_datasets_dir(start: Optional[Path] = None) -> Path:
    """Locate the nearest ``datasets`` directory at or above ``start``.

    Args:
        start: Directory to begin the search from. Defaults to the current
            working directory.

    Returns:
        Path of the first ``datasets`` directory found while walking from
        ``start`` up through its parents.

    Raises:
        FileNotFoundError: If no ancestor contains a ``datasets`` directory.
    """
    start = start or Path.cwd()
    for path in (start, *start.parents):
        candidate = path / "datasets"
        # Require a directory: a regular file that happens to be called
        # "datasets" cannot hold the parquet files and must not end the search.
        if candidate.is_dir():
            return candidate
    raise FileNotFoundError(f"Could not find 'datasets' directory from {start}")


# Resolved once when this cell runs, searching upward from the current working
# directory; raises FileNotFoundError if no `datasets` directory is found.
DATASETS_DIR = find_datasets_dir()
# The project root is taken to be the directory that contains `datasets`.
PROJECT_ROOT = DATASETS_DIR.parent

def extract_json(text: str) -> Dict:
    """Pull a JSON object out of free-form model output, best effort.

    The span from the first ``{`` to the last ``}`` is parsed as JSON. An empty
    dict is returned when there is no such span or when it is not valid JSON.
    """
    first_brace = text.find("{")
    last_brace = text.rfind("}")

    # No opening brace, no closing brace, or the closing one comes first.
    if first_brace == -1 or last_brace == -1 or last_brace <= first_brace:
        return {}

    try:
        return json.loads(text[first_brace : last_brace + 1])
    except json.JSONDecodeError:
        return {}

def truncate(text: str, limit: int = 10000) -> str:
    """Cap ``text`` at ``limit`` characters.

    Text longer than ``limit`` is cut to its first ``limit`` characters and
    suffixed with ``...[truncated]``; shorter text is returned unchanged.
    """
    if len(text) > limit:
        return text[:limit] + "...[truncated]"
    return text

def load_pr_core(prefix: str) -> pd.DataFrame:
    """Load the commits table for ``prefix`` with one row per PR.

    Reads ``<prefix>_pr/<prefix>_pr_commits.parquet`` under ``DATASETS_DIR``,
    drops rows with a repeated ``pr_id`` (the first occurrence is kept) and
    returns the frame indexed by ``pr_id``.
    """
    parquet_path = DATASETS_DIR / f"{prefix}_pr" / f"{prefix}_pr_commits.parquet"
    commits_df = pd.read_parquet(parquet_path)
    one_row_per_pr = commits_df.drop_duplicates("pr_id")
    return one_row_per_pr.set_index("pr_id")


def collect_comments(prefix: str, pr_id: int) -> List[str]:
    """Return the non-null comment bodies attached to a PR.

    Issue comments come first, followed by review comments; both tables are
    read from the ``<prefix>_pr`` folder under ``DATASETS_DIR`` on every call.
    """
    pr_dir = DATASETS_DIR / f"{prefix}_pr"
    comment_frames = [
        pd.read_parquet(pr_dir / f"{prefix}_pr_issue_comments.parquet"),
        pd.read_parquet(pr_dir / f"{prefix}_pr_review_comments.parquet"),
    ]

    bodies = []
    for frame in comment_frames:
        matching_bodies = frame.loc[frame["pr_id"] == pr_id, "body"]
        bodies.extend(matching_bodies.dropna().tolist())
    return bodies


def collect_pipeline_names(prefix: str, pr_id: int) -> List[str]:
    """Return the sorted, de-duplicated workflow names recorded for a PR.

    Reads ``<prefix>_pr/<prefix>_pr_workflow_runs.parquet`` under
    ``DATASETS_DIR`` on every call; null workflow names are ignored.
    """
    runs_path = DATASETS_DIR / f"{prefix}_pr" / f"{prefix}_pr_workflow_runs.parquet"
    runs = pd.read_parquet(runs_path)

    names = runs.loc[runs["pr_id"] == pr_id, "workflow_name"].dropna().unique().tolist()
    names.sort()
    return names

def pr_ids_from_commits(prefix: str, limit: Optional[int] = None) -> Iterable[int]:
    """List the distinct PR ids in the commits table, in ascending order.

    Args:
        prefix: Dataset prefix used to build the parquet path.
        limit: When given, only the first ``limit`` ids are returned.
    """
    commits_path = DATASETS_DIR / f"{prefix}_pr" / f"{prefix}_pr_commits.parquet"
    commits = pd.read_parquet(commits_path)

    unique_ids = commits["pr_id"].dropna().astype(int).unique().tolist()
    unique_ids.sort()

    if limit is None:
        return unique_ids
    return unique_ids[:limit]


Generate from the beginning of the file

In [11]:
def analyze_pr(
    prefix: str,
    pr_id: int,
    author_type: str,
    pr_core: pd.DataFrame,
) -> Dict:
    """Classify the performance-validation evidence of one PR with Gemini.

    Gathers the PR's workflow names, comments, description and patch, sends
    them to the model through ``run_llm`` and returns one flat record.

    Args:
        prefix: Dataset prefix used to locate the parquet files.
        pr_id: PR identifier; must be present in ``pr_core``'s index.
        author_type: Label copied verbatim into the record.
        pr_core: Frame indexed by ``pr_id`` (see ``load_pr_core``).

    Returns:
        A single dict holding PR metadata (``pr_id``, ``author_type``,
        ``repo``, ``pr_number``, ``pr_title``, ``pipeline_names``) plus the
        classification fields (``validation_present``, ``evidence_sources``,
        ``validation_type``, ``validation_description``, ``pipeline_signal``,
        ``description_signal``, ``comment_signal``). When the PR has no
        signals at all, a "no validation" record is returned without calling
        the model.

    Raises:
        KeyError: If ``pr_id`` is not in ``pr_core``.
        Exception: Any error raised by ``run_llm`` is printed and re-raised so
            the caller can log or retry it.
    """

    def _clean_text(value) -> str:
        """Return ``value`` stripped; None, NaN and other non-strings become ''."""
        return value.strip() if isinstance(value, str) else ""

    def _bullets(items: List[str]) -> str:
        """Render ``items`` as a markdown-style list, one '- item' per line."""
        return "\n".join(f"- {item}" for item in items)

    row = pr_core.loc[pr_id]
    pipeline_names = collect_pipeline_names(prefix, pr_id)
    comments = collect_comments(prefix, pr_id)
    # A missing value may arrive as NaN (a truthy float), so `value or ""`
    # followed by .strip() could raise; _clean_text handles that case.
    description = _clean_text(row.get("pr_description"))
    code_diff = _clean_text(row.get("patch"))

    DEVELOPER_PROMPT_TEMPLATE = """
    You are a classifier for performance validation evidence in GitHub Pull Requests (PRs).

    Your job is to decide whether there is explicit performance validation, and if so,
    to classify it into EXACTLY ONE validation_type category based on the definitions below.

    First, carefully read and internalize these validation_type categories:

    1. Benchmark-Based Validation (Unit Tests or Microbenchmarks)
    Definition:
    The PR validates performance by running benchmark tests—either existing unit tests or newly added benchmark/microbenchmark tests. It includes explicit before-and-after comparisons such as runtime, throughput, memory, CPU usage, or any quantitative metric collected from tests.

    2. Profiling-Based Validation (Application- or Function-Level Profiling)
    Definition:
    The PR uses profiling tools to validate performance, typically capturing stack samples, CPU hotspots, flamegraphs, or function-level timings. Evidence includes profiling outputs before and after the change.

    3. Static-Analysis-Based Validation (Reasoning Without Runtime Evidence)
    Definition:
    The PR argues for performance improvement through static reasoning about the code—algorithmic complexity, data-structure changes, loop bounds, allocation count reduction, etc., without providing runtime/profiling data.

    4. Anecdotal or Informal Local Testing (No Evidence Provided)
    Definition:
    The PR claims that performance is improved based on local testing, intuition, or manual observation, but provides no quantitative metrics, no profiling output, and no static-analysis justification.

    Decision Rules:

    1) First decide if performance validation is explicitly present (validation_present).
    - Set validation_present = TRUE only when the PR explicitly shows some form of validation evidence:
        benchmarks/microbenchmarks, profiling traces, static performance reasoning,
        or explicit local testing statements.
    - Do NOT infer validation from performance intent alone.

    2) If validation_present = FALSE:
    - Set validation_type = "none".
    - Set evidence_sources = [].
    - Set validation_description to explain the absence of validation.
    - Never assign any other validation_type.
    
    3) If validation_present = TRUE:
    - Choose exactly ONE non-"none" validation_type from the following: Benchmark-Based Validation; Profiling-Based Validation; Static-Analysis-Based Validation; Anecdotal or Informal Local Testing

    4) evidence_sources must list where the validation is explicitly mentioned:
    - "pipeline", "description", "comments", "code_diff"

    5) When validation_type is "benchmark", "profiling":
    mention the metrics used (latency, throughput, memory, CPU, etc.).

    You must ALWAYS return STRICT JSON with exactly these keys:

    validation_present, evidence_sources, validation_type, metrics,
    validation_description, pipeline_signal,
    description_signal, comment_signal.

    No extra commentary. No markdown.
    No explanations.
    """

    USER_PROMPT_TEMPLATE = """
    Classify the following PR strictly using the rules and definitions from the system.

    PIPELINES:
    {pipeline_names}

    DESCRIPTION:
    {description}

    COMMENTS:
    {comments}
    
    CODE DIFF:
    {code_diff}
    """

    # Metadata shared by the short-circuit record and the model-backed record.
    metadata = {
        "pr_id": pr_id,
        "author_type": author_type,
        "repo": f"{row.get('repo_owner')}/{row.get('repo_name')}",
        "pr_number": row.get("pr_number"),
        "pr_title": row.get("pr_title"),
        "pipeline_names": pipeline_names,
    }

    if not (pipeline_names or description or comments or code_diff):
        print(f"Short-circuiting PR {pr_id} with no signals")
        return {
            **metadata,
            "validation_present": False,
            "evidence_sources": [],
            "validation_type": "none",
            "validation_description": "No validation evidence",
            "pipeline_signal": "",
            "description_signal": "",
            "comment_signal": "",
        }

    # One bullet per line. The previous "- " + "- ".join(...) produced
    # "- a- b" with no separator, and comments were split back apart on " | ",
    # which broke any comment that itself contained " | " (e.g. markdown
    # tables). The comment list as a whole is still capped by truncate().
    prompt = USER_PROMPT_TEMPLATE.format(
        pipeline_names=_bullets(pipeline_names) if pipeline_names else "None",
        description=truncate(description) if description else "None",
        comments=truncate(_bullets(comments)) if comments else "None",
        code_diff=truncate(code_diff) if code_diff else "None",
    )

    try:
        raw = run_llm(
            prompt=DEVELOPER_PROMPT_TEMPLATE,
            user_prompt=prompt,
        )
    except Exception as exc:
        print(f"gemini model call failed for PR {pr_id}: {exc}")
        raise

    parsed_llm = extract_json(raw) or {}
    if not parsed_llm:
        # Best-effort parsing is kept, but make the fallback visible.
        print(f"[warn] No JSON object parsed from model output for PR {pr_id}; using defaults")

    evidence_sources = parsed_llm.get("evidence_sources") or []
    if isinstance(evidence_sources, (tuple, list)):
        evidence_sources = list(evidence_sources)
    else:
        evidence_sources = []

    validation_present = bool(parsed_llm.get("validation_present", False))
    validation_type = parsed_llm.get("validation_type", "none")
    if not validation_present:
        # Enforce decision rule 2 of the system prompt locally, so a record
        # never pairs "no validation" with a concrete validation type.
        validation_type = "none"
        evidence_sources = []

    return {
        **metadata,
        "validation_present": validation_present,
        "evidence_sources": evidence_sources,
        "validation_type": validation_type,
        "validation_description": parsed_llm.get("validation_description", "No validation evidence"),
        "pipeline_signal": parsed_llm.get("pipeline_signal", ""),
        "description_signal": parsed_llm.get("description_signal", ""),
        "comment_signal": parsed_llm.get("comment_signal", ""),
    }

Resume from start point

In [None]:
import pandas as pd
from pathlib import Path

# All outputs of this run go under <project root>/RQ3 (created if missing).
out_dir = PROJECT_ROOT / "RQ3"
out_dir.mkdir(exist_ok=True, parents=True)

# Parquet with the classification records; also used as the resume checkpoint.
out_path_gemini = out_dir / "rq3_validation_evidence_gemini.parquet"
# CSV that log_error appends one row to for each PR that fails.
error_csv_path = out_dir / "rq3_validation_errors_gemini.csv"

# In-memory accumulators: result records, and (prefix, pr_id) keys already done.
records_gemini = []
processed = set()

# One row per PR, indexed by pr_id, for each dataset.
ai_core = load_pr_core("ai")
human_core = load_pr_core("human")

# Maximum number of PR ids taken per dataset; None means all of them.
limit = None

# ============================
# Resume positions (1-based)
# ============================
# Position in the sorted id list at which each loop starts.
start_ai_pos = 171    
start_human_pos = 1    

# Convert the 1-based positions to 0-based slice indices, clamped at 0.
ai_start_idx = max(start_ai_pos - 1, 0)
human_start_idx = max(start_human_pos - 1, 0)

# ============================
# Helpers
# ============================
def save_partial(records, out_path):
    """Save a partial parquet snapshot of current records.

    The snapshot is written to a sibling ``<name>.tmp`` file and then moved
    over ``out_path`` with ``os.replace``. If the write fails or is interrupted
    part-way, the previous snapshot is left untouched. This matters because
    the same file is read back to resume a run.

    Args:
        records: List of record dicts; nothing is written when it is empty.
        out_path: Destination parquet path (``str`` or ``Path``).
    """
    if not records:
        return
    df_tmp = pd.DataFrame(records)

    out_path = Path(out_path)
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    try:
        df_tmp.to_parquet(tmp_path, index=False)
        os.replace(tmp_path, out_path)
    finally:
        # Only still present when the write or the rename did not complete.
        if tmp_path.exists():
            tmp_path.unlink()
    print(f"[partial save] Saved {len(df_tmp)} rows to {out_path}")

def log_error(pr_id, prefix, author_type, exc):
    """Record one failed PR in the error CSV at ``error_csv_path``.

    The row holds ``prefix``, ``pr_id``, ``author_type`` and ``str(exc)``. The
    file is opened in append mode, so earlier rows are kept; the header is
    written only when the file does not exist yet.
    """
    error_frame = pd.DataFrame(
        [
            {
                "prefix": prefix,
                "pr_id": pr_id,
                "author_type": author_type,
                "error": str(exc),
            }
        ]
    )
    needs_header = not error_csv_path.exists()
    error_frame.to_csv(error_csv_path, mode="a", header=needs_header, index=False)
    print(f"[error] Logged PR {pr_id} ({prefix}/{author_type}) to {error_csv_path}: {exc}")

def _merge_record(prefix, pr_id, author_type, res_dict):
    """
    Merge metadata with the model result without overwriting metadata keys.
    Ensures prefix/pr_id/author_type are present in the saved parquet.
    """
    base = {"prefix": prefix, "pr_id": pr_id, "author_type": author_type}
    if isinstance(res_dict, dict):
        for k, v in res_dict.items():
            if k not in base:
                base[k] = v
    return base

def is_gemini_overloaded_error(exc: Exception) -> bool:
    """Tell whether ``exc`` looks like a transient Gemini overload.

    True when the lower-cased exception message contains "503", "unavailable"
    or "overloaded".
    """
    lowered = str(exc).lower()
    markers = ("503", "unavailable", "overloaded")
    return any(marker in lowered for marker in markers)

def analyze_pr_with_one_retry(prefix: str, pr_id: int, author_type: str, core):
    """Run ``analyze_pr`` with a single retry for overload errors.

    An exception that ``is_gemini_overloaded_error`` recognises triggers a
    5-second pause and exactly one more attempt. Any other exception, and any
    exception from the second attempt, propagates to the caller.
    """
    try:
        return analyze_pr(prefix, pr_id, author_type, core)
    except Exception as exc:
        # Anything that is not an overload is not retried.
        if not is_gemini_overloaded_error(exc):
            raise
        print(f"[retry] Gemini overloaded for PR {pr_id}. Waiting 5s before retry...")
        time.sleep(5)
        print(f"[retry] Retrying now for PR {pr_id}...")
        return analyze_pr(prefix, pr_id, author_type, core)

# ============================
# Load existing partial (recommended)
# ============================
# Resume support: when a snapshot already exists, load it so that new saves
# contain old + new rows and already-classified PRs are skipped.
if out_path_gemini.exists():
    try:
        df_prev = pd.read_parquet(out_path_gemini)

        # Keep existing records in memory so new saves include old + new
        records_gemini = df_prev.to_dict("records")

        # Build processed as (prefix, pr_id) tuples if possible
        if {"prefix", "pr_id"}.issubset(df_prev.columns):
            processed = set(
                zip(
                    df_prev["prefix"].astype(str),
                    df_prev["pr_id"].astype(int)
                )
            )
        else:
            # If old parquet doesn't have prefix, we do NOT build a generic int-based
            # processed set to avoid accidental skipping.
            processed = set()

        print(f"[resume] Loaded {len(records_gemini)} existing records from {out_path_gemini}")
        print(f"[resume] Processed keys loaded: {len(processed)}")

    except Exception as exc:
        # Safer behavior: refuse to continue if we cannot read the parquet,
        # because later saves write to this same path and would replace it.
        raise RuntimeError(f"Parquet read failed; refusing to overwrite. Reason: {exc}")

# ============================
# Build ID lists
# ============================
# Sorted, de-duplicated PR ids per dataset (capped at `limit` when it is set).
ai_ids = list(pr_ids_from_commits("ai", limit=limit))
human_ids = list(pr_ids_from_commits("human", limit=limit))

# NOTE: with limit = None the first message reads "(first None each)".
print(f"Processing {len(ai_ids)} AI PRs and {len(human_ids)} human PRs (first {limit} each).")
print(f"[resume] AI starting at position {start_ai_pos} (slice index {ai_start_idx})")
print(f"[resume] Human starting at position {start_human_pos} (slice index {human_start_idx})")

# Slice the lists to start from the requested positions
ai_ids_to_process = ai_ids[ai_start_idx:]
human_ids_to_process = human_ids[human_start_idx:]

# ============================
# Process AI and human PRs
# ============================
def process_pr_batch(prefix, author_type, label, ids_to_process, start_pos, total, core):
    """Classify every not-yet-processed PR in ``ids_to_process``.

    Shared implementation of the AI and human loops, which were previously two
    copies of the same code. Successful results are appended to
    ``records_gemini`` and their ``(prefix, pr_id)`` key is added to
    ``processed``. Failures are written to the error CSV by ``log_error`` and
    do not stop the loop.

    Args:
        prefix: Dataset prefix ("ai" or "human").
        author_type: Author label stored in each record.
        label: Text used in the progress message ("AI" or "human").
        ids_to_process: PR ids to visit, already sliced to the resume position.
        start_pos: 1-based position of the first id within the full id list.
        total: Length of the full id list, shown in the progress message.
        core: Frame indexed by pr_id, passed through to ``analyze_pr``.
    """
    for idx, pr_id in enumerate(ids_to_process, start=start_pos):

        # Skip if already processed
        if (prefix, pr_id) in processed:
            continue

        # Print only when we are actually going to process it
        print(f"Processing {label} PR {idx}/{total}: {pr_id}")

        try:
            gemini_res = analyze_pr_with_one_retry(prefix, pr_id, author_type, core)
            record = _merge_record(prefix, pr_id, author_type, gemini_res)
            records_gemini.append(record)
            processed.add((prefix, pr_id))
        except Exception as exc:
            log_error(pr_id, prefix=prefix, author_type=author_type, exc=exc)

        # Checkpoint whenever the list position is a multiple of 10. This is
        # keyed on position, not on the number of records in memory, and
        # skipped PRs never reach this point.
        if idx % 10 == 0:
            save_partial(records_gemini, out_path_gemini)


process_pr_batch("ai", "ai_agent", "AI", ai_ids_to_process, start_ai_pos, len(ai_ids), ai_core)
process_pr_batch("human", "human", "human", human_ids_to_process, start_human_pos, len(human_ids), human_core)

# ============================
# Final save
# ============================
# Write every record held in memory (previously loaded rows plus new ones) to
# the output parquet, replacing the last partial snapshot.
df_gemini = pd.DataFrame(records_gemini)
df_gemini.to_parquet(out_path_gemini, index=False)
print(f"Saved FINAL GEMINI {len(df_gemini)} rows to {out_path_gemini}")
print(f"Errored PRs (if any) logged to {error_csv_path}")

Regenerate by ID from the error log

In [12]:
import pandas as pd

# All inputs and outputs of the re-processing pass live under <project root>/RQ3.
out_dir = PROJECT_ROOT / "RQ3"
out_dir.mkdir(exist_ok=True, parents=True)

# Main results file that the re-processed records are merged into.
out_path_gemini = out_dir / "rq3_validation_evidence_gemini.parquet"
# Snapshot of the records produced by this pass only.
out_path_gemini_partial = out_dir / "rq3_validation_evidence_gemini_reprocess.partial.parquet"

# Failures of this pass are logged to a separate CSV from the one read below.
error_csv_path = out_dir / "rq3_validation_errors.csv"

# Read the error log produced by the main run from the project's RQ3 directory
# instead of a machine-specific absolute path, so the notebook runs on any
# checkout.
df = pd.read_csv(out_dir / "rq3_validation_errors_gemini.csv")
# The log is append-only across runs, so the same PR can appear several times;
# re-process each PR once.
ids = df["pr_id"].drop_duplicates().tolist()

records_gemini = []
ai_core = load_pr_core("ai")
human_core = load_pr_core("human")

def save_partial(records, out_path):
    """Write ``records`` to ``out_path`` as parquet without risking the old file.

    The frame is written to a sibling ``<name>.tmp`` file and then moved into
    place with ``os.replace``. A write that fails or is interrupted therefore
    leaves the previous snapshot intact. Nothing is written when ``records``
    is empty.

    Args:
        records: List of record dicts.
        out_path: Destination parquet path (``str`` or ``Path``).
    """
    if not records:
        return
    df_tmp = pd.DataFrame(records)

    out_path = Path(out_path)
    tmp_path = out_path.with_name(out_path.name + ".tmp")
    try:
        df_tmp.to_parquet(tmp_path, index=False)
        os.replace(tmp_path, out_path)
    finally:
        # Only still present when the write or the rename did not complete.
        if tmp_path.exists():
            tmp_path.unlink()
    print(f"[partial save] Saved {len(df_tmp)} rows to {out_path}")

def log_error(pr_id, prefix, author_type, exc):
    """Append a failure row for ``pr_id`` to the CSV at ``error_csv_path``.

    Columns are ``prefix``, ``pr_id``, ``author_type`` and ``error`` (the
    exception rendered with ``str``). The header is written only when the file
    does not exist yet; existing rows are kept.
    """
    failure = dict(
        prefix=prefix,
        pr_id=pr_id,
        author_type=author_type,
        error=str(exc),
    )
    failure_frame = pd.DataFrame([failure])
    file_is_new = not error_csv_path.exists()
    failure_frame.to_csv(error_csv_path, mode="a", header=file_is_new, index=False)
    print(f"[error] Logged PR {pr_id} ({prefix}/{author_type}) to {error_csv_path}: {exc}")

def resolve_target(pr_id: int):
    """Find which dataset a PR belongs to.

    ``ai_core`` is checked before ``human_core``, first with ``pr_id`` as given
    and then, if neither index contains it, with ``int(pr_id)``. Errors raised
    during the integer attempt are ignored.

    Returns:
        ``(prefix, author_type, core)`` for the matching dataset.

    Raises:
        KeyError: If the PR is in neither index.
    """
    datasets = (
        ("ai", "ai_agent", ai_core),
        ("human", "human", human_core),
    )

    for prefix, author_type, core in datasets:
        if pr_id in core.index:
            return prefix, author_type, core

    # Second chance: the id may have been read in a non-integer form.
    try:
        pid = int(pr_id)
        for prefix, author_type, core in datasets:
            if pid in core.index:
                return prefix, author_type, core
    except Exception:
        pass

    raise KeyError(f"PR {pr_id} not found in ai_core or human_core")

print(f"Re-processing {len(ids)} PRs (Gemini only).")

processed_ok = 0

# Re-run the classifier for every PR listed in the error log. A failure is
# logged to the error CSV and the loop moves on to the next PR.
for idx, pr_id in enumerate(ids, 1):
    try:
        prefix, author_type, core = resolve_target(pr_id)
        print(f"Processing [{idx}/{len(ids)}] {prefix} PR: {pr_id}")

        gemini_res = analyze_pr(prefix, pr_id, author_type, core)

        # Store the dataset prefix with the record, as the main run does.
        # Without it the merged parquet holds rows with a null prefix, and the
        # resume logic, which keys on (prefix, pr_id), never recognises them
        # as processed.
        records_gemini.append({"prefix": prefix, **gemini_res})
        processed_ok += 1

        # Save after every success
        save_partial(records_gemini, out_path_gemini_partial)

    except Exception as exc:
        # Resolve the dataset again for the log row; resolve_target itself may
        # have been what failed, in which case the row is marked "unknown".
        try:
            prefix, author_type, _ = resolve_target(pr_id)
        except Exception:
            prefix, author_type = "unknown", "unknown"
        log_error(pr_id, prefix=prefix, author_type=author_type, exc=exc)

# Build new_df from the records held in memory (the partial parquet holds the
# same rows if they need to be recovered).
new_df = pd.DataFrame(records_gemini)

if new_df.empty:
    print("[final] No new Gemini records to merge.")
    print(f"Errored PRs (if any) logged to {error_csv_path}")
else:
    if "pr_id" in new_df.columns:
        try:
            new_df["pr_id"] = new_df["pr_id"].astype("int64")
        except Exception:
            # Fall back to a nullable integer column when some ids are not
            # plain integers.
            new_df["pr_id"] = pd.to_numeric(new_df["pr_id"], errors="coerce").astype("Int64")

    if out_path_gemini.exists():
        old_df = pd.read_parquet(out_path_gemini)

        if "pr_id" in old_df.columns and "pr_id" in new_df.columns:
            try:
                old_df["pr_id"] = old_df["pr_id"].astype("int64")
            except Exception:
                old_df["pr_id"] = pd.to_numeric(old_df["pr_id"], errors="coerce").astype("Int64")

            # Drop only the old rows that this pass actually replaces.
            # Dropping every id from the error list also deleted rows for PRs
            # that failed again in this pass, losing results merged by an
            # earlier re-processing run.
            replaced_ids = set(new_df["pr_id"].dropna().tolist())
            old_df = old_df[~old_df["pr_id"].isin(replaced_ids)]

        final_df = pd.concat([old_df, new_df], ignore_index=True)
    else:
        final_df = new_df

    final_df.to_parquet(out_path_gemini, index=False)
    print(f"Saved MERGED GEMINI {len(final_df)} rows to {out_path_gemini}")
    print(f"Errored PRs (if any) logged to {error_csv_path}")

Re-processing 274 PRs (Gemini only).
Processing [1/274] ai PR: 2843312341
Running GEMINI, model: gemini-3-pro-preview
gemini model call failed for PR 2843312341: 503 UNAVAILABLE. {'error': {'code': 503, 'message': 'Deadline expired before operation could complete.', 'status': 'UNAVAILABLE'}}
[error] Logged PR 2843312341 (ai/ai_agent) to /Users/antoniozhong/Documents/dev/purdue/MSR2026/github_perf_patch_study/RQ3/rq3_validation_errors.csv: 503 UNAVAILABLE. {'error': {'code': 503, 'message': 'Deadline expired before operation could complete.', 'status': 'UNAVAILABLE'}}
Processing [2/274] ai PR: 2843334531
Running GEMINI, model: gemini-3-pro-preview
gemini model call failed for PR 2843334531: 503 UNAVAILABLE. {'error': {'code': 503, 'message': 'The model is overloaded. Please try again later.', 'status': 'UNAVAILABLE'}}
[error] Logged PR 2843334531 (ai/ai_agent) to /Users/antoniozhong/Documents/dev/purdue/MSR2026/github_perf_patch_study/RQ3/rq3_validation_errors.csv: 503 UNAVAILABLE. {'er

KeyboardInterrupt: 