# Install & Imports

In [None]:
# !pip install tree-sitter tree-sitter-c==0.23.4 tree-sitter-cpp==0.23.4 -q
# !pip install openai anthropic -q
# !pip install torch==2.5.1
# !pip install transformers==4.47.0
# !pip install accelerate==1.3.0   # for automatic GPU distribution if using multi-GPU

In [None]:
import os
import re
import json
import time
import argparse
import pandas as pd
from pathlib import Path

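Run the next cell to load the API keys from the JSON files under `creds/`. Alternatively, skip it and set `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, and `HF_TOKEN` in the environment yourself.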

In [3]:
# Each entry pairs a credentials file with the environment variable it fills.
# Every file is expected to be a JSON object carrying an "api_key" field.
credential_sources = (
	("creds/openai_api_key.json", "OPENAI_API_KEY"),
	("creds/anthropic_api_key.json", "ANTHROPIC_API_KEY"),
	("creds/hf_access_token.json", "HF_TOKEN"),
)
for cred_path, env_name in credential_sources:
	with open(cred_path) as cred_file:
		os.environ[env_name] = json.load(cred_file)["api_key"]

In [5]:
# Fail fast when a credential is missing. The variables are checked in a fixed
# order and the error names the first one that is absent, so a missing
# Anthropic key is no longer reported as a missing OpenAI key.
for required_var in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY", "HF_TOKEN"):
    if required_var not in os.environ:
        raise ValueError(f"{required_var} not set")

In [4]:
from agents.normalization_agent import NormalizationAgent
from agents.planning_agent      import PlanningAgent
from agents.context_agent       import ContextAgent
from agents.symbol_backend      import SymbolBackend
from agents.detection_agent	 import DetectionAgent
from agents.validation_agent	 import ValidationAgent
from load_llm import get_tokenizer, get_model, get_pipe
from utils import get_json_file_as_dict, save_dict_as_json, get_json_lines_as_list, save_json_lines

# Vul Detect

#### CWE ID and Names
Prepare CWE ID and Name for record keeping in Norm Results.

In [5]:
# Build a lookup of CWE metadata rows keyed by "CWE-<id>" strings.
cwe_frame = pd.read_csv("cwe_details.csv", index_col=False).set_index("CWE-ID")
# Normalise the raw ids to stripped strings before adding the "CWE-" prefix.
cwe_frame.index = [
    f"CWE-{raw_id}" for raw_id in cwe_frame.index.astype(str).str.strip()
]
# One dict per CWE: {"CWE-<id>": {<column name>: <value>, ...}}.
cwe_details = cwe_frame.to_dict(orient="index")

### Prepare Runtime Values
Set the parameters for this experiment. This emulates a CLI arguments interface so the notebook can be ported to a `.py` script easily.

In [None]:
# Command-line equivalent of the settings below, kept for the `.py` port:
#
# parser = argparse.ArgumentParser()
# parser.add_argument("--model_id", type=str, required=True, help="LLM to use. Use huggingface model_id or any of the following [\"gpt-4.1\", \"claude-3-7-sonnet-20250219\", \"claude-3-7-sonnet-latest\"]")
# parser.add_argument("--norm_file", type=str, help="File path to use already normalized file.", default=None)
# parser.add_argument("--run_norm", action="store_true", help="Whether to run normalization agent")
# parser.add_argument("--dataset", type=str, help="Path to the JSON subset for vulnerability detection.", default=None)
#
# args = parser.parse_args()

# Notebook stand-in for the parsed arguments; edit these values per experiment.
experiment_settings = {
    "model_id": "claude-3-7-sonnet-20250219",  # hosted model name or a Hugging Face model id
    "norm_file": None,                         # path to an existing normalization output, if any
    "run_norm": True,                          # run the Normalization Agent in this session
    "dataset": "random_subset.json",           # JSON subset consumed when run_norm is set
}
args = argparse.Namespace(**experiment_settings)

## Normalization Agent

Run the next cell if you want to load a previous normalization output. Normalization is a deterministic step, so its output should be the same for all models/configurations.

Otherwise execute the cell below to run the Normalization Agent.

In [None]:
# Optional shortcut: reuse a saved normalization output (JSON Lines file).
# Guarded so that a full top-to-bottom run with `norm_file=None` does not call
# the loader with a missing path.
if args.norm_file:
    norm_agent_otpt = get_json_lines_as_list(args.norm_file)

In [9]:
# Validate the run configuration before doing any work.
if not args.norm_file and not args.run_norm:
    raise ValueError("Please provide either --norm_file or pass --run_norm option.")
if args.run_norm and not args.dataset:
    raise ValueError("Please provide --dataset option when using --run_norm.")

if args.run_norm:
    # A fresh normalization run takes precedence over any saved norm_file.
    norm = NormalizationAgent()
    norm_agent_otpt = []
    random_subset = get_json_file_as_dict(args.dataset)
    for cwe_id, sample_list in random_subset.items():
        if cwe_id not in cwe_details:
            print(f"Skipping unsupported CWE ID: {cwe_id}")
            continue
        # Looked up outside the f-string: reusing the enclosing quote character
        # inside a replacement field is a SyntaxError before Python 3.12.
        cwe_name = cwe_details[cwe_id]["Name"]
        vuln_type = f"{cwe_id} - {cwe_name}"
        for sample in sample_list:
            repo_path = sample["project_repo_path"]
            commit_id = sample["commit_id"]
            func_body = sample["func_body"]
            filepath = sample["filepath"]
            is_vulnerable = sample["is_vulnerable"]

            # Map the file extension to the language label passed to the agent.
            ext = Path(filepath).suffix[1:]
            if ext in ['c']:
                file_type = "c"
            elif ext in ['cpp', 'cc', 'cxx', 'C']:
                file_type = "cpp"
            else:
                print(f"Skipping unsupported file extension: {ext}")
                continue

            # Each record keeps the sample's provenance next to whatever
            # fields norm.run(...) returns.
            norm_agent_otpt.append(dict(norm_result=dict(
                repo_path=repo_path,
                commit_id=commit_id,
                filepath=filepath,
                is_vulnerable=is_vulnerable,
                **norm.run(func_body, file_type=file_type, vuln_type=vuln_type)
            )))

    # Persist once, after every CWE group has been processed, instead of
    # re-saving the whole accumulated list on each loop iteration.
    norm_out_dir = args.model_id.split("/")[-1]
    os.makedirs(norm_out_dir, exist_ok=True)
    save_json_lines(f"{norm_out_dir}/norm_agent_otpt.jsonl", norm_agent_otpt)
else:
    # The saved output is written with save_json_lines, so it has to be read
    # back as a list of JSON Lines records rather than as a single JSON dict.
    norm_agent_otpt = get_json_lines_as_list(args.norm_file)

## Load Model

In [None]:
# os.environ['CUDA_VISIBLE_DEVICES'] = "2,3"

# Model ids starting with "claude" or "gpt" get no local pipeline (pipe stays None);
# any other id is loaded through get_model / get_tokenizer / get_pipe.
pipe = None
is_hosted_model = any(
	args.model_id.startswith(prefix) for prefix in ("claude", "gpt")
)
if not is_hosted_model:
	local_model = get_model(args.model_id)
	local_tokenizer = get_tokenizer(args.model_id)
	pipe = get_pipe(local_model, local_tokenizer)

# Per-model output directory, named after the last path component of the model id.
os.makedirs(args.model_id.split("/")[-1], exist_ok=True)

## Planning Agent

In [13]:
def get_plan_with_retry(norm_result, max_retries=3):
	"""Call the module-level `plan` agent, retrying when it raises.

	Args:
		norm_result: mapping that provides "src" and "compact_ast" -> "ast".
		max_retries: maximum number of attempts.

	Returns:
		Whatever `plan(...)` returns on the first successful attempt, or None
		when every attempt raised (or when max_retries is not positive).
	"""
	attempt = 0
	while attempt < max_retries:
		attempt += 1
		try:
			# Argument lookup stays inside the try so a malformed norm_result
			# is reported and retried like any other failure.
			return plan(
				clean_code=norm_result["src"],
				compact_ast=norm_result["compact_ast"]["ast"],
			)
		except Exception as err:
			print(f"Attempt {attempt} failed: {err}")
	return None
# --------------------- Planning ----------------------
plan = PlanningAgent(model=args.model_id, pipe=pipe)
plan_agent_otpt = []
# The "/" is single-quoted because reusing the f-string's own quote character
# inside a replacement field is a SyntaxError before Python 3.12.
plan_out_path = f"{args.model_id.split('/')[-1]}/plan_agent_otpt.jsonl"
# Opened with "w" rather than "a": the loop always starts from the first
# sample, so appending would leave duplicate records in the file on every re-run.
with open(plan_out_path, "w", encoding="utf-8") as f:
	for idx, norm_otpt in enumerate(norm_agent_otpt):
		norm_result = norm_otpt["norm_result"]
		plan_result = get_plan_with_retry(norm_result)
		if plan_result is None:
			# Every retry failed: report the sample and carry on with the rest.
			print(f"Planning failed for {idx} - {norm_result['repo_path']}, {norm_result['commit_id']}, {norm_result['filepath']}, {norm_result['is_vulnerable']}, {norm_result['src'][:100]}\n\n")
			continue
		plan_record = dict(
			norm_result=norm_result,
			plan_result=plan_result
		)
		plan_agent_otpt.append(plan_record)
		# One JSON object per line; flushed per record so finished samples are
		# handed to the OS even if a later sample aborts the loop.
		json.dump(plan_record, f, ensure_ascii=False)
		f.write("\n")
		f.flush()

## Context Agent

If you want to start running from the output of previous Planning agent, load planning agent output from this cell.

Otherwise go to the next cell directly to run the Context Agent.

In [15]:
# Reload the Planning Agent records from this model's output directory.
plan_records_dir = args.model_id.split('/')[-1]
plan_agent_otpt = get_json_lines_as_list(f"{plan_records_dir}/plan_agent_otpt.jsonl")

In [14]:
# ---------------- Context Extraction -----------------
cntxt_agent_otpt = []
# The "/" is single-quoted because reusing the f-string's own quote character
# inside a replacement field is a SyntaxError before Python 3.12.
cntxt_out_path = f"{args.model_id.split('/')[-1]}/cntxt_agent_otpt.jsonl"
# Opened with "w" rather than "a": the loop always starts from the first
# record, so appending would leave duplicate records in the file on every re-run.
with open(cntxt_out_path, "w", encoding="utf-8") as f:
	for plan_otpt in plan_agent_otpt:
		norm_result = plan_otpt["norm_result"]
		plan_result = plan_otpt["plan_result"]
		# A backend is built per sample from that sample's repo path and commit id.
		backend = SymbolBackend(repo=norm_result["repo_path"], commit=norm_result["commit_id"])
		context_agent = ContextAgent(
			model=args.model_id,
			backend=backend,
			pipe=pipe,
		)
		ctx_result = context_agent(
			clean_code=norm_result["src"],
			compact_ast=norm_result["compact_ast"]["ast"],
			planning_plan=plan_result
		)
		cntxt_record = dict(
			norm_result=norm_result,
			plan_result=plan_result,
			ctx_result=ctx_result
		)
		cntxt_agent_otpt.append(cntxt_record)
		# One JSON object per line; flushed per record so finished samples are
		# handed to the OS even if a later sample aborts the loop.
		json.dump(cntxt_record, f, ensure_ascii=False)
		f.write("\n")
		f.flush()

## Detection and Validation Agent

If you want to start running from the output of previous Context agent, load context agent output from this cell.

Otherwise go to the next cell directly to run the Detection and Validation Agent.

In [14]:
# Reload the Context Agent records from this model's output directory.
cntxt_records_dir = args.model_id.split('/')[-1]
cntxt_agent_otpt = get_json_lines_as_list(f"{cntxt_records_dir}/cntxt_agent_otpt.jsonl")

In [14]:
def run_detection_with_validation(
    detection_agent,                    # instance of your DetectionAgent
    validation_agent,                   # instance of ValidationAgent
    *,
    det_inputs: dict,                   # kwargs for detection_agent(...)
    val_extra: dict = None,             # kwargs *extra* for ValidationAgent
):
    """
    Run Detection followed by Validation, retrying while they disagree.

    1) Detection -> Validation
    2) If Validation does not agree, re-run Detection and re-validate,
       for at most three rounds in total
    3) Return the (detection, validation) pair of the last round executed

    Args:
        detection_agent: callable invoked as ``detection_agent(**det_inputs)``.
        validation_agent: callable invoked as
            ``validation_agent(detection_out=..., **det_inputs, **val_extra)``;
            its result must support ``result["agree"]``.
        det_inputs: keyword arguments shared by both agents.
        val_extra: extra keyword arguments for the validation agent only;
            ``None`` (the default) means no extras.

    Returns:
        Tuple ``(det, val)`` from the first agreeing round, or from the third
        round regardless of agreement.
    """
    # None instead of a shared mutable `{}` default.
    if val_extra is None:
        val_extra = {}

    # Pause before each detection call; the length depends on the model-id
    # prefix read from the notebook-level `args` (presumably to stay under the
    # hosted APIs' rate limits -- TODO confirm).
    sleep_time = 0
    if args.model_id.startswith("gpt"):
        sleep_time = 0.25
    if args.model_id.startswith("claude"):
        sleep_time = 1.5

    # Message printed after a disagreeing round i (0-based) when another round
    # follows; the number of rounds is one more than the number of messages.
    retry_notices = (
        "Detection and Validation disagree, retrying Detection...",
        "Detection and Validation disagree again, last retry...",
    )

    for round_idx in range(len(retry_notices) + 1):
        time.sleep(sleep_time)
        det = detection_agent(**det_inputs)
        val = validation_agent(detection_out=det, **det_inputs, **val_extra)

        if val["agree"]:
            break                           # consensus — done!
        if round_idx < len(retry_notices):
            print(retry_notices[round_idx])

    # After the final round the result is returned whatever the agree flag says.
    return det, val

In [15]:
# ---------------- Vulnerability Detection -----------------
det = DetectionAgent(model=args.model_id, pipe=pipe)
val = ValidationAgent(model=args.model_id, pipe=pipe)
detection_otpt = []
validation_otpt = []
# The "/" is single-quoted because reusing the f-string's own quote character
# inside a replacement field is a SyntaxError before Python 3.12.
val_out_path = f"{args.model_id.split('/')[-1]}/val_agent_otpt.jsonl"
# Opened with "w" rather than "a": the loop always starts from the first
# record, so appending would leave duplicate records in the file on every re-run.
with open(val_out_path, "w", encoding="utf-8") as f:
	for cntxt_otpt in cntxt_agent_otpt:
		norm_result = cntxt_otpt["norm_result"]
		plan_result = cntxt_otpt["plan_result"]
		ctx_result = cntxt_otpt["ctx_result"]
		# Detection and Validation receive the same inputs.
		det_res, val_res = run_detection_with_validation(det, val, det_inputs=dict(
				clean_code=norm_result["src"],
				compact_ast=norm_result["compact_ast"]["ast"],
				summary=plan_result["summary"],
				checklist=plan_result["checklist"],
				context=ctx_result
			))
		validation_record = dict(
			norm_result=norm_result,
			plan_result=plan_result,
			ctx_result=ctx_result,
			detect_result=det_res,
			validate_result=val_res
		)
		validation_otpt.append(validation_record)
		# One JSON object per line; flushed per record so finished samples are
		# handed to the OS even if a later sample aborts the loop.
		json.dump(validation_record, f, ensure_ascii=False)
		f.write("\n")
		f.flush()

## Output Formatting

Convert final output to Excel for manual validation.

In [None]:
# !pip install openpyxl

In [18]:
from openpyxl import Workbook

# Export one worksheet row per validated sample for manual review.
wb = Workbook()
ws = wb.active

# Column headers, written as the first row.
headers = [
    "commit_id", "filepath", "is_vulnerable", "project", "src_snippet",
    "vuln_type", "validate_is_vulnerable", "validate_vuln_statements"
]
ws.append(headers)

for entry in validation_otpt:
    # A missing section is treated as empty so one incomplete record cannot
    # abort the whole export.
    norm_result = entry.get("norm_result") or {}
    validate_result = entry.get("validate_result") or {}

    # Fields taken from the normalization record.
    commit_id = norm_result.get("commit_id")
    filepath = norm_result.get("filepath")
    is_vulnerable = norm_result.get("is_vulnerable")
    # The normalization cell stores the repository under "repo_path"; fall back
    # to it when there is no "project" key so the column is not left empty.
    # NOTE(review): confirm whether any saved records carry a "project" field.
    project = norm_result.get("project") or norm_result.get("repo_path")
    src_snippet = (norm_result.get("src") or "")[:50]  # First 50 characters of src
    vuln_type = norm_result.get("vuln_type")

    # Fields taken from the validation verdict.
    validate_is_vulnerable = validate_result.get("is_vulnerable")
    # A missing or None statement list yields an empty cell instead of a TypeError.
    vuln_statements = validate_result.get("vuln_statements") or []

    # Render each finding as "statement - reason", separated by blank lines.
    # Entries that are not dicts are written as their plain string form.
    formatted_statements = "\n\n".join(
        f"{item.get('statement')} - {item.get('reason')}" if isinstance(item, dict) else str(item)
        for item in vuln_statements
    )

    ws.append([
        commit_id, filepath, is_vulnerable, project, src_snippet,
        vuln_type, validate_is_vulnerable, formatted_statements
    ])

# The "/" is single-quoted because reusing the f-string's own quote character
# inside a replacement field is a SyntaxError before Python 3.12.
wb.save(f"{args.model_id.split('/')[-1]}_output.xlsx")