# Creating a basic RAG pipeline with Haystack

Retrieval Augmented Generation (RAG) is a method that combines information retrieval with generative models to provide accurate, context-aware responses.

It is particularly useful for tasks requiring domain-specific knowledge or large-scale document retrieval.

## Setup Environment

This section clones the repository that contains the data used in this notebook and installs all the required packages.


**NOTE: Make sure to change the notebook runtime to T4 GPU**

In [None]:
!git clone https://github.com/CaSToRC-CyI/AI-Agents-Training.git

In [None]:
%cd ./AI-Agents-Training

In [None]:
%%bash

uv pip install haystack-ai
uv pip install datasets -U
uv pip install "sentence-transformers>=4.1.0"
uv pip install huggingface_hub -U
uv pip install python-docx

### Import packages

In [6]:
import os
from pathlib import Path
from getpass import getpass
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack import Document
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.dataclasses import ChatMessage
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.converters import DOCXToDocument
import textwrap

### Set up the OpenAI API key

In [19]:
os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

### Convert data to Haystack Documents

Haystack uses an abstraction called *Documents*. They can hold text, tables, and binary data.

Documents have the following features:

- A unique ID for each document.
- Support for multiple content types.
- Custom metadata and scoring for advanced document management.
- Optional embeddings for AI-based applications.

**Example:** the fields of Haystack's `Document` dataclass:

```python
@dataclass
class Document(metaclass=_BackwardCompatible):
    id: str = field(default="")
    content: Optional[str] = field(default=None)
    blob: Optional[ByteStream] = field(default=None)
    meta: Dict[str, Any] = field(default_factory=dict)
    score: Optional[float] = field(default=None)
    embedding: Optional[List[float]] = field(default=None)
    sparse_embedding: Optional[SparseEmbedding] = field(default=None)
```
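To make the fields above concrete, here is a minimal, stdlib-only sketch of a document object. `ToyDocument` is a hypothetical, simplified stand-in, not Haystack's class: Haystack generates an ID automatically when none is passed, but the toy below only hashes the content, which is an assumption made for illustration.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ToyDocument:
    """Hypothetical, simplified stand-in for haystack.Document (not the real class)."""

    content: Optional[str] = None
    meta: Dict[str, Any] = field(default_factory=dict)
    score: Optional[float] = None
    embedding: Optional[List[float]] = None  # filled in later by an embedder
    id: str = ""

    def __post_init__(self) -> None:
        # Assumption for illustration: derive a stable ID from the content
        # when no ID is supplied (the real Haystack ID logic is richer).
        if not self.id:
            self.id = hashlib.sha256((self.content or "").encode("utf-8")).hexdigest()


toy_doc = ToyDocument(
    content="Hackathon runs July 15-17, 2025.",
    meta={"file_path": "hackathon.docx"},
)
# Prints the first characters of the generated ID, the metadata, and the empty embedding
print(toy_doc.id[:12], toy_doc.meta["file_path"], toy_doc.embedding)
```

The same content always yields the same ID, so re-indexing an unchanged file would not create a "new" document.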

---

To convert our files to Haystack Documents, we need to use a *converter*. Since all of our files are .docx documents, we can use Haystack's *DOCXToDocument* converter.

In [7]:
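DOCUMENTS_DIR = Path("./dummy_data/documents_dir")
# Collect every .docx file under the documents directory (other file types are skipped)
FILES = [file.resolve() for file in DOCUMENTS_DIR.rglob("*.docx") if file.is_file()]
converter = DOCXToDocument()

docs = []
for file in FILES:
    # Each run returns {"documents": [...]} with the converted Haystack Documents
    result = converter.run(sources=[file])
    docs.extend(result["documents"])  # Append the converted documents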

#### Inspect a sample document from our database

#### Content

In [15]:
print(docs[0].content)


🛠️ Annual Company Hackathon 2025 – Full Event Brief
📅 Dates: July 15–17, 2025
📍 Location: Hybrid (Company HQ & Remote Access)
⏰ Duration: 48 Hours
🎯 Theme: "Innovate with Impact: Building the Future with Internal APIs"

🧠 Event Overview
The Annual Company Hackathon is a flagship innovation event designed to bring together employees from across departments to collaborate, experiment, and build transformative solutions. This year’s hackathon will run from Tuesday, July 15 at 10:00 AM to Thursday, July 17 at 12:00 PM, offering a 48-hour window for teams to ideate, prototype, and present their projects.
The 2025 theme, "Innovate with Impact", emphasizes the use of our internal APIs to create tools, applications, or enhancements that can drive real value—whether for internal operations, customer experience, or future product lines.

👥 Eligibility & Team Formation
Open to all employees, including full-time, part-time, interns, and contractors.
Teams can consist of 1 to 5 members.
Cross-func

#### Metadata

In [17]:
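# Pick one converted document by file name and show its metadata
doc = next(d for d in docs if d.meta["file_path"] == "Cybersecurity Awareness Month.docx")
print(doc.meta)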

{'file_path': 'Cybersecurity Awareness Month.docx', 'docx': {'author': 'python-docx', 'category': '', 'comments': 'generated by python-docx', 'content_status': '', 'created': '2013-12-23T23:15:00+00:00', 'identifier': '', 'keywords': '', 'language': '', 'last_modified_by': 'Marios Constantinou', 'last_printed': None, 'modified': '2025-07-15T09:53:00+00:00', 'revision': 2, 'subject': '', 'title': '', 'version': ''}}


## Indexing Documents and performing RAG

### Setup Indexing components

Before we can use our documents for retrieval, we need to do two things:

1. Turn them into embeddings with an *embedder*.
2. Store them in a Haystack *Document Store* so they can be accessed later on.

For this simple use case, we use an *OpenAIDocumentEmbedder* to compute the embeddings and then store the documents in an *InMemoryDocumentStore*, which keeps everything in the system's RAM (so the store is lost when the notebook restarts).
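Conceptually, an in-memory store plus an embedding retriever boil down to "keep vectors in a list, score each one against the query vector, return the best matches". The sketch below is a hypothetical, stdlib-only illustration of that idea; `ToyInMemoryStore` and `cosine` are not Haystack APIs, and the 3-dimensional vectors are made up (real OpenAI embeddings have far more dimensions). It uses cosine similarity; Haystack's `InMemoryDocumentStore` defaults to dot product, which ranks identically for unit-length vectors such as OpenAI embeddings.

```python
import math
from typing import List, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    # Cosine similarity: dot product divided by the product of the vector lengths
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class ToyInMemoryStore:
    """Hypothetical minimal store: keeps (text, embedding) pairs in a Python list, i.e. in RAM."""

    def __init__(self) -> None:
        self._docs: List[Tuple[str, List[float]]] = []

    def write(self, text: str, embedding: List[float]) -> None:
        self._docs.append((text, embedding))

    def search(self, query_embedding: List[float], top_k: int = 2) -> List[Tuple[str, float]]:
        # Score every stored document against the query, highest similarity first
        scored = [(text, cosine(query_embedding, emb)) for text, emb in self._docs]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:top_k]


store = ToyInMemoryStore()
# Made-up embeddings, for illustration only
store.write("Hackathon brief", [0.9, 0.1, 0.0])
store.write("Cybersecurity month", [0.1, 0.9, 0.1])
store.write("QuantumStream CLI", [0.0, 0.2, 0.9])

print(store.search([0.8, 0.2, 0.1], top_k=1))
```

A linear scan like this is fine for a handful of documents; larger collections usually call for a dedicated vector database.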

In [20]:
document_store = InMemoryDocumentStore()
doc_embedder = OpenAIDocumentEmbedder()

### Extract embeddings and store to Document Store

Go ahead and run the cell below to begin calculating the embeddings.

In [21]:
docs_with_embeddings = doc_embedder.run(documents=docs)
document_store.write_documents(docs_with_embeddings["documents"])
print(f"Stored {len(docs_with_embeddings['documents'])} documents with embeddings in the document store.")

Calculating embeddings: 1it [00:02,  2.69s/it]

Stored 13 documents with embeddings in the document store.

### Initialize RAG

#### Prompt Template for user

The LLM will use this prompt template to generate a response to the user's query.

Specifically, the LLM will read this text from top to bottom:

- It will read the task, which is to respond to the user's query using the **provided context**.
- It will then read some **General Guidelines**.
- Then it will read the **provided context**.
- And finally it will read the **user's query**.

Notice that the template contains placeholders for the **context** and the **user's query**. This is why we use a prompt builder later on: it fills in these placeholders at runtime to construct the final prompt.

Specifically, the *ChatPromptBuilder* component creates prompts from static or dynamic templates written in Jinja2 syntax, by processing a list of chat messages. The templates contain placeholders like `{{ variable }}` that are filled with values provided at runtime. You can use a static template set at initialization, or change the template and variables dynamically at run time.
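To see what "filling the placeholders" produces, here is a plain-Python sketch that mimics the `{% for document in documents %}` loop and the `{{question}}` placeholder. `build_prompt` is a hypothetical helper, not part of Haystack; it omits the guidelines for brevity, and Jinja2's whitespace handling may differ slightly from this output.

```python
from typing import List


def build_prompt(documents: List[str], question: str) -> str:
    """Hypothetical stand-in for what the Jinja2 template renders (illustrative only)."""
    # One indented line per retrieved document, like the for-loop in the template
    context = "\n".join(f"    {doc}" for doc in documents)
    return (
        "Respond to the User Query using the provided Context.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )


prompt = build_prompt(
    ["The hackathon runs July 15-17, 2025.", "Teams can have 1 to 5 members."],
    "How big can a hackathon team be?",
)
print(prompt)
```

The LLM therefore sees the retrieved text first and the question last, matching the reading order described above.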

In [35]:
template = [
    ChatMessage.from_user(
        """
Respond to the User Query using the provided Context.

General Guidelines:
    - Ensure citations are concise and directly related to the information provided.
    - If the answer is not found in the context, state this clearly instead of making assumptions.
    - If the answer comes from several sources, make sure to cite every one of them, including their Source Filename, Source Chapter and Source Page.
    - If information is region-specific, clarify which region it pertains to.
    - Respond in the same language as the user’s query.  
    - Do not use emojis.
    - Be professional and precise.
    - *Avoid* writing a conclusion or a follow-up at the end of each response unless you were asked to.

Context:
{% for document in documents %}
    Source Filename: {{ document.meta.file_path }}
    {{ document.content }}
{% endfor %}

Question: {{question}}
Answer:
"""
    )
]

#### RAG Components

For the actual RAG pipeline we have the following components:

- The *text_embedder*, which turns the user's query into an embedding.
- The *retriever*, which retrieves the most relevant documents from the document store.
- The *chat_generator*, which is our LLM.
- The *prompt_builder*, which was explained above.

In [37]:
text_embedder = OpenAITextEmbedder()
retriever = InMemoryEmbeddingRetriever(document_store)
chat_generator = OpenAIChatGenerator(model="gpt-4o-mini")
prompt_builder = ChatPromptBuilder(template=template)

# Initialize RAG pipeline
basic_rag_pipeline = Pipeline()

basic_rag_pipeline.add_component("text_embedder", text_embedder)
basic_rag_pipeline.add_component("retriever", retriever)
basic_rag_pipeline.add_component("prompt_builder", prompt_builder)
basic_rag_pipeline.add_component("llm", chat_generator)

# Connect the input/output of each component
basic_rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
basic_rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
basic_rag_pipeline.connect("prompt_builder.prompt", "llm.messages")

We connect the components by linking each component's output to the next component's input.

For example:

- The *text_embedder* takes the user's query as input and outputs an *embedding*. The *retriever* receives this embedding as its *query_embedding* input and outputs a list of *documents* that are similar to the query.
- The *retriever* then passes these documents to the *prompt_builder*.
- Finally, the *prompt_builder* outputs the prompt and sends it to the *llm* as its *messages* input.
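The data flow described above can be mimicked in plain Python to make each connection concrete. The functions below (`toy_text_embedder`, `toy_retriever`, `toy_prompt_builder`, `toy_llm`) are hypothetical stubs, not Haystack components: the "embedding" is just a keyword count and the "LLM" only reports the prompt length. What carries over is the wiring: each call's output feeds the next call's input, just as the corresponding `connect(...)` line does.

```python
from typing import Dict, List

# Toy corpus of (text, embedding) pairs; the 3 slots stand for the keywords below
KEYWORDS = ["hackathon", "security", "quantumstream"]
CORPUS = [
    ("Hackathon runs July 15-17, 2025.", [1.0, 0.0, 0.0]),
    ("October is Cybersecurity Awareness Month.", [0.0, 1.0, 0.0]),
    ("QuantumStream ships a CLI called qs.", [0.0, 0.0, 1.0]),
]


def toy_text_embedder(text: str) -> Dict[str, List[float]]:
    # Stub: keyword counts instead of a real embedding model
    lowered = text.lower()
    return {"embedding": [float(lowered.count(k)) for k in KEYWORDS]}


def toy_retriever(query_embedding: List[float], top_k: int = 1) -> Dict[str, List[str]]:
    # Rank corpus entries by dot product with the query embedding (highest first)
    def score(item):
        return sum(q * d for q, d in zip(query_embedding, item[1]))

    ranked = sorted(CORPUS, key=score, reverse=True)
    return {"documents": [text for text, _ in ranked[:top_k]]}


def toy_prompt_builder(documents: List[str], question: str) -> Dict[str, str]:
    context = "\n".join(documents)
    return {"prompt": f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"}


def toy_llm(messages: str) -> Dict[str, List[str]]:
    # Stub: a plain string stands in for the list of ChatMessage objects
    return {"replies": [f"(model reply to a {len(messages)}-character prompt)"]}


toy_question = "When is the hackathon?"
# text_embedder.embedding -> retriever.query_embedding
embedding = toy_text_embedder(toy_question)["embedding"]
# retriever.documents -> prompt_builder.documents
retrieved = toy_retriever(query_embedding=embedding)["documents"]
# prompt_builder.prompt -> llm.messages
toy_prompt = toy_prompt_builder(documents=retrieved, question=toy_question)["prompt"]
reply = toy_llm(messages=toy_prompt)["replies"][0]

print(retrieved)
print(reply)
```

Haystack's `Pipeline` does this bookkeeping for you and also validates that connected outputs and inputs have compatible types.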

#### Perform RAG on our data

Feel free to change the question to something else.

Our documents contain information about the following topics:

- Annual Hackathon the company is organizing
- Cybersecurity Awareness Month
- Employee Recognition Program
- New Office Layout Plan
- Office layout redesign plan
- Product X Launch Timeline
- Product Y Launch Timeline
- QuantumStream product CLI Usage
- QuantumStream product Data Encryption feature
- QuantumStream product Plugin System
- QuantumStream product REST API documentation
- QuantumStream product Scheduler feature
- QuantumStream product Scheduling tasks

---

Feel free to ask anything relating to these topics.

**Suggested prompts:**

- "What's the purpose of the new office layout? Are we losing our desks??"
- "I am a new employee at the company. Onboard me about the QuantumStream product."
- "I cannot find the relevant email about the Hackathon, can you tell me more details about it?"

In [43]:
question = "I am a new employee at the company. Onboard me about the QuantumStream product." # Feel free to change this question

response = basic_rag_pipeline.run({"text_embedder": {"text": question}, "prompt_builder": {"question": question}})
formatted_text = response["llm"]["replies"][0].text
wrapped_text = "\n".join(
    textwrap.fill(line, width=120, subsequent_indent="  ") if line.strip() else line
    for line in formatted_text.splitlines()
)

print(wrapped_text)

Welcome to the team! QuantumStream is an innovative platform designed for managing streaming data pipelines efficiently.
  Below is an overview of its key components and functionalities to help you get up to speed:

### QuantumStream CLI (Command-Line Interface)
- **Purpose**: Facilitates command-line control for stream management, configuration, and diagnostics.
- **Installation**: You can install the CLI using `pip install quantumstream-cli` or download it from the QuantumStream
  Developer Portal.
- **Core Commands**:
  - `qs init`: Initialize a new QuantumStream project.
  - `qs deploy`: Deploy your stream to a configured environment.
  - `qs monitor`: Launches a real-time metrics dashboard.
  - `qs diag`: Runs a diagnostics scan and outputs a health report.

### QuantumStream REST API
- **Purpose**: Enables integration with external applications for automation and monitoring.
- **Authentication**: API key authentication via HTTP headers.
- **Available Endpoints**: Includes `/strea