# MyPharma ChatBot

### Projeto de Engenharia Informática

In [34]:
!pip install -r "../requirements.txt"

Defaulting to user installation because normal site-packages is not writeable


In [None]:
import os
from dotenv import load_dotenv

load_dotenv()

# Azure OpenAI settings. load_dotenv() above copies the entries of a local
# .env file into the process environment, so the values are read from there
# instead of being hard-coded. Each falls back to "" when the variable is unset.
# NOTE(review): the variable names below are a suggestion; align them with the
# keys actually used in the project's .env file.
TOKEN = os.getenv("AZURE_OPENAI_API_KEY", "")
ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "")
DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT", "")

In [None]:
# !ollama pull llama3.1:8b

# Obter documentos

In [None]:
import os

documents_dir = "documents/"
target_folders = ["Condotril", "Duobiotic", "Neurofil"]  # product names
# One entry per product; it stays "" when the product folder is missing or
# contains no .txt file.
documents = {folder: "" for folder in target_folders}

for folder in target_folders:
    folder_path = os.path.join(documents_dir, folder)

    # isdir() is already False for a path that does not exist.
    if not os.path.isdir(folder_path):
        continue

    contents = []
    # os.listdir() returns entries in arbitrary order; sorting makes the
    # concatenated text identical on every run and platform.
    for doc_name in sorted(os.listdir(folder_path)):
        if not doc_name.endswith(".txt"):
            continue
        print(f"txt encontrado ({doc_name})!")
        file_path = os.path.join(folder_path, doc_name)
        with open(file_path, "r", encoding="utf-8") as doc:
            contents.append(doc.read())

    # Every file's text is followed by a newline, as in the original layout.
    documents[folder] = "".join(content + "\n" for content in contents)

In [4]:
documents

# Gerador de perguntas

In [5]:
from langchain_ollama.llms import OllamaLLM

# Local Ollama model; it is handed to generate_variants() further down to
# paraphrase the original questions.
model = OllamaLLM(model="llama3.1:8b")

# Original questions: one per (topic, product) pair, grouped by topic and
# written without accents.
_QUESTION_TEMPLATES = (
    "Quais sao os ingredientes do {}?",
    "Quais sao os efeitos do {}?",
    "Qual e o processo de toma do {}?",
    "Quais sao as condicoes de armazenamento do {}?",
    "Em que casos nao se pode tomar {}?",
    "Quantas doses tem a embalagem do {}?",
)
_PRODUCTS = ("Condotril", "Duobiotic", "Neurofil")

questions = [
    template.format(product)
    for template in _QUESTION_TEMPLATES
    for product in _PRODUCTS
]

### Gerar variantes das perguntas originais

In [6]:
def generate_variants(model, questions, n=4):
    """Ask the model to rephrase each question and collect its raw generations.

    Parameters
    ----------
    model : object
        Must expose ``generate(prompts=..., num_return_sequences=...)`` whose
        result has a ``generations`` attribute.
    questions : iterable of str
        Questions to rephrase; one model call is made per question.
    n : int
        Forwarded to the model as ``num_return_sequences``.

    Returns
    -------
    dict
        Maps every question to ``list(response.generations)`` for its call.
    """
    def build_prompt(question):
        # The second line keeps the 8 leading spaces of the original prompt.
        return (
            "Your role is to rephrase this question in different ways, "
            f"keeping the meaning: {question}.\n"
            "        Never give a question in english, only in portuguese."
        )

    return {
        question: list(
            model.generate(
                prompts=[build_prompt(question)],
                num_return_sequences=n,
            ).generations
        )
        for question in questions
    }

In [None]:
question_variants = generate_variants(model, questions, n=4)

In [None]:
# Show every original question followed by its numbered raw generations,
# with a blank line between questions.
for original, variants in question_variants.items():
    numbered = [f"Variant {i}: {variant}" for i, variant in enumerate(variants, 1)]
    print("\n".join([f"Original: {original}", *numbered]))
    print()

### Normalizar as variantes (remover acentos e pontuação)

In [8]:
import re
import unicodedata

def remove_acentos(text):
    """Return *text* without diacritics.

    The text is NFKD-decomposed and every combining mark is dropped, so
    accented letters are reduced to their base letter.
    """
    decomposed = unicodedata.normalize('NFKD', text)
    return ''.join(ch for ch in decomposed if unicodedata.combining(ch) == 0)

def remove_pontuation(text):
    """Return *text* with punctuation removed.

    Word characters (letters, digits, underscore) and whitespace are kept;
    every other character is deleted.
    """
    not_word_or_space = re.compile(r"[^\w\s]")
    return not_word_or_space.sub("", text)

In [None]:
var = {}
var_normalized = {}

# Pass 1: split the first generation of each entry on newlines, '*', '|'
# and ',', discard the first piece, and keep the non-empty trimmed pieces.
for original, variants in question_variants.items():
    var[original] = [
        piece.strip()
        for variant in variants
        for piece in re.split(r'[\n\*|\n,]', variant[0].text)[1:]
        if piece.strip()
    ]

# Pass 2: strip punctuation and accents from the question (used as the key)
# and from each of its variants, then print the result.
for original, variants in var.items():
    normalized_key = remove_acentos(remove_pontuation(original))
    var_normalized[normalized_key] = [
        remove_acentos(remove_pontuation(text)) for text in variants
    ]

    print(f"Original: {normalized_key}")
    print("Variantes:")
    for cleaned in var_normalized[normalized_key]:
        print("- ", cleaned)
    print()

### Guardar variantes em .csv

In [None]:
import pandas as pd

# One row per normalized question; "variants" temporarily holds each list.
variants_df = pd.DataFrame(list(var_normalized.items()), columns=["original", "variants"])
variant_lists = variants_df["variants"].tolist()

# The longest list decides how many variant_N columns are created.
max_variants = max(len(items) for items in variant_lists)
variant_columns = [f"variant_{position}" for position in range(1, max_variants + 1)]
variants_expanded = pd.DataFrame(variant_lists, columns=variant_columns)

# Final layout: "original" followed by variant_1 ... variant_N.
variants_df = pd.concat([variants_df["original"], variants_expanded], axis=1)

In [None]:
variants_df

In [None]:
variants_df.to_csv('variants.csv')

### Tratamento de perguntas com erros ortográficos

In [None]:
from fuzzywuzzy import fuzz
from spellchecker import SpellChecker
import re
import csv

file_path = 'variants.csv'

def load_variants_from_csv(file_path):
    """Load the question variants stored in a CSV file.

    The file must have an ``original`` column; every column whose name
    starts with ``variant_`` contributes one variant when its cell is not
    blank. Other columns are ignored.

    Parameters
    ----------
    file_path : str
        Path to the UTF-8 encoded CSV file.

    Returns
    -------
    dict
        Maps each original question to the list of its non-blank variants
        (cell values are returned as stored, not stripped).
    """
    quest_variants = {}
    # newline='' is what the csv module expects for files it parses itself.
    with open(file_path, mode='r', encoding='utf-8', newline='') as file:
        for row in csv.DictReader(file):
            quest_variants[row['original']] = [
                value
                for key, value in row.items()
                # Surplus cells are stored under the key None and missing
                # cells have the value None; skip both instead of crashing.
                if isinstance(key, str)
                and key.startswith('variant_')
                and value
                and value.strip()
            ]
    return quest_variants

spell = SpellChecker(language='pt')

def normalize_text(text):
    """Return *text* keeping only word characters, whitespace and apostrophes."""
    unwanted = re.compile(r"[^\w\s']")
    return unwanted.sub("", text)


def choose_best_question_variant(user_question, quest_variants):
    """Find the known question that best matches a possibly misspelled one.

    The user question is stripped of punctuation and accents, lower-cased
    and passed through the spell checker; it is then compared with
    ``fuzz.ratio`` against every original question and each of its variants.

    Parameters
    ----------
    user_question : str
        Question as typed by the user.
    quest_variants : dict
        Maps each original question to a list of variant phrasings.

    Returns
    -------
    tuple
        ``(best_match, best_score)``: the original question owning the best
        scoring candidate and that score. ``best_match`` is ``None`` when no
        candidate scores above 0.
    """
    best_score = 0
    best_match = None

    user_question = normalize_text(user_question)
    user_question = remove_acentos(remove_pontuation(user_question))
    user_question = user_question.lower()

    # SpellChecker.correction() returns None when it has no candidate; in
    # that case keep the cleaned question instead of comparing None (which
    # would make every score 0).
    # NOTE(review): correction() is a per-word API and receives the whole
    # sentence here as a single token - confirm this is intended.
    corrected = spell.correction(user_question)
    if corrected:
        user_question = corrected

    for original, variants in quest_variants.items():
        # Similarity with the original question itself.
        score_with_original = fuzz.ratio(user_question, original.lower())
        if score_with_original > best_score:
            best_score = score_with_original
            best_match = original
            # A near-exact hit on an original question ends the search.
            if best_score >= 98:
                break

        # Similarity with the best of this question's variants.
        best_variant_score = max(
            (fuzz.ratio(user_question, variant.lower()) for variant in variants),
            default=0,
        )
        if best_variant_score > best_score:
            best_score = best_variant_score
            best_match = original

    return best_match, best_score

In [None]:
from langchain.tools import BaseTool
from fuzzywuzzy import fuzz
import csv

class QuestionVariantSelector(BaseTool):
    """Tool that maps a user question onto the closest known question.

    The known questions and their variants are read once, at construction
    time, from the CSV file given as ``file_path``.
    """

    name: str = "QuestionVariantSelector"
    description: str = "Seleciona a melhor variante de uma pergunta baseada em variantes carregadas de um CSV."
    file_path: str  # Path to the CSV file with the variants
    # Declared as a field because BaseTool is a pydantic model: attributes
    # that are not declared cannot be assigned on an instance.
    variants: dict[str, list[str]] = {}

    def __init__(self, file_path: str, **kwargs):
        """Initialise the tool and load the variants from *file_path*.

        Extra keyword arguments are forwarded to ``BaseTool``.
        """
        # The pydantic base class must be initialised before any attribute
        # is set on the instance.
        super().__init__(file_path=file_path, **kwargs)
        self.variants = self.load_variants_from_csv(file_path)

    def load_variants_from_csv(self, file_path: str) -> dict:
        """Load the question variants from a CSV file.

        Returns a dict mapping each value of the ``original`` column to the
        non-blank values of that row's ``variant_*`` columns.
        """
        quest_variants = {}
        with open(file_path, mode='r', encoding='utf-8', newline='') as file:
            for row in csv.DictReader(file):
                quest_variants[row['original']] = [
                    value
                    for key, value in row.items()
                    # Short rows yield None values and surplus cells are
                    # stored under the key None; skip both.
                    if isinstance(key, str)
                    and key.startswith('variant_')
                    and value
                    and value.strip()
                ]
        return quest_variants

    def choose_best_variant(self, user_question: str) -> tuple[str, int]:
        """Return ``(best_match, best_score)`` for the user question.

        ``best_match`` is the original question whose own text or variants
        score highest with ``fuzz.ratio``; it is the user question itself
        when nothing scores above 0.
        """
        best_match = user_question
        best_score = 0
        lowered_question = user_question.lower()

        for original, variants in self.variants.items():
            # The original wording is a candidate too, as in
            # choose_best_question_variant().
            for candidate in (original, *variants):
                score = fuzz.ratio(lowered_question, candidate.lower())
                if score > best_score:
                    best_score = score
                    best_match = original

        return best_match, best_score

    def _run(self, question: str) -> dict:
        """Process the user question and return the best match found.

        Returns a dict with the keys ``question`` and ``score``; the
        question is left unchanged when the score is below 65.
        """
        best_match, score = self.choose_best_variant(question)
        if score < 65:  # Minimum score for a match to be trusted
            best_match = question

        return {"question": best_match, "score": score}

Teste de uma pergunta com erros ortográficos e diferente das perguntas originais

In [None]:
# Misspelled question whose wording matches none of the originals verbatim.
teste_com_erros = "quantós comprmídos d~evo tomâr de duobitiic?"
file_path = 'variants.csv'
loaded_variants = load_variants_from_csv(file_path)
result, score = choose_best_question_variant(teste_com_erros, loaded_variants)
# Below 60 the match is not trusted and the user's own wording is kept.
result = teste_com_erros if score < 60 else result
print(f"User question: {teste_com_erros}\nMelhor variante: {result}\nscore: {score}")

# Criar chunks

In [11]:
from langchain_text_splitters import CharacterTextSplitter

text_splitter = CharacterTextSplitter(
    separator="\n\n",
    # Large chunks so that one vector carries as much information as
    # possible about a single product.
    chunk_size=1200,
    chunk_overlap=200,
    length_function=len,
    is_separator_regex=False,
)

docs = []
for name, content in documents.items():
    # Put the product name at the top of the text before splitting it.
    full_content = f"Medicamento: {name}\n\n{content}"
    chunks = text_splitter.create_documents([full_content])

    # Tag every chunk with its product, both as metadata (used later to
    # filter retrieved chunks) and as a prefix of the chunk text.
    for chunk in chunks:
        chunk.metadata = {"medicamento": name}
        chunk.page_content = f"{name}\n\n{chunk.page_content}"

    docs.extend(chunks)

# Plain iteration: no index is needed, and binding `_` would shadow
# IPython's "last output" variable in the notebook.
for product, content in documents.items():
    print(f"Número de caracteres no documento '{product}': {len(content)}")

print("Número de documents: ", len(docs))

In [None]:
docs

# Criar VectorStore/Retriever

In [None]:
# ollama needs to be installed
# https://ollama.com/
# https://github.com/ollama/ollama/tree/main/docs
from langchain_ollama import OllamaEmbeddings
from langchain_community.vectorstores import FAISS

# Embed every chunk with the local Ollama model and index the vectors in FAISS.
embedding_model = OllamaEmbeddings(model="llama3.1:8b")
vectorstore = FAISS.from_documents(documents=docs, embedding=embedding_model)

# Store the index on disk so that it can be reloaded with FAISS.load_local().
vectorstore.save_local("../vectorstore")

In [None]:
# SECURITY NOTE(review): allow_dangerous_deserialization=True opts in to an
# unsafe (pickle-based, per the LangChain documentation) load. Only point
# folder_path at an index this project wrote itself.
_saved_store = FAISS.load_local(
    folder_path='../vectorstore',  # path when running from the notebook
    # folder_path='vectorstore',  # path when running as a script
    embeddings=embedding_model,
    allow_dangerous_deserialization=True,
)

# Retriever that returns the 5 closest chunks for a query.
retriever = _saved_store.as_retriever(search_kwargs={"k": 5})

# Desenvolvimento do Chatbot workflow (LangGraph)

In [15]:
import re
from langchain.tools import BaseTool

class ArticleInformation(BaseTool):
    """Tool that returns the stored text chunks relevant to a product question.

    Chunks are fetched through the module-level ``retriever`` and kept only
    when their ``medicamento`` metadata names a product mentioned in the
    question.
    """

    name: str = "ArticleInformation"
    description: str = "This tool is used to answer questions about MyPharma food suplements. The input is the original user question"

    def _run(self, question: str) -> list[str]:
        """Return the text of the retrieved chunks that match the question's products.

        Raises
        ------
        ValueError
            If no known product is mentioned in the question.
        """
        # Identify the products first, so a question without a product
        # fails before a retrieval call is made. The lower-cased set is
        # built once instead of once per retrieved document.
        wanted = {med.lower() for med in self._extract_medicamento(question)}

        retrieved_documents = retriever.invoke(question)

        # Keep only the chunks whose metadata names a requested product.
        return [
            document.page_content
            for document in retrieved_documents
            if document.metadata.get('medicamento', '').lower() in wanted
        ]

    def _extract_medicamento(self, question: str) -> list[str]:
        """Return the known products mentioned in the question.

        Matching is case-insensitive; names are returned capitalised,
        without duplicates and in alphabetical order.

        Raises
        ------
        ValueError
            If no known product is mentioned.
        """
        # Fixed set of available products.
        medicamentos_disponiveis = {"Condotril", "Neurofil", "Duobiotic"}

        matches = re.findall(r'(Condotril|Neurofil|Duobiotic)', question, flags=re.IGNORECASE)

        # The membership test is kept because case-insensitive matching can
        # accept letters that do not capitalise to one of the exact names.
        # Sorting makes the returned order deterministic.
        medicamentos_identificados = sorted(
            {match.capitalize() for match in matches} & medicamentos_disponiveis
        )

        if not medicamentos_identificados:
            raise ValueError("Não foi possível identificar nenhum medicamento na pergunta.")
        return medicamentos_identificados

Tools

In [16]:
# Tools available to the agent, plus a name -> tool lookup used by tool_node
# to dispatch the tool calls requested by the model.
tools = [ArticleInformation()]
tools_by_name = {tool.name: tool for tool in tools}

AgentState

In [None]:
from typing import Annotated, Sequence, TypedDict
from typing_extensions import TypedDict

from langchain_core.messages import BaseMessage

from langgraph.graph.message import add_messages
from langgraph.graph import END

class AgentState(TypedDict):
    """Graph state: the running sequence of chat messages."""
    # The add_messages function defines how an update should be processed
    # Default is to replace. add_messages says "append"
    messages: Annotated[Sequence[BaseMessage], add_messages]

System Prompt

In [18]:
from langchain_core.messages import SystemMessage

# System message placed first in the message list of the graph invocations
# below; it frames the assistant's role and asks for complete answers.
system_prompt =  SystemMessage("""Your role is to act as a thorough research assistant, providing complete and detailed answers based on the given context. 
  
Your final answer should be as complete as possible. Do not oversimplify or summarize unnecessaraly, considering the results of the tools you used.
Do not oversimplify or summarize the answer.

If you are unsure about what tool to use or how to correct an error, you should ask the user for help.""")

Nodes

In [None]:
from typing import Union, Literal
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import ToolMessage, HumanMessage
import json

# ollama needs to be installed
# https://ollama.com/
# https://github.com/ollama/ollama/tree/main/docs
def call_llm(state: AgentState) -> dict[str, Union[list, bool]]:
    """
    Call the Azure OpenAI chat model on the conversation so far.

    Parameters
    ----------
    state : AgentState
        The state of the agent; its ``messages`` are sent to the model.

    Returns
    -------
    dict[str, Union[list, bool]]
        State update holding the model's reply under ``messages``.
    """
    llm = AzureChatOpenAI(
        azure_deployment=DEPLOYMENT,
        api_key=TOKEN,
        azure_endpoint=ENDPOINT,
        # NOTE(review): confirm that this API version supports tool calling
        # for the deployment in use.
        api_version="2023-06-01-preview",
        model="gpt-4o-mini",
        temperature=0,
        max_tokens=None,
        timeout=None,
        max_retries=2,
    )
    # Local alternative: llm = ChatOllama(model="llama3.1:8b", temperature=0)

    # Bind the shared `tools` list (the one tools_by_name is built from), so
    # the tools offered to the model are exactly those tool_node can run.
    llm_with_tools = llm.bind_tools(tools)
    llm_response = llm_with_tools.invoke(state['messages'])

    return {'messages': [llm_response]}
    
def tool_node(state: AgentState) -> dict[str, Union[list, bool]]:
    '''
    Runs the tools requested in the last AI message (the one with tool calls).

    Parameters
    ----------
    state : AgentState
        The state of the agent.

    Returns
    -------
    dict[str, Union[list, bool]]
        State update with one ToolMessage per requested tool call.

    Raises
    ------
    ValueError
        If the state holds no messages.
    '''
    if messages := state.get('messages', []):
        message = messages[-1]
    else:
        raise ValueError('No messages found in input state.')

    outputs = []

    # Shape of one tool call:
    # {'name': 'ArticleInformation',
    #  'args': {...},
    #  'id': 'call_UwQzgi1QD9uWfBP1r6CWO5ep',
    #  'type': 'tool_call'}
    for tool_call in message.tool_calls:
        try:
            tool_result = tools_by_name[tool_call['name']].invoke(
                tool_call['args']
            )
        except ValueError as error:
            # Report the failure (e.g. no product found in the question)
            # back to the model instead of aborting the whole graph run.
            content = f'Error: {error}'
        else:
            # ensure_ascii=False keeps accented characters readable instead
            # of escaping them as \uXXXX sequences.
            content = json.dumps(tool_result, ensure_ascii=False)

        outputs.append(
            ToolMessage(
                content=content,
                name=tool_call['name'],
                tool_call_id=tool_call['id']
            )
        )

    return {'messages': outputs}


def route_tools(state: AgentState) -> Literal['tools', '__end__']:
    '''
    Conditional-edge router: decides where the graph goes after the LLM node.

    Parameters
    ----------
    state : AgentState
        The state of the agent; a plain list of messages is accepted too.

    Returns
    -------
    Literal['tools', '__end__']
        'tools' when the last message carries at least one tool call,
        '__end__' otherwise.

    Raises
    ------
    ValueError
        If the state is not a list and holds no messages.
    '''
    if isinstance(state, list):
        last_message = state[-1]
    else:
        history = state.get('messages', [])
        if not history:
            raise ValueError(f'No messages found in input state to tool edge: {state}')
        last_message = history[-1]

    wants_tool = hasattr(last_message, 'tool_calls') and len(last_message.tool_calls) > 0
    return 'tools' if wants_tool else '__end__'

Compile Graph

In [None]:
from langgraph.graph import StateGraph, START

# Wiring: START -> llm; llm -> tools or end (decided by route_tools);
# tools -> llm.
builder = StateGraph(AgentState)
for node_name, node_fn in (('llm', call_llm), ('tools', tool_node)):
    builder.add_node(node_name, node_fn)
builder.add_edge(START, 'llm')
builder.add_edge('tools', 'llm')
builder.add_conditional_edges('llm', route_tools, {'tools': 'tools', '__end__': '__end__'})

# Compile into the runnable graph used by the cells below.
graph = builder.compile()

In [74]:
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))


# Obtenção de Respostas

In [None]:
quest_variants = load_variants_from_csv('variants.csv')

# Sample questions, three per topic (one for each product).

# Ingredients
question1 = "Quais são os ingredientes do Condotril?"
question2 = "Quais são os ingredientes do Duobiotic?"
question3 = "Quais são os ingredientes do Neurofil?"

# Benefits
question4 = "Quais são os benefícios do Condotril?"
question5 = "Quais são os benefícios do Duobiotic?"
question6 = "Quais são os benefícios do Neurofil?"

# Effects
question7 = "Quais são os efeitos do Condotril?"
question8 = "Quais são os efeitos do Duobiotic?"
# Product name corrected from "Nueurofil", which the product-name pattern
# used by the retrieval tool cannot match.
question9 = "Quais são os efeitos do Neurofil?"

# How to take the product
question10 = "Qual é o processo de toma do Condotril?"
question11 = "Qual é o processo de toma do Duobiotic?"
question12 = "Qual é o processo de toma do Neurofil?"

# Storage conditions
question13 = "Quais são as condições de armazenamento do Condotril?"
question14 = "Quais são as condições de armazenamento do Duobiotic?"
question15 = "Quais são as condições de armazenamento do Neurofil?"

# Precautions
question16 = "Em que casos não se pode tomar Condotril?"
question17 = "Em que casos não se pode tomar Duobiotic?"
question18 = "Em que casos não se pode tomar Neurofil?"

## Gerar respostas

In [None]:
user_question = "quais saõ os fundamnetso do condotirl?"
best_match, best_score = choose_best_question_variant(user_question, quest_variants)

# Use the matched question only when the score reaches 65; otherwise send
# the user's own wording.
if best_score >= 65:
    refined_question = best_match
else:
    refined_question = user_question

conversation = [system_prompt, HumanMessage(content=refined_question)]
response1 = graph.invoke({'messages': conversation}, debug=True)

In [None]:
print("Pergunta: ", refined_question)
print("Resposta: ", response1['messages'][-1].content)

In [24]:
import pandas as pd

def load_dataframe(csv_file):
    """Read a UTF-8 CSV file into a DataFrame.

    When the file does not exist, a message is printed and an empty
    DataFrame with the columns ``question`` and ``answer`` is returned.
    """
    try:
        return pd.read_csv(csv_file, encoding='utf-8')
    except FileNotFoundError:
        print(f"Erro: O arquivo '{csv_file}' não foi encontrado.")
    return pd.DataFrame(columns=['question', 'answer'])

# Função de ligação com o frontend

In [None]:

def get_chatbot_response(question, variants_path='variants.csv', min_score=70, debug=True):
    """Answer a user question; this is the entry point used by the frontend.

    The question is first matched against the known questions; the matched
    wording replaces the user's text only when the match score is at least
    ``min_score``. The resulting question is then sent through the graph.

    Parameters
    ----------
    question : str
        Question as typed by the user.
    variants_path : str
        CSV file with the known questions and their variants.
    min_score : int
        Minimum fuzzy-match score required to substitute the known question.
    debug : bool
        Forwarded to ``graph.invoke``; when True the graph prints its
        execution trace.

    Returns
    -------
    The content of the last message produced by the graph.
    """
    known_questions = load_variants_from_csv(variants_path)
    best_match, best_score = choose_best_question_variant(question, known_questions)
    refined_question = best_match if best_score >= min_score else question

    result = graph.invoke(
        {'messages': [system_prompt, HumanMessage(content=refined_question)]},
        debug=debug,
    )
    return result['messages'][-1].content

Teste

In [None]:
question = "quais sao os ingredientes do condotril?"
response = get_chatbot_response(question)

[36;1m[1;3m[-1:checkpoint][0m [1mState at the end of step -1:
[0m{'messages': []}
[36;1m[1;3m[0:tasks][0m [1mStarting 1 task for step 0:
[0m- [32;1m[1;3m__start__[0m -> {'messages': [SystemMessage(content='Your role is to act as a thorough research assistant, providing complete and detailed answers based on the given context. \n  \nYour final answer should be as complete as possible. Do not oversimplify or summarize unnecessaraly, considering the results of the tools you used.\nDo not oversimplify or summarize the answer.\n\nIf you are unsure about what tool to use or how to correct an error, you should ask the user for help.', additional_kwargs={}, response_metadata={}, id='4c364584-aeec-46f9-b2e6-7ba78c33eddb'),
              HumanMessage(content='Quais sao os ingredientes do Condotril', additional_kwargs={}, response_metadata={})]}
[36;1m[1;3m[0:writes][0m [1mFinished step 0 with writes to 1 channel:
[0m- [33;1m[1;3mmessages[0m -> [SystemMessage(content='Your ro

In [None]:
print(response)

# Trabalho Futuro

Implementação de histórico de mensagens

In [None]:
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, MessagesState, StateGraph
import uuid 

# Workflow configuration
workflow = StateGraph(MessagesState)

# Node function that calls the model
def call_model(state: MessagesState):
    """Send the conversation to the model and return its reply as a state update.

    A plain-string reply is wrapped in an AIMessage first: a bare string
    added to the message state would otherwise be stored as a human message.
    """
    from langchain_core.messages import AIMessage

    response = model.invoke(state["messages"])
    if isinstance(response, str):
        response = AIMessage(content=response)
    return {"messages": response}

# Single-node graph: START -> model
workflow.add_node("model", call_model)
workflow.add_edge(START, "model")

# Add memory: the checkpointer keeps the state of every thread_id
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Per-session message history, keyed by session id.
threads = {}


def get_session_id():
    """Return a new random session id (a UUID4 in string form)."""
    new_id = uuid.uuid4()
    return str(new_id)

def get_session_history(session_id: str):
    """Return the history dict of a session, creating an empty one if needed."""
    return threads.setdefault(session_id, {"messages": []})

def start_conversation(query: str, session_id: str = None):
    """Send one user message within a session and return the model output.

    Parameters
    ----------
    query : str
        Text of the user message.
    session_id : str, optional
        Session to continue; a new session id is generated when omitted.

    Returns
    -------
    tuple
        ``(session_id, output)`` where ``output`` is the state returned by
        ``app.invoke`` (its ``messages`` hold the whole conversation).
    """
    if session_id is None:
        session_id = get_session_id()

    # Creates the session's history entry when it does not exist yet.
    session_history = get_session_history(session_id)

    # The checkpointer already stores the conversation of each thread_id, so
    # only the new messages are sent. The system prompt goes first and only
    # once per session; appending a fresh copy after the user message on
    # every turn would keep growing the history with duplicates.
    new_messages = [HumanMessage(query)]
    if not session_history["messages"]:
        new_messages.insert(0, system_prompt)

    output = app.invoke(
        {"messages": new_messages},
        config={"configurable": {"thread_id": session_id}},
        debug=True
    )

    # Mirror the full conversation returned by the graph in `threads`.
    session_history["messages"] = output["messages"]

    return session_id, output

In [None]:
# First interaction - a new session is created
query1 = "Quais são os ingredientes do Condotril?"
session_id, output_1 = start_conversation(query1)
print(f"Session ID: {session_id}")
print(output_1["messages"][-1].content)

# Second interaction in the same session
query2 = "Qual é o meu nome de medicamento?"
_, output_2 = start_conversation(query2, session_id)
print(output_2["messages"][-1].content)

# New interaction without a session id - creates a new session
query3 = "Qual é o meu nome de medicamento?"
new_session_id, output_3 = start_conversation(query3)
print(f"New Session ID: {new_session_id}")
print(output_3["messages"][-1].content)

# Show the history of both sessions
for heading, shown_session in (
    ("\nSessão 1 Histórico:", session_id),
    ("\nNova Sessão Histórico:", new_session_id),
):
    print(heading)
    for message in threads[shown_session]["messages"]:
        print(message.content)