In [None]:
# ============================================================
# Step 0: Imports, type aliases, and shared utilities
# ============================================================
 
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Literal, TypedDict, Callable

from datetime import datetime
from pydantic import BaseModel, Field


# LangChain / LangGraph / CrewAI imports
# (Make sure these are installed in your environment)
from langchain_core.tools import BaseTool, tool
# from langgraph.graph import StateGraph, START, END
from langgraph.graph import StateGraph, END, START
from langchain.agents import AgentExecutor
# from langchain_core.tools import BaseTool, tool
from langchain.agents import AgentExecutor


try:
    from crewai.flow import Flow, start, listen
except ImportError:
    # Optional: you can comment this out if CrewAI isn't installed yet
    Flow = object  # type: ignore
    def start():
        def _wrap(fn): return fn
        return _wrap
    def listen(*_args, **_kwargs):
        def _wrap(fn): return fn
        return _wrap



TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases

In [5]:
# Import Core Libraries
from typing import TypedDict, List, Dict, Optional, Any, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
import json
from datetime import datetime
import uuid

# LangChain imports
from langchain.tools import tool, StructuredTool, Tool
from pydantic import BaseModel, Field
from langchain.agents import Agent, AgentExecutor

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.graph.graph import CompiledGraph

print("✓ Core imports successful")

✓ Core imports successful


# ============================================================
# Step 1: Load & parse design files
# (You can use this to introspect the original .md specs if needed)
# ============================================================

In [6]:

def load_design_files(base_path: Path | str = Path(".")) -> Dict[str, str]:
    """
    Implement basic file loading for the two design docs.

    This keeps the notebook self-contained while still letting you
    inspect the original schema/layout/graph contracts if you want
    to programmatically derive anything later.
    """
    base_path = Path(base_path)
    files = {
        "schema_layout_contracts": base_path / "Schema, Layout, Contracts.md",
        "universal_graph": base_path / "Universal Graph.md",
    }

    contents: Dict[str, str] = {}
    for key, path in files.items():
        try:
            contents[key] = path.read_text(encoding="utf-8")
        except FileNotFoundError:
            # It's fine if the files aren't present in this environment;
            # the graph wiring below is already baked from the spec.
            contents[key] = ""
    return contents

# ============================================================
# Step 2: Define universal state + tools abstraction
# (Minimal but structurally aligned with your design)
# ============================================================

# --- 2.1 State shape (TypedDict for LangGraph type checking) ---

In [7]:

class PlanState(TypedDict, total=False):
    work_queue: List[str]
    in_progress: Optional[str]
    changesets: List[Dict[str, Any]]
    tickets: List[Dict[str, Any]]
    plan_created_at: Optional[str]  # iso8601


class ApprovalsState(TypedDict, total=False):
    required: bool
    status: str  # ApprovalStatus in your schema
    decisions: List[Dict[str, Any]]


class ExecutionState(TypedDict, total=False):
    applied_changesets: List[str]
    failed_changesets: List[str]
    skipped_changesets: List[str]
    execution_notes: List[str]


class ValidationState(TypedDict, total=False):
    pre_metrics: Dict[str, Any]
    post_metrics: Dict[str, Any]
    checks: List[Dict[str, Any]]
    validated_at: Optional[str]


class SEOState(TypedDict, total=False):
    """
    Design this as a thin shell over your full AurumSEOState Pydantic model.

    Top-level keys directly mirror the schema in the design doc:
      - run, config, inputs, findings, scores, plan, approvals, execution,
        validation, audit_log, errors.
    The nested structures are intentionally loose here so you can swap in
    the full Pydantic models without changing graph wiring.
    """
    run: Dict[str, Any]
    config: Dict[str, Any]
    inputs: Dict[str, Any]
    findings: Dict[str, Any]
    scores: Dict[str, Any]
    plan: PlanState
    approvals: ApprovalsState
    execution: ExecutionState
    validation: ValidationState
    audit_log: List[Dict[str, Any]]
    errors: List[Dict[str, Any]]


def ensure_default_state(state: Optional[SEOState] = None) -> SEOState:
    """
    Ensure the state has all required top-level containers.

    This lets you start from {} and still get a structurally valid state,
    while remaining compatible with your richer Pydantic AurumSEOState
    if you plug that in instead.
    """
    if state is None:
        state = SEOState()  # type: ignore[call-arg]

    state.setdefault("run", {})
    state.setdefault("config", {})
    state.setdefault("inputs", {})
    state.setdefault("findings", {})
    state.setdefault("scores", {})
    state.setdefault("plan", {})
    state.setdefault("approvals", {})
    state.setdefault("execution", {})
    state.setdefault("validation", {})
    state.setdefault("audit_log", [])
    state.setdefault("errors", [])

    plan: PlanState = state["plan"]  # type: ignore[assignment]
    plan.setdefault("work_queue", [])
    plan.setdefault("in_progress", None)
    plan.setdefault("changesets", [])
    plan.setdefault("tickets", [])
    plan.setdefault("plan_created_at", None)

    approvals: ApprovalsState = state["approvals"]  # type: ignore[assignment]
    approvals.setdefault("required", False)
    approvals.setdefault("status", "NOT_REQUIRED")
    approvals.setdefault("decisions", [])

    execution: ExecutionState = state["execution"]  # type: ignore[assignment]
    execution.setdefault("applied_changesets", [])
    execution.setdefault("failed_changesets", [])
    execution.setdefault("skipped_changesets", [])
    execution.setdefault("execution_notes", [])

    validation: ValidationState = state["validation"]  # type: ignore[assignment]
    validation.setdefault("pre_metrics", {})
    validation.setdefault("post_metrics", {})
    validation.setdefault("checks", [])
    validation.setdefault("validated_at", None)

    return state

# --- 2.2 LangChain tools bundle for SEO connectors ---

In [8]:
@tool
def fetch_gsc_snapshot(domain: str) -> Dict[str, Any]:
    """Implement: Fetch a GSC snapshot for the given domain (placeholder)."""
    raise NotImplementedError("Wire this to your GSC connector.")


@tool
def fetch_ga4_snapshot(property_id: str) -> Dict[str, Any]:
    """Implement: Fetch a GA4 snapshot (placeholder)."""
    raise NotImplementedError("Wire this to your GA4 connector.")


@tool
def fetch_serp_snapshot(keywords: List[str]) -> Dict[str, Any]:
    """Implement: Fetch a SERP snapshot (placeholder)."""
    raise NotImplementedError("Wire this to your SERP provider.")


@tool
def fetch_crawl_diff(domain: str) -> Dict[str, Any]:
    """Implement: Fetch crawl diff from Screaming Frog / custom crawler (placeholder)."""
    raise NotImplementedError("Wire this to your crawler output.")


@tool
def apply_cms_changes(changeset: Dict[str, Any]) -> Dict[str, Any]:
    """Implement: Apply a ChangeSet to your CMS (placeholder)."""
    raise NotImplementedError("Wire this to Shopify/Woo/Headless CMS.")


@tool
def create_ticket(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Implement: Create a ticket in Jira/Asana/etc (placeholder)."""
    raise NotImplementedError("Wire this to your ticketing system.")


@dataclass
class SEOAgentTools:
    """
    Design: thin container for all LangChain tools used by the graph nodes.
    You can inject different implementations in tests vs production.
    """
    gsc: BaseTool = fetch_gsc_snapshot
    ga4: BaseTool = fetch_ga4_snapshot
    serp: BaseTool = fetch_serp_snapshot
    crawl: BaseTool = fetch_crawl_diff
    cms: BaseTool = apply_cms_changes
    ticketing: BaseTool = create_ticket


# ============================================================
# Step 3: Implement reusable node functions (universal core)
# Each node: SEOState × SEOAgentTools -> SEOState
# ============================================================

In [9]:
def _append_audit(state: SEOState, node: str, event: str, detail: Dict[str, Any]) -> None:
    state["audit_log"].append(
        {
            "ts": datetime.utcnow().isoformat(),
            "node": node,
            "event": event,
            "detail": detail,
        }
    )

# ------------ Observe phase nodes ------------

In [10]:
def node_collect_gsc(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    domain = state["run"].get("domain", "https://aurumpickleball.com")
    # Placeholder: real implementation should call tools.gsc.invoke
    state["inputs"]["gsc"] = {
        "collected_at": datetime.utcnow().isoformat(),
        "window_days": 28,
        "pages": [],
        "queries": [],
        "index_coverage": [],
        "sitemap_status": [],
        "domain": domain,
    }
    _append_audit(state, "collect_gsc", "END", {"domain": domain})
    return state


def node_collect_ga4(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    property_id = state.get("config", {}).get("integrations", {}).get("ga4_property_id", "UNSET")
    state["inputs"]["ga4"] = {
        "collected_at": datetime.utcnow().isoformat(),
        "window_days": 28,
        "landing_pages": [],
        "conversions": [],
        "property_id": property_id,
    }
    _append_audit(state, "collect_ga4", "END", {"property_id": property_id})
    return state


def node_collect_serp_snapshot(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    keywords = []  # In practice: derive from GSC queries
    state["inputs"]["serp"] = {
        "collected_at": datetime.utcnow().isoformat(),
        "keywords": [],
        "competitors": [],
    }
    _append_audit(state, "collect_serp_snapshot", "END", {"keywords_seeded": len(keywords)})
    return state


def node_collect_crawl_diff(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    domain = state["run"].get("domain", "https://aurumpickleball.com")
    state["inputs"]["crawl"] = {
        "collected_at": datetime.utcnow().isoformat(),
        "tool": "screamingfrog",
        "pages": [],
        "diffs": [],
        "domain": domain,
    }
    _append_audit(state, "collect_crawl_diff", "END", {"domain": domain})
    return state

# ------------ Diagnose phase nodes ------------

In [11]:
def node_normalize_inputs(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    # Design: normalization is conceptual here; real implementation should
    # canonicalize shapes according to your schema contract doc.
    _append_audit(state, "normalize_inputs", "END", {"normalized": True})
    return state


def node_detect_anomalies(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    # Design: in a full build, you'd compare inputs.gsc / inputs.ga4 against
    # previous baselines and record anomalies. Here we just log a stub.
    _append_audit(state, "detect_anomalies", "END", {"anomaly_scan": "stub"})
    return state


def node_classify_issues(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    findings = state.setdefault("findings", {})
    issues: List[Dict[str, Any]] = findings.get("issues", [])
    if not issues:
        # Implement: create at least one stub issue so the downstream flow has something to route
        issue_id = "issue_stub_001"
        issues.append(
            {
                "issue_id": issue_id,
                "issue_type": "CTR_DROP",
                "market": "US",
                "url": "https://aurumpickleball.com/collections/paddles",
                "severity": 3,
                "evidence": {},
                "detected_at": datetime.utcnow().isoformat(),
                "owner_hint": "PRODUCT",
            }
        )
        findings["issues"] = issues
    _append_audit(state, "classify_issues", "END", {"issues": len(issues)})
    return state

# ------------ Recommend phase nodes ------------

In [12]:
def _score_opportunity(issue: Dict[str, Any]) -> Dict[str, Any]:
    """
    Implement deterministic opportunity scoring consistent with the spec:
      priority_score = (impact * confidence) / max(effort, 1)
    For now, we stub a simple mapping and default score.
    """
    opportunity_id = f"opp_{issue['issue_id']}"
    return {
        "opportunity_id": opportunity_id,
        "opportunity_type": issue.get("owner_hint", "TECH"),
        "market": issue.get("market", "US"),
        "primary_url": issue.get("url"),
        "related_urls": [],
        "keyword_cluster": [],
        "issue_refs": [issue["issue_id"]],
        "evidence": {},
        "impact": 3,
        "confidence": 3,
        "effort": 2,
        "risk": 2,
        "priority_score": (3 * 3) / 2,
        "rationale": "stub scoring",
    }


def node_score_opportunities(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    findings = state.setdefault("findings", {})
    issues: List[Dict[str, Any]] = findings.get("issues", [])

    opportunities: List[Dict[str, Any]] = []
    for issue in issues:
        opportunities.append(_score_opportunity(issue))

    # Deterministic sort:
    opportunities.sort(
        key=lambda o: (
            -o["priority_score"],
            -o["impact"],
            -o["confidence"],
            o["effort"],
            o["primary_url"] or "",
        )
    )

    scores = state.setdefault("scores", {})
    scores["ranked_opportunity_ids"] = [o["opportunity_id"] for o in opportunities]
    scores["score_calculated_at"] = datetime.utcnow().isoformat()

    findings["opportunities"] = opportunities

    plan: PlanState = state["plan"]
    plan["work_queue"] = scores["ranked_opportunity_ids"].copy()
    plan["plan_created_at"] = datetime.utcnow().isoformat()

    _append_audit(
        state,
        "score_opportunities",
        "END",
        {"opportunities": len(opportunities)},
    )
    return state


def node_route_by_type_and_priority(state: SEOState, tools: SEOAgentTools) -> SEOState:
    """
    This node itself doesn't mutate much; routing decisions happen in the
    LangGraph conditional edge function. Here we only ensure the state
    has a valid 'in_progress' set when the router is invoked directly
    (e.g., from CrewAI).
    """
    state = ensure_default_state(state)
    plan: PlanState = state["plan"]

    if plan["in_progress"] is None and plan["work_queue"]:
        next_id = plan["work_queue"].pop(0)
        plan["in_progress"] = next_id

    _append_audit(
        state,
        "route_by_type_and_priority",
        "END",
        {"in_progress": state["plan"]["in_progress"]},
    )
    return state

# ------------ Specialist nodes (ChangeSet generators) ------------

In [13]:
def _append_changeset(
    state: SEOState,
    opportunity_id: str,
    kind: str,
    target_url: str,
    risk: int = 2,
) -> None:
    plan: PlanState = state["plan"]
    changesets = plan["changesets"]
    cs_id = f"cs_{kind.lower()}_{len(changesets) + 1:03d}"
    changesets.append(
        {
            "changeset_id": cs_id,
            "kind": kind,
            "opportunity_id": opportunity_id,
            "market": "US",
            "approval_required": True,
            "approval_status": "PENDING",
            "target": {"url": target_url},
            "operations": [
                {
                    "op_id": f"op_{cs_id}_01",
                    "op_type": "UPDATE_META",
                    "target": {"url": target_url},
                    "payload": {},
                    "rollback": {},
                    "qa_checks": ["meta_length_ok", "no_indexing_issues"],
                    "risk": risk,
                }
            ],
            "predicted_impact": {},
            "rollback_plan": {},
            "notes": "stub changeset",
            "execution_status": "QUEUED",
            "executed_at": None,
        }
    )


def _get_current_opportunity(state: SEOState) -> Optional[Dict[str, Any]]:
    findings = state.get("findings", {})
    opps: List[Dict[str, Any]] = findings.get("opportunities", [])
    in_progress = state["plan"]["in_progress"]
    if not in_progress:
        return None
    for o in opps:
        if o["opportunity_id"] == in_progress:
            return o
    return None


def node_techseo_agent_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    opp = _get_current_opportunity(state)
    if opp:
        _append_changeset(
            state,
            opportunity_id=opp["opportunity_id"],
            kind="TECH_FIX",
            target_url=opp["primary_url"],
            risk=3,
        )
    state["plan"]["in_progress"] = None
    _append_audit(state, "techseo_agent_node", "END", {"changesets": len(state["plan"]["changesets"])})
    return state


def node_productseo_agent_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    opp = _get_current_opportunity(state)
    if opp:
        _append_changeset(
            state,
            opportunity_id=opp["opportunity_id"],
            kind="PRODUCT_PDP_UPDATE",
            target_url=opp["primary_url"],
            risk=2,
        )
    state["plan"]["in_progress"] = None
    _append_audit(state, "productseo_agent_node", "END", {"changesets": len(state["plan"]["changesets"])})
    return state


def node_content_agent_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    opp = _get_current_opportunity(state)
    if opp:
        _append_changeset(
            state,
            opportunity_id=opp["opportunity_id"],
            kind="CONTENT_BRIEF",
            target_url=opp["primary_url"],
            risk=2,
        )
    state["plan"]["in_progress"] = None
    _append_audit(state, "content_agent_node", "END", {"changesets": len(state["plan"]["changesets"])})
    return state


def node_authority_agent_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    opp = _get_current_opportunity(state)
    if opp:
        _append_changeset(
            state,
            opportunity_id=opp["opportunity_id"],
            kind="AUTHORITY_OUTREACH_PLAN",
            target_url=opp["primary_url"],
            risk=1,
        )
    state["plan"]["in_progress"] = None
    _append_audit(state, "authority_agent_node", "END", {"changesets": len(state["plan"]["changesets"])})
    return state

# ------------ Approve / Execute / Validate nodes ------------

In [14]:
def node_qa_guardrail_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    plan: PlanState = state["plan"]
    approvals: ApprovalsState = state["approvals"]

    risk_cfg = state.get("config", {}).get("risk", {})
    approval_threshold = risk_cfg.get("approval_threshold", 4)
    auto_execute_max_risk = risk_cfg.get("auto_execute_max_risk", 2)

    requires_approval = False
    has_high_risk = False

    for cs in plan["changesets"]:
        max_risk = max((op.get("risk", 0) for op in cs.get("operations", [])), default=0)
        if max_risk >= approval_threshold:
            cs["approval_required"] = True
            cs["approval_status"] = "PENDING"
            requires_approval = True
            has_high_risk = True
        elif max_risk <= auto_execute_max_risk:
            cs["approval_required"] = False
            cs["approval_status"] = "NOT_REQUIRED"
        else:
            cs["approval_required"] = True
            cs["approval_status"] = "PENDING"
            requires_approval = True

    approvals["required"] = requires_approval
    approvals["status"] = "PENDING" if requires_approval or has_high_risk else "NOT_REQUIRED"

    _append_audit(
        state,
        "qa_guardrail_node",
        "END",
        {"changesets": len(plan["changesets"]), "required": approvals["required"]},
    )
    return state


def node_approval_interrupt_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    """
    In a real system this node would be paired with LangGraph interrupts
    and/or CrewAI HITL to actually wait for human approval.

    Here we assume `approvals.decisions` has been populated externally
    and we simply aggregate them into final statuses.
    """
    state = ensure_default_state(state)
    approvals: ApprovalsState = state["approvals"]
    decisions = approvals.get("decisions", [])

    decisions_by_cs: Dict[str, str] = {d["changeset_id"]: d["decision"] for d in decisions}
    plan: PlanState = state["plan"]

    statuses: List[str] = []
    for cs in plan["changesets"]:
        cs_id = cs["changeset_id"]
        decision = decisions_by_cs.get(cs_id)
        if not decision:
            continue
        cs["approval_status"] = decision
        statuses.append(decision)

    if not statuses:
        approvals["status"] = "PENDING"
    elif all(s == "APPROVED" for s in statuses):
        approvals["status"] = "APPROVED"
    elif all(s == "REJECTED" for s in statuses):
        approvals["status"] = "REJECTED"
    else:
        approvals["status"] = "PARTIAL"

    _append_audit(
        state,
        "approval_interrupt_node",
        "END",
        {"status": approvals["status"], "decisions": len(decisions)},
    )
    return state


def node_execute_changeset_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    plan: PlanState = state["plan"]
    approvals: ApprovalsState = state["approvals"]
    execution: ExecutionState = state["execution"]

    risk_cfg = state.get("config", {}).get("risk", {})
    blocklist_kinds = risk_cfg.get("blocklist_kinds", [])

    for cs in plan["changesets"]:
        cs_id = cs["changeset_id"]
        if cs["kind"] in blocklist_kinds:
            cs["execution_status"] = "SKIPPED"
            execution["skipped_changesets"].append(cs_id)
            continue

        status = cs.get("approval_status", "NOT_REQUIRED")
        if status not in ("APPROVED", "NOT_REQUIRED"):
            cs["execution_status"] = "SKIPPED"
            execution["skipped_changesets"].append(cs_id)
            continue

        # Implement: call CMS tool here in a real build
        cs["execution_status"] = "APPLIED"
        cs["executed_at"] = datetime.utcnow().isoformat()
        execution["applied_changesets"].append(cs_id)
        execution["execution_notes"].append(f"Applied {cs_id} (stub).")

    _append_audit(
        state,
        "execute_changeset_node",
        "END",
        {"applied": len(execution["applied_changesets"]), "skipped": len(execution["skipped_changesets"])},
    )
    return state


def node_create_tickets_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    plan: PlanState = state["plan"]
    execution: ExecutionState = state["execution"]

    for cs in plan["changesets"]:
        if cs.get("execution_status") in ("SKIPPED", "FAILED"):
            ticket_id = f"t_{cs['changeset_id']}"
            plan["tickets"].append(
                {
                    "ticket_id": ticket_id,
                    "system": "jira",
                    "title": f"Investigate ChangeSet {cs['changeset_id']}",
                    "description": "Stub ticket based on skipped/failed ChangeSet.",
                    "priority": "P1",
                    "url_refs": [cs["target"]["url"]],
                    "created_at": datetime.utcnow().isoformat(),
                }
            )
            execution["execution_notes"].append(
                f"Created ticket {ticket_id} for {cs['changeset_id']}."
            )

    _append_audit(
        state,
        "create_tickets_node",
        "END",
        {"tickets": len(plan["tickets"])},
    )
    return state


def node_validate_metrics_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    validation: ValidationState = state["validation"]
    # Implement: compute real before/after metrics in production
    validation["pre_metrics"] = validation.get("pre_metrics", {"org_revenue_28d": 0})
    validation["post_metrics"] = validation.get("post_metrics", {"org_revenue_28d": 0})
    validation["checks"].append(
        {"name": "schema_validates", "passed": True, "ts": datetime.utcnow().isoformat()}
    )
    validation["validated_at"] = datetime.utcnow().isoformat()

    _append_audit(state, "validate_metrics_node", "END", {"checks": len(validation["checks"])})
    return state


def node_persist_run_node(state: SEOState, tools: SEOAgentTools) -> SEOState:
    state = ensure_default_state(state)
    # Implement: persist to your DB/log; here we just audit.
    _append_audit(state, "persist_run_node", "END", {"persisted": True})
    return state

# ============================================================
# Step 4: Construct the LangGraph universal graph
# ============================================================

In [15]:


def _router_condition(state: SEOState) -> str:
    """
    Implement: routing logic for `route_by_type_and_priority` node.

    Returns the name of the next node, consistent with the design:
      - techseo_agent_node / productseo_agent_node / content_agent_node /
        authority_agent_node, or
      - qa_guardrail_node if the queue is empty.
    """
    state = ensure_default_state(state)
    plan: PlanState = state["plan"]

    if not plan["work_queue"]:
        return "qa_guardrail_node"

    # If in_progress isn't set yet, pick the next opportunity and infer type.
    if plan["in_progress"] is None:
        next_id = plan["work_queue"].pop(0)
        plan["in_progress"] = next_id

    opp = _get_current_opportunity(state)
    opp_type = (opp or {}).get("opportunity_type", "TECH")

    if opp_type == "TECH":
        return "techseo_agent_node"
    if opp_type == "PRODUCT":
        return "productseo_agent_node"
    if opp_type == "CONTENT":
        return "content_agent_node"
    if opp_type == "AUTHORITY":
        return "authority_agent_node"

    # Fallback: send to QA if unknown
    return "qa_guardrail_node"


def _qa_condition(state: SEOState) -> str:
    """
    Implement: routing from `qa_guardrail_node` → approval / execute / tickets.
    """
    state = ensure_default_state(state)
    plan: PlanState = state["plan"]
    approvals: ApprovalsState = state["approvals"]

    if not plan["changesets"]:
        return "create_tickets_node"

    if approvals.get("required", False):
        return "approval_interrupt_node"

    # No approvals required → execute directly
    return "execute_changeset_node"


def _approval_gate_condition(state: SEOState) -> str:
    """
    Implement: routing from `approval_interrupt_node` → execute or tickets.
    """
    state = ensure_default_state(state)
    approvals: ApprovalsState = state["approvals"]
    status = approvals.get("status", "PENDING")

    if status in ("APPROVED", "PARTIAL"):
        return "execute_changeset_node"
    if status == "REJECTED":
        return "create_tickets_node"

    # Still pending: in a real system you'd interrupt; here we default to tickets.
    return "create_tickets_node"


def build_langgraph_universal_app(
    tools: Optional[SEOAgentTools] = None,
) -> Any:
    """
    Construct: LangGraph StateGraph wired according to the design doc.
    All node functions close over the provided SEOAgentTools bundle.
    """
    if tools is None:
        tools = SEOAgentTools()

    graph = StateGraph(SEOState)

    # Helper to bind tools into node callables
    def bind(fn: Callable[[SEOState, SEOAgentTools], SEOState]) -> Callable[[SEOState], SEOState]:
        return lambda state: fn(state, tools)

    # Register nodes (names must exactly match the design)
    graph.add_node("collect_gsc", bind(node_collect_gsc))
    graph.add_node("collect_ga4", bind(node_collect_ga4))
    graph.add_node("collect_serp_snapshot", bind(node_collect_serp_snapshot))
    graph.add_node("collect_crawl_diff", bind(node_collect_crawl_diff))

    graph.add_node("normalize_inputs", bind(node_normalize_inputs))
    graph.add_node("detect_anomalies", bind(node_detect_anomalies))
    graph.add_node("classify_issues", bind(node_classify_issues))
    graph.add_node("score_opportunities", bind(node_score_opportunities))
    graph.add_node("route_by_type_and_priority", bind(node_route_by_type_and_priority))

    graph.add_node("techseo_agent_node", bind(node_techseo_agent_node))
    graph.add_node("productseo_agent_node", bind(node_productseo_agent_node))
    graph.add_node("content_agent_node", bind(node_content_agent_node))
    graph.add_node("authority_agent_node", bind(node_authority_agent_node))

    graph.add_node("qa_guardrail_node", bind(node_qa_guardrail_node))
    graph.add_node("approval_interrupt_node", bind(node_approval_interrupt_node))
    graph.add_node("execute_changeset_node", bind(node_execute_changeset_node))
    graph.add_node("create_tickets_node", bind(node_create_tickets_node))
    graph.add_node("validate_metrics_node", bind(node_validate_metrics_node))
    graph.add_node("persist_run_node", bind(node_persist_run_node))

    # Direct edges (Observe → Diagnose → Recommend → Router)
    graph.add_edge(START, "collect_gsc")
    graph.add_edge("collect_gsc", "collect_ga4")
    graph.add_edge("collect_ga4", "collect_serp_snapshot")
    graph.add_edge("collect_serp_snapshot", "collect_crawl_diff")
    graph.add_edge("collect_crawl_diff", "normalize_inputs")
    graph.add_edge("normalize_inputs", "detect_anomalies")
    graph.add_edge("detect_anomalies", "classify_issues")
    graph.add_edge("classify_issues", "score_opportunities")
    graph.add_edge("score_opportunities", "route_by_type_and_priority")

    # Conditional routing from router → specialist / QA
    graph.add_conditional_edges(
        "route_by_type_and_priority",
        _router_condition,
        {
            "techseo_agent_node": "techseo_agent_node",
            "productseo_agent_node": "productseo_agent_node",
            "content_agent_node": "content_agent_node",
            "authority_agent_node": "authority_agent_node",
            "qa_guardrail_node": "qa_guardrail_node",
        },
    )

    # Loop back from specialists → router
    graph.add_edge("techseo_agent_node", "route_by_type_and_priority")
    graph.add_edge("productseo_agent_node", "route_by_type_and_priority")
    graph.add_edge("content_agent_node", "route_by_type_and_priority")
    graph.add_edge("authority_agent_node", "route_by_type_and_priority")

    # QA conditional routing
    graph.add_conditional_edges(
        "qa_guardrail_node",
        _qa_condition,
        {
            "approval_interrupt_node": "approval_interrupt_node",
            "execute_changeset_node": "execute_changeset_node",
            "create_tickets_node": "create_tickets_node",
        },
    )

    # Approval gate routing
    graph.add_conditional_edges(
        "approval_interrupt_node",
        _approval_gate_condition,
        {
            "execute_changeset_node": "execute_changeset_node",
            "create_tickets_node": "create_tickets_node",
        },
    )

    # Execute → Tickets → Validate → Persist → END
    graph.add_edge("execute_changeset_node", "create_tickets_node")
    graph.add_edge("create_tickets_node", "validate_metrics_node")
    graph.add_edge("validate_metrics_node", "persist_run_node")
    graph.add_edge("persist_run_node", END)

    # Compile the graph into an app
    app = graph.compile()
    return app

# ============================================================
# Step 5: Construct the CrewAI Flow adapter using the same nodes
# ============================================================

In [16]:


class AurumSEO_Flow(Flow):
    """
    Design: CrewAI Flow that reuses the exact same node functions
    as the LangGraph app. This gives you a single source of truth
    for business logic, with two orchestration front-ends.
    """

    def __init__(self, tools: Optional[SEOAgentTools] = None):
        super().__init__()
        self.tools = tools or SEOAgentTools()

    # --- Observe chain ---

    @start()
    def Task_CollectGSC(self, state: SEOState) -> SEOState:
        return node_collect_gsc(state, self.tools)

    @listen(Task_CollectGSC)
    def Task_CollectGA4(self, state: SEOState) -> SEOState:
        return node_collect_ga4(state, self.tools)

    @listen(Task_CollectGA4)
    def Task_CollectSERP(self, state: SEOState) -> SEOState:
        return node_collect_serp_snapshot(state, self.tools)

    @listen(Task_CollectSERP)
    def Task_CollectCrawlDiff(self, state: SEOState) -> SEOState:
        return node_collect_crawl_diff(state, self.tools)

    # --- Diagnose chain ---

    @listen(Task_CollectCrawlDiff)
    def Task_NormalizeInputs(self, state: SEOState) -> SEOState:
        return node_normalize_inputs(state, self.tools)

    @listen(Task_NormalizeInputs)
    def Task_DetectAnomalies(self, state: SEOState) -> SEOState:
        return node_detect_anomalies(state, self.tools)

    @listen(Task_DetectAnomalies)
    def Task_ClassifyIssues(self, state: SEOState) -> SEOState:
        return node_classify_issues(state, self.tools)

    # --- Recommend chain ---

    @listen(Task_ClassifyIssues)
    def Task_ScoreOpportunitiesAndBuildQueue(self, state: SEOState) -> SEOState:
        return node_score_opportunities(state, self.tools)

    @listen(Task_ScoreOpportunitiesAndBuildQueue)
    def Task_RouteNextOpportunity(self, state: SEOState) -> SEOState:
        # We call the same router node, then do simple branching
        state = node_route_by_type_and_priority(state, self.tools)
        next_node = _router_condition(state)
        if next_node == "techseo_agent_node":
            return node_techseo_agent_node(state, self.tools)
        if next_node == "productseo_agent_node":
            return node_productseo_agent_node(state, self.tools)
        if next_node == "content_agent_node":
            return node_content_agent_node(state, self.tools)
        if next_node == "authority_agent_node":
            return node_authority_agent_node(state, self.tools)
        # If router says QA, skip to QA
        return state

    # --- QA / Approve / Execute / Validate chain ---

    @listen(Task_RouteNextOpportunity)
    def Task_QAGuardrail(self, state: SEOState) -> SEOState:
        return node_qa_guardrail_node(state, self.tools)

    @listen(Task_QAGuardrail)
    def Task_ApprovalGate(self, state: SEOState) -> SEOState:
        # Same logic as LangGraph conditional edges
        decision = _qa_condition(state)
        if decision == "approval_interrupt_node":
            state = node_approval_interrupt_node(state, self.tools)
        # After approval gate, we route again as in LangGraph
        decision2 = _approval_gate_condition(state)
        if decision2 == "execute_changeset_node":
            state = node_execute_changeset_node(state, self.tools)
        elif decision2 == "create_tickets_node":
            state = node_create_tickets_node(state, self.tools)
        return state

    @listen(Task_ApprovalGate)
    def Task_ValidateMetrics(self, state: SEOState) -> SEOState:
        return node_validate_metrics_node(state, self.tools)

    @listen(Task_ValidateMetrics)
    def Task_PersistRun(self, state: SEOState) -> SEOState:
        return node_persist_run_node(state, self.tools)


# ============================================================
# Step 6: Minimal smoke test / visualization hook
# ============================================================

In [17]:

if __name__ == "__main__":
    # Implement: basic end-to-end invocation of the LangGraph app
    tools = SEOAgentTools()
    app = build_langgraph_universal_app(tools)

    initial_state: SEOState = ensure_default_state(
        SEOState(  # type: ignore[call-arg]
            run={"run_id": "demo_run_001", "source": "manual", "domain": "https://aurumpickleball.com"},
            config={
                "risk": {
                    "approval_threshold": 4,
                    "auto_execute_max_risk": 2,
                    "blocklist_kinds": [],
                }
            },
        )
    )

    final_state = app.invoke(initial_state)
    print("Work queue:", final_state["plan"]["work_queue"])
    print("Changesets:", len(final_state["plan"]["changesets"]))
    print("Tickets:", len(final_state["plan"]["tickets"]))
    print("Applied:", final_state["execution"]["applied_changesets"])
    print("Audit events:", len(final_state["audit_log"]))

NameError: name 'START' is not defined