In [1]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_chroma import Chroma
from dotenv import load_dotenv

# Load variables from a local .env file before any Google client is constructed.
load_dotenv()
# 3.1 Initialize the Embedding Model
# NOTE: The model and task type are crucial for retrieval accuracy.
# NOTE(review): this same instance is later handed to Chroma as its embedding
# function, so it presumably embeds search queries as well as documents —
# confirm that a fixed "retrieval_document" task type is intended for both.
embedding = GoogleGenerativeAIEmbeddings(
    model="models/text-embedding-004",  # A strong general-purpose model
    task_type="retrieval_document" # Optimizes the embedding for document search
)

In [2]:
# ! pip install -U langchain-chroma

In [3]:
# Open the Chroma store persisted under ../data/chroma/ using the embedding model
# defined in the first cell.
v1 = Chroma(
    persist_directory='../data/chroma/',
    embedding_function=embedding
)
# Print how many entries the collection holds (`_collection` is a private attribute).
# NOTE(review): the recorded output is 0, and another cell in this notebook persists
# to "../data/chroma_db" — confirm this is the directory you meant to query.
print(v1._collection.count())


0


In [4]:
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA

# Prompt with `{context}` and `{question}` placeholders. The same text is rebuilt in a
# later cell, where it is passed to RetrievalQA; `template` itself is rebound by the
# next cell, so only QA_CHAIN_PROMPT keeps this wording.
template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. Use three sentences maximum. Keep the answer as concise as possible. Always say "thanks for asking!" at the end of the answer. 
{context}
Question: {question}
Helpful Answer:"""
QA_CHAIN_PROMPT = PromptTemplate.from_template(template)


In [5]:
# Default prompt for llmPipeline; `{input}` is the only placeholder.
template = """User: {input}
Make sure your renponse is concise and clear and do not exceed three sentences.
"""

class llmPipeline():
    """Minimal prompt -> chat model -> string pipeline.

    Attributes:
        llm: The ChatGoogleGenerativeAI instance built from ``model``.
        prompt: ChatPromptTemplate built from ``prompt_template``.
        chain: ``prompt | llm | StrOutputParser()``.
    """

    def __init__(self, model="gemini-2.5-flash", prompt_template=template):
        """Build the chain.

        Args:
            model: Model name passed to ChatGoogleGenerativeAI.
            prompt_template: Template text; its placeholders define the keys
                that ``invoke`` must receive.
        """
        self.llm = ChatGoogleGenerativeAI(model=model)
        self.prompt = ChatPromptTemplate.from_template(prompt_template)
        self.chain = self.prompt | self.llm | StrOutputParser()

    def invoke(self, input_dict=None):
        """Run the chain and return its text output.

        Args:
            input_dict: Mapping of template placeholders to values. ``None``
                (the default) is treated as an empty mapping; a fresh dict is
                created per call instead of sharing one mutable default.

        Returns:
            The chain's string output, or an error string starting with
            "API Test Failed." if the chain raised. Note that any exception,
            not only a missing API key, produces that message.
        """
        payload = {} if input_dict is None else input_dict
        try:
            return self.chain.invoke(payload)
        except Exception as e:
            return f"API Test Failed. Ensure the GOOGLE_API_KEY is correctly set in your .env file. Error: {e}"
# Smoke test: build the pipeline with its defaults and send a single prompt.
a = llmPipeline()

# invoke() returns either the model text or an error string, so printing works either way.
result = a.invoke({"input": "what is the beginign of time?"})
print(result)

The concept of time, as we understand it, is closely linked to the Big Bang. Scientists believe time itself began approximately 13.8 billion years ago with this cosmic expansion. Before this event, the very fabric of spacetime did not exist in its current form.


In [6]:
# Chat model used by the RetrievalQA cell that follows.
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")

In [12]:
# Build prompt
from langchain.prompts import PromptTemplate
template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. Use three sentences maximum. Keep the answer as concise as possible. Always say "thanks for asking!" at the end of the answer. 
{context}
Question: {question}
Helpful Answer:"""
QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question"], template=template)

# Run chain
from langchain.chains import RetrievalQA
question = "Summarize the context of the documents in three sentences."
# The retriever comes from the `v1` store opened earlier. In the recorded run that
# store reported 0 entries, which is why `source_documents` came back empty.
qa_chain = RetrievalQA.from_chain_type(
    llm,
    retriever=v1.as_retriever(),
    return_source_documents=True,
    chain_type_kwargs={"prompt": QA_CHAIN_PROMPT},
)

# Use .invoke(): calling the chain object directly (`qa_chain({...})`) is the
# deprecated Chain.__call__ entry point. The returned dict is the same
# (query / result / source_documents).
result = qa_chain.invoke({"query": question})
result["result"]

'I cannot summarize the context of the documents because no context was provided. Please provide the documents you would like me to summarize. I need the information to answer your question. thanks for asking!'

In [None]:
# Path to the kf100 text files, relative to the notebook like the other data paths.
# The original value "..data/..." was missing the separator after "..".
s1 = "../data/text_files/kf100"

# Echo whatever `result` currently holds; it is assigned by more than one earlier cell,
# so the value shown depends on which of them ran last.
result

'I cannot summarize the context as no documents or context were provided. Please provide the context you would like me to summarize. thanks for asking!'

In [13]:
# Show the full RetrievalQA response dict (query, result, source_documents).
result

{'query': 'Summarize the context of the documents in three sentences.',
 'result': 'I cannot summarize the context of the documents because no context was provided. Please provide the documents you would like me to summarize. I need the information to answer your question. thanks for asking!',
 'source_documents': []}

In [1]:
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
from langchain_core.output_parsers import StrOutputParser
from langchain_chroma import Chroma

from dotenv import load_dotenv
import os
load_dotenv()
from utils import document_loader, chunking, vectorstore, format_docs


In [None]:

# Directory where the Chroma collection is persisted and reloaded from.
PERSIST_DIRECTORY = "../data/chroma_db"

# Default QA prompt. It expects `{context}` (retrieved text) and `{input}` (the question).
TEMPLATE = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. Use three sentences maximum with maximum of 500 words. Keep the answer as concise as possible. Always say "thanks for asking!" at the end of the answer. 
{context}
Question: {input}
Helpful Answer:"""

class Rag():
    """Small retrieval-augmented QA engine backed by a persisted Chroma store.

    Attributes:
        llm: Chat model used to answer.
        embedding: Embedding model handed to Chroma.
        vectordb: The loaded Chroma store, or None when nothing is loaded.
        QAprompt: ChatPromptTemplate built from the template passed to __init__.
        qa_chain: The LCEL chain, or None until a store is available.
    """

    def __init__(self, template=TEMPLATE):
        """Create the models and, if a non-empty store exists on disk, load it.

        Args:
            template: Prompt text. The chain supplies `context`, `input` and
                `question` keys, so the template may use any of them.
        """
        self.llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0)
        self.embedding = GoogleGenerativeAIEmbeddings(
            model="models/text-embedding-004",
            task_type="retrieval_document"  # Optimized for search
        )

        self.vectordb = None
        self.QAprompt = ChatPromptTemplate.from_template(template=template)
        self.qa_chain = None

        # Share state from disk to uvicorn workers, if any.
        if os.path.exists(PERSIST_DIRECTORY):
            try:
                temp_db = Chroma(
                    persist_directory=PERSIST_DIRECTORY,
                    embedding_function=self.embedding
                )

                chunk_count = temp_db._collection.count()
                if chunk_count > 0:
                    self.vectordb = temp_db
                    print(f"Loaded existing vector store with {chunk_count} chunks.")

                    # Qa chain is initialized
                    self._initialize_lcel_chain(context="")
                else:
                    print("Persistent directory exists but the Chroma collection is empty.")
            except Exception as e:
                # Keep the object consistent with the message: no half-loaded state.
                self.vectordb = None
                self.qa_chain = None
                print(f"Error loading existing Chroma DB: {e}. Starting with empty state.")

    def _load_docs(self, file_path: str):
        """Load a file into the vector store and (re)build the QA chain.

        The file is passed through the project helpers `document_loader`,
        `chunking` and `vectorstore`; their exact behavior is defined in `utils`.
        """
        loaded = document_loader(file_path)
        chunked = chunking(loaded)
        self.vectordb = vectorstore(chunked, self.embedding, PERSIST_DIRECTORY)

        self._initialize_lcel_chain()
        print(f"Document loaded and vector store created at {PERSIST_DIRECTORY}")

    def _initialize_lcel_chain(self, context: str = ""):
        """Build `self.qa_chain`; a no-op (with a message) when no store is loaded.

        Args:
            context: Unused; kept so existing callers keep working.

        The chain takes `{"question": ...}` and returns a dict with `answer`
        (model text), `input` (the original question mapping) and `docs`
        (the retrieved documents).
        """
        if self.vectordb is None:
            print("Cannot initialize chain: VectorDB is not loaded.")
            return

        answer_chain = (
            RunnablePassthrough.assign(
                # Callables, so retrieval runs per invocation rather than at build time.
                context=lambda x: self._format_docs(x["docs"]),
                # The default template uses `{input}`; mirror the question into it.
                input=lambda x: x["question"],
            )
            | self.QAprompt
            | self.llm
            | StrOutputParser()  # Parse the output to a string
        )

        self.qa_chain = (
            # Retrieve once and share the documents with both the prompt and the caller.
            RunnablePassthrough.assign(docs=lambda x: self._search(x["question"]))
            | RunnableParallel(
                answer=answer_chain,
                input=lambda x: {"question": x["question"]},
                docs=lambda x: x["docs"],
            )
        )

    @staticmethod
    def _format_docs(docs):
        """Join the `page_content` of each document with blank lines."""
        return "\n\n".join(doc.page_content for doc in docs)

    def _search(self, question: str, k: int = 3):
        """Return up to `k` documents similar to `question`, or [] without a store."""
        if self.vectordb is None:
            return []
        return self.vectordb.similarity_search(question, k=k)

    def _retriever_info(self, question: str):
        """Return the joined text of the top 3 matches for `question`.

        Returns "No VectorDB loaded." when no store is available.
        """
        if self.vectordb is None:
            return "No VectorDB loaded."
        return self._format_docs(self._search(question))

    def _clear_db(self):
        """Delete the Chroma collection and reset the engine.

        Returns:
            A status message string in every case (success, failure, or
            nothing to clear).
        """
        if self.vectordb is None:
            return "No Chroma DB loaded; nothing to clear."
        try:
            self.vectordb.delete_collection()
        except Exception as e:
            return f"Error clearing Chroma DB: {e}"
        self.vectordb = None
        # The chain closes over the deleted store, so drop it as well.
        self.qa_chain = None
        return "Chroma DB cleared successfully."

    def ask(self, question: str):
        """Answer `question` from the loaded documents.

        Returns:
            A dict with `status_code`, `response` and `sources`. Without a
            chain the status is 400; otherwise 200, with `sources` holding the
            metadata of each retrieved document. Exceptions raised by the
            chain propagate to the caller.
        """
        if self.qa_chain is None:
            return {
                'status_code': 400,
                'response': "No documents loaded. Please load documents first.",
                'sources': []
            }
        result = self.qa_chain.invoke({"question": question})
        return {
            'status_code': 200,
            'response': result["answer"],
            'sources': [getattr(doc, "metadata", {}) for doc in result.get("docs", [])],
        }



In [25]:
if __name__ == "__main__":
    # Interactive driver: rebuild the store from one PDF, then answer questions.
    # (The former `import keyboard` was never used and has been dropped so the
    # cell no longer depends on that package being installed.)
    path = "../data/uploads/inte.pdf"
    engine = Rag()
    # Start from a clean collection so chunks from earlier runs are not mixed in.
    engine._clear_db()

    engine._load_docs(path)
    print("Submit an empty question to exit.")
    while True:
        try:
            que = input("What is your question?")
        except (EOFError, KeyboardInterrupt):
            # Interrupting the kernel or closing stdin ends the session cleanly.
            break
        if not que.strip():
            break
        result = engine.ask(que)
        print(result)

Loaded existing vector store with 9 chunks.
++++++++++++++++++++++++++++ tags=['Chroma', 'GoogleGenerativeAIEmbeddings'] vectorstore=<langchain_chroma.vectorstores.Chroma object at 0x0000022897D14680> search_kwargs={}
-----Loading Document-----
++++++++++++++++++++++++++++ tags=['Chroma', 'GoogleGenerativeAIEmbeddings'] vectorstore=<langchain_chroma.vectorstores.Chroma object at 0x0000022897DA5E50> search_kwargs={}
Document loaded and vector store created at ../data/chroma_db


GoogleGenerativeAIError: Error embedding content: 'ProtoType' object has no attribute 'DESCRIPTOR'

In [51]:
def retriever_info(vectordb=vec, question: str = "mb"):
    """Return the text of the three closest matches for ``question``.

    Args:
        vectordb: Store exposing ``similarity_search``; defaults to the
            notebook-level ``vec`` as it was when this function was defined.
        question: Query text to search for.

    Returns:
        The matches' ``page_content`` joined by blank lines, or
        "No VectorDB loaded." when ``vectordb`` is falsy.
    """
    if vectordb:
        hits = vectordb.similarity_search(question, k=3)
        return "\n\n".join(hit.page_content for hit in hits)
    return "No VectorDB loaded."
# Joined text of the top matches for a sample query, using the default store.
data = retriever_info(question="Data science")

In [53]:
# preview = [doc.page_content[:80] + "..." for doc in data]
# NOTE: `data` is one joined string (see retriever_info), so the preview above would
# iterate characters, and this length is a character count, not a document count.
len(data)

1303