In [13]:
from support_files.graph_connection import neo4j_connection, test_neo4j_connection
import json
test_graph = test_neo4j_connection()

[32m2024-09-17 11:08:45.444[0m | [1mINFO    [0m | [36msupport_files.graph_connection[0m:[36mtest_neo4j_connection[0m:[36m42[0m - [1mSuccessfully established Neo4j connection for testing.[0m


In [None]:
# import neccasary packages
from datetime import datetime
from pytz import timezone
from typing import TypedDict, Annotated
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END
from typing import Callable
from langchain_core.messages import ToolMessage
from langgraph.graph.message import AnyMessage, add_messages
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import Runnable, RunnableConfig
from support_files.tool_execution import *
from support_files.lead_agent import lead_assistant_runnable, lead_agent_tool,safe_tool, sensitive_tool
from langchain_groq import ChatGroq
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition
ist_timezone = timezone("Asia/Kolkata")
from dotenv import load_dotenv
load_dotenv()
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages


def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """Push or pop the state."""
    if right is None:
        return left
    if right == "pop":
        return left[:-1]
    return left + [right]


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    user_info: str
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "lead_existance_verification",
                "lead_creation",
            ]
        ],
        update_dialog_stack,
    ]
class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # If the LLM happens to return an empty response, we will re-prompt it
            # for an actual response.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}

class CompleteOrEscalate(BaseModel):
    """A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
    who can re-route the dialog based on the user's needs."""

    cancel: bool = True
    reason: str

    class Config:
        schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
            "example 3": {
                "cancel": False,
                "reason": "I need to search the emails for more information.",
            },
        }

def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }


def create_tool_node_with_fallback(tools: list) -> dict:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )


def _print_event(event: dict, _printed: set, max_length=1500):
    current_state = event.get("dialog_state")
    if current_state:
        print("Currently in: ", current_state[-1])
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]
        if message.id not in _printed:
            msg_repr = message.pretty_repr(html=True)
            if len(msg_repr) > max_length:
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            print(msg_repr)
            _printed.add(message.id)

class Lead_assistant(BaseModel):
    """Transfer work to a specialized assistant to handle the lead relevant details."""

    location: str = Field(
        description="The location where the user wants create or udate or delete the lead."
    )
    name: str = Field(description="Name of the customer.")
    phone: str = Field(description="Phone number of the customer.")
    email: str = Field(description="Email of the customer.")
    civilID: str = Field(description="Civil ID of the customer.")

    request: str = Field(
        description="Any additional information or requests from the user regarding the create or udate or delete the lead."
    )

    class Config:
        schema_extra = {
            "example": {
                "name": "Chandru",
                "phone": "+91 8124832683",
                "email": "chandruganeshan@gmail.com",
                "civilID": "986534567893",
            }
        }


def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
    def entry_node(state: State) -> dict:
        tool_call_id = state["messages"][-1].tool_calls[0]["id"]
        return {
            "messages": [
                ToolMessage(
                    content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
                    f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are {assistant_name},"
                    " and the booking, update, other other action is not complete until after you have successfully invoked the appropriate tool."
                    " If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control."
                    " Do not mention who you are - just act as the proxy for the assistant.",
                    tool_call_id=tool_call_id,
                )
            ],
            "dialog_state": new_dialog_state,
        }

    return entry_node


model = ChatGroq(model="llama3-70b-8192",temperature=0)

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Automotive Industry."
            "Your primary role is search leads, customer details, test drive details and other customer required deatils."
            "If the customer wants to create or update or delete the lead, book test drive, create a quotation, "
            "delegate the task to appropriate specialized assistants by invoking corresponding tools. You are not able to make these type of changes yourself"
            "Only the specialized assistants are given permission to do this for the user."
            "The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls. "
            "Provide detailed information to the customer, and always double-check the database before concluding that information is unavailable. "
            "When searching, be persistent. Expand your query bounds if the first search returns no results."
            "If a search comes up empty, expand your search before giving up."
            "\nCurrent time: {time}."
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now(ist_timezone).isoformat())


primary_assistant_runnable = primary_assistant_prompt | model.bind_tools([Lead_assistant])
def pop_dialog_state(state: State) -> dict:
    """Pop the dialog stack and return to the main assistant.

    This lets the full graph explicitly track the dialog flow and delegate control
    to specific sub-graphs.
    """
    messages = []
    if state["messages"][-1].tool_calls:
        # Note: Doesn't currently handle the edge case where the llm performs parallel tool calls
        messages.append(
            ToolMessage(
                content="Resuming dialog with the host assistant. Please reflect on the past conversation and assist the user as needed.",
                tool_call_id=state["messages"][-1].tool_calls[0]["id"],
            )
        )
    return {
        "dialog_state": "pop",
        "messages": messages,
    }

# Compile graph
builder = StateGraph(State)

builder.add_node("enter_lead_assistant",create_entry_node("Lead Assistant", "lead_agent"))
builder.add_node("lead_agent", Assistant(lead_assistant_runnable))
builder.add_edge("enter_lead_assistant", "lead_agent")

builder.add_node(
    "lead_assistant_safe_tools",
    create_tool_node_with_fallback(safe_tool)
)

builder.add_node(
    "lead_assistant_sensitive_tools",
    create_tool_node_with_fallback(sensitive_tool)
)

builder.add_node("primary_assistant", Assistant(primary_assistant_runnable))
builder.add_edge(START, "primary_assistant")

builder.add_node("leave_skill", pop_dialog_state)
builder.add_edge("leave_skill", "primary_assistant")
builder.add_edge("primary_assistant", "enter_lead_assistant")

def route_lead_assistant(
    state: State,
) -> Literal[
    "lead_assistant_safe_tools",
    "lead_assistant_sensitive_tools",
    "leave_skill",
    "__end__",
]:
    route = tools_condition(state)
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    did_cancel = any(tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls)
    if did_cancel:
        return "leave_skill"
    safe_toolnames = [t.name for t in safe_tool]
    if all(tc["name"] in safe_toolnames for tc in tool_calls):
        return "lead_assistant_safe_tools"
    return "lead_assistant_sensitive_tools"


######################################################################################

def route_primary_assistant(
    state: State,
) -> Literal[
    "primary_assistant_tools",
    "enter_lead_assistant",
    "__end__",
]:
    route = tools_condition(state)
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    if tool_calls:
        if tool_calls[0]["name"] == Lead_assistant.__name__:
            return "enter_lead_assistant"
        return "primary_assistant_tools"
    raise ValueError("Invalid route")


# The assistant can route to one of the delegated assistants,
# directly use a tool, or directly respond to the user
builder.add_conditional_edges(
    "primary_assistant",
    route_primary_assistant,
    {
        "enter_lead_assistant": "enter_lead_assistant",
        "primary_assistant_tools": "primary_assistant_tools",
        END: END,
    },
)
builder.add_edge("primary_assistant_tools", "primary_assistant")

def route_to_workflow(
    state: State,
) -> Literal[
    "primary_assistant",
    "lead_agent",
]:
    """If we are in a delegated state, route directly to the appropriate assistant."""
    dialog_state = state.get("dialog_state")
    if not dialog_state:
        return "primary_assistant"
    return dialog_state[-1]


# builder.add_conditional_edges("fetch_user_info", route_to_workflow)


####################################################################################




builder.add_edge("lead_assistant_safe_tools", "lead_agent")
builder.add_edge("lead_assistant_sensitive_tools", "lead_agent")
builder.add_conditional_edges("lead_agent",route_lead_assistant)

# Compile graph
memory = MemorySaver()
part_4_graph = builder.compile(checkpointer=memory,interrupt_before=["lead_assistant_sensitive_tool"],)

part_4_graph.get_graph(xray=True).draw_mermaid_png(output_file_path="part_4_graph.png")

config = {
    "configurable": {
        "thread_id": 1,
    }
}

_printed = set()
while True:
    print(State["messages"])
    question = input("Ask question: ")
    events = part_4_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)

In [None]:
# import neccasary packages
from datetime import datetime
from pytz import timezone
from typing import TypedDict, Annotated
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END
from typing import Callable
from langchain_core.messages import ToolMessage
from langgraph.graph.message import AnyMessage, add_messages
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.runnables import Runnable, RunnableConfig
from support_files.tool_execution import *
from support_files.lead_agent import lead_assistant_runnable, lead_agent_tool,safe_tool, sensitive_tool
from langchain_groq import ChatGroq
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition
ist_timezone = timezone("Asia/Kolkata")
from dotenv import load_dotenv
load_dotenv()
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages


def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """Push or pop the state."""
    if right is None:
        return left
    if right == "pop":
        return left[:-1]
    return left + [right]


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    user_info: str
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "lead_existance_verification",
                "lead_creation",
            ]
        ],
        update_dialog_stack,
    ]
class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)
            # If the LLM happens to return an empty response, we will re-prompt it
            # for an actual response.
            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}

class CompleteOrEscalate(BaseModel):
    """A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
    who can re-route the dialog based on the user's needs."""

    cancel: bool = True
    reason: str

    class Config:
        schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
            "example 3": {
                "cancel": False,
                "reason": "I need to search the emails for more information.",
            },
        }

def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }
def create_tool_node_with_fallback(tools: list) -> dict:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )


def _print_event(event: dict, _printed: set, max_length=1500):
    current_state = event.get("dialog_state")
    if current_state:
        print("Currently in: ", current_state[-1])
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]
        if message.id not in _printed:
            msg_repr = message.pretty_repr(html=True)
            if len(msg_repr) > max_length:
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            print(msg_repr)
            _printed.add(message.id)

class Lead_assistant(BaseModel):
    """Transfer work to a specialized assistant to handle the lead relevant details."""

    location: str = Field(
        description="The location where the user wants create or udate or delete the lead."
    )
    name: str = Field(description="Name of the customer.")
    phone: str = Field(description="Phone number of the customer.")
    email: str = Field(description="Email of the customer.")
    civilID: str = Field(description="Civil ID of the customer.")

    request: str = Field(
        description="Any additional information or requests from the user regarding the create or udate or delete the lead."
    )

    class Config:
        schema_extra = {
            "example": {
                "name": "Chandru",
                "phone": "+91 8124832683",
                "email": "chandruganeshan@gmail.com",
                "civilID": "986534567893",
            }
        }

def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
    def entry_node(state: State) -> dict:
        tool_call_id = state["messages"][-1].tool_calls[0]["id"]
        return {
            "messages": [
                ToolMessage(
                    content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
                    f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are {assistant_name},"
                    " and the booking, update, other other action is not complete until after you have successfully invoked the appropriate tool."
                    " If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control."
                    " Do not mention who you are - just act as the proxy for the assistant.",
                    tool_call_id=tool_call_id,
                )
            ],
            "dialog_state": new_dialog_state,
        }

    return entry_node


model = ChatGroq(model="llama3-70b-8192",temperature=0)

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for Automotive Industry."
            "Your primary role is search leads, customer details, test drive details and other customer required deatils."
            "If the customer wants to create or update or delete the lead, book test drive, create a quotation, "
            "delegate the task to appropriate specialized assistants by invoking corresponding tools. You are not able to make these type of changes yourself"
            "Only the specialized assistants are given permission to do this for the user."
            "The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls. "
            "Provide detailed information to the customer, and always double-check the database before concluding that information is unavailable. "
            "When searching, be persistent. Expand your query bounds if the first search returns no results."
            "If a search comes up empty, expand your search before giving up."
            "\nCurrent time: {time}."
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now(ist_timezone).isoformat())


primary_assistant_runnable = primary_assistant_prompt | model.bind_tools([Lead_assistant])
def pop_dialog_state(state: State) -> dict:
    """Pop the dialog stack and return to the main assistant.

    This lets the full graph explicitly track the dialog flow and delegate control
    to specific sub-graphs.
    """
    messages = []
    if state["messages"][-1].tool_calls:
        # Note: Doesn't currently handle the edge case where the llm performs parallel tool calls
        messages.append(
            ToolMessage(
                content="Resuming dialog with the host assistant. Please reflect on the past conversation and assist the user as needed.",
                tool_call_id=state["messages"][-1].tool_calls[0]["id"],
            )
        )
    return {
        "dialog_state": "pop",
        "messages": messages,
    }

# Compile graph
builder = StateGraph(State)

builder.add_node("enter_lead_assistant",create_entry_node("Lead Assistant", "lead_agent"))
builder.add_node("lead_agent", Assistant(lead_assistant_runnable))
builder.add_edge("enter_lead_assistant", "lead_agent")

builder.add_node(
    "lead_assistant_safe_tools",
    create_tool_node_with_fallback(safe_tool)
)

builder.add_node(
    "lead_assistant_sensitive_tools",
    create_tool_node_with_fallback(sensitive_tool)
)

builder.add_node("primary_assistant", Assistant(primary_assistant_runnable))
builder.add_edge(START, "primary_assistant")

builder.add_node("leave_skill", pop_dialog_state)
builder.add_edge("leave_skill", "primary_assistant")
builder.add_edge("primary_assistant", "enter_lead_assistant")

def route_lead_assistant(
    state: State,
) -> Literal[
    "lead_assistant_safe_tools",
    "lead_assistant_sensitive_tools",
    "leave_skill",
    "__end__",
]:
    route = tools_condition(state)
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    did_cancel = any(tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls)
    if did_cancel:
        return "leave_skill"
    safe_toolnames = [t.name for t in safe_tool]
    if all(tc["name"] in safe_toolnames for tc in tool_calls):
        return "lead_assistant_safe_tools"
    return "lead_assistant_sensitive_tools"

builder.add_edge("lead_assistant_safe_tools", "lead_agent")
builder.add_edge("lead_assistant_sensitive_tools", "lead_agent")
builder.add_conditional_edges("lead_agent",route_lead_assistant)

# Compile graph
memory = MemorySaver()
part_4_graph = builder.compile(checkpointer=memory,interrupt_before=["lead_assistant_sensitive_tools"],)

part_4_graph.get_graph(xray=True).draw_mermaid_png(output_file_path="part_4_graph.png")

config = {
    "configurable": {
        "thread_id": 1,
    }
}

_printed = set()
while True:
    print(State["messages"])
    question = input("Ask question: ")
    events = part_4_graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)

In [3]:
def verify_lead(name: str = None, email: str = None, phone: str = None, civil_id: str = None):
    """
    Verify the existence of the Customer in our Database.
    
    Parameters:
    - name: Customer's name (mandatory).
    - email: Customer's email (optional).
    - phone: Customer's phone (optional).
    - civil_id: Customer's civil ID (optional).
    
    Returns:
    - Customer details if the lead exists.
    - Message indicating whether the customer exists or not.
    """
    
    # Base query for fully verified search
    query = """
    MATCH (l:Lead)
    WHERE toLower(l.name) = toLower($name)
    """
    
    # Conditions for additional fields
    conditions = []
    if phone:
        conditions.append("l.phone_number = $phone")
    if civil_id:
        conditions.append("l.civil_id = $civil_id")
    if email:
        conditions.append("l.email = $email")
    
    if conditions:
        query += " AND " + " AND ".join(conditions)

    query += """
    RETURN l.name AS lead_name, l.phone_number AS phone_number, l.civil_id AS civil_id, l.email AS email, l.id AS lead_id
    """
    
    # Execute the fully verified query
    verified_result = graph.query(query, {
        'name': name,
        'phone': phone,
        'civil_id': civil_id,
        'email': email
    })
    
    # Check if fully verified result exists
    print(verified_result)
    if verified_result:
        return f"A customer named '{name}' already exists in our system. Would you like to continue with this existing customer, or should I create a new customer?"
    else:        

        cypher_queries = []
        semi_verified_result = []

        # Prepare similarity query for the name
        name_query = """
                    MATCH (c:Lead)
                    WITH c, apoc.text.levenshteinSimilarity(toLower(c.name), toLower($name)) AS similarity_score
                    WHERE similarity_score > 0.8
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    ORDER BY similarity_score DESC
                    """

        # Prepare similarity query for the name
        if name is not None and phone == None and civil_id == None and email == None:
            name_result = graph.query(name_query, {
                'name': name
            })

            if name_result:
                return f"The given Name is associated with the customer {name_result[0]['customer_name']}. Are you looking for this customer? or want to create a new lead?"
            else:
               return f"{name} is not available or no matching result found for the given Name."

        # Prepare phone query
        if phone:
            phone_query = """
            MATCH (c:Lead)
            WHERE c.phone_number = $phone 
            RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
            """
            cypher_queries.append(("phone number", phone_query))

        # Prepare civil ID query
        if civil_id:
            civil_id_query = """
            MATCH (c:Lead) 
            WHERE c.civil_id = $civil_id 
            RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
            """
            cypher_queries.append(("civil-id", civil_id_query))

        # Prepare email query
        if email:
            email_query = """
            MATCH (c:Lead) 
            WHERE c.email = $email 
            RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
            """
            cypher_queries.append(("e-mail", email_query))

        # Execute each semi-verified query
        for query_type, query in cypher_queries:
            db_result = graph.query(query, {
                'name': name,
                'phone': phone,
                'civil_id': civil_id,
                'email': email
            })
            # print(db_result)
            # Check if the query returned any results
            if db_result:
                semi_verified_result.append(f"I see that the {query_type} you provided is associated with {db_result[0]['customer_name']}. Could you please provide the {query_type} for {name}?"
)
            else:
                semi_verified_result.append(f"I was unable to locate the {query_type} in our system. Would you like me to proceed with creating a new lead?")

        for results in set(semi_verified_result):
            print(results)

In [1]:
import os
import tkinter as tk
from tkinter import filedialog, scrolledtext

class ProjectStructure:
    def __init__(self, startpath, ignore_dirs=None, ignore_files=None):
        self.startpath = startpath
        self.ignore_dirs = ignore_dirs if ignore_dirs is not None else ['.git', '__pycache__', 'node_modules']
        self.ignore_files = ignore_files if ignore_files is not None else ['*.pyc', '*.log', '*.tmp', '*.swp']

    def list_files(self):
        file_tree = []
        try:
            for root, dirs, files in os.walk(self.startpath):
                # Filter out ignored directories
                dirs[:] = [d for d in dirs if d not in self.ignore_dirs]
                # Filter out ignored files
                files = [f for f in files if not any(f.endswith(ext) for ext in self.ignore_files)]

                level = root.replace(self.startpath, '').count(os.sep)
                indent = ' '*  4 *level
                file_tree.append(f'{indent}{os.path.basename(root)}/')
                subindent = ' '*  4 * (level + 1)
                for f in files:
                    file_tree.append(f'{subindent}{f}')
        except Exception as e:
            file_tree.append(f"Error occurred: {e}")
        return "\n".join(file_tree)

class App:
    def __init__(self, root):
        self.root = root
        self.root.title("Project Folder Structure")
        self.root.attributes('-fullscreen', True)  # Open in full screen
        
        # Disable default close button
        self.root.protocol("WM_DELETE_WINDOW", self.disable_close)
        
        # Decorative background
        self.bg_color = "#f0f8ff"
        self.root.configure(bg=self.bg_color)
        
        # Apply a custom font and styling
        self.font = ('Arial', 12)
        self.create_widgets()

    def create_widgets(self):
        # Custom title bar
        self.title_bar = tk.Frame(self.root, bg="#4682b4", relief="raised", bd=2)
        self.title_bar.pack(fill=tk.X)

        # Minimize, Maximize, Close buttons
        self.minimize_button = tk.Button(self.title_bar, text="_", command=self.minimize_window, bg="#87cefa", fg="black", font=self.font)
        self.minimize_button.pack(side=tk.LEFT, padx=5)
        
        self.maximize_button = tk.Button(self.title_bar, text="☐", command=self.maximize_window, bg="#87cefa", fg="black", font=self.font)
        self.maximize_button.pack(side=tk.LEFT, padx=5)
        
        self.close_button = tk.Button(self.title_bar, text="X", command=self.close_window, bg="#ff6347", fg="black", font=self.font)
        self.close_button.pack(side=tk.RIGHT, padx=5)
        
        # Path entry
        self.path_label = tk.Label(self.root, text="Select Project Folder:", bg=self.bg_color, font=self.font, padx=10, pady=5)
        self.path_label.pack(pady=10)

        self.path_entry = tk.Entry(self.root, width=60, font=self.font)
        self.path_entry.pack(pady=5)

        self.browse_button = tk.Button(self.root, text="Browse", command=self.browse_folder, font=self.font, bg="#87cefa", fg="black")
        self.browse_button.pack(pady=5)

        self.show_button = tk.Button(self.root, text="Show Structure", command=self.show_structure, font=self.font, bg="#87cefa", fg="black")
        self.show_button.pack(pady=10)

        # Text area to show file structure
        self.text_area = scrolledtext.ScrolledText(self.root, wrap=tk.WORD, height=20, width=80, font=self.font, bg="#ffffff", fg="black")
        self.text_area.pack(pady=10, padx=10)

    def browse_folder(self):
        folder_path = filedialog.askdirectory()
        if folder_path:
            self.path_entry.delete(0, tk.END)
            self.path_entry.insert(0, folder_path)

    def show_structure(self):
        path = self.path_entry.get()
        if os.path.isdir(path):
            project_structure = ProjectStructure(path)
            structure = project_structure.list_files()
            self.text_area.delete(1.0, tk.END)
            self.text_area.insert(tk.END, structure)
        else:
            self.text_area.delete(1.0, tk.END)
            self.text_area.insert(tk.END, "Invalid directory path.")
    
    def disable_close(self):
        pass  # Do nothing on close attempt

    def minimize_window(self):
        self.root.iconify()

    def maximize_window(self):
        self.root.attributes('-fullscreen', not self.root.attributes('-fullscreen'))

    def close_window(self):
        self.root.destroy()

if __name__ == "__main__":
    root = tk.Tk()
    app = App(root)
    root.mainloop()

In [1]:
from typing import Optional

# Function to update the dialog stack
def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """Update the dialog stack based on the new action."""
    if right is None:
        return left
    if right == "pop":
        return left[:-1]
    return left + [right]

if __name__ == "__main__":
    stack = ["assistant", "book_hotel"]
    new_stack = update_dialog_stack(stack, "update_flight")
    print("After adding 'update_flight':", new_stack)
    new_stack = update_dialog_stack(new_stack, "pop")
    print("After popping the last action:", new_stack)
    new_stack = update_dialog_stack(new_stack, None)
    print("After passing None:", new_stack)

After adding 'update_flight': ['assistant', 'book_hotel', 'update_flight']
After popping the last action: ['assistant', 'book_hotel']
After passing None: ['assistant', 'book_hotel']


In [None]:
def customer_existence_verification(
    name: Annotated[Optional[str],"Customer name in lower case"],
    email: Annotated[Optional[str],"Customer email in lower case"], 
    phone: Annotated[Optional[str],"Customer phone number in 10 digits"],
    civil_id: Annotated[Optional[str],"Customer civil ID in 12 digits"],):
    
    
    # Base query for fully verified search
    """
    Tool to check the customer existence before proceeding to lead creation.

    Args:
        name (str): The name of the customer to search for.
        email (str): The email of the customer to search for.
        phone (str): The phone number of the customer to search for.
        civil_id (str): The civil ID of the customer to search for.

    Returns:
        A list of dictionaries containing the search results. Each dictionary
        contains the name, phone number, email, civil ID, and ID of the lead.
    """
    def validation():
        phone_validation = True
        civil_validation = True
        email_validation = True

        if phone:
            phone_validation = validate_phone_number(phone)
            if phone_validation is not True:
                return phone_validation
        
        if civil_id:
            civil_validation = validate_civil_id(civil_id)
            if civil_validation is not True:
                return civil_validation
        
        if email:
            email_validation = validate_email_address(email)
            if email_validation is not True:
                return email_validation

        # If all validations pass, return True
        return all([phone_validation, civil_validation, email_validation])

    validated = validation()

    if validated == True:

        name_query = """
                    MATCH (c:Lead)
                    WITH c, apoc.text.levenshteinSimilarity(toLower(c.name), toLower($name)) AS similarity_score
                    WHERE similarity_score > 0.5
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    ORDER BY similarity_score DESC
                    """

        name_contains_query = """
                    MATCH (c:Lead)
                    WHERE toLower(c.name) CONTAINS toLower($name)
                    RETURN c.name AS customer_name, 
                        c.phone_number AS phone_number, 
                        c.email AS email, 
                        c.civil_id AS civil_id
                    """
        both_query = """MATCH (c:Lead)
                        WHERE toLower(c.name) CONTAINS toLower($name)
                        WITH c
                        WITH c, apoc.text.levenshteinSimilarity(toLower(c.name), toLower($name)) AS similarity_score
                        WHERE similarity_score > 0.6
                        RETURN c.name AS customer_name, 
                            c.phone_number AS phone_number, 
                            c.email AS email, 
                            c.civil_id AS civil_id
                        ORDER BY similarity_score DESC
                        """    

        if name and all(x is None for x in (phone, civil_id, email)):

            name_result = graph.query(name_query, {'name': name})
            name_contains_result = graph.query(name_contains_query, {'name': name})
            both_result = graph.query(both_query, {'name': name})

            if name_result:
                return ToolMessage(f"The provided Name is associated with the following customer(s): {[entry['customer_name'] for entry in name_result]}. Would you like to proceed with one of these customers, or would you prefer to create a new lead?")        
            else:
                return ToolMessage(f"No matching results were found for the name '{name}'. Please review or confirm the provided details. Would you like to create a new lead instead?")

        else:
            query = """
            MATCH (l:Lead)
            WHERE toLower(l.name) = toLower($name)
            """
            conditions = []
            if phone:
                conditions.append("l.phone_number = $phone")
            if civil_id:
                conditions.append("l.civil_id = $civil_id")
            if email:
                conditions.append("l.email = $email")
            
            if conditions:
                query += " AND " + " AND ".join(conditions)

            query += """
            RETURN l.name AS lead_name, l.phone_number AS phone_number, l.civil_id AS civil_id, l.email AS email, l.id AS lead_id
            """        
            verified_result = graph.query(query, {
                'name': name,
                'phone': phone,
                'civil_id': civil_id,
                'email': email
            })

            if verified_result:
                return ToolMessage(f"A customer named '{name}' already exists in our system with matching details (Phone: {verified_result[0]['phone_number']}, Email: {verified_result[0]['email']}, Civil ID: {verified_result[0]['civil_id']}). Would you like to proceed with this customer, or create a new lead?")
            
            else:        
                cypher_queries = []
                semi_verified_result = []

                if phone:
                    phone_query = """
                    MATCH (c:Lead)
                    WHERE c.phone_number = $phone 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("phone number", phone_query))

                if civil_id:
                    civil_id_query = """
                    MATCH (c:Lead) 
                    WHERE c.civil_id = $civil_id 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("civil ID", civil_id_query))

                if email:
                    email_query = """
                    MATCH (c:Lead) 
                    WHERE c.email = $email 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("email", email_query))

                for query_type, query in cypher_queries:
                    db_result = graph.query(query, {
                        'name': name,
                        'phone': phone,
                        'civil_id': civil_id,
                        'email': email
                    })

                    if db_result:
                        return ToolMessage(f"I have identified that the {query_type} you provided is associated with {db_result[0]['customer_name']}. Could you please confirm or provide the correct {query_type} for {name}?")

                    else:
                        return ToolMessage(f"I was unable to locate the {query_type} ({phone if query_type == 'phone number' else civil_id if query_type == 'civil ID' else email}) in our system. Would you like to proceed with creating a new lead?")
    else:return validated

In [None]:
def create_lead(name: str, phone: str, civil_id: str, email: str, model: str, variant: str) -> str:
    """
    Creates a lead and links it to a customer in the Neo4j database. 
    This function performs the following operations:

    1. Validates the email, phone, and civil ID.
    2. Ensures all parameters are provided. If any are missing, it prompts the lead agent to ask for the missing details.
    3. Merges the customer node based on the phone number. If the customer is new, it creates the customer with additional details.
    4. Creates a lead and links it to the customer.
    5. Associates the lead with a specific car model based on the customer's preference.

    Args:
        name (str): The full name of the customer.
        phone (str): The customer's phone number.
        civil_id (str): The civil ID of the customer.
        email (str): The customer's email address.
        model (str): The car model the customer is interested in.
        variant (str): The variant of the car model.

    Returns:
        str: A message confirming the lead's creation or an error message for missing/invalid details.
    
    Raises:
        ValueError: If any mandatory details are missing or the email, phone, or civil ID are invalid.
    """

    # Check for missing parameters and prompt lead agent accordingly
    missing_params = []
    if not name:
        missing_params.append("Name")
    if not phone:
        missing_params.append("Phone number")
    if not civil_id:
        missing_params.append("Civil ID")
    if not email:
        missing_params.append("Email address")
    if not model:
        missing_params.append("Model")
    if not variant:
        missing_params.append("Variant")

    if missing_params:
        # Return a tool message asking the lead agent to prompt the user for missing details
        return (
            f"Error: Missing required details: {', '.join(missing_params)}. "
            "Please ask the user to provide the missing information."
        )

    # Validate email, phone, and civil ID
    if not validate_email_address(email):
        return "Error: Invalid email address. Please provide a valid email."
    if not validate_phone_number(phone):
        return "Error: Invalid phone number. Please provide a valid phone number."
    if not validate_civil_id(civil_id):
        return "Error: Invalid civil ID. Please provide a valid civil ID."

    # Proceed with creating the lead
    query = """
        MERGE (l:Customer {phone_number: $mobile})
        ON CREATE SET l.createdAt = $createdAt,
                      l.id = apoc.create.uuid()
        SET l.name = $name,
            l.email = $email,
            l.model = $model,
            l.variant = $variant,
            l.civil_id = $civil_id
        
        WITH l
        MERGE (c:Lead {id: l.id})
        ON CREATE SET c.createdAt = $createdAt
        SET c.name = $name, 
            c.phone_number = $mobile, 
            c.email = $email, 
            c.model = $model,
            c.variant = $variant,
            c.level = "High"
        MERGE (l)-[:CUSTOMER_OF_LEAD]->(c)
        
        WITH c
        MATCH (m:Model {name: $model})
        MERGE (c)-[:PREFERENCE]->(m)
    """
    
    params = {
        "name": name.capitalize(),
        "mobile": phone,
        "email": email,
        "createdAt": datetime.datetime.now().isoformat(),
        "variant": variant,
        "model": model,
        "civil_id": civil_id
    }
    
    try:
        # Execute the query on the Neo4j database
        test_graph.query(query, params)
        
        # Return success message with lead details
        return (
            f"Lead successfully created for {name.capitalize()}.\n"
            f"Customer Info:\n"
            f"Name: {name.capitalize()}\n"
            f"Phone: {phone}\n"
            f"Email: {email}\n"
            f"Model: {model} (Variant: {variant})\n"
            f"Civil ID: {civil_id}\n"
            f"Lead Level: High\n"
            f"Created At: {params['createdAt']}"
        )

    except Exception as e:
        return f"Error: Failed to create lead due to: {str(e)}."


In [None]:
@tool
def customer_existence_verification(name: str = None, email: str = None, phone: str = None, civil_id: str = None):
    """
    Verify the existence of the Customer in the database before lead creation.

    Parameters:
    - name: Customer's name (optional).
    - email: Customer's email (optional).
    - phone: Customer's phone (optional).
    - civil_id: Customer's civil ID (optional).

    Returns:
    - Clear and actionable messages for the lead agent to understand the verification results.
    """

    def get_value(attr):
        """Helper function to clean up attribute values."""
        return None if attr is None or attr.strip() == "" else attr

    # Clean inputs
    name = get_value(name)
    email = get_value(email)
    phone = get_value(phone)
    civil_id = get_value(civil_id)

    def validation():
        """Validate the phone, email, and civil ID if provided."""
        if phone:
            phone_validation = validate_phone_number(phone)
            if phone_validation is not True:
                return phone_validation

        if civil_id:
            civil_validation = validate_civil_id(civil_id)
            if civil_validation is not True:
                return civil_validation

        if email:
            email_validation = validate_email_address(email)
            if email_validation is not True:
                return email_validation

        return True

    validated = validation()

    if validated == True:
        # Fetch all customer names from the database
        query = graph.query("""MATCH (c:Lead) RETURN c.name AS customer_name""")
        names_list = [name['customer_name'] for name in query]

        def split_names(names):
            """Split full names into first and last for better matching."""
            return [name.split(' ', 1) if ' ' in name else [name] for name in names]

        names_list = split_names(names_list)

        def find_similar_names(input_name, names_list, threshold=0.86):
            """Find similar names in the database based on Jaro-Winkler similarity."""
            similar_names = []
            for name in names_list:
                if isinstance(name, list):
                    full_name = ' '.join(name)
                    similarity = distance.get_jaro_distance(input_name, full_name)
                    if similarity >= threshold:
                        similar_names.append((full_name, similarity))
                else:
                    similarity = distance.get_jaro_distance(input_name, name)
                    if similarity >= threshold:
                        similar_names.append((name, similarity))
            return similar_names

        if name and all(param is None for param in (phone, civil_id, email)):
            # Verify only by name
            name_matches = find_similar_names(name, names_list)
            if name_matches:
                matched_names = ', '.join([i[0] for i in name_matches])
                return (
                    f"The provided name is associated with the following customer(s): {matched_names}. "
                    "Would you like to proceed with one of these customers, or create a new lead?"
                )
            else:
                return (
                    f"No matching results found for the name '{name}'. Please review or confirm the provided details. "
                    "Would you like to create a new lead instead?"
                )

        else:
            # Create query with dynamic conditions based on provided data
            query = """
            MATCH (l:Lead)
            WHERE toLower(l.name) = toLower($name)
            """
            conditions = []
            if phone:
                conditions.append("l.phone_number = $phone")
            if civil_id:
                conditions.append("l.civil_id = $civil_id")
            if email:
                conditions.append("l.email = $email")

            if conditions:
                query += " AND " + " AND ".join(conditions)

            query += """
            RETURN l.name AS lead_name, l.phone_number AS phone_number, l.civil_id AS civil_id, l.email AS email, l.id AS lead_id
            """

            # Run the query and check if a customer exists with the provided details
            verified_result = graph.query(query, {
                'name': name,
                'phone': phone,
                'civil_id': civil_id,
                'email': email
            })

            if verified_result:
                customer_data = verified_result[0]
                return (
                    f"A customer named '{customer_data['lead_name']}' already exists in our system "
                    f"with matching details (Phone: {customer_data['phone_number']}, Email: {customer_data['email']}, Civil ID: {customer_data['civil_id']}). "
                    "Would you like to proceed with this customer, or create a new lead?"
                )
            else:
                # Handle semi-verified results by running individual queries for each identifier
                cypher_queries = []
                if phone:
                    phone_query = """
                    MATCH (c:Lead)
                    WHERE c.phone_number = $phone 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("phone number", phone_query))

                if civil_id:
                    civil_id_query = """
                    MATCH (c:Lead) 
                    WHERE c.civil_id = $civil_id 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("civil ID", civil_id_query))

                if email:
                    email_query = """
                    MATCH (c:Lead) 
                    WHERE c.email = $email 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("email", email_query))

                # Run each individual query for partial verification
                for query_type, query in cypher_queries:
                    db_result = graph.query(query, {
                        'name': name,
                        'phone': phone,
                        'civil_id': civil_id,
                        'email': email
                    })

                    if db_result:
                        customer_data = db_result[0]
                        return (
                            f"I have identified that the {query_type} you provided is associated with "
                            f"{customer_data['customer_name']}. Could you please confirm or provide the correct {query_type} "
                            f"for {name}?"
                        )

                return (
                    "No matching records found for the provided details. "
                    "Would you like to proceed with creating a new lead?"
                )
    else:
        # Return the validation error message
        return f"Validation Error: {validated}"


In [22]:
def create_lead(name: str, phone: str, civil_id: str, email: str, model: str, variant: str) -> str:
    """
    Creates a lead and links it to a customer in the Neo4j database.

    Parameters:
    - name: Customer's name (mandatory).
    - phone: Customer's phone number (mandatory).
    - civil_id: Customer's civil ID (mandatory).
    - email: Customer's email (mandatory).
    - model: The car model the customer is interested in (mandatory).
    - variant: The car variant (mandatory).

    Returns:
    - A message confirming lead creation or an error message.
    """
    
    # Helper function to handle null/empty strings
    def get_value(attr):
        return None if attr is None or attr.strip() == "" else attr

    # Check for missing parameters and prompt lead agent accordingly
    missing_params = []
    if not get_value(name):
        missing_params.append("Name")
    if not get_value(phone):
        missing_params.append("Phone number")
    if not get_value(civil_id):
        missing_params.append("Civil ID")
    if not get_value(email):
        missing_params.append("Email address")
    if not get_value(model):
        missing_params.append("Model")
    if not get_value(variant):
        missing_params.append("Variant")

    if missing_params:
        # Return a tool message asking the lead agent to prompt the user for missing details
        return (
            f"Error: Missing required details: {', '.join(missing_params)}. "
            "Please ask the user to provide the missing information."
        )

    # Validate email, phone, and civil ID
    if not validate_email_address(email):
        return "Error: Invalid email address. Please provide a valid email."
    if not validate_phone_number(phone):
        return "Error: Invalid phone number. Please provide a valid phone number."
    if not validate_civil_id(civil_id):
        return "Error: Invalid civil ID. Please provide a valid civil ID."

    # Proceed with creating the lead
    query = """
        MERGE (l:Customer {phone_number: $mobile})
        ON CREATE SET l.createdAt = $createdAt,
                      l.id = apoc.create.uuid()
        SET l.name = $name,
            l.email = $email,
            l.model = $model,
            l.variant = $variant,
            l.civil_id = $civil_id
        
        WITH l
        MERGE (c:Lead {id: l.id})
        ON CREATE SET c.createdAt = $createdAt
        SET c.name = $name, 
            c.phone_number = $mobile, 
            c.email = $email, 
            c.model = $model,
            c.variant = $variant,
            c.level = "High"
        MERGE (l)-[:CUSTOMER_OF_LEAD]->(c)
        
        WITH c
        MATCH (m:Model {name: $model})
        MERGE (c)-[:PREFERENCE]->(m)
    """
    
    params = {
        "name": name.capitalize(),
        "mobile": phone,
        "email": email,
        "createdAt": datetime.datetime.now().isoformat(),
        "variant": variant,
        "model": model,
        "civil_id": civil_id
    }

    try:
        # Execute the query on the Neo4j database
        graph.query(query, params)

        # Return success message with lead details
        return (
            f"Lead successfully created for {name.capitalize()}.\n"
            f"Customer Info:\n"
            f"Name: {name.capitalize()}\n"
            f"Phone: {phone}\n"
            f"Email: {email}\n"
            f"Model: {model} (Variant: {variant})\n"
            f"Civil ID: {civil_id}\n"
            f"Lead Level: High\n"
            f"Created At: {params['createdAt']}"
        )

    except Exception as e:
        # Catch any exceptions during the database operation and return an error message
        return f"Error: Failed to create lead due to: {str(e)}."


In [None]:
#TEST DATABASE
from langchain_community.graphs import Neo4jGraph
def graph_db():
    graph = Neo4jGraph(
        url="neo4j+s://d652d339.databases.neo4j.io", username="neo4j", password="S_rv6XHVT3WdLHrXfgKxEPEu7an6vsc71HFS6ZzpJfc")
    return graph

schema = graph_db().schema
test_graph = graph_db()
# print(schema)

In [None]:
# def customer_existence_verification(name: str = None, email: str = None, phone: str = None, civil_id: str = None):
#     """
#     Verify the existence of the Customer in our Database before lead creation.
    
#     Parameters:
#     - name: Customer's name (optional).
#     - email: Customer's email (optional).
#     - phone: Customer's phone (optional).
#     - civil_id: Customer's civil ID (optional).
    
#     Returns:
#     - Clear and actionable messages for the lead agent to understand the verification results.
# """
#     def get_value(attributes):
#         return None if attributes is None or attributes.strip() == "" else attributes

#     name = get_value(name)
#     email = get_value(email)
#     phone = get_value(phone)
#     civil_id = get_value(civil_id)
#     def validation():
#         phone_validation = True
#         civil_validation = True
#         email_validation = True

#         if phone:
#             phone_validation = validate_phone_number(phone)
#             if phone_validation is not True:
#                 return phone_validation
        
#         if civil_id:
#             civil_validation = validate_civil_id(civil_id)
#             if civil_validation is not True:
#                 return civil_validation
        
#         if email:
#             email_validation = validate_email_address(email)
#             if email_validation is not True:
#                 return email_validation

#         # If all validations pass, return True
#         return all([phone_validation, civil_validation, email_validation])

#     validated = validation()

#     if validated == True:

#         query = graph.query("""MATCH (c:Lead) RETURN c.name AS customer_name""")
#         names_list = [name for i in query for name in [i['customer_name']]]

#         def split_names(names):
#             return [name.split(' ', 1) if ' ' in name else name for name in names]

#         names_list = split_names(names_list)
#         def similar_names(user_name, names_list, threshold=0.86):
#             similar_names = []
#             for name in names_list:
#                 if isinstance(name, str):
#                     similarity = distance.get_jaro_distance(user_name, name)
#                     if similarity >= threshold:
#                         similar_names.append((name, similarity))
#                 else:
#                     for i in name:
#                         similarity = distance.get_jaro_distance(user_name, i)
#                         if similarity >= threshold:
#                             similar_names.append((' '.join(name), similarity))
#             return similar_names

#         if name and all(x is None for x in (phone, civil_id, email)):

#             name_result = similar_names(name, names_list, threshold=0.86)
#             print(name_result)
#             if name_result:
#                 return f"The provided Name is associated with the following customer(s): {[i[0] for i in name_result]}. Would you like to proceed with one of these customers, or would you prefer to create a new lead?"        
#             else:
#                 return f"No matching results were found for the name '{name}'. Please review or confirm the provided details. Would you like to create a new lead instead?"

#         else:
#             query = """
#             MATCH (l:Lead)
#             WHERE toLower(l.name) = toLower($name)
#             """
#             conditions = []
#             if phone:
#                 conditions.append("l.phone_number = $phone")
#             if civil_id:
#                 conditions.append("l.civil_id = $civil_id")
#             if email:
#                 conditions.append("l.email = $email")
            
#             if conditions:
#                 query += " AND " + " AND ".join(conditions)

#             query += """
#             RETURN l.name AS lead_name, l.phone_number AS phone_number, l.civil_id AS civil_id, l.email AS email, l.id AS lead_id
#             """        
#             verified_result = graph.query(query, {
#                 'name': name,
#                 'phone': phone,
#                 'civil_id': civil_id,
#                 'email': email
#             })

#             if verified_result:
#                 return f"A customer named '{name}' already exists in our system with matching details (Phone: {verified_result[0]['phone_number']}, Email: {verified_result[0]['email']}, Civil ID: {verified_result[0]['civil_id']}). Would you like to proceed with this customer, or create a new lead?"
            
#             else:        
#                 cypher_queries = []
#                 semi_verified_result = []

#                 if phone:
#                     phone_query = """
#                     MATCH (c:Lead)
#                     WHERE c.phone_number = $phone 
#                     RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
#                     """
#                     cypher_queries.append(("phone number", phone_query))

#                 if civil_id:
#                     civil_id_query = """
#                     MATCH (c:Lead) 
#                     WHERE c.civil_id = $civil_id 
#                     RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
#                     """
#                     cypher_queries.append(("civil ID", civil_id_query))

#                 if email:
#                     email_query = """
#                     MATCH (c:Lead) 
#                     WHERE c.email = $email 
#                     RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
#                     """
#                     cypher_queries.append(("email", email_query))

#                 for query_type, query in cypher_queries:
#                     db_result = graph.query(query, {
#                         'name': name,
#                         'phone': phone,
#                         'civil_id': civil_id,
#                         'email': email
#                     })

#                     if db_result:
#                         return f"I have identified that the {query_type} you provided is associated with {db_result[0]['customer_name']}. Could you please confirm or provide the correct {query_type} for {name}?"

#                     else:
#                         return f"I was unable to locate the {query_type} ({phone if query_type == 'phone number' else civil_id if query_type == 'civil ID' else email}) in our system. Would you like to proceed with creating a new lead?"
#     else:return validated

In [8]:
def customer_existence_verification(name: str = None, email: str = None, phone: str = None, civil_id: str = None):
    """
    Verify the existence of the Customer in the database before lead creation.

    Parameters:
    - name: Customer's name (optional).
    - email: Customer's email (optional).
    - phone: Customer's phone (optional).
    - civil_id: Customer's civil ID (optional).

    Returns:
    - Clear and actionable messages for the lead agent to understand the verification results.
    """

    def get_value(attr):
        """Helper function to clean up attribute values."""
        return None if attr is None or attr.strip() == "" else attr

    # Clean inputs
    name = get_value(name)
    email = get_value(email)
    phone = get_value(phone)
    civil_id = get_value(civil_id)

    if not name and not email and not phone and not civil_id:
        # Ensure at least one parameter is provided
        return "Error: At least one of name, email, phone, or civil ID is required to verify customer existence."

    def validation():
        """Validate the phone, email, and civil ID if provided."""
        if phone:
            phone_validation = validate_phone_number(phone)
            if phone_validation is not True:
                return phone_validation

        if civil_id:
            civil_validation = validate_civil_id(civil_id)
            if civil_validation is not True:
                return civil_validation

        if email:
            email_validation = validate_email_address(email)
            if email_validation is not True:
                return email_validation

        return True

    validated = validation()

    if validated == True:
        query = graph.query("""MATCH (c:Lead) RETURN c.name AS customer_name""")
        names_list = [name['customer_name'] for name in query]

        def split_names(names):
            """Split full names into first and last for better matching."""
            return [name.split(' ', 1) if ' ' in name else [name] for name in names]

        names_list = split_names(names_list)

        def find_similar_names(input_name, names_list, threshold=0.86):
            """Find similar names in the database based on Jaro-Winkler similarity."""
            similar_names = []
            for name in names_list:
                full_name = ' '.join(name) if isinstance(name, list) else name
                similarity = distance.get_jaro_distance(input_name, full_name)
                if similarity >= threshold:
                    similar_names.append((full_name, similarity))
            return similar_names

        if name and all(param is None for param in (phone, civil_id, email)):
            # Verify only by name
            name_matches = find_similar_names(name, names_list)
            if name_matches:
                matched_names = ', '.join([i[0] for i in name_matches])
                return (
                    f"The provided name is associated with the following customer(s): {matched_names}. "
                    "Would you like to proceed with one of these customers, or create a new lead?"
                )
            else:
                return (
                    f"No matching results found for the name '{name}'. Please review or confirm the provided details. "
                    "Would you like to create a new lead instead?"
                )

        else:
            # Create query with dynamic conditions based on provided data
            query = """
            MATCH (l:Lead)
            WHERE toLower(l.name) = toLower($name)
            """
            conditions = []
            if phone:
                conditions.append("l.phone_number = $phone")
            if civil_id:
                conditions.append("l.civil_id = $civil_id")
            if email:
                conditions.append("l.email = $email")

            if conditions:
                query += " AND " + " AND ".join(conditions)

            query += """
            RETURN l.name AS lead_name, l.phone_number AS phone_number, l.civil_id AS civil_id, l.email AS email, l.id AS lead_id
            """

            # Run the query and check if a customer exists with the provided details
            verified_result = graph.query(query, {
                'name': name,
                'phone': phone,
                'civil_id': civil_id,
                'email': email
            })

            if verified_result:
                customer_data = verified_result[0]
                return (
                    f"A customer named '{customer_data['lead_name']}' already exists in our system "
                    f"with matching details (Phone: {customer_data['phone_number']}, Email: {customer_data['email']}, Civil ID: {customer_data['civil_id']}). "
                    "Would you like to proceed with this customer, or create a new lead?"
                )
            else:
                # Handle semi-verified results by running individual queries for each identifier
                cypher_queries = []
                if phone:
                    phone_query = """
                    MATCH (c:Lead)
                    WHERE c.phone_number = $phone 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("phone number", phone_query))

                if civil_id:
                    civil_id_query = """
                    MATCH (c:Lead) 
                    WHERE c.civil_id = $civil_id 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("civil ID", civil_id_query))

                if email:
                    email_query = """
                    MATCH (c:Lead) 
                    WHERE c.email = $email 
                    RETURN c.name AS customer_name, c.phone_number AS phone_number, c.email AS email, c.civil_id AS civil_id
                    """
                    cypher_queries.append(("email", email_query))

                # Run each individual query for partial verification
                for query_type, query in cypher_queries:
                    db_result = graph.query(query, {
                        'name': name,
                        'phone': phone,
                        'civil_id': civil_id,
                        'email': email
                    })

                    if db_result:
                        customer_data = db_result[0]
                        return (
                            f"I have identified that the {query_type} you provided is associated with "
                            f"{customer_data['customer_name']}. Could you please confirm or provide the correct {query_type} "
                            f"for {name}?"
                        )

                return (
                    "No matching records found for the provided details. "
                    "Would you like to proceed with creating a new lead?"
                )
    else:
        # Return the validation error message
        return f"Validation Error: {validated}"


In [21]:
customer_existence_verification(name="Lissa")

'The provided name is associated with the following customer(s): Lissa Mahimadoss, Lissa. Would you like to proceed with one of these customers, or create a new lead?'

In [14]:
from typing_extensions import Annotated, Optional
from support_files.validation_functions import validate_email_address,validate_civil_id, validate_phone_number
from datetime import datetime 


In [15]:
def customer_lead_creation(name    : Annotated[str,"Customer name in lower case"],
                           phone   : Annotated[str,"Customer phone number in 10 digits"],
                           civil_id: Annotated[str,"Customer civil ID in 12 digits"],
                           email   : Annotated[str,"Customer email address"],
                           model   : Annotated[str,"Car model"],
                           variant : Annotated[str,"Car variant"]) -> str:
    """
    Creates a lead and links it to a customer in the Neo4j database.

    Parameters:
    - name: Customer's name (mandatory).
    - phone: Customer's phone number (mandatory).
    - civil_id: Customer's civil ID (mandatory).
    - email: Customer's email (mandatory).
    - model: The car model the customer is interested in (mandatory).
    - variant: The car variant (mandatory).

    Returns:
    - A message confirming lead creation or an error message.
    """
    
    # Helper function to handle null/empty strings
    def get_value(attr):
        return None if attr is None or attr.strip() == "" else attr

    # Check for missing parameters and prompt lead agent accordingly
    missing_params = []
    if not get_value(name):
        missing_params.append("Name")
    if not get_value(phone):
        missing_params.append("Phone number")
    if not get_value(civil_id):
        missing_params.append("Civil ID")
    if not get_value(email):
        missing_params.append("Email address")
    if not get_value(model):
        missing_params.append("Model")
    if not get_value(variant):
        missing_params.append("Variant")

    if missing_params:
        # Return a tool message asking the lead agent to prompt the user for missing details
        return (
            f"Error: Missing required details: {', '.join(missing_params)}. "
            "Please ask the user to provide the missing information."
        )

    # Validate email, phone, and civil ID
    if not validate_email_address(email):
        return "Error: Invalid email address. Please provide a valid email."
    if not validate_phone_number(phone):
        return "Error: Invalid phone number. Please provide a valid phone number."
    if not validate_civil_id(civil_id):
        return "Error: Invalid civil ID. Please provide a valid civil ID."

    # Proceed with creating the lead
    query = """
        MERGE (l:Customer {phone_number: $mobile})
        ON CREATE SET l.createdAt = $createdAt,
                      l.id = apoc.create.uuid()
        SET l.name = $name,
            l.email = $email,
            l.model = $model,
            l.variant = $variant,
            l.civil_id = $civil_id
        
        WITH l
        MERGE (c:Lead {id: l.id})
        ON CREATE SET c.createdAt = $createdAt
        SET c.name = $name, 
            c.phone_number = $mobile, 
            c.email = $email, 
            c.model = $model,
            c.variant = $variant,
            c.level = "High"
        MERGE (l)-[:CUSTOMER_OF_LEAD]->(c)
        
        WITH c
        MATCH (m:Model {name: $model})
        MERGE (c)-[:PREFERENCE]->(m)
    """
    
    params = {
        "name": name.capitalize(),
        "mobile": phone,
        "email": email,
        "createdAt": datetime.now().isoformat(),
        "variant": variant,
        "model": model,
        "civil_id": civil_id
    }

    try:
        # Execute the query on the Neo4j database
        test_graph.query(query, params)

        # Return success message with lead details
        return (
            f"Lead successfully created for {name.capitalize()}.\n"
            f"Customer Info:\n"
            f"Name: {name.capitalize()}\n"
            f"Phone: {phone}\n"
            f"Email: {email}\n"
            f"Model: {model} (Variant: {variant})\n"
            f"Civil ID: {civil_id}\n"
            f"Lead Level: High\n"
            f"Created At: {params['createdAt']}"
        )

    except Exception as e:
        # Catch any exceptions during the database operation and return an error message
        return f"Error: Failed to create lead due to: {str(e)}."


In [16]:
customer_lead_creation(name="John Doe", phone="+1234567890", civil_id="123456789012", email="jdoe@x.com", model="TIGGO8PRO", variant= "TIGGO8 PRO 2.0L I4 LUXURY")



'Lead successfully created for John doe.\nCustomer Info:\nName: John doe\nPhone: +1234567890\nEmail: jdoe@x.com\nModel: TIGGO8PRO (Variant: TIGGO8 PRO 2.0L I4 LUXURY)\nCivil ID: 123456789012\nLead Level: High\nCreated At: 2024-09-17T11:08:58.865788'