In [None]:
%pip install crewai chromadb crewai[tools]

In [None]:
os.environ["OPENAI_API_KEY"] = "xx"

In [None]:
import json
import os
import numpy as np
from typing import Dict, Any, List
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import logging
from openai import OpenAI
from sklearn.metrics.pairwise import cosine_similarity

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Sample database for order status
ORDERS_DB = {
    "ABC-123": {"status": "shipped", "tracking": "1Z999AA1234567890", "estimated_delivery": "2024-01-15"},
    "DEF-456": {"status": "processing", "estimated_ship": "2024-01-12"},
    "GHI-789": {"status": "delivered", "delivered_date": "2024-01-10"}
}

# Sample Knowledge base for policies with embeddings
KNOWLEDGE_BASE = {
    "return_policy": """
    Our return policy allows returns within 30 days of purchase.
    Items must be in original condition with tags attached.
    Refunds are processed within 5-7 business days.
    Original receipt or order confirmation required.
    """,
    "shipping_info": """
    Standard shipping: 5-7 business days
    Express shipping: 2-3 business days
    Free shipping on orders over $50
    International shipping available to most countries
    Tracking information provided via email
    """,
    "warranty_info": """
    All products come with a 1-year manufacturer warranty.
    Extended warranty options available at checkout.
    Warranty covers manufacturing defects but not normal wear.
    Contact customer service for warranty claims.
    """
}

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

class VectorKnowledgeBase:
    def __init__(self):
        self.documents = []
        self.embeddings = []
        self.metadata = []
        self._initialize_knowledge_base()

    def _get_embedding(self, text: str) -> List[float]:
        try:
            response = client.embeddings.create(
                input=text,
                model="text-embedding-3-small"
            )
            return response.data[0].embedding
        except Exception as e:
            logger.error(f"Error generating embedding: {e}")
            return [0.0] * 1536

    def _initialize_knowledge_base(self):
        logger.info("Initializing vector knowledge base...")

        for policy_name, content in KNOWLEDGE_BASE.items():
            self.documents.append(content)

            embedding = self._get_embedding(content)
            self.embeddings.append(embedding)

            self.metadata.append({
                "policy_name": policy_name,
                "policy_type": policy_name.replace('_', ' ').title(),
                "content_length": len(content)
            })

        self.embeddings = np.array(self.embeddings)
        logger.info(f"Knowledge base initialized with {len(self.documents)} documents")

    def search(self, query: str, top_k: int = 3, threshold: float = 0.7) -> List[Dict]:
        """Perform semantic search using vector similarity"""
        logger.info(f"Performing vector search for: {query}")

        query_embedding = np.array(self._get_embedding(query)).reshape(1, -1)

        # Calculate cosine similarities
        similarities = cosine_similarity(query_embedding, self.embeddings)[0]

        results = []
        for i, similarity in enumerate(similarities):
            if similarity >= threshold:
                results.append({
                    "content": self.documents[i],
                    "metadata": self.metadata[i],
                    "similarity_score": float(similarity),
                    "index": i
                })

        results.sort(key=lambda x: x["similarity_score"], reverse=True)
        return results[:top_k]

vector_kb = VectorKnowledgeBase()

# Tool 1: Order Status Tool
class OrderStatusInput(BaseModel):
    order_id: str = Field(..., description="The order ID to look up")

class OrderStatusTool(BaseTool):
    name: str = "order_status_lookup"
    description: str = "Look up the status of an order by order ID"
    args_schema: type[BaseModel] = OrderStatusInput

    def _run(self, order_id: str) -> str:
        logger.info(f"Looking up order status for: {order_id}")

        if order_id in ORDERS_DB:
            order_info = ORDERS_DB[order_id]
            return json.dumps(order_info, indent=2)
        else:
            return f"Order {order_id} not found in our system."

# Tool 2: Knowledge Base Tool
class VectorKnowledgeBaseInput(BaseModel):
    query: str = Field(..., description="The policy or information query for semantic search")

class VectorKnowledgeBaseTool(BaseTool):
    name: str = "vector_knowledge_search"
    description: str = "Search company policies using semantic vector search (return policy, shipping, warranty)"
    args_schema: type[BaseModel] = VectorKnowledgeBaseInput

    def _run(self, query: str) -> str:
        logger.info(f"Performing vector search for: {query}")

        results = vector_kb.search(query, top_k=3, threshold=0.6)

        if not results:
            return "No relevant policy information found for your query."

        formatted_results = []
        for result in results:
            policy_type = result["metadata"]["policy_type"]
            content = result["content"].strip()
            similarity = result["similarity_score"]

            formatted_results.append(
                f"**{policy_type}** (Relevance: {similarity:.2f})\n{content}"
            )

        return "\n\n" + "="*50 + "\n\n".join(formatted_results)

# Tool 3: Chat Tool
class GeneralChatInput(BaseModel):
    message: str = Field(..., description="The user's message for general conversation")

class GeneralChatTool(BaseTool):
    name: str = "general_chat"
    description: str = "Handle general conversation and greetings"
    args_schema: type[BaseModel] = GeneralChatInput

    def _run(self, message: str) -> str:
        logger.info(f"Handling general chat: {message}")
        return f"Hello! I understand you said: '{message}'. How can I help you with your order or questions about our policies today?"

# Define Agent
def create_customer_support_agent():

    order_tool = OrderStatusTool()
    knowledge_tool = VectorKnowledgeBaseTool()
    chat_tool = GeneralChatTool()

    support_agent = Agent(
        role="Customer Support Specialist",
        goal="""Provide excellent customer support by:
        1. Analyzing user queries to determine the appropriate tool
        2. Using order_status_lookup for order-related questions (format: XXX-123)
        3. Using vector_knowledge_search for policy questions with semantic understanding
        4. Using general_chat for greetings and casual conversation
        5. Always being helpful and professional""",

        backstory="""You are an experienced customer support agent for an e-commerce store.
        You have access to order tracking systems, an advanced semantic search knowledge base,
        and can engage in friendly conversation. You excel at understanding customer intent
        and routing their requests to the appropriate tools.""",

        tools=[order_tool, knowledge_tool, chat_tool],
        verbose=True,
        llm=LLM(model="gpt-4o-mini"),
        max_iter=3,
        allow_delegation=False
    )

    return support_agent

# Task creation
def create_support_task(user_query: str):
    """Create a task that demonstrates tool selection reasoning"""

    return Task(
        description=f"""
        Analyze this customer query and provide appropriate assistance: "{user_query}"

        Follow this decision process:
        1. If the query mentions an order ID (format: XXX-123), use order_status_lookup
        2. If the query asks about policies (return, shipping, warranty), use vector_knowledge_search
        3. If the query is a greeting or general conversation, use general_chat
        4. Always explain your reasoning for tool selection
        5. Provide a complete, helpful response with similarity scores when using vector search

        Query: {user_query}
        """,

        expected_output="""A complete response that includes:
        - Explanation of which tool was selected and why
        - The relevant information retrieved (with similarity scores for vector search)
        - A helpful, professional response to the customer""",

        agent=create_customer_support_agent()
    )

def demonstrate_agent(queries: List[str]):

    print("Customer Support Agent")
    print("=" * 60)

    for i, query in enumerate(queries, 1):
        print(f"\n Test Case {i}: {query}")
        print("-" * 40)

        task = create_support_task(query)

        crew = Crew(
            agents=[task.agent],
            tasks=[task],
            process=Process.sequential,
            verbose=True
        )
        try:
            result = crew.kickoff()
            print(f"✅ Response: {result}")
        except Exception as e:
            print(f"❌ Error: {e}")

        print("\n" + "="*60)

# Testing
if __name__ == "__main__":

    test_queries = [
        "Where is my order #ABC-123?",  # Tests Order Status Tool
        "What is your return policy?",   # Tests Vector Knowledge Search
        "Hello, how are you?",          # Tests General Chat
    ]

    demonstrate_agent(test_queries)