# Ingesting

In [49]:
# Imports
import sqlite3
import pandas as pd
from pathlib import Path

# Constants
DATA_FILE_PATH = Path.cwd().parent.joinpath("data").joinpath("raw").joinpath("2024_case_cientista_de_dados_ia.csv")  # different from scripts
DATABASES_PATH = Path.cwd().parent.joinpath("data").joinpath("databases")  # different from scripts

# Data schema
data_file_data_types = {
    "consumidor_id": "int64",
    "cpf_cnpj": "string",
    "nome do consumidor": "string",
    "perfil": "string",
    "divida_id": "string",
    "código_contrato": "string",
    "valor_vencido": "float64",
    "valor_multa": "float64",
    "valor_juros": "float64",
    "produto": "string",
    "loja": "string",
    "Opcoes_Pagamento": "string",
}

# Ingesting datas do DataFrame
df = (
    pd.read_csv(
        filepath_or_buffer=DATA_FILE_PATH,
        dtype=data_file_data_types,
        parse_dates=["data_nascimento", "data_origem"],
        dayfirst=True,
        decimal=",",
    )
    # Handling 10 digit CPF cases
    .assign(cpf_cnpj= lambda _df: _df["cpf_cnpj"].str.zfill(11))
)

# Connection to database (SQLite)
info_db_connection = sqlite3.connect(
    DATABASES_PATH.joinpath("info.db"), check_same_thread=False
)

# Writing table to database
df.to_sql(name="customers", con=info_db_connection, if_exists="replace", index=False)


11

# Prompts

In [50]:
# Basic Imports
from textwrap import dedent

authenticator_prompt_content = dedent("""\
    Definição: 
    Você é um atendente de usuários que são consumidores procurando informações sobre suas dívidas.
                                    
    Objetivo: obter o CPF e data de nascimento dos usuários para autenticá-los. Para isso, interaja com o usuário.
                            
    Instruções: 
    - O CPF DEVE conter 11 dígitos númericos. O usuário pode enviar esse CPF somente em números (exemplo: 74132341062) ou com pontuação (exemplo: 338.013.350-70 ou 338013350-70 ou 338.013.350.70). 
    - A data de nascimento também pode ser enviada em diferentes formatos (exemplos: 14/05/2001, 25-02-2000, 14/05/97, 1989-06-06). 
    - Independente do formato enviado pelo usuário, converta a data para YYYY-MM-DD. Você não precisa informar ao usuário sobre essa conversão.
    - Considere que o usuário é brasileiro, então é comum que o dia seja apresentado antes do mês na data de nascimento.
    - Se não estiver confiante que o usuário lhe forneceu um CPF e data de nascimento seguindo as regras acima, peça esses dados novamente para ele.
    - Quando obter o CPF e data de nascimento do usuário, você deve chamar a função 'autenticar_usuario'.
    
    Extras:
    - Você sempre deve responder no idioma Português Brasileiro.
    - Hoje é {today}."""
)

information_prompt_content = dedent("""\
    Definição: 
    Você é um atendente de usuários que são consumidores procurando informações sobre suas dívidas.
                                    
    Objetivo: 
    Fornecer informações sobre a dívida o usuário. Utilize o contexto do array de JSONs abaixo para extrair todas as informações das dívidas e possíveis opções de pagamento. CONTEXTO:
                                    
    {payment_options}

    Dados presentes no array de JSONs:
    - opcao_pagamento_id: não informe esse dado ao usuário.
    - valor_entrada: valor que o usuário terá que pagar de entrada.
    - valor_parcela: valor de cada parcela.
    - valor_desconto: valor do desconto total sobre o campo 'valor_negociado'.
    - valor_negociado: valor total da dívida para negociação.
    - quantidade_parcelas: número de parcelas a serem pagas para quitar a dívida.
    - data_primeiro_boleto: data do primeiro boleto.
                            
    Instruções: 
    - O nome completo do usuário é {name}. Sempre que adequado, chame-o pelo primeiro nome.
    - O usuário passou a ficar inadimplente na data de {debt_origin_date} e a dívida refere-se ao produto {product} da loja {store}.
    - O valor atual total da dívida é de {current_debt_value}. A diferença entre esse valor e o valor do campo 'valor_negociado' deve-se ao aumento diário das multas e juros.
    - Use SOMENTE os dados do array de JSONs como fonte dos valores e condições de pagamento da dívida.
    - Você SOMENTE pode fornecer informações e ajudar o usuário com informações provenientes do CONTEXTO.

    Entre as opções que você pode fornecer, estão:
    - Número de opções de pagamento.
    - Características de cada opção de pagamento.
    - Quais as vantagens e desvantagens de escolher cada opção de pagamento.
    - Qual é a opção com maior desconto sobre o valor negociado (apresente a porcentagem total de desconto na comparação de 'valor_desconto' e 'valor_negociado').
    - Qual é a opção que fornece prazo mais longo para pagamento.
    
    Extras:
    - Você sempre deve responder no idioma Português Brasileiro.
    - Hoje é {today}."""
)


# Tools

In [51]:
# Basic Imports
import sqlite3
from typing import Type, Optional
from textwrap import dedent
import pandas as pd

# LangChain Imports
from langchain.pydantic_v1 import BaseModel, Field
from langchain.callbacks.manager import CallbackManagerForToolRun
from langchain.tools import BaseTool

# Local Imports
# from ingest import info_db_connection


class BaseSqlLiteTool(BaseModel):
    """
    Base tool for interacting with SQLite Database.

    Attributes
    ----------
    connection : sqlite3.Connection
        SQLite database connection.
    """

    connection: sqlite3.Connection = Field(exclude=True)

    # Pass sqlite3.Connection validation
    class Config(BaseTool.Config):
        pass


class AuthenticateUserInput(BaseModel):
    cpf: str = Field(
        description="string contendo entre 10 e 11 caracteres numéricos referentes ao CPF do usuário."
    )
    data_nascimento: str = Field(
        description="string com a data de nascimento do usuário no formato 'YYYY-MM-DD'"
    )


class AuthenticateUser(BaseSqlLiteTool, BaseTool):
    """
    Tool for authenticating users using SQLite database connection.

    Attributes
    ----------
    name : str
        Name of the tool.
    description : str
        Description of the tool.
    args_schema : Type[BaseModel]
        Schema for the tool's input arguments.

    Methods
    -------
    _run(cpf: str, data_nascimento: str, run_manager: Optional[CallbackManagerForToolRun]=None) -> bool
        Runs the tool with the provided CPF and date of birth, returning authentication result.
    """

    name: str = "autenticar_usuario"
    description: str = dedent("""\
        Autentica o usuário. 
        Os inputs são o CPF e a data de nascimento do usuário. 
        Output é uma mensagem informando se o CPF não foi encontrado nos cadastros ou se a data de nascimento não está correta para aquele usuário.
        Ou, caso tenha sido encontrado, o output é um pandas.DataFrame
        """)
    args_schema: Type[BaseModel] = AuthenticateUserInput

    def _run(
        self,
        cpf: str,
        data_nascimento: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> bool:
        """
        Runs the tool with the provided CPF and date of birth, returning authentication result.

        Parameters
        ----------
        cpf : str
            CPF of the user.
        data_nascimento : str
            Date of birth of the user in 'YYYY-MM-DD' format.
        run_manager : Optional[CallbackManagerForToolRun], optional
            Callback manager for tool run, by default None.

        Returns
        -------
        bool
            Authentication result message or pandas.DataFrame if authenticated.
        """
        cursor = self.connection.cursor()
        if not cursor.execute(
            f"SELECT * FROM customers WHERE cpf_cnpj = '{cpf}'"
        ).fetchone():
            cursor.close()
            return "CPF não encontrado no cadastro da empresa. Checar CPF novamente."
        elif not cursor.execute(
            f"SELECT * FROM customers WHERE cpf_cnpj = '{cpf}' AND DATE(data_nascimento) = '{data_nascimento}'"
        ).fetchone():
            cursor.close()
            return "Os dados informados não foram encontrados no sistema. Checar data de nascimento novamente."
        else:
            df = pd.read_sql_query(
                f"SELECT * FROM customers WHERE cpf_cnpj = '{cpf}' AND DATE(data_nascimento) = '{data_nascimento}'",
                self.connection,
            )
            cursor.close()
            return df


# Creating tools instances
authenticate_user_tool = AuthenticateUser(connection=info_db_connection)

# Saving lists of all tools by var and name
tools = [authenticate_user_tool]
tools_by_name = {tool.name: tool for tool in tools}


# Nodes

In [52]:
# Basic Imports
from datetime import datetime

# LangChain Imports
from langchain_core.messages import ToolMessage
from langchain_core.prompts import ChatPromptTemplate

# Local Imports
# from tools import authenticate_user_tool, tools_by_name

# Constants
TODAY = str(datetime.now().date())

# Classes (Nodes)
class StartNode:
    """
    Starting node. Only required for routing to the correct agent.

    Methods
    -------
    __call__(state)
        Routes based on the authentication state.
    """

    def __init__(self):
        pass

    def __call__(self, state):
        if not state["is_authenticated"]:
            return {"is_authenticated": False}
        else:
            return state


class AuthenticatorAgentNode:
    """
    Agent responsible for authenticating the user.

    Attributes
    ----------
    model : object
        The model used for generating responses.
    prompt : str
        The prompt template for the authenticator agent.

    Methods
    -------
    __call__(state)
        Generates a prompt and invokes the model to authenticate the user.
    """

    def __init__(self, model, prompt):
        self.model = model
        self.prompt = prompt

    def __call__(self, state):
        authenticator_prompt = ChatPromptTemplate.from_messages(
            [
                ("system", self.prompt.format(today=TODAY)),
                ("placeholder", "{messages}"),
            ]
        )
        return {
            "messages": [
                (
                    authenticator_prompt
                    | self.model.bind_tools([authenticate_user_tool])
                ).invoke(state)
            ]
        }


class AuthenticatorToolNode:
    """
    Tool execution node for AuthenticatorAgentNode.

    Methods
    -------
    __call__(state)
        Executes the tool based on the messages in the state and updates the state.
    """

    def __init__(self):
        pass

    def __call__(self, state):
        tool_call = state["messages"][-1].tool_calls[0]
        tool = tools_by_name[tool_call["name"]]
        args = tool_call["args"]
        id = tool_call["id"]
        observation = tool.invoke(args)
        if isinstance(observation, str):
            state["is_authenticated"] = False
            return {"messages": [ToolMessage(content=observation, tool_call_id=id)]}
        else:
            df = observation
            state["is_authenticated"] = True
            state["cpf"] = df["cpf_cnpj"].item()
            state["data_nascimento"] = df["data_nascimento"].item()
            state["nome"] = df["nome"].item()
            state["opcoes_pagamento"] = (
                df["Opcoes_Pagamento"]
                .str.replace('"', "")
                .str.replace("{", "{{")
                .str.replace("}", "}}")
                .item()
            )
            state["valor_atual_divida"] = (
                str(
                    df["valor_vencido"]
                    .add(df["valor_multa"])
                    .add(df["valor_juros"])
                    .item()
                )
            )
            state["data_origem_divida"] = df["data_origem"].item()
            state["loja"] = df["loja"].item()
            state["produto"] = df["produto"].item()
            state["messages"] = [
                ToolMessage(content="Usuário autenticado", tool_call_id=id)
            ]
            return state


class AuthenticatorOrInfoRouter:
    """
    Decision node. Routes to the correct agent according to the state of the graph.

    Methods
    -------
    __call__(state)
        Routes based on the authentication status.
    """

    def __init__(self):
        pass

    def __call__(self, state):
        if state["is_authenticated"]:
            return "to_info"
        else:
            return "to_auth"


class ToolRouter:
    """
    Tool router for any agent that needs to execute tools.

    Methods
    -------
    __call__(state)
        Routes based on whether there are tool calls in the messages.
    """

    def __init__(self):
        pass

    def __call__(self, state):
        print(state)
        if state["messages"][-1].tool_calls:
            return "to_tool"
        else:
            return "to_user"


class InformationAgentNode:
    """
    Agent responsible for providing information to the user.

    Attributes
    ----------
    model : object
        The model used for generating responses.
    prompt : str
        The prompt template for the information agent.

    Methods
    -------
    __call__(state)
        Generates a prompt and invokes the model to provide information to the user.
    """

    def __init__(self, model, prompt):
        self.model = model
        self.prompt = prompt

    def __call__(self, state):
        information_prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    self.prompt.format(
                        payment_options=state["opcoes_pagamento"],
                        product=state["produto"],
                        debt_origin_date=state["data_origem_divida"],
                        name=state["nome"],
                        store=state["loja"],
                        current_debt_value=state["valor_atual_divida"],
                        today=TODAY,
                    ),
                ),
                ("placeholder", "{messages}"),
            ]
        )
        return {"messages": [(information_prompt | self.model).invoke(state)]}


# Graph

In [53]:
# Basic Imports
from typing import Annotated, Union
from typing_extensions import TypedDict
from pathlib import Path
import dotenv

# LangGraph Imports
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver

# LangChain Imports
from langchain_openai import ChatOpenAI

# Local Imports
# from prompts import authenticator_prompt_content, information_prompt_content
# from nodes import (
#     StartNode,
#     AuthenticatorAgentNode,
#     AuthenticatorToolNode,
#     InformationAgentNode,
#     AuthenticatorOrInfoRouter,
#     ToolRouter,
# )

# Constants
DATABASES_PATH = Path.cwd().parent.joinpath("data").joinpath("databases")  # different from scripts

# Loading env-variables
_ = dotenv.load_dotenv(dotenv.find_dotenv())

# Variables
chatgpt_model = ChatOpenAI(
    model="gpt-4o",
    temperature=0,
    max_retries=2,
)
memory = SqliteSaver.from_conn_string(DATABASES_PATH.joinpath("memory.db"))


# Graph State
class State(TypedDict):
    messages: Annotated[list, add_messages]
    is_authenticated: Union[bool, None]
    cpf: Union[str, None]
    data_nascimento: Union[str, None]
    nome: Union[str, None]
    opcoes_pagamento: Union[str, None]
    valor_atual_divida: Union[str, None]
    data_origem_divida: Union[str, None]
    loja: Union[str, None]
    produto: Union[str, None]


# Graph Design and State
graph_builder = StateGraph(State)

# Graph Nodes
graph_builder.add_node("starting_node", StartNode())
graph_builder.add_node(
    "authenticator_agent",
    AuthenticatorAgentNode(model=chatgpt_model, prompt=authenticator_prompt_content),
)
graph_builder.add_node("authenticator_tool", AuthenticatorToolNode())
graph_builder.add_node(
    "information_agent",
    InformationAgentNode(model=chatgpt_model, prompt=information_prompt_content),
)

# Graph Routes
graph_builder.set_entry_point("starting_node")
graph_builder.add_conditional_edges(
    "starting_node",
    AuthenticatorOrInfoRouter(),
    {
        "to_auth": "authenticator_agent",
        "to_info": "information_agent",
    },
)
graph_builder.add_conditional_edges(
    "authenticator_agent",
    ToolRouter(),
    {
        "to_tool": "authenticator_tool",
        "to_user": END,
    },
)
graph_builder.add_conditional_edges(
    "authenticator_tool",
    AuthenticatorOrInfoRouter(),
    {
        "to_auth": "authenticator_agent",
        "to_info": "information_agent",
    },
)
graph_builder.set_finish_point("information_agent")

# Compiling graph
graph = graph_builder.compile(checkpointer=memory)


# Utils

In [54]:
# Basic Imports
import json
import pandas as pd
import sqlite3
from pathlib import Path

# Constants
DATABASES_PATH = Path.cwd().parent.joinpath("data").joinpath("databases")

# Variables
memory_db_connection = sqlite3.connect(
    DATABASES_PATH.joinpath("memory.db"), check_same_thread=False
)

# Functions
def load_metadata_from_state_memory(thread_id):
    """
    Load metadata from the state memory database for a specific thread.

    Parameters
    ----------
    thread_id : str
        The ID of the thread for which to load the metadata.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the metadata for the specified thread.
    """
    df_metadata = pd.read_sql_query(
        f"SELECT metadata FROM checkpoints WHERE thread_id = '{thread_id}';",
        memory_db_connection,
    )
    return df_metadata

def extract_message_for_streamlit(metadata):
    """
    Extract the last message for Streamlit display from the metadata.

    Parameters
    ----------
    metadata : bytes
        The metadata containing the messages in a byte json string format.

    Returns
    -------
    dict or None
        A dictionary containing the role and content of the last message, or None if no message is found.
        
        The dictionary has the following structure:
        - For user messages: {"role": "user", "content": content}
        - For AI messages: {"role": "assistant", "content": content}
    """
    # Convert b-string to JSON string and parse it
    json_str = metadata.decode("utf-8")
    metadata_dict = json.loads(json_str)
    
    # Initialize result
    result = None
    
    # Check if "writes" is present
    writes = metadata_dict.get("writes")
    
    if writes:
        messages = None
        if "messages" in writes:
            # If "messages" key is directly present
            messages = writes["messages"]
        else:
            # If "messages" key is in nested dictionary
            for value in writes.values():
                if isinstance(value, dict) and "messages" in value:
                    messages = value["messages"]
                    break
        
        # Extract the last message and its type
        if messages:
            last_message = messages[-1]
            if isinstance(last_message, dict):
                content = last_message.get("kwargs", {}).get("content", "")
                message_type = last_message.get("kwargs", {}).get("type", "")
            elif isinstance(last_message, list) and len(last_message) >= 2:
                content = last_message[1]
                message_type = last_message[0]
            
            if content:
                if message_type == "user":
                    result = {"role": "user", "content": content}
                elif message_type == "ai":
                    result = {"role": "assistant", "content": content}
    return result


# Tests

In [55]:
import uuid
random_thread_id = str(uuid.uuid4())
test_thread_id = "test_5"
config = {"configurable": {"thread_id": test_thread_id}}


In [66]:
user_input = "Ok, obrigada."


In [67]:
from langchain_core.messages import AIMessage

temp_event_list = []
temp_message_list = []
for event in graph.stream({"messages": [("user", user_input)]}, config,):
    temp_event_list.append(event)
    if (
        "messages" in list(event.values())[0]
        and isinstance(list(event.values())[0]["messages"][-1], AIMessage)
        and list(event.values())[0]["messages"][-1].content
    ):
        print(list(event.values())[0]["messages"][-1].content)
        temp_message_list.append(list(event.values())[0]["messages"][-1].content)


De nada, Lorraine! Fico feliz em poder ajudar. Se você tiver mais alguma dúvida ou precisar de mais informações no futuro, não hesite em entrar em contato. Tenha um ótimo dia!


In [59]:
# # Utils
# graph.get_state(config)
# from IPython.display import Image, display
# display(Image(graph.get_graph().draw_mermaid_png()))