In [16]:
from typing import List, Dict, Any
from langchain_openai import ChatOpenAI
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

class LLMClient:
    """
    Thin wrapper over ChatOpenAI so we can swap models later easily.
    """

    def __init__(self, model: str = "gpt-4.1", temperature: float = 0.2):
        self.llm = ChatOpenAI(model=model, temperature=temperature)

    async def call(
        self,
        system_prompt: str,
        user_content: str,
        extra_messages: List[Dict[str, str]] | None = None,
        tools: List[Any] | None = None,
    ) -> AIMessage:
        messages = [SystemMessage(content=system_prompt)]

        if extra_messages:
            for msg in extra_messages:
                role = msg["role"]
                content = msg["content"]
                if role == "user":
                    messages.append(HumanMessage(content=content))
                elif role == "assistant":
                    messages.append(AIMessage(content=content))

        messages.append(HumanMessage(content=user_content))

        if tools:
            # Tool-enabled call
            chain = self.llm.bind_tools(tools)
            return await chain.ainvoke(messages)
        else:
            return await self.llm.ainvoke(messages)


In [17]:
import json
from typing import Literal, TypedDict

RouteType = Literal["seo", "other", "mixed"]

class RouterOutput(TypedDict):
    route: RouteType
    confidence: float

ROUTER_SYSTEM_PROMPT = """
        You are a routing classifier.
        Goal: Decide if the user query is about SEO or something else.

        Valid routes:
        - "seo"   -> Search Engine Optimization, keywords, traffic, SERP, GSC, DataForSEO, etc.
        - "other" -> Anything else not SEO-related.
        - "mixed" -> Contains both SEO and non-SEO aspects.

        Return STRICT JSON:
        {"route": "seo" | "other" | "mixed", "confidence": 0.0-1.0}
    """

class RouterAgent:
    def __init__(self, llm: LLMClient):
        self.llm = llm

    async def route(self, user_query: str) -> RouterOutput:
        resp = await self.llm.call(ROUTER_SYSTEM_PROMPT, user_query)
        # LLM is instructed to return JSON; parse defensively
        try:
            data = json.loads(resp.content)
            route = data.get("route", "other")
            confidence = float(data.get("confidence", 0.6))
        except Exception:
            route = "other"
            confidence = 0.5
        return {"route": route, "confidence": confidence}


In [18]:
import json
from typing import List, TypedDict

class PlanStep(TypedDict):
    id: int
    description: str
    tool_family: str  # e.g. "gsc", "dataforseo_keywords", "none"

class PlanOutput(TypedDict):
    steps: List[PlanStep]

PLANNER_SYSTEM_PROMPT = """
You are a planning agent for an SEO + general assistant system.

You receive:
- the user query
- a selected high-level route: "seo", "other", or "mixed"

You must break the task into 1-5 concise steps.
Each step should optionally mention a TOOL FAMILY, not a concrete function name.

Examples of tool families:
- "gsc"                       # Google Search Console tools
- "dataforseo_keywords"       # keyword / SERP API tools
- "dataforseo_competitors"    # competitor APIs
- "vector_cache"              # internal semantic cache
- "none"                      # pure reasoning / no tool

Return STRICT JSON:
{"steps": [{"id": 1, "description": "...", "tool_family": "gsc" }, ...]}
"""

class PlannerAgent:
    def __init__(self, llm: LLMClient):
        self.llm = llm

    async def plan(self, user_query: str, route: str) -> PlanOutput:
        user_content = f"Route: {route}\n\nUser query:\n{user_query}"
        resp = await self.llm.call(PLANNER_SYSTEM_PROMPT, user_content)
        try:
            data = json.loads(resp.content)
        except Exception:
            data = {"steps": [{"id": 1, "description": "Answer directly.", "tool_family": "none"}]}
        return data  # type: ignore[return-value]


In [19]:
# agents/seo_executor_agent.py
from typing import List, Dict, Any
from langchain.tools import tool

# ---------- Define your SEO tools here ----------

@tool("gsc_get_overview")
def gsc_get_overview(url: str) -> Dict[str, Any]:
    """
    Get high-level performance overview for a URL/domain from GSC.
    IMPLEMENTATION: call your MCP GSC tool here.
    """
    # TODO: wire up MCP client / DataForSEO / HTTP call
    return {"source": "GSC", "url": url, "clicks": 123, "impressions": 456}

@tool("dataforseo_keywords_ideas")
def dataforseo_keywords_ideas(query: str, location: str = "India") -> Dict[str, Any]:
    """
    Get keyword ideas from DataForSEO for a given query and location.
    IMPLEMENTATION: call DataForSEO MCP tool here.
    """
    # TODO: real implementation here
    return {
        "source": "DataForSEO",
        "query": query,
        "location": location,
        "keywords": [
            {"keyword": f"{query} tools", "volume": 2900},
            {"keyword": f"{query} pricing", "volume": 1300},
        ],
    }

SEO_TOOLS = [gsc_get_overview, dataforseo_keywords_ideas]

SEO_EXECUTOR_SYSTEM_PROMPT = """
You are an SEO execution agent.

- You receive the user query and a list of planned steps.
- You can call ONLY the tools you see (GSC + DataForSEO).
- For each step, decide whether to call a tool or just reason.
- Prefer 1-3 tool calls, not more, unless necessary.
- After tools are called, synthesize a structured RESULT OBJECT summarizing what you found.

Return your final answer as JSON:
{
  "tool_calls": [...],   # brief description + tool name
  "insights": [...],     # concise SEO insights
  "raw_data": {...}      # aggregated raw tool outputs (optional)
}
"""

class SEOExecutorAgent:
    def __init__(self, llm: LLMClient):
        self.llm = llm

    async def execute(
        self,
        user_query: str,
        plan_steps: List[Dict[str, Any]],
    ) -> Any:
        # Put the plan into the context as markdown / JSON
        steps_str = "\n".join(
            [f"- Step {s['id']}: {s['description']} [family={s['tool_family']}]" for s in plan_steps]
        )
        content = f"User query:\n{user_query}\n\nPlanned steps:\n{steps_str}"

        resp = await self.llm.call(
            system_prompt=SEO_EXECUTOR_SYSTEM_PROMPT,
            user_content=content,
            tools=SEO_TOOLS,
        )
        return resp  # resp.content + resp.tool_calls etc.


In [20]:
SUMMARIZER_SYSTEM_PROMPT = """
You are a summarization and explanation agent.

Input:
- User's original query
- A JSON blob with 'tool_calls', 'insights', and 'raw_data'

Your job:
- Write a clear, concise, user-facing answer.
- Focus on ACTIONABLE SEO advice, not verbose theory.
- Use bullet points where helpful.
- Mention specific metrics/keywords only if present in the JSON.

Do NOT show raw JSON back to the user.
"""

class SummarizerAgent:
    def __init__(self, llm: LLMClient):
        self.llm = llm

    async def summarize(self, user_query: str, execution_json: str) -> str:
        user_content = f"User query:\n{user_query}\n\nExecution result JSON:\n{execution_json}"
        resp = await self.llm.call(SUMMARIZER_SYSTEM_PROMPT, user_content)
        return resp.content


In [21]:
# orchestrator.py
import json
import asyncio
from typing import Any, Dict

class Orchestrator:
    def __init__(self):
        llm_router = LLMClient(model="gpt-4.1", temperature=0.0)
        llm_planner = LLMClient(model="gpt-4.1", temperature=0.2)
        llm_seo = LLMClient(model="gpt-4.1", temperature=0.2)
        llm_summarizer = LLMClient(model="gpt-4.1", temperature=0.4)

        self.router = RouterAgent(llm_router)
        self.planner = PlannerAgent(llm_planner)
        self.seo_executor = SEOExecutorAgent(llm_seo)
        self.summarizer = SummarizerAgent(llm_summarizer)

    async def handle_query(self, user_query: str) -> str:
        # 1) ROUTER
        route_result = await self.router.route(user_query)
        route = route_result["route"]

        # 2) PLANNER
        plan_result = await self.planner.plan(user_query, route)
        steps = plan_result["steps"]

        # 3) EXECUTION (branch by route)
        if route in ("seo", "mixed"):
            exec_resp = await self.seo_executor.execute(user_query, steps)
            # exec_resp.content should be JSON string (per SEO_EXECUTOR_SYSTEM_PROMPT)
            exec_json_str = exec_resp.content
        else:
            # For now: directly answer without tools (you can later add a generic executor)
            exec_json_str = json.dumps(
                {
                    "tool_calls": [],
                    "insights": [f"Direct reasoning for query: {user_query}"],
                    "raw_data": {},
                }
            )

        # 4) SUMMARIZER
        final_answer = await self.summarizer.summarize(user_query, exec_json_str)
        return final_answer


async def main():
    orch = Orchestrator()
    user_query = "Provide SEO analysis for domain strique.io"
    answer = await orch.handle_query(user_query)
    print("\n=== FINAL ANSWER ===\n")
    print(answer)

In [22]:
await main()


=== FINAL ANSWER ===

It looks like there is no SEO data available for strique.io at this time. Please ensure the domain is correct or try again later with more information. If you need actionable SEO advice, consider the following general steps:

- Check if your site is indexed by Google using "site:strique.io" in Google search.
- Ensure your website has unique title tags and meta descriptions for each page.
- Optimize on-page content with relevant keywords.
- Improve site speed and mobile responsiveness.
- Build quality backlinks from reputable sources.
- Set up Google Search Console and Google Analytics for ongoing monitoring.

If you provide more specific data or metrics, I can give more tailored recommendations!


In [None]:
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
import os
import sys
from pathlib import Path
from typing import Dict, Any, Optional

class SEOAgent:
    def __init__(self, llm: ChatOpenAI):
        self.name = "Simple Query Agent"
        self.description = 
        self.model = llm
        self._tools = None
        self._agent = None
        self._tool_warning: Optional[str] = None
        self._mcp_client: Optional[MultiServerMCPClient] = None

    def _build_gsc_server_config(self) -> Dict[str, Any]:
        """Build configuration for the local GSC MCP server."""
        current_dir = Path(__file__).parent
        project_root = current_dir.parent.parent
        gsc_server_path = project_root / "src" / "tools" / "gsc_server.py"

        python_interpreter = sys.executable

        return {
            "command": python_interpreter,
            "args": [str(gsc_server_path)],
            "transport": "stdio",
            "env": {
                "GSC_CREDENTIALS": os.getenv("GSC_CREDENTIALS", ""),
                "GSC_SKIP_OAUTH": os.getenv("GSC_SKIP_OAUTH", "true"),
            },
        }

    def _build_dataforseo_config(self) -> Optional[Dict[str, Any]]:
        """Build configuration for the optional DataForSEO MCP server."""
        enable_remote = os.getenv("ENABLE_DATAFORSEO_MCP", "true").lower() in ("1", "true", "yes")
        if not enable_remote:
            return None

        default_url = "https://dataforseo-mcp-worker.hitesh-solanki.workers.dev/mcp"
        dataforseo_url = os.getenv("DATAFORSEO_MCP_URL", default_url).strip()
        if not dataforseo_url:
            return None

        config: Dict[str, Any] = {
            "transport": "streamable_http",
            "url": dataforseo_url,
        }

        timeout = os.getenv("DATAFORSEO_MCP_TIMEOUT")
        if timeout:
            try:
                config["timeout"] = float(timeout)
            except ValueError:
                pass

        auth_header = os.getenv("DATAFORSEO_MCP_AUTH_HEADER")
        if auth_header:
            config["headers"] = {"Authorization": auth_header.strip()}

        return config

    def _build_server_config(self, include_dataforseo: bool = True) -> Dict[str, Any]:
        """Build the MCP server configuration dictionary."""
        servers: Dict[str, Any] = {
            "gscServer": self._build_gsc_server_config()
        }

        if include_dataforseo:
            dataforseo_config = self._build_dataforseo_config()
            if dataforseo_config:
                servers["dataforseo"] = dataforseo_config

        return servers

    def get_mcp_client(self, include_dataforseo: bool = True):
        """Initialize and return the MCP client with configured servers."""
        config = self._build_server_config(include_dataforseo=include_dataforseo)
        return MultiServerMCPClient(config)
    
    async def get_tools(self):
        """Get tools from MCP clients. Caches tools for reuse."""
        if self._tools is None:
            self._tool_warning = None

            # Try loading with both servers first
            client = self.get_mcp_client(include_dataforseo=True)
            try:
                self._mcp_client = client
                self._tools = await client.get_tools()
                return self._tools
            except Exception as exc:
                # If DataForSEO is enabled and fails, fall back to GSC-only
                server_config = self._build_server_config(include_dataforseo=True)
                if "dataforseo" in server_config:
                    fallback_client = self.get_mcp_client(include_dataforseo=False)
                    try:
                        details = str(exc)
                        if len(details) > 500:
                            details = details[:500] + "... (truncated)"
                        self._tool_warning = (
                            "⚠️ DataForSEO MCP server could not be reached. "
                            "Continuing with Google Search Console tools only.\n"
                            f"Details: {details}"
                        )
                        self._mcp_client = fallback_client
                        self._tools = await fallback_client.get_tools()
                        return self._tools
                    except Exception as fallback_exc:
                        raise fallback_exc from exc
                # No fallback available, re-raise
                raise
        return self._tools

    def get_tool_warning(self) -> Optional[str]:
        """Return any warning generated while loading tools."""
        return self._tool_warning

    def update_system_prompt(self, new_prompt: str):
        """Update the system prompt and invalidate the cached agent."""
        self.description = new_prompt
        # Invalidate the cached agent so it will be recreated with the new prompt
        self._agent = None

    async def get_agent(self):
        """Get or create the agent with tools and error handling middleware."""
        if self._agent is None:
            # Get the tools
            tools = await self.get_tools()
            
            # Create the agent with error handling middleware
            self._agent = create_agent(
                model=self.model,
                tools=tools,
                system_prompt=self.description,
                middleware=[handle_tool_errors]
            )
        
        return self._agent
        
    async def run(self, messages):
        """Run the agent with the given messages."""
        # Get the agent
        agent = await self.get_agent()
        
        # Run the agent
        result = await agent.ainvoke({
            "messages": messages
        })
        
        return result
    
    async def stream(self, messages):
        """Stream agent responses as they are generated."""
        # Get the agent
        agent = await self.get_agent()
        
        # Stream the agent's response
        async for chunk in agent.astream(
            {"messages": messages},
            stream_mode="values"
        ):
            yield chunk

ModuleNotFoundError: No module named 'src.tools.seo_tools'