# 03 - Assemble the RAG Pipeline

Notebook steps:

   - Load the vector database retriever and test it
   - Initialize the reader with a prompt template and a LLM
   - Create a RAG pipeline that combines the retriever and the reader


In [5]:
from collections import defaultdict
import pandas as pd
import numpy as np
from langchain.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import ChatOpenAI
from langchain.schema import Document

from lib.vector_store import VectorDB
from lib.io_utils import get_absolute_path
import torch

## Retriever

In the previous notebook, we created the vector database that works in the RAG as a boosted search engine that brings up the most relevant documents in relation to a user query.

We call it the retriever in the RAG.

We init the retriever, to understand how it works, we can look at the notebook `notebooks/scripts/02-build_vectordb.ipynb` which allows us to create a vectordb from a dataframe.


In [7]:
# init variables for retriever just for example (check available in config.yml for more details)
backend = "lancedb"
embedding_model_name = "Lajavaness/sentence-camembert-large"
db_store_path = get_absolute_path("data/retrievers/vectordb/camembert-large_lancedb")

db = VectorDB(
    backend=backend,
    embedding_model=embedding_model_name,
    path=db_store_path
)

üì¶ LanceDB intialization on: /Users/lucaterre/Documents/pro/Travail_courant/DEV/AI-ENC-Projects/on-github/encpos-qa-rag/data/retrievers/vectordb/camembert-large_lancedb
‚ÑπÔ∏è LanceDB table founded.


On teste le retriever

In [10]:
# Requ√™te
#query = ("Pourquoi trouve-t-on des images dans les marges des manuscrits au Moyen-√Çge ?")
#results = db.query(query, k=10)  # results = List[Tuple[Document, float]]

# Groupement par file_id
def group_by_thesis(results_with_score: list[tuple[Document, float]]) -> dict[str, list[tuple[Document, float]]]:
    """Group the results by thesis file_id.

    Args:
        results_with_score (list[tuple[Document, float]]): List of tuples containing Document and score.

    Returns:
        dict[str, list[tuple[Document, float]]]: Dictionary with file_id as keys and list of (Document, score) tuples as values.
    """
    grouped = defaultdict(list)
    for doc, score in results_with_score:
        file_id = doc.metadata.get("file_id", "unknown")
        grouped[file_id].append((doc, score))
    return grouped

# Affichage structur√©
def readeable_output_display(grouped_results: dict[str, list[tuple[Document, float]]]) -> None:
    """ Display the grouped results in a readable format.

    Args:
        grouped_results (dict[str, list[tuple[Document, float]]]): Dictionary with file_id as keys and list of (Document, score) tuples as values.
    Returns:
        None: This function prints the results directly.
    """
    for file_id, docs in grouped_results.items():
        # Trier par score descendant
        docs = sorted(docs, key=lambda tup: tup[1], reverse=True)

        titre = docs[0][0].metadata.get("position_name", "Sans titre")
        auteur = docs[0][0].metadata.get("author", "Inconnu")
        date = docs[0][0].metadata.get("year", "Inconnu")

        print(f"\nüìÑ {auteur}, {titre}, promotion {date}")
        print(f"üß© Chunks trouv√©s : {len(docs)}\n")

        for i, (doc, score) in enumerate(docs):
            extrait = doc.metadata.get("raw_chunk", doc.page_content).strip().replace("\n", " ")
            section = doc.metadata.get("section", "Inconnu")
            print(f"  ‚ñ™Ô∏è Extrait {i + 1} de la section '{section}' (score={score:.4f}) : [‚Ä¶]{extrait}[‚Ä¶]")
        print("-" * 80)

# Ex√©cution
#readeable_output_display(group_by_thesis(results))

def mini_retriever_playground(k: int=10)-> None:
    """Mini retriever playground to test the retriever with user input.

    Args:
        k (int): Number of results to return. Defaults to 10.

    Returns:
        None: This function runs an interactive loop to query the retriever.
    """
    query = input("Query: ")

    results = db.query(query, k=k)
    grouped_results = group_by_thesis(results)
    print(f"User query: {query}\n")
    readeable_output_display(grouped_results)


In [11]:
%%time
mini_retriever_playground(k=10)

User query: Quels sont les th√®mes iconographiques au Moyen-√Çge ?


üìÑ Emmanuelle Giry, L√©on Bloy et le Moyen √Çge l‚Äôimaginaire catholique renouvel√© ?, promotion 2011
üß© Chunks trouv√©s : 1

  ‚ñ™Ô∏è Extrait 1 de la section 'Annexes' (score=0.5563) : [‚Ä¶]Iconographie. ‚Äî Index des personnages m√©di√©vaux cit√©s dans l‚Äô≈ìuvre de Bloy. ‚Äî Tableau synoptique des principaux historiens lus par Bloy. ‚Äî Index g√©n√©ral.[‚Ä¶]
--------------------------------------------------------------------------------

üìÑ Jacques Yvon, L‚Äôillustration des romans arthuriens du xiiie au xve si√®cle, promotion 1948
üß© Chunks trouv√©s : 1

  ‚ñ™Ô∏è Extrait 1 de la section 'Chapitre III L‚Äôiconographie.' (score=0.5548) : [‚Ä¶]L‚Äôiconographie profane a √©t√© peu √©tudi√©e. Elle a fait des emprunts √† l‚Äôiconographie religieuse qui sont visibles dans l‚Äôillustration des romans de chevalerie. Certains th√®mes religieux ont √©t√© trait√©s par les enlumineurs de romans, comme l‚ÄôAnnonciatio

## Reader

Le reader est le composant du RAG qui va prendre les r√©sultats du retriever et les transformer (interpolation) en un prompt pr√©-structur√© et le passer un LLM qui en retour g√©n√®rera une r√©ponse structur√©e ou non.

Les param√®tres √† prendre compte pour le reader sont :
- Le mod√®le LLM qui va √™tre utilis√© pour g√©n√©rer la r√©ponse ;
- Le prompt qui va √™tre utilis√© pour g√©n√©rer la r√©ponse ;

Pour initialiser le Reader on passe par l'interface `ChatOpenAI` de LangChain qui permet d'utiliser les mod√®les LLM d'OpenAI (ou compatibles avec la specication d'API OpenAI, comme Mistral, Llama, etc.).
Le LLM est servi par LM Studio qui est un serveur compatible avec l'API OpenAI et qui permet d'utiliser des mod√®les LLM locaux facilement (il existe d'autres solutions comme Ollama ou Vllm par exemple)

Pour initialiser le LLM on doit sp√©cifier (voir la doc https://python.langchain.com/docs/integrations/chat/openai/) :
- Le nombre maximum de tokens √† g√©n√©rer par le LLM (`max_tokens`) ;
- Le temperature du LLM (pour la cr√©ativit√© de la r√©ponse). Une temperature de 0.0 rendra le LLM tr√®s d√©terministe, tandis qu'une temp√©rature de 1.0 le rendra plus cr√©atif et al√©atoire (`temperature`).
- L'url de l'API OpenAI (qui est en fait l'url du serveur LM Studio) (`openai_api_base`) ;
- La reponse sera en streaming (`streaming=True`) pour afficher la r√©ponse au fur et √† mesure de sa g√©n√©ration.
- max_retries : le nombre de tentatives de g√©n√©ration en cas d'erreur (par exemple, si le LLM ne r√©pond pas dans un d√©lai raisonnable) (`max_retries`).
- timeout : le d√©lai maximum d'attente pour une r√©ponse du LLM (`timeout`).


In [None]:
llm = ChatOpenAI(
    model_name="mistral-nemo-instruct-2407",
    openai_api_base="http://localhost:1234/v1",
    openai_api_key="lm-studio",
    temperature=0.0,
    streaming=True,
    max_tokens=4096,
)

Create a prompt template for the reader

Il n'est exetensible 3 param√®tres :
- output_buffer : le buffer de sortie du reader
- max_tokens : le nombre maximum de tokens √† g√©n√©rer
- embeding_max_tokens : le nombre maximum d

In [None]:
from typing import List, Tuple
from collections import defaultdict
from jinja2 import Template
import tiktoken
from nltk.tokenize import sent_tokenize
from langchain_core.documents import Document
import lmstudio as lms


# 1. Fonction pour compter les tokens
def get_token_count(text: str) -> int:
    model = lms.llm(
    )
    return len(model.tokenize(text))

# 2. Troncature douce : on enl√®ve des phrases √† la fin
def truncate_by_sentence(text: str, max_tokens: int) -> str:
    sentences = sent_tokenize(text)
    result = []
    for sent in sentences:
        result.append(sent)
        joined = " ".join(result)
        if get_token_count(joined) > max_tokens:
            result.pop()
            break
    return " ".join(result).strip() + " [‚Ä¶]"

# 3. G√©n√©ration du contexte structur√©, avec section annexe
def build_context_prompt(
    results: List[Tuple[Document, float]],
    question: str,
    template_path: str,
    max_total_tokens: int = 4096,
    output_buffer: int = 512,
    max_chunk_tokens: int = 500
) -> str:
    prompt_template = Template(open(template_path, encoding="utf-8").read())

    grouped = defaultdict(list)
    for doc, score in results:
        grouped[doc.metadata.get("file_id", "inconnu")].append((doc, score))

    # test if all score is under < 0.5:
    if all(score < 0.5 for _, score in results):
        return "no documents found"

    sorted_groups = sorted(grouped.items(), key=lambda item: max(s for _, s in item[1]), reverse=True)

    included_chunks = []
    fallback_chunks = []
    token_budget = max_total_tokens - output_buffer

    header_base = prompt_template.render(context="PLACEHOLDER", question="PLACEHOLDER", annex="PLACEHOLDER")
    header_tokens = get_token_count(header_base.replace("{{context}}", "").replace("{{question}}", ""))

    total_tokens = header_tokens

    for file_id, chunks in sorted_groups:
        chunks.sort(key=lambda x: x[1], reverse=True)
        meta = chunks[0][0].metadata
        header = f"* Position de th√®se : {meta.get('author','?')}, {meta.get('position_name','?')}, promotion {meta.get('year','?')}\n"
        section_lines = [header]

        for i, (doc, _) in enumerate(chunks):
            section = doc.metadata.get("section", "")
            extrait = doc.metadata.get("raw_chunk", doc.page_content).strip().replace("\n", " ")
            #extrait = truncate_by_sentence(extrait, max_chunk_tokens)

            line = f"Extrait {i+1}"
            if section:
                line += f" - section ¬´ {section} ¬ª"
            line += f" : {extrait}"

            chunk_tokens = get_token_count(line)
            if total_tokens + chunk_tokens > token_budget:
                fallback_chunks.append((meta, i+1, section))
                continue

            section_lines.append(line)
            total_tokens += chunk_tokens

        if len(section_lines) > 1:
            included_chunks.append("\n".join(section_lines) + "\n")

    context = "\n".join(included_chunks)

    # Th√®ses √©loign√©es
    if fallback_chunks:
        annex_by_thesis = defaultdict(list)
        for meta, i, section in fallback_chunks:
            file_id = meta.get("file_id", "inconnu")
            annex_by_thesis[file_id].append((meta, i, section))

        annex_lines = []
        for file_id, chunk_infos in annex_by_thesis.items():
            meta = chunk_infos[0][0]
            title = meta.get("position_name", "?")
            author = meta.get("author", "?")
            promo = meta.get("year", "?")
            header = f"* {author}, {title}, promotion {promo} :"
            annex_lines.append(header)

            for _, i, section in chunk_infos:
                line = f"\t- "
                if section:
                    line += f"section ¬´ {section} ¬ª"
                annex_lines.append(line)

        annex = "\n".join(annex_lines)
    else:
        annex = None

    return prompt_template.render(context=context, question=question, annex=annex)

final_prompt = build_context_prompt(
    results=results,
    question=query,
    template_path="../prompt_templates/v3.jinja",
    max_total_tokens=4096,
    output_buffer=512,
    max_chunk_tokens=500
)

print(final_prompt)

On passe le prompt au LLM pour obtenir la r√©ponse

In [None]:
import asyncio
import logging
import re
from IPython.display import display, Markdown
from spellchecker import SpellChecker
import language_tool_python
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
tool = language_tool_python.LanguageTool('fr')

def mask_capitalized_words(text):
    masks = {}
    def replacer(match):
        key = f"__MASK{len(masks)}__"
        masks[key] = match.group(0)
        return key

    # Masque tous les mots qui commencent par une majuscule (hors d√©but de phrase si souhait√©)
    pattern = r'\b[A-Z√â√à√Ä√Ç√ä√é√î√õ√á][a-z√©√®√™√†√ß√Æ√¥√ª√§√´√Ø√∂√º]+\b'
    masked_text = re.sub(pattern, replacer, text)
    return masked_text, masks

def unmask_text(text, masks):
    for key, original in masks.items():
        text = text.replace(key, original)
    return text



CONCLUSION = (
    "Pour rappel, cette r√©ponse est g√©n√©r√©e automatiquement √† partir d'un mod√®le de langue elle peut contenir des approximations, des surcorrections, des erreurs factuelles "
    "ou des interpr√©tations partielles. Cette r√©ponse ne pr√©tend pas se subsituer √† la critique des sources. Dans ce cadre, il est vivement recommand√© de v√©rifier les sources mentionn√©es "
    "dans cette r√©ponse et de consulter d'autres positions de th√®ses pour approfondir votre question."
)

def finalize_response(text: str, conclusion: str = CONCLUSION) -> str:
    lines = text.strip().split('\n')
    final_lines = []

    for line in lines:
        sentences = re.split(r'(?<=[.!?])\s+', line)
        if not sentences:
            continue
        if not re.search(r'[.!?]["‚Äù¬ª‚Äù]?\s*$', sentences[-1]):
            sentences = sentences[:-1]
        if sentences:
            final_lines.append(" ".join(sentences))

    cleaned_text = "\n".join(final_lines).strip()
    if not cleaned_text:
        return conclusion

    # Correction avec masquage
    masked_text, masks = mask_capitalized_words(cleaned_text)
    matches = tool.check(masked_text)
    corrected = language_tool_python.utils.correct(masked_text, matches)
    corrected_text = unmask_text(corrected, masks)

    return corrected_text + "\n\n" + conclusion

# Fonction de g√©n√©ration avec stream
async def stream_and_print(llm, prompt, max_tokens=100000, timeout=100000):
    full_response = ""
    output_display = display(Markdown("üü° G√©n√©ration en cours..."), display_id=True)

    async def _run_stream():
        nonlocal full_response
        chunks = []
        async for chunk in llm.astream(prompt):
            text = getattr(chunk, "content", str(chunk))
            full_response += text
            chunks.append(text)
            output_display.update(Markdown("".join(chunks).replace("\n", "\n\n")))
            if len(full_response.split()) > max_tokens:
                output_display.update(Markdown("‚õîÔ∏è **R√©ponse tronqu√©e : nombre de tokens d√©pass√©.**"))
                break

    try:
        await asyncio.wait_for(_run_stream(), timeout=timeout)
    except asyncio.TimeoutError:
        logging.warning("‚è≥ Timeout ‚Äî relance en stream...")
        output_display.update(Markdown("üîÅ **Reprise apr√®s timeout...**"))
        full_response = ""
        try:
            await asyncio.wait_for(_run_stream(), timeout=timeout)
        except Exception as e:
            logging.error(f"‚ùå √âchec du deuxi√®me stream : {e}")
            output_display.update(Markdown("‚ùå **Impossible de g√©n√©rer une r√©ponse actuellement.**"))
            return "Erreur"
    except Exception as e:
        logging.warning(f"üí• Erreur pendant le stream : {e}")
        output_display.update(Markdown("üîÅ **Nouvelle tentative de g√©n√©ration...**"))
        full_response = ""
        try:
            await asyncio.wait_for(_run_stream(), timeout=timeout)
        except Exception as e:
            logging.error(f"‚ùå √âchec de la seconde tentative : {e}")
            output_display.update(Markdown("‚ùå **Impossible de g√©n√©rer une r√©ponse actuellement.**"))
            return "Erreur"

    # Nettoyage et finalisation
    cleaned_response = finalize_response(full_response)
    output_display.update(Markdown(cleaned_response.replace("\n", "\n\n")))
    return cleaned_response

# Utilisation
if final_prompt == "no documents found":
    response = "Je n'ai trouv√© aucun document pertinent pour r√©pondre √† votre question. Veuillez reformuler votre question ou bien consulter les positions de th√®ses disponibles."
    # stream this
    output_display = display(Markdown(response), display_id=True)
else:
    response = await stream_and_print(llm, final_prompt)

with open("response.txt", "w", encoding="utf-8") as f:
    f.write(response)


Il existe deux optimisations possibles :

- hybrid search : algorithme classique de recherche d'information (par exemple, BM25) combin√© avec un retriever vectoriel (comme FAISS ou Qdrant) pour am√©liorer la pertinence des r√©sultats.
Les algorthmes classiques sont souvent tr√®s bon pour d√©tecter la pr√©sence / absence de mots-cl√©s dans les documents tandis que les algorithmes vectoriels sont meilleurs pour d√©tecter la similarit√© s√©mantique entre les documents et la question pos√©e.
On combine donc les deux approches pour obtenir des r√©sultats plus pertinents. Ceci est possible via `EnsembleRetriever` de LangChain qui permet de combiner plusieurs retrievers en un seul en se basant sur un algorithme de recherche hybride qui r√©alise une combinaison pond√©r√©e des r√©sultats de chaque retriever.

- reranking : cette m√©tode permet de r√©ordonner les r√©sultats obtenus par le retriever en fonction de la pertinence par rapport √† la question pos√©e en utilisant un mod√®le de langage avant le renvoi final des r√©sultats par le retriever. L'id√©e √©tant que le mod√®le de langage peut mieux comprendre le contexte et la pertinence des documents par rapport √† la question pos√©e, et ainsi r√©organiser les r√©sultats pour renvoyer les plus pertinents en premier.

Comme la classe VectorDB nous avons cr√©er une abstration `RAGPipeline` qui inclut le Retrievier via `VectorDB`, la logique vu pr√©c√©d√©ment ainsi que la possibilit√© d'utilis√© l'hybrid search et le reranking.

Dans le notebook sur l'√©valuation nous comparons les performances de ces deux approches que l'ont peut combiner ou non.

In [None]:
from utils.rag_pipeline import RAGPipeline

In [None]:
pipeline = RAGPipeline(
    vectordb_path="./scripts/data/vectordb/camembert-base_faiss",
    template_path="../prompt_templates/v3.jinja",
    backend="faiss",
    embedding_model=None,
    hybrid=True,
    bm25_path="../data/vectordb/bm25/bm25.encpos.tok.512_51.pkl",
    rerank=False,
    use_streaming=True
)

In [None]:
results = await pipeline.generate("Quelle est la place de l'h√©raldique dans les th√®ses du XIXe si√®cle ?")
for doc, score in pipeline.relevant_docs:
    print(f"[{score:.2f}] {doc.metadata.get('author', '?')} - {doc.metadata.get('section', '?')}")
    print(doc.page_content[:300], "...\n")

In [1]:
import time
from utils.rag_pipeline import RAGPipeline  # adapte si n√©cessaire

# Param√®tres communs
template_path = "../prompt_templates/v3.jinja"
query = "Quelle est la place de l'h√©raldique dans les th√®ses du XIXe si√®cle ?"
bm25_path = "../data/vectordb/bm25/bm25.encpos.tok.512_51.pkl"

configs = {
    "BM25 uniquement": dict(
        backend="faiss",
        vectordb_path=None,
        embedding_model=None,
        bm25_path=bm25_path,
        hybrid=False,
        rerank=False
    ),
    "Vectorstore FAISS": dict(
        backend="faiss",
        vectordb_path="./scripts/data/vectordb/camembert-base_faiss",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=None,
        hybrid=False,
        rerank=False
    ),
    "Vectorstore LanceDB": dict(
        backend="lancedb",
        vectordb_path="./scripts/data/vectordb/camembert-base_lancedb",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=None,
        hybrid=False,
        rerank=False
    ),
    "Hybrid FAISS": dict(
        backend="faiss",
        vectordb_path="./scripts/data/vectordb/camembert-base_faiss",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=bm25_path,
        hybrid=True,
        rerank=False
    ),
    "Hybrid LanceDB": dict(
        backend="lancedb",
        vectordb_path="./scripts/data/vectordb/camembert-base_lancedb",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=bm25_path,
        hybrid=True,
        rerank=False
    ),
    "Rerank FAISS": dict(
        backend="faiss",
        vectordb_path="./scripts/data/vectordb/camembert-base_faiss",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=None,
        hybrid=False,
        rerank=True
    ),
    "Rerank LanceDB": dict(
        backend="lancedb",
        vectordb_path="./scripts/data/vectordb/camembert-base_lancedb",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=None,
        hybrid=False,
        rerank=True
    ),
    "Hybrid + Rerank FAISS": dict(
        backend="faiss",
        vectordb_path="./scripts/data/vectordb/camembert-base_faiss",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=bm25_path,
        hybrid=True,
        rerank=True
    ),
    "Hybrid + Rerank LanceDB": dict(
        backend="lancedb",
        vectordb_path="./scripts/data/vectordb/camembert-base_lancedb",
        embedding_model="Lajavaness/sentence-camembert-base",
        bm25_path=bm25_path,
        hybrid=True,
        rerank=True
    )
}

import pandas as pd
results = []

for label, kwargs in configs.items():
    print(f"\n=== {label} ===")
    pipeline = RAGPipeline(
        template_path=template_path,
        use_streaming=False,
        use_notebook=False,
        k=5,
        **kwargs
    )
    start = time.time()
    response = await pipeline.generate(query)
    duration = time.time() - start
    print(f"\n‚è±Ô∏è Temps √©coul√© : {duration:.2f} secondes")
    print(f"üìÑ Documents trouv√©s : {len(pipeline.relevant_docs)}")

    print("\nüîç Top 3 documents :")
    top = pipeline.relevant_docs[:3]
    for doc, score in top:
        print(f" - [{score:.2f}] {doc.metadata.get('author', '?')} | {doc.metadata.get('section', '?')}")
        print(f"   {doc.page_content[:200]}...\n")

    results.append({
        "Configuration": label,
        "Dur√©e (s)": round(duration, 2),
        "Nb docs": len(pipeline.relevant_docs)
    })

df = pd.DataFrame(results)
print("\nüìä R√©capitulatif :")
print(df.to_markdown(index=False))



=== BM25 uniquement ===

‚úÖ R√©ponse g√©n√©r√©e.
L'h√©raldique, qui est l'√©tude des blasons et des armoiries, semble avoir jou√© un r√¥le relativement limit√© dans les positions de th√®se √©tudi√©es. Les extraits fournis ne font pas directement r√©f√©rence √† cette discipline.
Dans la position de th√®se de Florence K√∂ll sur le Palais-Royal, bien que l'on puisse trouver des r√©f√©rences √† l'h√©raldique dans le contexte du XVIIIe si√®cle, il n'y a pas de mention sp√©cifique de cette discipline dans les √©v√©nements r√©volutionnaires et imp√©riaux qui ont suivi. L'√©tude se concentre plut√¥t sur la gestion du domaine, les travaux du palais princier, les locations, les probl√®mes d'hygi√®ne et de police, ainsi que sur l'utilisation du Tribunat et de la Bourse.
Dans la position de th√®se de Claire Haquet sur les ¬´ sages marchands et bourgeois de Rouen ¬ª, l'accent est mis sur l'histoire urbaine en France √† la fin du Moyen √Çge, avec une √©tude d'ensemble sur Paris et Rouen. L'h√©rald