In [1]:
# Install dependencies (run once per environment)
%pip install -q dspy python-dotenv


Note: you may need to restart the kernel to use updated packages.


In [10]:
# Basic imports and environment setup
import os
import dspy
from dotenv import load_dotenv
from dspy.adapters import JSONAdapter
from dspy import History

# Load API keys from .env (ANTHROPIC_API_KEY is expected, already set in your env)
load_dotenv()

# Configure model provider to match LangGraph (Claude 3.5 Sonnet)
lm = dspy.LM("anthropic/claude-3-5-sonnet-20241022", api_key=os.getenv("ANTHROPIC_API_KEY"), temperature=0.1, max_tokens=4000)

# Configure DSPy with LM and JSON adapter
dspy.configure(lm=lm)
dspy.configure(adapter=JSONAdapter())

# Conversation memory
conversation_history = History(messages=[])

print("DSPy configured with Claude + JSONAdapter. Ready to build customer support agent.")


DSPy configured with Claude + JSONAdapter. Ready to build customer support agent.


In [3]:
# Tools: Mock orders DB and documentation helpers
from pathlib import Path
from typing import Optional, Literal
from dataclasses import dataclass

# Documentation category literal
DocumentationCategory = Literal["shipping", "returns", "products", "account", "payment"]

# Mock orders database
ORDERS_DATABASE = [
    {
        "order_id": "ORD-001",
        "customer_email": "john.doe@email.com",
        "customer_name": "John Doe",
        "product": "Wireless Headphones",
        "price": 99.99,
        "status": "delivered",
        "order_date": "2024-01-15",
        "delivery_date": "2024-01-18",
    },
    {
        "order_id": "ORD-002",
        "customer_email": "jane.smith@email.com",
        "customer_name": "Jane Smith",
        "product": "Smart Watch",
        "price": 199.99,
        "status": "shipped",
        "order_date": "2024-01-20",
        "delivery_date": None,
    },
    {
        "order_id": "ORD-003",
        "customer_email": "bob.wilson@email.com",
        "customer_name": "Bob Wilson",
        "product": "Laptop Stand",
        "price": 49.99,
        "status": "processing",
        "order_date": "2024-01-22",
        "delivery_date": None,
    },
    {
        "order_id": "ORD-004",
        "customer_email": "alice.brown@email.com",
        "customer_name": "Alice Brown",
        "product": "Bluetooth Speaker",
        "price": 79.99,
        "status": "delivered",
        "order_date": "2024-01-10",
        "delivery_date": "2024-01-13",
    },
    {
        "order_id": "ORD-005",
        "customer_email": "charlie.davis@email.com",
        "customer_name": "Charlie Davis",
        "product": "Phone Case",
        "price": 24.99,
        "status": "cancelled",
        "order_date": "2024-01-25",
        "delivery_date": None,
    },
]

# Documentation directory (local to this notebook)
try:
    BASE_DIR = Path(__file__).parent  # type: ignore[name-defined]
except NameError:
    BASE_DIR = Path().resolve()
DOCS_DIR = BASE_DIR / "documentation"


def _load_documentation_file(category: DocumentationCategory) -> str:
    """Load documentation content from the repo .txt files."""
    file_path = DOCS_DIR / f"{category}.txt"
    try:
        if file_path.exists():
            return file_path.read_text(encoding="utf-8").strip()
        return f"No documentation file found for category: {category}"
    except Exception as e:
        return f"Error reading documentation file for {category}: {e}"


def _classify_query_to_category(query: str) -> DocumentationCategory:
    """Heuristic classification."""
    q = query.lower()
    if any(k in q for k in ["shipping", "delivery", "express", "standard", "overnight", "tracking"]):
        return "shipping"
    if any(k in q for k in ["return", "refund", "exchange", "policy", "rma"]):
        return "returns"
    if any(k in q for k in ["product", "warranty", "compatibility", "specification", "specs", "feature"]):
        return "products"
    if any(k in q for k in ["account", "password", "login", "profile", "dashboard", "reset"]):
        return "account"
    if any(k in q for k in ["payment", "billing", "credit", "card", "paypal", "apple pay", "invoice", "receipt"]):
        return "payment"
    return "products"


def is_relevant_query(query: str) -> bool:
    """Check if the query is ecommerce-related."""
    q = query.lower()
    keywords = [
        "order", "purchase", "buy", "bought", "ordered", "order id", "order number",
        "tracking", "delivery", "shipped", "delivered", "status",
        "product", "item", "specification", "compatibility", "warranty",
        "return", "refund", "exchange", "defective", "broken", "damaged",
        "account", "login", "password", "profile", "billing", "payment",
        "credit card", "paypal", "invoice", "receipt",
        "shipping", "express", "standard", "overnight", "cost", "free shipping",
        "address", "package", "shipment", "help", "support", "customer service",
        "assistance", "problem", "issue", "question", "inquiry", "complaint", "concern",
    ]
    return any(k in q for k in keywords)


In [4]:
# Tools module (plain Python functions)

class CustomerSupportTools(dspy.Module):
    """Thin wrappers that shape inputs/outputs and add safety checks."""

    def forward_doc_search(self, query: str, category: str = "auto") -> str:
        resolved = _classify_query_to_category(query) if category == "auto" else category  # type: ignore[assignment]
        content = _load_documentation_file(resolved)  # type: ignore[arg-type]
        if not content or "No documentation file found" in content or "Error reading" in content:
            return f"I couldn't find documentation for '{resolved}'."
        MAX_CONTENT_LENGTH = 8000
        if len(content) > MAX_CONTENT_LENGTH:
            trimmed_content = content[:MAX_CONTENT_LENGTH]
            last = max(trimmed_content.rfind("."), trimmed_content.rfind("\n"))
            if last > 0:
                trimmed_content = content[:last]
            large_file_notice = (
                f"\n\n---\n"
                f"LARGE DOCUMENTATION FILE DETECTED\n"
                f"Original file size: {len(content)} characters\n"
                f"Trimmed to: {len(trimmed_content)} characters\n\n"
                f"DEVELOPER NOTE: This file exceeds the recommended size for direct LLM processing.\n"
                f"For production systems, consider implementing: document chunking, vector search, hybrid search, pagination, and summarization.\n\n"
                f"For now, showing the first {len(trimmed_content)} characters of documentation.\n"
                f"---\n"
            )
            content = trimmed_content + large_file_notice
        header = f"FULL DOCUMENTATION FOR {resolved.upper()} CATEGORY:\n\n"
        trailer = f"\n\n---\nUSER QUERY: {query}\nPlease search through the above documentation and answer the user's query."
        return header + content + trailer

    def forward_search_orders(self, customer_email: str = "", order_id: str = "") -> str:
        if not customer_email and not order_id:
            return "Please provide either a customer email or order ID to search for orders."
        found = []
        for o in ORDERS_DATABASE:
            if customer_email and o["customer_email"].lower() == customer_email.lower():
                found.append(o)
            elif order_id and o["order_id"].upper() == order_id.upper():
                found.append(o)
        if not found:
            if customer_email:
                return f"No orders found for email: {customer_email}"
            return f"No orders found for order ID: {order_id}"
        chunks = []
        for o in found:
            chunks.append(
                (
                    f"Order ID: {o['order_id']}\n"
                    f"Customer: {o['customer_name']} ({o['customer_email']})\n"
                    f"Product: {o['product']}\n"
                    f"Price: ${o['price']}\n"
                    f"Status: {o['status'].title()}\n"
                    f"Order Date: {o['order_date']}\n"
                    f"Delivery Date: {o['delivery_date'] if o['delivery_date'] else 'Not delivered yet'}\n"
                ).strip()
            )
        return "\n\n".join(chunks)

    def forward_refund(self, order_id: str, reason: str = "Customer request") -> str:
        order = None
        for o in ORDERS_DATABASE:
            if o["order_id"].upper() == order_id.upper():
                order = o
                break
        if not order:
            return f"Order {order_id} not found. Please verify the order ID."
        if order["status"] == "cancelled":
            return f"Order {order_id} has already been cancelled and cannot be refunded."
        if order["status"] == "processing":
            return (
                f"Order {order_id} is still being processed. Refunds can only be processed "
                "for shipped or delivered orders."
            )
        order["status"] = "refunded"
        refund_amount = order["price"]
        return (
            "Refund processed successfully!\n\n"
            f"Order ID: {order_id}\n"
            f"Customer: {order['customer_name']} ({order['customer_email']})\n"
            f"Product: {order['product']}\n"
            f"Refund Amount: ${refund_amount}\n"
            f"Reason: {reason}\n"
            "Status: Refunded\n\n"
            "The refund will be credited to the original payment method within 5-7 business days."
        )

    def forward_get_status(self, order_id: str) -> str:
        for o in ORDERS_DATABASE:
            if o["order_id"].upper() == order_id.upper():
                details = (
                    f"Order ID: {o['order_id']}\n"
                    f"Status: {o['status'].title()}\n"
                    f"Customer: {o['customer_name']}\n"
                    f"Product: {o['product']}\n"
                    f"Order Date: {o['order_date']}\n"
                )
                details += (
                    f"Delivery Date: {o['delivery_date']}" if o["delivery_date"] else "Delivery Date: Not delivered yet"
                )
                return details.strip()
        return f"Order {order_id} not found. Please verify the order ID."


In [5]:
# ReAct agent with tools + History + JSON output
from typing import Any
import json

# Wrap tools as plain functions for ReAct
_tools_impl = CustomerSupportTools()


def tool_doc_search(query: str, category: str = "auto") -> dict:
    """
    Search company documentation by category with large-document notice.
    Returns a dict with keys: tool, category, content.
    """
    resolved = _classify_query_to_category(query) if category == "auto" else category  # type: ignore[assignment]
    content = _load_documentation_file(resolved)  # type: ignore[arg-type]
    if (not content) or ("No documentation file found" in content) or ("Error reading" in content):
        return {"tool": "doc_search", "category": resolved, "content": f"I couldn't find documentation for '{resolved}'."}

    MAX_CONTENT_LENGTH = 8000
    if len(content) > MAX_CONTENT_LENGTH:
        trimmed = content[:MAX_CONTENT_LENGTH]
        last = max(trimmed.rfind("."), trimmed.rfind("\n"))
        if last > 0:
            trimmed = content[:last]
        large_file_notice = (
            "\n\n---\n"
            "LARGE DOCUMENTATION FILE DETECTED\n"
            f"Original file size: {len(content)} characters\n"
            f"Trimmed to: {len(trimmed)} characters\n\n"
            "DEVELOPER NOTE: This file exceeds the recommended size for direct LLM processing.\n"
            "For production systems with large documentation, consider: document chunking, vector search, hybrid search, pagination, and summarization.\n\n"
            f"For now, showing the first {len(trimmed)} characters of documentation.\n"
            "---\n"
        )
        content = trimmed + large_file_notice

    header = f"FULL DOCUMENTATION FOR {resolved.upper()} CATEGORY:\n\n"
    trailer = (
        f"\n\n---\nUSER QUERY: {query}\n"
        "Please search through the above documentation and answer the user's query."
    )
    return {"tool": "doc_search", "category": resolved, "content": header + content + trailer}


def tool_search_orders(customer_email: str = "", order_id: str = "") -> dict:
    """Search mock orders by email or order_id and return a dict result."""
    result = _tools_impl.forward_search_orders(customer_email=customer_email, order_id=order_id)
    return {"tool": "search_orders", "result": result}


def tool_refund(order_id: str, reason: str = "Customer request") -> dict:
    """Process a refund; includes 5–7 business days notice."""
    if not order_id:
        return {"tool": "refund", "result": "Order ID is required to process a refund."}
    result = _tools_impl.forward_refund(order_id=order_id, reason=reason)
    return {"tool": "refund", "result": result}


def tool_get_status(order_id: str) -> dict:
    """Get order status by ID."""
    if not order_id:
        return {"tool": "get_status", "result": "Order ID is required to check status."}
    result = _tools_impl.forward_get_status(order_id=order_id)
    return {"tool": "get_status", "result": result}


class SupportReActSignature(dspy.Signature):
    """
    You are a professional customer service representative for TechStore, an e-commerce company specializing in electronics and tech accessories.
    Use provided customer context to personalize responses.
    Guidelines: Verify order IDs before refunds; use documentation for policy info; search orders for lookups; use status for quick checks; escalate politely when needed.

    You can call tools: doc_search, search_orders, refund, get_status.
    When finished, produce:
    - `action`: the primary tool used (one of: doc_search, search_orders, refund, get_status, answer_direct)
    - `tool_result`: the most relevant tool output you used (may be empty for answer_direct)
    - `answer`: the final customer-facing reply
    Keep responses concise and professional.
    """
    user_message: str = dspy.InputField(description="The customer's message")
    customer_email: str = dspy.InputField(description="Customer email if available")
    order_id: str = dspy.InputField(description="Order ID if available")
    history: dspy.History = dspy.InputField(description="Conversation history")

    reasoning: str = dspy.OutputField(description="Brief plan and justification")
    action: str = dspy.OutputField(description="Chosen action/tool")
    tool_result: str = dspy.OutputField(description="Tool output used to answer")
    answer: str = dspy.OutputField(description="Final answer")


react_agent = dspy.ReAct(
    SupportReActSignature,
    tools=[
        tool_doc_search,
        tool_search_orders,
        tool_refund,
        tool_get_status,
    ],
    max_iters=3,
)


def run_support_agent(user_message: str, customer_email: str = "", order_id: str = "") -> dict:
    """Single entrypoint that runs the ReAct agent and returns a JSON-serializable dict."""
    # Relevance filter first
    if user_message and not is_relevant_query(user_message):
        rejection = (
            "I'm sorry, but I can only assist with e-commerce related inquiries such as order status, "
            "product information, shipping, returns, refunds, account and payment issues."
        )
        # Append to History using signature-conformant keys
        conversation_history.messages.append({
            "user_message": user_message,
            "customer_email": customer_email,
            "order_id": order_id,
        })
        conversation_history.messages.append({
            "answer": rejection,
            "action": "reject",
            "tool_result": "",
        })
        return {"answer": rejection, "action": "reject", "tool_result": ""}

    # Maintain history and run (append the user turn)
    conversation_history.messages.append({
        "user_message": user_message,
        "customer_email": customer_email,
        "order_id": order_id,
    })

    result = react_agent(
        user_message=user_message,
        customer_email=customer_email,
        order_id=order_id,
        history=conversation_history,
    )

    answer = getattr(result, "answer", "")
    action = getattr(result, "action", "")
    tool_result = getattr(result, "tool_result", "")

    # Append assistant turn to History
    conversation_history.messages.append({
        "answer": answer,
        "action": action,
        "tool_result": tool_result,
    })

    # Return structured JSON
    return {
        "answer": answer,
        "action": action,
        "tool_result": tool_result,
    }

print("ReAct support agent ready.")



ReAct support agent ready.


In [11]:
# Example: minimal required inputs using ReAct agent returning JSON

input_payload = {
    "messages": [
        {"role": "human", "content": "What's the status of order ORD-001?"}
    ],
    "customer_email": "john.doe@email.com",
    "order_id": "ORD-001",
}

user_text = input_payload["messages"][0]["content"]
resp = run_support_agent(user_message=user_text, customer_email=input_payload["customer_email"], order_id=input_payload["order_id"])
print(resp)


InternalServerError: litellm.InternalServerError: AnthropicError - {"type":"error","error":{"type":"overloaded_error","message":"Overloaded"},"request_id":null}

In [None]:
# Inspect the underlying prompts used by DSPy components
lm.inspect_history(n=3)


In [None]:
# Additional examples using ReAct + JSON

# 1) Documentation question
print("\n--- Doc question ---")
msg = "What's your return policy?"
print(run_support_agent(user_message=msg, customer_email="", order_id=""))

# 2) Orders lookup by email
print("\n--- Search orders by email ---")
print(run_support_agent(user_message="Can you find my orders?", customer_email="jane.smith@email.com"))

# 3) Refund path (eligible)
print("\n--- Refund ---")
print(run_support_agent(user_message="I want a refund", customer_email="john.doe@email.com", order_id="ORD-001"))

# 4) Irrelevant query
print("\n--- Irrelevant ---")
print(run_support_agent(user_message="Tell me a joke about quantum cats"))
