# URL Threat Analysis using LangChain + LangGraph

Python 3.11.14 compatible

## Install Dependencies
Run this once

In [1]:

!pip install requests python-whois beautifulsoup4 pandas langchain==0.1.16 langgraph==0.0.36 langchain-community==0.0.34 ollama


^C


Collecting langchain-core<0.2.0,>=0.1.42 (from langchain==0.1.16)
  Using cached langchain_core-0.1.53-py3-none-any.whl.metadata (5.9 kB)
Collecting langsmith<0.2.0,>=0.1.17 (from langchain==0.1.16)
  Using cached langsmith-0.1.147-py3-none-any.whl.metadata (14 kB)
Collecting packaging<24.0,>=23.2 (from langchain-core<0.2.0,>=0.1.42->langchain==0.1.16)
  Using cached packaging-23.2-py3-none-any.whl.metadata (3.2 kB)
Using cached langchain_core-0.1.53-py3-none-any.whl (303 kB)
Using cached langsmith-0.1.147-py3-none-any.whl (311 kB)
Using cached packaging-23.2-py3-none-any.whl (53 kB)
Installing collected packages: packaging, langsmith, langchain-core

  Attempting uninstall: packaging

    Found existing installation: packaging 26.0

    Uninstalling packaging-26.0:

      Successfully uninstalled packaging-26.0

  Attempting uninstall: langsmith

    Found existing installation: langsmith 0.7.6

    Uninstalling langsmith-0.7.6:

      Successfully uninstalled langsmith-0.7.6

   ----

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
build 1.4.0 requires packaging>=24.0, but you have packaging 23.2 which is incompatible.
langchain-huggingface 1.2.0 requires langchain-core<2.0.0,>=1.2.0, but you have langchain-core 0.1.53 which is incompatible.
transformers 5.2.0 requires huggingface-hub<2.0,>=1.3.0, but you have huggingface-hub 0.36.2 which is incompatible.


## Imports

In [None]:

import requests
import whois
import re
import pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from typing import TypedDict


## Evidence Collection

In [None]:
import re
import requests
import whois
from bs4 import BeautifulSoup
from urllib.parse import urlparse

def fetch_whois(domain):
    try:
        w = whois.whois(domain)
        return {
            "registrar": getattr(w, "registrar", None),
            "creation_date": str(getattr(w, "creation_date", None)),
            "expiration_date": str(getattr(w, "expiration_date", None)),
            "country": getattr(w, "country", None),
        }
    except Exception as e:
        return {"error": str(e)}

def fetch_headers_from_response(resp):
    h = resp.headers
    return {
        "status": resp.status_code,
        "CSP": h.get("Content-Security-Policy"),
        "XFO": h.get("X-Frame-Options"),
        "HSTS": h.get("Strict-Transport-Security"),
    }

def analyze_html(html):
    soup = BeautifulSoup(html or "", "html.parser")
    return {
        "hidden_forms": len(soup.select("form[style*='display:none']")),
        "password_inputs": len(soup.find_all("input", {"type": "password"})),
        "iframes": len(soup.find_all("iframe")),
    }

def analyze_js(html):
    html = html or ""
    patterns = ["eval(", "document.cookie", "localStorage", "XMLHttpRequest"]
    found = [p for p in patterns if p in html]
    return {
        "dangerous_js": found,
        "obfuscation": bool(re.search(r"[a-zA-Z0-9]{300,}", html)),
    }

def collect_evidence(url):
    if not str(url).startswith(("http://", "https://")):
        url = "http://" + str(url)

    domain = urlparse(url).netloc
    html = ""
    headers = {}
    fetch_error = None

    try:
        resp = requests.get(url, timeout=10, allow_redirects=True)
        html = resp.text
        headers = fetch_headers_from_response(resp)
    except requests.RequestException as e:
        fetch_error = str(e)
        headers = {"error": fetch_error}

    return {
        "whois": fetch_whois(domain),
        "headers": headers,
        "html": analyze_html(html),
        "javascript": analyze_js(html),
        "fetch_error": fetch_error,
    }


## LLM + LangGraph

In [None]:

from langchain_community.llms import Ollama
from langchain.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END

llm = Ollama(model="gpt-oss:20b", temperature=0.1)

SYSTEM_PROMPT = '''
Classify URL as SAFE, WARNING or ALERT using evidence only.
Return format:

URL:
FINAL CLASSIFICATION:
CONFIDENCE SCORE:
'''

class State(TypedDict):
    url: str
    evidence: dict
    result: str

prompt = ChatPromptTemplate.from_messages([
    ("system", SYSTEM_PROMPT),
    ("human", "URL: {url}\nEVIDENCE: {evidence}")
])

def analyze(state: State):
    out = llm.invoke(prompt.format(
        url=state["url"],
        evidence=state["evidence"]
    ))
    return {"result": out}

graph = StateGraph(State)
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
app = graph.compile()


## Run Dataset

In [None]:
import pandas as pd

def run(csv_path="url_dataset_safe_warning_alert_200.csv"):
    df = pd.read_csv(csv_path)

    outputs = []
    for _, row in df.iterrows():
        ev = collect_evidence(row["url"])
        res = app.invoke({"url": row["url"], "evidence": ev})
        outputs.append({
            "url": row["url"],
            "ground_truth": row["label"],
            "llm_output": res["result"]
        })
    return outputs
    df = pd.read_csv(url_dataset_safe_warning_alert_200.csv)    

## Evaluation Metrics

In [None]:

LABELS = ["SAFE", "WARNING", "ALERT"]

def extract_prediction(text):
    m = re.search(r"(SAFE|WARNING|ALERT)", text)
    return m.group(1) if m else "UNKNOWN"

def confusion_matrix(results):
    cm = pd.DataFrame(0, index=LABELS, columns=LABELS)
    for r in results:
        gt = r["ground_truth"]
        pred = extract_prediction(r["llm_output"])
        if gt in LABELS and pred in LABELS:
            cm.loc[gt, pred] += 1
    return cm

def precision_recall_f1(cm):
    metrics = {}
    for l in LABELS:
        TP = cm.loc[l, l]
        FP = cm[l].sum() - TP
        FN = cm.loc[l].sum() - TP
        p = TP/(TP+FP) if TP+FP else 0
        r = TP/(TP+FN) if TP+FN else 0
        f1 = 2*p*r/(p+r) if p+r else 0
        metrics[l] = {"Precision": p, "Recall": r, "F1": f1}
    return metrics


In [None]:
results = run("url_dataset_safe_warning_alert_200.csv")

Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error trying to connect to socket: closing socket - timed out
Error tr