In [143]:
import uuid
from datetime import datetime
from typing import Callable
from typing import Dict, Optional, List

from pydantic import BaseModel, Field

# Module-level customer list. None of the cells shown here read or write it;
# ConversationManager keeps its own `self.customers` list instead.
customers = []


# Record of a customer captured from a finished conversation.
# Built by ConversationManager._save_customer from a Session's context.
class Customer(BaseModel):
    # Filled with the session id by _save_customer.
    id: str
    # The next three values are read from session.context under the same keys.
    name: str
    email: str
    product_choice: str
    # List of snapshot dicts; _save_customer stores a single
    # {"timestamp": ..., "context": ...} entry.
    conversation_history: List[Dict]


class Session(BaseModel):
    """State of a single conversation with the sales assistant."""

    # Generated per instance. A plain `str(uuid.uuid4())` default is evaluated
    # once when the class body runs, so every Session would share the same id.
    session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Values collected from the user so far (e.g. name, email, product).
    context: Dict = {}
    # Id of the node the conversation is currently on; starts at "welcome".
    current_node_id: str = "welcome"
    # Transcript lines, each prefixed with "AI: " or "User: " by the main loop.
    conversation_history: List[str] = []

class Node(BaseModel):
    """One step of the conversation graph."""

    # Key of this node in the node dictionary.
    id: str
    # Human-readable purpose of the node. ConversationManager already passes
    # `description=...` when building nodes; without this field that value was
    # dropped and never appeared in the node listing sent to the LLM.
    description: str = ""
    # Text shown to the user; may contain `{field}` placeholders filled from
    # the session context.
    prompt_template: str
    # Context keys that should be known before this node is entered.
    required_fields: List[str] = []
    # Ids of the nodes the conversation may move to from here.
    list_of_next_possible_nodes: List[str] = []
    # Optional coroutine called as handler(user_input, history, context) whose
    # result replaces the prompt as the message sent to the user.
    custom_handler_function: Optional[Callable] = None
    
    def get_processed_prompt_template(self, context: Dict) -> str:
        """Return ``prompt_template`` with its placeholders filled from *context*.

        Raises:
            KeyError: if the template names a placeholder missing from *context*.
        """
        return self.prompt_template.format(**context)
    
async def get_product_details_using_rag(question: str, history: List[str], context: Dict) -> str:
    """Answer a product question.

    Placeholder implementation: all three arguments are ignored and a fixed
    description of Product A is returned until retrieval is implemented.
    """
    # TODO: Implement RAG
    canned_answer = "Product A is a cutting-edge solution designed to streamline business operations and enhance productivity. With robust features including support for up to 500 concurrent users, advanced analytics capabilities, and seamless integration options, it stands out in the market. The platform offers real-time collaboration tools, automated workflow management, and customizable dashboards that adapt to your business needs. Security is paramount, with enterprise-grade encryption and role-based access control. Regular updates ensure you stay ahead with the latest features, while our 24/7 technical support team ensures smooth operations. Perfect for both small businesses and large enterprises looking to scale efficiently."
    return canned_answer

def update_scheduled_demo_details(name: str, email: str, product_choice: Optional[str] = None,date: Optional[str] = None) -> str:
    """Log and return a confirmation that a demo was scheduled.

    Currently a stub: nothing is persisted. The confirmation sentence is
    printed between two banner lines and then returned to the caller.
    """
    # TODO: Implement actual DB update or API call
    confirmation = f"Demo scheduled successfully for {name} with email {email} for product {product_choice} on {date}"
    print(
        "------------------------------ Updating scheduled demo details -----------------------------",
        confirmation,
        "--------------------------------------------------------------------------------",
        sep="\n",
    )
    return confirmation


In [136]:
from typing import Dict
from together import Together
from pydantic import BaseModel, Field
from typing import List
import json
from typing import Any, Tuple

# Structured result requested from the LLM for each user message. Its JSON
# schema is passed as the response_format schema in
# TogetherAIService.analyze_input. Comments are used instead of a docstring so
# the generated schema stays exactly as it was.
class InputAnalysis(BaseModel):
    # Node the conversation should move to next.
    next_node_id: str = Field(description="The next node id from the possible next nodes list")
    # Values extracted from the user's message; the main loop merges them into
    # the session context.
    user_inputs : Dict = Field(description="Any identified user inputs from the user input eg: name, email, product_choice")
    confidence: float = Field(description="Confidence score between 0 and 1")
    # NOTE(review): annotated `str` but defaults to None; `Optional[str]` looks
    # intended. Confirm the resulting schema change is acceptable to the API
    # before altering the annotation.
    suggested_response: str = Field(description="Optional suggested response", default=None)
    

class PromptService:
    """Builders for the prompt texts sent to the LLM. Each builder also echoes its prompt to stdout."""

    @staticmethod
    def create_input_prompt(current_message: str, history: List[str], context: Dict, list_of_next_possible_nodes: List[str], nodes: Dict[str, Node]) -> str:
        """Build the routing prompt for one user message.

        The prompt embeds the known user details, the latest message, the
        conversation history and the candidate next nodes (looked up in
        *nodes* by id, so an unknown id raises ``KeyError``).
        """
        candidate_nodes = []
        for candidate_id in list_of_next_possible_nodes:
            candidate_nodes.append(nodes[candidate_id])

        prompt = f"""Analyze the following user input and determine the next node. if the required fields are not met, add follow up questions to the user.
        
        user details: {context}
        User Input: {current_message}
        
        History:
        {history}
        
        Possible next nodes:
        {candidate_nodes}
        """
        for line in (
            "------------------------------ Generated Intent Prompt -----------------------------",
            prompt,
            "--------------------------------------------------------------------------------",
        ):
            print(line)
        return prompt

    @staticmethod
    def create_demo_tool_prompt(schedule_demo_tool: Dict) -> str:
        """Build the system prompt that describes the demo-scheduling function.

        *schedule_demo_tool* must provide ``"name"`` and ``"description"`` and
        be JSON-serialisable; the whole dict is embedded as JSON.
        """
        tool_name = schedule_demo_tool["name"]
        tool_description = schedule_demo_tool["description"]
        tool_json = json.dumps(schedule_demo_tool)

        prompt = f"""
        You have access to the following function:

        Use the function '{tool_name}' to '{tool_description}':
        {tool_json}

        Schedule a demo for tomorrow using the context provided. Format the date as YYYY-MM-DD.

        If you choose to call a function ONLY reply in the following format with no prefix or suffix:
        <function=example_function_name>{{"example_name": "example_value"}}</function>
        """
        for line in (
            "------------------------------ Generated Schedule Demo Tool Prompt -----------------------------",
            prompt,
            "--------------------------------------------------------------------------------",
        ):
            print(line)
        return prompt

class TogetherAIClient:
    """Logging wrapper around the Together chat-completions client."""

    def __init__(self, model: str = "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo"):
        """Create the underlying ``Together`` client and remember *model*."""
        self.client = Together()
        self.model = model

    def create_chat_completion(self, messages: List[Dict], temperature: float = 0.7, response_format: Optional[Dict] = None) -> Any:
        """Send *messages* to the configured model and return the raw response.

        The request parameters and the response are echoed to stdout.
        ``response_format`` is always forwarded to the client, as ``None``
        when the caller did not supply one.
        """
        rule = "--------------------------------------------------------------------------------"

        print("------------------------------ Sending Request to Together AI -----------------------------")
        print(f"Model: {self.model}")
        print(f"Messages: {json.dumps(messages, indent=2)}")
        print(f"Temperature: {temperature}")
        if response_format:
            print(f"Response Format: {json.dumps(response_format, indent=2)}")
        print(rule)

        request = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "response_format": response_format,
        }
        completion = self.client.chat.completions.create(**request)

        print("------------------------------ Received Response from Together AI -----------------------------")
        print(completion)
        print(rule)
        return completion

class ResponseParser:
    """Turn raw chat-completion responses into plain Python values."""

    @staticmethod
    def parse_llm_response(response) -> Dict:
        """Return the JSON object carried by the first choice of *response*.

        String content is decoded with ``json.loads``; non-string content is
        used as-is. On any failure, including a payload that is not a JSON
        object, a default dict is returned instead of raising. The default
        carries the ``InputAnalysis`` keys (``next_node_id``, ``user_inputs``,
        ``confidence``, ``suggested_response``) so callers that index those
        keys keep working; ``next_node_id`` is ``None`` because no node could
        be determined. The older ``input``/``entities`` keys are kept for
        backward compatibility.
        """
        print("------------------------------ Parsing LLM Response -----------------------------")
        try:
            content = response.choices[0].message.content
            result = json.loads(content) if isinstance(content, str) else content
            if not isinstance(result, dict):
                # The declared return type is Dict; route anything else
                # (None, list, number, ...) to the default response below.
                raise ValueError(f"expected a JSON object, got {type(result).__name__}")
            print(f"Parsed Result: {json.dumps(result, indent=2)}")
            print("--------------------------------------------------------------------------------")
            return result
        except Exception as e:
            # Deliberately broad: parsing is best-effort and must not abort
            # the conversation.
            print(f"Error parsing response: {e}")
            print("Returning default response")
            print("--------------------------------------------------------------------------------")
            return {
                "next_node_id": None,
                "user_inputs": {},
                "input": "unknown",
                "entities": {},
                "confidence": 0.0,
                "suggested_response": None
            }

    @staticmethod
    def parse_tool_response(response) -> Tuple[str, Dict]:
        """Extract ``(function_name, arguments)`` from a function-calling reply.

        Structured ``tool_calls`` on the message are used when present.
        Otherwise the message text is searched for the inline form the demo
        prompt asks the model to emit:
        ``<function=NAME>{...json arguments...}</function>``.

        Raises:
            ValueError: if neither form is present, or the inline arguments
                are not valid JSON (``json.JSONDecodeError`` is a subclass).
        """
        import re  # local import: only this method needs it

        print("------------------------------ Parsing Tool Response -----------------------------")
        message = response.choices[0].message
        tool_calls = getattr(message, "tool_calls", None)
        if tool_calls:
            function = tool_calls[0].function
            function_name = function.name
            arguments = json.loads(function.arguments)
        else:
            text = getattr(message, "content", None) or ""
            # The closing tag is optional so a truncated reply still parses.
            match = re.search(r"<function=([\w.-]+)>(.*?)(?:</function>|$)", text, re.DOTALL)
            if match is None:
                raise ValueError("LLM response contains neither tool_calls nor a <function=...> call")
            function_name = match.group(1)
            arguments = json.loads(match.group(2).strip())
        print(f"Function Name: {function_name}")
        print(f"Arguments: {json.dumps(arguments, indent=2)}")
        print("--------------------------------------------------------------------------------")
        return function_name, arguments

class TogetherAIService:
    """High-level LLM operations: routing a user message and scheduling a demo."""

    def __init__(self):
        """Create the client, prompt builder and response parser used below."""
        print("------------------------------ Initializing TogetherAIService -----------------------------")
        self.ai_client = TogetherAIClient()
        self.prompt_service = PromptService()
        self.response_parser = ResponseParser()
        print("--------------------------------------------------------------------------------")

    async def analyze_input(self, current_message: str, history: List[str], context: Dict, list_of_next_possible_nodes: List[str]) -> Dict:
        """Ask the LLM which node comes next and what the user told us.

        Returns the parsed JSON object, requested in the shape of
        ``InputAnalysis``.

        NOTE: reads the module-level ``nodes`` dictionary, so that name must
        be defined before this coroutine is first awaited.
        """
        print("------------------------------ Starting Input Analysis -----------------------------")
        prompt = self.prompt_service.create_input_prompt(
            current_message, history, context, list_of_next_possible_nodes, nodes
        )
        
        response = self.ai_client.create_chat_completion(
            messages=[
                {"role": "system", "content": "You are a sales assistant. Only respond in JSON format."},
                {"role": "user", "content": prompt}
            ],
            response_format={
                "type": "json_object",
                "schema": InputAnalysis.model_json_schema()
            }
        )
        
        result = self.response_parser.parse_llm_response(response)
        print("------------------------------ Completed Input Analysis -----------------------------")
        return result

    async def schedule_demo(self, question: str, history: List[str], context: Dict) -> str:
        """Have the LLM produce a demo-scheduling call and execute it.

        ``question`` and ``history`` are accepted so this coroutine matches
        the custom-handler calling convention; only ``context`` is used.

        Returns the confirmation from ``update_scheduled_demo_details``, or
        ``"Failed to schedule demo"`` when the model's reply cannot be parsed,
        names another function, or lacks a required argument.
        """
        from datetime import date as _date  # local import: only this method needs it

        print("------------------------------ Starting Demo Scheduling -----------------------------")
        schedule_demo_tool = self._get_schedule_demo_tool_config()
        tool_prompt = self.prompt_service.create_demo_tool_prompt(schedule_demo_tool)
        
        # The prompt asks for "tomorrow"; the model has no clock, so state
        # today's date explicitly.
        today = _date.today().isoformat()
        messages = [
            {"role": "system", "content": tool_prompt},
            {"role": "user", "content": f"Today's date is {today}. Schedule a demo for customer with context: {json.dumps(context)}"}
        ]
        
        response = self.ai_client.create_chat_completion(messages, temperature=0)
        try:
            function_name, arguments = self.response_parser.parse_tool_response(response)
        except (ValueError, TypeError, AttributeError, IndexError, KeyError) as exc:
            # A malformed or missing tool call takes the normal failure path.
            print(f"Could not parse tool call: {exc}")
            function_name, arguments = None, {}
        
        print("------------------------------ Processing Demo Schedule -----------------------------")
        if function_name == "update_scheduled_demo_details" and isinstance(arguments, dict):
            parameters = schedule_demo_tool["parameters"]
            # Forward only declared parameters so an invented key cannot
            # raise TypeError in the call below.
            call_args = {key: value for key, value in arguments.items() if key in parameters["properties"]}
            missing = [field for field in parameters["required"] if field not in call_args]
            if not missing:
                result = update_scheduled_demo_details(**call_args)
                print(f"Demo scheduled successfully: {result}")
                print("--------------------------------------------------------------------------------")
                return result
            print(f"Missing required demo fields: {missing}")
        
        print("Failed to schedule demo")
        print("--------------------------------------------------------------------------------")
        return "Failed to schedule demo"

    @staticmethod
    def _get_schedule_demo_tool_config() -> Dict:
        """Return (and echo to stdout) the JSON-schema description of the demo function."""
        config = {
            "name": "update_scheduled_demo_details",
            "description": "Schedule a demo with the customer",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {"type": "string", "description": "Customer's name"},
                    "email": {"type": "string", "description": "Customer's email"},
                    "product_choice": {"type": "string", "description": "Product selected by customer"},
                    "date": {"type": "string", "description": "Demo date (one day after today)"}
                },
                "required": ["name", "email"]
            }
        }
        print("------------------------------ Demo Tool Config -----------------------------")
        print(json.dumps(config, indent=2))
        print("--------------------------------------------------------------------------------")
        return config




In [148]:
class ConversationManager:
    """Builds the conversation graph and stores completed customers."""

    def __init__(self):
        self.customers = []
        # Filled by initialize_nodes(). Defined here so get_node() raises a
        # plain KeyError instead of AttributeError when the graph has not been
        # built yet.
        self.nodes: Dict[str, Node] = {}

    def _create_basic_node(self, id: str, description: str, prompt: str, required: List[str], next_nodes: List[str]) -> Node:
        """Build a node that only shows its prompt (no custom handler)."""
        return Node(
            id=id,
            description=description,
            prompt_template=prompt,
            required_fields=required,
            list_of_next_possible_nodes=next_nodes
        )

    def _create_custom_node(self, id: str, description: str, prompt: str, required: List[str], 
                          next_nodes: List[str], handler: Callable) -> Node:
        """Build a node whose message is produced by *handler*."""
        return Node(
            id=id,
            description=description,
            prompt_template=prompt,
            required_fields=required,
            list_of_next_possible_nodes=next_nodes,
            custom_handler_function=handler
        )

    def initialize_nodes(self) -> Dict[str, Node]:
        """Create every node of the sales conversation.

        The dictionary is stored on ``self.nodes`` (so ``get_node`` works) and
        also returned, keyed by node id.
        """
        nodes = {}
        
        # Basic conversation nodes
        nodes["welcome"] = self._create_basic_node(
            "welcome",
            "Welcome message",
            "Hello! I'm your sales assistant. May I know your name?",
            [],
            ["collect_email", "get_products", "question_and_answer_node_for_product_details"]
        )

        # This node exists to obtain the name, so its prompt must not use
        # `{name}`. It previously carried a copy of the collect_email prompt.
        nodes["collect_name"] = self._create_basic_node(
            "collect_name",
            "Collect name from user",
            "Could you please tell me your name?",
            [],
            ["collect_email", "get_products", "question_and_answer_node_for_product_details"]
        )

        nodes["collect_email"] = self._create_basic_node(
            "collect_email",
            "Collect email from user",
            "Nice to meet you, {name}! What's your email address?",
            ["name"],
            ["get_products"]
        )

        nodes["get_products"] = self._create_basic_node(
            "get_products",
            "Get product details from user",
            "What product are you interested in today? We offer: \n- Product A\n- Product B\n- Product C",
            ["name", "email"],
            ["question_and_answer_node_for_product_details", "schedule_demo"]
        )

        # Nodes with custom handlers
        nodes["question_and_answer_node_for_product_details"] = self._create_custom_node(
            "question_and_answer_node_for_product_details",
            "Answers any questions about the product, using RAG",
            "",
            ["name", "email"],
            ["schedule_demo", "end_conversation", "question_and_answer_node_for_product_details"],
            get_product_details_using_rag
        )

        nodes["schedule_demo"] = self._create_custom_node(
            "schedule_demo",
            "Schedule a demo with the user",
            "Success Response: I've scheduled a demo for you. I'll send you a confirmation email shortly. \n"
            "Error Response: I'm sorry, I couldn't find your name or email in the conversation history. "
            "Please provide your name and email so I can schedule the demo.",
            ["name", "email"],
            ["end_conversation", "question_and_answer_node_for_product_details", "collect_name", "collect_email"],
            TogetherAIService().schedule_demo
        )

        # Terminal node: it has no next nodes.
        nodes["end_conversation"] = self._create_basic_node(
            "end_conversation",
            "End the conversation",
            "Thank you for using our service. Have a great day!",
            [],
            []
        )

        self.nodes = nodes
        return nodes

    async def _save_customer(self, session: Session):
        """Append a ``Customer`` built from *session* to ``self.customers``.

        ``Customer`` declares name, email and product_choice as ``str``, so
        call ``_should_save_customer`` first to make sure the keys exist.
        """
        customer = Customer(
            id=session.session_id,
            name=session.context.get("name"),
            email=session.context.get("email"),
            product_choice=session.context.get("product_choice"),
            conversation_history=[{
                # NOTE(review): utcnow() returns a naive datetime and is
                # deprecated since Python 3.12; switching to an aware
                # timestamp would change the stored string.
                "timestamp": datetime.utcnow().isoformat(),
                "context": session.context
            }]
        )
        self.customers.append(customer)

    def _should_save_customer(self, session: Session) -> bool:
        """Return True when name, email and product_choice are all keys of the session context."""
        required_fields = ["name", "email", "product_choice"]
        return all(field in session.context for field in required_fields)

    def get_node(self, node_id: str) -> Node:
        """Return the node registered under *node_id*.

        Raises:
            KeyError: if the id is unknown or ``initialize_nodes`` has not
                been called yet.
        """
        return self.nodes[node_id]


In [149]:
#  start conversation
session = Session()
together_service = TogetherAIService()
response = None
# Initialize the conversation manager
conversation_manager = ConversationManager()
nodes = conversation_manager.initialize_nodes()  

print(session)
while True:
    print("------------------------------ Current Node -----------------------------")
    print(session.current_node_id)
    current_node = nodes[session.current_node_id]
    suggested_response = response.get("suggested_response", None) if response else current_node.get_processed_prompt_template(session.context)
    message_to_user = suggested_response
    print("Suggested response", suggested_response)
    if current_node.custom_handler_function:
        message_to_user = await current_node.custom_handler_function(user_input, session.conversation_history, session.context)
    
    session.conversation_history.append(f'AI: {message_to_user} \n')
    print("Message sent to user:", message_to_user)
    print("\n")
    print("\n")
    print("--------------------------------------------------------------------------------")
    user_input = input(f"{message_to_user} \n ")
    if user_input == "exit" or user_input == "quit" or user_input == "q":
        break
    print(user_input)
    session.conversation_history.append(f'User: {user_input} \n')
    response = await together_service.analyze_input(user_input, session.conversation_history, session.context, current_node.list_of_next_possible_nodes)
    session.current_node_id = response["next_node_id"]
    session.context = {**session.context, **response["user_inputs"]}
    print("Updated context", session.context)
    print("Updated current node", session.current_node_id)
    print("--------------------------------------------------------------------------------")
    


------------------------------ Initializing TogetherAIService -----------------------------
--------------------------------------------------------------------------------
------------------------------ Initializing TogetherAIService -----------------------------
--------------------------------------------------------------------------------
session_id='ed2fd72b-95e4-47c8-bd7c-75fb94a65712' context={} current_node_id='welcome' conversation_history=[]
------------------------------ Current Node -----------------------------
welcome
Suggested response Hello! I'm your sales assistant. May I know your name?
Message sent to user: Hello! I'm your sales assistant. May I know your name?




--------------------------------------------------------------------------------
i am vineeth
------------------------------ Starting Intent Analysis -----------------------------
------------------------------ Generated Intent Prompt -----------------------------
Analyze the following user input and dete

In [150]:
# Bare expression: the notebook displays the session's repr as the cell output.
session

Session(session_id='ed2fd72b-95e4-47c8-bd7c-75fb94a65712', context={'name': 'vineeth', 'email': 'vineethtngl@gmail.com', 'product': 'Product A'}, current_node_id='end_conversation', conversation_history=["AI: Hello! I'm your sales assistant. May I know your name? \n", 'User: i am vineeth \n', "AI: Nice to meet you, vineeth! What's your email address? \n", 'User: vineethtngl@gmail.com \n', 'AI: What product are you interested in today? We offer: \\\\- Product A\\\\- Product B\\\\- Product C \n', 'User: Product A \n', 'AI: Product A is a cutting-edge solution designed to streamline business operations and enhance productivity. With robust features including support for up to 500 concurrent users, advanced analytics capabilities, and seamless integration options, it stands out in the market. The platform offers real-time collaboration tools, automated workflow management, and customizable dashboards that adapt to your business needs. Security is paramount, with enterprise-grade encryption

In [152]:
from pprint import pprint

def format_session(session):
    """
    Pretty-print a session to stdout.

    Output, in order: a boxed title, the session id, the name / email /
    product held in the context, the current node id, the conversation
    transcript (one entry per message, each followed by a separator line) and
    finally a ``pprint`` dump of the session's attributes.

    Args:
        session: object exposing ``session_id``, ``context`` (a dict),
            ``current_node_id`` and ``conversation_history`` (list of str)
    """
    separator = "─" * 50
    context = session.context

    print("""
┌─────────────────────────────────────────────┐
│              Session Details                 │
└─────────────────────────────────────────────┘
""")

    print(f"Session ID: {session.session_id}\n")

    # The product may be stored under either key, depending on what the LLM extracted.
    product = context.get('product') or context.get('product_choice')
    print("Context:")
    print(f"├── Name: {context.get('name')}")
    print(f"├── Email: {context.get('email')}")
    print(f"└── Product: {product}\n")

    print(f"Current Node: {session.current_node_id}\n")

    for box_line in (
        "┌─────────────────────────────────────────────┐",
        "│           Conversation History               │",
        "└─────────────────────────────────────────────┘",
    ):
        print(box_line)

    for raw_message in session.conversation_history:
        # Trim surrounding whitespace and turn literal backslash-n pairs into real newlines.
        text = raw_message.strip().replace('\\n', '\n')

        # The speaker is detected on the raw message; the prefix is cut from the cleaned one.
        if raw_message.startswith('AI:'):
            entry = f"\n🤖 {text[4:]}"  # drop 'AI: '
        elif raw_message.startswith('User:'):
            entry = f"👤 {text[6:]}"  # drop 'User: '
        else:
            entry = text

        print(entry)
        print(separator)

    # Raw attribute dump for debugging.
    print("\nRaw Session Data:")
    pprint(vars(session), indent=2, width=100)

# Usage: print the module-level `session` created by the conversation cell
# when this code runs as the main module.
if __name__ == "__main__":
    format_session(session)


┌─────────────────────────────────────────────┐
│              Session Details                 │
└─────────────────────────────────────────────┘

Session ID: ed2fd72b-95e4-47c8-bd7c-75fb94a65712

Context:
├── Name: vineeth
├── Email: vineethtngl@gmail.com
└── Product: Product A

Current Node: end_conversation

┌─────────────────────────────────────────────┐
│           Conversation History               │
└─────────────────────────────────────────────┘

🤖 Hello! I'm your sales assistant. May I know your name?
──────────────────────────────────────────────────
👤 i am vineeth
──────────────────────────────────────────────────

🤖 Nice to meet you, vineeth! What's your email address?
──────────────────────────────────────────────────
👤 vineethtngl@gmail.com
──────────────────────────────────────────────────

🤖 What product are you interested in today? We offer: \\- Product A\\- Product B\\- Product C
──────────────────────────────────────────────────
👤 Product A
──────────────────────────