In [31]:
# Path of the Python source file the agent is asked to review; it is
# interpolated into the first user message and used to name the report file.
CODE_PATH = "./test/code2.py"

In [32]:
import dotenv
import os
import re
import json
from typing import Optional, Dict, Any
from openai import OpenAI
import tempfile
import subprocess
import json
import ast

# Let python-dotenv populate the environment before the key is looked up.
dotenv.load_dotenv()

# Fail fast: every later cell needs a non-empty OpenAI key.
API_KEY = os.environ.get("OPENAI_API_KEY", "")
if API_KEY == "":
    raise ValueError("OPENAI_API_KEY environment variable is not set.")

In [33]:
# Chat model requested by the agent loop.
model = "gpt-4.1"
# Base URL handed to the OpenAI client.
base_URL = "https://api.openai.com/v1"
# Shared API client; used by suggest_fix() and by the agent loop.
client = OpenAI(api_key=API_KEY, base_url=base_URL)

In [34]:
# System prompt for the agent. It lists the tools the model may request and
# fixes the <think>/<tool>/<answer> reply format that the
# parse_*_from_response helpers extract with regular expressions.
system_prompt = """
You are SecureSage — a vigilant, intelligent, and explainable security analyst. Your task is to review Python source code files, understand what the code is doing, and identify potential security vulnerabilities, such as insecure deserialization, command injection, hardcoded secrets, and other OWASP Top 10 issues.

You do this by performing step-by-step analysis. You are allowed to use the following tools:

- load_code(path: str) -> str: Loads and returns the contents of a Python (.py) source code file.
- static_analysis(code: str) -> str: Runs static security scanners (e.g., Bandit) and returns a list of flagged lines with issue types and severity.
- parse_ast(code: str) -> str: Parses the code into an abstract syntax tree and extracts function names, inputs, and risky constructs (e.g., eval, exec, os.system).
- doc_search(query: str) -> str: Searches security documentation (e.g., OWASP, CWE, Python docs) for best practices or known issues.
- suggest_fix(issue: str, code_snippet: str) -> str: Proposes a secure version of the code snippet that mitigates the vulnerability.

You may call one tool per turn, for up to 10 turns, before giving your final answer.

In each turn, respond in the following format:

<think>
[Explain what you're doing next, what you need, or what issue you're focusing on.]
</think>
<tool>
JSON with the following fields:
- name: The name of the tool to call
- args: A dictionary of arguments to pass to the tool (must be valid JSON)
</tool>

When you are done, provide a clear and structured security review in the following format:

<answer>
1. Summary of Code Purpose  
2. Detected Vulnerabilities (with line numbers and severity) and explanation of Each Issue (why it's dangerous, relevant CVE/CWE/OWASP ref) 
3. Suggested Fixes (with example code and links if needed)  
</answer>

The answer should be nice and structured, and very readable.
"""


In [35]:
def load_code(path: str) -> str:
    """Return the complete text of the file at ``path``, decoded as UTF-8.

    Args:
        path: Location of the file to read.

    Returns:
        The file's contents as a single string.
    """
    with open(path, mode="r", encoding="utf-8") as source_file:
        contents = source_file.read()
    return contents

def static_analysis(code: str) -> list:
    """Run Bandit over ``code`` and return its findings as a list of dicts.

    The source is written to a temporary ``.py`` file that is handed to the
    ``bandit`` command line. The file is written as UTF-8, closed before
    Bandit is started, and always removed afterwards.

    Args:
        code: Python source text to scan.

    Returns:
        One dict per entry in Bandit's ``results`` with the keys ``line``,
        ``issue``, ``severity``, ``confidence`` and ``id``. If Bandit's output
        cannot be parsed, ``[{"error": "<message>"}]`` is returned instead.

    Raises:
        Whatever ``subprocess.run`` raises when the command cannot be started
        (e.g. ``FileNotFoundError`` if ``bandit`` is not installed).
    """
    # delete=False lets us close the file before the scan (a still-open named
    # temporary file cannot be reopened on Windows); we remove it ourselves in
    # the ``finally`` below so nothing is left behind.
    with tempfile.NamedTemporaryFile(
        suffix=".py", mode="w", encoding="utf-8", delete=False
    ) as tmp:
        tmp.write(code)
        tmp_path = tmp.name

    try:
        result = subprocess.run(
            ["bandit", "-f", "json", tmp_path],
            capture_output=True,
            text=True
        )
    finally:
        os.unlink(tmp_path)

    try:
        output = json.loads(result.stdout)
        return [
            {
                "line": item["line_number"],
                "issue": item["issue_text"],
                "severity": item["issue_severity"],
                "confidence": item["issue_confidence"],
                "id": item["test_id"]
            }
            for item in output.get("results", [])
        ]
    except Exception as e:
        # Best effort: report unparseable output to the caller as data.
        return [{"error": str(e)}]

def parse_ast(code: str) -> dict:
    """Summarise functions, imports and risky calls found in ``code``.

    Args:
        code: Python source text to parse.

    Returns:
        A dict with three keys:

        * ``functions``: names of every ``def`` / ``async def``, in visit order.
        * ``risky_calls``: one dict per call to ``os.system``, ``eval``,
          ``exec``, ``pickle.load`` or ``subprocess.Popen`` with the keys
          ``line``, ``call`` and ``arg`` (the unparsed first positional
          argument, or ``""`` when there is none).
        * ``imports``: module names from ``import`` and ``from ... import``
          statements; a relative import without a module name
          (``from . import x``) is recorded as its leading dots.

    Raises:
        SyntaxError: if ``code`` is not valid Python (raised by ``ast.parse``).
    """
    risky_names = {"os.system", "eval", "exec", "pickle.load", "subprocess.Popen"}

    tree = ast.parse(code)
    functions = []
    risky_calls = []
    imports = []

    class Analyzer(ast.NodeVisitor):
        def visit_FunctionDef(self, node):
            functions.append(node.name)
            self.generic_visit(node)

        # Coroutines are functions too; record them the same way.
        visit_AsyncFunctionDef = visit_FunctionDef

        def visit_Call(self, node):
            func = node.func
            if isinstance(func, ast.Attribute):
                # Dotted call such as ``os.system(...)``.
                func_name = f"{ast.unparse(func.value)}.{func.attr}"
            elif isinstance(func, ast.Name):
                # Bare call such as ``eval(...)`` / ``exec(...)``; these are
                # plain names, not attributes.
                func_name = func.id
            else:
                func_name = None

            if func_name in risky_names:
                risky_calls.append({
                    "line": node.lineno,
                    "call": func_name,
                    "arg": ast.unparse(node.args[0]) if node.args else ""
                })
            self.generic_visit(node)

        def visit_Import(self, node):
            for alias in node.names:
                imports.append(alias.name)

        def visit_ImportFrom(self, node):
            # node.module is None for ``from . import x``; fall back to the
            # dots so the list never contains None.
            if node.module is not None:
                imports.append(node.module)
            else:
                imports.append("." * node.level)

    Analyzer().visit(tree)

    return {
        "functions": functions,
        "risky_calls": risky_calls,
        "imports": imports
    }
    
# Canned security advice, keyed by the lowercase keyword that doc_search()
# looks for inside a query. Order matters: the first matching key wins.
doc_database = dict(
    [
        (
            "os.system",
            "Avoid using os.system with user input. Prefer subprocess.run([...], check=True).",
        ),
        (
            "eval",
            "Eval executes strings as code. Dangerous if input-controlled.",
        ),
        (
            "hardcoded secret",
            "Never commit secrets. Use environment variables or a vault.",
        ),
    ]
)

def doc_search(query: str) -> str:
    """Look up canned advice for the first known keyword found in ``query``.

    Keywords are the keys of ``doc_database`` and are matched as substrings of
    the lower-cased query, in the dictionary's iteration order.

    Args:
        query: Free-text search string.

    Returns:
        The advice for the first matching keyword, or a fixed "no match"
        message when no keyword occurs in the query.
    """
    matching_advice = (
        advice
        for keyword, advice in doc_database.items()
        if keyword in query.lower()
    )
    return next(matching_advice, "No documentation match found for your query.")


def suggest_fix(issue: str, code_snippet: str, model_name: str = "gpt-4.1") -> str:
    """Ask the chat model for a safer rewrite of ``code_snippet``.

    Sends a single user message through the module-level ``client`` and
    returns the content of the first choice.

    Args:
        issue: Short description of the security problem.
        code_snippet: The offending code, inserted verbatim into the prompt.
        model_name: Model identifier passed to the chat-completions call.

    Returns:
        The model's reply text.
    """
    prompt_lines = [
        "You are a secure code advisor.",
        f"The following code has a security issue: {issue}.",
        "Suggest a safer version of the code and explain why it's better.",
        "",
        "Code:",
        code_snippet,
    ]
    request_messages = [{"role": "user", "content": "\n".join(prompt_lines)}]

    completion = client.chat.completions.create(
        model=model_name,
        messages=request_messages,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

In [None]:
def parse_thinking_from_response(response: str) -> Optional[str]:
    """Return the stripped text of the first <think> block, or None if absent."""
    found = re.search(r"<think>(.*?)</think>", response, flags=re.DOTALL)
    if found is None:
        return None
    inner_text = found.group(1)
    return inner_text.strip()


def parse_tool_from_response(response: str) -> Optional[Dict[str, Any]]:
    """Extract the <tool> call as a dictionary from the LLM response.

    The text between the first ``<tool>``/``</tool>`` pair is parsed as JSON.
    A Markdown code fence around the JSON (```` ``` ```` or ```` ```json ````)
    is tolerated and removed before parsing.

    Args:
        response: Raw text returned by the model.

    Returns:
        The decoded JSON object, or ``None`` when there is no ``<tool>``
        block, the payload is not valid JSON, or the JSON is not an object.
        A diagnostic is printed for the last two cases.
    """
    match = re.search(r"<tool>(.*?)</tool>", response, re.DOTALL)
    if not match:
        return None

    payload = match.group(1).strip()
    # A model may wrap the JSON in a Markdown fence; unwrap it so json.loads
    # sees only the object.
    fenced = re.fullmatch(r"```\w*\s*(.*?)\s*```", payload, re.DOTALL)
    if fenced:
        payload = fenced.group(1)

    try:
        tool_call = json.loads(payload)
    except json.JSONDecodeError as e:
        print(f"JSON parsing error in <tool>: {e}")
        return None

    # Valid JSON is not necessarily an object; callers index the result by
    # key, so anything else is treated as "no usable tool call".
    if not isinstance(tool_call, dict):
        print(f"Unexpected <tool> payload type: {type(tool_call).__name__}")
        return None
    return tool_call


def parse_answer_from_response(response: str) -> Optional[str]:
    """Return the stripped text of the first <answer> block, or None if absent."""
    found = re.search(r"<answer>(.*?)</answer>", response, flags=re.DOTALL)
    if found is None:
        return None
    inner_text = found.group(1)
    return inner_text.strip()


def write_answer_to_markdown(answer: str, file_path: str = "reports/secure_sage_report.md") -> None:
    """Write ``answer`` to a Markdown report, creating parent folders as needed.

    The file is overwritten if it exists. It starts with a fixed
    ``# SecureSage Security Report`` heading followed by the stripped answer
    and a trailing newline.

    Args:
        answer: Report body; surrounding whitespace is removed.
        file_path: Destination file. May be a bare file name, in which case
            the report is written to the current working directory.
    """
    parent_dir = os.path.dirname(file_path)
    # dirname is "" for a bare file name, and os.makedirs("") raises
    # FileNotFoundError, so only create a folder when there is one to create.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as f:
        f.write("# SecureSage Security Report\n\n")
        f.write(answer.strip())
        f.write("\n")



In [37]:
# Agent memory: the running conversation, seeded with the system prompt.
messages = [{"role": "system", "content": system_prompt}]
tool_call_count = 0
max_turns = 10

# Maps the tool names the model may request to their Python implementations.
tool_registry = {
    "load_code": load_code,
    "static_analysis": static_analysis,
    "parse_ast": parse_ast,
    "doc_search": doc_search,
    "suggest_fix": suggest_fix,
}


user_input = f"Please analyze {CODE_PATH} for vulnerabilities."
messages.append({"role": "user", "content": user_input})

# Strip only a trailing ".py"; str.replace(".py", "") would also remove a
# ".py" that happens to appear in the middle of the file name.
file_name = os.path.basename(CODE_PATH).removesuffix(".py")

# One completion per turn: the model either answers (report is written and the
# loop ends) or requests a tool whose JSON-encoded result is fed back to it.
while tool_call_count < max_turns:
    print(f"=============== AGENT TURN {tool_call_count} ================\n")
    response = client.chat.completions.create(
        model=model,
        messages=messages,
    )
    # NOTE(review): the SDK types message content as optional; normalise to ""
    # so the regex-based parsers below always receive a string.
    reply = response.choices[0].message.content or ""
    messages.append({"role": "assistant", "content": reply})

    thought = parse_thinking_from_response(reply)
    if thought:
        print("\nAgent thought:")
        print(thought)

    answer = parse_answer_from_response(reply)
    if answer:
        print("ANSWER GIVEN BY AGENT!")

        write_answer_to_markdown(answer, file_path=f"./reports/{file_name}_report.md")
        print("Report written to markdown.")
        break

    tool_call = parse_tool_from_response(reply)
    # Anything that is not a non-empty JSON object cannot name a tool.
    if not tool_call or not isinstance(tool_call, dict):
        print("No tool call found. Exiting.")
        break

    # .get() keeps a call that lacks "name"/"args" from raising KeyError: a
    # missing name falls through to the unknown-tool branch, and missing args
    # become an empty dict so the tool's own error is reported to the model.
    tool_name = tool_call.get("name")
    args = tool_call.get("args") or {}

    print("\nTool call:")
    print(f"Tool: {tool_name}")
    print(f"Args: {json.dumps(args, indent=2)}")

    tool_func = tool_registry.get(tool_name)
    if not tool_func:
        print(f"Unknown tool: {tool_name}")
        break

    try:
        result = tool_func(**args)
    except Exception as e:
        # Tool failures are returned to the model as data instead of aborting.
        result = {"error": str(e)}

    print("\nTool result:")
    print(json.dumps(result, indent=2))

    messages.append({"role": "user", "content": json.dumps(result, indent=2)})
    tool_call_count += 1
else:
    # Runs only when the loop ended without a break, i.e. the turn budget was
    # used up before the model produced an <answer>.
    print(f"Reached the limit of {max_turns} turns without a final answer; no report written.")



Agent thought:
First, I need to load the contents of ./test/code2.py to understand what the code does and to prepare for analysis.

Tool call:
Tool: load_code
Args: {
  "path": "./test/code2.py"
}

Tool result:
"import os\nimport json\nimport secrets\nfrom flask import Flask, request\n\napp = Flask(__name__)\napp.config['DEBUG'] = True\n\nwith open(\"config.json\") as f:\n    config = json.load(f)\n\nAPI_KEY = config.get(\"api_key\", \"default_key\")\n\n@app.route(\"/generate\", methods=[\"GET\"])\ndef generate_token():\n    user_id = request.args.get(\"user\")\n    token = str(secrets.randbelow(1000000))\n    return f\"Token for {user_id}: {token}\"\n\n@app.route(\"/execute\", methods=[\"POST\"])\ndef execute_command():\n    data = request.get_json()\n    cmd = data.get(\"cmd\")\n    os.system(cmd)\n    return \"Command executed.\"\n\n@app.route(\"/data\", methods=[\"POST\"])\ndef upload_data():\n    payload = request.data\n    data = json.loads(payload)\n    return f\"Received: {da