# Tutorial: Working with Pipelines Through REST API

- **Level**: Advanced
- **Time to complete**: *Enter the time it takes to complete this tutorial, in minutes.*
- **Prerequisites**: 
    - Create a folder where you'll store the files for this project.
    - Create a fresh virtual environment.
    - A Docker instance installed and ready.
- **Components**: REST API, indexing Pipeline, RAG Pipeline
- **Goal**: After completing this tutorial, you will have a RAG system that you can query through a REST API.

## Overview

This tutorial shows how to create custom REST API endpoints for uploading and indexing files, and then for querying your Pipeline. It uses an indexing Pipeline and a simple RAG Pipeline.

> For a practical example, see our [Haystack RAG application](https://github.com/deepset-ai/haystack-rest-api) on GitHub.

## Install Haystack


In [None]:
pip install haystack-ai

## Prepare Your Document Store
For this tutorial, use the Elasticsearch Document Store as it allows both keyword-based and vector-based retrieval.
First, install `ElasticsearchDocumentStore`:

In [None]:
pip install elasticsearch-haystack

### To use a local Elasticsearch instance through Docker:
1. Start Elasticsearch:

In [None]:
docker run -d -p 9200:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1024m -Xmx1024m" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.10.0

2. Verify it works correctly:

In [None]:
curl --fail http://localhost:9200/_cat/health

If everything's fine, you should get a response similar to this one:

In [None]:
1697807598 13:13:18 docker-cluster green 1 1 0 0 0 0 0 0 - 100.0%
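
If you'd rather check the cluster from a script, you can split the health line on whitespace. The sketch below is a minimal illustration, assuming the column order of the sample response above (epoch, timestamp, cluster name, and status first, active shards percentage last). `parse_cat_health` is a hypothetical helper, not part of Elasticsearch or Haystack.

```python
from typing import Dict


def parse_cat_health(line: str) -> Dict[str, str]:
    """Parse one line of `_cat/health` output into a small dictionary.

    Assumption: the first four columns are epoch, timestamp, cluster name,
    and status, and the last column is the active shards percentage, as in
    the sample response above.
    """
    fields = line.split()
    if len(fields) < 5:
        raise ValueError(f"Unexpected _cat/health line: {line!r}")
    return {
        "epoch": fields[0],
        "timestamp": fields[1],
        "cluster": fields[2],
        "status": fields[3],
        "active_shards_percent": fields[-1],
    }


sample = "1697807598 13:13:18 docker-cluster green 1 1 0 0 0 0 0 0 - 100.0%"
health = parse_cat_health(sample)
# For this sketch, both "green" and "yellow" count as usable for local development
cluster_is_usable = health["status"] in ("green", "yellow")
```

Treating `yellow` as usable is a choice made for this sketch; adjust it to your own requirements.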

### To use Elastic Cloud:
1. Start a free trial of Elastic Cloud and [create a deployment](https://dev.to/lisahjung/part-3-create-an-elastic-cloud-deployment-36bn).
2. [Connect to Elastic Cloud](https://elasticsearch-py.readthedocs.io/en/v8.10.1/quickstart.html#connecting).

Your Document Store is now ready. You can now move on to Pipelines.

## Prepare Your Pipelines

For deployment, it's best to use serialized versions of your Pipelines.
You can use the [example YAML Pipelines](https://github.com/deepset-ai/haystack-rest-api/tree/main/src/pipelines) in this tutorial or serialize your own Python Pipelines:

In [None]:
with open("my_pipeline.yaml", "w") as f:
    my_python_pipeline.dump(f)

The example Pipelines work with a local instance of Elasticsearch. To use Elastic Cloud, change the `init_parameters` of the Document Store:

In [None]:
# Elastic Cloud
document_store:
        init_parameters:
          cloud_id: YOUR-CLOUD-ID
          api_key: YOUR-API-KEY
          index: default
        type: ElasticsearchDocumentStore

# local Elasticsearch instance
document_store:
        init_parameters:
          hosts: http://localhost:9200
          index: default
        type: ElasticsearchDocumentStore
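
If you switch between the two setups often, it can help to keep the choice in one place. The following sketch only builds the `init_parameters` values shown above. The helper name and the environment variable names `ELASTIC_CLOUD_ID` and `ELASTIC_API_KEY` are assumptions made for illustration; Haystack does not read them on its own.

```python
from typing import Dict, Mapping


def document_store_init_parameters(env: Mapping[str, str]) -> Dict[str, str]:
    """Return Document Store init parameters for Elastic Cloud or a local instance.

    ELASTIC_CLOUD_ID and ELASTIC_API_KEY are hypothetical variable names
    chosen for this sketch.
    """
    cloud_id = env.get("ELASTIC_CLOUD_ID")
    api_key = env.get("ELASTIC_API_KEY")
    if cloud_id and api_key:
        # Elastic Cloud: same keys as in the YAML above
        return {"cloud_id": cloud_id, "api_key": api_key, "index": "default"}
    # Local Elasticsearch instance
    return {"hosts": "http://localhost:9200", "index": "default"}


local_params = document_store_init_parameters({})
cloud_params = document_store_init_parameters(
    {"ELASTIC_CLOUD_ID": "YOUR-CLOUD-ID", "ELASTIC_API_KEY": "YOUR-API-KEY"}
)
```

In a real script, you would pass `os.environ` instead of a literal dictionary.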

## Create Your Local REST API
Time to create the endpoints. You will use a Python framework, [FastAPI](https://fastapi.tiangolo.com/), to build the API. Then, use [Uvicorn](https://www.uvicorn.org/) to serve it. 

1. Install FastAPI:

In [None]:
pip install fastapi

2. Install Uvicorn:

In [None]:
pip install "uvicorn[standard]"

### Create a Status Endpoint
Create a file *src/main.py* with the following content:

In [None]:
from fastapi import FastAPI

app = FastAPI()


@app.get("/ready")
def check_status():
    """
    Use this endpoint during startup to check if the
    server is ready to take requests.

    The recommended approach is to call this endpoint with a short timeout,
    like 500ms, and if there's no reply, it means the server is busy.
    """
    return True

This creates a simple API endpoint that returns `True` if the REST API finished loading and is ready to accept requests.

Now, run the REST API:

In [None]:
uvicorn src.main:app --host 0.0.0.0 --port 8000  

Try submitting requests to the API:

In [None]:
curl -X 'GET' \
'http://127.0.0.1:8000/ready' \
-H 'accept: application/json'

You should get `true` as the response, which is the JSON form of the Python value `True`.
To view the API documentation, go to http://localhost:8000/docs.
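
The docstring of the status endpoint recommends calling it with a short timeout. As a rough sketch of such a client, using only the Python standard library, you could write something like the following. The function names and the default base URL are assumptions made for this example.

```python
import json
import urllib.request


def interpret_ready_body(body: bytes) -> bool:
    """Return True only if the response body is the JSON literal `true`."""
    try:
        return json.loads(body) is True
    except ValueError:
        return False


def is_ready(base_url: str = "http://127.0.0.1:8000", timeout: float = 0.5) -> bool:
    """Call GET /ready with a short timeout; treat any failure as "not ready"."""
    request = urllib.request.Request(
        f"{base_url}/ready", headers={"accept": "application/json"}
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return interpret_ready_body(response.read())
    except OSError:
        # Covers connection errors, HTTP errors, and timeouts
        return False
```

Calling `is_ready()` while the server from the previous step is running should return `True`; if the server is busy or not started, the call returns `False` after at most about half a second.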

### Create an Upload/Indexing Endpoint
Create an endpoint to allow uploading and indexing files to the Elasticsearch Document Store. The endpoint should temporarily save the uploaded files, run the indexing Pipeline on them, and return the result.

Add the following code to the *src/main.py* file to create the endpoint:

In [None]:
from typing import List, Optional
from pathlib import Path
import os
import uuid


from fastapi import FastAPI, UploadFile, File

from haystack.preview import Pipeline

# The component imports below are needed to load the Pipeline without errors
from haystack.preview.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.preview.components.file_converters import TextFileToDocument
from haystack.preview.components.writers import DocumentWriter
from elasticsearch_haystack.document_store import ElasticsearchDocumentStore
from elasticsearch_haystack.bm25_retriever import ElasticsearchBM25Retriever

# If src/main.py already defines `app`, reuse that instance instead of creating a second one
app = FastAPI()

# Specify the path to your indexing Pipeline
with open("./src/pipelines/indexing_pipeline.yaml", "rb") as f:
    indexing_pipeline = Pipeline.load(f)

FILE_UPLOAD_PATH = os.getenv("FILE_UPLOAD_PATH", str((Path(__file__).parent.parent / "file-upload").absolute()))
Path(FILE_UPLOAD_PATH).mkdir(parents=True, exist_ok=True)


@app.post("/file-upload")
def upload_files(
    files: List[UploadFile] = File(...), keep_files: Optional[bool] = False):
    """
    Upload a list of files to be indexed.

    Note: Files are removed immediately after being indexed. To keep them, pass
    `keep_files=true` as a query parameter in the request.
    """

    file_paths: list = []

    for file_to_upload in files:
        try:
            file_path = Path(FILE_UPLOAD_PATH) / f"{uuid.uuid4().hex}_{file_to_upload.filename}"
            with file_path.open("wb") as fo:
                fo.write(file_to_upload.file.read())
            file_paths.append(file_path)
        finally:
            file_to_upload.file.close()

    result = indexing_pipeline.run({"converter": {"sources": file_paths}})

    # Clean up indexed files
    if not keep_files:
        for p in file_paths:
            p.unlink()        
        
    return result
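
The save, index, and clean-up flow of this endpoint can be tried out in isolation. The sketch below is an illustration only: `process` is a hypothetical stand-in for running the indexing Pipeline, and, unlike the endpoint above, it removes the temporary files in a `finally` block and keeps only the last component of each file name. Both are choices made for this sketch.

```python
import tempfile
import uuid
from pathlib import Path
from typing import Callable, Dict, List


def save_process_cleanup(
    uploads: Dict[str, bytes],
    upload_dir: Path,
    process: Callable[[List[Path]], dict],
    keep_files: bool = False,
) -> dict:
    """Save uploads under unique names, process them, then remove them.

    `process` is a hypothetical stand-in for running the indexing Pipeline.
    """
    upload_dir.mkdir(parents=True, exist_ok=True)
    file_paths: List[Path] = []
    try:
        for filename, content in uploads.items():
            # Keep only the final path component so a name like "../x.txt"
            # cannot point outside the upload directory.
            safe_name = Path(filename).name
            file_path = upload_dir / f"{uuid.uuid4().hex}_{safe_name}"
            file_path.write_bytes(content)
            file_paths.append(file_path)
        return process(file_paths)
    finally:
        # Runs on success and on failure, so temporary files do not pile up
        if not keep_files:
            for path in file_paths:
                path.unlink()


with tempfile.TemporaryDirectory() as tmp:
    result = save_process_cleanup(
        {"australia.txt": b"Australia is a country."},
        Path(tmp),
        process=lambda paths: {"files_seen": len(paths)},
    )
    leftover_files = list(Path(tmp).iterdir())
```

The unique prefix from `uuid.uuid4().hex` serves the same purpose as in the endpoint: two uploads with the same file name do not overwrite each other.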

To test the endpoint, upload the Australia Wikipedia page to it:

In [None]:
wget -O australia.txt "https://en.wikipedia.org/w/index.php?title=Australia&action=raw&ctype=text"

curl -X 'POST' \
  'http://localhost:8000/file-upload?keep_files=false' \
  -H 'accept: application/json' \
  -H 'Content-Type: multipart/form-data' \
  -F 'files=@australia.txt;type=text/plain'

*Tip: You can also use your local API docs to do this*.
You should get a response similar to this:

In [None]:
{
  "writer": {
    "documents_written": 20
  }
}

Nice! Your processed Documents are now in the Document Store.
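
If you prefer to script the upload instead of using `curl`, the request can be sketched with the Python standard library alone. This is a minimal illustration, assuming the API runs at `http://localhost:8000` and expects the form field `files`, as in the `curl` command above. The helper names are made up for this example.

```python
import os
import urllib.request
import uuid
from typing import Tuple


def encode_multipart_file(
    field_name: str, filename: str, content: bytes, content_type: str = "text/plain"
) -> Tuple[bytes, str]:
    """Encode a single file as a multipart/form-data body.

    Returns the body and the matching Content-Type header value.
    """
    boundary = uuid.uuid4().hex
    disposition = f'form-data; name="{field_name}"; filename="{filename}"'
    head = (
        f"--{boundary}\r\n"
        f"Content-Disposition: {disposition}\r\n"
        f"Content-Type: {content_type}\r\n\r\n"
    ).encode("utf-8")
    tail = f"\r\n--{boundary}--\r\n".encode("utf-8")
    return head + content + tail, f"multipart/form-data; boundary={boundary}"


def upload_file(path: str, base_url: str = "http://localhost:8000") -> bytes:
    """POST one file to /file-upload and return the raw response body."""
    with open(path, "rb") as f:
        body, content_type = encode_multipart_file(
            "files", os.path.basename(path), f.read()
        )
    request = urllib.request.Request(
        f"{base_url}/file-upload?keep_files=false",
        data=body,
        headers={"Content-Type": content_type, "accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.read()
```

With the server running, `upload_file("australia.txt")` sends the same request as the `curl` command above.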

### Create a Query Endpoint

Now, create an endpoint to query your RAG Pipeline. Add the following code to the *src/main.py* file:

In [None]:
from fastapi import FastAPI

from haystack.preview import Pipeline

from haystack.preview.components.builders.answer_builder import AnswerBuilder
from haystack.preview.components.builders.prompt_builder import PromptBuilder
from haystack.preview.components.generators import GPTGenerator
from elasticsearch_haystack.document_store import ElasticsearchDocumentStore
from elasticsearch_haystack.bm25_retriever import ElasticsearchBM25Retriever

# If src/main.py already defines `app`, reuse that instance instead of creating a second one
app = FastAPI()

# This is the path to the RAG Pipeline
with open("./src/pipelines/rag_pipeline.yaml", "rb") as f:
    rag_pipeline = Pipeline.load(f)

@app.get("/query")
def ask_rag_pipeline(query: str):
    """
    Ask a question to the RAG Pipeline.
    """
    result = rag_pipeline.run({
        "retriever": {"query": query}, 
        "prompt_builder": {"question": query}, 
        "answer_builder": {"query": query}
    })

    return result

Test the endpoint. You can try questions like "Who are Torres Strait Islanders?".

In [None]:
curl -X 'GET' \
  'http://127.0.0.1:8000/query?query=Who%20are%20Torres%20Strait%20Islanders%3F' \
  -H 'accept: application/json'

You should get a response similar to this:

In [None]:
{
  "answer_builder": {
    "answers": [
      {
        "data": "Torres Strait Islanders are ethnically Melanesian people who obtained their livelihood from seasonal horticulture and the resources of their reefs and seas.",
        "query": "Who are Torres Strait Islanders?",
        "metadata": {
          "model": "gpt-3.5-turbo-0613",
          "index": 0,
          "finish_reason": "stop",
          "usage": {
            "prompt_tokens": 727,
            "completion_tokens": 29,
            "total_tokens": 756
          }
        },
        "documents": [...]
      }
    ]
  }
}
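
To call the query endpoint from Python, you need to percent-encode the question and then pick the answers out of the JSON response. The sketch below shows one way to do both with the standard library. The helper names are hypothetical, and the sample payload is a made-up stand-in that only mirrors the structure of the response above.

```python
import json
from typing import List
from urllib.parse import quote, urlencode


def build_query_url(question: str, base_url: str = "http://127.0.0.1:8000") -> str:
    """Build the /query URL with the question percent-encoded."""
    # quote_via=quote encodes spaces as %20, matching the curl example
    params = urlencode({"query": question}, quote_via=quote)
    return f"{base_url}/query?{params}"


def extract_answers(response_text: str) -> List[str]:
    """Return the answer strings from a /query response body."""
    payload = json.loads(response_text)
    return [answer["data"] for answer in payload["answer_builder"]["answers"]]


url = build_query_url("Who are Torres Strait Islanders?")

# Made-up payload with the same structure as the response shown above
sample_response = json.dumps(
    {"answer_builder": {"answers": [{"data": "An example answer.", "query": "..."}]}}
)
answers = extract_answers(sample_response)
```

Here, `url` is the same address that the `curl` command above requests.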

Congratulations! You've created three endpoints for your REST API.

<details>
<summary>Click here to see what the complete file should look like.</summary>

```
from typing import List, Optional
from pathlib import Path
import os
import uuid

from fastapi import FastAPI, UploadFile, File
from haystack.preview import Pipeline

# Needed to load the Pipeline without errors (https://github.com/deepset-ai/haystack/issues/6186)
from haystack.preview.components.preprocessors import (
    DocumentCleaner,
    DocumentSplitter,
)
from haystack.preview.components.file_converters import TextFileToDocument
from haystack.preview.components.builders.answer_builder import AnswerBuilder
from haystack.preview.components.builders.prompt_builder import PromptBuilder
from haystack.preview.components.generators import GPTGenerator
from haystack.preview.components.writers import DocumentWriter
from elasticsearch_haystack.document_store import ElasticsearchDocumentStore
from elasticsearch_haystack.bm25_retriever import ElasticsearchBM25Retriever

app = FastAPI(title="My Haystack RAG API")

# Load the Pipelines from the YAML files
with open("./src/pipelines/indexing_pipeline.yaml", "rb") as f:
    indexing_pipeline = Pipeline.load(f)
with open("./src/pipelines/rag_pipeline.yaml", "rb") as f:
    rag_pipeline = Pipeline.load(f)

# Create the file upload directory if it doesn't exist
FILE_UPLOAD_PATH = os.getenv(
    "FILE_UPLOAD_PATH", str((Path(__file__).parent.parent / "file-upload").absolute())
)
Path(FILE_UPLOAD_PATH).mkdir(parents=True, exist_ok=True)


@app.get("/ready")
def check_status():
    """
    Use this endpoint during startup to check if the
    server is ready to take requests.

    The recommended approach is to call this endpoint with a short timeout,
    like 500ms, and if there's no reply, it means the server is busy.
    """
    return True


@app.post("/file-upload")
def upload_files(
    files: List[UploadFile] = File(...), keep_files: Optional[bool] = False
):
    """
    Upload a list of files to be indexed.

    Note: Files are removed immediately after being indexed. To keep them, pass
    `keep_files=true` as a query parameter in the request.
    """

    file_paths: list = []

    for file_to_upload in files:
        try:
            file_path = (
                Path(FILE_UPLOAD_PATH) / f"{uuid.uuid4().hex}_{file_to_upload.filename}"
            )
            with file_path.open("wb") as fo:
                fo.write(file_to_upload.file.read())
            file_paths.append(file_path)
        finally:
            file_to_upload.file.close()

    result = indexing_pipeline.run({"converter": {"sources": file_paths}})

    # Clean up indexed files
    if not keep_files:
        for p in file_paths:
            p.unlink()

    return result


@app.get("/query")
def ask_rag_pipeline(query: str):
    """
    Ask a question to the RAG Pipeline.
    """
    result = rag_pipeline.run(
        {
            "retriever": {"query": query},
            "prompt_builder": {"question": query},
            "answer_builder": {"query": query},
        }
    )

    return result
```
</details>

## Deploy Your App
Time to deploy your app to the server with Docker. If you need more information on how to use FastAPI in containers, see the [FastAPI documentation](https://fastapi.tiangolo.com/deployment/docker/).

### Create a Docker Image for Your REST API

1. Add the package requirements for your app. Create a *requirements.txt* file in the root folder of your project. Here's what the file may look like:

In [None]:
# Haystack
haystack-ai==2.0.0a

# Elasticsearch Document Store
elasticsearch-haystack==0.0.2

# REST API
fastapi==0.104.0
uvicorn[standard]==0.23.2
python-multipart==0.0.6

2. Create the Dockerfile with instructions for building the Docker image of your app. For more details, see the [FastAPI documentation](https://fastapi.tiangolo.com/deployment/docker/#dockerfile).

In [None]:
FROM python:3.10-slim

# installing git is only necessary because elasticsearch-haystack is not yet a package
RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*

WORKDIR /code

COPY ./requirements.txt /code/requirements.txt

RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt

COPY ./src /code/src

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

3. Create the Docker image of your app. To create the Docker image of an app called *my-haystack-app*, run:

In [None]:
docker build -t my-haystack-app .

Now, everything's in place for you to run the Docker container of your app locally.

### Run the Docker Container of Your App

#### Using a Local Elasticsearch Instance

First, make sure the Elasticsearch container is started. Run:

In [None]:
docker run -d -p 9200:9200 -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xms1024m -Xmx1024m" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.10.0

Your app's container must be able to reach Elasticsearch. As a simple workaround, run the app container in [host network mode](https://docs.docker.com/network/drivers/host/) (`--network="host"`), so that it shares the network of the host where the Elasticsearch port is published.

*Tip:* If you want to experiment locally, you can also replace the `ElasticsearchDocumentStore` with an ephemeral `InMemoryDocumentStore`. 

Run the Docker container:

In [None]:
docker run --network="host" -e OPENAI_API_KEY=${OPENAI_API_KEY} my-haystack-app

Note that this passes your local `OPENAI_API_KEY` environment variable, which holds the OpenAI API key, into the container.
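
If the variable is not set on your machine, the container receives an empty value, and the problem may only surface when the first query fails. A small start-up check can make this visible earlier. The sketch below is one possible approach; `require_env` is a hypothetical helper, and the key value is a placeholder.

```python
from typing import Mapping


def require_env(name: str, env: Mapping[str, str]) -> str:
    """Return the value of an environment variable or raise a clear error."""
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"{name} is not set. Pass it to the container, for example with "
            f"`docker run -e {name}=...`."
        )
    return value


# In the app you would pass os.environ; a plain dict keeps the sketch self-contained
api_key = require_env("OPENAI_API_KEY", {"OPENAI_API_KEY": "placeholder-key"})
```

Calling such a check near the top of *src/main.py* would stop the app at start-up with a readable message instead of failing on the first request.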

#### Using Elastic Cloud

Run the Docker container with the following command. Note that it passes your local `OPENAI_API_KEY` environment variable, which holds the OpenAI API key, into the container.

In [None]:
docker run -p 8000:8000 -e OPENAI_API_KEY=${OPENAI_API_KEY} my-haystack-app

### Configure Docker Compose

Docker Compose is a tool that helps you define and share multi-container applications. With Compose, you can create a YAML file to define the services, and with a single command, you can spin everything up or tear it all down.

You can find more information about it in Docker Compose [documentation](https://docs.docker.com/get-started/08_using_compose).

1. Create a *docker-compose.yml* file in the root folder of your project. If you want to understand more about the syntax of a Docker Compose file, you can read through the Docker [reference](https://docs.docker.com/compose/compose-file/compose-file-v3/). Here's what the YAML files may look like:

#### Using a Local Elasticsearch Instance 

In [None]:
version: "3"

services:
  elasticsearch:
    image: "docker.elastic.co/elasticsearch/elasticsearch:8.11.1"
    ports:
      - 9200:9200
    restart: on-failure
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1024m -Xmx1024m"
    healthcheck:
        test: curl --fail http://localhost:9200/_cat/health || exit 1
        interval: 10s
        timeout: 1s
        retries: 10

  rest-api:
    build:
      context: .
    ports:
      - 8000:8000
    restart: on-failure
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      elasticsearch:
        condition: service_healthy

<details><summary>Click here to expand the detailed explanation of this YAML file.</summary>

You can see two services defined in this file: one for Elasticsearch and one for the REST API.

The Elasticsearch service:
- Uses the official Elasticsearch Docker image.
- Maps TCP port 9200 in the container to port 9200 on the Docker host, following the `HOST_PORT:CONTAINER_PORT` convention. You can change the first port according to your needs.
- Has some environment variables for configuration. You can find more information about them in the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#docker-cli-run-dev-mode).
- Has a `healthcheck` based on `curl` that checks whether the container has started correctly and is in a healthy state.

The REST API service:
- You need to change the Elasticsearch hosts in your YAML Pipelines. When the application ran locally, you used http://localhost:9200. In the default network of your Compose application, the Elasticsearch service is known under the name `elasticsearch`, so the host becomes http://elasticsearch:9200.
- `context: .` means that the corresponding Docker image will be built based on the Dockerfile located in the root folder of your project (the same folder as the Docker Compose YAML file).
- `8000:8000` maps TCP port 8000 in the container to port 8000 on the Docker host. You can change the first port according to your needs.
- The REST API needs the `OPENAI_API_KEY` environment variable to call the LLM. In this example, your local environment variable of the same name is passed into the container. You can customize this as needed.
- With the `depends_on` condition, you are making sure that the REST API service starts only after the Elasticsearch server has started and is in a healthy state (based on the previously defined `healthcheck`).
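
To build some intuition for the `interval` and `retries` settings of the `healthcheck`, here is a rough Python sketch of a retry loop with the same defaults. It is an illustration of the general idea only and does not reproduce Docker's exact health check behavior. `wait_until_healthy` and its parameters are hypothetical names.

```python
import time
from typing import Callable


def wait_until_healthy(
    check: Callable[[], bool],
    retries: int = 10,
    interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call `check` up to `retries` times, pausing `interval` seconds between tries.

    Returns True as soon as a check passes, False if all attempts fail.
    """
    for attempt in range(retries):
        if check():
            return True
        if attempt < retries - 1:
            sleep(interval)
    return False


# Simulated service that becomes healthy on the third check
responses = iter([False, False, True])
pauses = []
became_healthy = wait_until_healthy(lambda: next(responses), sleep=pauses.append)
```

The `sleep` parameter is injectable so that the simulated run above records the pauses instead of actually waiting.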

</details>

#### Using Elastic Cloud

In [None]:
version: "3"

services:
  rest-api:
    build:
      context: .
    ports:
      - 8000:8000
    restart: on-failure
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}

2. Now you can easily spin up your multi-container application with the command:

In [None]:
docker compose up

You will see the interactive API docs on http://localhost:8000/docs, and you can try calling indexing and querying endpoints.

To stop the application, run:

In [None]:
docker compose down

Congratulations! You have now deployed a Haystack RAG application with a REST API.