# Goal: Building a Retrieval-Augmented Generation (RAG) Pipeline
- **Overview**: After completing this tutorial, you'll know how to build an indexing pipeline that preprocesses files according to their file type, using the `FileTypeRouter`.

> 💡 (Optional): After creating the indexing pipeline in this tutorial, there is an optional section that shows you how to create a RAG pipeline on top of the document store you just created. You need a [Hugging Face API Key](https://huggingface.co/settings/tokens) and a [Mistral AI](https://mistral.ai/) API key for this section.

## Components Used

- [`FileTypeRouter`](https://docs.haystack.deepset.ai/docs/filetyperouter): This component will help you route files based on their corresponding MIME type to different components
- [`MarkdownToDocument`](https://docs.haystack.deepset.ai/docs/markdowntodocument): This component will help you convert Markdown files into Haystack Documents
- [`PyPDFToDocument`](https://docs.haystack.deepset.ai/docs/pypdftodocument): This component will help you convert PDF files into Haystack Documents
- [`TextFileToDocument`](https://docs.haystack.deepset.ai/docs/textfiletodocument): This component will help you convert text files into Haystack Documents
- [`DocumentJoiner`](https://docs.haystack.deepset.ai/docs/documentjoiner): This component will help you join Documents coming from different branches of a pipeline
- [`DocumentCleaner`](https://docs.haystack.deepset.ai/docs/documentcleaner) (optional): This component will help you make Documents more readable by removing extra whitespace, etc.
- [`DocumentSplitter`](https://docs.haystack.deepset.ai/docs/documentsplitter): This component will help you split your Documents into chunks
- [`SentenceTransformersDocumentEmbedder`](https://docs.haystack.deepset.ai/docs/sentencetransformersdocumentembedder): This component will help you create embeddings for Documents.
- [`DocumentWriter`](https://docs.haystack.deepset.ai/docs/documentwriter): This component will help you write Documents into the DocumentStore

## Overview

In this tutorial, you'll build an indexing pipeline that preprocesses different types of files (Markdown, plain text, and PDF). Each file type has its own converter component. The rest of the indexing pipeline is fairly standard: clean up whitespace, split the documents into chunks, create embeddings, and write them to a Document Store.

Optionally, you can keep going to see how to use these documents in a query pipeline as well.

## Preparing the Colab Environment

- [Enable GPU Runtime in Colab](https://docs.haystack.deepset.ai/docs/enabling-gpu-acceleration)
- [Set logging level to INFO](https://docs.haystack.deepset.ai/docs/logging)

## Installing dependencies


In [54]:
!pip install haystack-ai
!pip install mistral-haystack
!pip install "sentence-transformers>=3.0.0" "huggingface_hub>=0.23.0"
!pip install markdown-it-py mdit_plain pypdf
# installing flask to create the API
!pip install Flask
!pip install python-dotenv
!pip install flask-cors

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
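
The warning above is harmless. As the message itself suggests, one way to silence it is to set `TOKENIZERS_PARALLELISM` explicitly. A minimal sketch, assuming you run it before anything that loads `tokenizers`:

```python
import os

# Set this before importing or using anything that loads `tokenizers`,
# so the library does not have to disable parallelism after a fork.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
```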





## Create a Pipeline to Index Documents

Next, you'll create a pipeline to index documents. To keep things uncomplicated, you'll use an `InMemoryDocumentStore` but this approach would also work with any other flavor of `DocumentStore`.

You'll need a different file converter class for each file type in our data sources: `.pdf`, `.txt`, and `.md` in this case. Our `FileTypeRouter` connects each file type to the proper converter.

Once all our files have been converted to Haystack Documents, we can use the `DocumentJoiner` component to make these a single list of documents that can be fed through the rest of the indexing pipeline all together.
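
To make the routing idea concrete, here is a rough standard-library sketch of grouping file paths by MIME type. It is not Haystack's implementation. The function name `route_by_mime_type`, the `"unclassified"` bucket label, and the sample file names are assumptions made for illustration.

```python
import mimetypes
from collections import defaultdict
from pathlib import Path

# Register Markdown explicitly, since not every platform maps ".md" by default.
mimetypes.add_type("text/markdown", ".md")


def route_by_mime_type(sources, accepted_mime_types):
    """Group paths by guessed MIME type; anything else goes to 'unclassified'."""
    routed = defaultdict(list)
    for source in sources:
        path = Path(source)
        mime_type, _ = mimetypes.guess_type(path.name)
        key = mime_type if mime_type in accepted_mime_types else "unclassified"
        routed[key].append(path)
    return dict(routed)


# Hypothetical file names, used only to show the grouping
routed = route_by_mime_type(
    ["notes.txt", "report.pdf", "readme.md", "archive.xyz123"],
    ["text/plain", "application/pdf", "text/markdown"],
)
```

Each key of `routed` plays the role of one router output, which is why the pipeline needs one converter per MIME type.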

In [56]:
from haystack.components.writers import DocumentWriter
from haystack.components.converters import MarkdownToDocument, PyPDFToDocument, TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.routers import FileTypeRouter
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
file_type_router = FileTypeRouter(mime_types=["text/plain", "application/pdf", "text/markdown"])
text_file_converter = TextFileToDocument()
markdown_converter = MarkdownToDocument()
pdf_converter = PyPDFToDocument()
document_joiner = DocumentJoiner()

From there, the steps in this indexing pipeline are more standard. The `DocumentCleaner` removes extra whitespace. Then the `DocumentSplitter` breaks the documents into chunks of 150 words, with an overlap of 50 words between consecutive chunks to avoid losing context at chunk boundaries.

In [57]:
document_cleaner = DocumentCleaner()
document_splitter = DocumentSplitter(split_by="word", split_length=150, split_overlap=50)
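
To show what word-based splitting with overlap means, here is a simplified sketch in plain Python. It only illustrates the sliding-window idea. `DocumentSplitter` may handle whitespace and edge cases differently, and `split_by_word` is a hypothetical helper name.

```python
def split_by_word(text, split_length=150, split_overlap=50):
    """Split text into chunks of `split_length` words that share `split_overlap` words."""
    if split_overlap >= split_length:
        raise ValueError("split_overlap must be smaller than split_length")
    words = text.split()
    step = split_length - split_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + split_length]))
        if start + split_length >= len(words):
            break  # the last window already reached the end of the text
    return chunks


# Small numbers make the overlap easy to see
sample = " ".join(f"w{i}" for i in range(10))
chunks = split_by_word(sample, split_length=4, split_overlap=2)
```

With `split_length=150` and `split_overlap=50`, each new chunk starts 100 words after the previous one.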

Now you'll add a `SentenceTransformersDocumentEmbedder` to create embeddings from the documents. As the last step in this pipeline, the `DocumentWriter` will write them to the `InMemoryDocumentStore`.


In [58]:
document_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
document_writer = DocumentWriter(document_store)

After creating all the components, add them to the indexing pipeline.

In [59]:
preprocessing_pipeline = Pipeline()
preprocessing_pipeline.add_component(instance=file_type_router, name="file_type_router")
preprocessing_pipeline.add_component(instance=text_file_converter, name="text_file_converter")
preprocessing_pipeline.add_component(instance=markdown_converter, name="markdown_converter")
preprocessing_pipeline.add_component(instance=pdf_converter, name="pypdf_converter")
preprocessing_pipeline.add_component(instance=document_joiner, name="document_joiner")
preprocessing_pipeline.add_component(instance=document_cleaner, name="document_cleaner")
preprocessing_pipeline.add_component(instance=document_splitter, name="document_splitter")
preprocessing_pipeline.add_component(instance=document_embedder, name="document_embedder")
preprocessing_pipeline.add_component(instance=document_writer, name="document_writer")

Next, connect them 👇

In [60]:
preprocessing_pipeline.connect("file_type_router.text/plain", "text_file_converter.sources")
preprocessing_pipeline.connect("file_type_router.application/pdf", "pypdf_converter.sources")
preprocessing_pipeline.connect("file_type_router.text/markdown", "markdown_converter.sources")
preprocessing_pipeline.connect("text_file_converter", "document_joiner")
preprocessing_pipeline.connect("pypdf_converter", "document_joiner")
preprocessing_pipeline.connect("markdown_converter", "document_joiner")
preprocessing_pipeline.connect("document_joiner", "document_cleaner")
preprocessing_pipeline.connect("document_cleaner", "document_splitter")
preprocessing_pipeline.connect("document_splitter", "document_embedder")
preprocessing_pipeline.connect("document_embedder", "document_writer")

<haystack.core.pipeline.pipeline.Pipeline object at 0x7082a5be3f50>
🚅 Components
  - file_type_router: FileTypeRouter
  - text_file_converter: TextFileToDocument
  - markdown_converter: MarkdownToDocument
  - pypdf_converter: PyPDFToDocument
  - document_joiner: DocumentJoiner
  - document_cleaner: DocumentCleaner
  - document_splitter: DocumentSplitter
  - document_embedder: SentenceTransformersDocumentEmbedder
  - document_writer: DocumentWriter
🛤️ Connections
  - file_type_router.text/plain -> text_file_converter.sources (List[Union[str, Path, ByteStream]])
  - file_type_router.application/pdf -> pypdf_converter.sources (List[Union[str, Path, ByteStream]])
  - file_type_router.text/markdown -> markdown_converter.sources (List[Union[str, Path, ByteStream]])
  - text_file_converter.documents -> document_joiner.documents (List[Document])
  - markdown_converter.documents -> document_joiner.documents (List[Document])
  - pypdf_converter.documents -> document_joiner.documents (List[Docume

Let's test this pipeline with a few articles.

In [61]:
from pathlib import Path
output_dir = 'articles'
preprocessing_pipeline.run({"file_type_router": {"sources": list(Path(output_dir).glob("**/*"))}})

Batches: 100%|█████████████████████████████| 40/40 [01:22<00:00,  2.07s/it]


{'document_writer': {'documents_written': 1256}}
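
Note that `glob("**/*")` also yields directories, not only files. If your `articles` folder contains subfolders, you may want to pass only regular files to the router. A small sketch, where `list_files` and the demo file names are hypothetical:

```python
import tempfile
from pathlib import Path


def list_files(root):
    """Return every regular file under `root`, skipping directories."""
    return sorted(p for p in Path(root).glob("**/*") if p.is_file())


# Demonstration on a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "nested").mkdir()
    (Path(tmp) / "a.txt").write_text("hello")
    (Path(tmp) / "nested" / "b.md").write_text("# hi")
    found = [p.relative_to(tmp).as_posix() for p in list_files(tmp)]
```

In the cell above, this would mean passing `list_files(output_dir)` as `sources`.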

🎉 If you only wanted to learn how to preprocess documents, you can stop here! If you want to see an example of using those documents in a RAG pipeline, read on.  

## (Optional) Build a pipeline to query documents

Now, let's build a RAG pipeline that answers queries based on the documents you just indexed in the section above. This step uses the [`MistralChatGenerator`](https://docs.haystack.deepset.ai/docs/mistralchatgenerator), so you must have a [Hugging Face API Key](https://huggingface.co/settings/tokens) and a [Mistral AI](https://mistral.ai/) API key for this section.

In [62]:
import os
from getpass import getpass
from dotenv import load_dotenv

# Load environment variables from a .env file, if one exists
load_dotenv()

# Prompt for any API key that is still missing after loading .env
for key in ("HF_API_TOKEN", "MISTRAL_API_KEY"):
    if not os.environ.get(key):
        os.environ[key] = getpass(f"Enter your {key}: ")


In this step you'll build a query pipeline to answer questions about the documents.

This pipeline takes the prompt, searches the document store for relevant documents, and passes those documents along to the LLM to formulate an answer.

> ⚠️ Notice how we used `sentence-transformers/all-MiniLM-L6-v2` to create embeddings for our documents before. This is why we will be using the same model to embed incoming questions.
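
The reason is that retrieval compares the query vector with the stored document vectors, which only makes sense if both come from the same embedding space. The toy sketch below ranks documents by cosine similarity using made-up 2-dimensional vectors. Real `all-MiniLM-L6-v2` embeddings have 384 dimensions, and the similarity function the retriever actually uses depends on how the document store is configured.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_embedding, doc_embeddings, k=2):
    """Return the ids of the k documents most similar to the query."""
    scored = [
        (cosine_similarity(query_embedding, embedding), doc_id)
        for doc_id, embedding in doc_embeddings.items()
    ]
    return [doc_id for _, doc_id in sorted(scored, reverse=True)[:k]]


# Made-up vectors, for illustration only
toy_embeddings = {"doc_a": [1.0, 0.0], "doc_b": [0.0, 1.0], "doc_c": [0.7, 0.7]}
ranked = top_k([0.9, 0.1], toy_embeddings, k=2)
```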

In [63]:
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
from haystack.components.generators.chat import HuggingFaceAPIChatGenerator
from haystack_integrations.components.generators.mistral import MistralChatGenerator
from haystack.utils import Secret
from haystack import Pipeline

template = [
    ChatMessage.from_system("""
You are a helpful assistant that answers questions in German language only. Your goal is to provide accurate and detailed answers based exclusively on the specified context. If the answer to a question is not found in the context, you must respond with: "Ich weiß es nicht" (I don't know). 

When answering, always include the following details:
1. Dokumentenindex: Specify the document number or identifier.
2. Zeilen: Reference the exact line numbers where the information is located.
3. Detailed Explanation: Provide a thorough and clear explanation in German, ensuring the response is based solely on the provided context.

Answer Format:
[Dokumentenindex : {} | Zeilen : {}] {detailed answer in German}

Rules:
1. Do not make assumptions or provide information outside the given context.
2. If the context does not contain the answer, respond with: "Ich weiß es nicht".
3. Always answer in German and maintain a formal and professional tone.
4. Do not include source citations, annotations, or any additional information outside the specified format.
    """),
    ChatMessage.from_user(
        """
Answer the questions based on the given context.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}

Question: {{ question }}
Answer:
"""
    )
]

pipe = Pipeline()
pipe.add_component("embedder", SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"))
pipe.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
pipe.add_component("chat_prompt_builder", ChatPromptBuilder(template=template))
pipe.add_component(
    "llm",
    # Mistral is used by default. To use the Hugging Face generator instead,
    # comment out the next line and uncomment the block below it.
    MistralChatGenerator(api_key=Secret.from_env_var("MISTRAL_API_KEY")),
    # HuggingFaceAPIChatGenerator(
    #     api_type="serverless_inference_api", api_params={"model": "HuggingFaceH4/zephyr-7b-beta"}
    # ),
)

pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever", "chat_prompt_builder.documents")
pipe.connect("chat_prompt_builder.prompt", "llm.messages")

# Uncomment the following block to run the pipeline interactively
# while True:
#     question = input("Ask a question: ")
#     answer = pipe.run({"embedder": {"text": question}, "chat_prompt_builder": {"question": question}})
#     print(answer['llm']['replies'][0].text)

<haystack.core.pipeline.pipeline.Pipeline object at 0x7082aeb5bce0>
🚅 Components
  - embedder: SentenceTransformersTextEmbedder
  - retriever: InMemoryEmbeddingRetriever
  - chat_prompt_builder: ChatPromptBuilder
  - llm: HuggingFaceAPIChatGenerator
🛤️ Connections
  - embedder.embedding -> retriever.query_embedding (List[float])
  - retriever.documents -> chat_prompt_builder.documents (List[Document])
  - chat_prompt_builder.prompt -> llm.messages (List[ChatMessage])
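
The system prompt asks the model to cite a `Dokumentenindex`, but the user template passes only `document.content`, so the model sees no identifier it could cite. One possible adjustment is to number each document when the context is assembled. It is sketched here in plain Python rather than Jinja, and `FakeDocument` and `build_context` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass


@dataclass
class FakeDocument:
    """Stand-in for a Haystack Document; only `content` is needed here."""
    content: str


def build_context(documents):
    """Prefix each document with a 1-based index the model can refer to."""
    return "\n".join(
        f"[Dokument {index}] {document.content}"
        for index, document in enumerate(documents, start=1)
    )


context = build_context([FakeDocument("Erster Text."), FakeDocument("Zweiter Text.")])
```

Inside the Jinja template, `loop.index` provides the same 1-based counter within the `{% for document in documents %}` loop.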

## Creating the API endpoint to communicate with the bot

In [None]:
from flask import Flask, request, jsonify
from flask_cors import CORS
import threading
import time


# Create an instance of the Flask class
app = Flask(__name__)

CORS(app)

port = 5800

@app.route("/api", methods=["POST"])
def chat():
    # Parse the JSON body; silent=True returns None instead of raising on invalid JSON
    data = request.get_json(silent=True) or {}
    question = data.get("message")
    if not question:
        # The client sent no usable question, so this is a client error
        return jsonify({"errors": ["Request body must be JSON with a 'message' field"]}), 400

    try:
        # Embed the question, retrieve matching documents, and generate a reply
        answer = pipe.run({"embedder": {"text": question}, "chat_prompt_builder": {"question": question}})
        return jsonify({"data": answer["llm"]["replies"][0].text}), 200
    except Exception as e:
        # Failures inside the pipeline are server-side errors
        return jsonify({"errors": [str(e)]}), 500

def run_flask():
    print(f"Flask app is running on http://127.0.0.1:{port}/")
    app.run(port=port, debug=False, use_reloader=False)

# Global variable to track the Flask thread
flask_thread = None

# Main entry point
if __name__ == '__main__':
    # Start the Flask app in a separate thread
    flask_thread = threading.Thread(target=run_flask)
    flask_thread.daemon = True  # Daemonize the thread so it exits when the main program exits
    flask_thread.start()

    try:
        # Keep the main thread alive to keep the Flask app running
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down Flask app...")
        # No need to explicitly stop the thread since it's daemonized

Flask app is running on http://127.0.0.1:5800/
 * Serving Flask app '__main__'
 * Debug mode: off
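
Once the server is running, a client can call the endpoint by POSTing JSON with a `message` field to `/api`. The sketch below uses only the standard library. The helper names `build_chat_request` and `ask` are hypothetical, and the example only builds the request, so it runs even when no server is listening.

```python
import json
import urllib.request


def build_chat_request(message, base_url="http://127.0.0.1:5800"):
    """Build a POST request carrying the question as JSON."""
    payload = json.dumps({"message": message}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/api",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(message, base_url="http://127.0.0.1:5800"):
    """Send the question and return the 'data' field of the JSON response."""
    with urllib.request.urlopen(build_chat_request(message, base_url), timeout=60) as resp:
        return json.loads(resp.read().decode("utf-8"))["data"]


# Building the request does not contact the server
req = build_chat_request("Was steht in den Dokumenten?")
```

With the Flask app running, `ask("...")` would return the generated answer text.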


Address already in use
Port 5800 is in use by another program. Either identify and stop that program, or start the server with a different port.
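
This error typically appears when the cell is run a second time while the first server thread is still holding port 5800. Restarting the kernel frees the port. Alternatively, you could ask the operating system for an unused port, as in the sketch below. `find_free_port` is a hypothetical helper, and another process could in principle claim the port between this check and the moment Flask binds to it.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Ask the OS for a currently unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
```

The resulting `port` value can then be used in place of the hard-coded `5800`.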
