# CrewAI Multi‑Agent Workflow 

## Use Case: E‑Commerce Order, Refund & Exception Handling with a Goal‑Oriented, Multi‑Agent Workflow

✅ **Agents (4):**
1. Order Issue Identification Agent  
2. Policy Interpretation Agent (returns/refunds/exceptions)  
3. Resolution Recommendation Agent  
4. Escalation & Approval Agent  

✅ **Key Requirements Covered**
- 3–4 collaborating agents with clear roles
- Explicit dependency on prior agent outputs (handoffs)
- Deterministic escalation logic (risk/urgency/uncertainty thresholds)
- Sample inputs & outputs (end‑to‑end runs)

## 0) Setup Notes

- Works best with **Python 3.10–3.12**.
- Requires an OpenAI API key in environment variable: `OPENAI_API_KEY`.
- Uses `gpt-4o-mini`; to switch models, change the `model` argument of `ChatOpenAI` in section 5.

If you are running in VS Code:
- Select the correct kernel: `.venv`
- Run cells **top-to-bottom** once after restart.

In [None]:
import os

# Ensure your API key is set in your environment BEFORE running.
# Example (PowerShell):
# $env:OPENAI_API_KEY="sk-..."

print("OPENAI_API_KEY set:", bool(os.environ.get("OPENAI_API_KEY")))
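
If a missing key should stop the notebook early instead of surfacing later as an authentication error, a small guard can be used in place of the print. This is a minimal sketch: `require_env` is a hypothetical helper (not part of CrewAI or the OpenAI SDK), and `DEMO_API_KEY` is a throwaway variable used only for the demonstration.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; export it before starting the kernel.")
    return value


# Demonstration with a throwaway variable (hypothetical name, not a real key).
os.environ["DEMO_API_KEY"] = "sk-demo"
print(require_env("DEMO_API_KEY"))
```

In this notebook the call would be `require_env("OPENAI_API_KEY")`.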

## 1) Imports

If you haven't installed dependencies yet, install them in your `.venv`:

```bash
pip install crewai langchain-openai pydantic python-dotenv
```

In [None]:
import json
import re
import time
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field
from crewai import Agent, Task, Crew

## 2) Static Policies (Non‑RAG)

Business rules are simulated with a static policy text (no retrieval step) that is injected into the policy prompt, plus deterministic checks in code.

In [None]:
ECOM_POLICY_TEXT = """E-COMMERCE POLICY (STATIC, SIMULATED)

Returns:
- Eligible returns: within 30 days from delivery, unused, original packaging.
- Some categories (digital goods, perishable, intimate wear) may be non-returnable.

Refunds:
- Refund to original payment method typically 3-7 business days after return approval/receipt.
- If item not returned yet, refund usually not initiated (except exceptions).

Damaged/Defective:
- If damaged on arrival: offer replacement or refund. Ask for photos.
- If customer reports defect within 7 days: prioritize resolution.

Delivery Issues:
- Delivered but not received: verify address, check delivery proof (OTP/photo), neighbor/security check.
- If high-value or repeated claims: escalate for manual investigation.

Escalation flags:
- Suspected fraud, account compromise, multiple failed deliveries, abusive language
- High value order (e.g., > INR 10,000) with non-receipt or chargeback threats
- Uncertainty / missing critical info required for policy decision
"""
print("Loaded policy text. Characters:", len(ECOM_POLICY_TEXT))
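
One rule in the policy text lends itself to a deterministic check: the 30-day return window. The sketch below is an illustration under stated assumptions, not part of the workflow above. The function name and parameters are hypothetical, and because the policy only says some categories "may be" non-returnable, treating them as always excluded is an assumption.

```python
from datetime import date

RETURN_WINDOW_DAYS = 30  # from the policy text: "within 30 days from delivery"

# Assumption: the policy says these "may be" non-returnable; this sketch excludes them outright.
NON_RETURNABLE_CATEGORIES = {"digital goods", "perishable", "intimate wear"}


def is_return_eligible(
    delivered_on: date,
    requested_on: date,
    category: str,
    unused: bool = True,
    original_packaging: bool = True,
) -> bool:
    """Apply the static return rules: window, category, condition, packaging."""
    days_since_delivery = (requested_on - delivered_on).days
    in_window = 0 <= days_since_delivery <= RETURN_WINDOW_DAYS
    returnable_category = category.lower() not in NON_RETURNABLE_CATEGORIES
    return in_window and returnable_category and unused and original_packaging


# 20 days after delivery: inside the window.
print(is_return_eligible(date(2024, 3, 1), date(2024, 3, 21), "electronics"))  # True
# 35 days after delivery: outside the window.
print(is_return_eligible(date(2024, 3, 1), date(2024, 4, 5), "electronics"))   # False
```

A check like this needs a delivery date, which the free-text queries in this notebook do not always contain, so it would only apply once that fact has been collected.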

## 3) Data Models (Structured Handoffs)

In [None]:
class IssueOutput(BaseModel):
    issue_type: str = Field(..., description="One of: delivery_delay, refund_request, damaged_item, return_eligibility, wrong_item, delivered_not_received, general_query")
    urgency: int = Field(..., ge=0, le=100, description="0-100 urgency")
    confidence: float = Field(..., ge=0, le=1, description="0-1 confidence")
    key_facts_needed: List[str] = Field(default_factory=list)

class PolicyOutput(BaseModel):
    allowed_actions: List[str] = Field(default_factory=list)
    policy_notes: str = ""

class ResolutionOutput(BaseModel):
    draft_response: str

class EscalationOutput(BaseModel):
    escalate: bool
    escalation_reason: str = ""

class Handoff(BaseModel):
    customer_query: str
    issue_type: Optional[str] = None
    urgency: Optional[int] = None
    confidence: Optional[float] = None
    key_facts_needed: List[str] = Field(default_factory=list)

    allowed_actions: List[str] = Field(default_factory=list)
    policy_notes: Optional[str] = None

    draft_response: Optional[str] = None

    risk_score: Optional[int] = None
    escalate: bool = False
    escalation_reason: Optional[str] = None
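
The range constraints on `urgency` and `confidence` are what make these handoffs strict: an out-of-range value from the model fails validation instead of flowing into the next agent. The sketch below shows that behavior with a trimmed stand-in class (`IssueOutputDemo`, a hypothetical name chosen so it does not shadow `IssueOutput`); `validate_issue` is also a hypothetical helper.

```python
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, Field, ValidationError


class IssueOutputDemo(BaseModel):
    """Trimmed stand-in for IssueOutput so this block runs on its own."""
    issue_type: str
    urgency: int = Field(..., ge=0, le=100)
    confidence: float = Field(..., ge=0, le=1)
    key_facts_needed: List[str] = Field(default_factory=list)


def validate_issue(payload: Dict[str, Any]) -> Tuple[Optional[IssueOutputDemo], List[str]]:
    """Return (model, []) on success, or (None, names of failing fields) on failure."""
    try:
        return IssueOutputDemo(**payload), []
    except ValidationError as exc:
        # Each error carries a "loc" tuple whose first element is the field name.
        return None, sorted({str(err["loc"][0]) for err in exc.errors()})


ok, _ = validate_issue({"issue_type": "damaged_item", "urgency": 60, "confidence": 0.9})
_, bad_fields = validate_issue({"issue_type": "damaged_item", "urgency": 150, "confidence": 1.4})
print(ok.urgency, bad_fields)  # 60 ['confidence', 'urgency']
```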

## 4) Agent Prompts

In [None]:
ISSUE_AGENT_PROMPT = """You are an Order Issue Identification Agent for an e-commerce company.

Classify the customer query into ONE of:
- delivery_delay
- refund_request
- damaged_item
- return_eligibility
- wrong_item
- delivered_not_received
- general_query

Return ONLY valid JSON with keys:
- issue_type (string)
- urgency (integer 0-100)
- confidence (number 0-1)
- key_facts_needed (array of strings)

Customer query:
{customer_query}
"""

POLICY_AGENT_PROMPT = """You are a Policy Interpretation Agent for an e-commerce company.
Use the policy text below and the identified issue to decide allowed actions.

POLICY TEXT:
{policy_text}

Inputs:
- customer_query: {customer_query}
- issue_type: {issue_type}
- urgency: {urgency}
- confidence: {confidence}

Return ONLY valid JSON with keys:
- allowed_actions (array of strings)
- policy_notes (string)

Rules:
- If confidence < 0.60, include action: "ask_clarifying_questions" and mention missing facts.
- If delivered_not_received and urgency >= 70, include action: "open_investigation".
- If damaged_item, include action: "request_photos" and "offer_replacement_or_refund".
- If refund_request, include action: "explain_refund_timeline" and "check_refund_status".
"""

RESOLUTION_AGENT_PROMPT = """You are a Resolution Recommendation Agent.
Draft a helpful, policy-compliant response.

Inputs:
- customer_query: {customer_query}
- issue_type: {issue_type}
- allowed_actions: {allowed_actions}
- policy_notes: {policy_notes}

Return ONLY valid JSON with keys:
- draft_response (string)

Guidelines:
- Be polite and concise.
- Do NOT ask for sensitive data (OTP, full card number).
- If you need order details, ask for ORDER ID only.
"""

ESCALATION_AGENT_PROMPT = """You are an Escalation & Approval Agent.

Decide whether to escalate to a human support agent based on:
- High urgency (>= 80)
- Low confidence (< 0.60)
- Fraud indicators or threats (chargeback, hacked account)
- High value cues in text (e.g., "expensive", "high value", "INR", "₹" with large amount)
- 'delivered_not_received' issues with repeated claims

Return ONLY valid JSON with keys:
- escalate (true/false)
- escalation_reason (string)

Customer query:
{customer_query}

Context:
issue_type={issue_type}
urgency={urgency}
confidence={confidence}
allowed_actions={allowed_actions}
"""

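The "Rules" block in `POLICY_AGENT_PROMPT` is enforced only by the model following instructions. The same four rules can be restated in code and used to verify the model's `allowed_actions` after the fact. This is a sketch under that assumption; `required_actions` and `missing_actions` are hypothetical helpers and are not called anywhere in this notebook.

```python
from typing import List


def required_actions(issue_type: str, urgency: int, confidence: float) -> List[str]:
    """Actions the policy prompt's rules require for the given classification."""
    actions: List[str] = []
    if confidence < 0.60:
        actions.append("ask_clarifying_questions")
    if issue_type == "delivered_not_received" and urgency >= 70:
        actions.append("open_investigation")
    if issue_type == "damaged_item":
        actions += ["request_photos", "offer_replacement_or_refund"]
    if issue_type == "refund_request":
        actions += ["explain_refund_timeline", "check_refund_status"]
    return actions


def missing_actions(model_actions: List[str], issue_type: str, urgency: int, confidence: float) -> List[str]:
    """Required actions that the model's output left out."""
    return [a for a in required_actions(issue_type, urgency, confidence) if a not in model_actions]


# The model returned only one of the two actions required for a damaged item.
print(missing_actions(["request_photos"], "damaged_item", urgency=40, confidence=0.9))
# ['offer_replacement_or_refund']
```

A non-empty result could be appended to `allowed_actions` or treated as a reason to escalate; which of the two is appropriate is a design decision this notebook does not specify.
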
## 5) LLM + CrewAI Agents

In [None]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0
)

issue_agent = Agent(
    role="Order Issue Identification Agent",
    goal="Identify the customer issue type and required facts.",
    backstory="You are an expert e-commerce support triage agent.",
    llm=llm,
    verbose=False
)

policy_agent = Agent(
    role="Policy Interpretation Agent",
    goal="Apply static policies to determine allowed actions and notes.",
    backstory="You enforce return/refund/delivery policies consistently.",
    llm=llm,
    verbose=False
)

resolution_agent = Agent(
    role="Resolution Recommendation Agent",
    goal="Draft a customer-ready response aligned with policy actions.",
    backstory="You write clear and helpful customer support responses.",
    llm=llm,
    verbose=False
)

escalation_agent = Agent(
    role="Escalation & Approval Agent",
    goal="Escalate high-risk, urgent, or uncertain cases to a human.",
    backstory="You are cautious and escalate when necessary.",
    llm=llm,
    verbose=False
)

print("Agents ready.")

## 6) Utilities: JSON Parsing + Risk Score

In [None]:
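def parse_json_strict(text: str) -> Dict[str, Any]:
    """Parse a JSON object from model output, tolerating surrounding prose or code fences."""
    text = str(text).strip()  # kickoff() may return a result object rather than a str
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span, e.g. when the model wraps JSON in a fence.
        match = re.search(r"\{[\s\S]*\}", text)
        if not match:
            raise ValueError(f"No JSON found. Raw output (first 500 chars):\n{text[:500]}")
        data = json.loads(match.group(0))
    if not isinstance(data, dict):
        raise ValueError(f"Expected a JSON object, got {type(data).__name__}.")
    return data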

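def simple_risk_score(customer_query: str, issue_type: str, confidence: float, urgency: int) -> int:
    """Deterministic 0-100 risk score used as a safeguard next to the model's escalation decision."""
    q = customer_query.lower()
    score = 0

    # Uncertain classification adds risk.
    if confidence < 0.6:
        score += 35
    elif confidence < 0.75:
        score += 15

    # Urgency contributes up to 30 points.
    score += int(urgency * 0.3)

    # Fraud or account-compromise wording.
    fraud_cues = ["hacked", "fraud", "scam", "chargeback", "stolen", "unauthorized"]
    if any(w in q for w in fraud_cues):
        score += 30

    if issue_type == "delivered_not_received":
        score += 25

    # Currency marker plus a 4+ digit amount, with or without comma grouping ("25999", "25,999").
    if "₹" in customer_query or "inr" in q:
        nums = re.findall(r"\d{4,}|\d{1,3}(?:,\d{2,3})+", q)
        if nums:
            score += 20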

    return min(100, score)
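
To see how the score adds up for a concrete case, the sketch below recomputes the same kinds of components and returns them individually. `risk_breakdown` and its component names are hypothetical, the digit matching is deliberately simple, and the `confidence` and `urgency` values are assumed model outputs chosen for illustration, not results from a real run.

```python
import re
from typing import Dict

FRAUD_CUES = ["hacked", "fraud", "scam", "chargeback", "stolen", "unauthorized"]


def risk_breakdown(customer_query: str, issue_type: str, confidence: float, urgency: int) -> Dict[str, int]:
    """Per-component view of the risk score, plus the capped total."""
    q = customer_query.lower()
    has_currency = "₹" in customer_query or "inr" in q
    parts = {
        "low_confidence": 35 if confidence < 0.6 else 15 if confidence < 0.75 else 0,
        "urgency": int(urgency * 0.3),
        "fraud_cue": 30 if any(w in q for w in FRAUD_CUES) else 0,
        "non_receipt": 25 if issue_type == "delivered_not_received" else 0,
        "large_amount": 20 if has_currency and re.search(r"\d{4,}", q) else 0,
    }
    parts["total"] = min(100, sum(parts.values()))
    return parts


query = "My package shows delivered but I never received it. It cost ₹25999. This is the second time."
# Assumed model outputs: confidence 0.9, urgency 80.
print(risk_breakdown(query, "delivered_not_received", confidence=0.9, urgency=80))
# {'low_confidence': 0, 'urgency': 24, 'fraud_cue': 0, 'non_receipt': 25, 'large_amount': 20, 'total': 69}
```

With these assumed inputs the total is 69, one point under the escalation threshold of 70 used by the orchestrator, so this case would escalate only if the escalation agent itself says so or the issue agent reports a higher urgency.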

## 7) Run One Agent (Robust Execution)

We run each agent using a **single-task Crew** for compatibility across CrewAI versions.

In [None]:
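def run_one_agent(agent: Agent, prompt: str, expected_output: str, verbose: bool = False) -> Dict[str, Any]:
    """Run one agent on one fully formatted prompt and return its parsed JSON output."""
    task = Task(
        description=prompt,
        expected_output=expected_output,
        agent=agent
    )
    # A single-task Crew keeps each stage isolated; handoffs happen in the orchestrator.
    crew = Crew(
        agents=[agent],
        tasks=[task],
        verbose=verbose
    )
    # The prompt is already formatted, so no template inputs are passed to kickoff().
    result = crew.kickoff(inputs={})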
    return parse_json_strict(result)
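
One way to harden this step further is to retry when the model returns something that cannot be parsed as a JSON object. The sketch below is independent of CrewAI: `produce` stands in for any zero-argument callable that returns raw model text (for example a wrapper around `crew.kickoff`), and `extract_json_object` and `call_with_retries` are hypothetical names. The canned outputs are made up for the demonstration.

```python
import json
import re
from typing import Any, Callable, Dict, Optional


def extract_json_object(text: str) -> Dict[str, Any]:
    """Parse a JSON object, falling back to the outermost {...} span in the text."""
    text = text.strip()
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        match = re.search(r"\{[\s\S]*\}", text)
        if not match:
            raise ValueError("no JSON object found")
        data = json.loads(match.group(0))
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    return data


def call_with_retries(produce: Callable[[], str], attempts: int = 3) -> Dict[str, Any]:
    """Call `produce` until its output parses, up to `attempts` times."""
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        try:
            return extract_json_object(produce())
        except ValueError as exc:  # json.JSONDecodeError is a subclass of ValueError
            last_error = exc
    raise RuntimeError(f"no valid JSON after {attempts} attempts") from last_error


# Stubbed model: the first reply is prose, the second is fenced JSON.
canned = iter([
    "Sorry, I could not classify that.",
    '```json\n{"escalate": true, "escalation_reason": "chargeback threat"}\n```',
])
result = call_with_retries(lambda: next(canned))
print(result)  # {'escalate': True, 'escalation_reason': 'chargeback threat'}
```

Each retry costs another model call, so the attempt count is a trade-off between robustness and latency; the orchestrator's 90-second time check would still apply to the whole run.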

## 8) Orchestrator (End‑to‑End Workflow)

In [None]:
def run_ecommerce_workflow(customer_query: str, verbose: bool = False) -> Handoff:
    start = time.time()

    # 1) Issue
    issue_prompt = ISSUE_AGENT_PROMPT.format(customer_query=customer_query)
    issue_raw = run_one_agent(
        issue_agent,
        issue_prompt,
        expected_output="JSON with keys: issue_type, urgency, confidence, key_facts_needed",
        verbose=verbose
    )
    issue = IssueOutput(**issue_raw)

    # 2) Policy
    policy_prompt = POLICY_AGENT_PROMPT.format(
        policy_text=ECOM_POLICY_TEXT,
        customer_query=customer_query,
        issue_type=issue.issue_type,
        urgency=issue.urgency,
        confidence=issue.confidence
    )
    policy_raw = run_one_agent(
        policy_agent,
        policy_prompt,
        expected_output="JSON with keys: allowed_actions, policy_notes",
        verbose=verbose
    )
    policy = PolicyOutput(**policy_raw)

    # 3) Resolution
    response_prompt = RESOLUTION_AGENT_PROMPT.format(
        customer_query=customer_query,
        issue_type=issue.issue_type,
        allowed_actions=policy.allowed_actions,
        policy_notes=policy.policy_notes
    )
    response_raw = run_one_agent(
        resolution_agent,
        response_prompt,
        expected_output="JSON with key: draft_response",
        verbose=verbose
    )
    response = ResolutionOutput(**response_raw)

    # 4) Escalation
    escalation_prompt = ESCALATION_AGENT_PROMPT.format(
        customer_query=customer_query,
        issue_type=issue.issue_type,
        urgency=issue.urgency,
        confidence=issue.confidence,
        allowed_actions=policy.allowed_actions
    )
    escalation_raw = run_one_agent(
        escalation_agent,
        escalation_prompt,
        expected_output="JSON with keys: escalate, escalation_reason",
        verbose=verbose
    )
    escalation = EscalationOutput(**escalation_raw)

    handoff = Handoff(
        customer_query=customer_query,
        issue_type=issue.issue_type,
        urgency=issue.urgency,
        confidence=issue.confidence,
        key_facts_needed=issue.key_facts_needed,
        allowed_actions=policy.allowed_actions,
        policy_notes=policy.policy_notes,
        draft_response=response.draft_response,
        escalate=bool(escalation.escalate),
        escalation_reason=escalation.escalation_reason
    )

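    handoff.risk_score = simple_risk_score(customer_query, handoff.issue_type, handoff.confidence, handoff.urgency)

    # Deterministic safeguard: a high risk score forces escalation even if the model declined.
    if handoff.risk_score >= 70:
        if not handoff.escalate or not (handoff.escalation_reason or "").strip():
            handoff.escalation_reason = "High risk score based on static safeguards."
        handoff.escalate = True

    # Time budget: slow runs are escalated; keep an earlier reason only if it explained an escalation.
    if time.time() - start > 90:
        prior_reason = (handoff.escalation_reason or "").strip() if handoff.escalate else ""
        timeout_note = "Took too long; escalated for safety."
        handoff.escalate = True
        handoff.escalation_reason = f"{prior_reason} | {timeout_note}" if prior_reason else timeout_note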

    return handoff

print("Orchestrator ready.")
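
The two deterministic overrides at the end of the orchestrator (risk score of 70 or more, and a run longer than 90 seconds) can be isolated into a pure function so they are testable without any model call. This is a sketch: the thresholds come from the orchestrator, while the function name and the choice to replace the model's reason when a "do not escalate" decision is overridden are assumptions.

```python
from typing import Optional, Tuple

RISK_THRESHOLD = 70        # same threshold as the orchestrator
TIME_BUDGET_SECONDS = 90   # same time budget as the orchestrator


def apply_safeguards(
    model_escalate: bool,
    model_reason: Optional[str],
    risk_score: int,
    elapsed_seconds: float,
) -> Tuple[bool, str]:
    """Combine the model's escalation decision with the deterministic overrides."""
    escalate = model_escalate
    reason = (model_reason or "").strip()

    if risk_score >= RISK_THRESHOLD:
        # Assumption: a reason that justified NOT escalating is replaced when overridden.
        if not escalate or not reason:
            reason = "High risk score based on static safeguards."
        escalate = True

    if elapsed_seconds > TIME_BUDGET_SECONDS:
        note = "Took too long; escalated for safety."
        reason = f"{reason} | {note}" if escalate and reason else note
        escalate = True

    return escalate, reason


print(apply_safeguards(False, "No escalation needed.", risk_score=75, elapsed_seconds=10.0))
# (True, 'High risk score based on static safeguards.')
print(apply_safeguards(True, "Chargeback threat.", risk_score=40, elapsed_seconds=120.0))
# (True, 'Chargeback threat. | Took too long; escalated for safety.')
```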

## 9) Quick Test (Single Query)

In [None]:
TEST_QUERIES = [
    "My package shows delivered but I never received it. It was expensive and I need it urgently."
]

for q in TEST_QUERIES:
    print("\n" + "="*100)
    print("USER QUERY:", q)
    out = run_ecommerce_workflow(q, verbose=False)
    print(out.model_dump_json(indent=2))

## 10) Full Test Suite (Multiple Queries)

In [None]:
TEST_QUERIES = [
    "My delivery is delayed by 5 days. When will it arrive?",
    "I want a refund for my order. It was cancelled yesterday.",
    "The item arrived damaged. The screen is cracked.",
    "Is this item returnable after 20 days if unused?",
    "I received the wrong product in the box.",
    # edge / high risk
    "My package shows delivered but I never received it. It cost ₹25999. This is the second time.",
    "If you don't refund today I will do a CHARGEBACK. This is fraud!",
    "I think my account is hacked. Someone changed my address and ordered items.",
    # safe-channel warning
    "Can I share my OTP here so you can verify and issue refund?"
]

for q in TEST_QUERIES:
    print("\n" + "="*100)
    print("USER QUERY:", q)
    out = run_ecommerce_workflow(q, verbose=False)
    print(out.model_dump_json(indent=2))

## 11) Design Notes (Short)

- **No RAG**: policies are static text + rules
- **Handoffs**: strict JSON between agents, validated by Pydantic models and collected in `Handoff`
- **Escalation**: model decision + deterministic risk score safeguard (a score of 70 or more forces escalation)
- **Safety**: response agent avoids sensitive data requests
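
The safety note relies on the resolution agent following its prompt guidelines. A deterministic post-check on `draft_response` can back that up by flagging drafts that mention sensitive data. The sketch below is a keyword screen only; the pattern list and the `sensitive_mentions` name are assumptions, and a match means "review this draft", not "this draft is unsafe".

```python
import re
from typing import List

# Hypothetical pattern list; extend it to match the data your support channel must never request.
SENSITIVE_PATTERNS = {
    "otp": re.compile(r"\botp\b|one[- ]time (?:password|passcode)", re.IGNORECASE),
    "card_number": re.compile(r"\bcard number\b", re.IGNORECASE),
    "cvv": re.compile(r"\bcvv\b", re.IGNORECASE),
    "password": re.compile(r"\bpassword\b", re.IGNORECASE),
}


def sensitive_mentions(draft_response: str) -> List[str]:
    """Names of sensitive-data patterns that appear anywhere in the draft."""
    return [name for name, pattern in SENSITIVE_PATTERNS.items() if pattern.search(draft_response)]


print(sensitive_mentions("Please share your ORDER ID so we can check the refund status."))
# []
print(sensitive_mentions("Please send the OTP and CVV to verify."))
# ['otp', 'cvv']
```

A draft that warns the customer never to share an OTP would also be flagged, so the result is better suited to routing drafts for human review than to blocking them automatically.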