In [None]:
# src/pdf_hunter/agents/link_analysis/tools.py

from langchain.tools import tool
import whois
from whois.parser import PywhoisError
import datetime

def _whois_value_to_text(value):
    """Return *value* with every ``datetime`` rendered as an ISO-8601 string.

    Lists are converted element by element, so a list that mixes parsed
    ``datetime`` objects with raw strings no longer leaks
    ``datetime.datetime(...)`` reprs into the summary. Any other value is
    returned unchanged.
    """
    if isinstance(value, datetime.datetime):
        return value.isoformat()
    if isinstance(value, list):
        return [_whois_value_to_text(item) for item in value]
    return value


@tool
def domain_whois(domain: str) -> str:
    """
    Performs a WHOIS lookup on a root domain to gather registration details.
    This is critical for verifying the identity and age of a website.
    Use this on the main part of the domain (e.g., 'example.com' from 'http://login.example.com/auth').
    """
    # NOTE: the docstring above is intentionally left untouched; the @tool
    # decorator uses it as the tool description presented to the model.
    #
    # Contract: always returns a string. Lookup failures are reported as
    # "Error: ..." / "An unexpected error ..." text instead of being raised,
    # so the calling agent can read the failure and carry on.
    try:
        w = whois.whois(domain)

        if w.get('domain_name') is None:
            return f"Error: No WHOIS record found for domain '{domain}'. It may be available for registration or an invalid domain."

        # Only the fields that are actually reported are normalised; dates
        # become ISO strings so the LLM sees plain text rather than reprs.
        reported_keys = (
            'domain_name',
            'registrar',
            'creation_date',
            'expiration_date',
            'name_servers',
        )
        record = {key: _whois_value_to_text(w.get(key)) for key in reported_keys}

        # Return a clean, concise summary of the most important fields
        summary = (
            f"WHOIS Record for: {record['domain_name']}\n"
            f"Registrar: {record['registrar']}\n"
            f"Creation Date: {record['creation_date']}\n"
            f"Expiration Date: {record['expiration_date']}\n"
            f"Name Servers: {record['name_servers']}"
        )
        return summary

    except PywhoisError as e:
        return f"Error: Could not retrieve WHOIS data for '{domain}'. It might be a subdomain or an invalid domain. Error: {e}"
    except Exception as e:
        # Deliberate catch-all: a tool failure must come back as text, not crash the agent loop.
        return f"An unexpected error occurred during WHOIS lookup for '{domain}': {e}"

In [None]:
# System prompt for the investigator agent. investigator_node sends it as the
# SystemMessage that opens the tool-calling conversation, so the text reaches
# the model verbatim: any wording change here changes agent behaviour.
# `domain_whois` is the WHOIS tool defined in this file; the `browser_*` tool
# names are presumably provided by the Playwright MCP server loaded in main()
# (TODO confirm against the loaded tool list).
WFI_INVESTIGATOR_SYSTEM_PROMPT = """
**You are the Web Forensics Investigator (WFI).** Your mission is to conduct a complete, live, interactive forensic investigation of a given URL, from initial reconnaissance to final judgment. You are a skilled and persistent detective, assuming the adversary is using multi-step evasion tactics. Your entire process is governed by a **Core Investigation Loop**.

---
### **Your Core Investigation Loop (Observe -> Orient -> Decide -> Act)**

You will repeat this loop until the mission is complete.

**1. OBSERVE: What is the current state?**
   *   On your very first turn, your observation is the initial briefing.
   *   On all subsequent turns, you MUST use tools to re-observe the state after your last action. Use `browser_take_screenshot`, `browser_evaluate` to get text, and `browser_network_requests` to check for new activity.

**2. ORIENT: What does the evidence mean?**
   *   Analyze the evidence from your OBSERVE step in the context of your initial briefing (the "Reason Flagged").
   *   Is this a login page? A redirect page? A page with a download link? A legitimate page?
   *   State your hypothesis about the attacker's intent at this specific stage.

**3. DECIDE & ACT: What is the next logical step in the pursuit?**
   *   Based on your hypothesis, choose the single best tool to move the investigation forward.

---
### **Tactical Guidance (How to ACT in specific scenarios)**

*   **On Initial Navigation (Your First Action):** Your first action is always `browser_navigate` to the URL provided in the briefing. This kicks off the first loop. After this, you MUST perform a full initial observation (screenshot, text, network requests, WHOIS).
*   **Verifying Domain Identity:** After you determine the final domain, if it seems suspicious or is trying to impersonate a known brand, you **MUST** use the `domain_whois` tool on its root domain. A recent registration date is a major red flag.
*   **Handling Multi-Step Chains:** If the page contains a single, prominent link that seems to be the next step (e.g., a "Continue" button), your mission is to **follow it** using `browser_click` or `browser_navigate`.
*   **Handling Phishing Forms:** If you encounter a login form, use `browser_fill_form` with generic, non-real credentials and click the login button to see where it leads.

---
### **Mission Completion**

You must conclude the investigation when you reach one of these states:
*   **Threat Confirmed:** You have unmasked a phishing page or malicious download. Document the final evidence (including WHOIS data) and state your conclusion.
*   **Path Confirmed Benign:** You have followed the chain to a legitimate destination and have verified the domain's identity.
*   **Trail Cold:** You have reached a dead end with no further actionable links.

**CRITICAL REMINDER:** After every action that changes the page state (`browser_click`, `browser_fill_form`, `browser_navigate`), you MUST restart the **Core Investigation Loop** from the OBSERVE step.
"""

In [None]:
# System prompt for the analyst stage. analyst_node sends it as the
# SystemMessage of a single, tool-less LLM call whose output is bound to the
# AnalystFindings schema. The text is runtime input to the model; edit with care.
WFI_ANALYST_SYSTEM_PROMPT = """
**You are the Web Forensics Analyst.** You are a meticulous and expert synthesizer of evidence. Your sole mission is to review a complete Investigator's Log and produce a final, structured forensic analysis in JSON format.

**Your Rules of Engagement:**
1.  **Ground Truth is the Log:** You must base your entire report *only* on the provided Investigator Log. Do not infer or hallucinate actions that were not taken.
2.  **Synthesize and Extract, Do Not Act:** You do not have tools. Your job is to read, understand, summarize, and extract key pieces of evidence.
3.  **Extract Key Data:** Meticulously extract key pieces of evidence from the log: the final URL, the WHOIS record, all screenshot paths, and the investigator's final stated conclusion.
4.  **Adhere to the Schema:** Your final output MUST be a single, valid JSON object that strictly conforms to the `AnalystFindings` schema. Do not add any commentary or text outside of the JSON object.
"""

In [None]:
# User-message template for the analyst stage. analyst_node fills it with
# str.format(), supplying exactly two fields: `initial_briefing_json` and
# `investigation_log_json`. Because str.format() is used, any literal brace
# added to this text must be doubled ({{ / }}).
WFI_ANALYST_USER_PROMPT = """
Conduct a full forensic analysis of the provided investigation log and generate the final `AnalystFindings` JSON report.

**1. Initial Briefing (The original mission parameters):**
```json
{initial_briefing_json}
```

**2. Full Investigation Log (The "Detective's Notebook" from the interactive pursuit):**
This is the complete, time-ordered log of every thought, action, and tool output from the field investigator.
```json
{investigation_log_json}
```

**Your Mission**:
Read and synthesize all of the above evidence into the final `AnalystFindings` JSON object. Your response MUST be only the JSON object.
"""


In [None]:
from pydantic import BaseModel, Field
from typing import List, Optional, Literal, Any

In [None]:
from langchain_core.tools import BaseTool

In [None]:
# The direct output from the Analyst LLM's synthesis.
# analyst_node binds this model via `with_structured_output(AnalystFindings)`,
# so the field names, types and `description=` strings below form the schema
# the LLM is asked to fill. Treat them as runtime text, not as mere comments.
class AnalystFindings(BaseModel):
    """The synthesized analysis of the investigation log."""
    # Last URL the investigator ended up on (required).
    final_url: str = Field(..., description="The final destination URL reached by the Investigator.")
    # Closed set of outcomes; any other string fails validation.
    verdict: Literal["Benign", "Suspicious", "Malicious", "Inaccessible"]
    # No bounds are enforced here. NOTE(review): presumably intended as 0.0-1.0 — confirm.
    confidence: float
    # Free-text explanation of the verdict (required).
    summary: str = Field(..., description="A concise, executive summary of the investigation, explaining the key findings and the reasoning behind the verdict.")
    # Optional list; defaults to a fresh empty list per instance.
    detected_threats: List[str] = Field(default_factory=list)
    # None when the log contains no WHOIS output.
    domain_whois_record: Optional[str] = Field(None, description="The summary of the WHOIS record for the final root domain, extracted from the investigation log.")
    # Optional list; defaults to a fresh empty list per instance.
    screenshot_paths: List[str] = Field(default_factory=list, description="A list of all screenshot file paths mentioned in the investigation log.")

# The final, assembled forensic report.
# Built in analyst_node from the original task, the dumped message log and the
# analyst's structured findings; nothing here is produced by the LLM directly.
# NOTE(review): `PrioritizedURL` is not defined or imported in the visible
# cells (the commented-out import points at ..visual_analysis.schemas) —
# confirm it is in scope before this cell runs.
class URLAnalysisResult(BaseModel):
    """The final, assembled forensic report, created programmatically by our code."""
    # The task that started the investigation.
    initial_url: PrioritizedURL
    # One dict per message, as produced by `msg.model_dump()` in analyst_node.
    full_investigation_log: List[dict]
    # Structured synthesis returned by the analyst LLM.
    analyst_findings: AnalystFindings

# The state for our final two-stage pipeline graph.
# investigator_node reads `url_task` / `output_directory` and writes
# `investigation_log`; analyst_node reads `url_task` / `investigation_log` and
# writes `final_report`.
# NOTE(review): `TypedDict` and `AnyMessage` are not imported in the visible
# cells (the langchain_core import is commented out) — confirm they are in scope.
class LinkAnalysisState(TypedDict):
    # Inputs
    url_task: PrioritizedURL
    output_directory: str
    # Set by main(); neither node visible here reads it.
    session: Any # Holds the live MCP session

    # Intermediate result
    investigation_log: List[AnyMessage]

    # Final output
    final_report: URLAnalysisResult

In [None]:
# src/pdf_hunter/agents/link_analysis/graph.py

import asyncio
import os
import json
from pathlib import Path
from urllib.parse import urlparse
from functools import partial
# from langchain_core.messages import SystemMessage, HumanMessage, AnyMessage
# from langchain_mcp_adapters.client import MultiServerMCPClient
# from langchain_mcp_adapters.tools import load_mcp_tools
# from langgraph.graph import StateGraph, START, END, MessagesState
# from langgraph.prebuilt import ToolNode, tools_condition

# from pdf_hunter.config import link_analysis_llm
# from .schemas import LinkAnalysisState, URLAnalysisResult, ScoutReport, RedirectStep
# from .prompts import WFI_INVESTIGATOR_SYSTEM_PROMPT, WFI_ANALYST_SYSTEM_PROMPT, WFI_ANALYST_USER_PROMPT
# from .tools import domain_whois
# from ..visual_analysis.schemas import PrioritizedURL


# --- Node 1: Investigator ---
async def investigator_node(state: LinkAnalysisState, tools: List[BaseTool]) -> dict:
    """Run the interactive, tool-driven investigation of a single URL.

    A small inner graph alternates between the tool-bound model ("llm") and
    tool execution ("tools"). It is seeded with the investigator system prompt
    plus a briefing and runs until `tools_condition` routes to END.

    Args:
        state: Pipeline state; `url_task` and `output_directory` are read.
        tools: Tools bound to the model and executed by the tool node.

    Returns:
        A state update `{"investigation_log": [...]}` holding every message
        in the inner graph's final state.
    """
    task = state["url_task"]
    artifact_dir = state["output_directory"]

    print(f"\n--- [Investigator] Starting full pursuit for {task.url} ---")

    agent_llm = link_analysis_llm.bind_tools(tools)

    def call_model(loop_state):
        # One model turn: feed the conversation so far, append the reply.
        reply = agent_llm.invoke(loop_state["messages"])
        return {"messages": [reply]}

    # llm -> (tools -> llm)* -> END
    loop = StateGraph(MessagesState)
    loop.add_node("llm", call_model)
    loop.add_node("tools", ToolNode(tools))
    loop.add_edge(START, "llm")
    loop.add_conditional_edges("llm", tools_condition, {END: END, "tools": "tools"})
    loop.add_edge("tools", "llm")
    compiled_loop = loop.compile()

    # The briefing is sent verbatim, including its leading indentation.
    briefing = f"""
    Begin your investigation.
    **URL:** {task.url}
    **Reason Flagged:** {task.reason}
    **Output Directory for Artifacts:** {os.path.abspath(artifact_dir)}
    """
    opening_messages = [
        SystemMessage(content=WFI_INVESTIGATOR_SYSTEM_PROMPT),
        HumanMessage(content=briefing),
    ]

    outcome = await compiled_loop.ainvoke({"messages": opening_messages})
    print("\n🕵️‍♂️✅ INVESTIGATION COMPLETE ✅🕵️‍♂️")
    return {"investigation_log": outcome["messages"]}
    

# --- Node 2: Analyst ---
async def analyst_node(state: LinkAnalysisState) -> dict:
    """Synthesize the investigation log into the final report.

    Makes one structured-output LLM call (no tools) over the full message log,
    then wraps the findings together with the original task and the raw log.

    Args:
        state: Pipeline state; `url_task` and `investigation_log` are read.

    Returns:
        A state update `{"final_report": URLAnalysisResult}`.
    """
    url_task = state["url_task"]
    investigation_log = state["investigation_log"]

    print("\n--- [Analyst] Starting synthesis of all evidence ---")

    analyst_llm = link_analysis_llm.with_structured_output(AnalystFindings)

    # Dump the messages once; the same dicts feed both the prompt and the
    # report, instead of re-serialising the whole log a second time.
    serialized_log = [msg.model_dump() for msg in investigation_log]

    analyst_prompt = WFI_ANALYST_USER_PROMPT.format(
        initial_briefing_json=url_task.model_dump_json(indent=2),
        investigation_log_json=json.dumps(serialized_log, indent=2),
    )
    analyst_findings = await analyst_llm.ainvoke([
        SystemMessage(content=WFI_ANALYST_SYSTEM_PROMPT),
        HumanMessage(content=analyst_prompt),
    ])

    final_report = URLAnalysisResult(
        initial_url=url_task,
        full_investigation_log=serialized_log,
        analyst_findings=analyst_findings,
    )
    return {"final_report": final_report}


async def main():
    """Run the Investigator -> Analyst pipeline once against a sample URL.

    Creates the output directory, opens a Playwright MCP session over stdio,
    combines its tools with `domain_whois`, builds the two-node pipeline graph,
    runs it and prints the final report as JSON (or a notice if none exists).
    """
    output_dir = "./mcp_outputs/final_pipeline_test"
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    target = PrioritizedURL(
        url="http://hrms.wb.gov.in.hrmspanel.online/",
        priority=1,
        reason="Flagged as a potential government portal impersonation.",
        page_number=1,
    )

    playwright_args = [
        "@playwright/mcp@latest",
        "--headless",
        f"--output-dir={output_dir}",
        "--save-trace",
        "--isolated",
    ]
    server_config = {
        "playwright": {
            "command": "npx",
            "args": playwright_args,
            "transport": "stdio",
        }
    }
    mcp_client = MultiServerMCPClient(server_config)

    async with mcp_client.session("playwright") as session:
        print("--- [Orchestrator] Loading all tools for the pipeline ---")
        browser_tools = await load_mcp_tools(session)
        toolbox = browser_tools + [domain_whois]

        # Graph nodes receive only the state, so the tool list is pre-bound
        # to the investigator with functools.partial.
        investigator_with_tools = partial(investigator_node, tools=toolbox)

        # Linear pipeline: START -> investigator -> analyst -> END
        builder = StateGraph(LinkAnalysisState)
        builder.add_node("investigator", investigator_with_tools)
        builder.add_node("analyst", analyst_node)
        builder.add_edge(START, "investigator")
        builder.add_edge("investigator", "analyst")
        builder.add_edge("analyst", END)
        link_analysis_graph = builder.compile()

        # Tools travel via the partial above, not through the state.
        pipeline_input = {
            "url_task": target,
            "output_directory": output_dir,
            "session": session,
        }

        print("\n🚀 Running the full Investigator -> Analyst pipeline...")
        result = await link_analysis_graph.ainvoke(pipeline_input)

        divider = "=" * 50
        print("\n\n" + divider)
        print("📊✅ FINAL FORENSIC REPORT ✅📊")
        print(divider)

        report = result.get("final_report")
        if not report:
            print("Pipeline did not produce a final report.")
        else:
            print(report.model_dump_json(indent=2))

    print(f"\n🎉 Full pipeline complete! Check {os.path.abspath(output_dir)} for all artifacts.")


In [None]:
# if __name__ == "__main__":
#     asyncio.run(main())

In [None]:
# Top-level `await` only works where an event loop is already running (e.g. an
# IPython/Jupyter cell). From a plain script, use `asyncio.run(main())` as in
# the commented-out `__main__` guard earlier in this file.
await main()