In [78]:
from dotenv import load_dotenv
from agents import Agent, Runner, trace, SQLiteSession, function_tool
import os
from pypdf import PdfReader
import gradio as gr
import requests
import agentops
from pydantic import BaseModel
from typing import Optional

# Initialize with automatic trace creation (default)
load_dotenv(override=True)
agentops_api_key = os.getenv("AGENTOPS_API_KEY")

agentops.init(api_key=agentops_api_key, default_tags=["production"])

In [55]:
session = SQLiteSession("user_123", "conversation.db")
await session.clear_session()

In [56]:
kb=""
reader= PdfReader("musketeerly_ai_knowledge_base.pdf")
for page in reader.pages:
    kb +=page.extract_text()
    

In [69]:
company_name= "Musketeerly AI"
intent_classifier_instructions = f"""
You are a classification agent for Musketeerly AI. Your task is to analyze customer message and answer according to the knowledge base and only withing the scope of the knowledge base
You have the ability to update customer information, get customer informstion and return it to them and escalate requests as well
Your Tools and how and when to use them: (This does not apply sequentially, follow them as they apply instead)
1) You must make use the escalate_to_human tool only when it involves an order cancellation or a refund, an order id only is needed for order cancellation while an email and reason is needed for a refund(you must include the users email in the push notification, you must not send the push notification if the email cant be found in the database).
2) You must make use of the get_customer_info tool when the customer asks for retrieval of information from the database, email only is needed for this,If an email cant be found on the database, you must tell the user it cant be found and suggest they double check or contact the company support email, You must only return a users information when they ask for it,if they provide a mail and do not specify what it is for then do not retrieve any information, ask for the purpose instead.
3) You must make use the update_customer_by_email tool when the customer wants to update their information, an email as well as the field and the value changes they want to be made to the must be required to make changes, example: 'I want to update my account name(field) to Kage first (Name field Value), my email is john@example.com(email needed for record identification).
Follow these rules while you operate:
-for the sake of safety, you must only return user information when the user explicitly states that they want to retrieve their information. If the user inputsn email withoout asking you explicitly to retrieve their information in their last text or in the same sentence do not retrieve it, ask a question instead to find out the purpose of the email.
-order id should only be used for cancellations and never for refunds
-Except the user has provided their email and reason in the last 5 texts always ask for the email and reason when requesting a refund
- never try to initiate a refund using email and reason from memory
- never use the customer id for a refund
- if a user provides an email only and requests for a refund, you must ask for the reason.
- Each time a request for information retrieval is made, you must use the get_customer_information tool, you must never retrieve the information from your memory
- When an a request to update information is made you must use the update_customer_by email tool and never the get_customer_info tool
Knowledge Base: {kb}"""





In [63]:
pushover_user = os.getenv("PUSHOVER_USER")
pushover_token = os.getenv("PUSHOVER_TOKEN")
pushover_url = "https://api.pushover.net/1/messages.json"

airtable_token= os.getenv("AIRTABLE_TOKEN")
table_name= os.getenv("TABLE_NAME")
base_id= os.getenv("BASE_ID")

AIRTABLE_URL = f"https://api.airtable.com/v0/{base_id}/{table_name}"
HEADERS = {
    "Authorization": f"Bearer {airtable_token}",
    "Content-Type": "application/json"
}


In [82]:
@function_tool
def escalate_to_human(issue_summary:str):
    """Sends a Push Notification For escalation of issues."""
    payload = {"user": pushover_user, "token": pushover_token, "message": issue_summary}
    requests.post(pushover_url, data=payload)



# Tool 1: Get Customer by Email
# ----------------------------
@function_tool
def get_customer_info(email: str):
    
    params = {
        "filterByFormula": f"{{Email}}='{email}'",
        "maxRecords": 1
    }
    response = requests.get(AIRTABLE_URL, headers=HEADERS, params=params)
    response.raise_for_status()
    data = response.json()
    
    if data.get("records"):
        return data["records"][0]  # Return first matching record
    else:
        return None

class FieldUpdate(BaseModel):
    Name: Optional[str] = None
    Email: Optional[str] = None
    Phone: Optional[str] = None
    Address: Optional[str] = None  # Add more fields as needed
    # Example:
    # Name: Optional[str] = None
    # Email: Optional[str] = None

@function_tool
def update_customer_by_email(email: str, fields: FieldUpdate) -> dict:
    """
    Look up a customer by email and update their information in the database.

    Args:
        email (str): Customer's email address.
        fields (dict): Fields to update.

    Returns:
        dict: Result of the update or an error message.
    """
    try:
        # Step 1: Search for the record by email
        search_url = f"{AIRTABLE_URL}?filterByFormula=LOWER(Email)='{email.lower()}'"
        response = requests.get(search_url, headers=HEADERS)
        response.raise_for_status()

        records = response.json().get("records", [])
        if not records:
            return {
                "status": "not_found",
                "message": f"No customer found with email: {email}. Please verify and try again or contact support."
            }

        # Step 2: Extract the record ID
        record_id = records[0]["id"]

        # Step 3: Update the customer record
        update_url = f"{AIRTABLE_URL}/{record_id}"
        payload = {"fields": fields.dict(exclude_none=True)}
        update_response = requests.patch(update_url, headers=HEADERS, json=payload)
        update_response.raise_for_status()

        updated_fields = update_response.json().get("fields", {})
        return {
            "status": "success",
            "message": "Customer record updated successfully.",
            "updated_fields": updated_fields
        }

    except requests.exceptions.HTTPError:
        return {
            "status": "error",
            "message": "A problem occurred while accessing the database. Please try again or escalate."
        }


In [83]:



intent_agent= Agent(name="Intent Classifier Agent", instructions= intent_classifier_instructions, model= "gpt-4o-mini", tools= [escalate_to_human, get_customer_info, update_customer_by_email])





In [None]:
async def chat(message, history):
    with trace("Musketeerly"):
        response = await Runner.run(intent_agent, message, session=session)
        return response.final_output
gr.ChatInterface(chat).launch()







  self.chatbot = Chatbot(


* Running on local URL:  http://127.0.0.1:7870
* To create a public link, set `share=True` in `launch()`.




/tmp/ipykernel_1377/2592881882.py:66: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  payload = {"fields": fields.dict(exclude_none=True)}
/tmp/ipykernel_1377/2592881882.py:66: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  payload = {"fields": fields.dict(exclude_none=True)}
/tmp/ipykernel_1377/2592881882.py:66: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  payload = {"fields": fields.dict(exclude_none=True)}
