In [1]:
from getpass import getpass
import logging
import os
import warnings
from typing import Tuple, List

import pandas as pd

import mlflow

from haystack.utils import Secret
from haystack import Pipeline

from haystack.dataclasses import Document

from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
from haystack_integrations.components.retrievers.qdrant import QdrantEmbeddingRetriever
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import ComponentDevice

from haystack.evaluation.eval_run_result import EvaluationRunResult

from haystack.components.builders import PromptBuilder, AnswerBuilder
from haystack.components.joiners import DocumentJoiner
from haystack.components.writers import DocumentWriter
from haystack.components.evaluators import (
    FaithfulnessEvaluator,
    ContextRelevanceEvaluator,
)

from haystack.components.fetchers.link_content import LinkContentFetcher
from haystack.components.converters import HTMLToDocument, PyPDFToDocument
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.generators import OpenAIGenerator
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder,
)

In [2]:
# Keep the notebook output readable: hide FutureWarning noise and only let
# MLflow log records at ERROR level or above through.
warnings.simplefilter("ignore", category=FutureWarning)
logging.getLogger("mlflow").setLevel(logging.ERROR)

In [3]:
# Tracking server that receives the runs logged by this notebook.
# The local default can be overridden with the MLFLOW_TRACKING_URI environment
# variable, so the notebook can target another server without editing this cell.
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:8080")
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

In [3]:
# The generator used in the RAG pipeline calls the OpenAI API (a paid key is needed).
# Read the key from the OPENAI_API_KEY environment variable when it is set, so
# "Restart & Run All" does not block on input; otherwise prompt for it without
# echoing the value.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") or getpass(
    "OPENAI_API_KEY: ", stream=None
)

In [4]:
def create_vector_database(
    urls: Tuple[str, ...] = (
        "https://medium.com/@ssafarveisi/stream-processing-using-apache-flink-70c5a990801a",
        "https://medium.com/@ssafarveisi/pyspark-stream-processing-on-k8s-using-stackable-data-platform-5695b0eafd6f",
    ),
    pdf_sources: Tuple[str, ...] = ("artifactory_for_poetry.pdf",),
    device: str = "cuda:0",
) -> QdrantDocumentStore:
    """
    Creates the vector database (persisted) for the RAG system.
    It fetches the given web pages as HTML byte streams, reads the given
    PDF files, cleans and splits the resulting documents into overlapping
    chunks, embeds the chunks and writes them into the document store.

    Parameters
    ==========
    urls:
        Web pages to fetch and index. Any sequence of strings is accepted.
        Defaults to the two preselected articles.
    pdf_sources:
        Paths of the PDF files to index. Any sequence of strings is accepted.
        Defaults to the preselected PDF file.
    device:
        Device string for the document embedder, e.g. "cuda:0" or "cpu".
        Pass "cpu" if a GPU is not available.

    Returns
    =======
    An instance of a vector database
    """
    # Persisted document store (kept in the "vd" folder of the working directory)
    document_store = QdrantDocumentStore(
        path=os.path.join(os.getcwd(), "vd"),
        index="Document",
        recreate_index=False,
        embedding_dim=384,  # The document embedder (below) produces dense vectors of dimensionality 384
    )
    # Pipeline components to create the vector database
    fetcher = LinkContentFetcher()
    html_converter = HTMLToDocument()
    pdf_converter = PyPDFToDocument()
    document_joiner = DocumentJoiner()
    cleaner = DocumentCleaner()
    ## Overlapping chunks help preserve contextual integrity
    splitter = DocumentSplitter(split_by="word", split_length=256, split_overlap=16)
    # Document Embedder (sentence transformer)
    document_embedder = SentenceTransformersDocumentEmbedder(
        model="BAAI/bge-small-en-v1.5",  # Check the hugging face website for more info about the transformer
        device=ComponentDevice.from_str(device),
    )
    ## Download the model
    document_embedder.warm_up()
    ## Writes documents and their embeddings into the vector database;
    ## SKIP leaves documents that are already stored untouched
    writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)

    ## Pipeline
    indexing_pipeline = Pipeline()

    # Adding components into the pipeline
    indexing_pipeline.add_component(instance=fetcher, name="fetcher")
    indexing_pipeline.add_component(instance=html_converter, name="html_converter")
    indexing_pipeline.add_component(instance=pdf_converter, name="pdf_converter")
    indexing_pipeline.add_component(instance=document_joiner, name="document_joiner")
    indexing_pipeline.add_component(instance=cleaner, name="cleaner")
    indexing_pipeline.add_component(instance=splitter, name="splitter")
    indexing_pipeline.add_component(
        instance=document_embedder, name="document_embedder"
    )
    indexing_pipeline.add_component(instance=writer, name="writer")

    ## Pipeline connections: both converters feed the joiner, whose output is
    ## cleaned, split, embedded and finally written
    indexing_pipeline.connect("fetcher.streams", "html_converter.sources")
    indexing_pipeline.connect("html_converter", "document_joiner")
    indexing_pipeline.connect("pdf_converter", "document_joiner")
    indexing_pipeline.connect("document_joiner.documents", "cleaner")
    indexing_pipeline.connect("cleaner", "splitter")
    indexing_pipeline.connect("splitter", "document_embedder")
    indexing_pipeline.connect("document_embedder", "writer.documents")

    ## Write the html byte streams and the PDFs into the vector database.
    ## The sources are copied into lists so tuples (the defaults) work as well.
    indexing_pipeline.run(
        data={
            "fetcher": {"urls": list(urls)},
            "pdf_converter": {"sources": list(pdf_sources)},
        }
    )

    return document_store


def create_rag_pipeline(
    document_store: QdrantDocumentStore, device: str = "cuda:0"
) -> Pipeline:
    """
    Creates a RAG pipeline using a persisted vector database.
    The question is embedded, the embedding is used to retrieve documents
    from the store, and the documents are placed in a prompt that is sent
    to the LLM. The reply is wrapped into an answer together with the
    retrieved documents.

    Parameters
    ==========
    document_store:
        An instance of a document store
    device:
        Device string for the text embedder, e.g. "cuda:0" or "cpu".
        Pass "cpu" if a GPU is not available.

    Returns
    =======
    An instance of a Pipeline (RAG)
    """
    # Create the prompt for the LLM (generative model)
    prompt_template = """
    Answer the following question given the documents.
    If the answer is not contained within the documents reply with 'no_answer'
    Query: {{question}}
    Documents:
    {% for document in documents %}
    {{document.content}}
    {% endfor %}
    """

    # Pipeline components for RAG
    prompt_builder = PromptBuilder(template=prompt_template)
    # Same embedding model as the one used to index the documents
    text_embedder = SentenceTransformersTextEmbedder(
        model="BAAI/bge-small-en-v1.5", device=ComponentDevice.from_str(device)
    )
    retriever = QdrantEmbeddingRetriever(document_store)
    # The API key is read from the notebook-level OPENAI_API_KEY variable
    llm = OpenAIGenerator(
        model="gpt-4o-mini", api_key=Secret.from_token(OPENAI_API_KEY)
    )

    ## Pipeline
    rag_pipeline = Pipeline()

    ## Adding components into the pipeline
    rag_pipeline.add_component("text_embedder", text_embedder)
    rag_pipeline.add_component("retriever", retriever)
    rag_pipeline.add_component("prompt_builder", prompt_builder)
    rag_pipeline.add_component("llm", llm)
    rag_pipeline.add_component("answer_builder", AnswerBuilder())

    ## Pipeline connections; the retrieved documents go both into the prompt
    ## and into the answer builder
    rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
    rag_pipeline.connect("retriever", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder", "llm")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")
    rag_pipeline.connect("llm.meta", "answer_builder.meta")
    rag_pipeline.connect("retriever", "answer_builder.documents")

    return rag_pipeline

In [5]:
# Build the persisted document store and index the preselected web pages and PDF into it.
document_store = create_vector_database()

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

0it [00:00, ?it/s]


In [6]:
# Sanity check: number of documents the store reports after indexing.
document_store.count_documents()

17

In [12]:
# Assemble the RAG pipeline on top of the document store created above.
rag_pipeline = create_rag_pipeline(document_store=document_store)

In [46]:
# Smoke test: ask one question end to end and print the generated answer.
question = "Which version of Stackable did the author use?"

# The same question is needed by three components: the embedder (for retrieval),
# the prompt builder (for the LLM prompt) and the answer builder (for the answer).
pipeline_inputs = {
    "text_embedder": {"text": question},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}
response = rag_pipeline.run(pipeline_inputs)

first_answer = response["answer_builder"]["answers"][0]
print(first_answer.data)

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

The version of Stackable that the author used is 22.11.


In [8]:
# Ground truth for evaluating the retriever in the RAG pipeline.
# Each entry pairs a question with the list of sources (web page URL or PDF path)
# in which the documents relevant to that question can be found.
# Replace the questions and their sources with yours.
STACKABLE_POST_URL = "https://medium.com/@ssafarveisi/pyspark-stream-processing-on-k8s-using-stackable-data-platform-5695b0eafd6f"
FLINK_POST_URL = "https://medium.com/@ssafarveisi/stream-processing-using-apache-flink-70c5a990801a"
POETRY_PDF_PATH = "artifactory_for_poetry.pdf"

question_source_pairs = [
    ("Which version of Stackable did the author use?", STACKABLE_POST_URL),
    (
        "Which K8s operators did the author use to showcase pyspark stream processing?",
        STACKABLE_POST_URL,
    ),
    (
        "Which mode did the author opt to deploy a Flink cluster on k8s?",
        FLINK_POST_URL,
    ),
    (
        "How many task managers did the author select for the Flink cluster?",
        FLINK_POST_URL,
    ),
    ("What is DEFAULT_CA_BUNDLE for poetry artifactory access?", POETRY_PDF_PATH),
]

eval_data = pd.DataFrame(
    {
        "question": [text for text, _ in question_source_pairs],
        # One single-element list per question
        "source": [[origin] for _, origin in question_source_pairs],
    }
)

In [9]:
# Select the MLflow experiment ("Evaluate RAG") under which the evaluation runs
# started later in this notebook are logged.
mlflow.set_experiment("Evaluate RAG")

<Experiment: artifact_location='mlflow-artifacts:/987346353460110895', creation_time=1724771462751, experiment_id='987346353460110895', last_update_time=1724771462751, lifecycle_stage='active', name='Evaluate RAG', tags={}>

In [13]:
def evaluate_embedding():
    """
    Evaluates the retriever in the RAG pipeline.

    For every question in the notebook-level `eval_data` frame, the question is
    embedded and documents are retrieved with the `text_embedder` and `retriever`
    components of the notebook-level `rag_pipeline`. Only these two components
    are run: the sources of the retrieved documents are all the evaluation
    needs, so no LLM call is made. The retrieved sources are compared with the
    "source" column by MLflow's retriever evaluation inside a new MLflow run.

    Returns
    =======
    The object returned by mlflow.evaluate

    Raises
    ======
    KeyError
        If a retrieved document has neither 'url' nor 'file_path' in its metadata
    """
    # Reuse the retrieval components of the RAG pipeline so that the evaluation
    # uses exactly the same embedding model and document store.
    text_embedder = rag_pipeline.get_component("text_embedder")
    retriever = rag_pipeline.get_component("retriever")
    # The embedder is called outside the pipeline here, so make sure its model
    # is loaded even if the pipeline has not been run yet.
    text_embedder.warm_up()

    def extract_source(doc: Document) -> str:
        # Web pages carry their origin under 'url', PDF files under 'file_path'
        if "url" in doc.meta:
            return doc.meta["url"]
        elif "file_path" in doc.meta:
            return doc.meta["file_path"]
        else:
            raise KeyError("Neither 'url' nor 'file_path' exists in the metadata")

    def retrieve_doc_sources(question: str) -> List[str]:
        query_embedding = text_embedder.run(text=question)["embedding"]
        docs = retriever.run(query_embedding=query_embedding)["documents"]
        return [extract_source(doc) for doc in docs]

    def retriever_model_function(question_df: pd.DataFrame) -> pd.Series:
        # One list of retrieved sources per question
        return question_df["question"].apply(retrieve_doc_sources)

    with mlflow.start_run():
        return mlflow.evaluate(
            model=retriever_model_function,
            data=eval_data,
            model_type="retriever",
            targets="source",
            evaluators="default",
        )

In [14]:
# Run the retriever evaluation; `result` is the object returned by mlflow.evaluate.
result = evaluate_embedding()

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches:   0%|          | 0/1 [00:00<?, ?it/s]

In [15]:
# Pull the "eval_results_table" table out of the evaluation result.
eval_results_of_retriever_df_bge_small_en = result.tables["eval_results_table"]

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

In [16]:
# Show the evaluation results table extracted from the retriever evaluation.
display(eval_results_of_retriever_df_bge_small_en)

Unnamed: 0,question,source,outputs,precision_at_3/score,recall_at_3/score,ndcg_at_3/score
0,Which version of Stackable did the author use?,[https://medium.com/@ssafarveisi/pyspark-strea...,[https://medium.com/@ssafarveisi/pyspark-strea...,0.333333,1,1.0
1,Which K8s operators did the author use to show...,[https://medium.com/@ssafarveisi/pyspark-strea...,[https://medium.com/@ssafarveisi/pyspark-strea...,0.666667,1,0.919721
2,Which mode did the author opt to deploy a Flin...,[https://medium.com/@ssafarveisi/stream-proces...,[https://medium.com/@ssafarveisi/stream-proces...,1.0,1,1.0
3,How many task managers did the author select f...,[https://medium.com/@ssafarveisi/stream-proces...,[https://medium.com/@ssafarveisi/stream-proces...,1.0,1,1.0
4,What is DEFAULT_CA_BUNDLE for poetry artifacto...,[artifactory_for_poetry.pdf],"[artifactory_for_poetry.pdf, artifactory_for_p...",0.666667,1,1.0


In [24]:
# Evaluate different top-K strategies with MLflow.
# No model is passed here: the "outputs" column of the previous evaluation
# table is used as the predictions and compared with the "source" column.
with warnings.catch_warnings():
    # Suppress the warning 'Inferred schema contains integer column(s)...'
    warnings.simplefilter("ignore", UserWarning)

    # precision@k, recall@k and ndcg@k, each for k = 1, 2, 3 (in that order)
    metric_builders = (
        mlflow.metrics.precision_at_k,
        mlflow.metrics.recall_at_k,
        mlflow.metrics.ndcg_at_k,
    )
    top_k_metrics = [build(k) for build in metric_builders for k in (1, 2, 3)]

    with mlflow.start_run() as run:
        evaluate_results = mlflow.evaluate(
            data=eval_results_of_retriever_df_bge_small_en,
            targets="source",
            predictions="outputs",
            evaluators="default",
            extra_metrics=top_k_metrics,
        )

    display(evaluate_results.tables["eval_results_table"])

Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,question,precision_at_3/score,recall_at_3/score,ndcg_at_3/score,source,outputs,precision_at_1/score,precision_at_2/score,recall_at_1/score,recall_at_2/score,ndcg_at_1/score,ndcg_at_2/score
0,Which version of Stackable did the author use?,0.333333,1,1.0,[https://medium.com/@ssafarveisi/pyspark-strea...,[https://medium.com/@ssafarveisi/pyspark-strea...,1,0.5,1,1,1,1
1,Which K8s operators did the author use to show...,0.666667,1,0.919721,[https://medium.com/@ssafarveisi/pyspark-strea...,[https://medium.com/@ssafarveisi/pyspark-strea...,1,0.5,1,1,1,1
2,Which mode did the author opt to deploy a Flin...,1.0,1,1.0,[https://medium.com/@ssafarveisi/stream-proces...,[https://medium.com/@ssafarveisi/stream-proces...,1,1.0,1,1,1,1
3,How many task managers did the author select f...,1.0,1,1.0,[https://medium.com/@ssafarveisi/stream-proces...,[https://medium.com/@ssafarveisi/stream-proces...,1,1.0,1,1,1,1
4,What is DEFAULT_CA_BUNDLE for poetry artifacto...,0.666667,1,1.0,[artifactory_for_poetry.pdf],"[artifactory_for_poetry.pdf, artifactory_for_p...",1,1.0,1,1,1,1
