<a href="https://colab.research.google.com/github/deepset-ai/haystack-experimental/blob/main/examples/async_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Running Haystack Pipelines in Asynchronous Environments

In this notebook, you'll learn how to use the `AsyncPipeline` and async-enabled components from the [haystack-experimental](https://github.com/deepset-ai/haystack-experimental) repository to build and execute a Haystack pipeline in an asynchronous environment. It's based on [this short Haystack tutorial](https://haystack.deepset.ai/tutorials/27_first_rag_pipeline), so it would be a good idea to familiarize yourself with it before we begin. A further prerequisite is working knowledge of cooperative scheduling and [async programming in Python](https://docs.python.org/3/library/asyncio.html).

## Motivation

By default, the `Pipeline` class in `haystack` is a regular Python class that exposes non-`async` methods to add and connect components and to execute the pipeline logic. Currently, it *can* be used in async environments, but doing so is not optimal because it executes its logic in a '[blocking](https://en.wikipedia.org/wiki/Blocking_(computing))' fashion: once `Pipeline.run` is invoked, it must run to completion and return its outputs before the next statement can execute<sup>1</sup>. In a typical async environment, this prevents the active event loop from scheduling other coroutines in the meantime, which reduces throughput. Similarly, Haystack components currently provide only a non-`async` `run` method. To remove this bottleneck, we introduce async-enabled Haystack components and an `AsyncPipeline` class that cooperatively schedules the execution of both async and non-async components.
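The cost of blocking can be observed without Haystack at all. The standalone sketch below (plain `asyncio`, no Haystack APIs) times three coroutines that each wait 0.2 s, once with the blocking `time.sleep` and once with the cooperative `asyncio.sleep`. The helper names are illustrative only. It uses `asyncio.run`, so run it as a script; inside a notebook, where an event loop is already running, use `await main()` instead.

```python
import asyncio
import time
from typing import Awaitable, Callable, Tuple


async def blocking_step(duration: float) -> None:
    # time.sleep blocks the whole thread: the event loop cannot switch to
    # another coroutine until it returns.
    time.sleep(duration)


async def cooperative_step(duration: float) -> None:
    # asyncio.sleep suspends only this coroutine and hands control back to
    # the event loop, which can run other coroutines in the meantime.
    await asyncio.sleep(duration)


async def time_three(step: Callable[[float], Awaitable[None]]) -> float:
    # Schedule three 0.2 s steps together and measure the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(step(0.2), step(0.2), step(0.2))
    return time.perf_counter() - start


async def main() -> Tuple[float, float]:
    blocking = await time_three(blocking_step)
    cooperative = await time_three(cooperative_step)
    print(f"blocking: {blocking:.2f}s, cooperative: {cooperative:.2f}s")
    return blocking, cooperative


blocking_time, cooperative_time = asyncio.run(main())
```

The blocking variant takes roughly the sum of the waits (about 0.6 s), while the cooperative variant takes roughly the longest single wait (about 0.2 s).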

### Goals
- Allow individual components to opt into `async` support.
    - Not all components benefit from being async-enabled; I/O-bound components are the most suitable candidates.
- Provide a backward-compatible way to execute Haystack pipelines containing both async and non-async components.
- Execute components concurrently.
    - Concurrent execution can yield major speed-ups in pipeline runtime (e.g., hybrid retrieval or running multiple Generators concurrently).

### Non-goals
- Add async support to all existing components.

<sup>1</sup> - This is a simplification as the Python runtime can potentially schedule another thread, but it's a detail that we can ignore in this case.

Let's now go ahead and see what it takes to add async support to the original tutorial, starting with installing Haystack, the experimental package and the requisite dependencies.


In [None]:
%%bash

pip install -U haystack-ai
pip install -U haystack-experimental
pip install datasets
pip install sentence-transformers

Provide an [OpenAI API key](https://platform.openai.com/api-keys) so that the LLM generator can query the OpenAI API.

In [None]:
import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

# If you're running this notebook on Google Colab, you might need to use the following instead:
#
# from google.colab import userdata
# if "OPENAI_API_KEY" not in os.environ:
#  os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

Initialize a `DocumentStore` to index your documents. We use the `InMemoryDocumentStore` from the `haystack-experimental` package since it has support for `async`.

In [15]:
from haystack_experimental.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

Fetch the data and convert it into Haystack `Document`s.

In [16]:
from datasets import load_dataset
from haystack import Document

dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]

To store your data in the `DocumentStore` with embeddings, initialize a `SentenceTransformersDocumentEmbedder` with the model name and call `warm_up()` to download the embedding model.

Then, we calculate the embeddings of the documents with the newly warmed-up embedder and write them to the document store. Notice that we call the `write_documents_async` method and `await` it. The `DocumentStore` protocol in `haystack-experimental` exposes `async` variants of common methods such as `count_documents`, `write_documents`, etc. These [coroutines](https://docs.python.org/3/library/asyncio-task.html#coroutines) can be awaited inside a running event loop; the notebook/Google Colab kernel starts one automatically, which is why top-level `await` works here.
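Top-level `await` only works because the notebook kernel already runs an event loop. In a plain Python script you have to start the loop yourself, for example with `asyncio.run`. The sketch below shows the pattern with a hypothetical `HypotheticalAsyncStore` class that only mimics the shape of `write_documents_async` and `count_documents_async` (the latter name is assumed by analogy with the text above). It is not the Haystack implementation; with the real `InMemoryDocumentStore` you would await the same kind of methods inside your own `async def` entry point.

```python
import asyncio
from typing import List


class HypotheticalAsyncStore:
    """Hypothetical stand-in for an async-enabled document store (not a Haystack class)."""

    def __init__(self) -> None:
        self._docs: List[str] = []

    async def write_documents_async(self, docs: List[str]) -> int:
        # A real store would await network or disk I/O here; we just yield control once.
        await asyncio.sleep(0)
        self._docs.extend(docs)
        return len(docs)

    async def count_documents_async(self) -> int:
        await asyncio.sleep(0)
        return len(self._docs)


async def index(docs: List[str]) -> int:
    store = HypotheticalAsyncStore()
    n_written = await store.write_documents_async(docs)
    print(f"Indexed {n_written} documents")
    return await store.count_documents_async()


# A plain script has no running event loop, so start one explicitly.
# In a notebook, `await index([...])` works directly instead.
total = asyncio.run(index(["doc one", "doc two", "doc three"]))
```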

In [17]:
from haystack.components.embedders import SentenceTransformersDocumentEmbedder

doc_embedder = SentenceTransformersDocumentEmbedder(
    model="sentence-transformers/all-MiniLM-L6-v2"
)
doc_embedder.warm_up()

docs_with_embeddings = doc_embedder.run(docs)
n_docs_written = await document_store.write_documents_async(docs_with_embeddings["documents"])
print(f"Indexed {n_docs_written} documents")


Indexed 151 documents


The next step is to build the RAG pipeline to generate answers for a user query.

Initialize a text embedder to create an embedding for the user query and an `InMemoryEmbeddingRetriever` to use with the `InMemoryDocumentStore` you initialized earlier. As with the document store, the async-enabled embedding retriever class comes from the `haystack-experimental` package.

In [None]:
from haystack.components.embedders import SentenceTransformersTextEmbedder

from haystack_experimental.components.retrievers.in_memory import InMemoryEmbeddingRetriever

text_embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
retriever = InMemoryEmbeddingRetriever(document_store)


Create a custom prompt to use with the `ChatPromptBuilder` and initialize an `OpenAIChatGenerator` to consume the prompt builder's output.

In [None]:
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage

from haystack_experimental.components.generators.chat import OpenAIChatGenerator

template = """
Given the following information, answer the question.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}

Question: {{question}}
Answer:
"""

prompt_builder = ChatPromptBuilder(template=[ChatMessage.from_user(template)])
generator = OpenAIChatGenerator(model="gpt-4o-mini")

We finally get to the creation of the pipeline instance. Instead of using the `Pipeline` class, we use the `AsyncPipeline` class from the `haystack-experimental` package. 

The rest of the process, i.e., adding components and connecting them with each other, remains the same as with the original `Pipeline` class.

In [None]:
from haystack_experimental.core import AsyncPipeline

async_rag_pipeline = AsyncPipeline()
# Add components to your pipeline
async_rag_pipeline.add_component("text_embedder", text_embedder)
async_rag_pipeline.add_component("retriever", retriever)
async_rag_pipeline.add_component("prompt_builder", prompt_builder)
async_rag_pipeline.add_component("llm", generator)

# Now, connect the components to each other
async_rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
async_rag_pipeline.connect("retriever", "prompt_builder.documents")
async_rag_pipeline.connect("prompt_builder.prompt", "llm.messages")

Now, we create a coroutine that queries the pipeline with a question.

The key differences between the `AsyncPipeline.run_async_generator` and `Pipeline.run` methods have to do with their parameters and return values.

Both methods share the `data` parameter that encapsulates the initial inputs for the pipeline's components.

`Pipeline.run` accepts an additional `include_outputs_from` parameter to return the outputs of intermediate, non-leaf components in the pipeline graph. `AsyncPipeline.run_async_generator` does not need it for this purpose: it is implemented as an `async` generator that yields the output of **each component** as soon as that component executes successfully. This has the following implications:

- The output of `AsyncPipeline.run_async_generator` must be consumed in an `async for` loop for the pipeline execution to make progress.
- Because intermediate results are delivered as soon as they are computed, the feedback loop between the backend and the user is tighter. For example, the retriever's results can be displayed to the user before the LLM's response is generated.
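To make this consumption model concrete, here is a standalone sketch with a hypothetical `toy_run_async_generator` that mimics the behaviour described above for a linear three-component pipeline. The component names, latencies, and outputs are made up, and this is not how `AsyncPipeline` is implemented. The generator body only makes progress while the `async for` loop pulls items, and after the loop the loop variable holds the last yielded item.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Tuple

# Hypothetical stand-ins for pipeline components: (name, simulated latency, output).
STEPS: List[Tuple[str, float, Dict[str, Any]]] = [
    ("retriever", 0.05, {"documents": ["doc A", "doc B"]}),
    ("prompt_builder", 0.01, {"prompt": "Given the following information..."}),
    ("llm", 0.10, {"replies": ["An answer"]}),
]


async def toy_run_async_generator() -> AsyncIterator[Dict[str, Dict[str, Any]]]:
    """Yield each component's output as soon as it 'finishes' (toy, not Haystack's code)."""
    for name, latency, output in STEPS:
        await asyncio.sleep(latency)  # simulated work
        yield {name: output}


async def consume() -> Tuple[List[str], Dict[str, Dict[str, Any]]]:
    seen: List[str] = []
    final: Dict[str, Dict[str, Any]] = {}
    # The generator body only runs while this loop pulls items from it.
    async for final in toy_run_async_generator():
        seen.extend(final.keys())
        print(f"received: {final}")  # e.g. show retrieved docs before the LLM replies
    # The loop variable now holds the last yielded item, i.e. the leaf output.
    return seen, final


seen_components, final_output = asyncio.run(consume())
```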

Whenever a component needs to be executed, the `AsyncPipeline` determines whether it supports async execution:
- If the component has opted into async support, the pipeline schedules its execution as a coroutine on the event loop and yields control back to the async scheduler until the component's outputs are returned.
- If the component has not opted into async support, the pipeline runs it in a separate thread and awaits the result, so the event loop is not blocked in the meantime.
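The dispatch rule above can be sketched in plain `asyncio`. The helper `execute_component` and the two toy components are hypothetical and only illustrate the idea; the real `AsyncPipeline` may detect async support and manage threads differently (for example with `loop.run_in_executor` instead of `asyncio.to_thread`, which requires Python 3.9+).

```python
import asyncio
import inspect
import time
from typing import Any, Dict


class SyncUpper:
    """Hypothetical component with only a blocking `run` method."""

    def run(self, text: str) -> Dict[str, Any]:
        time.sleep(0.05)  # simulated blocking work
        return {"text": text.upper()}


class AsyncReverse:
    """Hypothetical component that also implements `run_async`."""

    def run(self, text: str) -> Dict[str, Any]:
        return {"text": text[::-1]}

    async def run_async(self, text: str) -> Dict[str, Any]:
        await asyncio.sleep(0.05)  # simulated non-blocking I/O
        return {"text": text[::-1]}


async def execute_component(component: Any, **inputs: Any) -> Dict[str, Any]:
    run_async = getattr(component, "run_async", None)
    if run_async is not None and inspect.iscoroutinefunction(run_async):
        # Async-enabled: await the coroutine directly on the event loop.
        return await run_async(**inputs)
    # Not async-enabled: run the blocking `run` in a worker thread so the
    # event loop stays free to schedule other coroutines meanwhile.
    return await asyncio.to_thread(component.run, **inputs)


async def main() -> Dict[str, Any]:
    upper = await execute_component(SyncUpper(), text="haystack")
    return await execute_component(AsyncReverse(), text=upper["text"])


result = asyncio.run(main())
print(result)
```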

In both cases, the event loop stays free to run other work. Components that do not depend on each other's outputs can therefore execute concurrently within a single `AsyncPipeline` (see *Running components concurrently* below), and multiple, different `AsyncPipeline` instances can also execute concurrently.

The execution of an `AsyncPipeline` is deemed to be complete once program flow exits the `async for` loop. At this point, the final results of the pipeline (the outputs of the leaf nodes in the pipeline graph) can be accessed with the loop variable.

In [None]:
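from typing import Any, Dict


async def query_pipeline(question: str) -> Dict[str, Dict[str, Any]]:
    # Initial inputs, keyed by component name (named to avoid shadowing the built-in `input`).
    pipeline_input = {
        "text_embedder": {"text": question},
        "prompt_builder": {"question": question},
    }

    pipeline_output: Dict[str, Dict[str, Any]] = {}
    result_idx = 0
    # Each iteration yields the output of a component as soon as it finishes.
    async for pipeline_output in async_rag_pipeline.run_async_generator(pipeline_input):
        print(f"Pipeline result '{result_idx}' = {pipeline_output}")
        result_idx += 1

    # The last output of the pipeline is the final pipeline output.
    return pipeline_output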

We can now execute the pipeline with some examples.

In [None]:
examples = [
    "Where is Gardens of Babylon?",
    "Why did people build Great Pyramid of Giza?",
    "What does Rhodes Statue look like?",
]

async def run_query_pipeline():
    for question in examples:
        print(f"Querying pipeline with question: '{question}'")
        response = await query_pipeline(question)
        print(f'\tOutput: {response["llm"]["replies"][0]}\n')

    print("Done!")


await run_query_pipeline()

You can alternatively use the `run_async` method to execute an `AsyncPipeline` in the same manner as a regular `Pipeline` while retaining the benefits of cooperative scheduling.

In [None]:
question = examples[0]
outputs = await async_rag_pipeline.run_async(
    {"text_embedder": {"text": question}, "prompt_builder": {"question": question}},
    include_outputs_from={"retriever"},
)

print(outputs)

## Custom Asynchronous Components

Individual components can opt into async support by implementing a `run_async` coroutine with the same signature as the `run` method, i.e., the same input parameters and outputs. This constraint ensures that pipeline connections are the same irrespective of whether a component supports async execution, allowing for plug-and-play backward compatibility with existing pipelines.
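The signature constraint can be made concrete with a small, hypothetical `signatures_match` helper that compares the parameters of `run` and `run_async` using the standard-library `inspect` module. It is not part of Haystack (which may perform its own check when the `@component` decorator is applied), and for brevity it compares only parameter names, kinds, and defaults, ignoring annotations and output types.

```python
import inspect
from typing import Any, Dict, List, Mapping, Tuple


def _describe(params: Mapping[str, inspect.Parameter]) -> List[Tuple[str, Any, Any]]:
    # Reduce each parameter to (name, kind, default) for comparison.
    return [(p.name, p.kind, p.default) for p in params.values()]


def signatures_match(component_cls: type) -> bool:
    """Hypothetical check: do `run` and `run_async` declare the same parameters?"""
    run_async = getattr(component_cls, "run_async", None)
    if run_async is None:
        return True  # sync-only components are always allowed
    if not inspect.iscoroutinefunction(run_async):
        return False  # `run_async` must be a coroutine function
    run_params = inspect.signature(component_cls.run).parameters
    async_params = inspect.signature(run_async).parameters
    return _describe(run_params) == _describe(async_params)


class GoodComponent:
    def run(self, query: str, top_k: int = 5) -> Dict[str, Any]:
        return {"query": query, "top_k": top_k}

    async def run_async(self, query: str, top_k: int = 5) -> Dict[str, Any]:
        return {"query": query, "top_k": top_k}


class BadComponent:
    def run(self, query: str) -> Dict[str, Any]:
        return {"query": query}

    # Renamed parameter: a pipeline connection to `query` would break.
    async def run_async(self, question: str) -> Dict[str, Any]:
        return {"query": question}


print(signatures_match(GoodComponent), signatures_match(BadComponent))
```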


In [None]:
from typing import Any, Dict

from haystack import component


@component
class MyCustomComponent:
    def __init__(self, my_custom_param: str):
        self.my_custom_param = my_custom_param

    @component.output_types(original=str, concatenated=str)
    def run(self, input: str) -> Dict[str, Any]:
        return {
            "original": input,
            "concatenated": input + self.my_custom_param
        }

    async def do_io_bound_op(self, input: str) -> str:
        # Do some IO-bound operation here
        return input + self.my_custom_param

    @component.output_types(original=str, concatenated=str)
    async def run_async(self, input: str) -> Dict[str, Any]:
        return {
            "original": input,
            "concatenated": await self.do_io_bound_op(input)
        }

## Running components concurrently

Components are scheduled to run concurrently when the execution graph allows it.

For example, in most hybrid retrieval pipelines (where BM25 and embedding retrieval run independently and their results are joined afterwards), the calls to the document store could be executed concurrently. However, the synchronous `Pipeline` executes the components sequentially, whereas the `AsyncPipeline` can schedule them to run concurrently. For most production-grade hybrid retrieval setups, this would reduce total pipeline runtime by 100-600 ms (depending on the speed of your document store).

The difference is even more pronounced when running pipelines with LLM calls that could execute concurrently.
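The hybrid-retrieval argument boils down to running two independent I/O waits concurrently instead of back to back. The standalone sketch below uses `asyncio.gather` with two hypothetical retriever coroutines whose `asyncio.sleep` calls stand in for document-store latency (the 0.3 s values are arbitrary). It illustrates the scheduling idea only and is not how `AsyncPipeline` schedules components internally.

```python
import asyncio
import time
from typing import Awaitable, Callable, List, Tuple


# Hypothetical retrievers: the sleeps stand in for document-store round trips.
async def bm25_retrieve(query: str) -> List[str]:
    await asyncio.sleep(0.3)
    return [f"bm25 hit for {query!r}"]


async def embedding_retrieve(query: str) -> List[str]:
    await asyncio.sleep(0.3)
    return [f"embedding hit for {query!r}"]


async def sequential(query: str) -> List[str]:
    # One call after the other, like a synchronous pipeline would run them.
    return await bm25_retrieve(query) + await embedding_retrieve(query)


async def concurrent(query: str) -> List[str]:
    # Both calls are in flight at once; gather returns results in argument order.
    bm25, dense = await asyncio.gather(bm25_retrieve(query), embedding_retrieve(query))
    return bm25 + dense


async def timed(
    fn: Callable[[str], Awaitable[List[str]]], query: str
) -> Tuple[List[str], float]:
    start = time.perf_counter()
    docs = await fn(query)
    return docs, time.perf_counter() - start


async def main() -> Tuple[List[str], float, List[str], float]:
    seq_docs, seq_time = await timed(sequential, "pyramids")
    con_docs, con_time = await timed(concurrent, "pyramids")
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
    return seq_docs, seq_time, con_docs, con_time


seq_docs, seq_time, con_docs, con_time = asyncio.run(main())
```

Both variants return the same documents; only the wall-clock time differs (roughly 0.6 s versus 0.3 s here).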

In [None]:
import time
import asyncio

from haystack import component
from haystack_experimental import Pipeline, AsyncPipeline

# We implement a small custom component that helps us illustrate the concurrent scheduling.
@component
class AsyncTestComponent:
    """
    A test component that simulates async operations by waiting for a specified time
    before returning a message.

    ### Usage example
    ```python
    test_comp = AsyncTestComponent(name="TestComponent", wait_time=2)

    # Sync usage
    result = test_comp.run(user_msg="Hello")
    print(result["message"])  # prints after 2 seconds

    # Async usage
    result = await test_comp.run_async(user_msg="Hello")
    print(result["message"])  # prints after 2 seconds
    ```
    """

    def __init__(self, name: str, wait_time: int = 1):
        """
        Initialize the AsyncTestComponent.

        :param name: Name of the component to be used in the output message
        :param wait_time: Time to wait before returning result (in seconds)
        """
        self.name = name
        self.wait_time = wait_time

    @component.output_types(message=str)
    def run(self, user_msg: str) -> dict:
        """
        Synchronous method that waits for the specified time and returns a message.

        :param user_msg: Input message from the user (unused in output but required for example)
        :return: Dictionary containing the output message
        """
        time.sleep(self.wait_time)
        return {"message": f"Message from {self.name}"}

    @component.output_types(message=str)
    async def run_async(self, user_msg: str) -> dict:
        """
        Asynchronous method that waits for the specified time and returns a message.

        :param user_msg: Input message from the user (unused in output but required for example)
        :return: Dictionary containing the output message
        """
        await asyncio.sleep(self.wait_time)
        return {"message": f"Message from {self.name}"}


def get_pipeline(type_="async"):
    wait_1 = AsyncTestComponent(name="wait_1", wait_time=1)
    wait_2 = AsyncTestComponent(name="wait_2", wait_time=2)
    wait_3 = AsyncTestComponent(name="wait_3", wait_time=3)
    wait_4 = AsyncTestComponent(name="wait_4", wait_time=4)
    wait_10 = AsyncTestComponent(name="wait_10", wait_time=10)

    if type_ == "async":
        pp = AsyncPipeline()
    else:
        pp = Pipeline()

    pp.add_component("wait_1", wait_1)
    pp.add_component("wait_2", wait_2)
    pp.add_component("wait_3", wait_3)
    pp.add_component("wait_4", wait_4)
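    pp.add_component("wait_10", wait_10)

    # The components are not connected to each other: each one only needs the
    # pipeline-level `user_msg` input, so they are independent of one another.
    return pp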

In [None]:
# Let's run the sync version first
pipe = get_pipeline(type_="sync")

start_time = time.time()
pipe.run({"user_msg": "Hi"})
end_time = time.time()
execution_time = end_time - start_time

print(f"Execution time for 'Pipeline': {execution_time:.4f} seconds")

In [None]:
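# The sync Pipeline waits for the sum of all wait times (1 + 2 + 3 + 4 + 10 = 20 s).
# The AsyncPipeline runs the independent components concurrently, so its runtime is
# dominated by the slowest component (`wait_10`): roughly twice as fast.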

pipe = get_pipeline("async")

start_time = time.time()
outputs = await pipe.run_async({"user_msg": "Hi"})
end_time = time.time()
execution_time = end_time - start_time

print(f"Execution time for 'AsyncPipeline': {execution_time:.4f} seconds")
