Load Env

In [None]:
from dotenv import load_dotenv

# Run python-dotenv's loader before any client is constructed.
# NOTE(review): presumably this supplies the API keys/settings that the
# OpenAI and Langfuse clients in later cells read from the environment —
# confirm against the project's .env file.
load_dotenv()

Get Embeddings

In [2]:
from langchain_openai import OpenAIEmbeddings

# Embedding model shared by the vector store below; vectors are requested
# with 256 dimensions.
embedding = OpenAIEmbeddings(
    dimensions=256,
    model="text-embedding-3-large",
)

Create Retriever

In [3]:
import chromadb
from langchain_chroma import Chroma

# Talk to a Chroma server over HTTP and expose its "test" collection as a
# LangChain vector store that embeds queries with `embedding`.
chroma_client = chromadb.HttpClient(port=8000, host="localhost")
vector_store = Chroma(
    client=chroma_client,
    embedding_function=embedding,
    collection_name="test",
)

# Number of documents fetched per query; the reranker cell reuses this value.
top_k = 5
retriever = vector_store.as_retriever(
    search_kwargs={"k": top_k},
    search_type="similarity",
)

Create Reranker

In [4]:
from helpers.reranker_integration import get_reranker

# Wrap the plain similarity retriever with the project's reranker helper,
# keeping the same `top_k` as the base retriever.
reranker = get_reranker(
    model_name="BAAI/bge-reranker-base",
    base_retriever=retriever,
    top_k=top_k,
)

Define LLM

In [5]:
from helpers.llm_integrations import get_llm

# Chat model used by both the contextualize chain and the QA chain.
# NOTE(review): `get_llm` is a project helper; presumably it returns a
# LangChain chat model (it is piped with `|` in later cells) — confirm.
llm = get_llm(model="gpt-4o")

Create Contextualize Chain

In [6]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import (
    Runnable,
    RunnablePassthrough,
    RunnableParallel,
    chain,
)
from operator import itemgetter

# System instruction for rewriting a follow-up question so it can be
# understood without the chat history.
contextualize_instructions = (
    "Convert the latest user question into a standalone question given the chat history. "
    "Don't answer the question, return the question and nothing else (no descriptive text)."
)

# Prompt layout: system instruction, then prior turns, then the new question.
contextualize_prompt = ChatPromptTemplate.from_messages([
    ("system", contextualize_instructions),
    ("placeholder", "{chat_history}"),
    ("human", "{question}"),
])

# prompt -> LLM -> plain string
contextualize_question = contextualize_prompt | llm | StrOutputParser()

@chain
def contextualize_if_needed(input_: dict) -> Runnable:
    """Pick the runnable that produces the question to search with.

    Args:
        input_: Chain input; read for the "chat_history" and "question" keys.

    Returns:
        A runnable that simply extracts "question" when there is no (or an
        empty) chat history, otherwise the `contextualize_question` chain
        that rewrites the question with the LLM.
    """
    # Nothing to contextualize against: forward the question untouched.
    if not input_.get("chat_history"):
        return RunnablePassthrough() | itemgetter("question")
    return contextualize_question

Create QA Chain

In [7]:
# System message = fixed instruction followed by the retrieved context.
instruction = "Answer the questions using the given context."
qa_instructions = instruction + "\n\n{context}."

qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", qa_instructions),
        ("human", "{question}"),
    ]
)

def format_docs(docs):
    """Combine retrieved documents into one context string for the prompt.

    Args:
        docs: Iterable of objects exposing a ``page_content`` string.

    Returns:
        The documents' ``page_content`` values separated by a blank line,
        or an empty string when ``docs`` is empty.
    """
    # A blank line between documents keeps the last word of one chunk from
    # fusing with the first word of the next in the prompt.
    return "\n\n".join(doc.page_content for doc in docs)

# Stage 1: reduce the chain input to {"question", "context"} (context as a
# single string), then render the QA prompt while carrying the question along.
formatted_prompt = {
    "question": itemgetter("question") | RunnablePassthrough(),
    "context": lambda x: format_docs(x["context"]),
} | RunnableParallel(
    {
        "prompt": qa_prompt,
        "question": itemgetter("question"),
    }
)

# Stage 2: send the rendered prompt to the LLM; the output is a dict with
# the raw LLM result and the question that was asked.
qa_chain = formatted_prompt | RunnableParallel(
    {
        "llm_result": itemgetter("prompt") | llm,
        "question": itemgetter("question"),
    }
)

Create Retrieval Chain (Pass Reranker Instead of Retriever)

In [None]:
# Pull the "question" value out of the chain input and hand it to the
# reranker, which wraps the base retriever created earlier.
retrieve_docs_chain = itemgetter("question") | reranker

Token Usage Callback

In [8]:
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from typing import Any


class LLMResultHandler(BaseCallbackHandler):
    """Callback handler that remembers the token usage of the latest LLM call.

    Attributes:
        response: ``None`` until an LLM call reports usage. Afterwards either
            the message's ``usage_metadata`` mapping as-is, or a dict with
            ``input_tokens``, ``output_tokens`` and ``total_tokens`` built
            from ``response_metadata["token_usage"]``. Each reported usage
            overwrites the previous one, so for a run with several LLM calls
            only the last call's usage is kept.
    """

    def __init__(self) -> None:
        super().__init__()
        # Defined up front so reading `.response` before any LLM call yields
        # None instead of raising AttributeError.
        self.response = None

    @staticmethod
    def _read_usage_field(usage: Any, key: str) -> Any:
        """Read `key` from a usage payload that may be a mapping or an object."""
        if isinstance(usage, dict):
            return usage.get(key)
        return getattr(usage, key, None)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Store token usage from the first generation of `response`.

        Leaves ``self.response`` unchanged when the result carries no
        generations, no chat message, or no usage information.
        """
        generations = response.generations
        if not generations or not generations[0]:
            return

        # Only chat generations carry a `message`; skip anything else.
        message = getattr(generations[0][0], "message", None)
        if message is None:
            return

        usage_metadata = getattr(message, "usage_metadata", None)
        if usage_metadata:
            token_usage = usage_metadata
        else:
            response_metadata = getattr(message, "response_metadata", None) or {}
            usage = response_metadata.get("token_usage")
            if usage is None:
                return
            # `token_usage` is typically a plain dict, but attribute-style
            # objects are accepted too.
            token_usage = {
                "input_tokens": self._read_usage_field(usage, "prompt_tokens"),
                "output_tokens": self._read_usage_field(usage, "completion_tokens"),
                "total_tokens": self._read_usage_field(usage, "total_tokens"),
            }
        self.response = token_usage


llm_result_handler = LLMResultHandler()

Langfuse Callback

In [9]:
from langfuse.callback import CallbackHandler

# Keyword arguments forwarded to the Langfuse callback handler; empty here.
# NOTE(review): with no arguments, credentials presumably come from the
# environment — confirm.
langfuse_args = {}
langfuse_handler = CallbackHandler(**langfuse_args)

Create Final Chain (Contextualize -> Retrieval -> Q&A)

In [10]:
# Each stage adds one key to the running dict:
#   question -> standalone question (rewritten only when history exists)
#   context  -> documents returned by the reranker
#   answer   -> output of the QA chain
final_chain = (
    RunnablePassthrough.assign(question=contextualize_if_needed)
    | RunnablePassthrough.assign(context=retrieve_docs_chain)
    | RunnablePassthrough.assign(answer=qa_chain)
)

Invoke Chain

In [None]:
# NOTE(review): `input` shadows the Python builtin of the same name; the
# name is kept unchanged in case other cells read it.
input = "What is fixed deposit?"
result = final_chain.invoke(
    {"question": input, "chat_history": []},
    config={"callbacks": [llm_result_handler, langfuse_handler]},
)

# `answer` is the QA chain output (a dict with "llm_result" and "question").
answer = result["answer"]

# Use .get() so a document without a "source" metadata key yields None
# instead of raising KeyError after the whole chain has already run.
source_documents = [
    {"page_content": doc.page_content, "source": doc.metadata.get("source")}
    for doc in result["context"]
]

# Token usage captured by the callback handler during the run.
token_usage = llm_result_handler.response

output = {
    "answer": answer,
    "source_documents": source_documents,
    "token_usage": token_usage,
}

output