# **STELLAR**: A Structured, Trustworthy, and Explainable LLM-Led Architecture for Reliable Customer Support)

# 1. Requirements

In [9]:
from google.colab import drive, userdata
drive.mount('/content/drive')

# Installation of Required Libraries
%pip install -q -U groq langchain chromadb sentence-transformers langchain-community langchain-huggingface rank_bm25
# Groq API Initialization
from groq import Groq
client = Groq(api_key=userdata.get('GROQ_API_KEY'))
STELLAR_path = "/content/drive/MyDrive/STELLAR_Selo_de_Inovacao"

# Essential Imports
import os
import json
import nltk
from math import ceil
from datetime import datetime
import time
import pytz
import requests
import uuid
import logging
from scipy.special import softmax

# Library-specific Imports
from rank_bm25 import BM25Okapi
from langchain.schema.document import Document
from langchain.prompts import ChatPromptTemplate
from langchain.vectorstores.chroma import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    AutoConfig,
    logging as transformers_logging
)

# Set Logging Configuration
logging.basicConfig(level=logging.INFO)
transformers_logging.set_verbosity_error()

# Preloading Resources
nltk.download('punkt')
nltk.download('punkt_tab')

# Model Configuration
MODEL = "llama-3.3-70b-versatile"


class Module:
    def __init__(self, model_name="llama-3.3-70b-versatile"):
        self.model_name = model_name

    def ask_model(self, question: str, model: str="llama-3.3-70b-versatile") -> str:
        """
        Queries the Groq API with a question and model.

        Args:
            question (str): The input question for the model.
            model (str): The model to query.

        Returns:
            str: The response from the model as a string or None if an error occurs.
        """
        try:
            response = client.chat.completions.create(
                messages=[{"role": "user", "content": question}],
                model=model,
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Error querying Groq API: {e}")
            return None

    def load_txt_file(self, path):
        """
        Loads a text file from the specified path.

        Args:
            path (str): The path to the text file.

        Returns:
            str: The contents of the text file.
        """
        try:
            with open(path, 'r') as file:
                return file.read()
        except FileNotFoundError:
            print(f"File not found: {path}")
            return ""

    def load_json_file(self, path:str) -> dict:
        """
        Loads a JSON file from the specified path.

        Args:
            path (str): The path to the JSON file.

        Returns:
            dict: The contents of the JSON file as a dictionary.
        """
        try:
            with open(path, 'r') as file:
                return json.load(file)
        except FileNotFoundError:
            print(f"File not found: {path}")
            return {}

    def parse_str_to_json(self, string, required_fields):
        """
        Parses a string containing a JSON-like dictionary and verifies the required fields.

        Args:
            string (str): The string to parse, expected to contain a JSON-like dictionary.
            required_fields (list): The fields that the dictionary must contain.

        Returns:
            dict: The parsed dictionary if successful.

        Raises:
            ValueError: If the parsing fails or the dictionary does not contain the required fields.
        """
        try:
            # Extract the JSON-like content from the string
            json_start = string.find("{")
            json_end = string.find("}") + 1
            if json_start == -1 or json_end == -1:
                raise ValueError("No valid JSON content found in the string.")

            json_string = string[json_start:json_end]
            parsed_dict = json.loads(json_string)

            # Verify required fields
            if not all(field in parsed_dict for field in required_fields):
                raise ValueError(f"Missing required fields: {required_fields}")

            return parsed_dict

        except Exception as e:
            raise ValueError(f"Error parsing string to JSON: {e}")

    def get_current_date_time(self, timezone:str='America/Sao_Paulo') -> dict:
        """
        Gets the current date and time in the specified timezone.

        Args:
            timezone (str): The timezone to use.

        Returns:
            dict: A dictionary containing the current time, in the format:
              {"year": <int>, "month": <int>, "day": <int>, "hour": <int>}

        """
        timezone = pytz.timezone('America/Sao_Paulo')
        timezone_time = datetime.now(timezone)
        time = {
            "year": timezone_time.year,
            "month": timezone_time.month,
            "day": timezone_time.day,
            "hour": timezone_time.hour
        }

        return time

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


# 2. Modules

## 2.1 Module 1: Initial Classifier

In [10]:
class Module1_InitialClassifier(Module):
    def __init__(self, model_name:str=MODEL, prompt_path:str=f"{STELLAR_path}/requirements/module_1/prompt_module1.txt"):
        super().__init__(model_name)
        self.prompt_path = prompt_path


    def classify_question(self, question: str) -> dict:
        """
        Receives a question and returns the response in JSON format. Retries up to 3 times
        if the response is not correctly formatted. If still invalid, returns a default answer.

        Args:
            question (str): The question to send to the LLM.
            model (str): The model to query.

        Returns:
            dict: Validated response from the LLM or a default fallback answer. format:
            {"category": <int>, "answer": <str>, "explanation": <str>, "confidence": <float>}
        """
        # Get the prompt
        with open(self.prompt_path, "r") as f:
            prompt = f.read()

        # Add the question to the prompt
        full_prompt = prompt + question

        default_answer = {
            'category': 0,
            'answer': 'Desculpe. Houve um erro com o processamento da sua dúvida. Você está sendo encaminhado para o buscador de perguntas frequentes.',
            'explanation': 'Resposta padrão.',
            'confidence': 1
        }

        for attempt in range(3):
            response = self.ask_model(full_prompt, self.model_name)

            if response is None:
                print(f"Attempt {attempt + 1}: No response from model.")
                continue

            parsed_response = self.parse_str_to_json(response, ['category', 'answer', 'explanation', 'confidence'])
            if type(parsed_response["category"]) != int or parsed_response["category"] not in [0, 1, 2, 3]:
                print(f"Attempt {attempt + 1}: Invalid response format.")
                continue

            if parsed_response:
                return parsed_response
            else:
                print(f"Attempt {attempt + 1}: Response parsing failed.")

        print("All attempts failed. Returning default answer.")
        return default_answer

In [11]:
# question = "Como posso acionar meu seguro de carro?"  # 0
# question = "Onde posso ligar para cancelar meu seguro?"  # 1
question = "Preciso da ajuda de um atendente."  # 2
#question = "Qual é o maior país do mundo?"  # 3


module = Module1_InitialClassifier()
print(module.classify_question(question))

{'category': 2, 'answer': 'Claro, podemos encaminhar sua solicitação para um atendente humano. Aguarde um momento enquanto estamos transferindo sua chamada.', 'explanation': 'O cliente explicitamente solicita falar com um atendente humano, indicando necessidade de atendimento personalizado e direto.', 'confidence': 1.0}


## 2.2 Module 2: RAG

In [12]:
class Module2_RAG(Module):
    def __init__(self, model_name=MODEL, data_path: str = f"{STELLAR_path}/requirements/module_2/FAQs.json",
                 chroma_path: str = f"{STELLAR_path}/requirements/module_2/chroma",
                 prompt_template_path: str = f"{STELLAR_path}/requirements/module_2/prompt_template.txt",
                 prompt_rerank_path: str = f"{STELLAR_path}/requirements/module_2/prompt_rerank.txt",
                 embedding_model: str = "paraphrase-multilingual-mpnet-base-v2",
                 n_retrieved_chunks: int = 10):
        super().__init__(model_name)
        self.data_path = data_path
        self.chroma_path = chroma_path
        self.prompt_template_path = prompt_template_path
        self.prompt_rerank_path = prompt_rerank_path
        self.embedding_model = embedding_model
        self.n_retrieved_chunks = n_retrieved_chunks

    def add_new_faq_to_chroma(self, new_faq: dict):
        """
        Adds a single new FAQ to the Chroma database and to the json file with all the FAQs.

        Args:
            new_faq (dict): The new FAQ to be added to the database in this format:
                {"category": <str>, "question": <str>, "answer": <str> }.
        Returns:
            None
        """
        # calculate the new chunk id (based on the existing ids)
        with open (self.data_path, "r") as f:
          faqs = json.load(f)
        max_id = max([int(faq["id"]) for faq in faqs])
        new_id = max_id + 1
        new_faq["id"] = new_id

        # Format the chunk ({"id": <int>, "category": <str>, "question": <str>, "answer": <str> })
        formated_faq = {
            "id": new_id,
            "category": new_faq["category"],
            "question": new_faq["question"],
            "answer": new_faq["answer"]
        }

        # Write FAQ to the json file
        faqs.append(formated_faq)
        with open (self.data_path, "w") as f:
            json.dump(faqs, f, indent=2, ensure_ascii=False)

        # Load the database
        path = f"{self.chroma_path}/vector_database"
        database = Chroma(
            persist_directory=path,
            embedding_function=self.get_embedding_function(self.embedding_model))

        # Add the chunks to the database
        documents = [formated_faq]
        self.add_to_chroma(documents, model_name=self.embedding_model)

    def create_database(self):
        """
        Creates the Chroma database.
        """
        # If the database already exists, return
        path = f"{self.chroma_path}/vector_database"
        if os.path.exists(path):
          return
        # load the items from the .json file
        documents = self.load_documents()
        # Save the chunks to the chroma directory
        self.add_to_chroma(documents, model_name=self.embedding_model)

    def add_to_chroma(self, chunks: list[Document], model_name: str):
        """
        Adds the documents to the Chroma database.

        Args:
            chunks (list[Document]): The documents to add to the database in this format:
                {"id": <int>, "category": <str>, "question": <str>, "answer": <str>}
            model_name (str): The name of the embedding model to use.
        Returns:
            None
        """
        # Load the database
        path = f"{self.chroma_path}/vector_database"
        database = Chroma(
            persist_directory=path,
            embedding_function=self.get_embedding_function(model_name))
        # Add the chunks to the database
        existing_items = database.get(include=[]) # IDs are included by default
        existing_ids = set(existing_items["ids"])
        print(f"Number of existing items: {len(existing_ids)}.")
        # Filter out the chunks that are already in the database
        chunks_to_add = []
        for chunk in chunks:
            if chunk["id"] not in existing_ids:
                chunks_to_add.append(chunk)
        # Add the chunks to the database
        if len(chunks_to_add) > 0:
            print(f"Adding {len(chunks_to_add)} items to the database.")
            documents_to_add = []
            for chunk in chunks_to_add:
                # Create a Document object for each chunk
                document = Document(
                    page_content=chunk["question"] + "\n" + chunk["answer"],
                    metadata={
                        "id": chunk["id"],
                        "category": chunk["category"],
                        "question": chunk["question"],
                        "answer": chunk["answer"]})
                documents_to_add.append(document)
            # Add documents to the database
            database.add_documents(documents=documents_to_add)
            database.persist()
            print("✅ Database updated with new items.")
        else:
            print("✅ No new items to add.")

    def load_documents(self) -> list[Document]:
        """Loads the documents from the data directory.
        Documents are dictionaries with the fields:
        id, category, question, answer.

        Returns:
            list[Document]: A list of Document objects in this format:
            {"id": <int>, "category": <str>, "question": <str>, "answer": <str>}
        """
        with open(self.data_path, "r") as f:
          documents = json.load(f)
        return documents

    def get_embedding_function(self, embedding_model):
        """
        Gets the embedding function for the given embedding model.
        Args:
            embedding_model (str): The name of the embedding model.
        Returns:
            HuggingFaceEmbeddings: The embedding function.
        """
        model = f"sentence-transformers/{embedding_model}"
        model_kwargs = {"device": "cpu"}
        encode_kwargs = {"normalize_embeddings": True}
        embeddings = HuggingFaceEmbeddings(
            model_name=model,
            model_kwargs=model_kwargs,
            encode_kwargs=encode_kwargs
        )
        return embeddings

    def semantic_retrieval(self, question: str, n:int, model_name:str) -> list[int]:
        """Receives a question and retrieves n FAQ ids related to the question
        Args:
            question (str): The question to be answered.
            n (int): The number of FAQs to be retrieved.
        Return:
            a list of the ids of the n FAQs related to the question
        """
        # Prepare the database.
        embedding_function = self.get_embedding_function(self.embedding_model)
        path = f"{self.chroma_path}/vector_database"
        database = Chroma(persist_directory=path, embedding_function=embedding_function)

        # Search the database (results:List[Tuple[Document, float]]).
        results = database.similarity_search_with_score(question, k=n)

        # Return only the ids
        for i in range(len(results)):
            results[i] = results[i][0].metadata["id"]

        return results

    def BM25_retrieval(self, question: str, n: int, list_of_chunks: list[str]) -> list[int]:
        """
        Retrieve top-n chunks using BM25 based on the given question.
        Args:
            question (str): The query question.
            n (int): Number of chunks to retrieve.
            list_of_chunks (List[str]): The list of chunks to search within.
        Returns:
            List[int]: ids of the selected chunks
        """
        # Tokenize the chunks and the question
        tokenized_chunks = [nltk.word_tokenize(chunk) for chunk in list_of_chunks]
        tokenized_question = nltk.word_tokenize(question)

        # Initialize the BM25 model
        bm25 = BM25Okapi(tokenized_chunks)

        # Score the chunks based on the question
        scores = bm25.get_scores(tokenized_question)

        # Get the indices of the top-n chunks based on scores
        top_indices = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:n]

        return top_indices


    def hybrid_search(self, question: str, n: int, embedding_model: str) -> list[int]:
      """Receives a question and retrieves n FAQs related to the question
      Return: a list of the ids of the n FAQs related to the question
      """
      # 70% Semantic search (round up)
      n_semantic = ceil(0.7 * n)
      semantic_ids = self.semantic_retrieval(question, n_semantic, embedding_model)


      # 30% BM25 search
      n_bm25 = n - n_semantic

      # Search for n chunks so that we can choose the 30% most similar that are not in semantic_ids
      with open (self.data_path, 'r') as file:
        data = json.load(file)
      # Use the "question only" approach by default
      chunks = [data[i]["question"] for i in range(len(data))]
      bm25_ids = self.BM25_retrieval(question, n, chunks)
      # Choose the 30% most similar that are not in semantic_ids
      bm25_ids = [i for i in bm25_ids if i not in semantic_ids]
      bm25_ids = bm25_ids[:n_bm25]

      hybrid_ids = semantic_ids + bm25_ids
      return hybrid_ids

    def error_handler(self, faq_ids: list[int], reranked_ids: list[int]) -> list[int]:
        """
        Adjusts a reranked list of IDs to match the FAQ IDs list in length and content.

        Args:
            faq_ids (List[int]): The original list of FAQ IDs.
            reranked_ids (List[int]): The reranked list of IDs.

        Returns:
            List[int]: A list with the correct number of unique IDs matching FAQ IDs.

        Raises:
            Exception: If reranked_ids is not a list.
        """

        # Validate input type
        if not isinstance(reranked_ids, list):
            raise Exception("The reranked_ids variable is not a list")

        # Ensure reranked_ids contains only unique integers
        reranked_ids = [rr_id for rr_id in reranked_ids if isinstance(rr_id, int)]
        reranked_ids = list(dict.fromkeys(reranked_ids))  # Remove duplicates

        # Case 1: If sizes match, return as-is
        if len(reranked_ids) == len(faq_ids):
            return reranked_ids

        # Case 2: If reranked_ids is smaller, fill missing IDs from faq_ids
        if len(reranked_ids) < len(faq_ids):
            missing_ids = [faq_id for faq_id in faq_ids if faq_id not in reranked_ids]
            return reranked_ids + missing_ids[:len(faq_ids) - len(reranked_ids)]

        # Case 3: If reranked_ids is larger, filter out extras and recurse
        reranked_ids = [rr_id for rr_id in reranked_ids if rr_id in faq_ids]
        return self.error_handler(faq_ids, reranked_ids)


    def groq_LLM_reranking(self, query: str, faq_ids: list[int], model: str, answers:bool = True) -> list[str]:
        """
        Reranks a list of FAQs based on their relevance to the query using Groq API.

        Args:
            query (str): The question from the customer.
            faq_ids (List[int]): A list of the ids of the FAQs returned after a dense retrieval.
            model (str): The model to use for reranking.

        Returns:
            List[int]: The same FAQ ids reranked by relevance.
        """
        def load_faqs(self, faq_ids: list[int]) -> list[tuple]:
            """Loads FAQs matching the given IDs from the data file.
            Retuns:
                List[Tuple[str, str, int]]
            """
            with open(self.data_path, 'r') as file:
                data = json.load(file)
            return [(faq["question"], faq["answer"], faq["id"]) for faq in data if faq["id"] in faq_ids]
        def build_prompt(self, query: str, faqs_with_ids: list[tuple], answers:bool = False) -> str:
            """Constructs the prompt for the model."""
            with open(self.prompt_rerank_path, 'r') as file:
                prompt = file.read()
            if not answers:
              related_questions = "\n".join(f"{faq_id}: {content}" for content, answer, faq_id in faqs_with_ids)
              return f"{prompt}\nDúvida do cliente:\n{query}\nPerguntas relacionadas:\n{related_questions}\n\nSaída:"
            else:
              related_questions = "\n".join(f"{faq_id}: {content}\n{answer}" for content, answer, faq_id in faqs_with_ids)
              return f"{prompt}\nDúvida do cliente:\n{query}\nPerguntas relacionadas:\n{related_questions}\n\nSaída:"
        def parse_model_response(self, response: str) -> list[int]:
            """Parses the model's response into a list of IDs."""
            start = response.find("[")
            end = response.find("]") + 1
            return json.loads(response[start:end])
        def call_model_with_retries(self, prompt: str, max_retries: int = 3) -> list[int]:
            """Attempts to call the model with retries in case of failure."""
            for attempt in range(max_retries):
                try:
                    response = self.ask_model(prompt, model)
                    reranked_ids = parse_model_response(self, response)
                    reranked_ids = self.error_handler(faq_ids, reranked_ids)

                    if isinstance(reranked_ids, list) and len(reranked_ids) == len(faq_ids):
                        return reranked_ids
                    else:
                        raise ValueError("Invalid response format or length mismatch.")

                except Exception as e:
                    print(f"Attempt {attempt + 1} failed: {e}")

            # Fall back to the original FAQ order if all retries fail
            print("Model inference failed after all retries.")
            return faq_ids
        # Main process
        with open(self.prompt_rerank_path, 'r') as file:
            prompt = file.read()
        try:
            faqs_with_ids = load_faqs(self, faq_ids)
            prompt = build_prompt(self, query=query, faqs_with_ids=faqs_with_ids)
            return call_model_with_retries(self, prompt)
        except Exception as e:
            print("Unexpected error:", e)
            return faq_ids

    def query_model(self, question: str, FAQ_ids, n_ids):
        """
        Queries the LLM with the retrieved FAQS to generate the answer to the client.
        Args:
            question (str): The question from the customer.
            FAQ_ids (List[int]): A list of the ids of the FAQs returned after a hybrid retrieval.
            n_ids (int): The number of FAQs to be retrieved.

        Returns:
            str: The answer to the question.
        """
        # Use the n_ids chunks with the highest similarity
        FAQ_ids = FAQ_ids[:n_ids]
        # create the context
        with open (self.data_path, 'r') as file:
            data = json.load(file)
        chunks = []
        for id in FAQ_ids:
          for faq in data:
            if faq["id"] == id:
              chunks.append(faq["question"]+"\n"+faq["answer"])
              break

        context_text = "\n---\n".join([chunk for chunk in chunks])
        # Apply the prompt template to the query
        with open(self.prompt_template_path, 'r') as file:
            PROMPT_TEMPLATE = file.read()
        prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
        prompt = prompt_template.format(context=context_text, question=question)

        model_answer = self.ask_model(prompt, self.model_name)

        # Format the response
        sources = FAQ_ids
        formatted_response = f"Resposta: {model_answer}\nFontes (ids das FAQs): {sources}"
        return formatted_response

    # Main function to handle the entire RAG process
    def process_question(self, question: str, n_ids: int = 5)->str:
        """
        Retrieves FAQs from the database, reranks these FAQs
        and answers the question based on them.

        Args:
            question (str): The question to be answered.
            n_ids (int): The number of FAQs to be retrieved.

        Returns:
            str: The answer to the question.
        """
        # Step 1: Retrieve IDs from the database
        retrieved_ids = self.hybrid_search(question, self.n_retrieved_chunks, self.embedding_model)

        # Step 2: Re-rank the retrieved IDs
        reranked_ids = self.groq_LLM_reranking(
            question,
            retrieved_ids,
            self.model_name,
            answers=True
        )

        # Step 3: Query the model with the top `n_ids` chunks
        formatted_response = self.query_model(question, reranked_ids, n_ids)
        return formatted_response

In [13]:
module = Module2_RAG()
# module.create_database()

question = "Quais são os beneficios do seguro dental?"
print(module.process_question(question))

Resposta: Os benefícios do seguro dental incluem cobertura para procedimentos como:

- Extração
- Canal
- Cirurgias realizadas em consultório
- Radiologia
- Dentística (restaurações)
- Periodontia (tratamento de gengivas)
- Endodontia (canais)
- Odontopediatria (odontologia para crianças)
- Próteses

Além disso, o plano Bradesco Dental oferece acesso a uma ampla rede de mais de 27 mil dentistas credenciados em todo o país, garantindo tratamentos odontológicos básicos, como limpeza e tratamento de cáries. Existem também opções específicas para crianças, como os planos Dente de Leite (0 a 7 anos) e o plano Júnior (8 a 17 anos), e um plano sem coparticipação com cobertura de várias especialidades.
Fontes (ids das FAQs): [42, 37, 41, 38, 39]


## 2.3 Module 3: Contact Info

In [14]:
class Module3_ContactInfo(Module):
    def __init__(self, model_name=MODEL, contact_info_path:str=f"{STELLAR_path}/requirements/module_3/contact_info.txt",
                 prompt_path:str=f"{STELLAR_path}/requirements/module_3/initial_prompt.txt",
                 followup_prompt_path:str=f"{STELLAR_path}/requirements/module_3/followup_prompt.txt"):
        super().__init__(model_name)
        self.contact_info = self.load_txt_file(contact_info_path)
        self.prompt = self.load_txt_file(prompt_path)
        self.followup_prompt = self.load_txt_file(followup_prompt_path)
        self.chat_history = ""

    def ask_module(self, query:str):
        """
        Interacts with a user to answer questions based on a predefined context and a language model.

        This function prompts the user for a query, processes it using a predefined context template,
        and queries a language model for an answer. It handles follow-up questions if the model
        indicates ambiguity or the need for clarification.
        Args:
            query (str): The user's query.
        Returns:
            str: The final answer provided by the language model, or an appropriate message
                if the query cannot be answered.
            str: The updated chat history.
        """

        def process_follow_up(query, follow_up_question):
            """
            Handles the process of asking a follow-up question and querying the model again.
            The model is not informed that it can start the answer with "-1". Therefore,
            only 1 follow-up question is asked.

            Args:
                query (str): The original user query.
                follow_up_question (str): The follow-up question suggested by the model.

            Returns:
                str: The model's final response to the query.
                str: The updated chat history.
            """
            print(f"Additional question: {follow_up_question}")
            follow_up_answer = input("Por favor, forneça mais detalhes: ")

            self.chat_history += f"\n\033[32mUsuário\033[0m: {follow_up_answer}\n"

            full_query = query + follow_up_question + follow_up_answer
            follow_up_prompt = self.followup_prompt.format(context=self.contact_info, question=full_query)
            follow_up_response = self.ask_model(follow_up_prompt, MODEL)

            return handle_response(follow_up_response, full_query)

        def handle_response(response, query):
            """
            Processes the model's response to determine the next steps.

            Args:
                response (str): The model's response.
                query (str): The query associated with the response.

            Returns:
                str: The final answer or an appropriate error message.
                str: The updated chat history.
            """
            if response.startswith("-1"):
                follow_up_question = response[3:]
                self.chat_history += f"\n\033[34mModule 3\033[0m: {follow_up_question}\n"
                return process_follow_up(query, follow_up_question)

            if response.startswith("-2"):
                return "Infelizmente, não conseguimos achar a resposta para a sua pergunta em nossa base de contatos.", ""


            return response, self.chat_history

        # Main execution
        prompt = self.prompt.format(context=self.contact_info, question=query)
        model_answer = self.ask_model(prompt, MODEL)

        return handle_response(model_answer, query)

In [15]:
# query = "Qual o número de celular para renovar meu seguro?"
# query = "Quanto é o kg do arroz?"
query = "Onde posso ligar para fazer uma reclamação do meu seguro de carro?"

module = Module3_ContactInfo()
answer, chat_history = module.ask_module(query)
print(answer)

Para fazer uma reclamação do seu seguro de carro, você pode ligar para o SAC (Serviço de Atendimento ao Cliente) no telefone 0800 727 9966, que está disponível 24 horas, 7 dias por semana. Além disso, você também pode entrar em contato com a Central de Relacionamento Auto, cujos números de telefone são:
- Capitais e Regiões Metropolitanas: 4004 2757
- Demais Regiões: 0800 701 2757
Eles atendem de segunda a sexta, das 08h às 18h (horário de Brasília).


## 2.4 Module 4: Human Escalation

In [16]:
category_mapping = {
  "Gestão de Apólices": "Policy Management",
  "Sinistros":"Claims",
  "Pagamentos": "Payments",
  "Perguntas Gerais": "General Questions",
  "Problemas Técnicos": "Technical Problems",
  "Escalações para Suporte Humano": "Human Support Escalations",
  "Perguntas Regulatórias ou de Conformidade": "Regulatory or Compliance Questions"
}

class Module4_HumanEscalation(Module):
    def __init__(self, model_name=MODEL, insurance_weights_path:str=f"{STELLAR_path}/requirements/module_4/insurance_weights.json",
                 category_weights_path:str=f"{STELLAR_path}/requirements/module_4/category_weights.json",
                 name_and_ins_type_prompt_path:str=f"{STELLAR_path}/requirements/module_4/name_and_ins_type_prompt.txt",
                 sum_and_cat_prompt_path:str=f"{STELLAR_path}/requirements/module_4/sum_and_cat_prompt.txt",
                 recommended_message_prompt_path:str=f"{STELLAR_path}/requirements/module_4/recommended_message_prompt.txt",
                 waiting_list_path:str=f"{STELLAR_path}/outputs/module_4/waiting_list.json",
                 human_agents_path:str=f"{STELLAR_path}/human_agents/human_agents.json"):
        super().__init__(model_name)
        self.insurance_weights = self.load_json_file(insurance_weights_path)
        self.category_weights = self.load_json_file(category_weights_path)
        self.name_and_ins_type_prompt = self.load_txt_file(name_and_ins_type_prompt_path)
        self.sum_and_cat_prompt = self.load_txt_file(sum_and_cat_prompt_path)
        self.recommended_message_prompt = self.load_txt_file(recommended_message_prompt_path)
        self.waiting_list_path = waiting_list_path
        self.human_agents_path = human_agents_path

    def add_human_agent(self, human_agent):
        """
        Adds a new human agent to the list of human agents.
        Args:
            human_agent (Human_agent): A dictionary containing the details of the new human agent.
        """
        try:
            with open(self.human_agents_path, "r") as f:
                human_agents = json.load(f)
        except FileNotFoundError:
            human_agents = []

        new_agent = {
              "id": (len(human_agents) + 1),
              "status": human_agent.status,
              "human_attendant_name": human_agent.human_attendant_name,
              "insurance_type": human_agent.insurance_type,
              "query_category": human_agent.query_category
        }


        human_agents.append(new_agent)

        with open(self.human_agents_path, "w") as f:
            json.dump(human_agents, f, indent=2, ensure_ascii=False)

    def free_human_agent(self, id):
        """
        Frees an agent with the given id.
        Args:
            id (int): The id of the agent to be freed.
        """
        try:
            with open(self.human_agents_path, "r") as f:
                human_agents = json.load(f)
        except FileNotFoundError:
            return

        for agent in human_agents:
            if agent["id"] == id:
                agent["status"] = "Available"
                with open(self.human_agents_path, "w") as f:
                    json.dump(human_agents, f, indent=2, ensure_ascii=False)
                return

    def find_available_human_agent(self)->dict:
        """
        Finds a human agent for the customer.
        Returns:
            dict: A dictionary containing the details of the human agent in this format:
            {"id": <int>, "status": <str>, "human_attendant_name": <str>, "insurance_type": <int>, "query_category": <str>}
        """
        try:
            with open(self.human_agents_path, "r") as f:
                human_agents = json.load(f)
        except FileNotFoundError:
            return None

        # Search an agent with the same query category and insurance type
        for agent in human_agents:
            if agent["status"] == "Available" and agent["insurance_type"] == self.insurance_type and agent["query_category"] == self.query_category:
                agent["status"] = "Busy"
                with open(self.human_agents_path, "w") as f:
                    json.dump(human_agents, f, indent=2, ensure_ascii=False)
                return agent

        # Search for an agent with the same insurance type
        for agent in human_agents:
            if agent["status"] == "Available" and agent["insurance_type"] == self.insurance_type:
                agent["status"] = "Busy"
                with open(self.human_agents_path, "w") as f:
                    json.dump(human_agents, f, indent=2, ensure_ascii=False)
                return agent
        # Search for any available agents
        for agent in human_agents:
            if agent["status"] == "Available":
                agent["status"] = "Busy"
                with open(self.human_agents_path, "w") as f:
                    json.dump(human_agents, f, indent=2, ensure_ascii=False)
                return agent

        # if no human agent is available, return None
        return None



    def model_answer_name_and_ins_type(self, chat_history, model, max_retries=3):
        """
          Processes a chat_history through the model and returns the response as a dictionary.
          Retries once if the response is not valid JSON, and falls back to parsing_error_handler.
          Args:
              chat_history (str): The input chat_history.
              model (str): The model to query.

          Returns:
              dict: The response from the model as a dictionary in the format
              {"name": <str>, "insurance_type": <int>}.
        """
        attempt = 0
        while attempt < max_retries:
            try:
                prompt = self.name_and_ins_type_prompt + chat_history
                answer = self.ask_model(prompt, model)

                if not answer:
                    raise ValueError("Answer is None")

                return self.parse_str_to_json(answer, ["name", "insurance_type"])

            except Exception as e:
                print(f"Attempt {attempt + 1} failed in model_answer_name_and_ins_type: {e}")
                attempt += 1

        # if parsing failed, return the default name and ins_type
        output = {
            "name": "",
            "insurance_type": 0
        }
        return output

    def model_answer_summary_and_category(self, chat_history, model, max_retries=3):
        """
        Processes a chat_history through the model and returns the response as a dictionary.
        Retries once if the response is not valid JSON, and falls back to parsing_error_handler.
        Args:
            chat_history (str): The input chat_history.
            model (str): The model to query.

        Returns:
            dict: The response from the model as a dictionary in the format
            {"summary": <str>, "category": <str>, "subcategory": <str>}.
        """
        attempt = 0
        while attempt < max_retries:
            try:
                prompt = self.sum_and_cat_prompt + chat_history
                answer = self.ask_model(prompt, model)

                if not answer:
                    raise ValueError("Answer is None")

                return self.parse_str_to_json(answer, ["summary", "category", "subcategory"])

            except Exception as e:
                print(f"Attempt {attempt + 1} failed in model_answer_summary_and_category: {e}")
                attempt += 1

        # if parsing failed, return empty summary, and default category and subcategory
        output = {
            "summary": "",
            "category": "General Questions",
            "subcategory": "Other"
        }
        return output


    def calculate_sentiment_points(self, sentiment: dict[str, float]) -> int:
        """
        Calculates sentiment points based on the sentiment distribution.
        Formula:
            Sentiment Factor = (negative * 2.0) + (neutral * 1.0) + (positive * 0.5)
            Sentiment Points = min(50, max(0, Sentiment Factor * 25))
        Args:
            sentiment (Dict[str, float]): Dictionary with keys 'positive', 'neutral', 'negative' and their corresponding values.
        Returns:
            int: Sentiment points in the range [0, 50].
        """
        sentiment_factor = (sentiment["negative"] * 2.0) + (sentiment["neutral"] * 1.0) + (sentiment["positive"] * 0.5)
        sentiment_points = min(50, max(0, int(sentiment_factor * 25)))
        return sentiment_points

    def calculate_category_points(self, category: str, subcategory: str, weights: dict[str, dict[str, float]]) -> int:
        """
        Calculates category points based on predefined weights for category and subcategory.
        Formula:
            Category Points = Weight * 30
        Args:
            category (str): The category of the query.
            subcategory (str): The subcategory of the query.
            weights (Dict[str, Dict[str, float]]): Dictionary mapping categories and subcategories to their respective weights.
        Returns:
            int: Category points in the range [0, 30].
        """
        weight = weights.get(category, {}).get(subcategory, 0.0)
        category_points = min(30, max(0, int(weight * 30)))
        return category_points

    def calculate_insurance_points(self, insurance_type: int, weights: dict[int, float]) -> int:
        """
        Calculates insurance points based on predefined weights for insurance types.
        Formula:
            Insurance Points = Weight * 20
        Args:
            insurance_type (int): The insurance type ID.
            weights (Dict[int, float]): Dictionary mapping insurance types to their respective weights.
        Returns:
            int: Insurance points in the range [0, 20].
        """
        weight = weights.get(insurance_type, 0.0)
        insurance_points = min(20, max(0, int(weight * 20)))
        return insurance_points

    def calculate_urgency(self, sentiment: dict[str, float], category: str, subcategory: str, insurance_type: int) -> int:
        """
        Calculates the total urgency score based on sentiment, category/subcategory, and insurance type.
        Formula:
            Urgency Score = Sentiment Points + Category Points + Insurance Points
        Args:
            sentiment (Dict[str, float]): Sentiment analysis scores.
            category (str): Query category.
            subcategory (str): Query subcategory.
            insurance_type (int): Type of insurance.
        Returns:
            int: Total urgency score in the range [0, 100].
        """
        sentiment_points = self.calculate_sentiment_points(sentiment)
        category_points = self.calculate_category_points(category, subcategory, self.category_weights)
        insurance_points = self.calculate_insurance_points(insurance_type, self.insurance_weights)

        urgency_score = sentiment_points + category_points + insurance_points
        return urgency_score

    def recommended_message(self, chat_history: str, human_attendant_name: str) -> str:
        """
        Generates a recommended introductory message for the human attendant to send to the customer.

        Args:
            chat_history (str): The history of the conversation with the customer.
            human_attendant_name (str): The name of the human attendant.

        Returns:
            str: The recommended message for the human attendant to send to the customer.
        """
        # Format the prompt with the input data
        question = self.recommended_message_prompt.format(
            human_attendant_name=human_attendant_name,
            chat_history=chat_history,
        )

        # Ask the model to generate the response
        response = self.ask_model(question, self.model_name)

        return response

    def add_to_waiting_list(self, issue_data:dict):
        """
        Adds the customer to the waiting list.
        Args:
            issue_data (dict): The data of the customer in the following format:
            {"sentiment": <dict>, "chat_history": <str>, "human_attendant_name": <str>, "model": <str>,
            "customer_name": <str>, "insurance_type": <int>, "issue_summary": <str>, "query_category": <str>,
            "query_subcategory": <str>, "urgency_score": <int>, "recommended_message": <str>}
        """
        # Extract data from the waiting list
        try:
            with open(self.waiting_list_path, "r") as f:
                waiting_list = json.load(f)
        except FileNotFoundError:
            waiting_list = []

        # Assume the list is ordered by urgency (first element is the most urgent one)
        index = 0
        for i in range(len(waiting_list)):
            # As a tiebreaker, customers who have been waiting the longest have priority.
            if issue_data["urgency_score"] <= waiting_list[i]["urgency_score"]:
                index += 1
            else:
                break

        waiting_list.insert(index, issue_data)
        with open(self.waiting_list_path, "w") as f:
            json.dump(waiting_list, f, indent=2, ensure_ascii=False)


    def run(self, chat_history:str, sentiment:dict, model="llama-3.3-70b-versatile"):
        """
        Extracts information from the chat history (customer name, insurance_type,
        issue summary, issue category and subcategory). After that, generates a
        recommended message for the human attendant to send to the customer.

        Args:
            chat_history (str): The history of the conversation with the customer.
            sentiment (dict): Sentiment analysis scores.

        Returns:
            dict: A dictionary containing the extracted information and the recommended message
            in the following format:
              {"sentiment": <dict>, "chat_history": <str>, "human_attendant_name": <str>,
              "human_attendant_id": <int>, "model": <str>, "customer_name": <str>, "insurance_type": <int>,
              "issue_summary": <str>, "query_category": <str>, "query_subcategory": <str>,
              "urgency_score": <int>, "recommended_message": <str>}
        """
        output = {}
        output["sentiment"] = sentiment
        output["chat_history"] = chat_history
        output["model"] = model

        # Get Customer name and insurance type
        customer_name_and_ins_type = self.model_answer_name_and_ins_type(chat_history, MODEL)
        output["customer_name"] = customer_name_and_ins_type["name"]
        output["insurance_type"] = customer_name_and_ins_type["insurance_type"]
        self.insurance_type = customer_name_and_ins_type["insurance_type"]

        # Get issue summary and query category and query subcategory
        sum_cat_and_subcat = self.model_answer_summary_and_category(chat_history, MODEL)
        output["issue_summary"] = sum_cat_and_subcat["summary"]
        output["query_category"] = sum_cat_and_subcat["category"]
        output["query_subcategory"] = sum_cat_and_subcat["subcategory"]
        self.query_category = sum_cat_and_subcat["category"]

        # Try to find an available agent
        available_agent = self.find_available_human_agent()
        if available_agent is not None:
            output["human_attendant_name"] = available_agent["human_attendant_name"]
            output["human_attendant_id"] = available_agent["id"]
        else:
            output["human_attendant_name"] = ""
            output["human_attendant_id"] = ""


        # Calculate urgency score
        output["urgency_score"] = self.calculate_urgency(sentiment, sum_cat_and_subcat["category"], sum_cat_and_subcat["subcategory"], customer_name_and_ins_type["insurance_type"])

        # Get the recommended message
        output["recommended_message"] = self.recommended_message(chat_history, output["human_attendant_name"])

        # If there is no available human agent, add customer to the waiting list based on urgency
        if output["human_attendant_name"] == "":
            self.add_to_waiting_list(output)

        return output

In [17]:
module = Module4_HumanEscalation()

# Input:
sentiment = {
    "positive": 0.1,
    "neutral": 0.8,
    "negative": 0.1
}
chat_history = "Eu sou o Carlos, e possuo seguro de carro. Preciso falar com um atendente imediatamente."

# Output:
output = module.run(chat_history, sentiment)
print(json.dumps(output, indent=2, ensure_ascii=False))
module.free_human_agent(output["human_attendant_id"])

# Save to the waiting list (just to demonstrate)
module.add_to_waiting_list(output)

{
  "sentiment": {
    "positive": 0.1,
    "neutral": 0.8,
    "negative": 0.1
  },
  "chat_history": "Eu sou o Carlos, e possuo seguro de carro. Preciso falar com um atendente imediatamente.",
  "model": "llama-3.3-70b-versatile",
  "customer_name": "Carlos",
  "insurance_type": 1,
  "issue_summary": "Carlos é um cliente que precisa falar com um atendente humano imediatamente sobre seu seguro de carro.",
  "query_category": "Escalações para Suporte Humano",
  "query_subcategory": "Assistência Urgente",
  "human_attendant_name": "Maria",
  "human_attendant_id": 2,
  "urgency_score": 56,
  "recommended_message": "Olá, Carlos! Aqui é a Maria, da área de seguros do Bradesco Seguros. \nVi que você precisa falar conosco com urgência sobre o seu seguro de carro e quero garantir que estamos priorizando sua solicitudade.\nEstou aqui para ajudar e resolver qualquer problema que você esteja enfrentando o mais rápido possível.\nVamos entender melhor o que está acontecendo com o seu seguro de car

## 2.5 Module 5: Sentiment Analysis

In [18]:
class Module5_SentimentAnalysis(Module):
    def __init__(self, model_name:str=MODEL,
                 sentiment_model="cardiffnlp/twitter-roberta-base-sentiment-latest"):
          super().__init__(model_name)
          self.sentiment_model = sentiment_model

    def translate(self, text: str) -> str:
        """
        Translates a given text from Portuguese to English using an LLM.
        Args:
            text (str): Input text in Portuguese.
        Returns:
            str: Translated text in English.
        """
        prompt = (
            "Translate the following Portuguese sentence to English. "
            "Respond only with the translation in a JSON format like this: {\"translation\": \"<your translation>\"}\n\n"
            f"Sentence: \"{text}\""
        )
        response = self.ask_model(prompt)
        try:
            response = response[response.index("{") : response.index("}") + 1]
            return json.loads(response)["translation"]
        except:
            print("Invalid LLM response:", response)
            return ""


    def sentiment_analysis(self, text: str) -> dict[str, float]:
        """
        Analyzes the sentiment of a given text and returns sentiment scores.
        Args:
            text (str): Input text.
        Returns:
            dict[str, float]: Sentiment scores for positive, neutral, and negative classes.
        Example: {"positive": 0.03, "neutral": 0.92, "negative": 0.05}
        """
        # Translate text to English
        translated_text = self.translate(text)

        # Define tokenizer, config and model
        tokenizer = AutoTokenizer.from_pretrained(self.sentiment_model)
        config = AutoConfig.from_pretrained(self.sentiment_model)
        model = AutoModelForSequenceClassification.from_pretrained(self.sentiment_model)

        # Tokenize input text
        encoded_input = tokenizer(text, return_tensors='pt')

        # Perform sentiment analysis
        output = model(**encoded_input)
        scores = softmax(output[0][0].detach().numpy())

        # Map scores to sentiment labels
        return {"positive": float(scores[2]), "neutral": float(scores[1]), "negative": float(scores[0])}

In [20]:
module = Module5_SentimentAnalysis()
text = "Como posso cancelar meu seguro?"
print(module.sentiment_analysis(text))

{'positive': 0.025908462703227997, 'neutral': 0.8828645348548889, 'negative': 0.09122706204652786}


## 2.6 Module 6: Feedback Collector

In [21]:
class Module6_FeedbackCollector(Module):
    def __init__(self, chat_history:str, human_attendant_name:str, sentiment_analysis:dict,
                 insurance_type:int, issue_summary:str, query_category:str, query_subcategory:str,
                 model_name:str=MODEL, requirements_path:str=f"{STELLAR_path}/requirements/module_6",
                 comments_path:str=f"{STELLAR_path}/outputs/module_6/comments"):
        super().__init__(model_name)
        self.chat_history = chat_history
        self.human_attendant_name = human_attendant_name
        self.sentiment_analysis = sentiment_analysis
        self.insurance_type = insurance_type
        self.issue_summary = issue_summary
        self.query_category = query_category
        self.query_subcategory = query_subcategory
        self.requirements_path = requirements_path
        self.comments_path = comments_path

        # Load info from the requirements path
        self.category_to_team = self.load_json_file(f"{requirements_path}/category_to_team.json")
        self.categorization_prompt = self.load_txt_file(f"{requirements_path}/categorization_prompt.txt")
        self.feedback_questions = self.load_json_file(f"{requirements_path}/feedback_questions.json")
        self.keys = self.load_json_file(f"{requirements_path}/keys.json")
        self.feedback_categories = self.load_json_file(f"{requirements_path}/feedback_categories.json")


    def categorize_comment(self, customer_comment: str) -> list[str]:
        """
        Categorizes a customer comment into feedback categories.

        Args:
            customer_comment (str): The customer comment to categorize.

        Returns:
            List[str]: A list of feedback categories.
        """
        prompt = f"{self.categorization_prompt}\nInput: {customer_comment}\nOutput: "
        response = self.ask_model(prompt, self.model_name)
        categories = []
        for category in self.feedback_categories:
            if category in response:
                categories.append(category)

        if len(categories) > 0:
            return categories
        return ["Other"]


    def comment_routing_and_saving(self, feedback: dict):
        """
        Routes and saves customer feedback based on their categories.

        Args:
            feedback (dict): A dictionary of customer feedback.
        """
        # Get the time
        time = self.get_current_date_time()

        # Write the team-specific feedback
        for key, value in feedback.items():
            if "categories" in value and value["categories"]:
                for category in value["categories"]:
                    team = self.category_to_team.get(category, "Customer Support Team")
                    filename = f"{self.comments_path}/{team.replace(' ', '_').lower()}_feedback.json"
                    entry = {"feedback": value,"feedback_type":key, "time": time}

                    try:
                        with open(filename, "r", encoding="utf-8") as f:
                            data = json.load(f)
                    except FileNotFoundError:
                        data = []

                    # Add the new feedback in te beggining of the list, so that newer feedback appears first
                    data.insert(0, entry)

                    with open(filename, "w", encoding="utf-8") as f:
                        json.dump(data, f, ensure_ascii=False, indent=2)

        # Write the overall feedback: chat_history, human_attendant_name, sentiment_analysis, customer_name, insurance_type, issue_summary, query_category, query_subcategory
        path = f"{self.comments_path}/overall_feedback.json"
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            data = []

        entry = {"feedback": feedback, "time": time}
        entry["chat_history"] = self.chat_history
        if self.human_attendant_name != "":
          entry["human_attendant_name"] = self.human_attendant_name
        if self.sentiment_analysis != {}:
          entry["sentiment_analysis"] = self.sentiment_analysis
        entry["insurance_type"] = self.insurance_type
        if self.issue_summary != "":
          entry["issue_summary"] = self.issue_summary
        if self.query_category != "":
          entry["query_category"] = self.query_category
        if self.query_subcategory != "":
          entry["query_subcategory"] = self.query_subcategory

        data.insert(0, entry)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        return None


    def get_customer_feedback(self) -> dict:
        """
        Collects customer feedback by asking multiple-choice questions (1-5 scale) and optional follow-up questions.

        Returns:
            dict: Feedback containing ratings, optional follow-up responses for each question in this format:
            {
                key: {
                    "rating": <int>,
                    "follow_up_response": <str>,
                    "categories": [<str>]
                },
        """
        feedback = {}

        for key, question in zip(self.keys, self.feedback_questions):
            while True:
                try:
                    rating = int(input(f"{question} (1-5): "))
                    if rating < 1 or rating > 5:
                        print("Por favor, digite um número entre 1 e 5.")
                        continue
                    break
                except ValueError:
                    print("Entrada Inválida. Por favor, digite um número entre 1 e 5.")

            follow_up_response = None
            if rating <= 3:
                follow_up_response = input(
                    "Sentimos muito por isso. Você poderia nos contar o que poderia ter sido melhor? ")

            feedback[key] = {
                "rating": rating,
                "follow_up_response": follow_up_response
            }

        # the general rating is the average of the other ratings
        general_rating = round(sum([feedback[key]["rating"] for key in self.keys]) / len(self.keys))
        # Ask for optional general comment
        comment = input("Você gostaria de fazer algum comentário adicional sobre o sua experiência? ")
        feedback["general"] = {"rating": general_rating, "follow_up_response": comment}

        # Categorize each written comment
        for key, value in feedback.items():
          if value["follow_up_response"] is not None and len(value["follow_up_response"]) > 5:
            feedback[key]["categories"] = self.categorize_comment(value["follow_up_response"])
          else:
            feedback[key]["categories"] = []

        # route comments
        self.comment_routing_and_saving(feedback)

        return feedback

In [22]:
chat_history = "Olá, sou o Matheus. Estou muito frustrado com a demora para acionamento do meu seguro de saúde!"
human_attendant_name = "Gabriel"
sentiment = {
    "positive": 0.022862698882818222,
    "neutral": 0.031627094745636,
    "negative": 0.945510176569223404
}
insurance_type = 2
issue_summary = "Matheus precisa de ajuda urgente com o acionamento do seguro de saúde."
query_category = "Sinistros"
query_subcategory = "Abertura de Sinistro"

module = Module6_FeedbackCollector(chat_history, human_attendant_name, sentiment, insurance_type, issue_summary, query_category, query_subcategory)
#feedback = module.get_customer_feedback()

## 2.7 Module 7: Knowledge Base Builder

In [23]:
class Module7_KnowledgeBaseBuilder(Module):
    def __init__(self, chat_history:str, insurance_type:int, model_name:str=MODEL, prompt_path:str=f"{STELLAR_path}/requirements/module_7/prompt.txt",
                 review_query_path:str=f"{STELLAR_path}/outputs/module_7/review_query.json",
                 int_to_ins_type_path:str=f"{STELLAR_path}/requirements/module_7/int_to_ins_type.json"):
        super().__init__(model_name)
        self.chat_history = chat_history
        self.insurance_type = insurance_type
        self.model_name = model_name
        self.review_query_path = review_query_path

        self.prompt = self.load_txt_file(prompt_path)
        self.int_to_ins_type = self.load_json_file(int_to_ins_type_path)


    def add_to_review_queue(self, draft_faq_entry: dict) -> None:
        """
        Adds generated draft FAQ entries to a queue for human review.

        Args:
          draft_faq_entry (list): A list of dictionaries representing draft FAQ entries.
        """
        try:
          with open(self.review_query_path, "r") as f:
              review_query = json.load(f)
        except:
          print("Error loading existing review query. Creating new one.")
          review_query = []

        time = self.get_current_date_time()

        new_faq_entry = {
            "draft_faq": draft_faq_entry,
            "chat_history": self.chat_history,
            "time": time,
            "status": "pending"
        }
        review_query.append(new_faq_entry)
        try:
          with open(self.review_query_path, "w") as f:
              json.dump(review_query, f, indent=4, ensure_ascii=False)
        except:
            print("Error writing to review query file. Changes not saved.")


    def parse_response(self, response:str)->dict:
        """
        Parses the response from the model and extracts a dict of fields "question"
        and "draft_answer". If the answer is not in json format or the fields are
        not present, the function raises an exception.

        Args:
            response (str): The response from the model as a string.

        Returns:
            dict: A dictionary containing the parsed fields.

        Raises:
            Exception: If the response is not in JSON format or the fields are missing.
        """
        try:
            json_data = json.loads(response)
            if "question" not in json_data or "draft_answer" not in json_data:
                raise Exception("Missing 'question' or 'draft_answer' field in LLM response.")
            if not isinstance(json_data["question"], str) or not isinstance(json_data["draft_answer"], str):
                raise Exception("Invalid data type for 'question' or 'draft_answer' field.")
            if not json_data["question"].strip() or not json_data["draft_answer"].strip():
                raise Exception("Empty 'question' or 'draft_answer' field.")
            if len(json_data) != 2:
                raise Exception("LLM response has more than 2 fields.")
            return json_data
        except json.JSONDecodeError:
            # if the response has more than 100 characters, truncate it
            if len(response) > 100:
                response = response[:100] + "..."
            print("Response: " + response)
            raise Exception("LLM response is not in JSON format.")


    def generate_draft_faq(self)->dict:
        """
        Creates a new draft FAQ entry in the database. Allow a maximum of 2 retries.

        Returns:
            dict: A dictionary containing the fields "question" and "draft_answer".
            If the draft FAQ cannot be generated after 3 attempts, returns None.
        """
        retries = 0
        while retries < 3:
            try:
                prompt = self.prompt + f"\n{self.chat_history}"
                response = self.ask_model(prompt, self.model_name)
                draft_faq = self.parse_response(response)
                draft_faq["category"] = self.int_to_ins_type.get(str(self.insurance_type), "default")

                self.add_to_review_queue(draft_faq)
                return draft_faq
            except Exception as e:
                retries += 1
                if retries == 3:
                    raise Exception(f"Failed to generate draft FAQ after {retries} attempts: {e}")
                print(f"Error generating draft FAQ: {e}. Retrying...")

        return None


    def review_pending_faqs(self) -> None:
        """
        Reviews pending FAQ entries and allows the human agent to approve, disapprove, rewrite, or leave them as pending.
        Approved or disapproved FAQs are saved in the respective files and removed from the review queue.
        """
        try:
            with open(self.review_query_path, "r") as f:
                review_query = json.load(f)
        except FileNotFoundError:
            print("No pending FAQs found.")
            return

        updated_review_query = []
        for entry in review_query:
            if entry["status"] != "pending":
                updated_review_query.append(entry)
                continue

            print("FAQ review:")
            print(f"Question: {entry['draft_faq']['question']}")
            print(f"Draft Answer: {entry['draft_faq']['draft_answer']}")
            print(f"Category: {entry['draft_faq']['category']}")
            print("\nOptions")
            print("a) Rewrite the answer.")
            print("b) Approve FAQ without changes.")
            print("c) Disapprove the FAQ.")
            print("d) Skip to the next FAQ (and leave it pending).")
            decision = input("Enter your choice (a/b/c/d): ").strip().lower()

            if decision == "a":
                new_answer = input("Enter your answer to the question: ").strip()
                entry["draft_faq"]["draft_answer"] = new_answer
                entry["status"] = "approved"
                approved_path = self.review_query_path.replace("review_query", "approved_faqs")
                self.save_faq_and_upload_chroma(entry, approved_path)


            elif decision == "b":
                entry["status"] = "approved"
                approved_path = self.review_query_path.replace("review_query", "approved_faqs")
                self.save_faq_and_upload_chroma(entry, approved_path)

            elif decision == "c":
                entry["status"] = "rejected"
                rejected_path = self.review_query_path.replace("review_query", "rejected_faqs")
                self.save_faq_and_upload_chroma(entry, rejected_path)

            elif decision == "d":
                print("Keeping the status as pending.")
                updated_review_query.append(entry)

            else:
                print("Invalid input. Keeping the status as pending.")
                updated_review_query.append(entry)

        with open(self.review_query_path, "w") as f:
            json.dump(updated_review_query, f, indent=2, ensure_ascii=False)

    def save_faq_and_upload_chroma(self, entry: dict, path: str) -> None:
        """
        Saves the approved or disapproved FAQ to the respective JSON file.
        If the FAQ was approved, adds it to the FAQ vector database.

        Args:
            entry (dict): The FAQ entry to save in this format:
                {"draft_faq": {"question": <str>, "draft_answer": <str>, "category": <str>}}
            path (str): The path to the JSON file where the FAQ should be saved.
        """
        # Save to the Chroma vector database
        if entry["status"] == "approved":
            new_faq = entry["draft_faq"]

            formated_faq = {
                "category": new_faq["category"],
                "question": new_faq["question"],
                "answer": new_faq["draft_answer"]
            }
            self.update_faq_vector_database(formated_faq)

        # Save to the json file
        try:
            with open(path, "r") as f:
                faqs = json.load(f)
        except FileNotFoundError:
            faqs = []

        faqs.append(entry)
        with open(path, "w") as f:
            json.dump(faqs, f, indent=2, ensure_ascii=False)


    def update_faq_vector_database(self, approved_faq: dict) -> None:
        """
        Updates the FAQ vector database with a single new approved FAQ by calling Module2's add_new_faq_to_chroma function.

        Args:
            approved_faq (dict): The approved FAQ entry to add to the vector database in this format:
                {"category": <str>, "question": <str>, "draft_answer": <str> }.
        """
        print("Updating FAQ vector database...")
        module_2 = Module2_RAG()
        module_2.add_new_faq_to_chroma(approved_faq)

In [24]:
chat_history = "Preciso acionar meu seguro dental, mas não sei como faço para encontrar dentistas credenciados perto de mim."
ins_type = 4

module = Module7_KnowledgeBaseBuilder(chat_history, ins_type)
draft_faq = module.generate_draft_faq()
print(json.dumps(draft_faq, indent=4, ensure_ascii=False))

{
    "question": "Como encontrar dentistas credenciados perto de mim para acionar meu seguro dental?",
    "draft_answer": "Você pode encontrar dentistas credenciados em nosso site ou através da nossa Central de Relacionamento, que podem fornecer informações sobre a rede de profissionais credenciados da Bradesco Seguros.",
    "category": "dental_insurance"
}


In [25]:
module_2 = Module2_RAG()
new_draft_faq = {
    "question": "Como acionar meu seguor viagem em caso de bagagem desaparecida?",
    "answer": "Você pode acessar nosso app, ir em 'meus seguros'. 'seguro viagem' e 'perdi minha bagagem'.",
    "category": "travel_insurance"
}
#module_2.add_new_faq_to_chroma(new_draft_faq)

In [26]:
module_7 = Module7_KnowledgeBaseBuilder(chat_history ="", insurance_type=None)
#module_7.review_pending_faqs()

## 2.8. Module 8: Resolution Verifier

In [27]:
class Module8_ResolutionVerifier(Module):
    def __init__(self, chat_history:str, model_name:str=MODEL, prompt_path:str=f"{STELLAR_path}/requirements/module_8/prompt_resolution_verifier_question.txt"):
        super().__init__(model_name)
        self.chat_history = chat_history
        self.prompt_path = prompt_path
        self.prompt = self.load_txt_file(self.prompt_path)

    def generate_verification_question(self) -> str:
        """
        Generates a tailored verification question for the customer based on the parsed chat history.

        Returns:
            str: A personalized question to verify if the issue was resolved.
        """
        prompt = self.prompt.format(chat_history=self.chat_history)
        return self.ask_model(prompt, self.model_name)

    def verify_issue_resolution(self, verification_question:str) -> tuple[bool, str]:
        """
        Verifies with the customer if the issue was resolved based on the provided verification question.

        Args:
            verification_question (str): The question to verify if the issue was resolved.

        Returns:
            bool: True if the issue was resolved, False otherwise.
            str: The chat history from the module with the verification question and answer.
        """
        chat_history = "\n\033[34mModule 8\033[0m: " + verification_question
        answer = ""
        while answer != "S" and answer != "N":
            answer = input(verification_question + " (S/N): \n")
            if answer != "":
                answer = answer.upper().strip()[0]
            if answer != "S" and answer != "N":
                print("Resposta Inválida. Favor responder com 'S' ou 'N' somente.")

        chat_history += "\n\033[32mUser\033[0m: " + answer
        return answer == "S", chat_history



In [28]:
chat_history = """"Olá, eu sou o José. Estou com uma dúvida relacionada ao acionamento do meu seguro de Carro. Onde encontro a opção de acionamento?
Oi, José! Eu sou o assistente virtual do Bradesco Seguros. Com base em uma busca à nossa base de dados, no seu caso, Você deve entrar no App Bradesco Seguros e entrar na página "meus seguros", onde encontrará as informações sopbre o acionamento do seu seguro de carro."""
module = Module8_ResolutionVerifier(chat_history=chat_history)
question = (module.generate_verification_question())
print(question)

José, você conseguiu encontrar a opção de acionamento do seu seguro de carro no App Bradesco Seguros?


## 2.9 Module 9: Compliance Verifier

In [29]:
class Module9_ComplianceVerifier(Module):
    def __init__(self, user_question:str, llm_response:str, model_name:str="gemma2-9b-it",
                 prompt_path:str=f"{STELLAR_path}/requirements/module_9/prompt_compliance_verifier_1.txt",
                 log_path:str=f"{STELLAR_path}/outputs/module_9/logs/violations.json"):
        super().__init__(model_name)
        self.user_question = user_question
        self.llm_response = llm_response
        self.prompt_path = prompt_path
        self.log_path = log_path
        self.prompt = self.load_prompt()

    def load_prompt(self):
        with open(self.prompt_path, "r") as f:
            prompt = f.read()
        return prompt

    def format_input(self) -> str:
        """
        Formats the input for the LLM by combining the chat history, LLM response, and the loaded prompt.
        Returns the formatted input string ready to be sent to the model.
        """
        dict_input = {"query":self.user_question, "response":self.llm_response}
        prompt = self.prompt + json.dumps(dict_input, indent=2, ensure_ascii=False)
        return prompt

    def parse_response(self, llm_output: str) -> dict:
        """
        Parses the output from the model and extracts a dict of fields "compliance"
        and "violation". If the answer is not in json format or the fields are
        not present, the function raises an exception.

        Args:
            llm_output (str): The output from the model as a string.

        Returns:
            dict: A dictionary containing the parsed fields.

        Raises:
            Exception: If the response is not in JSON format or the fields are missing.
        """
        try:
            if llm_output is None:
                raise Exception("LLM output is None.")
            dict_response = llm_output[llm_output.index("{"):llm_output.index("}")+1]
            json_data = json.loads(dict_response)
            if "compliance" not in json_data or "violation" not in json_data:
                raise Exception("Missing 'compliance' or 'violation' field in LLM response.")
            if not isinstance(json_data["compliance"], bool) or not isinstance(json_data["violation"], str):
                raise Exception("Invalid data type for 'compliance' or 'violation' field.")
            if not json_data["violation"].strip():
                raise Exception("Empty 'violation' field.")
            if len(json_data) != 2:
                raise Exception("LLM response has more than 2 fields.")
            return json_data
        except json.JSONDecodeError:
            print("Response: " + llm_output)
            raise Exception("LLM response is not in JSON format.")


    def run_verification(self) -> dict:
        """
        Executes the compliance verification process.
        Combines input formatting, sending the request to the model, and analyzing the response.
        Returns:
            dict: The final compliance analysis, including compliance status and violations in this format:
                {"compliance": <bool>, "violation": <str>}

        """
        input = self.format_input()
        llm_output = self.ask_model(input, self.model_name)
        for i in range(3):
            if llm_output is None:
                print(f"LLM output is None, retrying... ({i+1}/3)")
                continue
            try:
                results = self.parse_response(llm_output)
                self.log_violations(results)
                return results
            except Exception as e:
                print(f"Error parsing response: {e}")

        return {"compliance": False, "violation": f"ERROR: unable to parse the LLM response: {llm_output}"}

    def log_violations(self, results: dict) -> None:
        """
        Logs any detected violations to a specified log file.
        Args:
            results (dict): The compliance verification results, including any violations.
        """
        # If there is not a violation, return
        if results["compliance"]:
            return

        # If there is a violation, log it
        # Set the time
        time = self.get_current_date_time()

        # Open the log file
        try:
          with open(self.log_path, "r") as f:
              log_data = json.load(f)
        except FileNotFoundError:
            log_data = []

        log = {
            "id": len(log_data) + 1,
            "time": time,
            "query": self.user_question,
            "response": self.llm_response,
            "violation": results["violation"]
        }

        log_data.append(log)

        # Save the log data
        with open(self.log_path, "w") as f:
            json.dump(log_data, f, indent=2, ensure_ascii=False)


In [30]:
module = Module9_ComplianceVerifier(user_question="Como consigo fazer o cancelamento do meu seguro dental?", llm_response="Infelizmente não tenho a resposta para sua pergunta.")
print(module.run_verification())

module = Module9_ComplianceVerifier(user_question="Como consigo fazer a renovação do meu seguro de carro?", llm_response="Basta enviar um email para renovacoes_auto@bradesco.com.")
print(module.run_verification())

module = Module9_ComplianceVerifier(user_question="Como consigo fazer a ativação do meu seguro de viagem?", llm_response="Se vira aí.")
print(module.run_verification())

{'compliance': False, 'violation': 'Resposta incompleta ou vaga'}
{'compliance': True, 'violation': 'Sem violação'}
{'compliance': False, 'violation': 'Tom inadequado'}


# 3. System

## 3.1 CUSTOMER (query workflow)

This is what the customer sees and how he interacts with the system

### 3.3.1 Workflow Implementation

In [31]:
def send_message_to_the_user(message):
    print(message)

def get_user_input(prompt):
    return input(prompt)

class WorkflowManager:
    def __init__(self):
        self.chat_history:str = ""
        self.logs_path = f"{STELLAR_path}/logs/workflow_logs.json"
        self.sequence_of_modules = []

    def save_logs(self, logs):
        try:
            with open(self.logs_path, "r") as f:
                logs_data = json.load(f)
        except FileNotFoundError:
            logs_data = []

        logs_data.append(logs)
        with open(self.logs_path, "w") as f:
            json.dump(logs_data, f, indent=2, ensure_ascii=False)


    def _log_execution(self, module_number, start_time, response):
        execution_time = time.time() - start_time
        log_entry = {
            "module": module_number,
            "execution_time": execution_time,
            "timestamp": datetime.now().isoformat(),
            "response": response
        }
        return log_entry

    def process_query(self, query):
        """
        Args:
            query (str): customer's initial query
        Returns:
            dict: a dict with the final state of the program and the execution logs in this format:
                {"chat_history": <str>,
                "sequence_of_modules": <list>,
                "final_state": <dict>,
                "execution_logs": <list>}
        """
        logs = []
        current_state = {}
        self.chat_history += f"\n\033[32mUser\033[0m: {query}"

        # Start with Module 1
        self.sequence_of_modules.append(1)
        module1 = Module1_InitialClassifier()
        start_time = time.time()
        initial_classification:int = module1.classify_question(query)['category']
        logs.append(self._log_execution(1, start_time, initial_classification))
        current_state['classification'] = initial_classification

        # Branch based on Module 1's classification
        if initial_classification == 0:  # FAQ path
            send_message_to_the_user("Obrigado pela pergunta! Estamos buscando uma resposta para ela no banco de perguntas frequentes.")
            self.chat_history += f"\n\033[36mSistema\033[0m: Estamos buscando uma resposta para ela no banco de perguntas frequentes."
            current_state = self._handle_faq_path(query, current_state, logs)
        elif initial_classification == 1:  # Contact info path
            send_message_to_the_user("Obrigado pela pergunta! Estamos buscando uma resposta para ela no banco de informações de contato.")
            self.chat_history += f"\n\033[36mSistema\033[0m: Obrigado pela pergunta! Estamos buscando uma resposta para ela no banco de informações de contato."
            current_state = self._handle_contact_path(query, current_state, logs)
        elif initial_classification == 2:  # Human escalation path
            send_message_to_the_user("Vamos te encaminhar a um atendente humano. Aguarde um instante.")
            self.chat_history += "\n\033[36mSistema\033[0m: Vamos te encaminhar a um atendente humano. Aguarde um instante."
            current_state = self._handle_human_escalation_path(query, current_state, logs)
        else: # Question was tagged as not relevant
            print("Pergunta Não pertinente.")
            return



        full_log = {
            "chat_history": self.chat_history,
            "sequence_of_modules": self.sequence_of_modules,
            "final_state": current_state,
            "execution_logs": logs
        }
        self.save_logs(full_log)
        return full_log

    def _handle_faq_path(self, query, state, logs):

        # Module 2 - RAG
        self.sequence_of_modules.append(2)
        module2 = Module2_RAG()
        start_time = time.time()
        rag_response:str = module2.process_question(query)
        logs.append(self._log_execution(2, start_time, rag_response))
        state['rag_response'] = rag_response


        # Module 9 - Compliance
        self.sequence_of_modules.append(9)
        module9 = Module9_ComplianceVerifier(query, rag_response)
        start_time = time.time()
        compliance_check:dict[bool,str] = module9.run_verification()
        logs.append(self._log_execution(9, start_time, compliance_check))

        if not compliance_check['compliance']:
            rag_response = self._handle_compliance_failure_faq_path(query, state, logs)
            if rag_response == "":
                send_message_to_the_user("Infelizmente, houve um erro na busca pela sua resposta no banco de perguntas frequentes. Vamos te encaminhar a um atendente humano.")
                self.chat_history += "\n\033[36mSistema\033[0m: Infelizmente, houve um erro na busca pela sua resposta. Vamos te encaminhar a um atendente humano."
                return self._handle_human_escalation_path(query, state, logs)

        send_message_to_the_user(rag_response)
        self.chat_history += f"\n\033[34mModulo 2\033[0m: {rag_response}"

        # Module 8 - Resolution
        self.sequence_of_modules.append(8)
        module8 = Module8_ResolutionVerifier(self.chat_history)
        start_time = time.time()
        verification_question = module8.generate_verification_question()
        logs.append(self._log_execution(8, start_time, verification_question))

        resolution_check, chat_history_module8 = module8.verify_issue_resolution(verification_question)
        self.chat_history += chat_history_module8
        state['resolution_check_faq_path'] = resolution_check


        if not resolution_check:
            send_message_to_the_user("É uma pena sua questão não ter sido resolvida. Vamos te encaminhar a um atendente humano.")
            self.chat_history += "\n\033[36mSistema\033[0m: É uma pena sua questão não ter sido resolvida. Vamos te encaminhar a um atendente humano."
            return self._handle_human_escalation_path(query, state, logs)

        # If the issue is resolved, collect feedback
        send_message_to_the_user("Que bom que sua questão tenha sido resolvida! Agora, poderia nos fornecer uma avaliação rápida para melhorar o atendimento?")
        self.chat_history += "\n\033[36mSistema\033[0m: Que bom que sua questão tenha sido resolvida! Agora, poderia nos fornecer uma avaliação rápida para melhorar o atendimento?"

        return self._collect_feedback(query, state, logs)

    def _handle_contact_path(self, query, state, logs):
        # Module 3 - Contact Info
        self.sequence_of_modules.append(3)
        module3 = Module3_ContactInfo()
        start_time = time.time()
        contact_response, chat_history_module3 = module3.ask_module(query)
        logs.append(self._log_execution(3, start_time, contact_response))
        state['contact_response'] = contact_response
        self.chat_history += chat_history_module3

        # Module 9 - Compliance
        self.sequence_of_modules.append(9)
        module9 = Module9_ComplianceVerifier(query, contact_response)
        start_time = time.time()
        compliance_check:dict[bool,str] = module9.run_verification()
        logs.append(self._log_execution(9, start_time, compliance_check))

        if not compliance_check['compliance']:
            contact_response = self._handle_compliance_failure_contact_info_path(query, state, logs)
            if contact_response == "":
                send_message_to_the_user("Infelizmente, houve um erro na busca pela sua resposta. Vamos tentar procurá-la no banco de perguntas frequentes.")
                self.chat_history += "\n\033[36mSistema\033[0m: Infelizmente, houve um erro na busca pela sua resposta. Vamos tentar procurá-la no banco de perguntas frequentes."
                return self._handle_faq_path(query, state, logs)

        send_message_to_the_user(contact_response)
        self.chat_history += f"\n\033[34mModulee 3\033[0m: {contact_response}"

        # Module 8 - Resolution
        self.sequence_of_modules.append(8)
        module8 = Module8_ResolutionVerifier(self.chat_history)
        start_time = time.time()
        verification_question = module8.generate_verification_question()
        logs.append(self._log_execution(8, start_time, verification_question))
        resolution_check, chat_history_module8 = module8.verify_issue_resolution(verification_question)
        self.chat_history += chat_history_module8
        state['resolution_check_contact_path'] = resolution_check


        if not resolution_check:
            # Try FAQ path as fallback
            send_message_to_the_user("É uma pena sua questão não ter sido resolvida. Vamos te encaminhar ao sistema de busca de perguntas frequentes.")
            self.chat_history += "\n\033[36mSistema\033[0m: É uma pena sua questão não ter sido resolvida. Vamos te encaminhar ao sistema de busca de perguntas frequentes."
            return self._handle_faq_path(query, state, logs)

        # If the issue is resolved, collect feedback
        send_message_to_the_user("Que bom que sua questão tenha sido resolvida! Agora, poderia nos fornecer uma avaliação rápida para melhorar o atendimento?")
        self.chat_history += "\n\033[36mSistema\033[0m: Que bom que sua questão tenha sido resolvida! Agora, poderia nos fornecer uma avaliação rápida para melhorar o atendimento?"

        return self._collect_feedback(query, state, logs)

    def _handle_human_escalation_path(self, query, state, logs):

        # Module 5 - Sentiment
        self.sequence_of_modules.append(5)
        module5 = Module5_SentimentAnalysis()
        start_time = time.time()
        sentiment = module5.sentiment_analysis(query)
        logs.append(self._log_execution(5, start_time, sentiment))
        state['sentiment'] = sentiment

        # Module 4 - Human Escalation
        self.sequence_of_modules.append(4)
        module4 = Module4_HumanEscalation()
        start_time = time.time()
        issue_data = module4.run(self.chat_history, sentiment)
        logs.append(self._log_execution(4, start_time, issue_data))

        # Module 7 - Knowledge Base
        self.sequence_of_modules.append(7)
        insurance_type = issue_data.get('insurance_type', 0)
        module7 = Module7_KnowledgeBaseBuilder(self.chat_history, insurance_type)
        start_time = time.time()
        kb_update = module7.generate_draft_faq()
        logs.append(self._log_execution(7, start_time, kb_update))

        # if issue_data["human_attendant_name"] = "", the customer was added to the waiting list
        if issue_data["human_attendant_name"] == "":
            send_message_to_the_user("Não há nenhum atendente disponível no momento. Você será adicionado à fila de espera.")
            self.chat_history += "\n\033[36mSistema\033[0m: Não há nenhum atendente disponível no momento. Você será adicionado à fila de espera."
            return state

        human_attendant_name = issue_data["human_attendant_name"]
        human_attendant_id = issue_data["human_attendant_id"]
        state['human_attendant_name'] = human_attendant_name
        state['human_attendant_id'] = human_attendant_id

        send_message_to_the_user(f"Você está sendo encaminhado para o(a) atendente {human_attendant_name}. Aguarde um instante.")

        send_message_to_the_user(issue_data["recommended_message"])
        self.chat_history += f"\n\033[34mModulee Humano ({human_attendant_name})\033[0m: {issue_data['recommended_message']}"

        ### Human Module interacts with the customer ###

        # After interaction is done, make the human Module available to help other customers
        module4.free_human_agent(human_attendant_id)

        sentiment_analysis = issue_data.get('sentiment_analysis', {})
        issue_summary = issue_data.get('issue_summary', "")
        query_category = issue_data.get('query_category', "")
        query_subcategory = issue_data.get('query_subcategory', "")

        return self._collect_feedback(query, state, logs, sentiment_analysis, insurance_type, issue_summary, query_category, query_subcategory, human_attendant_name)

    def _handle_compliance_failure_faq_path(self, query, state, logs):
        """
        Returns:
            rag_response:str: The response from the RAG module. Or an empty string
              if the RAG module fails to generate a compliant response.
        """
        # Try once again to generate the answer

        # Module 2 - RAG
        self.sequence_of_modules.append(2)
        module2 = Module2_RAG()
        start_time = time.time()
        rag_response:str = module2.process_question(query)
        logs.append(self._log_execution(2, start_time, rag_response))
        state['rag_response_after_compliance_retry'] = rag_response

        # Check if the new response is compliant

        # Module 9 - Compliance
        self.sequence_of_modules.append(9)
        module9 = Module9_ComplianceVerifier(query, rag_response)
        start_time = time.time()
        compliance_check:dict[bool,str] = module9.run_verification()
        logs.append(self._log_execution(9, start_time, compliance_check))

        if not compliance_check['compliance']:
            return ""

        return rag_response

    def _handle_compliance_failure_contact_info_path(self, query, state, logs):
        """
        Returns:
            contact_response:str: The response from the contact info module. Or an empty string
              if the contact info module fails to generate a compliant response.
        """
        # Try once again to generate the answer

        # Module 3 - Contact Info
        self.sequence_of_modules.append(3)
        module3 = Module3_ContactInfo()
        start_time = time.time()
        contact_response, chat_history_module3 = module3.ask_module(query)
        logs.append(self._log_execution(3, start_time, contact_response))
        state['contact_response_after_compliance_retry'] = contact_response

        # Check if the new response is compliant

        # Module 9 - Compliance
        self.sequence_of_modules.append(9)
        module9 = Module9_ComplianceVerifier(query, contact_response)
        start_time = time.time()
        compliance_check:dict[bool,str] = module9.run_verification()
        logs.append(self._log_execution(9, start_time, compliance_check))

        if not compliance_check['compliance']:
            return ""

        return contact_response


    def _collect_feedback(self, query, state, logs, sentiment_analysis:dict={}, insurance_type:int=0, issue_summary:str="", query_category:str="", query_subcategory:str="", human_attendant_name:str=""):
        # Module 6 - Feedback
        self.sequence_of_modules.append(6)
        module6 = Module6_FeedbackCollector(self.chat_history, human_attendant_name, sentiment_analysis, insurance_type, issue_summary, query_category, query_subcategory)
        start_time = time.time()
        feedback = module6.get_customer_feedback()
        logs.append(self._log_execution(6, start_time, feedback["general"]))
        state['general_feedback'] = feedback["general"]
        return state

### 3.3.2 Workflow Examples

In [32]:
query = "Eu sou o José e gostaria de saber como funciona o seguro dental Bradesco." # 0
# query = "Onde posso ligar para renovar meu seguro de Saúde?" # 1
# query = "Eu sou o Gabriel, do plano de seguro de vida. Preciso da ajuda de um atendente urgentemente." # 2
# query = "Qual é o maior animal do mundo?" # 3 - Pergunta não pertinente

workflow = WorkflowManager()
final_state_and_logs = workflow.process_query(query)

Obrigado pela pergunta! Estamos buscando uma resposta para ela no banco de perguntas frequentes.
Resposta: Olá, José! De acordo com as informações fornecidas, o plano Bradesco Dental é um convênio odontológico que garante acesso a tratamentos odontológicos básicos, como limpeza e tratamento de cáries, com uma ampla cobertura nacional de mais de 27 mil dentistas credenciados. Ele funciona de forma semelhante a um plano de saúde, oferecendo uma variedade de procedimentos, incluindo extração, canal e cirurgias realizadas em consultório. Você pode encontrar mais informações sobre a cobertura completa do plano no portal do Bradesco.
Fontes (ids das FAQs): [41, 36, 42, 38, 40]
Espero ter ajudado, José! Você conseguiu entender como funciona o seguro dental Bradesco? (S/N): 
S
Que bom que sua questão tenha sido resolvida! Agora, poderia nos fornecer uma avaliação rápida para melhorar o atendimento?
Quão satisfeito você está com a resolução do seu problema? (1-5): 5
Como você avaliaria o atendi

### 3.3.3 Log example (chat history, sequence of modules, latency of each module, etc.)

In [33]:
print(json.dumps(final_state_and_logs, indent=4, ensure_ascii=False))

{
    "chat_history": "\n\u001b[32mUser\u001b[0m: Eu sou o José e gostaria de saber como funciona o seguro dental Bradesco.\n\u001b[36mSistema\u001b[0m: Estamos buscando uma resposta para ela no banco de perguntas frequentes.\n\u001b[34mModulo 2\u001b[0m: Resposta: Olá, José! De acordo com as informações fornecidas, o plano Bradesco Dental é um convênio odontológico que garante acesso a tratamentos odontológicos básicos, como limpeza e tratamento de cáries, com uma ampla cobertura nacional de mais de 27 mil dentistas credenciados. Ele funciona de forma semelhante a um plano de saúde, oferecendo uma variedade de procedimentos, incluindo extração, canal e cirurgias realizadas em consultório. Você pode encontrar mais informações sobre a cobertura completa do plano no portal do Bradesco.\nFontes (ids das FAQs): [41, 36, 42, 38, 40]\n\u001b[34mModule 8\u001b[0m: Espero ter ajudado, José! Você conseguiu entender como funciona o seguro dental Bradesco?\n\u001b[32mUser\u001b[0m: S\n\u001b[36mS

## 3.2 HUMAN AGENT (Service list)

Choose the correct human agent based on the query category and insurance type

### 3.2.1 Human_agent class - for the human agent to interact with the customer

In [34]:
insurance_types = {
    0: "Standart",
    1: "Auto",
    2: "Health",
    3: "Life",
    4: "Dental",
    5: "Social Security",
    6: "Residential",
}
category_mapping = ["Policy Management", "Claims", "Payments", "General Questions", "Technical Problems", "Human Support Escalations", "Regulatory or Compliance Questions"]

class Human_agent:
    def __init__(self, human_attendant_name, insurance_type:int=0, query_category:str="General Questions",
                 status:str="Available",
                 waiting_list_path:str=f"{STELLAR_path}/outputs/module_4/waiting_list.json"):
        self.human_attendant_name = human_attendant_name
        self.insurance_type = insurance_type
        self.status = status
        self.query_category = query_category
        self.waiting_list_path = waiting_list_path

    def find_customer(self):
        """
        This is the function a human agent can use to find a customer in the waiting list.
        """
        try:
            with open(self.waiting_list_path, "r") as f:
                waiting_list = json.load(f)
        except FileNotFoundError:
            waiting_list = []

        if len(waiting_list) == 0:
            print("There are no appointments in the queue.")
            return None

        # if there is a customer that is a perfect match for this agent, serve this customer
        for i in range(len(waiting_list)):
            if waiting_list[i]["insurance_type"] == self.insurance_type and waiting_list[i]["query_category"] == self.query_category:
                customer = waiting_list.pop(i)
                with open(self.waiting_list_path, "w") as f:
                    json.dump(waiting_list, f, indent=2, ensure_ascii=False)
                self.customer_service(customer)
                return

        # Check for partial matches
        for i in range(len(waiting_list)):
          if waiting_list[i]["insurance_type"] == self.insurance_type:
              customer = waiting_list.pop(i)
              with open(self.waiting_list_path, "w") as f:
                  json.dump(waiting_list, f, indent=2, ensure_ascii=False)
              self.customer_service(customer)
              return

          if waiting_list[i]["query_category"] == self.query_category:
              customer = waiting_list.pop(i)
              with open(self.waiting_list_path, "w") as f:
                  json.dump(waiting_list, f, indent=2, ensure_ascii=False)
              self.customer_service(customer)
              return

        # if there is no partial match, serve the customer with the highest urgency in the waiting list
        waiting_list.sort(key=lambda x: x["urgency_score"], reverse=True)
        customer = waiting_list.pop(0)
        with open(self.waiting_list_path, "w") as f:
            json.dump(waiting_list, f, indent=2, ensure_ascii=False)
        self.customer_service(customer)
        return

    def customer_service(self, issue_data:dict):
        """
        This is how the human agent interacts with the customer. First, he receives the issue data,
        and then he can start the interaction with the customer.
        Args:
            issue_data (dict): a dict with the issue data in this format:
            {"sentiment": <dict>, "chat_history": <str>, "human_attendant_name": <str>, "model": <str>,
            "customer_name": <str>, "insurance_type": <int>, "issue_summary": <str>, "query_category": <str>,
            "query_subcategory": <str>, "urgency_score": <int>, "recommended_message": <str>}
        """
        print(f"\033[34mHuman Module ({issue_data['human_attendant_name']})\033[0m: {issue_data['recommended_message']}")

        # From now on, the human agent should take control of the interaction
        return

In [35]:
human_agent = Human_agent("Marcos", insurance_type=0, query_category="Perguntas Gerais")
human_agent.find_customer()

[34mHuman Module (Letícia)[0m: Hello Doug, this is Letícia, your attendant from the health insurance area of Bradesco Seguros. I'm here to listen and help you with any questions or concerns you may have regarding your health insurance. I'd love to understand what's on your mind and how I can assist you today. What would you like to talk about regarding your health insurance coverage?


### 3.2.2 Creating some examples of human agents

In [36]:
human_agents = []
human_agents.append(Human_agent("Marcos", insurance_type=0, query_category="General Questions"))
human_agents.append(Human_agent("Maria", insurance_type=1, query_category="Claims"))
human_agents.append(Human_agent("João", insurance_type=2, query_category="Policy Management"))
human_agents.append(Human_agent("Ana", insurance_type=3, query_category="Payments"))
human_agents.append(Human_agent("Pedro", insurance_type=4, query_category="General Questions"))
human_agents.append(Human_agent("Carla", insurance_type=5, query_category="Human Support Escalations"))
human_agents.append(Human_agent("Lucas", insurance_type=6, query_category="Regulatory or Compliance Questions"))
human_agents.append(Human_agent("Fernanda", insurance_type=0, query_category="Claims"))
human_agents.append(Human_agent("Rafael", insurance_type=1, query_category="Policy Management"))
human_agents.append(Human_agent("Isabela", insurance_type=2, query_category="Payments"))
human_agents.append(Human_agent("Gustavo", insurance_type=3, query_category="General Questions"))
human_agents.append(Human_agent("Juliana", insurance_type=4, query_category="Human Support Escalations"))
human_agents.append(Human_agent("Diego", insurance_type=5, query_category="Regulatory or Compliance Questions"))
human_agents.append(Human_agent("Larissa", insurance_type=6, query_category="Claims"))
human_agents.append(Human_agent("Bruno", insurance_type=0, query_category="Policy Management"))
human_agents.append(Human_agent("Amanda", insurance_type=1, query_category="Payments"))
human_agents.append(Human_agent("Ricardo", insurance_type=2, query_category="General Questions"))
human_agents.append(Human_agent("Camila", insurance_type=3, query_category="Human Support Escalations"))
human_agents.append(Human_agent("Gabriel", insurance_type=4, query_category="Regulatory or Compliance Questions"))
human_agents.append(Human_agent("Renata", insurance_type=5, query_category="Claims"))
human_agents.append(Human_agent("Felipe", insurance_type=6, query_category="Policy Management"))
human_agents.append(Human_agent("Lívia", insurance_type=0, query_category="Payments"))
human_agents.append(Human_agent("Matheus", insurance_type=1, query_category="General Questions"))
human_agents.append(Human_agent("Letícia", insurance_type=2, query_category="Human Support Escalations"))
human_agents.append(Human_agent("Vinicius", insurance_type=3, query_category="Regulatory or Compliance Questions"))

module4 = Module4_HumanEscalation()
# To add the new agents, remove the comments bellow:
#for human_agent in human_agents:
#    module4.add_human_agent(human_agent)
