In [1]:
import os
from dotenv import load_dotenv
from langchain_openai import AzureChatOpenAI

  from pydantic.v1.fields import FieldInfo as FieldInfoV1


In [2]:
load_dotenv(".env")

True

In [3]:
llm = AzureChatOpenAI(
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
    azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
)

In [None]:
## smoke test
response = llm.invoke("say something in english")
print(response)

In [5]:
from langchain_core.prompts import ChatPromptTemplate

#prompt = ChatPromptTemplate.from_messages([
#    ("system", "You are a sentiment analyst. Be consice in one sentence."),
#    ("human", "Analyse the sentiment of this text: {text}"),])
#
#chain = prompt | llm
#result = chain.invoke({"text": "I recently reviewed my latest bill and found a charge that I didn’t recognize. The support I received when I called billing was polite and patient, which I appreciated. The representative explained the charges clearly and showed me how to read the bill line items step by step. I asked for a temporary adjustment while the issue was investigated, and the agent assured me they would escalate it and keep me updated. The explanation helped me understand where the duplicate item came from, which reduced my frustration. However, I still felt a bit uncertain about the next steps until the investigation confirmed the error. I value transparency, so I appreciated the email summary that outlined the investigation timeline. Overall, this experience was better than previous billing encounters because I felt listened to and informed. I would recommend continuing to provide proactive updates if similar issues come up in the future."})

In [6]:
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field

class SentimentResult(BaseModel):
    sentiment: str = Field(description="positive, negative, or neutral")
    score: float = Field(description="confidence score 0.0 to 1.0")
    emotion: str = Field(description="primary emotion detected")

parser = JsonOutputParser(pydantic_object=SentimentResult)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a sentiment analyst. Return JSON only."),
    ("human", "{format_instructions}\n\nText: {text}")
]).partial(format_instructions=parser.get_format_instructions())

chain = prompt | llm | parser

result = chain.invoke({"text": "I love this product but shipping was slow."})
print(result)  # → {'sentiment': 'positive', 'score': 0.75, 'emotion': 'satisfaction'}

{'sentiment': 'positive', 'score': 0.72, 'emotion': 'joy'}


In [7]:
parser

JsonOutputParser(pydantic_object=<class '__main__.SentimentResult'>)

In [8]:
## sentiment agent

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field

class SentimentResult(BaseModel):
    sentiment: str
    score: float
    emotion: str
    key_phrases: list[str]

sentiment_parser = JsonOutputParser(pydantic_object=SentimentResult)    

sentiment_prompt_text = open("sentiment_prompt").read()
sentiment_prompt = ChatPromptTemplate.from_messages(
    [("system", sentiment_prompt_text + " {format_instructions}"),
    ("human", "Analyse the customer message: \n\n{text}")]).partial(format_instructions=sentiment_parser.get_format_instructions())

sentiment_chain = sentiment_prompt | llm | sentiment_parser

In [9]:
class RecommendationOutput(BaseModel):
    recommended_actions: list[str]
    priority: str # low, medium, high, critical
    department: str # customer service, technical support, billing, etc.
    suggested_response: str

rec_parser = JsonOutputParser(pydantic_object=RecommendationOutput)

rec_prompt_text = open("recommendation_prompt").read()
rec_prompt = ChatPromptTemplate.from_messages(
    [("system", rec_prompt_text + " {format_instructions}"),
    ("human", """Customer message: {text}
    Sentiment analysis result: {sentiment_result}
    Provide recommendation.""")]).partial(format_instructions=rec_parser.get_format_instructions())

rec_chain = rec_prompt | llm | rec_parser

In [10]:
class EscalationOuptput(BaseModel):
    should_escalate: bool
    escalation_level: str # none | low | medium | high
    reason: str
    urgency_score: float # 1-10
    suggested_sla_hours: int

esc_parser = JsonOutputParser(pydantic_object=EscalationOuptput)

esc_prompt_text = open("escalation_prompt").read()
esc_prompt = ChatPromptTemplate.from_messages(
    [("system", esc_prompt_text + " {format_instructions}"),
    ("human", """Customer message: {text}
    Sentiment analysis result: {sentiment_result}
    Recommendation result: {rec_result}
    Should this be escalated?""")]).partial(format_instructions=esc_parser.get_format_instructions())

esc_chain = esc_prompt | llm | esc_parser


In [11]:
## orchastration

from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional, Any

class AnalysisState(TypedDict):
    ## Input
    customer_review: str
    customer_id: Optional[str]

    ## Agent outputs
    sentiment_result: Optional[dict]
    rec_result: Optional[dict]
    esc_result: Optional[dict]

    ## output

    final_report: Optional[dict]
    error: Optional[str]


In [12]:
def run_sentiment_agent(state: AnalysisState) -> AnalysisState:
    print("Running Sentiment Engine...")
    try:
        result = sentiment_chain.invoke({"text": state["customer_review"]})
        return {**state, "sentiment_result": result}
    except Exception as e:
        return {**state, "error": str(e), "sentiment_result": None}
    

def run_recommendation_agent(state: AnalysisState) -> AnalysisState:
    print("Running Recommendation Engine...")
    try:
        result = rec_chain.invoke({"text": state["customer_review"], "sentiment_result": str(state["sentiment_result"])})
        return {**state, "rec_result": result}
    except Exception as e:
        return {**state, "error": str(e), "rec_result": None}
    

def run_escalation_agent(state: AnalysisState) -> AnalysisState:
    print("Running Escalation Engine...")
    try:
        result = esc_chain.invoke({"text": state["customer_review"], "sentiment_result": str(state["sentiment_result"]), "rec_result": str(state["rec_result"])})
        return {**state, "esc_result": result}
    except Exception as e:
        return {**state, "error": str(e), "esc_result": None}
    

def compile_final_report(state: AnalysisState) -> AnalysisState:
    print("Compiling final report...")
    final_report = {
        "customer_id": state.get("customer_id", "unknown"),
        "customer_review": state.get("customer_review"),
        "sentiment_analysis": state.get("sentiment_result"),
        "recommendation": state.get("rec_result"),
        "escalation_decision": state.get("esc_result")
        }
    return {**state, "final_report": final_report}

def should_continue(state: AnalysisState) -> str:
    if state.get("error"):
        return "error"
    return "continue"

In [15]:
graph = StateGraph(AnalysisState)

# Add nodes
graph.add_node("sentiment", run_sentiment_agent)
graph.add_node("recommendation", run_recommendation_agent)
graph.add_node("escalation", run_escalation_agent)
graph.add_node("report", compile_final_report)

# Define flow (sequential with error short-circuit)
graph.set_entry_point("sentiment")

graph.add_conditional_edges("sentiment", should_continue, {
    "continue": "recommendation",
    "error": END
})
graph.add_conditional_edges("recommendation", should_continue, {
    "continue": "escalation",
    "error": END
})
graph.add_conditional_edges("escalation", should_continue, {
    "continue": "report",
    "error": END
})
graph.add_edge("report", END)

# Compile
app = graph.compile()

In [17]:
import json
import requests

url = "https://raw.githubusercontent.com/himmng/agentic_ai/main/data/raw/synergy_inmoment.json"
data = requests.get(url).json()
sample_review = data[0]

In [22]:
sample_review['answers']

[{'answerId': 'ANS-1',
  'text': 'I recently reviewed my latest bill and found a charge that I didn’t recognize. The support I received when I called billing was polite and patient, which I appreciated. The representative explained the charges clearly and showed me how to read the bill line items step by step. I asked for a temporary adjustment while the issue was investigated, and the agent assured me they would escalate it and keep me updated. The explanation helped me understand where the duplicate item came from, which reduced my frustration. However, I still felt a bit uncertain about the next steps until the investigation confirmed the error. I value transparency, so I appreciated the email summary that outlined the investigation timeline. Overall, this experience was better than previous billing encounters because I felt listened to and informed. I would recommend continuing to provide proactive updates if similar issues come up in the future.'}]

In [23]:
initial_state = AnalysisState  = {
    "customer_review": sample_review['answers'],
    "customer_id": sample_review["customer_id"],
    "sentiment_result": None,
    "recommendation_result": None,
    "escalation_result": None,
    "final_report": None,
    "error": None
}

In [24]:
result = app.invoke(initial_state)


Running Sentiment Engine...
Running Recommendation Engine...
Running Escalation Engine...
Compiling final report...


In [25]:
print(json.dumps(result["final_report"], indent=2))

{
  "customer_id": 10007,
  "customer_review": [
    {
      "answerId": "ANS-1",
      "text": "I recently reviewed my latest bill and found a charge that I didn\u2019t recognize. The support I received when I called billing was polite and patient, which I appreciated. The representative explained the charges clearly and showed me how to read the bill line items step by step. I asked for a temporary adjustment while the issue was investigated, and the agent assured me they would escalate it and keep me updated. The explanation helped me understand where the duplicate item came from, which reduced my frustration. However, I still felt a bit uncertain about the next steps until the investigation confirmed the error. I value transparency, so I appreciated the email summary that outlined the investigation timeline. Overall, this experience was better than previous billing encounters because I felt listened to and informed. I would recommend continuing to provide proactive updates if simil