## Agentic RAG Pipeline with memory and chat

_code by Stefano Fiorucci ([Twitter](https://x.com/theanakin87), [LI](https://www.linkedin.com/in/stefano-fiorucci/))_

Here is an agentic RAG pipeline that can remember previous chat messages. The most helpful assistants don't forget the thing you just said. 📓

This pipeline runs once per question inside a simple loop and saves each reply to a separate memory store, so later questions can build on earlier answers until you exit.

Components used:
- [`InMemoryDocumentStore`](https://docs.haystack.deepset.ai/docs/inmemorydocumentstore)
- [`InMemoryBM25Retriever`](https://docs.haystack.deepset.ai/docs/inmemorybm25retriever)
- `DynamicChatPromptBuilder`
- `OpenAIChatGenerator`
- [`FilterRetriever`](https://docs.haystack.deepset.ai/docs/filterretriever)
- [`DocumentWriter`](https://docs.haystack.deepset.ai/docs/documentwriter)
- [`OutputAdapter`](https://docs.haystack.deepset.ai/docs/outputadapter)

### Prerequisites

You'll need an [OpenAI API Key](https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key), although this code could be adapted to use [any model Haystack supports](https://docs.haystack.deepset.ai/docs/generators).

## Installation

In [1]:
%%bash

# This notebook was run with haystack-ai 2.2.0. DynamicChatPromptBuilder (used below)
# was superseded by ChatPromptBuilder in later releases, so pin the version to run the code as written.
pip install "haystack-ai==2.2.0"

## Authorization

In [2]:
import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

Enter OpenAI API key:··········


## Initialize the document store

Write some documents to it, which you'll ask the agent to refer to later.

In [3]:
from haystack import Document, Pipeline
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
documents = [
    Document(content="There are over 7,000 languages spoken around the world today."),
    Document(content="Chinese language boasts the highest number of native speakers."),
    Document(content="Elephants have been observed to behave in a way that indicates a high level of self-awareness, such as recognizing themselves in mirrors."),
    Document(content="In certain parts of the world, like the Maldives, Puerto Rico, and San Diego, you can witness the phenomenon of bioluminescent waves."),
]
document_store.write_documents(documents=documents)

4

## Retrieve previous chat messages

`FilterRetriever` is a special retriever that returns the documents matching the `filters` parameter passed to it. Here, it retrieves the messages previously saved to `memory_store`, a second `InMemoryDocumentStore` kept separate from the one holding the documents above.

Note that no filters are passed to the `FilterRetriever`, so it returns every stored message. With a long chat history this could get quite slow, and the prompt would keep growing! In a production scenario, you'd want to create a session ID for each chat session and pass that in as a filter.

In [4]:

from haystack.components.retrievers import FilterRetriever
from haystack.components.writers import DocumentWriter
from haystack import Document
from haystack.components.converters import OutputAdapter
from typing import List
from haystack.document_stores.types import DuplicatePolicy

# A separate store that holds the chat history ("memories")
memory_store = InMemoryDocumentStore()
memory_retriever = FilterRetriever(memory_store)
# With DuplicatePolicy.SKIP, a message whose content is already stored is skipped instead of raising an error
memory_writer = DocumentWriter(memory_store, policy=DuplicatePolicy.SKIP)


## Save `ChatMessage` objects

Across loop iterations, the pipeline needs to save previous chat messages to a document store. In this pipeline, only the LLM's replies are saved; the user's questions are not.

The utility function below takes a list of `ChatMessage` objects and turns them into Haystack `Document`s. Wrapping it in an `OutputAdapter` (registered as a custom Jinja filter) lets the pipeline convert `llm.replies` into documents that `memory_writer` can store; on the next question, `memory_retriever` reads them back.

In [5]:
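def chat_messages_to_docs(chat_messages: List[ChatMessage]) -> List[Document]:
    """Convert each chat message into a Document holding its text."""
    return [Document(content=message.content) for message in chat_messages]

# Expose the function as a Jinja filter so the template can apply it to the adapter's input
output_adapter = OutputAdapter(
    template="{{ chat_messages | chat_messages_to_docs }}",
    output_type=List[Document],
    custom_filters={"chat_messages_to_docs": chat_messages_to_docs},
)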

## Create the pipeline

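For simplicity, this pipeline uses the `InMemoryBM25Retriever` to do keyword-based retrieval. In a production scenario where you're searching a large number of documents, an embedding-based retriever (for example `InMemoryEmbeddingRetriever`, or one backed by a dedicated vector database) would usually return more relevant results, since it matches on meaning rather than exact keywords.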

The `DynamicChatPromptBuilder` takes three runtime variables:
- _query_, the user's question in the current loop iteration
- _documents_, the documents `retriever` returned from `document_store`
- _memories_, the previous replies that `memory_retriever` read from `memory_store`

In [6]:
# components for RAG
pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("prompt_builder", DynamicChatPromptBuilder(runtime_variables=["query", "documents", "memories"]))
pipeline.add_component("llm", OpenAIChatGenerator())

# components for memory
pipeline.add_component("memory_retriever", memory_retriever)
pipeline.add_component("memory_writer", memory_writer)
pipeline.add_component("output_adapter", output_adapter)

# connections for RAG
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.messages")

# connections for memory
pipeline.connect("memory_retriever", "prompt_builder.memories")
pipeline.connect("llm.replies", "output_adapter.chat_messages")
pipeline.connect("output_adapter", "memory_writer")

<haystack.core.pipeline.pipeline.Pipeline object at 0x7da851662bc0>
🚅 Components
  - retriever: InMemoryBM25Retriever
  - prompt_builder: DynamicChatPromptBuilder
  - llm: OpenAIChatGenerator
  - memory_retriever: FilterRetriever
  - memory_writer: DocumentWriter
  - output_adapter: OutputAdapter
🛤️ Connections
  - retriever.documents -> prompt_builder.documents (List[Document])
  - prompt_builder.prompt -> llm.messages (List[ChatMessage])
  - llm.replies -> output_adapter.chat_messages (List[ChatMessage])
  - memory_retriever.documents -> prompt_builder.memories (List[Document])
  - output_adapter.output -> memory_writer.documents (List[Document])

## Draw the pipeline

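To see a diagram of the pipeline, run this cell; it saves the image as `image.png` in the current directory. Haystack renders the diagram through the online Mermaid.ink service, so this step needs internet access.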

In [8]:
pipeline.draw(path="./image.png")

## Create the prompt

The system message tells the LLM how to act.

The user message template needs two Jinja `for` loops: one to run through the memories and one to run through the relevant documents the retriever returned.

In [9]:
system_message = ChatMessage.from_system("You are a helpful assistant.")
user_message_template = """Given the previous messages and the provided documents, answer the question. Use your memory.
    Memory:
    {% for memory in memories %}
        {{ memory.content }}
    {% endfor %}

    Documents:
    {% for doc in documents %}
        {{ doc.content }}
    {% endfor %}

    \nQuestion: {{query}}
    \nAnswer:
"""
user_message = ChatMessage.from_user(user_message_template)
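`DynamicChatPromptBuilder` fills these placeholders with Jinja2, so you can preview the rendered user message before any LLM call. A minimal sketch that renders a copy of the same template directly with `jinja2`, using plain dicts as stand-ins for the `Document` objects the pipeline would pass in (Jinja's `memory.content` lookup falls back to dict keys):

```python
from jinja2 import Template

# A copy of the user message template above, so this block runs on its own
preview_template_text = """Given the previous messages and the provided documents, answer the question. Use your memory.
    Memory:
    {% for memory in memories %}
        {{ memory.content }}
    {% endfor %}

    Documents:
    {% for doc in documents %}
        {{ doc.content }}
    {% endfor %}

    \nQuestion: {{query}}
    \nAnswer:
"""

# Plain dicts stand in for Document objects; Jinja tries attribute lookup, then key lookup
preview_memories = [{"content": "There are over 7,000 languages spoken around the world today."}]
preview_documents = [{"content": "Chinese language boasts the highest number of native speakers."}]

rendered = Template(preview_template_text).render(
    memories=preview_memories,
    documents=preview_documents,
    query="Do you remember the answer you gave me before?",
)
print(rendered)
```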

## Bring it all together

What follows is glue code to run the pipeline in a loop, provide input instructions, and break when the user enters `Q` to quit.

Here are some example questions to get you started:
- _How many languages are there?_
- _What is the one with the most native speakers?_
- _Do you remember the two answers you gave to me before?_

In [10]:

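while True:
    # Fresh prompt templates each turn; prompt_builder fills in query, documents, and memories
    messages = [system_message, user_message]
    question = input("Enter your question or Q to exit. Example: How many languages are there?\n🔮 ")
    if question == "Q":
        break

    res = pipeline.run(
        data={
            "retriever": {"query": question},
            "prompt_builder": {"prompt_source": messages, "query": question},
        },
        # These components feed other components, so their outputs are only returned if listed here
        include_outputs_from=["llm", "prompt_builder"],
    )
    # Debug output: the full result, including the rendered prompt
    print(f"res: {res}")
    assistant_resp = res["llm"]["replies"][0]
    print(f"🤖 {assistant_resp.content}")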

Enter your question or Q to exit. Example: How many languages are there?
🔮 How many languages are there?
res: {'memory_writer': {'documents_written': 1}, 'prompt_builder': {'prompt': [ChatMessage(content='You are a helpful assistant.', role=<ChatRole.SYSTEM: 'system'>, name=None, meta={}), ChatMessage(content='Given the previous messages and the provided documents, answer the question. Use your memory.\n    Memory:\n    \n\n    Documents:\n    \n        There are over 7,000 languages spoken around the world today.\n    \n        Chinese language boasts the highest number of native speakers.\n    \n        Elephants have been observed to behave in a way that indicates a high level of self-awareness, such as recognizing themselves in mirrors.\n    \n        In certain parts of the world, like the Maldives, Puerto Rico, and San Diego, you can witness the phenomenon of bioluminescent waves.\n    \n\n    \nQuestion: How many languages are there?\n    \nAnswer:', role=<ChatRole.USER: 'user'>