Assumes LangChain v0.3.4


In [None]:
import os
import pandas as pd
import openpyxl # open xlsx files with pandas
import re
from typing import List
from typing_extensions import Annotated, TypedDict
import streamlit as st
from qdrant_client import QdrantClient
from langchain_qdrant import QdrantVectorStore
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableMap, RunnableLambda

In [None]:

# --- Qdrant connection settings ---
# URL and API key for the Qdrant instance are read from Streamlit's secrets store.
QDRANT_URL = st.secrets["QDRANT_URL"]
QDRANT_API_KEY = st.secrets["QDRANT_API_KEY"]
# Location of a local on-disk Qdrant store (alternative to the cloud instance).
qdrant_path = "/tmp/local_qdrant"

# --- OpenAI settings ---
# API key for the langchain_openai clients, read from Streamlit's secrets store.
# NOTE(review): the key is not passed explicitly to OpenAIEmbeddings/ChatOpenAI in
# the visible code — presumably it is picked up from the environment; confirm.
OPENAI_API_KEY = st.secrets["OPENAI_API_KEY"]

# Retrieval and generation settings; also attached to the retriever as run
# metadata for tracing.
CONFIG = {
    "qdrant_collection_name": "ASK_vectorstore",
    "embedding_model": "text-embedding-ada-002",  # alt: text-embedding-3-large
    "embedding_dims": 1536,  # alt: 1024
    "search_type": "mmr",
    "k": 5,  # number of documents returned by the retriever
    "fetch_k": 20,  # MMR candidate pool: fetch 20 docs, then select k=5
    "lambda_mult": 0.7,  # 0 = max diversity, 1 = min diversity; default is 0.5
    "score_threshold": 0.5,
    "generation_model": "gpt-3.5-turbo-16k",
    "temperature": 0.7,
}

In [None]:
# Create and cache the document retriever
@st.cache_resource
def get_retriever():
    """Build the Qdrant-backed document retriever.

    The function is decorated with ``st.cache_resource``, so Streamlit reuses
    the returned object instead of rebuilding the client on every call.

    Returns:
        The retriever created by ``QdrantVectorStore.as_retriever`` using the
        search type and search parameters from ``CONFIG``.
    """
    # Qdrant cloud instance.
    # For local, use QdrantClient(path="/tmp/local_qdrant")
    # (on mac: /private/tmp/local_qdrant).
    qdrant_client = QdrantClient(
        url=QDRANT_URL,
        prefer_grpc=True,
        api_key=QDRANT_API_KEY,
    )

    # Vector store over the existing collection, embedding queries with OpenAI.
    embeddings = OpenAIEmbeddings(model=CONFIG["embedding_model"])
    vector_store = QdrantVectorStore(
        client=qdrant_client,
        collection_name=CONFIG["qdrant_collection_name"],
        embedding=embeddings,
    )

    search_kwargs = {
        "k": CONFIG["k"],
        "fetch_k": CONFIG["fetch_k"],
        "lambda_mult": CONFIG["lambda_mult"],
        "filter": None,  # metadata filter; None means no filtering
    }
    return vector_store.as_retriever(
        search_type=CONFIG["search_type"],
        search_kwargs=search_kwargs,
    )

In [None]:
# Loader for the prompt-enrichment lookup tables (caching currently disabled).
# @st.cache_data
def get_retrieval_context(file_path: str) -> dict:
    """Read every worksheet of an Excel workbook into a dict of lookup dicts.

    Each worksheet that has at least two columns becomes one entry keyed by
    the sheet name; its value maps the first-column cells to the
    second-column cells. Worksheets with fewer than two columns are skipped.

    Args:
        file_path: Path to the Excel workbook.

    Returns:
        ``{sheet_name: {first_column_value: second_column_value}}``.
    """
    context_dict = {}
    # Open the workbook once and close it deterministically; passing the open
    # ExcelFile to read_excel avoids re-opening and re-parsing the file for
    # every sheet.
    with pd.ExcelFile(file_path) as workbook:
        for sheet_name in workbook.sheet_names:
            df = pd.read_excel(workbook, sheet_name=sheet_name)
            if df.shape[1] >= 2:
                context_dict[sheet_name] = pd.Series(
                    df.iloc[:, 1].values, index=df.iloc[:, 0]).to_dict()
    return context_dict


# Path to prompt enrichment dictionaries.
# `__file__` is not defined when this code runs as a notebook cell, so fall
# back to the current working directory instead of failing with NameError.
try:
    enrichment_path = os.path.join(os.path.dirname(
        __file__), 'config/retrieval_context.xlsx')
except NameError:
    enrichment_path = os.path.join(
        os.getcwd(), 'config/retrieval_context.xlsx')


# Question enrichment step (caching is currently disabled, so the workbook is
# re-read on every call).
# NOTE(review): `traceable` is not imported in the visible cells (presumably
# `from langsmith import traceable`) — confirm it is in scope before running.
@traceable(run_type="chain")
# @st.cache_data
def enrich_question(user_question: str, filepath=enrichment_path) -> str:
    """Expand acronyms in a question and append explanations of known terms.

    The lookup tables come from ``get_retrieval_context(filepath)``: the
    "acronyms" sheet maps acronym -> full form and the "terms" sheet maps
    term -> explanation. A missing sheet is treated as an empty table, and
    rows whose key or value is NaN are ignored.

    Args:
        user_question: The question as typed by the user.
        filepath: Path to the enrichment workbook.

    Returns:
        The question with whole-word acronym matches replaced by their full
        form, followed by " (explanation)" for every term that occurs in it.
    """
    enrichment_dict = get_retrieval_context(filepath)
    acronyms_dict = enrichment_dict.get("acronyms", {})
    terms_dict = enrichment_dict.get("terms", {})

    enriched_question = user_question
    # Replace acronyms with full form
    for acronym, full_form in acronyms_dict.items():
        if pd.notna(acronym) and pd.notna(full_form):
            replacement = str(full_form)
            # A function replacement is inserted literally. Passing the string
            # itself would make re.sub treat backslashes / \g<...> in the
            # spreadsheet text as escapes or group references.
            enriched_question = re.sub(
                r'\b' + re.escape(str(acronym)) + r'\b',
                lambda _match: replacement,
                enriched_question)
    # Add explanations. The substring check runs against the question after
    # acronym expansion, including any explanations appended so far.
    for term, explanation in terms_dict.items():
        if pd.notna(term) and pd.notna(explanation):
            if str(term) in enriched_question:
                enriched_question += f" ({explanation!s})"
    return enriched_question

In [None]:
def create_prompt():
    """Build the chat prompt used for answer generation.

    Returns:
        A ``ChatPromptTemplate`` with a system message that carries the
        answering instructions followed by the ``{context}`` placeholder, and
        a human message that carries the ``{enriched_question}`` placeholder.
    """
    instruction_parts = [
        "Use the following pieces of context to answer the users question. ",
        "INCLUDES ALL OF THE DETAILS IN YOUR RESPONSE, INDLUDING REQUIREMENTS AND REGULATIONS. ",
        "National Workshops are required for boat crew, aviation, and telecommunications when they are offered. ",
        "Include Auxiliary Core Training (AUXCT) for questions on certifications or officer positions. ",
        "If you don't know the answer, just say I don't know. \n----------------\n{context}",
    ]
    system_message = ("system", "".join(instruction_parts))
    human_message = ("human", "{enriched_question}")
    return ChatPromptTemplate.from_messages([system_message, human_message])


# Function to format documents (doesn't require caching)
def format_docs(docs):
    """Join the ``page_content`` of each document, separated by a blank line.

    Args:
        docs: Iterable of objects exposing a ``page_content`` string.

    Returns:
        One string with the contents in input order; empty if ``docs`` is empty.
    """
    page_texts = [document.page_content for document in docs]
    return "\n\n".join(page_texts)


# Schema for llm responses.
# This class is passed to `llm.with_structured_output(...)` when the chain is
# built, so the model's reply is returned in this shape.
# NOTE(review): presumably the class docstring and the Annotated description
# string are forwarded to the model as schema text — confirm before rewording
# either of them.
class AnswerWithSources(TypedDict):
    """An answer to the question, with sources."""
    # Free-text answer to the question.
    answer: str
    # Sources the answer relies on. The Annotated metadata is (default,
    # description); `...` presumably marks the field as required with no
    # default, per LangChain's TypedDict convention — confirm.
    sources: Annotated[
        List[str],
        ...,
        "List of sources and pages used to answer the question",
    ]

## Main RAG Pipeline

In [None]:
'''
DESIRED MAP
# Define the chain of operations:
# 1. Get the user_question from the user. This can be done with chain.invoke
# 2. Pass the user_question without any change using RunnablePassthrough
# 3. Create enhanced_question from user_question with the dictionaries
# 4. Pass the user_question without any change using RunnablePassthrough
# 5. Pass the enhanced question through the retriever
# 6. Get the context through the retriever.
# 7. Format the input (context & question) through PromptTemplate
# 8. Run through OpenAI's chat model and structure the output via the AnswerWithSources custom parser to include the sources used by the llm
# 9. Parse the response as a string output by StrOutputParser()
# 10. Append the user_question and enhanced_question to the response
# '''

In [None]:
# Wrap enrich_question in a RunnableLambda so it can be composed in a chain.
enricher = RunnableLambda(lambda question: enrich_question(question))

# Document retriever, with the CONFIG values attached as run metadata.
retriever = get_retriever().with_config(metadata=CONFIG)

# Chat prompt template (system instructions + enriched question).
prompt = create_prompt()

# OpenAI chat model used for answer generation.
llm = ChatOpenAI(
    model=CONFIG["generation_model"],
    temperature=CONFIG["temperature"],
)

In [None]:
# Main RAG chain. Input: {"user_question": str}.
# Output: {"answer", "llm_sources", "user_question", "enriched_question"}.
#
# Each stage after the first is a Runnable, not a plain dict: on Python 3.9+
# `dict | dict` is an ordinary dict merge, which would silently collapse two
# stages into one instead of piping them.
chain = (
    # Stage 1: keep the raw question and derive the enriched question from it.
    # `enricher` is a RunnableLambda, so it is composed with `|` rather than
    # called like a function.
    {
        "user_question": RunnableLambda(lambda x: x["user_question"]),
        "enriched_question": RunnableLambda(lambda x: x["user_question"]) | enricher,
    }
    # Stage 2: retrieve with the enriched question *string* (the retriever
    # cannot take the whole dict) and add the formatted documents as context.
    | RunnablePassthrough.assign(
        context=RunnableLambda(lambda x: x["enriched_question"]) | retriever | format_docs
    )
    # Stage 3: fill the prompt ({context}, {enriched_question}) and ask the
    # model for a structured AnswerWithSources reply, keeping earlier keys.
    | RunnablePassthrough.assign(
        llm_response=prompt | llm.with_structured_output(AnswerWithSources)
    )
    # Stage 4: flatten the result and append both forms of the question.
    | (lambda x: {
        "answer": x["llm_response"]["answer"],  # answer text from the model
        "llm_sources": x["llm_response"]["sources"],  # sources cited by the model
        "user_question": x["user_question"],
        "enriched_question": x["enriched_question"],
    }
    )
)

In [None]:
# Sample question used to exercise the pipeline.
user_question = "what is required for NACO?"

In [None]:
# Run the chain on the sample question, passed as a dict under the
# "user_question" key.
response = chain.invoke({"user_question": user_question})