In [None]:
import json
from typing import Dict, Any, List
from langchain_google_genai import ChatGoogleGenerativeAI

# Whitelist of task identifiers the planner may return. The list is rendered
# into the planner prompt and is also used to filter the model's reply.
# NOTE(review): no handler for "analyze_bio_exposure" is visible in
# information_gathering_agent -- confirm whether one exists elsewhere.
ALLOWED_TASKS = [
    "check_breach_exposure",
    "analyze_github_public_data",
    "check_username_reuse",
    "analyze_bio_exposure"
]


# Module-level Gemini chat model used by the agents in this notebook
# (temperature 0, presumably to keep replies as repeatable as possible).
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)

def planner_agent_llm(user_input: Dict[str, Any]) -> Dict[str, Any]:
    """
    LLM-based Planner Agent using Gemini

    - Decides which analysis tasks are relevant
    - Enforces a strict JSON output schema
    - Sanitizes output to prevent hallucinated tasks

    Args:
        user_input: JSON-serialisable mapping of user-supplied fields. It is
            embedded in the prompt via ``json.dumps``.

    Returns:
        A dict with three keys:
        - ``normalized_input``: ``user_input`` passed through unchanged
        - ``tasks``: whitelisted task names, first-seen order, no duplicates
        - ``notes``: notes returned by the model plus any added here

        If the reply cannot be parsed as a JSON object, ``tasks`` is empty
        and ``notes`` holds a single "invalid JSON" message.
    """

    prompt = f"""
You are a planner agent for a cyber risk analysis system.

Your ONLY job:
- Look at the user input
- Decide which analysis tasks apply

STRICT RULES:
- You may ONLY select tasks from the allowed list below
- Do NOT invent new tasks
- Do NOT explain your reasoning
- Do NOT use markdown or code blocks
- Output MUST be valid JSON and nothing else

Allowed tasks:
{ALLOWED_TASKS}

User input:
{json.dumps(user_input, indent=2)}

Return JSON in EXACTLY this format:
{{
  "tasks": [string],
  "notes": [string]
}}
"""

    response = llm.invoke(prompt)

    raw = response.content.strip()

    # Gemini sometimes wraps the JSON in a Markdown code fence. Remove the
    # opening/closing backticks and, when the first line is not JSON itself
    # (e.g. a "json" language tag), drop that line as well. A plain
    # str.strip("```") would leave the tag behind and break json.loads.
    if raw.startswith("```"):
        raw = raw[3:]
        if raw.endswith("```"):
            raw = raw[:-3]
        first_line, newline, remainder = raw.partition("\n")
        if newline and not first_line.lstrip().startswith(("{", "[")):
            raw = remainder
        raw = raw.strip()

    invalid_reply = {
        "normalized_input": user_input,
        "tasks": [],
        "notes": ["Planner LLM returned invalid JSON"]
    }

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return invalid_reply

    # Valid JSON that is not an object (a list, a string, ...) cannot carry
    # the expected keys; treat it like unparsable output instead of crashing.
    if not isinstance(parsed, dict):
        return invalid_reply

    raw_tasks = parsed.get("tasks")
    if not isinstance(raw_tasks, list):
        raw_tasks = []

    # Normalise notes to a fresh list so the append below is always safe.
    raw_notes = parsed.get("notes")
    if isinstance(raw_notes, list):
        notes = list(raw_notes)
    elif isinstance(raw_notes, str):
        notes = [raw_notes]
    else:
        notes = []

    # Enforce whitelist; dict.fromkeys drops duplicates while keeping order.
    tasks: List[str] = list(dict.fromkeys(
        t for t in raw_tasks if isinstance(t, str) and t in ALLOWED_TASKS
    ))

    if not tasks:
        notes.append("No valid tasks selected by planner")

    return {
        "normalized_input": user_input,
        "tasks": tasks,
        "notes": notes
    }


In [None]:
# Sample input used to exercise the planner on its own.
test_input = {
    "email": "aryannaithani1085@gmail.com",
    "github": "https://github.com/aryannaithani",
    "username": "aryannaithani",
    "bio": "CS student interested in security"
}

# Run the planner so that `planner_output` exists for the later cell that
# passes it to information_gathering_agent; with this call commented out,
# that cell fails with NameError on a fresh kernel.
planner_output = planner_agent_llm(test_input)
planner_output


In [None]:
import requests

def fetch_github_public_data(username: str) -> dict:
    """
    Fetch basic public GitHub profile data.

    Public, unauthenticated, best-effort.
    Falls back gracefully on failure.

    Args:
        username: GitHub handle to look up. Surrounding whitespace is
            ignored and the handle is percent-encoded before being placed
            in the URL path.

    Returns:
        A dict with:
        - ``public_repos``: value of the ``public_repos`` field from the API
          reply, or None when the lookup failed
        - ``commit_email_exposed``: always None (unknown); this function
          does not inspect commits
        - ``error``: present only on failure, a short description
    """
    from urllib.parse import quote

    def _failure(reason: str) -> dict:
        # Uniform shape for every failure path.
        return {
            "public_repos": None,
            "commit_email_exposed": None,
            "error": reason
        }

    handle = username.strip() if isinstance(username, str) else ""
    if not handle:
        # Nothing to look up; avoid a pointless request to /users/.
        return _failure("No GitHub username provided")

    # Encode the handle so characters such as "/" or "?" cannot change the
    # request path or add a query string.
    encoded_handle = quote(handle, safe="")
    url = f"https://api.github.com/users/{encoded_handle}"
    headers = {
        "Accept": "application/vnd.github+json"
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code != 200:
            return _failure(f"GitHub API returned {response.status_code}")
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # Network/HTTP problems and undecodable bodies are both non-fatal.
        return _failure(str(e))

    if not isinstance(data, dict):
        return _failure("Unexpected GitHub API response format")

    return {
        "public_repos": data.get("public_repos"),
        # A real check would require scanning repositories/commits. Report
        # "unknown" rather than claiming exposure for every account, since
        # this value is later handed to the risk model as evidence.
        "commit_email_exposed": None
    }

#import requests

def check_email_breach_leakcheck(email: str) -> dict:
    """
    Check email exposure using LeakCheck public API.

    Uses publicly available breach data.
    No API key required.
    Fails gracefully.

    Args:
        email: Address to look up. Surrounding whitespace is ignored.

    Returns:
        A dict with:
        - ``found_in_breaches``: True/False from the reply's ``found``
          field, or None when the lookup could not be completed
        - ``breach_sources``: the reply's ``sources`` list when found,
          otherwise an empty list
        - ``error``: present only on failure, a short description
    """

    def _failure(reason: str) -> dict:
        # Uniform shape for every failure path.
        return {
            "found_in_breaches": None,
            "breach_sources": [],
            "error": reason
        }

    address = email.strip() if isinstance(email, str) else ""
    if not address:
        return _failure("No email provided")

    url = "https://leakcheck.net/api/public"

    try:
        # Pass the address via `params` so it is percent-encoded; splicing it
        # into the URL would let "+" be read as a space and "&"/"#" cut the
        # query short, looking up the wrong address.
        response = requests.get(url, params={"check": address}, timeout=10)

        if response.status_code != 200:
            return _failure(f"LeakCheck returned {response.status_code}")

        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # Network/HTTP problems and undecodable bodies are both non-fatal.
        return _failure(str(e))

    if not isinstance(data, dict):
        return _failure("Unexpected LeakCheck response format")

    # LeakCheck response format can vary slightly, so be defensive
    found = bool(data.get("found", False))
    sources = []

    if found:
        # Try to extract source names if present
        sources = data.get("sources", [])
        if not isinstance(sources, list):
            sources = []

    return {
        "found_in_breaches": found,
        "breach_sources": sources
    }

#import requests
# Profile-URL templates keyed by platform name; check_username_reuse fills in
# "{username}" and probes each URL in this order.
# NOTE(review): an HTTP 200 is treated as "profile exists" by the caller;
# some of these sites may answer 200 for unknown handles -- confirm per site.
PLATFORMS = {
    "GitHub": "https://github.com/{username}",
    "Reddit": "https://www.reddit.com/user/{username}",
    "Dev.to": "https://dev.to/{username}",
    "Medium": "https://medium.com/@{username}",
    "Twitter": "https://twitter.com/{username}"
}


def check_username_reuse(username: str) -> dict:
    """
    Check username reuse across major platforms using existence checks.

    - No auth
    - No scraping
    - Best-effort, fail-safe

    Args:
        username: Handle to probe on every entry of PLATFORMS. Surrounding
            whitespace is ignored and the handle is percent-encoded before
            being placed in each URL.

    Returns:
        A dict with:
        - ``value``: the ``username`` argument as given
        - ``reuse_count``: number of platforms that answered HTTP 200
        - ``platforms``: names of those platforms, in PLATFORMS order
    """
    from urllib.parse import quote

    handle = username.strip() if isinstance(username, str) else ""
    if not handle:
        # An empty handle would format to each site's landing page, which
        # answers 200 and would be miscounted as a match on every platform.
        return {
            "value": username,
            "reuse_count": 0,
            "platforms": []
        }

    # Encode once so "/", "?" or "#" in the handle cannot redirect the probe
    # to a different page.
    encoded_handle = quote(handle, safe="")

    found_platforms = []

    headers = {
        "User-Agent": "DigitalFootprintRiskAgent"
    }

    for platform, url_template in PLATFORMS.items():
        url = url_template.format(username=encoded_handle)

        try:
            resp = requests.get(url, headers=headers, timeout=8, allow_redirects=True)
        except requests.RequestException:
            # Best-effort: a platform that cannot be reached is skipped.
            continue

        # Heuristic:
        # 200 -> exists
        # 404 -> does not exist
        # 429/403 -> skip quietly
        if resp.status_code == 200:
            found_platforms.append(platform)

    return {
        "value": username,
        "reuse_count": len(found_platforms),
        "platforms": found_platforms
    }




In [None]:
def information_gathering_agent(planner_output: dict) -> dict:
    """
    Run the data-collection step for every task the planner selected.

    Args:
        planner_output: Dict with ``tasks`` (list of task names) and
            ``normalized_input`` (the user's fields). Missing or None
            values for either key are treated as empty.

    Returns:
        Evidence dict with up to four sections, each present only when its
        task was selected and the needed input is non-empty:
        - ``email``: value, found_in_breaches, breach_sources
        - ``github``: username, public_repos, commit_email_exposed
        - ``username``: value, reuse_count, platforms
        - ``bio``: value (the bio text, passed through for later analysis)
    """
    tasks = planner_output.get("tasks") or []
    inputs = planner_output.get("normalized_input") or {}

    evidence = {}

    # ---- Email breach intelligence (LeakCheck – REAL DATA) ----
    email = inputs.get("email")
    if "check_breach_exposure" in tasks and email:
        breach_data = check_email_breach_leakcheck(email)

        evidence["email"] = {
            "value": email,
            "found_in_breaches": breach_data.get("found_in_breaches"),
            "breach_sources": breach_data.get("breach_sources", [])
        }

    # ---- GitHub public data (already integrated) ----
    # Prefer the dedicated "github" field (usually a profile URL) and fall
    # back to the plain username; previously the "github" input was ignored
    # and a user who supplied only a profile URL got no GitHub analysis.
    github_ref = inputs.get("github") or inputs.get("username")
    if "analyze_github_public_data" in tasks and isinstance(github_ref, str):
        # Last path segment: "https://github.com/name/" -> "name".
        username = github_ref.strip().rstrip("/").split("/")[-1]
        if username:
            github_data = fetch_github_public_data(username)

            evidence["github"] = {
                "username": username,
                "public_repos": github_data.get("public_repos"),
                "commit_email_exposed": github_data.get("commit_email_exposed")
            }

    # ---- Username reuse detection (REAL TOOL) ----
    reuse_handle = inputs.get("username")
    if "check_username_reuse" in tasks and reuse_handle:
        username_data = check_username_reuse(reuse_handle)

        evidence["username"] = {
            "value": username_data["value"],
            "reuse_count": username_data["reuse_count"],
            "platforms": username_data["platforms"]
        }

    # ---- Bio exposure ----
    # There is no dedicated tool for this task; hand the bio text through so
    # a selected "analyze_bio_exposure" task is not silently dropped.
    bio = inputs.get("bio")
    if "analyze_bio_exposure" in tasks and bio:
        evidence["bio"] = {
            "value": bio
        }

    return evidence


In [None]:

# Collect evidence for the tasks chosen by the planner. Requires
# `planner_output` to have been assigned by running the planner first.
evidence = information_gathering_agent(planner_output)
# Bare name as the last expression so the notebook displays the result.
evidence


In [None]:
import json
from typing import Dict, Any
from langchain_google_genai import ChatGoogleGenerativeAI

# Rebinds the module-level `llm` with the same settings used earlier in the
# notebook, so this cell can run on its own.
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)

def output_generator_agent(evidence: Dict[str, Any]) -> Dict[str, Any]:
    """
    Output Generator Agent

    - Uses an LLM to analyze collected evidence
    - Produces a structured cyber risk assessment
    - Outputs JSON only (no prose)

    Args:
        evidence: JSON-serialisable evidence mapping; embedded in the prompt
            via ``json.dumps``.

    Returns:
        The JSON object returned by the model (expected keys: risk_score,
        risk_level, risk_factors, mitigations). If the reply cannot be
        parsed as a JSON object, a fixed fallback dict is returned with
        risk_score 0, risk_level "Low" and a mitigation entry describing
        the parsing error.
    """

    prompt = f"""
You are a cyber risk analysis agent.

Your job:
- Analyze the provided evidence
- Compute a cyber risk score between 0 and 100
- Assign a risk level (Low, Medium, High)
- List the main risk factors
- Suggest concrete mitigation steps

STRICT RULES:
- Use ONLY the evidence provided
- Do NOT invent facts
- Do NOT mention tools or APIs
- Do NOT output explanations outside JSON
- Output MUST be valid JSON
- Do NOT use markdown or code blocks

Risk level thresholds:
- 0–30: Low
- 31–60: Medium
- 61–100: High

Evidence:
{json.dumps(evidence, indent=2)}

Return JSON in EXACTLY this format:
{{
  "risk_score": number,
  "risk_level": "Low|Medium|High",
  "risk_factors": [string],
  "mitigations": [string]
}}
"""

    response = llm.invoke(prompt)
    raw = response.content.strip()

    # Gemini sometimes wraps the JSON in a Markdown code fence. Remove the
    # opening/closing backticks and, when the first line is not JSON itself
    # (e.g. a "json" language tag), drop that line as well. A plain
    # str.strip("```") would leave the tag behind and break json.loads.
    if raw.startswith("```"):
        raw = raw[3:]
        if raw.endswith("```"):
            raw = raw[:-3]
        first_line, newline, remainder = raw.partition("\n")
        if newline and not first_line.lstrip().startswith(("{", "[")):
            raw = remainder
        raw = raw.strip()

    fallback = {
        "risk_score": 0,
        "risk_level": "Low",
        "risk_factors": [],
        "mitigations": ["Unable to generate risk assessment due to parsing error"]
    }

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return fallback

    # Callers use dict methods on the result; valid JSON that is not an
    # object is treated the same as unparsable output.
    if not isinstance(parsed, dict):
        return fallback

    return parsed


In [None]:
# Turn the gathered evidence into a draft risk assessment.
draft_output = output_generator_agent(evidence)
# Bare name as the last expression so the notebook displays the result.
draft_output

In [None]:
import json
from typing import Dict, Any
from langchain_google_genai import ChatGoogleGenerativeAI

# Rebinds the module-level `llm` with the same settings used earlier in the
# notebook, so this cell can run on its own.
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)

def evaluator_agent(
    evidence: Dict[str, Any],
    draft_output: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Evaluator Agent

    - Verifies that all risk factors are supported by evidence
    - Removes unsupported or speculative claims
    - Produces a clean, user-facing final report

    Args:
        evidence: JSON-serialisable evidence mapping shown to the model.
        draft_output: Draft assessment to be reviewed; also the source of
            the score/level used in the fallback report.

    Returns:
        The JSON object returned by the model (expected keys: summary,
        risk_score, risk_level, validated_risk_factors,
        validated_mitigations). If the reply cannot be parsed as a JSON
        object, a fallback report is returned that carries over the draft's
        risk_score/risk_level (0 / "Low" when absent) with empty lists.
    """

    prompt = f"""
You are an evaluator agent for a cyber risk analysis system.

Your job:
- Review the draft risk assessment
- Cross-check it against the provided evidence
- Remove or correct any unsupported claims
- Produce a final, concise report for the user

STRICT RULES:
- Use ONLY the provided evidence
- Do NOT invent new risk factors
- Do NOT change the risk score arbitrarily
- If a risk factor is unsupported, remove it
- Output MUST be valid JSON
- Do NOT use markdown or code blocks

Evidence:
{json.dumps(evidence, indent=2)}

Draft risk assessment:
{json.dumps(draft_output, indent=2)}

Return JSON in EXACTLY this format:
{{
  "summary": string,
  "risk_score": number,
  "risk_level": string,
  "validated_risk_factors": [string],
  "validated_mitigations": [string]
}}
"""

    response = llm.invoke(prompt)
    raw = response.content.strip()

    # Gemini sometimes wraps the JSON in a Markdown code fence. Remove the
    # opening/closing backticks and, when the first line is not JSON itself
    # (e.g. a "json" language tag), drop that line as well. A plain
    # str.strip("```") would leave the tag behind and break json.loads.
    if raw.startswith("```"):
        raw = raw[3:]
        if raw.endswith("```"):
            raw = raw[:-3]
        first_line, newline, remainder = raw.partition("\n")
        if newline and not first_line.lstrip().startswith(("{", "[")):
            raw = remainder
        raw = raw.strip()

    # The fallback reads from the draft; tolerate a draft that is not a dict.
    draft = draft_output if isinstance(draft_output, dict) else {}
    fallback = {
        "summary": "Unable to generate final report due to evaluation error",
        "risk_score": draft.get("risk_score", 0),
        "risk_level": draft.get("risk_level", "Low"),
        "validated_risk_factors": [],
        "validated_mitigations": []
    }

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return fallback

    # Valid JSON that is not an object cannot be a report; use the fallback.
    if not isinstance(parsed, dict):
        return fallback

    return parsed


In [None]:
# Have the evaluator check the draft against the evidence and produce the
# final report.
final_report = evaluator_agent(evidence, draft_output)
# Bare name as the last expression so the notebook displays the result.
final_report


In [None]:
def run_full_pipeline(user_input: dict) -> dict:
    """
    Run planner -> information gathering -> output generator -> evaluator
    on ``user_input`` and return the evaluator's final report.
    """
    plan = planner_agent_llm(user_input)
    gathered = information_gathering_agent(plan)
    # The draft is produced first, then reviewed against the same evidence.
    return evaluator_agent(gathered, output_generator_agent(gathered))


In [None]:
# Example input for an end-to-end run of the plain-function pipeline.
user_input = {
    "email": "User@Example.com",
    "github": "https://github.com/octocat",
    "username": "octocat",
    "bio": "CS student interested in security"
}

# Run all four agents in sequence; the bare name on the last line makes the
# notebook display the report.
final_report = run_full_pipeline(user_input)
final_report


In [None]:
from typing import TypedDict, Dict, Any

class AgentState(TypedDict):
    """State dict passed between the graph nodes; each node fills one key."""
    # Raw user-supplied fields; read by planner_node.
    user_input: Dict[str, Any]
    # Written by planner_node, read by gather_node.
    planner_output: Dict[str, Any]
    # Written by gather_node, read by generate_node and evaluate_node.
    evidence: Dict[str, Any]
    # Written by generate_node, read by evaluate_node.
    draft_output: Dict[str, Any]
    # Written by evaluate_node.
    final_report: Dict[str, Any]


In [None]:
def planner_node(state: AgentState) -> AgentState:
    """Run the planner on ``state["user_input"]`` and store its plan."""
    plan = planner_agent_llm(state["user_input"])
    state["planner_output"] = plan
    return state

def gather_node(state: AgentState) -> AgentState:
    """Collect evidence for ``state["planner_output"]`` and store it."""
    collected = information_gathering_agent(state["planner_output"])
    state["evidence"] = collected
    return state

def generate_node(state: AgentState) -> AgentState:
    """Build the draft assessment from ``state["evidence"]`` and store it."""
    draft = output_generator_agent(state["evidence"])
    state["draft_output"] = draft
    return state

def evaluate_node(state: AgentState) -> AgentState:
    """Review the draft against the evidence and store the final report."""
    gathered = state["evidence"]
    draft = state["draft_output"]
    state["final_report"] = evaluator_agent(gathered, draft)
    return state


In [None]:
from langgraph.graph import StateGraph, END
from IPython.display import Image, display

# Build a linear graph over AgentState: one node per agent.
graph = StateGraph(AgentState)

# Register the node functions under the names used by the edges below.
graph.add_node("planner", planner_node)
graph.add_node("gather", gather_node)
graph.add_node("generate", generate_node)
graph.add_node("evaluate", evaluate_node)

# Execution starts at the planner.
graph.set_entry_point("planner")

# planner -> gather -> generate -> evaluate -> END
graph.add_edge("planner", "gather")
graph.add_edge("gather", "generate")
graph.add_edge("generate", "evaluate")
graph.add_edge("evaluate", END)

# Compile into a runnable app and show its diagram inline.
# NOTE(review): draw_mermaid_png may depend on an external rendering
# service by default -- confirm it works in offline environments.
app = graph.compile()
display(Image(app.get_graph().draw_mermaid_png()))


In [None]:
# Starting state for the graph run: only `user_input` is populated; the
# other keys are empty dicts for the nodes to fill in.
initial_state = {
    "user_input": {
        "email": "User@Example.com",
        "github": "https://github.com/octocat",
        "username": "octocat",
        "bio": "CS student interested in security"
    },
    "planner_output": {},
    "evidence": {},
    "draft_output": {},
    "final_report": {}
}

# Run the compiled graph and print what each stage produced.
result = app.invoke(initial_state)
print(result["planner_output"])
print(result["evidence"])
print(result["draft_output"])
print(result["final_report"])
