In [None]:
from dotenv import load_dotenv
load_dotenv()

## Extract texts, tables, images using Unstructured

In [None]:
from unstructured.partition.pdf import partition_pdf

output_path = "./"
file_path = output_path + "data/book.pdf"

chunks = partition_pdf(
    filename=file_path,
    infer_table_structure=True,

    strategy="hi_res",
    languages=['ben', 'eng'],

    extract_image_block_types=['Image', 'Table'],
    extract_image_block_to_payload=True,

    chunking_strategy="by_title",
    max_characters = 10000,
    combine_text_under_n_chars=2000,
    new_after_n_chars=6000,
)

In [3]:
# Extract texts from the chunks

texts = []

for chunk in chunks:
    if 'CompositeElement' in str(type(chunk)):
        texts.append(chunk)

In [4]:
# Get the images from CompositeElements

def get_images_base64(chunks):
    images_b64 = []

    for chunk in chunks:
        if 'CompositeElement' in str(type(chunk)):
            chunk_els = chunk.metadata.orig_elements
            for el in chunk_els:
                if 'Image' in str(type(el)):
                    images_b64.append(el.metadata.image_base64)
    
    return images_b64

images = get_images_base64(chunks)

## Summarize the content

First we will chunk the extracted texts. Then we will create summary of text chunks and images and then vectorize them.

In [5]:
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

**N.B:** Llama-4-maverick and Llama-4-scout both are tested. Llama-4-maverick provides better results

In [6]:
llm_maverick = ChatGroq(
    temperature = 0.5,
    model = "meta-llama/llama-4-maverick-17b-128e-instruct"  
)

llm_scout = ChatGroq(
    temperature = 0.5,
    model = "meta-llama/llama-4-scout-17b-16e-instruct"  
)


### Text Chunking

**Text Chunking Strategy:**

1. Each chunk in texts is a Composite Element.
2. We will extract the texts from each chunk first.
3. Then those extracted text will be passed to RecursiveCharacterTextSplitter to split texts in a token aware setup.
4. Token aware strategy is followed because, Jina Embeddings model has a token limit of 8196.
5. Tiktoken is used to count token of each chunk

In [7]:
def get_texts(texts):
    """Extracts text from a list of CompositeElements or text objects."""
    texts_only = []
    for text in texts:
        if isinstance(text, dict):
            if 'text' in text:
                texts_only.append(text['text'])
        elif hasattr(text, 'text'):
            texts_only.append(text.text)
    
    return "\n".join(texts_only)

In [None]:

import tiktoken
from langchain_text_splitters import RecursiveCharacterTextSplitter

encoding = tiktoken.get_encoding("cl100k_base")

def token_length(text: str) -> int:
    return len(encoding.encode(text))

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=7000,               # Max tokens per chunk
    chunk_overlap=200,             # Optional overlap for context continuity
    length_function=token_length,  # Token-aware length function
    separators=[
        "\n\n",                    # Paragraph breaks
        "\n",                      # Line breaks
        "।",                       # Bangla sentence ending
        ".",                       # English sentence ending
        "!",                       # Exclamation
        "?",                       # Question
        ";",                       # Semicolon
        " ",                       # Word level
        ""                         # Character level (last resort)
    ],
    is_separator_regex=False
)

chunked_texts = text_splitter.split_text(get_texts(texts))

chunked_texts

### Text and tables summaries

In [9]:
# Prompt

prompt_text_summary = """
তুমি একজন বুদ্ধিমান ভাষা সহকারী, যার উদ্দেশ্য হলো একটি শিক্ষামূলক পাঠ্যবইয়ের নির্দিষ্ট অংশ (টেক্সট বা টেবিল) থেকে উচ্চ-গুণমানের, তথ্যসমৃদ্ধ সারাংশ তৈরি করা যা পরবর্তী পর্যায়ে প্রশ্নোত্তর ভিত্তিক ব্যবস্থায় কার্যকরভাবে ব্যবহৃত হতে পারে।

তোমার সারাংশ হতে হবে:
- মূল বিষয়বস্তু ও তথ্য সুনির্দিষ্টভাবে ধারণকারী  
- প্রসঙ্গ বজায় রেখে তৈরি   
- সংক্ষিপ্ত কিন্তু পরিপূর্ণ  

তুমি যেটা *করবে না*:
- ভূমিকা, উপসংহার, বা ব্যক্তিগত ব্যাখ্যা দেবে না  
- "সারাংশ:", "এই অংশে বলা হয়েছে" বা অনুরূপ বাক্যাংশ দিয়ে শুরু করবে না  
- অতিরিক্ত ব্যাখ্যামূলক বা অলংকারপূর্ণ ভাষা ব্যবহার করবে না  

উদ্দেশ্য হলো: পাঠ্যবস্তুর জ্ঞানগত কাঠামো সংরক্ষণ করে এমন একটি embedding-উপযোগী সারাংশ তৈরি করা।

এখন নিচের অংশ থেকে একটি সারাংশ তৈরি করো: {element}

"""

In [None]:
prompt = ChatPromptTemplate.from_template(prompt_text_summary)

text_summary_chain = prompt | llm_maverick | StrOutputParser()

text_summaries = text_summary_chain.batch(chunked_texts, {"max_concurrency": 1})

text_summaries

### Image summaries

In [16]:
prompt_image_summary = ChatPromptTemplate.from_messages([
    (
        "user",
        [
            {"type": "text", "text": """
                তুমি একজন দক্ষ চিত্র বিশ্লেষক ভাষা সহকারী, যার কাজ হলো একটি চিত্র (যেমন টেবিল, ডায়াগ্রাম, চিত্রিত উদাহরণ, বা সংক্ষিপ্ত লেখা-সহ চিত্র) দেখে একটি সংক্ষিপ্ত, তথ্যবহুল সারাংশ তৈরি করা।

                তোমার সারাংশ:
                - চিত্রের মূল তথ্য ও কাঠামো ধরে রাখবে
                - ব্যাখ্যামূলক এবং তথ্যসমৃদ্ধ হবে
                - ভবিষ্যৎ প্রশ্নোত্তর ব্যবস্থায় ব্যবহারের উপযোগী হবে

                তুমি যেটা *করবে না*:
                - চিত্রটি কী তা পুনরাবৃত্তি করবে না (যেমন: "এই চিত্রে দেখা যাচ্ছে...")
                - ব্যক্তিগত মন্তব্য, উপসংহার বা অলংকার ব্যবহার করবে না
                - প্রসঙ্গের বাইরে যাবে না

                লক্ষ্য: একটি embedding-উপযোগী সারাংশ তৈরি করা যা চিত্রের জ্ঞানকে স্পষ্টভাবে সংরক্ষণ করে।
                """
            }, 
            {"type": "image_url", "image_url": {"url": "{image_url}"}}
        ]
    )
])

In [None]:
image_summary_chain = prompt_image_summary | llm_maverick | StrOutputParser()

inputs = [{"image_url": f"data:image/jpeg;base64,{img}"} for img in images]

image_summaries = image_summary_chain.batch(inputs, {"max_concurrency": 1})

image_summaries

## Load data and summaries to vectorstore

**Strategy:**

- ChromaDB is used with MultiVectorRetriever and InMemoryDocStore
- Jina Embeddings is used

In [19]:
import uuid
from langchain_community.vectorstores import Chroma
from langchain.storage import InMemoryStore
from langchain_core.documents import Document
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain_community.embeddings import JinaEmbeddings

In [20]:
embeddings_model = JinaEmbeddings(
    model_name="jina-embeddings-v3"
)

In [None]:
# vector store for summaries
vector_store = Chroma(
    collection_name="hsc-bangla-book",
    embedding_function=embeddings_model
)

# storage for parent documents
store = InMemoryStore()
id_key = "doc_id"

retriever = MultiVectorRetriever(
    vectorstore=vector_store,
    docstore=store,
    id_key=id_key,
    search_type="similarity",
    search_kwargs={"k": 5}
)

### Load summaries and link to the original data

**Strategy:**

1. We will load the text_summaries and image_summaries in vector_store.
2. Original texts (Before getting the summary) will be loaded into doc_store and will be linked to text_summaries in vector_store.

In [None]:
# add text summaries
text_ids = [str(uuid.uuid4()) for _ in texts]
summary_texts = [
    Document(page_content=summary, metadata={id_key: text_ids[i], "type": "text"}) 
    for i, summary in enumerate(text_summaries)
]
retriever.vectorstore.add_documents(summary_texts)
retriever.docstore.mset(list(zip(text_ids, texts)))

In [None]:
# add image summaries
img_ids = [str(uuid.uuid4()) for _ in images]
summary_img = [
    Document(page_content=summary, metadata={id_key: img_ids[i], "type": "image"}) 
    for i, summary in enumerate(image_summaries)
]
retriever.vectorstore.add_documents(summary_img)

## RAG Pipeline

### Short-term memory

Short term memory with max_turns = 5

In [None]:
class ShortTermMemory:
    def __init__(self, max_turns=5):
        self.max_turns = max_turns
        self.history = []  # Stores (user_input, assistant_output) tuples

    def update(self, user_input: str, assistant_output: str):
        self.history.append((user_input, assistant_output))
        self.history = self.history[-self.max_turns:]

    def get_context(self):
        return "\n".join([
            f"User: {u}\nAssistant: {a}" for u, a in self.history
        ])

    def reset(self):
        self.history = []

short_term_memory = ShortTermMemory(max_turns=5)

### Information Retrieval

**Strategy:**

1. MultiVectorRetriever is used with similarity search with two options provided:
   - Retrieve relevant texts summaries and image summaries from vector_store with raw_texts from doc_store
   - Retrive only relevant texts summaries and images summaries
2. Retriver will choose top-k (5) options and merge them.

In [83]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.messages import HumanMessage
from base64 import b64decode

In [None]:
def retrieve_context(query, use_raw_texts=False, top_k=5):
    """Retrieve context based on the query using similarity search.
    Args:
        query (str): The query to search for.
        use_raw_texts (bool): Whether to return raw texts or summaries
        top_k (int): Number of top documents to retrieve.
    """
    
    retrieved_docs = retriever.vectorstore.similarity_search(query, k=top_k)
    
    final_contexts = []

    for doc in retrieved_docs:
        doc_type = doc.metadata.get("type")
        doc_id = doc.metadata.get(id_key)

        if doc_type == "text" and use_raw_texts:
            # Lookup full raw text from docstore
            raw_doc = retriever.docstore.mget([doc_id])[0]
            if raw_doc:
                final_contexts.append(raw_doc)
            else:
                # Fallback to summary if raw text not found
                final_contexts.append(doc)
        else:
            # Use the summary directly
            final_contexts.append(doc)

    return final_contexts


### Response Generation

**Strategy:**

- Create_chain function will retrieve contexts based on the query and retrieval strategy.
- Then will pass it to llm using build_prompt to generate response

In [62]:
def parse_docs(docs):
    """Parse retrieved documents into text and image content"""
    text_content = ""
    image_summaries = []
    
    for doc in docs:
        doc_type = doc.metadata.get("type")
        if doc_type == "text":
            text_content += doc.page_content + "\n\n"
        elif doc_type == "image":
            image_summaries.append(doc.page_content)
    
    return {
        "text_content": text_content.strip(),
        "image_summaries": image_summaries
    }

def build_prompt(kwargs):
    context = kwargs["context"]
    user_question = kwargs["question"]

    # Combine text content and image summaries
    context_text = context["text_content"]
    
    if context["image_summaries"]:
        context_text += "\n\nছবি সংক্রান্ত তথ্য:\n"
        for i, img_summary in enumerate(context["image_summaries"], 1):
            context_text += f"{i}. {img_summary}\n"
    
    memory_context = short_term_memory.get_context()

    prompt_template = f"""
        তুমি একজন সহায়ক এবং দ্বিভাষিক ভাষা সহকারী, যার কাজ হলো শুধুমাত্র নিচের প্রসঙ্গ ব্যবহার করে ব্যবহারকারীর প্রশ্নের উত্তর দেওয়া। প্রসঙ্গের মধ্যে টেক্সট, টেবিল, ও ছবি থেকে তথ্য থাকতে পারে।

        নির্দেশনা:
        - নিচের প্রসঙ্গ ব্যবহার করে প্রশ্নের উত্তর দেবে।
        - প্রশ্নের উত্তর সংক্ষিপ্ত, স্পষ্ট এবং প্রসঙ্গ-ভিত্তিক হওয়া উচিত।
        - কল্পনাপ্রসূত বা অনুমানভিত্তিক তথ্য এড়িয়ে চলবে।
        - প্রশ্নের উত্তর দেওয়ার সময় প্রসঙ্গের তথ্য উদ্ধৃত করবে না।
        - যদি কোনও প্রশ্নের উত্তর খুঁজে না পাও, তবে {context_text} পড়ে, আউটসাইড নলেজ এ সার্চ করে সঠিক তথ্য খুঁজে, নিজের reasoning ব্যবহার করে বাইরে থেকে সঠিক প্রসঙ্গ খুঁজে উত্তর দেবে।
        - কোনো preamble বা অতিরিক্ত ব্যাখ্যা ছাড়া সরাসরি প্রশ্নের উত্তর দেবে এবং Outside knowledge ব্যবহার করলে তা উল্লেখ করে দেবে।

        --- প্রসঙ্গ শুরু ---
        {context_text}
        --- প্রসঙ্গ শেষ ---

        প্রশ্ন: {user_question}
        """
    
    prompt_template = f"""
    তুমি একজন সহায়ক এবং দ্বিভাষিক ভাষা সহকারী, যার কাজ হলো শুধুমাত্র নিচের প্রসঙ্গ ব্যবহার করে ব্যবহারকারীর প্রশ্নের উত্তর দেওয়া। প্রসঙ্গের মধ্যে টেক্সট, টেবিল, ও ছবি থেকে তথ্য থাকতে পারে।

    --- প্রসঙ্গ (Long-Term Memory) ---
    {context_text}

    --- সাম্প্রতিক কথোপকথন (Short-Term Memory) ---
    {memory_context}

    প্রশ্ন / Question: {user_question}

    Instructions (English for clarity):
    - Use only the provided context (Long-Term + Short-Term Memory). Do not invent or assume.
    - Respond in the **same language** as the user's question (Bangla or English).
    - Keep the response short, clear, and grounded in the provided context.
    - Avoid hallucinated, vague, or speculative answers.
    - If you cannot find an answer to any question, then read the {context_text} and {memory_context}, search in outside knowledge to find correct information, use your own reasoning to find the correct context from outside and provide the answer.
    - Answer the question directly without any preamble or additional explanation, and if you use outside knowledge, mention that.

    """

    return ChatPromptTemplate.from_messages([
        HumanMessage(content=prompt_template)
    ])
                

In [None]:
def create_chain(use_raw_texts=False, top_k=5):
    """Create chain with specified retrieval strategy"""
    
    def get_context_for_chain(query):
        """Wrapper function to integrate retrieve_context with the chain"""
        docs = retrieve_context(query, use_raw_texts=use_raw_texts, top_k=top_k)
        return parse_docs(docs)
    
    return (
        {
            "context": RunnableLambda(get_context_for_chain),
            "question": RunnablePassthrough(),
        }
        | RunnableLambda(build_prompt) 
        | llm_scout
        | StrOutputParser()
    )

chain = create_chain(use_raw_texts=False, top_k=5)


In [None]:
# Long-Short Term Memory Example

questions = [
    "রবীন্দ্রনাথ ঠাকুরের ছদ্মনাম কি ছিলো?",
    "কল্যাণীর বাবা কে ছিলেন?",
    "কাকে অনুপমের ভাগ্য দেবতা বলা হয়েছে?",
    "ভানুসিংহ ঠাকুর কে ছিলেন?"
]

for query in questions:
    response = chain.invoke(query)
    short_term_memory.update(query, response)
    print(f"Qn: {query}\nAns: {response}\n---")

In [None]:
response = chain.invoke(
    "কল্যাণীর বাবা কে ছিলেন?"
)

print(response)

In [None]:
response = chain.invoke(
    "রবীন্দ্রনাথ ঠাকুরের ছদ্মনাম কি ছিলো?"
)

print(response)

## Evaluation



For evaluating the RAG system, we may use the RAGAS library.

We will evaluate based on :

1. Cosine Similarity Score (Similarity between the generated answer and the ground truth) - `Semantic Similarity`
2. Groundedness (Is the answer supported by retrieved context?) - `Response Groundness`, `Factual Correctness`, `Faithfulness`
3. Relevance (Does the system fetch the most appropriate documents?) - `Context Relevance`, `LLMContextRecall`

In [None]:
sample_queries = [
    "রবীন্দ্রনাথ ঠাকুরের ছদ্মনাম কি ছিলো?",
    "অনুপমের ভাষায় সুপুরুষ কাকে বলা হয়েছে?",
    "কল্যাণীর বাবা কে ছিলেন?",
    "কাকে অনুপমের ভাগ্য দেবতা বলে উল্লেখ করা হয়েছে?",
    "শম্ভুনাথ সেকরার হাত এ কি পরখ করতে দিয়েছিলেন?"
]

expected_responses = [
    "রবীন্দ্রনাথ ঠাকুরের ছদ্মনাম ছিলো ভানুসিংহ ঠাকুর",
    "অনুপমের ভাষায় শম্ভুনাথ কে সুপুরুষ বলা হয়েছে",
    "কল্যাণীর বাবা ছিলেন একজন ডাক্তার। তার নাম ছিলো শম্ভুনাথ সেন",
    "অনুপমের মামাকে তার ভাগ্য দেবতা বলে উল্লেখ করা হয়েছে",
    "শম্ভুনাথ সেকরার হাত এ একজোড়া ইয়ারিং পরখ করতে দিয়েছিলেন"
]

In [None]:
def format_docs(relevant_docs):
    return "\n".join(doc.page_content for doc in relevant_docs)

In [None]:
from ragas import EvaluationDataset

dataset = []

for query, reference in zip(sample_queries, expected_responses):
    relevant_docs = retrieve_context(query, top_k=5, use_raw_texts=True)
    response = chain.invoke(query)
    dataset.append(
        {
            "user_input": query,
            "retrieved_contexts": [rdoc.page_content for rdoc in relevant_docs],
            "response": response,
            "reference": reference,
        }
    )

evaluation_dataset = EvaluationDataset.from_list(dataset)

In [None]:
from ragas.llms import LangchainLLMWrapper
from ragas.embeddings import LangchainEmbeddingsWrapper

from ragas import evaluate
from ragas.metrics import SemanticSimilarity, LLMContextRecall, ResponseGroundedness, ContextRelevance, Faithfulness
from ragas.metrics._factual_correctness import FactualCorrectness


In [None]:
evaluator_llm = LangchainLLMWrapper(llm_scout)
evaluator_embedding = LangchainEmbeddingsWrapper(embeddings_model)

### Similarity Check

In [None]:
similarity_score = evaluate(
    dataset=evaluation_dataset,
    metrics=[SemanticSimilarity()],
    embeddings=evaluator_embedding,
    experiment_name="similarity_score"
)

similarity_score

### Groundness Check

In [None]:
groundness_check = evaluate(
    dataset=evaluation_dataset,
    metrics=[ResponseGroundedness(), Faithfulness(), FactualCorrectness()],
    llm=evaluator_llm,
    experiment_name="groundness_check"
)

groundness_check

### Relevancy Check

In [None]:
relevancy_check = evaluate(
    dataset=evaluation_dataset,
    metrics=[LLMContextRecall(), ContextRelevance()],
    llm=evaluator_llm,
    experiment_name="relevancy_check"
)

relevancy_check