In [2]:
from typing import Annotated
from typing_extensions import TypedDict
from openai import OpenAI
import numpy as np
import pandas as pd

In [3]:
client = OpenAI()

class Param(TypedDict):
    type: str
    name: str
    description: str
    required: bool

class Tool(TypedDict):
    name: str
    description: str
    params: list[Param]


def system(s: str):
    return {"role": "system", "content": s}


def user(s: str):
    return {"role": "user", "content": s}


def assistant(s: str):
    return {"role": "assistant", "content": s}

import json

def format_tools(tool: dict):
    required_params = [p["name"] for p in tool["params"] if p.get("required")]
    properties = {p["name"]: {"type": p["type"], "description": p["description"]} for p in tool["params"]}
    
    tool_dict = {
        "type": "function",
        "function": {
            "name": tool["name"],
            "description": tool["description"],
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required_params,
            },
        },
    }
    
    return json.dumps(tool_dict)

def get_completion(
    messages,
    tools: list[dict] = [],
    json: bool = False,
    temperature: float = 0,
    model: str = "gpt-4o-mini",
):
    formatted_tools = [format_tools(t) for t in tools] if tools else None
    request = {
        "model": model,
        "messages": messages,
    }
    if formatted_tools:
        request["tools"] = formatted_tools
    if json:
        request["response_format"] = {"type": "json_object"}
    request["temperature"] = temperature
    response = client.chat.completions.create(**request)
    return response.choices[0].message.content

def get_embedding(text, model="text-embedding-3-large", dimensions=500):
   text = text.replace("\n", " ")
   return client.embeddings.create(input = [text], model=model,dimensions=dimensions).data[0].embedding

def normalize_l2(x):
    x = np.array(x)
    if x.ndim == 1:
        norm = np.linalg.norm(x)
        if norm == 0:
            return x
        return x / norm
    else:
        norm = np.linalg.norm(x, 2, axis=1, keepdims=True)
        return np.where(norm == 0, x, x / norm)


In [4]:

import faiss
import os

class FaissIndex:

    def __init__(self, documents, index_name):
        self.documents = documents
        self.index_name = index_name
        if os.path.exists(self.index_name):
            self.index = faiss.read_index(self.index_name)
        else:
            self.index = faiss.IndexFlatL2(500)
            self.index.add(np.array([get_embedding(doc) for doc in documents]))
            faiss.write_index(self.index, self.index_name)
    
    def save(self):
        faiss.write_index(self.index, self.index_name)        
        
    def search(self, query, k=5):
        query_embedding = get_embedding(query)
        distances, indices = self.index.search(np.array([query_embedding], dtype=np.float32), k)
        return [(self.documents[i], d) for i, d in zip(indices[0], distances[0])]
    
    def add(self, document):
        self.documents.append(document)
        self.index.add(np.array([get_embedding(document)]))

    def remove(self, document):
        self.documents.remove(document)
        self.index.remove(np.array([get_embedding(document)]))


In [5]:
import chromadb
from chromadb.config import Settings
import os

class ChromaIndex:
    def __init__(self, documents, collection_name, embedding_function=None, persist_directory="./chroma_db"):
        self.collection_name = collection_name
        self.persist_directory = persist_directory
        self.client = chromadb.Client(
            Settings(
                chroma_db_impl="duckdb+parquet",
                persist_directory=persist_directory
            )
        )
        self.collection = self.client.get_or_create_collection(
            name=self.collection_name,
            embedding_function=embedding_function
        )

        if self.collection.count() == 0 and documents:
            self.add_batch(documents)

    def add(self, document, metadata=None):
        self.collection.add(
            documents=[document],
            metadatas=[metadata] if metadata else None,
            ids=[str(self.collection.count())]
        )

    def add_batch(self, documents, metadatas=None):
        start_id = self.collection.count()
        self.collection.add(
            documents=documents,
            metadatas=metadatas if metadatas else [None] * len(documents),
            ids=[str(i) for i in range(start_id, start_id + len(documents))]
        )

    def search(self, query, k=5):
        results = self.collection.query(
            query_texts=[query],
            n_results=k
        )
        return list(zip(results['documents'][0], results['distances'][0]))

    def remove(self, document):
        results = self.collection.query(
            query_texts=[document],
            n_results=1
        )
        if results['documents'][0]:
            self.collection.delete(ids=results['ids'][0])
        else:
            raise ValueError("Document not found in the collection")

    def persist(self):
        self.client.persist()

    def get_collection(self):
        return self.collection

In [6]:
FAQ_DOCUMENT = """
Frequently Asked Questions (FAQ)
1. Where is Shahiaa located?
Shahiaa is located at 123 Nile Avenue, Cairo, Egypt.
2. What are Shahiaa's operating hours?
Shahiaa is open from Monday to Friday, 12:00 PM - 10:00 PM, and on Saturday and Sunday from 1:00 PM - 11:00 PM.
3. What type of cuisine does Shahiaa serve?
Shahiaa specializes in authentic Egyptian cuisine, offering a variety of traditional dishes and beverages.
4. Do I need a reservation to dine at Shahiaa?
Reservations are not required but are highly recommended, especially during peak hours and weekends, to ensure you have a table.
5. Does Shahiaa offer takeout or delivery services?
Yes, Shahiaa offers both takeout and delivery services. You can place your order by calling us at +20 123 456 7890 or through our website, www.shahiaa.com.
6. Is there a dress code at Shahiaa?
Shahiaa maintains a casual dress code. However, we encourage guests to dress smart-casual for a more comfortable dining experience.
7. Does Shahiaa accommodate dietary restrictions or special requests?
Absolutely! Please inform our staff of any dietary restrictions or special requests, and we will do our best to accommodate your needs.
8. Are there vegetarian or vegan options available?
Yes, Shahiaa offers a variety of vegetarian and vegan dishes. Feel free to ask our staff for recommendations.
9. Can I host a private event or party at Shahiaa?
Yes, Shahiaa provides space for private events and parties. Please contact us at least two weeks in advance to make arrangements.
10. How can I provide feedback about my experience at Shahiaa?
We value your feedback! You can share your thoughts with us directly by emailing info@shahiaa.com or through our website's feedback form.
"""

BUSINESS_DOCUMENT = """
Business Description:
Shahiaa Restaurant
Established: 2022
Location: Cairo, Egypt
About Us:
Welcome to Shahiaa, where culinary tradition meets exceptional service. Founded in 2022 and based
in the heart of Cairo, Shahiaa offers an extraordinary dining experience featuring a wide array of
delicious and authentic Egyptian cuisine. Our menu boasts a rich selection of freshly prepared dishes
and refreshing drinks, catering to all tastes and preferences. Whether you're looking for a hearty meal
or a light snack, Shahiaa is the perfect destination to satisfy your cravings.
At Shahiaa, we take pride in using the finest ingredients to prepare our dishes, ensuring every bite is a
taste of Egypt's vibrant culinary heritage. Our passionate chefs draw upon traditional recipes and
cooking techniques, adding their own unique twist to create memorable dining experiences for our
guests.
Join us at Shahiaa and immerse yourself in an ambiance that blends modern comfort with traditional
Egyptian charm. Our friendly staff is always ready to welcome you and ensure you have an
unforgettable dining experience.
Our Signature Dishes:
Koshary: A savory mix of rice, macaroni, and lentils topped with spicy tomato sauce,
chickpeas, and crispy fried onions.
Molokhia: A traditional Egyptian soup made from finely chopped jute leaves, served with rice
or bread.
Shawarma: Tender slices of seasoned meat, slow-cooked on a vertical rotisserie, served with
fresh vegetables and tahini sauce.
Baklava: A sweet pastry made with layers of filo dough, filled with chopped nuts, and
sweetened with syrup.
Operating Hours:
Monday to Friday: 12:00 PM - 10:00 PM
Saturday and Sunday: 1:00 PM - 11:00 PM
Contact Information:
Address: 123 Nile Avenue, Cairo, Egypt
Phone Number: +20 123 456 7890
Email: info@shahiaa.com
Website: www.shahiaa.com
"""

In [7]:
pf = pd.read_csv("products.csv")
documents = [doc for doc in pf["Text"]]

In [8]:
product_faiss = FaissIndex(documents, "products.faiss")

In [9]:
def faq_tool(search: str) -> str:
    return FAQ_DOCUMENT

def business_tool(search: str) -> str:
    return BUSINESS_DOCUMENT

def products_tool(search: str) -> str:
    products = product_faiss.search(search)
    return "\n".join(products)

def place_order_tool(order_text: str) -> str:
    print(f"Order placed, Order ID: {order_text}")

def cancel_order_tool(customer_phone: str) -> str:
    print(f"Order canceled, Customer phone: {customer_phone}")

def request_human(customer_phone: str) -> str:
    print(f"Human requested, Phone number: {customer_phone}")


In [27]:
products_tool("what product do you sell")

TypeError: sequence item 0: expected str instance, tuple found

In [10]:
PARSER_PROMPT = """
Your are a customer service agent parser. You will work with both english and arabic.
Your job is to extract information from the user's input.
You can extract the following information:
- Customer Phone Number: The phone number of the customer. (required if the user wants to place an order.)
- Customer Name: The name of the customer. (required if the user wants to place an order.)
- Customer Address: The address of the customer. (required if the user wants to place an order.)
- Talk To Human: If the user wants to talk to a human. (Boolean) (set to high the if the user wants to talk to a human)
- Cancel Order: If the user wants to cancel an order. (Boolean) (set to high if the user wants to cancel the order)
- Confirm Order: If the user wants to confirm an order. (Boolean) (set to high if the user wants to confirm the order this considered the end of the conversation )
- Products Query: User query about products (String separated with comma). Add this if you can't answer from the bellow products info
- Business Query: User query about business (String separated with comma). Add this if you can't answer from the bellow business info

This information would extracted from a single user message.
Because the agent will be in a normal conversation with the user.
So you will have information from the previous messages (State).
Same as the information above and you required to update it if the user provides new information,
if the user does not provide any information just output the state as it is.
In the state will get the information about the business like the name, summary,
product summary, and services summary.
The conversation state: 
customer_phone_number: {customer_phone_number}
customer_name: {customer_name}
customer_address: {customer_address}
talk_to_human: {talk_to_human}
cancel_order: {cancel_order}
confirm_order: {confirm_order}
products_info: {products_info}
business_info: {business_info}
The output should be a json object with keys.
- customer_phone_number
- customer_name
- customer_address
- talk_to_human
- cancel_order
- confirm_order
- products_query (this if the user asking for a products that not in the products info)
- business_query (this if the user asking for a business information that not in the business info)
"""

CHAT_AGENT_PROMPT = """
You are a customer service agent.
You should maintain a conversation with the user.
You should answer user queries and respond properly to the user.
You will provided with the following information:
- customer_phone_number: The phone number of the customer. (This is used to get the customer orders.)
- customer_name: The name of the customer. (This is used to address the customer.)
- customer_address: The address of the customer. (This is used to deliver the order.)
- order_info: The order information extracted about the order, this is a string and when seen by a human the human should know the customer order.
- talk_to_human: If the user wants to talk to a human.
- cancel_order: If the user wants to cancel an order.
- confirm_order: If the user wants to confirm an order.
- products_info: The information about the products.
- business_info: The information about the business.
In the state will get the information about the business like the name, summary,
product summary, and services summary.

This will help you to respond to the user properly. you should direct the conversation
tell the user confirm the order. or cancel the order. or talk to a human.
for example here is a possible conversation flow:
This is just an example, you can have different conversation flow.
The conversation state this what you know tell now about the chat: 
customer_phone_number: {customer_phone_number}
customer_name: {customer_name}
customer_address: {customer_address}
order_info: {order_info}
talk_to_human: {talk_to_human}
cancel_order: {cancel_order}
confirm_order: {confirm_order}
products_info: {products_info}
business_info: {business_info}
Don't ask the user for the information you already have.
You should control the flow of the conversation so you can
get the information needed to complete the order
"""

In [34]:
import json
import sqlite3
from typing import TypedDict

class State(TypedDict):
    messages: list[str]
    customer_phone_number: str
    customer_name: str
    customer_address: str
    order_info: str
    talk_to_human: bool
    cancel_order: bool
    confirm_order: bool
    products_info: str
    business_info: str
    product_query: str
    business_query: str


def update_state(state: State, dct: dict):
    print("update state: ", dct)
    return {**state, **dct}

def load_state(unique_id: str) -> State:
    conn = sqlite3.connect('states.db')
    cursor = conn.cursor()
    
    # Create table if it doesn't exist
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS states
    (id TEXT PRIMARY KEY, state TEXT)
    ''')
    
    cursor.execute('SELECT state FROM states WHERE id = ?', (unique_id,))
    result = cursor.fetchone()
    
    if result:
        state_json = result[0]
        state = json.loads(state_json)
    else:
        state = State(
            messages=[],
            customer_phone_number=unique_id,
            customer_name="",
            customer_address="",
            order_info="",
            talk_to_human=False,
            cancel_order=False,
            confirm_order=False,
            products_info="",
            business_info="",
            product_query="",
            business_query=""
        )
    
    conn.close()
    return state

def save_state(unique_id: str, state: State):
    conn = sqlite3.connect('states.db')
    cursor = conn.cursor()
    
    state_json = json.dumps(state)
    
    cursor.execute('''
    INSERT OR REPLACE INTO states (id, state)
    VALUES (?, ?)
    ''', (unique_id, state_json))
    
    conn.commit()
    conn.close()  


class ChainEnd:

    def __init__(self, unique_id: str):
        self.unique_id = unique_id

    def __call__(self, state: State):
        save_state(self.unique_id, state)
        return state

class ParserAgent:

    def __init__(self, next: None):
        self.next = next
    
    def __call__(self, state: State):
        prompt = PARSER_PROMPT.format(
            customer_phone_number=state["customer_phone_number"],
            customer_name=state["customer_name"],
            customer_address=state["customer_address"],
            order_info=state["order_info"],
            talk_to_human=state["talk_to_human"],
            cancel_order=state["cancel_order"],
            confirm_order=state["confirm_order"],
            products_info=state["products_info"],
            business_info=state["business_info"],
        )
        messages = [system(prompt)] + state["messages"]
        output_json = get_completion(messages, json=True)
        output_dict = json.loads(output_json)
        state = update_state(state, output_dict)
        return self.next(state)

class ChatAgent:

    def __init__(self, next=None):
        self.next = next
    
    def __call__(self, state: State):
        prompt = CHAT_AGENT_PROMPT.format(
            customer_phone_number=state["customer_phone_number"],
            customer_name=state["customer_name"],
            customer_address=state["customer_address"],
            order_info=state["order_info"],
            talk_to_human=state["talk_to_human"],
            cancel_order=state["cancel_order"],
            confirm_order=state["confirm_order"],
            products_info=state["products_info"],
            business_info=state["business_info"],
        )
        messages = [system(prompt)] + state["messages"]
        print(messages)
        response = get_completion(messages)
        state["messages"].append(assistant(response))
        update_state(state, {"messages": state["messages"]})
        return self.next(state)        

class Router:

    def __init__(self, next=None):
        self.next = next        
    
    def __call__(self, state: State):
        print(state)
        if state["talk_to_human"]:
            request_human(state["customer_phone_number"])
            message = assistant("Human has been notified")
            state["messages"].append(message)
            
        if state["cancel_order"]:
            cancel_order_tool(state["customer_phone_number"])
            message = assistant("Order has been cancelled")
            state["messages"].append(message)
        
        if state["confirm_order"]:
            place_order_tool(state["order_info"])
            message = assistant("Order has been confirmed")
            state["messages"].append(message)
        
        if len(state["product_query"]) > 0:
            products = products_tool(state["product_query"])
            state = update_state(state, {"products_info": products})
        
        if len(state["business_query"]) > 0:
            business = business_tool(state["business_query"])
            state = update_state(state, {"business_info": business})

        return self.next(state)

class Starter:

    def __init__(self, next=None):
        self.next = next
    
    def __call__(self, user_input: str, unique_id: str):
        state = load_state(unique_id)
        state["messages"].append(user(user_input))
        return self.next(state)

class AgentChain:
    def __init__(self, unique_id: str):
        self.unique_id = unique_id
        chain_end = ChainEnd(unique_id)
        chat_agent = ChatAgent(next=chain_end)
        router = Router(next=chat_agent)
        parser_agent = ParserAgent(next=router)
        self.starter = Starter(next=parser_agent)
        

    def process(self, user_input: str):
        return self.starter(user_input, self.unique_id)

def handle_user_input(user_input: str, unique_id: str):
    agent_chain = AgentChain(unique_id)
    final_state = agent_chain.process(user_input)
    return final_state["messages"][-1]["content"]


def clear_state(unique_id: str):
    conn = sqlite3.connect("states.db")
    cursor = conn.cursor()
    cursor.execute("DELETE FROM states WHERE id = ?", (unique_id,))
    conn.commit()
    conn.close()


In [35]:
conversation_id = "1"
clear_state(conversation_id)

In [36]:
while True:
    user_input = input()
    output = handle_user_input(user_input, conversation_id)
    print(output)

update state:  {'customer_phone_number': 1, 'customer_name': '', 'customer_address': '', 'talk_to_human': False, 'cancel_order': False, 'confirm_order': False, 'products_query': '', 'business_query': ''}
{'messages': [{'role': 'user', 'content': 'hi'}], 'customer_phone_number': 1, 'customer_name': '', 'customer_address': '', 'order_info': '', 'talk_to_human': False, 'cancel_order': False, 'confirm_order': False, 'products_info': '', 'business_info': '', 'product_query': '', 'business_query': '', 'products_query': ''}
[{'role': 'system', 'content': "\nYou are a customer service agent.\nYou should maintain a conversation with the user.\nYou should answer user queries and respond properly to the user.\nYou will provided with the following information:\n- customer_phone_number: The phone number of the customer. (This is used to get the customer orders.)\n- customer_name: The name of the customer. (This is used to address the customer.)\n- customer_address: The address of the customer. (Thi

KeyboardInterrupt: 