In [1]:
import streamlit as st
import warnings
import configparser
import os
import logging
import random
import string
import time
from pprint import pprint
from typing import List
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from dotenv import load_dotenv
from langgraph.graph import END, StateGraph, START
from typing_extensions import TypedDict
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.documents import Document


load_dotenv() # Loads .env file

False

In [2]:
from langchain_huggingface import HuggingFaceEmbeddings

In [3]:
GOOGLE_API_KEY = ""
MODEL_EMBEDDINGS_GOOGLE = "models/embedding-001"
MODEL_LLM_GOOGLE = "gemini-1.5-pro"
OUTPUT_PATH = "output"
FAISS_GOOGLE_PATH = "output/faiss_index"
NUMEXPR_MAX_THREADS = "16"
TASK_TYPES = ["task_type_unspecified", "retrieval_query", "retrieval_document", "semantic_similarity", "classification", "clustering"]

In [4]:
CONTEXTUALIZE_Q_SYSTEM_PROMPT = """Given a chat history and the latest user question which might reference context in the chat history, formulate a standalone question which can be understood without the chat history. Do NOT answer the question, just reformulate it if needed and otherwise return it as is."""

In [5]:
PROMPT_TEMPLATE_GOOGLE = """You are an intelligent assistant. Answer the question based solely on the information provided in the context, specific for Spain.
Do not add any information beyond what is given. 
If the context does not contain the answer, respond with: "The answer is not available in the provided context."
Be concise and accurate. All the answers must be in Spanish language from Spain, in an informal manner.
Answers must be clear, precise, and unambiguous, and a teenager should be able to understand the answer.
Explicitly avoid phrases such as "según el documento", "según el capítulo", "en el texto", "como se menciona en el artículo", or any implication of external texts. Do not construct questions that require knowledge of the structure of the document or the location of information in it.
Include the content-specific information that supports the answer to allow the answer to be independent of any external text.
If the content lacks sufficient information to form a complete answer, do not force one.
Create the answers in your own words; Direct copying of content is not permitted.
NEVER mention the words "documento", "texto", "presentación", "archivo", "tabla", "artículo", "ley", "capítulo", "preámbulo", "título preliminar", "disposición" or "disposiciones generales" in your questions or answers.
ALWAYS make sure that all answers are accurate, self-contained, and relevant, without relying on any original document or text or implying its existence, strictly avoiding any invention or speculation.
IMPORTANT: if in the question there is no mention of a Comunidad Autonoma or the name of a city or province, try that the answer applies to Spain as a country.

Context:
{context}
"""

In [6]:
def load_config():
    global GOOGLE_API_KEY
    global MODEL_EMBEDDINGS_GOOGLE
    global OUTPUT_PATH
    global FAISS_GOOGLE_PATH
    global LOG_PATH
    global NUMEXPR_MAX_THREADS
    
    config = configparser.ConfigParser()
    config.read('streamlit_google_history_final.ini')
    GOOGLE_API_KEY = config['KEYS']['google_api_key']
    MODEL_EMBEDDINGS_GOOGLE = config['MODELS']['model_embeddings_google']
    MODEL_LLM_GOOGLE = config['MODELS']['model_llm_google']
    OUTPUT_PATH = config['DEFAULT']['output_path']
    FAISS_GOOGLE_PATH = config['DEFAULT']['faiss_google_path']
    NUMEXPR_MAX_THREADS = config['DEFAULT']['numexpr_max_threads']
    LOG_PATH = config['DEFAULT']['log_path']

In [7]:
warnings.filterwarnings("ignore", category=FutureWarning)
load_config()
os.environ["NUMEXPR_MAX_THREADS"] = NUMEXPR_MAX_THREADS
os.environ['GOOGLE_API_KEY'] = GOOGLE_API_KEY
os.environ['TAVILY_API_KEY'] = "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"

In [8]:
store = {}

In [9]:
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    global store
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

In [10]:
def get_conversational_chain(retriever):
    
    llm = ChatGoogleGenerativeAI(model = MODEL_LLM_GOOGLE, temperature = 0.3)
    
    contextualize_q_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", CONTEXTUALIZE_Q_SYSTEM_PROMPT),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ]
    )
    
    history_aware_retriever = create_history_aware_retriever(
        llm, retriever, contextualize_q_prompt
    )
    
    qa_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", PROMPT_TEMPLATE_GOOGLE),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}"),
        ]
    )
    
    question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
    rag_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)
    
    conversational_rag_chain = RunnableWithMessageHistory(
        rag_chain,
        get_session_history,
        input_messages_key="input",
        history_messages_key="chat_history",
        output_messages_key="answer",
    )
    
    return conversational_rag_chain

In [11]:
def user_input(user_question, k_value, session_id):
    #embeddings = GoogleGenerativeAIEmbeddings(model = MODEL_EMBEDDINGS_GOOGLE, task_type=task_type)
    embeddings = HuggingFaceEmbeddings(model_name="paraphrase-xlm-r-multilingual-v1")
    #new_db = FAISS.load_local(FAISS_GOOGLE_PATH + "_" + task_type, embeddings, allow_dangerous_deserialization=True)
    new_db = FAISS.load_local("output/faiss_index_ollama", embeddings, allow_dangerous_deserialization=True)
    retriever = new_db.as_retriever(search_kwargs={"k": k_value})
    chain = get_conversational_chain(retriever)
    response = chain.invoke(
        {"input": user_question},
        config={
            "configurable": {"session_id": session_id}
        },
    )
    
    return response

In [12]:
def response_generator(prompt, k_value, session_id):
    start_model_exec = time.time()
    response = user_input(prompt, k_value, session_id)
    end_model_exec = time.time()
    resp_text = "{0} (Tiempo de respuesta: {1:.2f} seg. Session ID: {2}).".format(response, end_model_exec - start_model_exec,  session_id)
    for word in resp_text.split():
        yield word + " "
        time.sleep(0.05)

In [13]:
class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        web_search: whether to add search
        documents: list of documents
    """

    question: str
    generation: str
    web_search: str
    documents: List[str]

In [14]:
web_search_tool = TavilySearchResults(k=3)

def web_search(state):
    """
    Web search based based on the question

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Appended web results to documents
    """

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]

    # Web search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]
    
    return {"documents": documents, "question": question}


In [15]:
def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    embeddings = HuggingFaceEmbeddings(model_name="paraphrase-xlm-r-multilingual-v1")
    new_db = FAISS.load_local("output/faiss_index_ollama", embeddings, allow_dangerous_deserialization=True)
    retriever = new_db.as_retriever(search_kwargs={"k": k_value})
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

In [16]:
def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    embeddings = HuggingFaceEmbeddings(model_name="paraphrase-xlm-r-multilingual-v1")
    new_db = FAISS.from_documents(documents, embeddings, allow_dangerous_deserialization=True)
    retriever = new_db.as_retriever(search_kwargs={"k": k_value})
    chain = get_conversational_chain(retriever)
    generation = chain.invoke(
        {"input": question},
        config={
            "configurable": {"session_id": session_id}
        },
    )    
    
    return {"documents": documents, "question": question, "generation": generation}

In [17]:
# LLM
llm = ChatGoogleGenerativeAI(model = MODEL_LLM_GOOGLE, temperature = 0)

prompt = PromptTemplate(
    template="""You are an expert at routing a user question to a vectorstore or web search. Use the vectorstore for questions on 
    "Código de Tráfico y Seguridad Vial en España". You do not need to be stringent with the keywords in the question related to these topics. 
    Otherwise, use web-search. Give a binary choice 'web_search' or 'vectorstore' based on the question. Return the a JSON with a single key 
    'datasource' and no premable or explanation. 
    
    Question to route: {question}""",
    input_variables=["question"],
)

question_router = prompt | llm | JsonOutputParser()

def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    source = question_router.invoke({"question": question})
    print(source)
    print(source["datasource"])
    if source["datasource"] == "web_search":
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    elif source["datasource"] == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"

In [18]:
# LLM
llm = ChatGoogleGenerativeAI(model = MODEL_LLM_GOOGLE, temperature = 0)

# Prompt
prompt = PromptTemplate(
    template="""You are a grader assessing whether an answer is grounded in / supported by a set of facts. 
    Give a binary 'yes' or 'no' score to indicate whether the answer is grounded in / supported by a set of facts. 
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
    
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    
    Here is the answer: {generation}""",
    input_variables=["generation", "documents"],
)

hallucination_grader = prompt | llm | JsonOutputParser()

# Prompt
prompt = PromptTemplate(
    template="""You are a grader assessing whether an answer is useful to resolve a question. Give a binary score 'yes' or 'no' to 
    indicate whether the answer is useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and 
    no preamble or explanation.
    
    Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    
    Here is the question: {question}""",
    input_variables=["generation", "question"],
)

answer_grader = prompt | llm | JsonOutputParser()


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score["score"]

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score["score"]
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

In [19]:
llm = ChatGoogleGenerativeAI(model = MODEL_LLM_GOOGLE, temperature = 0)

prompt = PromptTemplate(
    template="""You are a grader assessing relevance of a retrieved document to a user question. If the document 
    contains keywords related to the user question, grade it as relevant. It does not need to be a stringent test. 
    The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
    
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n""",
    input_variables=["question", "document"],
)

retrieval_grader = prompt | llm | JsonOutputParser()

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score["score"]
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "Yes"
            continue
    
    return {"documents": filtered_docs, "question": question, "web_search": web_search}

In [20]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    web_search = state["web_search"]

    if web_search == "Yes":
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
        )
        return "websearch"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

In [21]:
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search)  # web search
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generatae

In [22]:
# Build graph
workflow.add_conditional_edges(
    START,
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)

workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)

In [23]:
k_value = 8
session_id = "hdb00011"
prompt = "cual es la temperatura en Madrid?"

response = {}
for task in TASK_TYPES:
    response[task] = user_input(prompt, k_value, session_id, task)

for task in TASK_TYPES:
    print(task)
    print(response[task]['context'])

response = user_input(prompt, k_value, session_id, task)

response['answer']

response = user_input("me lo explicas de otra forma?", k_value, session_id, task)

response['answer']

response

In [24]:
# Compile
app = workflow.compile()

# Test

inputs = {"question": prompt}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

---ROUTE QUESTION---
cual es la temperatura en Madrid?
{'datasource': 'web_search'}
web_search
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---


KeyError: 'documents'