In [14]:
%pip install -r requirements.txt > /dev/null


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [15]:
from dotenv import load_dotenv
from langchain.output_parsers import PydanticOutputParser
from langchain_core.messages import (AIMessage, BaseMessage, HumanMessage, SystemMessage)
from langchain_core.runnables.graph import MermaidDrawMethod
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver, MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command, Send
from pydantic import BaseModel, Field
from scapy.all import DNS, DNSQR, IP, UDP, sr1
from typing_extensions import TypedDict

In [16]:
load_dotenv()

True

In [17]:
from tools import *

In [18]:
download_maxmind_city_db()

In [None]:
API_KEY = os.getenv("TOKEN",os.getenv("API_KEY"))
API_BASE = os.getenv("API_BASE", "https://api.openai.com/v1/")
MODEL_NAME = os.getenv("MODEL", "gpt-4o")

In [20]:
def get_thread_id() -> str:
    """Get the current thread ID as a string."""
    return str(threading.get_ident())

def get_config() -> dict:
    return {"recursion_limit": 50, "configurable": {"thread_id": get_thread_id()}}

config = get_config()


In [21]:
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    next: str # Worker to route to next

async_client = httpx.AsyncClient(
    verify=False, headers={"Authorization": f"Bearer {API_KEY}"}
)
sync_client = httpx.Client(
    verify=False, headers={"Authorization": f"Bearer {API_KEY}"}
)

model = ChatOpenAI(
    model=MODEL_NAME,
    temperature=0.6,
    # top_p=1,
    seed=42,
    # top_p=0.2,
    api_key=API_KEY,
    base_url=API_BASE,
    streaming=False,
    http_async_client=async_client,
    http_client=sync_client,
)

In [22]:
domain_tools = [shodan_lookup, whois_lookup, forward_dns_lookup, ssl_certificate_lookup, virustotal_get_uri]
ip_tools = [geoip_lookup, reverse_dns_lookup, shodan_lookup, virustotal_get_ip]



ip_analyzer_sys_msg = f"""You are an expert IP address analyst. Your goal is to gather information about a given IP address using the available tools.
Available IP tools:
- geoip_lookup(ip: str) -> str: Provides geolocation data for the IP.
- reverse_dns_lookup(ip_address: str) -> List[str]: Finds hostnames associated with the IP (PTR records).
- shodan_lookup(target: str) -> str: Searches Shodan for information about the IP address including open ports, services, and potential vulnerabilities.
- virustotal_get_ip(ip: str) -> str: Retrieves information about the IP address from VirusTotal, including reputation and security status.

Instructions:

1. Analyze the provided IP address based on the user request or previous messages.
2. Use all the tools to find relevant information.
3. Once you have gathered the information, report your findings clearly. If you cannot find information, state that clearly.
"""

ip_analyzer_agent = create_react_agent(
    model=model,
    tools=ip_tools,
    prompt=ip_analyzer_sys_msg,
)

# Domain Analyzer Agent
url_analyzer_sys_msg = """You are an expert domain name analyst. Your task is to gather comprehensive information about a given domain name using the available tools.

Available Domain tools:
1. shodan_lookup(target: str) -> str: Searches Shodan for information about the domain including exposed services, open ports, and other intelligence. Always call it on TLDs and subdomains.
3. forward_dns_lookup(domain: str) -> List[Tuple[str, str]]: Finds various DNS records (A, AAAA, MX, NS, TXT, CNAME) for the domain.
2. whois_lookup(domain: str) -> str: Provides WHOIS registration details for the domain.
4. ssl_certificate_lookup(domain: str) -> str: Looks up SSL certificate information for the domain.
5. virustotal_get_uri(uri_path: str) -> str: Looks up for a partial URI in search for other systems that might contain it. Use it when URLs or URIs are provided.

Instructions:
1. Analyze the provided domain name based on the user request or previous messages.
2. Use the available tools to find relevant information, including associated IP addresses, registration details, mail servers, and SSL certificate information.
3. If the domain contains a TLD and a subdomain, analyze both. If there is an URI with an URI path, analyze the path with VirusTotal.
4. Report your findings clearly, including all the indicators (IP addresses, ports, every attribute). If you cannot find information, state that clearly.

Your goal is to provide a thorough analysis of the domain name, including subdomains."""

url_analyzer_agent = create_react_agent(
    model=model,
    tools=domain_tools,
    prompt=url_analyzer_sys_msg,
    name="url_analyzer",
)


# --- Node Functions ---

def ip_analyzer_node(state: AgentState)-> Command[Literal["supervisor"]]:
    result = ip_analyzer_agent.invoke(state, config=config, stream_mode="values")
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="ip_analyzer"),
            ]
        },
        goto="supervisor",
    )


def url_analyzer_node(state: AgentState)-> Command[Literal["supervisor"]]:
    result = url_analyzer_agent.invoke(state, config=config, stream_mode="values")
    return Command(
        update={
            "messages": [
                HumanMessage(content=result["messages"][-1].content, name="url_analyzer"),
            ]
        },
        goto="supervisor",
    )

In [25]:

query_type = "ip"  # "ip" o "domain"
query_value = "8.8.8.8"  # O un dominio como "example.com"


full_query_message = f"""
Please analyze the following indicator of compromise: {query_value}
"""
if query_type == "ip":
    print(f"Analizando IP: {query_value}\n")
    result = ip_analyzer_agent.invoke({"messages": [{"role": "user", "content": full_query_message}]})
else:
    print(f"Analizando URL: {query_value}\n")
    result = url_analyzer_agent.invoke({"messages": [{"role": "user", "content": full_query_message}]})

Analizando IP: 8.8.8.8



In [24]:
for message in result["messages"]:
    print(message.pretty_print())


8.8.8.8
None
Tool Calls:
  geoip_lookup (VUVN6H4hBYcdxl4kMSnytdevwquyavfl)
 Call ID: VUVN6H4hBYcdxl4kMSnytdevwquyavfl
  Args:
    ip: 8.8.8.8
None
Name: geoip_lookup

Error: MaxMind database not found at /app/GeoLite2-City.mmdb. Please run download_maxmind_city_db().
None
Tool Calls:
  reverse_dns_lookup (lZexy0zAAguIWd1792auz76SWOK6235J)
 Call ID: lZexy0zAAguIWd1792auz76SWOK6235J
  Args:
    ip_address: 8.8.8.8
None
Name: reverse_dns_lookup

["dns.google"]
None
Tool Calls:
  shodan_lookup (w0aua9y2CulFOvNfL4fySLtNuhIAeovO)
 Call ID: w0aua9y2CulFOvNfL4fySLtNuhIAeovO
  Args:
    target: 8.8.8.8
None
Name: shodan_lookup

{
  "ip": "8.8.8.8",
  "hostnames": [
    "dns.google"
  ],
  "country": "United States",
  "org": "Google LLC",
  "isp": "Google LLC",
  "os": null,
  "ports": [
    443,
    53
  ],
  "vulns": [],
  "last_update": "2025-04-24T00:47:47.266763",
  "services": [
    {
      "port": 53,
      "protocol": "tcp",
      "product": "",
      "version": ""
    },
    {
      "