[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/aiembassy/workshop-rag-haystack/blob/master/notebooks/03-building-rag-pipelines.ipynb)

In [None]:
!pip install "haystack-ai" \
    "trafilatura" \
    "qdrant-haystack" \
    "qdrant-client" \
    "huggingface-hub" \
    "sentence-transformers" \
    "nltk"

# Building RAG pipelines

Retrieval Augmented Generation (RAG) is one of the most practical ways to bring private data into Large Language Models. In this notebook, we will build a pipeline that retrieves relevant documents and uses them as context when generating an answer.

A small model from the Phi-3 family is a good choice when resource usage is a concern, but the [warm text generation models](https://huggingface.co/models?pipeline_tag=text-generation&sort=trending&inference=warm) list offers many other models that can be used.

In [None]:
from haystack.utils import Secret
from haystack.components.generators import HuggingFaceAPIGenerator

generator = HuggingFaceAPIGenerator(
    api_type="serverless_inference_api",
    api_params={"model": "microsoft/Phi-3-mini-4k-instruct"},
    token=Secret.from_env_var("HF_TOKEN"),  # Reads the token from the HF_TOKEN environment variable
)

We will use the same sentence transformer as before, but we can also experiment and swap it with another one, if we want to.

In [None]:
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder,
)

document_embedder = SentenceTransformersDocumentEmbedder(model="all-MiniLM-L6-v2")
document_embedder.warm_up()

text_embedder = SentenceTransformersTextEmbedder(model="all-MiniLM-L6-v2")
text_embedder.warm_up()

Vector embeddings will be stored in a Qdrant index, and we will use it to retrieve the most similar documents.

In [None]:
from haystack_integrations.document_stores.qdrant import QdrantDocumentStore

document_store = QdrantDocumentStore(
    ":memory:",  # Never use in production systems! It's a mode only for testing purposes.
    embedding_dim=384,  # The size of the embeddings produced by the model
    index="facts",  # We can have multiple indexes in the same database
)
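
To make "most similar" concrete, here is a minimal plain-Python sketch of similarity search. It assumes cosine similarity as the metric and uses made-up 3-dimensional vectors instead of the 384 dimensions produced by the model. The names `cosine_similarity`, `top_k` and `toy_index` are illustrative only and are not part of Haystack or Qdrant, and a real vector database relies on approximate indexes rather than the full scan shown here.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query: list[float], index: dict[str, list[float]], k: int = 2) -> list[str]:
    """Return the ids of the k stored vectors most similar to the query."""
    ranked = sorted(
        index,
        key=lambda doc_id: cosine_similarity(query, index[doc_id]),
        reverse=True,
    )
    return ranked[:k]


# Toy "embeddings" (assumption: hand-picked values, not real model output)
toy_index = {
    "python": [0.9, 0.1, 0.0],
    "java": [0.1, 0.9, 0.0],
    "c": [0.0, 0.2, 0.9],
}

print(top_k([0.8, 0.3, 0.0], toy_index))
```

The query vector points mostly in the same direction as the `"python"` entry, so that id is ranked first.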

## Pipelines

Haystack lets us create more complex processes by building DAG-like pipelines out of different components. The flow of the application might be linear, or it might branch into different paths based on the data or even on the LLM's output.

First of all, it makes sense to ingest some data to play with. Haystack provides some tools for web scraping, while other data sources are either available as third-party plugins or can be built from scratch.

### Ingestion pipeline

We are going to scrape some web pages and store the data in the Qdrant index.

In [None]:
urls = [
    "https://en.wikipedia.org/wiki/Python_(programming_language)",
    "https://en.wikipedia.org/wiki/Java_(programming_language)",
    "https://en.wikipedia.org/wiki/C_(programming_language)",
]

In [None]:
from haystack.components.fetchers import LinkContentFetcher

link_content_fetcher = LinkContentFetcher()
fetched_urls = link_content_fetcher.run(urls=urls)
fetched_urls

In [None]:
from haystack.components.converters import HTMLToDocument

converter = HTMLToDocument()
documents = converter.run(sources=fetched_urls["streams"])
documents

It generally doesn't make sense to store a whole page as a single vector. Embeddings are not lossless, and the more text we put into one vector, the more information we lose. Embedding models also have a limited input length, and text beyond it is typically truncated, so the text should be split into smaller parts to get the best results.

It is considered good practice to split the text with some overlap between the chunks, so that content near a chunk boundary keeps some of its surrounding context.

In [None]:
from haystack.components.preprocessors import DocumentSplitter

document_splitter = DocumentSplitter(
    split_by="sentence", split_length=10, split_overlap=5
)
document_splitter.warm_up()

split_documents = document_splitter.run(documents=documents["documents"])
split_documents["documents"]
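
To see what `split_length` and `split_overlap` mean, here is a minimal plain-Python sketch of a sliding window over a list of sentences. The function `split_with_overlap` is a hypothetical helper written for illustration; it is not Haystack's implementation, and the real `DocumentSplitter` may treat sentence detection and the final chunk differently.

```python
def split_with_overlap(
    sentences: list[str], split_length: int, split_overlap: int
) -> list[list[str]]:
    """Group sentences into windows of split_length sharing split_overlap items."""
    if not 0 <= split_overlap < split_length:
        raise ValueError("split_overlap must be smaller than split_length")

    # Each new window starts this many sentences after the previous one
    step = split_length - split_overlap
    chunks = []
    for start in range(0, len(sentences), step):
        chunks.append(sentences[start : start + split_length])
        if start + split_length >= len(sentences):
            break  # this window already reached the end of the text
    return chunks


sentences = [f"s{i}" for i in range(1, 8)]  # s1 .. s7
for chunk in split_with_overlap(sentences, split_length=4, split_overlap=2):
    print(chunk)
```

With a length of 4 and an overlap of 2, every window repeats the last two sentences of the previous one, which is the same pattern as `split_length=10, split_overlap=5` used above, only smaller.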

We don't want to run all the steps manually, but rather create a pipeline that will do the job for us. Some components, such as the fetcher, can also retry on their own when a request fails.

In [None]:
from haystack.components.writers import DocumentWriter

document_writer = DocumentWriter(document_store=document_store)

In [None]:
from haystack import Pipeline

ingestion_pipeline = Pipeline()

# Register all the components
ingestion_pipeline.add_component("link_content_fetcher", link_content_fetcher)
ingestion_pipeline.add_component("converter", converter)
ingestion_pipeline.add_component("document_splitter", document_splitter)
ingestion_pipeline.add_component("document_embedder", document_embedder)
ingestion_pipeline.add_component("document_writer", document_writer)

# Display the pipeline
ingestion_pipeline.show()

It's not yet what we wanted. The components are registered, but they are not connected. Let's fix it.

In [None]:
# The names of the inputs and outputs are documented in the Haystack docs
# Example: https://docs.haystack.deepset.ai/docs/linkcontentfetcher
ingestion_pipeline.connect("link_content_fetcher.streams", "converter.sources")
ingestion_pipeline.connect("converter.documents", "document_splitter.documents")
ingestion_pipeline.connect("document_splitter.documents", "document_embedder.documents")
ingestion_pipeline.connect("document_embedder.documents", "document_writer.documents")

# Now the pipeline is done and all the dots are connected
ingestion_pipeline.show()
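
The idea behind `connect` can be sketched in plain Python: each component returns a dictionary of named outputs, and a connection forwards one named output to one named input of another component. The `MiniPipeline` class below is a toy written for illustration under these assumptions; it is not how Haystack's `Pipeline` is implemented and it skips validation, branching and type checks.

```python
from graphlib import TopologicalSorter
from typing import Callable


class MiniPipeline:
    """Toy illustration only; Haystack's Pipeline is far more capable."""

    def __init__(self) -> None:
        self.components: dict[str, Callable[..., dict]] = {}
        # (sender, output name) -> list of (receiver, input name)
        self.edges: dict[tuple[str, str], list[tuple[str, str]]] = {}

    def add_component(self, name: str, func: Callable[..., dict]) -> None:
        self.components[name] = func

    def connect(self, sender: str, receiver: str) -> None:
        # Both arguments use the "component.socket" notation
        src, out_name = sender.split(".")
        dst, in_name = receiver.split(".")
        self.edges.setdefault((src, out_name), []).append((dst, in_name))

    def run(self, data: dict[str, dict]) -> dict[str, dict]:
        # A component depends on every component that sends it an input
        graph: dict[str, set[str]] = {name: set() for name in self.components}
        for (src, _), targets in self.edges.items():
            for dst, _ in targets:
                graph[dst].add(src)

        inputs = {name: dict(data.get(name, {})) for name in self.components}
        results: dict[str, dict] = {}
        # static_order() yields senders before their receivers
        for name in TopologicalSorter(graph).static_order():
            results[name] = self.components[name](**inputs[name])
            for out_name, value in results[name].items():
                for dst, in_name in self.edges.get((name, out_name), []):
                    inputs[dst][in_name] = value
        return results


pipeline = MiniPipeline()
pipeline.add_component("fetcher", lambda urls: {"streams": [f"<html>{u}</html>" for u in urls]})
pipeline.add_component("converter", lambda sources: {"documents": [s.upper() for s in sources]})
pipeline.connect("fetcher.streams", "converter.sources")

result = pipeline.run({"fetcher": {"urls": ["a", "b"]}})
print(result["converter"]["documents"])
```

Without the `connect` call, the toy `converter` would never receive its `sources` input and the run would fail, which mirrors why registering components alone is not enough.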

The final pipeline can be called as if it were a single component (although pipelines are not components in Haystack).

In [None]:
ingestion_pipeline.run(
    data={
        "urls": [
            "https://en.wikipedia.org/wiki/Ruby_(programming_language)",
            "https://en.wikipedia.org/wiki/JavaScript",
            "https://en.wikipedia.org/wiki/Rust_(programming_language)",
        ]
    }
)

In [None]:
document_store.count_documents()

### RAG pipeline

Once our data is indexed in Qdrant, we can build another pipeline that will connect to the same index and finally perform Retrieval Augmented Generation, using the LLM and the vector embeddings.

For that, we need to take a text input from the user and use it as a query to retrieve the most similar documents. The same text then becomes part of the prompt for the LLM, along with the retrieved documents, which serve as the context.

We have a document store, but also need a retriever to interact with it.

In [None]:
from haystack_integrations.components.retrievers.qdrant import QdrantEmbeddingRetriever

retriever = QdrantEmbeddingRetriever(document_store=document_store)

Now all the components are ready, and we can build the pipeline. However, our prompt to the LLM has to include the user's query and the documents retrieved from the index, so we need to write it first.

Haystack lets us build prompts using the `PromptBuilder` class, which supports [Jinja templates](https://docs.haystack.deepset.ai/docs/jinja-templates) for dynamic content.

In [None]:
from haystack.components.builders import PromptBuilder

prompt_template = """
Please answer the following question using only the documents provided.
If the question is unanswerable based solely on the documents, please
answer "No answer". Do not make up any facts or details.

Question: {{ text }}

Documents:
{% for doc in documents %}
	Document {{ loop.index }}:
	Document name: {{ doc.meta['name'] }}
	{{ doc.content }}
{% endfor %}

Answer:
""".strip()
prompt_builder = PromptBuilder(template=prompt_template)
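
To get a feel for what the LLM eventually receives, here is a hand-written plain-Python approximation of the dynamic part of the template. It is only a sketch: `ToyDocument` and `render_prompt` are hypothetical names, the instruction paragraph is left out for brevity, and the whitespace will not match Jinja's output exactly.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDocument:
    """Hypothetical stand-in for a Haystack Document."""

    content: str
    meta: dict = field(default_factory=dict)


def render_prompt(text: str, documents: list[ToyDocument]) -> str:
    """Approximate the question and documents sections of the template."""
    parts = [f"Question: {text}", "", "Documents:"]
    # enumerate(..., start=1) mirrors Jinja's 1-based loop.index
    for index, doc in enumerate(documents, start=1):
        parts.append(f"Document {index}:")
        parts.append(f"Document name: {doc.meta.get('name', '')}")
        parts.append(doc.content)
    parts += ["", "Answer:"]
    return "\n".join(parts)


rendered = render_prompt(
    "Which language is statically typed?",
    [
        ToyDocument("Java is statically typed.", {"name": "java.html"}),
        ToyDocument("Python is dynamically typed.", {"name": "python.html"}),
    ],
)
print(rendered)
```

Every retrieved document adds its full content to the prompt, so the number and size of the chunks directly affect how long the final prompt becomes.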

Now it's time to assemble the pipeline.

In [None]:
rag_pipeline = Pipeline()

# Register all the components
rag_pipeline.add_component("text_embedder", text_embedder)
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("generator", generator)

# Connect the components
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "generator.prompt")

# Display the pipeline
rag_pipeline.show()

In [None]:
def rag(input_query: str) -> str:
    response = rag_pipeline.run(data={"text": input_query})
    return response["generator"]["replies"][0]


print(rag("When was Rust programming language created?"))

In [None]:
print(rag("What is the most popular programming language?"))

## Advanced RAG techniques

The pipeline above shows how a basic RAG can be built in Haystack. However, RAG is a fairly new and evolving field, and there are many ways to improve it. For example, we could first ask the LLM to generate a list of steps needed to answer the question, and then call the same or another LLM to produce the answer by following them. This method is called Chain-of-Thought (CoT).

### Chain-of-Thought pipeline

Let's rework the pipeline to include the CoT technique. We can modify the prompt template to generate the steps, and then use them to generate the answer.

In [None]:
new_prompt_template = """
Please generate a list of steps to take in order to answer the following question.
If the question is unanswerable based solely on the documents, please write down
the list of reasons why it's impossible to answer the question. Do not produce
anything more except the list of steps.

Question: {{ text }}

Documents:
{% for doc in documents %}
	Document {{ loop.index }}:
	Document name: {{ doc.meta['name'] }}
	{{ doc.content }}
{% endfor %}

List of steps:
""".strip()

Haystack does not allow reusing the same component instance in different pipelines, or even twice in the same pipeline, because each instance is bound to the pipeline it was added to. We have to create a second generator for the CoT step.

In [None]:
resolving_generator = HuggingFaceAPIGenerator(
    api_type="serverless_inference_api",
    api_params={"model": "microsoft/Phi-3.5-mini-instruct"},
    token=Secret.from_env_var("HF_TOKEN"),  # Reads the token from the HF_TOKEN environment variable
)

The second generator also needs its own prompt, so we define another template that receives the steps produced by the first one.

In [None]:
cot_prompt_template = """
Please perform the list of steps provided and return a succinct answer
to the following question. Be concise and do not provide any additional
information except the answer.

Question: {{ text }}

Steps: {{ replies[0] }}

Answer:
""".strip()

cot_prompt_builder = PromptBuilder(
    template=cot_prompt_template, required_variables=["replies"]
)

Extending the pipeline with the CoT technique is as simple as adding a few more components and connecting them.

In [None]:
rag_pipeline.add_component("cot_prompt_builder", cot_prompt_builder)
rag_pipeline.add_component("resolving_generator", resolving_generator)

rag_pipeline.connect("generator.replies", "cot_prompt_builder.replies")
rag_pipeline.connect("cot_prompt_builder.prompt", "resolving_generator.prompt")

In [None]:
def cot_rag(input_query: str) -> str:
    response = rag_pipeline.run(
        data={
            "text_embedder": {"text": input_query},
            "prompt_builder": {"text": input_query, "template": new_prompt_template},
            "cot_prompt_builder": {"text": input_query},
        }
    )
    return response["resolving_generator"]["replies"][0]
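
The data flow of the extended pipeline can be sketched without any LLM at all. The snippet below is a minimal illustration with stub functions standing in for the two generators; `chain_of_thought`, `fake_planner` and `fake_resolver` are hypothetical names, the prompts are shortened versions of the templates above, and the stubbed replies are made up.

```python
from typing import Callable


def chain_of_thought(
    question: str,
    context: list[str],
    planner: Callable[[str], str],
    resolver: Callable[[str], str],
) -> str:
    """Ask one model for a list of steps, then pass the steps to a second model."""
    plan_prompt = (
        "Please generate a list of steps to take in order to answer the question.\n"
        f"Question: {question}\n"
        "Documents:\n" + "\n".join(context) + "\nList of steps:"
    )
    steps = planner(plan_prompt)

    # As in the template above, the second prompt gets the question and the steps only
    answer_prompt = (
        "Please perform the list of steps provided and return a succinct answer.\n"
        f"Question: {question}\nSteps: {steps}\nAnswer:"
    )
    return resolver(answer_prompt)


# Stub "models" that record the prompts they receive, so the flow is visible
seen_prompts: list[str] = []


def fake_planner(prompt: str) -> str:
    seen_prompts.append(prompt)
    return "1. Find the release year in the documents."


def fake_resolver(prompt: str) -> str:
    seen_prompts.append(prompt)
    return "2015" if "1. Find the release year" in prompt else "No answer"


answer = chain_of_thought(
    "When was Rust first released?",
    ["Rust 1.0 was released in 2015."],
    fake_planner,
    fake_resolver,
)
print(answer)
print(len(seen_prompts), "prompts were sent")
```

Note that the second prompt does not include the retrieved documents, so any fact needed for the final answer has to be carried over in the steps themselves.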

In [None]:
print(cot_rag("When was Rust programming language created?"))

In [None]:
print(cot_rag("What is the most popular programming language?"))