Finding 709 Defects in 258 Projects: An Experience Report onApplying CodeQL to Open-Source Embedded Software(Experience Paper):

34% false postive security issues from codeql


An Empirical Study of Static Analysis Tools for Secure Code Review


AgenticSCR: An Autonomous Agentic Secure Code Review for Immature Vulnerabilities Detection:

CodeQL 2.23.3

now we are using codeql v2.24.0

In [1]:
import json
from urllib.parse import urlparse

def extract_security_results_with_locations(sarif_path: str):
    with open(sarif_path, "r", encoding="utf-8") as f:
        sarif = json.load(f)

    security_rule_ids = set()
    rule_map = {}

    # ---- collect ALL security rules ----
    for run in sarif.get("runs", []):
        rules = run.get("tool", {}).get("driver", {}).get("rules", [])
        for rule in rules:
            properties = rule.get("properties", {}) or {}
            tags = properties.get("tags", []) or []

            if "security" in tags:
                rule_id = rule.get("id")
                if rule_id:
                    security_rule_ids.add(rule_id)

                    rule_map[rule_id] = {
                        "rule_id": rule_id,
                        "name": rule.get("name"),
                        "short_description": rule.get("shortDescription", {}).get("text"),
                        "full_description": rule.get("fullDescription", {}).get("text"),
                        "tags": tags,
                        "precision": properties.get("precision"),
                        "problem_severity": properties.get("problem.severity"),
                        "security_severity": properties.get("security-severity"),
                        "sub_severity": properties.get("sub-severity"),
                        "cwe": [
                            t for t in tags if t.startswith("external/cwe/")
                        ],
                    }

    security_instances = []

    # ---- collect result instances ----
    for run in sarif.get("runs", []):
        for r in run.get("results", []) or []:
            rule_id = r.get("ruleId") or (r.get("rule") or {}).get("id")
            if rule_id not in security_rule_ids:
                continue

            message = (r.get("message") or {}).get("text")
            level = r.get("level")

            for loc in r.get("locations", []) or []:
                pl = loc.get("physicalLocation") or {}
                artifact = (pl.get("artifactLocation") or {}).get("uri")
                region = pl.get("region") or {}

                if not artifact:
                    continue

                # normalize path
                if artifact.startswith("file://"):
                    artifact = urlparse(artifact).path

                security_instances.append({
                    # ---- instance-level ----
                    "rule_id": rule_id,
                    "message": message,
                    "level": level,
                    "file": artifact,
                    "start_line": region.get("startLine"),
                    "end_line": region.get("endLine"),
                    "start_column": region.get("startColumn"),
                    "end_column": region.get("endColumn"),

                    # ---- rule-level (full copy) ----
                    "rule": rule_map.get(rule_id),
                })

    return security_instances

In [2]:
import os
import glob
import json
from collections import defaultdict

def scan_and_save_security_issues(
    report_root="codeql-reports",
    output_root="security-issues",
):
    os.makedirs(output_root, exist_ok=True)

    sarif_files = glob.glob(
        f"{report_root}/**/*.sarif",
        recursive=True
    )

    print(f"üîç Found {len(sarif_files)} SARIF files")

    saved = 0
    skipped = 0

    for sarif_path in sarif_files:
        sec_instances = extract_security_results_with_locations(sarif_path)

        if not sec_instances:
            skipped += 1
            continue

        # ---- mirror directory structure ----
        rel_path = os.path.relpath(sarif_path, report_root)
        # e.g. BOINC_boinc/PR_1/python_security_report.sarif

        rel_dir = os.path.dirname(rel_path)
        out_dir = os.path.join(output_root, rel_dir)
        os.makedirs(out_dir, exist_ok=True)

        out_path = os.path.join(out_dir, "security_issues.json")

        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "sarif_path": sarif_path,
                    "security_issue_count": len(sec_instances),
                    "issues": sec_instances,
                },
                f,
                indent=2,
                ensure_ascii=False,
            )

        print(f"üíæ Saved {len(sec_instances)} issues ‚Üí {out_path}")
        saved += 1

    print("\n================ Summary ================")
    print(f"‚úÖ Saved PRs with security issues : {saved}")
    print(f"‚è≠Ô∏è  PRs without security issues   : {skipped}")
    print(f"üì¶ Total SARIF files scanned      : {len(sarif_files)}")

In [3]:
scan_and_save_security_issues(
    report_root="codeql-reports",
    output_root="security-issues",
)

üîç Found 162 SARIF files
üíæ Saved 16 issues ‚Üí security-issues/514-labs_moose/PR_1/security_issues.json
üíæ Saved 24 issues ‚Üí security-issues/514-labs_moose/PR_2/security_issues.json
üíæ Saved 21 issues ‚Üí security-issues/514-labs_moose/PR_5/security_issues.json
üíæ Saved 21 issues ‚Üí security-issues/514-labs_moose/PR_4/security_issues.json
üíæ Saved 21 issues ‚Üí security-issues/514-labs_moose/PR_3/security_issues.json
üíæ Saved 22 issues ‚Üí security-issues/AgentDock_AgentDock/PR_1/security_issues.json
üíæ Saved 22 issues ‚Üí security-issues/AgentDock_AgentDock/PR_2/security_issues.json
üíæ Saved 303 issues ‚Üí security-issues/Azure_azure-sdk-for-js/PR_6/security_issues.json
üíæ Saved 303 issues ‚Üí security-issues/Azure_azure-sdk-for-js/PR_1/security_issues.json
üíæ Saved 306 issues ‚Üí security-issues/Azure_azure-sdk-for-js/PR_7/security_issues.json
üíæ Saved 303 issues ‚Üí security-issues/Azure_azure-sdk-for-js/PR_2/security_issues.json
üíæ Saved 303 issues ‚Üí

In [5]:
# ========= imports =========
import os
import json
import glob
import subprocess
import shutil
import re
from collections import defaultdict
from urllib.parse import urlparse
import pandas as pd


# ========= SARIF parsing (WITH end_line + rule info) =========
def parse_sarif_findings(sarif_path: str) -> list[dict]:
    with open(sarif_path, "r", encoding="utf-8") as f:
        sarif = json.load(f)

    rule_map = {}
    for run in sarif.get("runs", []):
        for rule in run.get("tool", {}).get("driver", {}).get("rules", []):
            rid = rule.get("id")
            if rid:
                rule_map[rid] = rule

    findings = []

    for run in sarif.get("runs", []):
        for r in run.get("results", []) or []:
            rule_id = r.get("ruleId") or (r.get("rule") or {}).get("id")
            if not rule_id:
                continue

            message = (r.get("message") or {}).get("text", "")
            level = r.get("level")

            for loc in r.get("locations", []) or []:
                pl = loc.get("physicalLocation") or {}
                artifact = (pl.get("artifactLocation") or {}).get("uri")
                region = pl.get("region") or {}

                start_line = region.get("startLine")
                end_line = region.get("endLine")

                if not artifact or not start_line:
                    continue

                findings.append({
                    "rule_id": rule_id,
                    "message": message,
                    "level": level,
                    "file": artifact,
                    "start_line": int(start_line),
                    "end_line": int(end_line) if end_line else None,
                    "rule": rule_map.get(rule_id),
                })

    return findings


# ========= path normalization =========
def normalize_sarif_path(uri: str, repo_root_abs: str) -> str:
    if uri.startswith("file://"):
        path = urlparse(uri).path
    else:
        path = uri

    path = os.path.normpath(path).lstrip("./")

    if os.path.isabs(path):
        repo_root_abs = os.path.normpath(repo_root_abs)
        if path.startswith(repo_root_abs):
            path = os.path.relpath(path, repo_root_abs)

    return path.lstrip("./")


# ========= introduced-by-PR marking (RANGE-AWARE) =========
def mark_introduced(findings, added_lines_by_file, repo_root_abs):
    out = []

    for f in findings:
        file_path = normalize_sarif_path(f["file"], repo_root_abs)

        start = f["start_line"]
        end = f["end_line"] or start  # üîë ÂçïË°åÈóÆÈ¢ò fallback

        added_lines = added_lines_by_file.get(file_path, set())

        introduced = any(start <= l <= end for l in added_lines)

        f2 = dict(f)
        f2["normalized_file"] = file_path
        f2["introduced_by_pr"] = introduced
        out.append(f2)

    return out


# ========= diff parsing =========
HUNK_RE = re.compile(r"^\@\@ -\d+(?:,\d+)? \+(\d+)(?:,(\d+))? \@\@")

def get_added_lines_by_file(repo_dir: str, merge_sha: str) -> dict[str, set[int]]:
    cmd = ["git", "diff", "-U0", f"{merge_sha}^1", merge_sha]
    p = subprocess.run(
        cmd,
        cwd=repo_dir,
        capture_output=True,
        text=True,
        check=True
    )

    added = defaultdict(set)
    current_file = None
    new_line = None

    for line in p.stdout.splitlines():
        if line.startswith("+++ b/"):
            current_file = line[len("+++ b/"):].strip().lstrip("./")
            new_line = None
            continue

        m = HUNK_RE.match(line)
        if m:
            new_line = int(m.group(1))
            continue

        if current_file is None or new_line is None:
            continue

        if line.startswith("+") and not line.startswith("+++"):
            added[current_file].add(new_line)
            new_line += 1
        elif line.startswith("-") and not line.startswith("---"):
            continue
        else:
            new_line += 1

    return dict(added)


# ========= helpers =========
def api_to_clone_url(api_url: str) -> str:
    return api_url.replace("https://api.github.com/repos/", "https://github.com/")


def repo_folder_to_api_url(repo_folder: str) -> str:
    owner, repo = repo_folder.split("_", 1)
    return f"https://api.github.com/repos/{owner}/{repo}"


def write_jsonl(path: str, rows: list[dict]):
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")

# ========= CONFIG =========
SECURITY_ISSUES_BASE = "security-issues"
OUTPUT_BASE = "issues-introduced-by-agent"
TMP_REPO = "_tmp_repo"

os.makedirs(OUTPUT_BASE, exist_ok=True)


# ========= MAIN LOOP =========

import json

with open("../../AI-Code-Characteristic/artifacts/repo_pr_records_clean.json", "r") as f:
    repo_pr_records_clean = json.load(f)
    
for repo_folder in sorted(os.listdir(SECURITY_ISSUES_BASE)):
    repo_path = os.path.join(SECURITY_ISSUES_BASE, repo_folder)
    if not os.path.isdir(repo_path):
        continue

    print(f"\nüì¶ Repo: {repo_folder}")

    api_repo = repo_folder_to_api_url(repo_folder)
    if api_repo not in repo_pr_records_clean:
        print("  ‚ö†Ô∏è Repo not found in records, skipped")
        continue

    clone_url = api_to_clone_url(api_repo)
    pr_records = repo_pr_records_clean[api_repo]

    pr_by_order = {
        p["pr_order_in_repo"]: p
        for p in pr_records
        if p.get("merge_commit_sha")
    }

    for pr_dir in sorted(os.listdir(repo_path)):
        if not pr_dir.startswith("PR_"):
            continue

        pr_order = int(pr_dir.replace("PR_", ""))
        print(f"  üîπ PR {pr_order}")

        if pr_order not in pr_by_order:
            print("    ‚ö†Ô∏è PR not in records, skipped")
            continue

        pr_record = pr_by_order[pr_order]
        merge_sha = pr_record["merge_commit_sha"]

        sec_json_path = os.path.join(
            repo_path, pr_dir, "security_issues.json"
        )
        if not os.path.exists(sec_json_path):
            print("    ‚ö†Ô∏è No security_issues.json, skipped")
            continue

        # ---- load security issues ----
        with open(sec_json_path, "r", encoding="utf-8") as f:
            sec_data = json.load(f)

        issues = sec_data.get("issues", [])
        if not issues:
            print("    ‚ö†Ô∏è Empty issues list, skipped")
            continue

        # ---- checkout repo ----
        if os.path.exists(TMP_REPO):
            shutil.rmtree(TMP_REPO)
        os.makedirs(TMP_REPO, exist_ok=True)

        try:
            subprocess.run(
                f"git init && "
                f"git remote add origin {clone_url} && "
                f"git fetch --depth 2 origin {merge_sha} && "
                f"git checkout {merge_sha}",
                shell=True,
                cwd=TMP_REPO,
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

            subprocess.run(
                ["git", "rev-parse", f"{merge_sha}^1"],
                cwd=TMP_REPO,
                check=True,
                stdout=subprocess.DEVNULL,
            )

            added_lines = get_added_lines_by_file(TMP_REPO, merge_sha)

            all_marked = []
            all_introduced = []

            for issue in issues:
                file_path = normalize_sarif_path(
                    issue["file"],
                    os.path.abspath(TMP_REPO)
                )

                start = issue.get("start_line")
                end = issue.get("end_line") or start

                introduced = any(
                    start <= l <= end
                    for l in added_lines.get(file_path, set())
                )

                marked = dict(issue)
                marked["normalized_file"] = file_path
                marked["introduced_by_pr"] = introduced

                all_marked.append(marked)
                if introduced:
                    all_introduced.append(marked)

            # ---- save ----
            out_dir = os.path.join(OUTPUT_BASE, repo_folder, pr_dir)
            os.makedirs(out_dir, exist_ok=True)

            write_jsonl(
                os.path.join(out_dir, "security_all.jsonl"),
                all_marked
            )
            write_jsonl(
                os.path.join(out_dir, "security_introduced.jsonl"),
                all_introduced
            )

            summary = {
                "repo": api_repo,
                "repo_folder": repo_folder,
                "pr_order_in_repo": pr_order,
                "pr_html_url": pr_record["html_url"],
                "merge_sha": merge_sha,
                "total_security_issues": len(all_marked),
                "introduced_security_issues": len(all_introduced),
                "pr_author_type": "agent",
            }

            with open(os.path.join(out_dir, "summary.json"), "w") as f:
                json.dump(summary, f, indent=2)

            print(f"    ‚úÖ Saved ({len(all_introduced)} introduced)")

        except Exception as e:
            print(f"    ‚ùå Error: {e}")

        finally:
            if os.path.exists(TMP_REPO):
                shutil.rmtree(TMP_REPO)


üì¶ Repo: 0x80_isolate-package
  üîπ PR 1
    ‚úÖ Saved (0 introduced)
  üîπ PR 2
    ‚úÖ Saved (0 introduced)
  üîπ PR 3
    ‚úÖ Saved (0 introduced)
  üîπ PR 4
    ‚úÖ Saved (0 introduced)

üì¶ Repo: 0xfurai_peekaping
  üîπ PR 1
    ‚úÖ Saved (0 introduced)
  üîπ PR 2
    ‚úÖ Saved (0 introduced)
  üîπ PR 3
    ‚úÖ Saved (0 introduced)
  üîπ PR 4
    ‚úÖ Saved (0 introduced)
  üîπ PR 5
    ‚úÖ Saved (0 introduced)
  üîπ PR 6
    ‚úÖ Saved (0 introduced)
  üîπ PR 7
    ‚úÖ Saved (0 introduced)

üì¶ Repo: 1340691923_ElasticView
  üîπ PR 1
    ‚úÖ Saved (0 introduced)

üì¶ Repo: 4ian_GDevelop
  üîπ PR 1
    ‚úÖ Saved (0 introduced)

üì¶ Repo: 514-labs_moose
  üîπ PR 1
    ‚úÖ Saved (0 introduced)
  üîπ PR 2
    ‚úÖ Saved (0 introduced)
  üîπ PR 3
    ‚úÖ Saved (1 introduced)
  üîπ PR 4
    ‚ö†Ô∏è PR not in records, skipped
  üîπ PR 5
    ‚úÖ Saved (0 introduced)

üì¶ Repo: AFASSoftware_maquette
  üîπ PR 1
    ‚úÖ Saved (0 introduced)

üì¶ Repo: AIDotNet_Thor
 

In [6]:
import os
import json

CODEQL_REPORTS = "codeql-reports"
INTRODUCED_BASE = "issues-introduced-by-agent"

repo_count = 0
pr_count = 0

introduced_issue_count = 0
prs_with_introduced = 0
repos_with_introduced = set()

for repo_folder in os.listdir(CODEQL_REPORTS):
    repo_path = os.path.join(CODEQL_REPORTS, repo_folder)
    if not os.path.isdir(repo_path):
        continue

    repo_count += 1

    for item in os.listdir(repo_path):
        if item.startswith("PR_"):
            pr_count += 1


print("\n========== CodeQL Coverage ==========")
print(f"üì¶ Total repos scanned       : {repo_count}")
print(f"üîÄ Total PRs scanned          : {pr_count}")



print("\n========== Agent-introduced Issues ==========")

import os
import json

BASE = "issues-introduced-by-agent"

total_issues = 0
prs_with_issues = set()
repos_with_issues = set()

for repo in os.listdir(BASE):
    repo_path = os.path.join(BASE, repo)
    if not os.path.isdir(repo_path):
        continue

    for pr in os.listdir(repo_path):
        pr_path = os.path.join(repo_path, pr)
        if not pr.startswith("PR_"):
            continue

        issue_file = os.path.join(pr_path, "security_introduced.jsonl")
        if not os.path.exists(issue_file):
            continue

        with open(issue_file, "r", encoding="utf-8") as f:
            lines = [line for line in f if line.strip()]

        if not lines:
            continue

        issue_count = len(lines)
        total_issues += issue_count
        prs_with_issues.add(f"{repo}/{pr}")
        repos_with_issues.add(repo)

        print(f"‚úÖ {repo}/{pr}: {issue_count} introduced issues")

print(f"üêõ Total introduced issues : {total_issues}")
print(f"üîπ PRs with ‚â•1 issue       : {len(prs_with_issues)}")
print(f"üì¶ Repos with ‚â•1 issue     : {len(repos_with_issues)}")


üì¶ Total repos scanned       : 30
üîÄ Total PRs scanned          : 162

‚úÖ 514-labs_moose/PR_3: 1 introduced issues
‚úÖ CapSoftware_Cap/PR_6: 1 introduced issues
‚úÖ CapSoftware_Cap/PR_4: 29 introduced issues
‚úÖ Codehagen_Badget/PR_8: 6 introduced issues
‚úÖ Codehagen_Badget/PR_3: 3 introduced issues
‚úÖ CitizensFoundation_your-priorities-app/PR_14: 1 introduced issues
‚úÖ BlueWallet_GroundControl/PR_1: 1 introduced issues
üêõ Total introduced issues : 42
üîπ PRs with ‚â•1 issue       : 7
üì¶ Repos with ‚â•1 issue     : 5
