## Chunking and Embedding

In [1]:
import uuid 
from langchain_google_genai import ChatGoogleGenerativeAI
import os
from pydantic import BaseModel
from langchain.chains import create_extraction_chain_pydantic
from dotenv import load_dotenv
from langchain.output_parsers.openai_tools import JsonOutputToolsParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain.chains import create_extraction_chain
from typing import Optional, List
from langchain import hub

In [8]:
load_dotenv()

google_api_key = os.getenv("GEMINI_API_KEY")


class AgenticChunker:
    def __init__(self, google_api_key):
        self.chunks = {}
        self.id_truncate_limit = 5

        # Whether or not to update/refine summaries and titles as you get new information
        self.generate_new_metadata_ind = True
        self.print_logging = True

        if google_api_key is None:
            google_api_key = os.getenv("GEMINI_API_KEY")

        if google_api_key is None:
            raise ValueError("API key is not provided and not found in environment variables")

        # Use Gemini 1.5 Pro model
        self.llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash-latest",
            google_api_key=google_api_key,
            temperature=0
        )

    def add_propositions(self, propositions):
        for proposition in propositions:
            self.add_proposition(proposition)
    
    def add_proposition(self, proposition):
        if self.print_logging:
            print (f"\nAdding: '{proposition}'")

        # If it's your first chunk, just make a new chunk and don't check for others
        if len(self.chunks) == 0:
            if self.print_logging:
                print ("No chunks, creating a new one")
            self._create_new_chunk(proposition)
            return

        chunk_id = self._find_relevant_chunk(proposition)

        # If a chunk was found then add the proposition to it
        if chunk_id:
            if self.print_logging:
                print (f"Chunk Found ({self.chunks[chunk_id]['chunk_id']}), adding to: {self.chunks[chunk_id]['title']}")
            self.add_proposition_to_chunk(chunk_id, proposition)
            return
        else:
            if self.print_logging:
                print ("No chunks found")
            # If a chunk wasn't found, then create a new one
            self._create_new_chunk(proposition)
        

    def add_proposition_to_chunk(self, chunk_id, proposition):
        # Add then
        self.chunks[chunk_id]['propositions'].append(proposition)

        # Then grab a new summary
        if self.generate_new_metadata_ind:
            self.chunks[chunk_id]['summary'] = self._update_chunk_summary(self.chunks[chunk_id])
            self.chunks[chunk_id]['title'] = self._update_chunk_title(self.chunks[chunk_id])

    def _update_chunk_summary(self, chunk):
        """
        If you add a new proposition to a chunk, you may want to update the summary or else they could get stale
        """
        PROMPT = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """
                    You are the steward of a group of chunks which represent groups of sentences that talk about a similar topic
                    A new proposition was just added to one of your chunks, you should generate a very brief 1-sentence summary which will inform viewers what a chunk group is about.

                    A good summary will say what the chunk is about, and give any clarifying instructions on what to add to the chunk.

                    You will be given a group of propositions which are in the chunk and the chunks current summary.

                    Your summaries should anticipate generalization. If you get a proposition about apples, generalize it to food.
                    Or month, generalize it to "date and times".

                    Example:
                    Input: Proposition: Greg likes to eat pizza
                    Output: This chunk contains information about the types of food Greg likes to eat.

                    Only respond with the chunk new summary, nothing else.
                    """,
                ),
                ("user", "Chunk's propositions:\n{proposition}\n\nCurrent chunk summary:\n{current_summary}"),
            ]
        )

        runnable = PROMPT | self.llm

        new_chunk_summary = runnable.invoke({
            "proposition": "\n".join(chunk['propositions']),
            "current_summary" : chunk['summary']
        }).content

        return new_chunk_summary
    
    def _update_chunk_title(self, chunk):
        """
        If you add a new proposition to a chunk, you may want to update the title or else it can get stale
        """
        PROMPT = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """
                    You are the steward of a group of chunks which represent groups of sentences that talk about a similar topic
                    A new proposition was just added to one of your chunks, you should generate a very brief updated chunk title which will inform viewers what a chunk group is about.

                    A good title will say what the chunk is about.

                    You will be given a group of propositions which are in the chunk, chunk summary and the chunk title.

                    Your title should anticipate generalization. If you get a proposition about apples, generalize it to food.
                    Or month, generalize it to "date and times".

                    Example:
                    Input: Summary: This chunk is about dates and times that the author talks about
                    Output: Date & Times

                    Only respond with the new chunk title, nothing else.
                    """,
                ),
                ("user", "Chunk's propositions:\n{proposition}\n\nChunk summary:\n{current_summary}\n\nCurrent chunk title:\n{current_title}"),
            ]
        )

        runnable = PROMPT | self.llm

        updated_chunk_title = runnable.invoke({
            "proposition": "\n".join(chunk['propositions']),
            "current_summary" : chunk['summary'],
            "current_title" : chunk['title']
        }).content

        return updated_chunk_title

    def _get_new_chunk_summary(self, proposition):
        PROMPT = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """
                    You are the steward of a group of chunks which represent groups of sentences that talk about a similar topic
                    You should generate a very brief 1-sentence summary which will inform viewers what a chunk group is about.

                    A good summary will say what the chunk is about, and give any clarifying instructions on what to add to the chunk.

                    You will be given a proposition which will go into a new chunk. This new chunk needs a summary.

                    Your summaries should anticipate generalization. If you get a proposition about apples, generalize it to food.
                    Or month, generalize it to "date and times".

                    Example:
                    Input: Proposition: Greg likes to eat pizza
                    Output: This chunk contains information about the types of food Greg likes to eat.

                    Only respond with the new chunk summary, nothing else.
                    """,
                ),
                ("user", "Determine the summary of the new chunk that this proposition will go into:\n{proposition}"),
            ]
        )

        runnable = PROMPT | self.llm

        new_chunk_summary = runnable.invoke({
            "proposition": proposition
        }).content

        return new_chunk_summary
    
    def _get_new_chunk_title(self, summary):
        PROMPT = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """
                    You are the steward of a group of chunks which represent groups of sentences that talk about a similar topic
                    You should generate a very brief few word chunk title which will inform viewers what a chunk group is about.

                    A good chunk title is brief but encompasses what the chunk is about

                    You will be given a summary of a chunk which needs a title

                    Your titles should anticipate generalization. If you get a proposition about apples, generalize it to food.
                    Or month, generalize it to "date and times".

                    Example:
                    Input: Summary: This chunk is about dates and times that the author talks about
                    Output: Date & Times

                    Only respond with the new chunk title, nothing else.
                    """,
                ),
                ("user", "Determine the title of the chunk that this summary belongs to:\n{summary}"),
            ]
        )

        runnable = PROMPT | self.llm

        new_chunk_title = runnable.invoke({
            "summary": summary
        }).content

        return new_chunk_title


    def _create_new_chunk(self, proposition):
        new_chunk_id = str(uuid.uuid4())[:self.id_truncate_limit] # I don't want long ids
        new_chunk_summary = self._get_new_chunk_summary(proposition)
        new_chunk_title = self._get_new_chunk_title(new_chunk_summary)

        self.chunks[new_chunk_id] = {
            'chunk_id' : new_chunk_id,
            'propositions': [proposition],
            'title' : new_chunk_title,
            'summary': new_chunk_summary,
            'chunk_index' : len(self.chunks)
        }
        if self.print_logging:
            print (f"Created new chunk ({new_chunk_id}): {new_chunk_title}")
    
    def get_chunk_outline(self):
        """
        Get a string which represents the chunks you currently have.
        This will be empty when you first start off
        """
        chunk_outline = ""

        for chunk_id, chunk in self.chunks.items():
            single_chunk_string = f"""Chunk ID: {chunk['chunk_id']}\nChunk Name: {chunk['title']}\nChunk Summary: {chunk['summary']}\n\n"""
        
            chunk_outline += single_chunk_string
        
        return chunk_outline

    def _find_relevant_chunk(self, proposition):
        current_chunk_outline = self.get_chunk_outline()

        PROMPT = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    """
                    Determine whether or not the "Proposition" should belong to any of the existing chunks.

                    A proposition should belong to a chunk of their meaning, direction, or intention are similar.
                    The goal is to group similar propositions and chunks.

                    If you think a proposition should be joined with a chunk, return the chunk id.
                    If you do not think an item should be joined with an existing chunk, just return "No chunks"

                    Example:
                    Input:
                        - Proposition: "Greg really likes hamburgers"
                        - Current Chunks:
                            - Chunk ID: 2n4l3d
                            - Chunk Name: Places in San Francisco
                            - Chunk Summary: Overview of the things to do with San Francisco Places

                            - Chunk ID: 93833k
                            - Chunk Name: Food Greg likes
                            - Chunk Summary: Lists of the food and dishes that Greg likes
                    Output: 93833k
                    """,
                ),
                ("user", "Current Chunks:\n--Start of current chunks--\n{current_chunk_outline}\n--End of current chunks--"),
                ("user", "Determine if the following statement should belong to one of the chunks outlined:\n{proposition}"),
            ]
        )

        runnable = PROMPT | self.llm

        chunk_found = runnable.invoke({
            "proposition": proposition,
            "current_chunk_outline": current_chunk_outline
        }).content

        # Pydantic data class
        class ChunkID(BaseModel):
            """Extracting the chunk id"""
            chunk_id: Optional[str]
            
        # Extraction to catch-all LLM responses. This is a bandaid
        extraction_chain = create_extraction_chain_pydantic(pydantic_schema=ChunkID, llm=self.llm)
        extraction_found = extraction_chain.run(chunk_found)
        if extraction_found:
            chunk_found = extraction_found[0].chunk_id

        # If you got a response that isn't the chunk id limit, chances are it's a bad response or it found nothing
        # So return nothing
        if len(chunk_found) != self.id_truncate_limit:
            return None

        return chunk_found
    
    def get_chunks(self, get_type='dict'):
        """
        This function returns the chunks in the format specified by the 'get_type' parameter.
        If 'get_type' is 'dict', it returns the chunks as a dictionary.
        If 'get_type' is 'list_of_strings', it returns the chunks as a list of strings, where each string is a proposition in the chunk.
        """
        if get_type == 'dict':
            return self.chunks
        if get_type == 'list_of_strings':
            chunks = []
            for chunk_id, chunk in self.chunks.items():
                chunks.append(" ".join([x for x in chunk['propositions']]))
            return chunks
    
    def pretty_print_chunks(self):
        print (f"\nYou have {len(self.chunks)} chunks\n")
        for chunk_id, chunk in self.chunks.items():
            print(f"Chunk #{chunk['chunk_index']}")
            print(f"Chunk ID: {chunk_id}")
            print(f"Summary: {chunk['summary']}")
            print(f"Propositions:")
            for prop in chunk['propositions']:
                print(f"    -{prop}")
            print("\n\n")

    def pretty_print_chunk_outline(self):
        print ("Chunk Outline\n")
        print(self.get_chunk_outline())


In [3]:
import os
import getpass

# os.environ["NVIDIA_API_KEY"] = getpass.getpass("Provide your NVIDIA API key: ")

In [4]:
os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Provide your LANGSMITH API key: ")
os.environ["LANGSMITH_TRACING"] = "true"

Provide your LANGSMITH API key: ········


In [5]:
os.environ["GEMINI_API_KEY"] = getpass.getpass("Provide your GEMINI API key: ")

Provide your GEMINI API key: ········


In [7]:
os.environ["COHERE_API_KEY"] = getpass.getpass("Provide your COHERE API key: ")

Provide your COHERE API key: ········


In [9]:
from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
# from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings
# from langchain_cohere import CohereEmbeddings

client = QdrantClient(path = "study_bot_v3")  # Persistent local DB

# if not client.collection_exists("study_data_collection"):
#     client.create_collection(
#         collection_name="study_data_collection",
#         vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
#     )

# cohere_embedding = CohereEmbeddings(
#     model="embed-english-v3.0",
#     cohere_api_key=os.getenv("COHERE_API_KEY")
# )


In [10]:
# !pip install langchain_cohere

In [11]:

obj = hub.pull("wfh/proposal-indexing")
llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash-latest",
            google_api_key=google_api_key,
            temperature=0
        )
runnable = obj | llm


class Sentences(BaseModel):
    sentences: List[str]
    

from langchain.prompts import PromptTemplate

prompt = PromptTemplate.from_template(
    """Extract the key propositions from the following text. 
List each proposition as a bullet point.

Text:
{text}
    
Propositions:"""
)

def get_propositions(text):
    final_prompt = prompt.format(text=text)
    result = llm.invoke(final_prompt).content

    # Parse the bullet list into a clean list of strings
    lines = result.strip().split("\n")
    propositions = [line.lstrip("-• ").strip() for line in lines if line.strip()]
    return propositions


In [12]:
# !pip install langchain_ollama

In [13]:
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document


ac = AgenticChunker(google_api_key)

def read_pdf(path):
    loader = PyPDFLoader(path)
    file = loader.load()
    
    text = "\n".join([doc.page_content for doc in file])
    paragraphs = text.split("\n\n")
    
    return paragraphs

def encode_pdf(path) :
    paragraphs = read_pdf(path)
    
    propositions = get_propositions(paragraphs)
    
    ac.add_proposition(propositions)
    
#     print(propositions)

# def embed_in_vector_db(ac):
#     documents = []
    
#     for chunk in ac.get_chunks(get_type='dict').values():
#         content = "\n".join([p for sublist in chunk['propositions'] for p in (sublist if isinstance(sublist, list) else [sublist])])

#         metadata = {
#             "chunk_id": chunk['chunk_id'],
#             "title": chunk['title'],
#             "summary": chunk['summary'],
#             "chunk_index": chunk['chunk_index']
#         }
        
#         documents.append(Document(page_content=content,metadata=metadata))
        
#     vectorstore = QdrantVectorStore(
#         client=client,
#         collection_name=collection_name,
#         embedding=cohere_embedding,
#     )
    
#     vectorstore.add_documents(documents)
    
#     return vectorstore
    
    
    
path = input()

encode_pdf(path)

# vectorstore = embed_in_vector_db(ac,client,cohere_embedding)


C:\Users\sudda\Downloads\Aishee Das - 0153- DSCC 05.pdf

Adding: '["* Sukumar Ray's *Abol Tabol* is a cornerstone of Bengali nonsense literature, but also a nuanced critique of early 20th-century colonial Bengal's socio-political realities.", '* *Abol Tabol* uses satire, allegory, and surreal humor to reflect the contradictions and injustices of colonial society.', '* Through bizarre characters and illogical systems, *Abol Tabol* challenges colonial authority, mocks conformity, and highlights absurdities of social hierarchy.', "* Ray's use of nonsense allowed *Abol Tabol* to escape censorship while delivering subtle critiques.", "* *Abol Tabol* is a sophisticated work of protest literature, not merely children's verse.  It's a timeless reflection on power, freedom, and the human condition.", '* *Abol Tabol* masterfully satirizes authority by exposing the absurdities of bureaucratic systems, social hierarchies, and authority figures.', "* *Abol Tabol* critiques societal norms and expect

In [14]:
# from qdrant_client.http.models import CountRequest

# # Check how many points/documents are stored
# count = client.count(
#     collection_name="study_data_collection",
#     exact = True,
# )

# print(f"Number of vectors stored: {count.count}")


In [15]:
# from qdrant_client.http.models import Filter

# client.delete(
#     collection_name="study_data_collection",
#     points_selector=Filter(must=[])  # matches all points
# )


In [16]:
os.environ["HUGGINGFACE_API_KEY"] = getpass.getpass("Please provide HUGGINGFACE API key: ")

Please provide HUGGINGFACE API key: ········


In [17]:
huggingface_api_key = os.getenv("HUGGINGFACE_API_KEY")

## Retrieving 

### dense_retriever

In [18]:
from typing import List, Dict
import torch
from transformers import AutoTokenizer, AutoModelForMaskedLM
from langchain_core.embeddings import Embeddings
from qdrant_client.http import models # Import Qdrant's models

class SPLADESparseEncoder(Embeddings):
    def __init__(self, model_name: str = "naver/splade-cocondenser-ensembledistil"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForMaskedLM.from_pretrained(model_name)
        self.model.eval()
        if torch.cuda.is_available():
            self.model.cuda()

    def embed_documents(self, texts: List[str]) -> List[models.SparseVector]: # Change return type
        sparse_vectors_qdrant_format = []
        for text in texts:
            inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True)
            if torch.cuda.is_available():
                inputs = {k: v.cuda() for k, v in inputs.items()}

            with torch.no_grad():
                outputs = self.model(**inputs).logits
                sparse_vector = torch.max(outputs, dim=1).values.squeeze().cpu()
                token_ids = inputs["input_ids"].squeeze().cpu()

            aggregated_sparse_values = {}
            for token_id, value in zip(token_ids, sparse_vector):
                if token_id.item() != self.tokenizer.pad_token_id and value > 0:
                    if token_id.item() not in aggregated_sparse_values:
                        aggregated_sparse_values[token_id.item()] = float(value)
                    else:
                        aggregated_sparse_values[token_id.item()] += float(value)

            sorted_items = sorted(aggregated_sparse_values.items())
            indices = [item[0] for item in sorted_items]
            values = [item[1] for item in sorted_items]

            # Create an instance of Qdrant's SparseVector
            sparse_qdrant_vector = models.SparseVector(indices=indices, values=values)
            sparse_vectors_qdrant_format.append(sparse_qdrant_vector)

        return sparse_vectors_qdrant_format

    def embed_query(self, query: str) -> models.SparseVector: # Change return type
        return self.embed_documents([query])[0]

In [20]:
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_qdrant import QdrantVectorStore, RetrievalMode
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import Distance, VectorParams, SparseVectorParams, SparseIndexParams
# from sentence_transformers import SentenceTransformer

# sparse_model = SentenceTransformer("naver/splade_v2_max")
# 1. Dense embedding model: bge-base-en-v1.5
dense_embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-base-en-v1.5",
    cache_folder="./hf_models"
)

# 2. Sparse embedding model: splade++
sparse_embedding = SPLADESparseEncoder()

# 3. Qdrant local persistent client
# client = QdrantClient(path="qdrant_data")  # persistent storage folder

# 4. Create hybrid collection
client.create_collection(
    collection_name="hybrid_search_collection4",
    vectors_config={"dense": VectorParams(size=768, distance=Distance.COSINE)},  # bge is 768-d
    sparse_vectors_config={"sparse": SparseVectorParams(index=SparseIndexParams(on_disk=False))}
)

# 5. Initialize hybrid Qdrant vector store
qdrant = QdrantVectorStore(
    client=client,
    collection_name="hybrid_search_collection4",
    embedding=dense_embeddings,
    sparse_embedding=sparse_embedding,
    retrieval_mode=RetrievalMode.HYBRID,
    vector_name="dense",
    sparse_vector_name="sparse",
)



# # 8. Show results
# for i, doc in enumerate(results, 1):
#     print(f"\nResult #{i}:\n{doc.page_content}")


In [21]:
from langchain.schema import Document
from uuid import uuid4
documents = []
uuids = []
uuids = []
for chunk in ac.get_chunks(get_type='dict').values():
    flattened_propositions = [prop_item for sublist in chunk["propositions"] for prop_item in sublist]
    content = "\n".join(flattened_propositions)
    metadata = {
        "chunk_id": chunk["chunk_id"],
        "title": chunk["title"],
        "summary": chunk["summary"],
        "chunk_index": chunk["chunk_index"]
    }
    uuid = str(uuid4())
    uuids.append(uuid)
    documents.append(Document(page_content=content, metadata=metadata))
    _ = qdrant.add_documents(documents=documents,ids=uuids)



In [22]:
retriever = qdrant.as_retriever()
# # # 7. Perform hybrid search
# # query = ""
# retrieved_docs = qdrant.similarity_search(query, k=10)

In [23]:
# !pip install torch==2.7.1

In [24]:
from langchain.retrievers.document_compressors import CohereRerank
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever

cohere_rerank = CohereRerank(model="rerank-english-v3.0", top_n=5, cohere_api_key=os.getenv("COHERE_API_KEY"))
reranker = ContextualCompressionRetriever(
    base_compressor = cohere_rerank,
    base_retriever = retriever,
)


  cohere_rerank = CohereRerank(model="rerank-english-v3.0", top_n=5, cohere_api_key=os.getenv("COHERE_API_KEY"))


## RAG Prompt

In [25]:
from langchain_core.prompts import ChatPromptTemplate

RAG_TEMPLATE = """
You are a helpful assistant that answers questions based only on the provided context. Do not use any external knowledge or make assumptions.

--------------------
CONTEXT
--------------------
{context}

--------------------
QUERY
--------------------
{query}

--------------------
INSTRUCTIONS
--------------------
- Answer only using the context above.
- If the answer is not explicitly mentioned or cannot be inferred, respond with: "I couldn't find the answer in the provided context."
- Keep the answer clear and concise.
- Do not repeat the question unless necessary.
- Use bullet points if the answer has multiple parts.
"""

chat_prompt = ChatPromptTemplate.from_messages([
    ("human", RAG_TEMPLATE)
])


In [45]:
llm = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash-latest",
    google_api_key=google_api_key,
    temperature=0.0,
)

## Answer for sample question


In [46]:

query = "What is Abol Tabol about and what are its key themes?" # Example query

retrieved_docs = reranker.invoke(query)

print("--- Retrieved Documents ---")
for i, doc in enumerate(retrieved_docs):
    print(f"Document {i+1}:\nContent: {doc.page_content}\nMetadata: {doc.metadata}\n---")


context_texts = [doc.page_content for doc in retrieved_docs]
context = "\n\n".join(context_texts)

rag_chain = chat_prompt | llm

response = rag_chain.invoke({
    "context": context,
    "query": query
})

print("\n--- Answer ---")
print(response.content)

--- Retrieved Documents ---
Document 1:
Content: * Sukumar Ray's *Abol Tabol* is a cornerstone of Bengali nonsense literature, but also a nuanced critique of early 20th-century colonial Bengal's socio-political realities.
* *Abol Tabol* uses satire, allegory, and surreal humor to reflect the contradictions and injustices of colonial society.
* Through bizarre characters and illogical systems, *Abol Tabol* challenges colonial authority, mocks conformity, and highlights absurdities of social hierarchy.
* Ray's use of nonsense allowed *Abol Tabol* to escape censorship while delivering subtle critiques.
* *Abol Tabol* is a sophisticated work of protest literature, not merely children's verse.  It's a timeless reflection on power, freedom, and the human condition.
* *Abol Tabol* masterfully satirizes authority by exposing the absurdities of bureaucratic systems, social hierarchies, and authority figures.
* *Abol Tabol* critiques societal norms and expectations through satire and absurdity, 

In [47]:
# !pip install -U langchain_huggingface

In [49]:

from huggingface_hub import InferenceClient
print(f"Does InferenceClient have 'post' method? {'post' in dir(InferenceClient)}")

Does InferenceClient have 'post' method? False
