In [1]:
#IMPORTS

from typing import Annotated, Sequence, List, Literal, TypedDict
from pydantic import BaseModel, Field 
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.types import Command 
from langgraph.graph import StateGraph, START, END, MessagesState
import uuid

from langgraph.graph.message import add_messages
import asyncio 
from dotenv import load_dotenv
import os 
from agent.request import request_llm
from agent.knowledgebase import data_fetcher
from prompt_library.merger_sys_msg import merger_system_message
load_dotenv()
import time
from langchain_openai import ChatOpenAI
from typing import Optional
from data_ingestion.ingestion_pipeline import get_rag_chain
from langgraph.checkpoint.memory import MemorySaver
import re
checkpointer = MemorySaver()

✅ Connection successful!
✅ Query executed successfully. Rows fetched: 1
embedding model


In [2]:

route_system_message = """You are a customer service routing agent for Tata Capital. Your role is to analyze incoming customer messages and classify them into exactly one of four categories: QUERY, REQUEST, COMPLAINT or KNOWLEDGEBASE.
                                      
Category Definitions
                                      
QUERY
Messages seeking information about Tata Capital, the company, its services, policies, or general inquiries that can be answered using company documentation.
Characteristics:
Questions about company background, history, financial performance
Inquiries about services offered, eligibility criteria, interest rates
General information requests about policies, procedures, or terms
Educational questions about financial products
"What is...", "How does...", "Can you explain...", "Tell me about..."
Examples:
"What is Tata Capital's revenue for this year?"
"How does your personal loan process work?"
"What are the eligibility criteria for home loans?"
"Tell me about Tata Capital's business segments"
"What is the company's market position?"
                                    
REQUEST
Messages asking for specific documents, services, or actions to be performed on behalf of the customer.
Characteristics:
Explicit requests for documents (PAN, Aadhar, CIBIL score, statements, certificates)
Service requests (account opening, loan applications, updates)
Action items that require processing or delivery
"I need...", "Please send...", "Can you provide...", "I want to..."
Examples:
"I need my CIBIL score report"
"Please send me my PAN card copy"
"Can you provide my Aadhar document?"
"I want to request my loan statement"
"Please send the NOC certificate to my email"
"I need my account balance certificate"
                                     
COMPLAINT
Messages providing a ticket number for complaint processing and resolution. The system requires a valid ticket number to scrape complaint details and process sentiment analysis, categorization, and routing.
Characteristics:
Contains a ticket number (various formats: alphanumeric, numeric, with prefixes/suffixes)
References existing complaint or issue tracking
Requests status update or processing of logged complaint
May include phrases like "ticket number", "complaint ID", "reference number"
Customer wants to process or check status of previously logged complaint
Examples:
"Please process my complaint ticket number TC123456"
"My ticket ID is COMP2024001, please update"
"I have complaint reference number 789012, can you check status?"
"Ticket: TC-2024-0456 needs processing"
"My complaint number is 123456789"
"Reference ID: REF789 - please resolve this issue"
                                      
KNOWLEDGEBASE
Messages requesting analytics, insights, reports, or data analysis from the customer support ticket database (QRCKnowledgeBase). These involve querying historical ticket data for business intelligence, performance metrics, trends, and operational insights.
Characteristics:
Requests for analytics, reports, or data insights from ticket database
Questions about ticket volumes, trends, patterns, or statistics
Month-to-Date (MTD), Year-to-Date (YTD), or period-based analysis requests
Sentiment analysis across tickets or time periods
Performance metrics for support teams or processes
Queries about ticket resolution times, escalation patterns, or customer satisfaction
Database analysis for business intelligence purposes
"Show me...", "Analyze...", "What are the trends...", "Generate report...", "MTD analysis..."
Examples:
"Show me MTD ticket volume analysis"
"What are the sentiment trends for this month?"
"Analyze complaint patterns by product category"
"Generate a report of open tickets by escalation level"
"What's the YTD resolution time for technical issues?"
"Show me customer satisfaction trends over the last quarter"
"Analyze ticket distribution by communication mode"
"What are the top 5 complaint categories this month?"
"Generate MTD analysis of negative sentiment tickets"
"Show me escalation patterns for Digital Banking vertical"
"""

In [3]:


#DECLARATIONS

api_key = os.getenv('OPENAI_API_KEY')
open_ai_llm = ChatOpenAI(model = 'gpt-4o-mini', api_key=api_key)


In [4]:

# ALL INTENTS

class IntentSchema(TypedDict):
    intent: Literal["Query", "Request", "Complaint", "knowledgebase"] = Field(
        default= "",
        description="One or more labels that define the nature of the user's input. "
                    "**Query** - for information-seeking questions about tatacapital, "
                    "**Request** - for demands or tasks processing " #actionable
                    "**Complaint** -  for expressions of dissatisfaction or issues, "
                    "and **knowledgebase** - if any question related to database or data retrieveal then use this intent."
    ) 
    message: str = Field(
        default= "",
        description="Extract message corresponding to the intent"
    )
    tone: Optional[Literal["Neutral", "Angry"]] = Field(
        default=None,
        description="REQUIRED for Complaint intent: Must be either 'Neutral' or 'Angry'. "
                    "This field indicates the emotional tone of complaints only. "
                    "Should not be provided for Query, Request, or Other intents."
    )


class IntentSchemas(TypedDict):
    intent_schema_list: List[IntentSchema] = Field(
        default=[],
        description="List of intent-message pairs, including complaints with tone where applicable"
    )

# UPDATED: Enhanced MainIntent state to track processed messages
class MainIntent(TypedDict):
    messages : Annotated[list[BaseMessage], add_messages]
    active_intents : list[dict]
    completed_intents : list[dict]
    incompleted_intents : list[dict]
    sub_response : list[str]
    active_state : str
    last_processed_index : int  # NEW: Track last processed message index


class subIntent(TypedDict):
    uid : str
    intent : str
    message : str
    tone : str
    output : str
    active_state : str
    completed : bool = False



In [5]:

# dummy definations

async def query_processor(state : subIntent)-> subIntent:
        print('fetching....RAG')
        qa =await get_rag_chain()
        print('fetched....RAG')
        response =await qa.ainvoke(state['message'])

        return {'output' : response['result'], 'completed' : True}

# UPDATED: Enhanced request processor to handle follow-up information
async def request_processor(state : subIntent)-> subIntent:
        human_msg = state['message']
        
        # NEW: Check if this message contains follow-up information (like application ID)
        if "Additional info:" in human_msg:
            # Extract the original request and additional info
            parts = human_msg.split("Additional info:", 1)
            original_request = parts[0].strip()
            additional_info = parts[1].strip() if len(parts) > 1 else ""
            
            # Try to extract application ID from additional info
            app_id_patterns = [
                r'application\s+id\s+is\s+([a-zA-Z0-9]+)',
                r'app\s+id\s*:?\s*([a-zA-Z0-9]+)',
                r'([a-zA-Z0-9]+)',  # Simple alphanumeric
            ]
            
            extracted_app_id = None
            for pattern in app_id_patterns:
                match = re.search(pattern, additional_info.lower())
                if match:
                    extracted_app_id = match.group(1)
                    break
            
            if extracted_app_id:
                # Process the request with the application ID
                result = f"SR-1320099902 has been raised for the Ops Team in the CRM. The requested document for loan application {extracted_app_id} will be sent to the customer's registered email address. Overall Sentiment seems to be 'Neutral'"
                return {'output': result, 'completed': True}

        # Original request processing logic
        response = request_llm(human_msg)

        if "message" in response:
            
            result = response["message"]
            return {'output' : result,'completed' : False}
        
        else:
            
            application_no = response["application_no"]
            process_name = response["process_name"]
            requested_document =    response["requested_document"]
            result = f"SR-1320099902 has been raised for the Ops Team in the CRM. The {requested_document} for loan application {application_no} will be sent to the customer's registered email address. Overall Sentiment seems to be 'Neutral'"
            return {'output' : result,'completed' : True}


async def complaint_processor(state : subIntent)-> subIntent:
        message = state['message']
        tone = state['tone']

        print(f'complaint : {message} , tone : {tone}')
        

        return {'output' : 'ticket against your complaint has been raised your issue will be resolved at priority','completed' : True}

async def knowledge_fetcher(state : subIntent)-> subIntent:

        print('fetching data.....')

        result =await data_fetcher(state['message'])

        return {'output' : result,'completed' : True}

def divertor(state : subIntent)-> subIntent:
    state['active_state'] = 'divertor'
    
    if state['intent'].lower() == 'query':
           
        return Command(
                update = {'active_state' : 'query_processor'},
                goto = 'query_processor'
        )

    elif state['intent'].lower() == 'request':
           
        return Command(
                update = {'active_state' : 'request_processor'},
                goto = 'request_processor'
        )
    
    elif state['intent'].lower() == 'complaint':
           
        return Command(
                update = {'active_state' : 'complaint_processor'},
                goto = 'complaint_processor'
        )
    
    elif state['intent'].lower() == 'knowledgebase':
           
        return Command(
                update = {'active_state' : 'knowledge_fetcher'},
                goto = 'knowledge_fetcher'
        )

#inner graph

In [6]:


SubGraph = StateGraph(subIntent)

SubGraph.add_node('divertor',divertor)
SubGraph.add_node('query_processor',query_processor)
SubGraph.add_node('request_processor',request_processor)
SubGraph.add_node('complaint_processor',complaint_processor)
SubGraph.add_node('knowledge_fetcher',knowledge_fetcher)

SubGraph.add_edge(START,'divertor')

ready_subgraph = SubGraph.compile()


In [7]:

#DEFINATIONS

async def handle_request(message: dict):
    print(f"Starting main node...for : {message.get('intent')}")

    input = {'uid': message.get('uid'),'intent': message.get('intent') ,'message': message.get('message'), 'tone' : message.get('tone')}

    sub_graph_retrieved_state = await ready_subgraph.ainvoke(input)
    return {'uid':sub_graph_retrieved_state['uid'],'output':sub_graph_retrieved_state['output'],
             'completed' : sub_graph_retrieved_state['completed']}


# NEW: Function to check if we should process new messages
def should_process_message(state: MainIntent) -> bool:
    """Check if we have new messages to process"""
    current_message_count = len(state['messages'])
    last_processed = state.get('last_processed_index', -1)
    return current_message_count > last_processed + 1

# NEW: Function to check for follow-up information
def check_for_followup_info(state: MainIntent) -> MainIntent:
    """Check if the latest message provides info to complete incomplete intents"""
    
    if not state.get('incompleted_intents'):
        # No incomplete intents, proceed with normal processing
        goto = 'intent_classifier' if should_process_message(state) else 'no_new_processing'
        return Command(
            update={'active_state': goto},
            goto=goto
        )
    
    latest_message = state['messages'][-1].content.lower()
    print(f"Checking if '{latest_message}' can resolve incomplete intents...")
    
    # Check if the message contains information that could complete incomplete intents
    resolution_keywords = ['application', 'app', 'id', 'number', 'ticket', 'reference']
    
    if any(keyword in latest_message for keyword in resolution_keywords):
        print("Found potential resolution information!")
        
        # Try to resolve incomplete intents
        resolved_any = False
        
        for incomplete in state['incompleted_intents'][:]:  # Copy list to modify during iteration
            if incomplete.get('intent', '').lower() == 'request':
                # Update the incomplete request with additional context
                incomplete['message'] += f" Additional info: {state['messages'][-1].content}"
                
                # Move to active intents for reprocessing
                state['active_intents'].append(incomplete)
                state['incompleted_intents'].remove(incomplete)
                resolved_any = True
                print(f"Updated incomplete request: {incomplete['uid']}")
        
        if resolved_any:
            # Process the updated incomplete intents
            goto = 'intent_invoker'
            return Command(
                update={
                    'active_state': goto,
                    'last_processed_index': len(state['messages']) - 1  # Mark this message as processed
                },
                goto=goto
            )
    
    # If we can't resolve incomplete intents, check if we should process for new intents
    goto = 'intent_classifier' if should_process_message(state) else 'no_new_processing'
    return Command(
        update={'active_state': goto},
        goto=goto
    )

# NEW: Handle case where no new processing is needed
def no_new_processing(state: MainIntent) -> MainIntent:
    """Handle case where no new processing is needed"""
    print("No new processing needed - all messages already processed")
    
    # Provide a simple acknowledgment
    response_text = "I understand. Is there anything else I can help you with?"
    state['messages'].append(AIMessage(response_text))
    
    return state

# UPDATED: Modified intent classifier to process only new messages
def intent_classifier(state : MainIntent) -> MainIntent:

    # NEW: Only classify if we have new unprocessed messages
    if not should_process_message(state):
        print("No new messages to process")
        goto = 'intent_merger'
        return Command(
            update={'active_state': goto, 'sub_response': []},
            goto=goto
        )
    
    # NEW: Get only the latest unprocessed message
    latest_message = state['messages'][-1]
    print(f"Processing new message: {latest_message.content}")
    
    # UPDATED: Pass context but only classify the latest message
    context_messages = [route_system_message]
    if len(state["messages"]) > 1:
        context_messages.extend([
            HumanMessage("Previous conversation context (for reference only):"),
            *state["messages"][:-1],
            HumanMessage("Current message to classify:"),
        ])
    context_messages.append(latest_message)
    
    response = open_ai_llm.with_structured_output(IntentSchemas).invoke(context_messages)
    active_intent_lists = response.get('intent_schema_list')

    for item in active_intent_lists:
        item['uid'] = str(uuid.uuid4())
    
    # UPDATED: Clear previous active intents and add new ones
    state['active_intents'] = []
    state['active_intents'].extend(active_intent_lists)
    
    # NEW: Mark this message as processed
    state['last_processed_index'] = len(state['messages']) - 1

    goto = 'intent_invoker'
    return Command(
        update = {'active_state' : goto},
        goto = goto
    )
    

# UPDATED: Enhanced intent invoker to handle better state management
async def intent_invoker(state : MainIntent)-> MainIntent:

    if not state['active_intents']:
        print("No active intents to process")
        goto = 'intent_merger'
        return Command(
            update={
                'sub_response': [],
                'active_state': goto
            },
            goto=goto
        )

    print(f"Processing {len(state['active_intents'])} active intents...")
    
    final_states = await asyncio.gather(*(handle_request(msg) for msg in state['active_intents']))

    print(f'final states : {final_states}') 

    # Clear active intents after processing
    state['active_intents'] = []

    for data in final_states:
        
        if data['completed'] is True:
            state['completed_intents'].append(data)
            
        else :
            state['incompleted_intents'].append(data)

    goto = 'intent_merger'
    
    return Command(
        update = {
                'sub_response' : final_states,
                'active_state' : goto},
        goto = goto
    )

# UPDATED: Enhanced intent merger for better context awareness
def intent_merger(state : MainIntent)-> MainIntent:

    if not state['sub_response']:
        # No new responses to merge, just acknowledge
        response_text = "I've noted that information. Is there anything else I can help you with?"
        state['messages'].append(AIMessage(response_text))
        return state

    # Check if this is addressing incomplete intents
    incomplete_count = len(state.get('incompleted_intents', []))
    
    if incomplete_count == 0 and len(state['sub_response']) == 1:
        # Single response that completed an incomplete intent
        completed_response = state['sub_response'][0]['output']
        response_text = f"Perfect! I've processed your request:\n\n{completed_response}\n\nIs there anything else I can help you with?"
        state['messages'].append(AIMessage(response_text))
    else:
        # Multiple responses or normal processing
        output = f"""
        question: {state['messages'][-1].content}
        outputs generated:
        {chr(10).join(f"- {resp['output']}" for resp in state['sub_response'])}
        incomplete intents remaining: {incomplete_count}
        """

        print(output)
        response = open_ai_llm.invoke([merger_system_message, output])
        state['messages'].append(AIMessage(response.content))

    print(state['messages'][-1])
    
    return state

In [8]:


# UPDATED: New graph structure with follow-up checking
graph = StateGraph(MainIntent)

graph.add_node('check_followup', check_for_followup_info)  # NEW NODE
graph.add_node('intent_classifier',intent_classifier)
graph.add_node('intent_invoker',intent_invoker)
graph.add_node('intent_merger',intent_merger)
graph.add_node('no_new_processing', no_new_processing)    # NEW NODE

# UPDATED: Start with follow-up check instead of direct classification
graph.add_edge(START,'check_followup')  # CHANGED from START -> intent_classifier

ready_graph = graph.compile(checkpointer = checkpointer)

In [9]:



config = {'configurable':{'thread_id':'1'}}

# UPDATED: Initialize with proper state structure
def create_initial_input(user_msg):
    return {
        'messages': [HumanMessage(user_msg)], 
        'active_intents': [],
        'completed_intents': [],
        'incompleted_intents': [],
        'sub_response': [],
        'active_state': '',
        'last_processed_index': -1  # NEW: Initialize tracking
    }




In [10]:
# First message
user_msg1 = 'i want to know about count open cases in database and what are the available schemes for tata capital also i am not happy with the portal of tata capital it always lags and send me loan agreement for appli'
input1 = create_initial_input(user_msg1)
# graph_retrieved_state1 = await ready_graph.ainvoke(input1, config)

In [None]:
# Second message (follow-up with application ID)
user_msg2 = 'application id is d3d3sd'
input2 = {'messages': [HumanMessage(user_msg2)]}  # LangGraph will merge with existing state
# graph_retrieved_state2 = await ready_graph.ainvoke(input2, config)

In [11]:
# For testing with your current setup:
user_msg = 'application id is d3d3sd'
input = create_initial_input(user_msg)

graph_retrieved_state = await ready_graph.ainvoke(input,config)

Processing new message: application id is d3d3sd
No active intents to process
