In [3]:
from dotenv import load_dotenv
import os

load_dotenv(override=True)

opik_enabled = os.getenv("ENABLE_OPIK")

#openai_api_key = os.getenv("OPENAI_API_KEY")
#print(f"Open AI Key is {openai_api_key}")



In [4]:
import nest_asyncio
from agents import Agent, handoff, GuardrailFunctionOutput, HandoffOutputItem, ItemHelpers, MessageOutputItem, RunContextWrapper, Runner, TResponseInputItem, ToolCallItem, ToolCallOutputItem, WebSearchTool, function_tool, input_guardrail, enable_verbose_stdout_logging, set_tracing_disabled, set_trace_processors
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
from pydantic import BaseModel
from opik.integrations.openai.agents import OpikTracingProcessor

if opik_enabled:
 set_trace_processors(processors=[OpikTracingProcessor()])

print(f"Loaded common dependencies..")

Loaded common dependencies..


## Standalone Auth Guard Agent
![Diagram](./assets/AuthGuard-Agent.svg)

In [15]:

class UserAuthOutput(BaseModel):
    is_authenticated: bool
    user_id: str | None = None
    user_name: str | None = None


class UserInfoContext(BaseModel):
    user_id: str | None = None
    user_name: str | None = None
    auth_attempts: int = 0
    max_auth_attempts: int = 3


@function_tool
async def get_user_info(wrapper: RunContextWrapper[UserInfoContext], user_id: str, otp: str) -> str:
    """
        Check user authentication and attach additional information
        Args:
            user_id: User ID
            otp: OTP for the user
        Returns:
            str: Authentication status message
    """
    print(f"Tool called with user_id: {user_id} and otp: {otp}")
    try:
        users_map = [
            {
                "user_id": "Mohit21",
                "otp": "1234",
                "user_name": "Mohit Yadav"
            },
            {
                "user_id": "Mohit22",
                "otp": "1235",
                "user_name": "Mohit Y"
            }
        ]
        
        user = next(user for user in users_map if user["user_id"] == user_id and user["otp"] == otp)
        
        if user:
            try:
                wrapper.context.user_id = user_id
                wrapper.context.user_name = user["user_name"]
                wrapper.context.auth_attempts = 0  # Reset attempts on success
            except Exception as e:
                print(f"Error while setting context: {str(e)}")
            
            print(f"Authentication successful. Context: {wrapper.context}")
            return f"Authentication successful. Welcome, {user['user_name']}!"
        
        wrapper.context.auth_attempts += 1
        remaining_attempts = wrapper.context.max_auth_attempts - wrapper.context.auth_attempts
        return f"Authentication failed. {remaining_attempts} attempts remaining."
    
    except Exception as error:
        print(f"Authentication error: {str(error)}")
        return "Authentication failed. Please try again with valid credentials."


user_auth_check_agent = Agent(
    name="Auth Check Agent",
    instructions="""
    You are a professional authentication agent responsible for user verification.

    # Core Responsibilities
    - Verify user credentials securely and efficiently
    - Track authentication attempts
    - Provide clear feedback on authentication status
    - Handle authentication failures gracefully

    # Authentication Protocol
    1. Initial Greeting:
       - Introduce yourself as the authentication agent
       - Request user credentials (user_id and OTP)

    2. Credential Verification:
       - Use get_user_info tool to verify credentials
       - Track number of authentication attempts
       - Provide clear feedback on success/failure

    3. Success Handling:
       - Welcome authenticated users professionally
       - Transfer to router agent when available
       - Include user's name in welcome message

    4. Failure Handling:
       - Clearly communicate remaining attempts
       - Provide guidance on correct credential format
       - Offer another attempt if maximum not reached

    5. Security Measures:
       - Never reveal existing user IDs
       - Don't provide hints about valid credentials
       - Maintain professional tone throughout

    Remember: Security and user experience must be balanced appropriately.
    """,
    model="gpt-4",  # Upgraded to GPT-4 for better security handling
    tools=[
        get_user_info
    ]
)

async def standalone_auth_agent(max_user_turns=3):
    input_items: list[TResponseInputItem] = []
    context = UserInfoContext()
    current_agent: Agent[UserInfoContext] = user_auth_check_agent
    current_user_turns = 0
    while current_user_turns < max_user_turns:
        user_input = input("Enter message: ")
        input_items.append({"content": user_input, "role": "user"})
        result = await Runner.run(
            current_agent,
            input_items,
            context=context,
        )
        current_user_turns = current_user_turns + 1

        for new_item in result.new_items:
            agent_name = new_item.agent.name
            if isinstance(new_item, MessageOutputItem):
                print(f"\033[94m{agent_name}\033[0m: {ItemHelpers.text_message_output(new_item)}")
            elif isinstance(new_item, HandoffOutputItem):
                print(f"Handed off from \033[92m{new_item.source_agent.name}\033[0m to \033[93m{new_item.target_agent.name}\033[0m")
            elif isinstance(new_item, ToolCallItem):
                print(f"\033[95m{agent_name}\033[0m: Calling a tool")
            elif isinstance(new_item, ToolCallOutputItem):
                print(f"\033[96m{agent_name}\033[0m: Tool call output: {new_item.output}")
            else:
                print(f"\033[91m{agent_name}\033[0m: Skipping item: {new_item.__class__.__name__}")
        input_items = result.to_input_list()
        current_agent = result.last_agent

await standalone_auth_agent(3)

[94mAuth Check Agent[0m: Hello! I am your authentication agent. Could you please provide your user ID and OTP (One-Time-Password) for verification purposes?
Tool called with user_id: JohnDoe123 and otp: 789456
Authentication error: 
[95mAuth Check Agent[0m: Calling a tool
[96mAuth Check Agent[0m: Tool call output: Authentication failed. Please try again with valid credentials.
[94mAuth Check Agent[0m: I'm sorry, but the authentication attempt was not successful. Please ensure that your user ID and OTP are entered correctly. Keep in mind that OTP is case-sensitive and should be entered precisely as given. Please try again. You have 2 attempts remaining.
Tool called with user_id: JohnDoe123 and otp: 123456
Authentication error: 
Tool called with user_id: JohnDoe123 and otp: 987654
Authentication error: 
[95mAuth Check Agent[0m: Calling a tool
[96mAuth Check Agent[0m: Tool call output: Authentication failed. Please try again with valid credentials.
[94mAuth Check Agent[0m: I

In [16]:
english_speaker_agent = Agent[UserInfoContext](
    name="English Speaker Agent",
    handoff_description="Professional English language support agent for customer interactions.",
    instructions=f"""
    {RECOMMENDED_PROMPT_PREFIX}
    You are a professional English language customer service agent.
    
    # Core Responsibilities
    - Communicate in clear, professional British English
    - Maintain formal yet friendly tone
    - Use simple, unambiguous language
    - Avoid technical jargon unless specifically requested
    
    # Response Protocol
    1. First verify if user's query is in English
    2. If unclear, ask specific clarifying questions
    3. For product queries, transfer to requirements agent
    4. For non-English queries, transfer to appropriate language agent
    5. For unrelated topics, transfer to router agent
    
    # Context Awareness
    - Always check user authentication status before providing sensitive information
    - Reference user's name when available in context
    - Keep track of previous interactions in the conversation
    """,
    model="gpt-4o",
)

hindi_speaker_agent = Agent[UserInfoContext](
    name="Hindi Speaker Agent",
    handoff_description="Professional Hindi language support agent for customer interactions.",
    instructions=f"""
    {RECOMMENDED_PROMPT_PREFIX}
    You are a professional Hindi language customer service agent.
    
    # Core Responsibilities
    - Communicate exclusively in formal Hindi (शुद्ध हिंदी)
    - Maintain professional yet approachable tone
    - Transliterate Hindi text to English for TTS compatibility
    - Preserve cultural context and respect
    
    # Response Protocol
    1. First verify if user's query is in Hindi
    2. Always provide responses in this format:
       Hindi: [Hindi text]
       Transliteration: [English transliteration]
    3. For product queries, transfer to requirements agent
    4. For non-Hindi queries, transfer to appropriate language agent
    5. For unrelated topics, transfer to router agent
    
    # Context Awareness
    - Always check user authentication status before providing sensitive information
    - Use appropriate Hindi honorifics based on context
    - Keep track of previous interactions in the conversation
    """,
    model="gpt-4o-mini",
)


router_agent = Agent[UserInfoContext](
    name="Router Agent",
    handoff_description="Central routing agent for language and service delegation.",
    instructions=f"""
    {RECOMMENDED_PROMPT_PREFIX}
    You are the central routing agent responsible for request delegation.
    
    # Core Responsibilities
    - Language detection and appropriate agent routing
    - Service type classification
    - Authentication status verification
    
    # Routing Protocol
    1. Analyze user input for:
       - Primary language (English/Hindi)
       - Request type (Product/Service/Support)
       - Authentication requirements
    
    2. Route based on priority:
       - Authentication needs -> Auth agent
       - Language specific -> Language agents
       - Product queries -> Requirements agent
    
    3. Monitor conversation flow:
       - Track handoffs
       - Ensure proper agent transitions
       - Maintain context continuity
    """,
    handoffs=[
        hindi_speaker_agent,
        english_speaker_agent,
    ],
)

user_auth_check_agent.handoffs.append(router_agent)


async def router_and_auth_agent(max_user_turns=3):
    input_items: list[TResponseInputItem] = []
    context = UserInfoContext()
    current_agent: Agent[UserInfoContext] = user_auth_check_agent
    current_user_turns = 0
    while current_user_turns < max_user_turns:
        user_input = input("Enter message: ")
        input_items.append({"content": user_input, "role": "user"})
        result = await Runner.run(
            current_agent,
            input_items,
            context=context,
        )
        current_user_turns = current_user_turns + 1
        for new_item in result.new_items:
            agent_name = new_item.agent.name
            if isinstance(new_item, MessageOutputItem):
                print(f"\033[94m{agent_name}\033[0m: {ItemHelpers.text_message_output(new_item)}")
            elif isinstance(new_item, HandoffOutputItem):
                print(f"Handed off from \033[92m{new_item.source_agent.name}\033[0m to \033[93m{new_item.target_agent.name}\033[0m")
            elif isinstance(new_item, ToolCallItem):
                print(f"\033[95m{agent_name}\033[0m: Calling a tool")
            elif isinstance(new_item, ToolCallOutputItem):
                print(f"\033[96m{agent_name}\033[0m: Tool call output: {new_item.output}")
            else:
                print(f"\033[91m{agent_name}\033[0m: Skipping item: {new_item.__class__.__name__}")
        input_items = result.to_input_list()
        current_agent = result.last_agent
        
# await router_and_auth_agent()

In [17]:
@function_tool
async def check_product_info(wrapper: RunContextWrapper[UserInfoContext], product_id: str) -> str:
    """
        Checks if the product exists and is authorized to be queried by the user
        Args:
            product_id (str): The product id to check
    """
    products_map = {
        "product_1": {
            "product_owner": "Mohit21",
            "name": "Blender from Amazon"
        },
        "product_2": {
            "product_owner": "Mohit22",
            "name": "Joystick from Noon.com"
        }
    }
    
    if wrapper.context.user_id is None:
        return "User must be authenticated"
        
    if not products_map[product_id]:
        return "Product does not exist"
    
    if products_map[product_id]["product_owner"] != wrapper.context.user_id:
        print(f"Product ownership mismatch. Expected '{wrapper.context.user_id}', found '{products_map[product_id]['product_owner']}'")
        return f"Product {product_id} does not belong to user {wrapper.context.user_id}"
    
    return f"The product name is {products_map[product_id]['name']}"
    
gather_requirements_agent = Agent[UserInfoContext](
    name="Gather Requirements Agent",
    handoff_description="Product information specialist agent that handles product queries and validations.",
    instructions=f"""
    {RECOMMENDED_PROMPT_PREFIX}
    You are a specialized product information agent with access to product validation tools.

    # Core Responsibilities
    - Handle all product-related queries and validations
    - Extract product IDs from user conversations
    - Verify product ownership and access rights
    - Provide clear product information to users

    # Query Processing Protocol
    1. Product ID Detection:
       - Look for product IDs in user messages (format: product_1, product_2, etc.)
       - If no product ID found, ask user to provide one
       - Guide users on correct product ID format

    2. Authentication Check:
       - Verify user authentication status before product lookup
       - If user not authenticated, inform and route back to auth agent

    3. Product Validation:
       - Use check_product_info tool to validate product access
       - Handle ownership verification
       - Present product details when authorized

    4. Response Handling:
       - Provide clear feedback on product status
       - Format product information in a readable manner
       - Handle errors with clear explanations

    5. Routing Rules:
       - For non-product queries -> Route to router agent
       - For authentication issues -> Route to auth agent
       - For language-specific needs -> Route to language agents

    Remember: Always maintain context of the conversation and user's authentication status.
    """,
    model="gpt-4",
    tools=[
        check_product_info
    ]
)
gather_requirements_agent.handoffs.append(router_agent)
gather_requirements_agent.handoffs.append(hindi_speaker_agent)
gather_requirements_agent.handoffs.append(english_speaker_agent)
hindi_speaker_agent.handoffs.append(gather_requirements_agent)
english_speaker_agent.handoffs.append(gather_requirements_agent)

async def router_with_product_info(max_user_turns=3):
    input_items: list[TResponseInputItem] = []
    context = UserInfoContext()
    current_agent: Agent[UserInfoContext] = user_auth_check_agent
    current_user_turns = 0
    while current_user_turns < max_user_turns:
        user_input = input("Enter message: ")
        input_items.append({"content": user_input, "role": "user"})
        result = await Runner.run(
            current_agent,
            input_items,
            context=context,
        )
        current_user_turns = current_user_turns + 1

        for new_item in result.new_items:
            agent_name = new_item.agent.name
            if isinstance(new_item, MessageOutputItem):
                print(f"\033[94m{agent_name}\033[0m: {ItemHelpers.text_message_output(new_item)}")
            elif isinstance(new_item, HandoffOutputItem):
                print(f"Handed off from \033[92m{new_item.source_agent.name}\033[0m to \033[93m{new_item.target_agent.name}\033[0m")
            elif isinstance(new_item, ToolCallItem):
                print(f"\033[95m{agent_name}\033[0m: Calling a tool")
            elif isinstance(new_item, ToolCallOutputItem):
                print(f"\033[96m{agent_name}\033[0m: Tool call output: {new_item.output}")
            else:
                print(f"\033[91m{agent_name}\033[0m: Skipping item: {new_item.__class__.__name__}")
        input_items = result.to_input_list()
        current_agent = result.last_agent
    
# await router_with_product_info(6)

In [None]:
# ... existing code ...

@function_tool
async def send_feedback_email(wrapper: RunContextWrapper[UserInfoContext], product_id: str, feedback: str) -> str:
    """
    Sends product feedback via email
    Args:
        product_id (str): The product ID
        feedback (str): The feedback content
    """
    # Simulating email sending
    print(f"Sending email feedback for product {product_id}")
    return f"Email feedback sent successfully for product {product_id}"

@function_tool
async def create_crm_ticket(wrapper: RunContextWrapper[UserInfoContext], product_id: str, feedback: str) -> str:
    """
    Creates a CRM ticket for product feedback
    Args:
        product_id (str): The product ID
        feedback (str): The feedback content
    """
    # Simulating CRM ticket creation
    ticket_id = f"TICKET-{product_id}-{hash(feedback) % 1000}"
    return f"CRM ticket created successfully. Ticket ID: {ticket_id}"

product_feedback_agent = Agent[UserInfoContext](
    name="Product Feedback Agent",
    handoff_description="Specialized agent for collecting and processing product feedback through various channels.",
    instructions="""
    You are a product feedback collection specialist that can process feedback through multiple channels.

    # Core Responsibilities
    - Collect product feedback from users
    - Validate product IDs before processing feedback
    - Offer multiple feedback submission methods (email, CRM ticket)
    - Guide users through the feedback submission process

    # Feedback Collection Protocol
    1. Initial Information Gathering:
       - Request product ID if not provided
       - Ask for feedback content
       - Confirm feedback channel preference (email or CRM ticket)

    2. Feedback Processing:
       - Validate product exists using check_product_info
       - Process feedback through chosen channel
       - Provide confirmation and reference numbers

    3. Channel-Specific Handling:
       - Email: Format and send feedback via email
       - CRM: Create ticket and provide ticket ID

    4. Response Protocol:
       - Confirm successful submission
       - Provide reference numbers or IDs
       - Handle any submission errors

    Remember: Always verify product existence before processing feedback.
    """,
    model="gpt-4",
    tools=[
        check_product_info,
        send_feedback_email,
        create_crm_ticket
    ]
)

# Add handoffs
product_feedback_agent.handoffs.append(router_agent)
gather_requirements_agent.handoffs.append(product_feedback_agent)
router_agent.handoffs.append(product_feedback_agent)

async def router_with_product_feedback(max_user_turns=3):
    input_items: list[TResponseInputItem] = []
    context = UserInfoContext()
    current_agent: Agent[UserInfoContext] = user_auth_check_agent
    current_user_turns = 0
    while current_user_turns < max_user_turns:
        user_input = input("Enter message: ")
        input_items.append({"content": user_input, "role": "user"})
        result = await Runner.run(
            current_agent,
            input_items,
            context=context,
        )
        current_user_turns = current_user_turns + 1

        for new_item in result.new_items:
            agent_name = new_item.agent.name
            if isinstance(new_item, MessageOutputItem):
                print(f"\033[94m{agent_name}\033[0m: {ItemHelpers.text_message_output(new_item)}")
            elif isinstance(new_item, HandoffOutputItem):
                print(f"Handed off from \033[92m{new_item.source_agent.name}\033[0m to \033[93m{new_item.target_agent.name}\033[0m")
            elif isinstance(new_item, ToolCallItem):
                print(f"\033[95m{agent_name}\033[0m: Calling a tool")
            elif isinstance(new_item, ToolCallOutputItem):
                print(f"\033[96m{agent_name}\033[0m: Tool call output: {new_item.output}")
            else:
                print(f"\033[91m{agent_name}\033[0m: Skipping item: {new_item.__class__.__name__}")
        input_items = result.to_input_list()
        current_agent = result.last_agent

await router_with_product_feedback(6)
