## Chunking techniques for Retrieval Augmented Chatbot

This notebook was run on Kaggle (2× T4 GPUs).

In [None]:
# Install llama-cpp-python pinned to 0.2.90, using the extra wheel index whose path ends in
# `cu122` (presumably prebuilt CUDA 12.2 wheels — confirm against the Kaggle image).
!pip install --no-cache-dir llama-cpp-python==0.2.90 --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu122
# Install the remaining dependencies; only the two langchain packages are version-pinned here.
!pip install -qU json-repair datasets langchain==0.3.13 langchain-community==0.3.13 autoawq transformers accelerate faiss-gpu wikipedia googlesearch-python
# lazynlp is cloned from GitHub, and its requirements are installed from the cloned checkout.
!git clone https://github.com/chiphuyen/lazynlp.git
!pip install -r /kaggle/working/lazynlp/requirements.txt

import sys
# Put the cloned checkout on the import path so that `import lazynlp` works in later cells.
sys.path.append('/kaggle/working/lazynlp')

## RAG Systems

### 1. Chat model
Qwen2.5-7B-Instruct

In [4]:
from langchain_community.chat_models import ChatLlamaCpp

# Chat model used to generate the answers, loaded from a local GGUF file via ChatLlamaCpp.
# It is a module-level global that `rag_bot` reads at call time.
chat_model = ChatLlamaCpp(
    model_path='/kaggle/input/rag-system/gguf/default/2/Qwen2.5-7B-Instruct.Q5_K_S.gguf',
    temperature=0.3,
    max_tokens=512,
    n_batch=32,
    n_ctx=32768,
    n_gpu_layers=-1,  # NOTE(review): presumably "offload all layers to the GPU" — confirm in llama-cpp docs
    verbose=False
)

### 2. Embedding model
mxbai-embed-large-v1

In [5]:
from langchain_community.embeddings import LlamaCppEmbeddings

# Embedding model shared by every vector store in this notebook, loaded from a local GGUF file.
# `BaseVectorstoreRetriever.__init__` reads this module-level global.
embeddings = LlamaCppEmbeddings(
    model_path="/kaggle/input/rag-system/gguf/default/2/mxbai-embed-large-v1.Q5_K_M.gguf",
    device='cuda',
    n_gpu_layers=-1,  # NOTE(review): presumably "offload all layers to the GPU" — confirm in llama-cpp docs
    verbose=False
)

### 3. Documents retriever and Vector store

Retrieve documents through Google searches and Wikipedia articles

In [6]:
from langchain_community.retrievers import WikipediaRetriever
from googlesearch import search
from langchain_core.documents import Document
import lazynlp
import urllib
# Browser-like HTTP request headers attached to every page fetch made while scraping.
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Connection": "keep-alive",
    "Referer": "https://search.brave.com/",
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Sec-Fetch-User": "?1",
    "TE": "trailers",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:133.0) Gecko/20100101 Firefox/133.0",
}

class CustomWebRetriever():
    """
    Given a query, retrieve top 5 Google searches and top 2 Wikipedia articles.

    Google hits whose URL contains 'wikipedia.org' are dropped; Wikipedia content
    comes from the dedicated Wikipedia retriever instead.
    """
    def __init__(self):
        self.wiki_retriever = WikipediaRetriever(top_k_results=2)

    def _get_documents_from_urls(self, urls):
        """
        Download each URL and turn its cleaned text into a Document.

        1. Get htmls for each urls.
        2. Get page contents from htmls using lazynlp.
        3. If url request failed (4xx, timeout, ...) or cannot get page content, skip that url.
        4. Return a list of documents.

        Args:
            urls: iterable of URL strings to fetch.

        Returns:
            list of Documents, one per URL that produced non-empty cleaned text.
        """
        # A bare `import urllib` does not guarantee that the `request` submodule is loaded;
        # import it explicitly so a missing attribute is never mistaken for a failed download.
        import urllib.request

        docs = []
        for url in urls:
            try:
                req = urllib.request.Request(url, headers=headers)
                # The context manager closes the connection even if reading fails.
                with urllib.request.urlopen(req, timeout=3) as response:
                    page = response.read()

                cleaned_page = lazynlp.clean_page(page)
                if cleaned_page:
                    docs.append(Document(page_content=cleaned_page))
            except Exception:
                # Scraping is best-effort: an unreachable or unparsable page is skipped.
                # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
                continue

        return docs

    def invoke(self, query):
        """
        Retrieve list of relevant documents based on query.

        Args:
            query: search string sent to both Google and Wikipedia.

        Returns:
            list of Documents (web pages first, then Wikipedia articles) whose
            page_content has every whitespace run collapsed to a single space.
        """
        wiki_docs = self.wiki_retriever.invoke(query)
        urls = [url for url in search(query, num_results=5) if 'wikipedia.org' not in url]

        web_docs = self._get_documents_from_urls(urls)
        web_docs.extend(wiki_docs)
        for doc in web_docs:
            # str.split() with no argument splits on any whitespace run (tabs and
            # newlines included), so a single split/join normalises the text.
            doc.page_content = ' '.join(doc.page_content.split())

        return web_docs

Vectorstore

In [7]:
import faiss
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore

class BaseVectorstoreRetriever():
    """
    Base class for using FAISS vectorstore to store documents (chunks), and retrieve context for chat model given query.

    Subclasses implement `_get_docs_list` to define how retrieved documents are chunked.
    """
    def __init__(self):
        # Embed a throwaway string once to learn the embedding dimension for the index.
        index = faiss.IndexFlatL2(len(embeddings.embed_query("hello world")))
        self.vector_store = FAISS(
            embedding_function=embeddings,
            index=index,
            docstore= InMemoryDocstore(),
            index_to_docstore_id={}
        )
        self.retriever = CustomWebRetriever()

    def _get_docs_list(self, docs):
        """
        Turn retrieved documents into the list of chunks to index.

        Args:
            docs: list of Documents returned by the web retriever.

        Returns:
            list of Documents (chunks) to add to the vector store.

        Raises:
            NotImplementedError: always, in the base class; subclasses must override.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement _get_docs_list(docs)"
        )

    def invoke(self, query):
        """
        1. Retrieve list of relevant documents (chunks) based on query.
        2. Add documents (chunks) to vectorstore.
        3. Retrieve list of contexts most similar (top 5) to query.

        Args:
            query: the user question.

        Returns:
            list of up to 5 Documents from the vector store most similar to the query.
        """
        docs = self.retriever.invoke(query)
        docs_list = self._get_docs_list(docs)

        # Nothing retrieved (e.g. every page failed to download): skip indexing
        # rather than handing the vector store an empty batch.
        if docs_list:
            self.vector_store.add_documents(documents=docs_list)

        vectorstore_retriever = self.vector_store.as_retriever(search_type="similarity", search_kwargs={'k': 5})
        return vectorstore_retriever.invoke(query)

Naive Chunking

In [8]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

class NaiveVectorstoreRetriever(BaseVectorstoreRetriever):
    """
    Split documents into fixed-size character chunks.

    By default each chunk size = 2000 and overlap between chunks = 200.
    """
    def __init__(self, chunk_size=2000, chunk_overlap=200):
        """
        Args:
            chunk_size: maximum chunk length, measured with `len` (characters).
            chunk_overlap: number of characters shared by consecutive chunks.
        """
        super().__init__()
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
        )

    def _get_docs_list(self, docs):
        """Return the chunks produced by splitting `docs` with the configured splitter."""
        return self.text_splitter.split_documents(docs)


Propositions Chunking

In [9]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
from json_repair import repair_json
import json
from nltk.tokenize import sent_tokenize

class PropositionsChunkingVectorstoreRetriever(BaseVectorstoreRetriever):
    """
    Split documents based on propositions (self-contained facts) and fall back on sentence chunking.
    Propositionizer model from paper 'Dense X Retrieval: What Retrieval Granularity Should We Use?'.
    """
    def __init__(self):
        super().__init__()
        model_name = "chentong00/propositionizer-wiki-flan-t5-large"
        self.device = "cuda"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name).to(self.device)

    def _generate_propositions(self, title, chunk, propositions):
        """
        1. Generate propositions from each chunk (list of sentences) using propositionizer model.
        2. Get/fix json string from model output to get list of propositions.
        3. If the output cannot be repaired into a non-empty list of strings, fall back
           to using the original chunk (list of sentences).
        4. Return list of documents that are propositions/sentence chunking.

        Args:
            title: document title placed in the prompt and stored in each Document's metadata.
            chunk: list of sentences forming the model input.
            propositions: list that the new Documents are appended to (mutated in place).

        Returns:
            The same `propositions` list, extended with the new Documents.
        """
        input_text = f"Title: {title}. Section: . Content: {' '.join(chunk)}"
        input_ids = self.tokenizer(input_text, return_tensors="pt").input_ids
        outputs = self.model.generate(input_ids.to(self.device), max_new_tokens=512).cpu()
        output_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        fixed_output_text = repair_json(output_text, ensure_ascii=False)

        prop_list = []
        if fixed_output_text:
            try:
                parsed = json.loads(fixed_output_text)
            except (TypeError, ValueError):
                parsed = None
            # Only a JSON array is a valid proposition list. Iterating a bare string
            # would yield single characters, and a dict would yield its keys.
            if isinstance(parsed, list):
                # dict.fromkeys de-duplicates while keeping the model's order. Non-string
                # items are filtered first, so unhashable values never reach it.
                prop_list = list(dict.fromkeys(
                    text for text in parsed if isinstance(text, str) and text.strip()
                ))

        # Fall back to plain sentence chunks whenever no usable proposition was produced.
        texts = prop_list if prop_list else [sentence for sentence in chunk if sentence]

        # The title must go into `metadata`; it is not a Document field of its own.
        propositions.extend(
            Document(page_content=text, metadata={"title": title}) for text in texts
        )

        return propositions

    def _get_propositions_with_overlap(self, docs_list, max_chunk_tokens=450):
        """
        1. Split document into sentences using nltk
        2. Concat sentences into a chunk until chunk tokens reach max chunk tokens (default 450)
        3. One sentence overlap between chunks

        Args:
            docs_list: Documents to convert; `metadata['title']` is used when present.
            max_chunk_tokens: token budget for one propositionizer input.

        Returns:
            list of Documents holding propositions (or fallback sentences).
        """
        propositions = []

        for doc in docs_list:
            cleaned_context = doc.page_content.replace("\n", " ")
            sentences = sent_tokenize(cleaned_context)
            chunk = []
            current_tokens = 0
            last_sentence = ""
            doc_title = doc.metadata.get('title', '') or ''
            # NOTE: this is the title's character count, used as a stand-in for its
            # token count when budgeting the prompt.
            title_length = len(doc_title)
            # dict.fromkeys drops repeated sentences while preserving their order.
            for sentence in dict.fromkeys(sentences):
                token_count = len(self.tokenizer.encode(sentence, truncation=False))
                if current_tokens + token_count + title_length <= max_chunk_tokens:
                    chunk.append(sentence)
                    current_tokens += token_count
                else:
                    # The chunk is still empty when the very first sentence already
                    # exceeds the budget; do not run the model on empty content.
                    if chunk:
                        propositions = self._generate_propositions(doc_title, chunk, propositions)
                    # Start the next chunk with a one-sentence overlap, if there is one.
                    chunk = [last_sentence, sentence] if last_sentence else [sentence]
                    current_tokens = len(self.tokenizer.encode(' '.join(chunk), truncation=False))

                last_sentence = sentence

            if chunk:
                propositions = self._generate_propositions(doc_title, chunk, propositions)

        return propositions

    def _get_docs_list(self, docs):
        """Chunk `docs` into proposition Documents for indexing."""
        return self._get_propositions_with_overlap(docs)

Small2Big Chunking

In [None]:
import uuid
from langchain.storage import InMemoryByteStore
from langchain.retrievers.multi_vector import MultiVectorRetriever, SearchType

class Small2BigVectorstoreRetriever(BaseVectorstoreRetriever):
    """
    Embed small chunks for better semantic similarity while retrieving parent chunks for better context.
    """
    def __init__(self):
        """
        Parent chunks with length of 10000 and overlap of 1000.

        Child chunks with length of 400, chunk_overlap = 40.

        Parent documents retriever with associated identifier, search with maximal marginal relevance.
        """
        super().__init__()
        self.parent_text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000, length_function=len)
        self.child_text_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40, length_function=len)
        # Metadata key under which each child chunk records the id of its parent chunk.
        self.id_key = "doc_id"
        self.parent_retriever = MultiVectorRetriever(
            vectorstore=self.vector_store,
            byte_store=InMemoryByteStore(),
            id_key=self.id_key,
        )
        self.parent_retriever.search_type = SearchType.mmr

    def invoke(self, query):
        """
        1. Retrieve and split into parent chunks.
        2. For each parent chunks split into child chunks.
        3. Set the id for the child chunks as the id of the parent document.
        4. Embed child chunks into vectorstore.
        5. The retriever will find child chunks with the closest (mmr) embedding to the query, and will retrieve the
        parent document via id.

        Args:
            query: the user question.

        Returns:
            list of parent-chunk Documents selected by the multi-vector retriever.
        """
        docs = self.retriever.invoke(query)
        parent_docs = self.parent_text_splitter.split_documents(docs)
        parent_doc_ids = [str(uuid.uuid4()) for _ in parent_docs]

        sub_docs = []
        for _id, doc in zip(parent_doc_ids, parent_docs):
            _sub_docs = self.child_text_splitter.split_documents([doc])
            for _doc in _sub_docs:
                _doc.metadata[self.id_key] = _id
            sub_docs.extend(_sub_docs)

        # Skip indexing when retrieval produced nothing.
        if sub_docs:
            self.parent_retriever.vectorstore.add_documents(sub_docs)
            # Ids were generated per *parent chunk*, so they must be paired with
            # `parent_docs`. Pairing them with the unsplit `docs` misaligns ids and
            # silently drops every id beyond len(docs).
            self.parent_retriever.docstore.mset(list(zip(parent_doc_ids, parent_docs)))

        return self.parent_retriever.invoke(query)

### 4. Context retrieval and Generation

In [11]:
import time

def rag_bot(
    questions: list[str],
    retriever=None,
    rag=True
) -> tuple[list[dict], float]:
    """
    Retrieval Augmented Chatbot (Q&A). Can pass in multiple questions.

    Default retriever is Naive (rag=True), set rag=False to not use the RAG System.

    Args:
        questions: the questions to answer.
        retriever: object exposing `invoke(question) -> list[Document]`. When None
            (the default) and `rag` is True, a fresh NaiveVectorstoreRetriever is built.
        rag: when False, no retrieval is performed and the plain chatbot prompt is used.

    Returns:
        (results, elapsed_seconds). `results` holds one
        {"answer": str, "documents": list} dict per question; "documents" is an
        empty list when rag=False. The elapsed time covers retrieval and generation.
    """
    # Build the default lazily: a retriever created in the signature would be
    # constructed when the function is defined and then shared by every call.
    if rag and retriever is None:
        retriever = NaiveVectorstoreRetriever()

    start_time = time.perf_counter()
    if rag:
        batch_docs = [retriever.invoke(question) for question in questions]
    else:
        # No retrieval for the plain chatbot, so the timing reflects generation only.
        batch_docs = [[] for _ in questions]

    instructions_batch = []
    for question, docs in zip(questions, batch_docs):
        if rag:
            docs_string = "\n\n".join(doc.page_content for doc in docs)
            # RAG Prompt
            instructions = f"""You are a helpful assistant who provides concise answers. 
Use the following source documents to answer the user's questions. 

Documents:
{docs_string}

That's the end of documents. Answer the question with either a single word/phrase.
If you don't know the answer, just say that you don't know."""
        else:
            # Normal Chatbot Prompt
            instructions = """You are a helpful assistant who provides concise answers. 
Answer the question with either a single word/phrase.
If you don't know the answer, just say that you don't know."""

        instructions_batch.append(
            [
                {"role": "system", "content": instructions},
                {"role": "user", "content": question},
            ]
        )

    ai_responses = [chat_model.invoke(messages) for messages in instructions_batch]

    results = [
        {"answer": ai_msg.content, "documents": docs}
        for ai_msg, docs in zip(ai_responses, batch_docs)
    ]

    elapsed_time = time.perf_counter() - start_time
    return results, elapsed_time

### 5. Evaluation

Evaluation using LLM-as-a-judge

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM

# Judge LLM (tokenizer + causal LM) used by `evaluate` to score responses against a rubric.
# Both are module-level globals read inside `evaluate`.
judge_tokenizer = AutoTokenizer.from_pretrained("PrunaAI/prometheus-eval-prometheus-7b-v2.0-AWQ-4bit-smashed")
judge_model = AutoModelForCausalLM.from_pretrained("PrunaAI/prometheus-eval-prometheus-7b-v2.0-AWQ-4bit-smashed", device_map='auto')

In [13]:
import re
def get_result(text):
    """
    Parse rubric score from judge model's output.

    The score is the integer following the last "[RESULT]" marker in `text`.
    The last marker is used because the verdict comes at the end of the feedback,
    so earlier occurrences (e.g. in quoted material) cannot shadow it.

    Args:
        text: the judge model's output.

    Returns:
        The score as an int in 1..5. If the output is not in the right format
        (no "[RESULT] <int>" marker, or a value outside 1-5), return -1.
    """
    scores = re.findall(r"\[RESULT\]\s*(\d+)", text)
    if not scores:
        return -1
    result = int(scores[-1])
    # The rubric only defines scores 1-5; anything else is a malformed verdict.
    if not 1 <= result <= 5:
        return -1
    return result

In [14]:
# Rubric layout filled with one rubric dict (keys: criteria, score1_description ... score5_description).
SCORE_RUBRIC_TEMPLATE = """
[{criteria}]
Score 1: {score1_description}
Score 2: {score2_description}
Score 3: {score3_description}
Score 4: {score4_description}
Score 5: {score5_description}
""".strip()

# Preamble that `evaluate` prepends to the user message (it is not sent as a separate system turn).
ABS_SYSTEM_PROMPT = "You are a fair judge assistant tasked with providing clear, objective feedback based on specific criteria, ensuring each assessment reflects the absolute standards set for performance."

# Absolute-grading prompt without a reference-answer slot; placeholders: instruction, response, rubric.
# The "[RESULT]" marker requested here is what `get_result` parses.
ABSOLUTE_PROMPT_WO_REF = """###Task Description:
An instruction (might include an Input inside it), a response to evaluate, and a score rubric representing a evaluation criteria are given.
1. Write a detailed feedback that assess the quality of the response strictly based on the given score rubric, not evaluating in general.
2. After writing a feedback, write a score that is an integer between 1 and 5. You should refer to the score rubric.
3. The output format should look as follows: "(write a feedback for criteria) [RESULT] (an integer number between 1 and 5)"
4. Please do not generate any other opening, closing, and explanations.

###The instruction to evaluate:
{instruction}

###Response to evaluate:
{response}

###Score Rubrics:
{rubric}

###Feedback: """

def evaluate(instructions, responses, rubric_data):
    """
    Score each (instruction, response) pair with the judge model.

    1. Get correct instruction for each evaluation factor.
    2. Get the judge model output (feedback and score) of the evaluation between the responses and the instructions.
    3. Return the judge model score, -1 if score cannot be parsed from judge model output.

    Args:
        instructions: texts placed in the "instruction to evaluate" slot, one per pair.
        responses: texts placed in the "response to evaluate" slot, one per pair.
        rubric_data: dict with the keys expected by SCORE_RUBRIC_TEMPLATE.

    Returns:
        list of int scores, one per pair, in input order.

    Raises:
        ValueError: if `instructions` and `responses` differ in length (zip would
            otherwise silently drop the unmatched tail).
    """
    if len(instructions) != len(responses):
        raise ValueError(
            f"instructions ({len(instructions)}) and responses ({len(responses)}) must have the same length"
        )

    scores = []
    # Handle one pair at a time so only a single prompt and its output are held
    # on the GPU at once.
    for instruction, response in zip(instructions, responses):
        user_content = ABS_SYSTEM_PROMPT + "\n\n" + ABSOLUTE_PROMPT_WO_REF.format(
            instruction=instruction,
            response=response,
            rubric=SCORE_RUBRIC_TEMPLATE.format(**rubric_data),
        )
        conversation = [{"role": "user", "content": user_content}]

        # A batch of exactly one conversation -> tensor of shape (1, prompt_len).
        inputs = judge_tokenizer.apply_chat_template(
            [conversation], return_tensors="pt", padding=True, truncation=True
        ).to("cuda")

        # A single sequence has no padding, so every position is a real token.
        # Deriving the mask from pad_token_id could mask real tokens that share that id.
        attention_mask = inputs.new_ones(inputs.shape)

        generated_ids = judge_model.generate(
            inputs,
            attention_mask=attention_mask,
            max_new_tokens=1000,
            do_sample=False,
            pad_token_id=judge_tokenizer.eos_token_id
        )

        # The output starts with the prompt tokens; decode only the newly generated
        # feedback so that prompt text can never be mistaken for the verdict.
        new_token_ids = generated_ids[0][inputs.shape[-1]:]
        feedback = judge_tokenizer.decode(new_token_ids, skip_special_tokens=True)
        scores.append(get_result(feedback))

    return scores

Evaluation metrics:
- Correctness (Response vs reference answer)
- Relevance (Response vs input)
- Groundedness (Response vs retrieved docs)
- Retrieval relevance (Retrieved docs vs input)

In [15]:
# Rubric for "correctness": how closely the response matches the reference answer.
correctness_rubric_data = dict(
    criteria="How similar/correct is the model response, relative to the reference answer?",
    score1_description="The response is largely incorrect or diverges significantly from the reference answer, offering little to no overlap or relevance.",
    score2_description="The response contains some elements that align with the reference answer but misses key details or includes substantial inaccuracies.",
    score3_description="The response covers the main points of the reference answer but may omit minor details or introduce minor inaccuracies.",
    score4_description="The response aligns well with the reference answer, accurately capturing most details with rare and insignificant deviations.",
    score5_description="The response is highly accurate, fully aligned with the reference answer, and captures all key points without any inaccuracies or omissions.",
)


In [16]:
# Rubric for "relevance": how well the response addresses the user's input.
relevance_rubric_data = dict(
    criteria="How well does the generated response address the initial user input?",
    score1_description="The response is completely off-topic or fails to address the input in any meaningful way.",
    score2_description="The response has some relevance to the input but fails to address it fully or includes unrelated information.",
    score3_description="The response is generally relevant to the input, though it may miss certain aspects or include unnecessary information.",
    score4_description="The response is closely aligned with the input, addressing it effectively with minimal extraneous content.",
    score5_description="The response is fully relevant, directly and comprehensively addressing the input in a precise and focused manner.",
)

In [17]:
# Rubric for "groundedness": how far the response agrees with the retrieved context.
groundedness_rubric_data = dict(
    criteria="To what extent does the generated response agree with the retrieved context?",
    score1_description="The response is largely ungrounded, containing significant deviations or contradictions from the retrieved context.",
    score2_description="The response occasionally references the retrieved context but includes several inaccuracies or unsupported claims.",
    score3_description="The response is mostly grounded in the retrieved context but may omit key points or introduce minor unsupported details.",
    score4_description="The response is well-grounded in the retrieved context, with rare and minor inaccuracies or omissions.",
    score5_description="The response is fully grounded, accurately and comprehensively reflecting the retrieved context without any inconsistencies.",
)

In [18]:
# Rubric for "retrieval relevance": how relevant the retrieved documents are to the query.
retrieval_relevance_rubric_data = dict(
    criteria="How relevant are the retrieved documents to the input query?",
    score1_description="The retrieved documents are irrelevant to the input query, offering no meaningful connection or support.",
    score2_description="The retrieved documents have minimal relevance, with only a small fraction addressing the query appropriately.",
    score3_description="The retrieved documents are moderately relevant, partially addressing the query but missing some key aspects.",
    score4_description="The retrieved documents are highly relevant, addressing the query effectively with only minor gaps or unrelated content.",
    score5_description="The retrieved documents are perfectly relevant, fully addressing the input query comprehensively and precisely.",
)

Evaluation dataset

In [None]:
from datasets import load_dataset, Dataset
# Number of questions kept for the evaluation run.
SAMPLE = 100
ds = load_dataset("Stanford/web_questions", split='test')
# Keep only examples that have exactly ONE ground-truth answer
ds = ds.filter(lambda example: len(example['answers'])==1)
# Fixed seed so the same subset is drawn on every run
ds = ds.shuffle(seed=24)
# Keep the first SAMPLE examples of the shuffled set
ds = Dataset.from_dict(ds[:SAMPLE])

questions = ds['question']

# Each kept example has a single answer, so take element 0 as the reference
reference_answers = [a[0] for a in ds['answers']]

Run RAG

In [None]:
# Change retriever for each retriever method (or set rag=False for base chat model)
# `rag_bot` returns (per-question results, elapsed seconds).
outputs, total_time = rag_bot(ds['question'], retriever = PropositionsChunkingVectorstoreRetriever())

answers = [output['answer'] for output in outputs]

# One string per question: the retrieved chunks joined by blank lines
documents = ["\n\n".join(doc.page_content for doc in output['documents']) for output in outputs]

In [None]:
import gc
import torch

# Drop the notebook's references to the generation-side models before judging.
# NOTE(review): any other object still holding `embeddings` (e.g. a live retriever's
# vector store) keeps that model alive — confirm nothing else references it.
del chat_model
del embeddings

# Collect first so objects that just became unreachable are actually freed, then
# ask PyTorch to return its cached, now-unused CUDA blocks to the driver.
gc.collect()
torch.cuda.empty_cache()

Evaluate results

- Correctness (Response vs reference answer)
- Relevance (Response vs input)
- Groundedness (Response vs retrieved docs)
- Retrieval relevance (Retrieved docs vs input)

In [None]:
# Correctness: the reference answer is passed in the "instruction" slot of the judge prompt.
# NOTE(review): the prompt has no dedicated reference-answer slot — confirm this is intended.
correctness_score = evaluate(
    instructions = reference_answers, 
    responses = answers,
    rubric_data = correctness_rubric_data, 
)

torch.cuda.empty_cache()
gc.collect()

# Relevance: generated answer judged against the original question.
relevance_score = evaluate(
    instructions = questions, 
    responses = answers, 
    rubric_data = relevance_rubric_data, 
)

torch.cuda.empty_cache()
gc.collect()

# Groundedness: generated answer judged against the retrieved documents.
groundedness_score = evaluate(
    instructions = documents, 
    responses = answers,
    rubric_data = groundedness_rubric_data, 
)

torch.cuda.empty_cache()
gc.collect()

# Retrieval relevance: retrieved documents judged against the original question.
retrieval_relevance_score = evaluate(
    instructions = questions, 
    responses = documents,
    rubric_data = retrieval_relevance_rubric_data, 
)

Save csv file

In [None]:
import pandas as pd
# Change file name for each retriever method
# One row per question; a score of -1 means the judge output could not be parsed.
pd.DataFrame({
    'questions': questions,
    'generated_answers': answers,
    'reference_answers': reference_answers,
    'documents': documents,
    'correctness' : correctness_score,
    'groundedness' : groundedness_score,
    'relevance' : relevance_score,
    'retrieval_relevance' : retrieval_relevance_score,
}).to_csv('prop_chunk_rag.csv')