In [1]:
from typing import TypedDict, List
from langgraph.graph import StateGraph

class ChatbotState(TypedDict):
    messages: List[dict]
    user_context: dict
    current_intent: str
    conversation_history: List[dict]
    pending_actions: List[str]

In [2]:
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage

class ConversationState(TypedDict):
    messages: List[dict]
    user_profile: dict
    current_task: Optional[str]
    context_memory: dict
    requires_human_review: bool

In [3]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

def intent_classifier(state: ConversationState) -> ConversationState:
    """Classify user intent and update state accordingly."""
    llm = ChatOpenAI(model="gpt-4")
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Classify the user's intent from their message. Categories: question, request, complaint, compliment, other"),
        ("human", "{user_message}")
    ])
    
    last_message = state["messages"][-1]["content"]
    response = llm.invoke(prompt.format(user_message=last_message))
    
    state["current_task"] = response.content.strip().lower()
    return state

def response_generator(state: ConversationState) -> ConversationState:
    """Generate contextual response based on intent and history."""
    llm = ChatOpenAI(model="gpt-4")
    
    context = f"""
    User Intent: {state['current_task']}
    Conversation History: {state['context_memory']}
    User Profile: {state['user_profile']}
    """
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", f"You are a helpful assistant. Context: {context}"),
        ("human", "{user_message}")
    ])
    
    last_message = state["messages"][-1]["content"]
    response = llm.invoke(prompt.format(user_message=last_message))
    
    # Add AI response to messages
    state["messages"].append({
        "role": "assistant",
        "content": response.content
    })
    
    return state

In [4]:
def should_escalate(state: ConversationState) -> str:
    """Determine if conversation should be escalated to human agent."""
    escalation_intents = ["complaint", "complex_request", "billing_issue"]
    
    if state["current_task"] in escalation_intents:
        return "human_agent"
    elif state["requires_human_review"]:
        return "human_review"
    else:
        return "continue_bot"

def human_escalation_handler(state: ConversationState) -> ConversationState:
    """Handle escalation to human agents."""
    state["requires_human_review"] = True
    state["messages"].append({
        "role": "system",
        "content": "Escalating to human agent. Please wait for assistance."
    })
    return state

In [5]:
def create_chatbot_graph():
    # Initialize the graph
    workflow = StateGraph(ConversationState)
    
    # Add nodes
    workflow.add_node("classify_intent", intent_classifier)
    workflow.add_node("generate_response", response_generator)
    workflow.add_node("human_escalation", human_escalation_handler)
    
    # Define the flow
    workflow.set_entry_point("classify_intent")
    
    # Add conditional edges
    workflow.add_conditional_edges(
        "classify_intent",
        should_escalate,
        {
            "human_agent": "human_escalation",
            "human_review": "human_escalation",
            "continue_bot": "generate_response"
        }
    )
    
    # End points
    workflow.add_edge("generate_response", END)
    workflow.add_edge("human_escalation", END)
    
    return workflow.compile()

In [6]:
class AdvancedMemoryManager:
    def __init__(self):
        self.conversation_memory = {}
        self.user_profiles = {}
    
    def update_context(self, state: ConversationState) -> ConversationState:
        """Update long-term memory and context."""
        user_id = state.get("user_id", "anonymous")
        
        # Update conversation memory
        if user_id not in self.conversation_memory:
            self.conversation_memory[user_id] = []
        
        self.conversation_memory[user_id].extend(state["messages"][-2:])
        
        # Update user profile based on conversation patterns
        self.update_user_profile(user_id, state)
        
        state["context_memory"] = self.conversation_memory[user_id][-10:]  # Keep last 10 exchanges
        return state
    
    def update_user_profile(self, user_id: str, state: ConversationState):
        """Update user profile based on interaction patterns."""
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = {
                "preferences": {},
                "interaction_count": 0,
                "common_intents": []
            }
        
        profile = self.user_profiles[user_id]
        profile["interaction_count"] += 1
        
        # Track common intents
        current_intent = state.get("current_task")
        if current_intent:
            profile["common_intents"].append(current_intent)

In [7]:
def create_multi_agent_system():
    """Create a system with specialized agents."""
    
    def route_to_specialist(state: ConversationState) -> str:
        """Route to appropriate specialist agent."""
        intent = state["current_task"]
        
        routing_map = {
            "technical_support": "tech_agent",
            "billing_inquiry": "billing_agent",
            "product_question": "product_agent",
            "general": "general_agent"
        }
        
        return routing_map.get(intent, "general_agent")
    
    # Specialized agent functions
    def technical_support_agent(state: ConversationState) -> ConversationState:
        """Handle technical support queries."""
        llm = ChatOpenAI(model="gpt-4")
        
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a technical support specialist. 
            Provide detailed, step-by-step solutions for technical issues.
            Always ask for clarification if the problem is unclear."""),
            ("human", "{user_message}")
        ])
        
        # Implementation details...
        return state
    
    def billing_agent(state: ConversationState) -> ConversationState:
        """Handle billing and payment queries."""
        # Implementation for billing-specific logic
        return state
    
    # Build multi-agent graph
    workflow = StateGraph(ConversationState)
    
    workflow.add_node("intent_router", intent_classifier)
    workflow.add_node("tech_agent", technical_support_agent)
    workflow.add_node("billing_agent", billing_agent)
    workflow.add_node("product_agent", response_generator)  # Reuse for product queries
    workflow.add_node("general_agent", response_generator)
    
    workflow.set_entry_point("intent_router")
    
    workflow.add_conditional_edges(
        "intent_router",
        route_to_specialist,
        {
            "tech_agent": "tech_agent",
            "billing_agent": "billing_agent",
            "product_agent": "product_agent",
            "general_agent": "general_agent"
        }
    )
    
    return workflow.compile()

In [8]:
import logging
from typing import Any

def robust_node_wrapper(func):
    """Decorator to add error handling to graph nodes."""
    def wrapper(state: ConversationState) -> ConversationState:
        try:
            return func(state)
        except Exception as e:
            logging.error(f"Error in {func.__name__}: {str(e)}")
            
            # Add error message to conversation
            state["messages"].append({
                "role": "system",
                "content": "I encountered an issue. Let me try a different approach."
            })
            
            # Set flag for human review
            state["requires_human_review"] = True
            return state
    
    return wrapper

@robust_node_wrapper
def safe_response_generator(state: ConversationState) -> ConversationState:
    """Error-resistant response generation."""
    return response_generator(state)

In [9]:
from functools import lru_cache
import asyncio

class OptimizedChatbot:
    def __init__(self):
        self.llm_cache = {}
        self.response_cache = {}
    
    @lru_cache(maxsize=1000)
    def cached_intent_classification(self, message: str) -> str:
        """Cache intent classifications for common queries."""
        # Implementation with caching
        pass
    
    async def async_response_generation(self, state: ConversationState) -> ConversationState:
        """Asynchronous response generation for better performance."""
        # Async implementation
        pass

In [10]:
import time
from datetime import datetime

class ChatbotAnalytics:
    def __init__(self):
        self.metrics = {
            "total_conversations": 0,
            "average_response_time": 0,
            "escalation_rate": 0,
            "user_satisfaction": 0
        }
    
    def track_conversation(self, state: ConversationState, start_time: float):
        """Track conversation metrics."""
        response_time = time.time() - start_time
        
        self.metrics["total_conversations"] += 1
        self.metrics["average_response_time"] = (
            (self.metrics["average_response_time"] * (self.metrics["total_conversations"] - 1) + response_time) 
            / self.metrics["total_conversations"]
        )
        
        if state.get("requires_human_review"):
            self.metrics["escalation_rate"] += 1
    
    def log_interaction(self, state: ConversationState):
        """Log interaction for analysis."""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "intent": state.get("current_task"),
            "messages_count": len(state["messages"]),
            "escalated": state.get("requires_human_review", False)
        }
        
        # Send to logging system
        logging.info(f"Interaction logged: {log_entry}")

In [13]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


app = FastAPI()
chatbot_graph = create_chatbot_graph()
class ChatRequest(BaseModel):
    message: str
    user_id: str
    session_id: str
class ChatResponse(BaseModel):
    response: str
    requires_human: bool
    session_id: str
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
    """API endpoint for chatbot interactions."""
    try:
        initial_state = {
            "messages": [{"role": "user", "content": request.message}],
            "user_profile": {},
            "current_task": None,
            "context_memory": {},
            "requires_human_review": False,
            "user_id": request.user_id
        }
        
        result = chatbot_graph.invoke(initial_state)
        
        return ChatResponse(
            response=result["messages"][-1]["content"],
            requires_human=result["requires_human_review"],
            session_id=request.session_id
        )
    
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

In [14]:
import sqlite3
from typing import Dict, Any

class ConversationDatabase:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """Initialize database schema."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id TEXT,
                session_id TEXT,
                message TEXT,
                response TEXT,
                intent TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        conn.commit()
        conn.close()
    
    def save_conversation(self, state: ConversationState, session_id: str):
        """Save conversation to database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        last_user_message = None
        last_bot_response = None
        
        for msg in reversed(state["messages"]):
            if msg["role"] == "user" and not last_user_message:
                last_user_message = msg["content"]
            elif msg["role"] == "assistant" and not last_bot_response:
                last_bot_response = msg["content"]
        
        cursor.execute("""
            INSERT INTO conversations (user_id, session_id, message, response, intent)
            VALUES (?, ?, ?, ?, ?)
        """, (
            state.get("user_id", "anonymous"),
            session_id,
            last_user_message,
            last_bot_response,
            state.get("current_task")
        ))
        
        conn.commit()
        conn.close()

In [15]:
import unittest
from unittest.mock import Mock, patch

class TestChatbotComponents(unittest.TestCase):
    def setUp(self):
        self.sample_state = {
            "messages": [{"role": "user", "content": "Hello"}],
            "user_profile": {},
            "current_task": None,
            "context_memory": {},
            "requires_human_review": False
        }
    
    def test_intent_classification(self):
        """Test intent classification accuracy."""
        with patch('langchain_openai.ChatOpenAI') as mock_llm:
            mock_llm.return_value.invoke.return_value.content = "question"
            
            result = intent_classifier(self.sample_state)
            self.assertEqual(result["current_task"], "question")
    
    def test_escalation_logic(self):
        """Test escalation decision logic."""
        self.sample_state["current_task"] = "complaint"
        result = should_escalate(self.sample_state)
        self.assertEqual(result, "human_agent")
    
    def test_response_generation(self):
        """Test response generation."""
        with patch('langchain_openai.ChatOpenAI') as mock_llm:
            mock_llm.return_value.invoke.return_value.content = "Test response"
            
            result = response_generator(self.sample_state)
            self.assertEqual(len(result["messages"]), 2)
            self.assertEqual(result["messages"][-1]["role"], "assistant")

In [16]:
def test_full_conversation_flow():
    """Test complete conversation flow."""
    chatbot = create_chatbot_graph()
    
    test_cases = [
        {
            "input": "I have a billing question",
            "expected_intent": "billing_inquiry",
            "should_escalate": True
        },
        {
            "input": "What are your business hours?",
            "expected_intent": "general",
            "should_escalate": False
        }
    ]
    
    for case in test_cases:
        initial_state = {
            "messages": [{"role": "user", "content": case["input"]}],
            "user_profile": {},
            "current_task": None,
            "context_memory": {},
            "requires_human_review": False
        }
        
        result = chatbot.invoke(initial_state)
        
        # Verify expected behavior
        assert result["current_task"] == case["expected_intent"]
        assert result["requires_human_review"] == case["should_escalate"]