# Building Elasticsearch APIs with FastAPI

This notebook shows how to build an API on top of Elasticsearch with FastAPI, following best practices. It is based on the article [Building Elasticsearch APIs with FastAPI](https://www.elastic.co/search-labs/blog/building-elasticsearch-apis-with-fastapi).



## Installing dependencies and importing packages

In [None]:
%pip install fastapi uvicorn elasticsearch pydantic -q

In [2]:
import asyncio
import json
import os
import nest_asyncio
import uvicorn

from getpass import getpass
from typing import List
from elasticsearch import Elasticsearch, helpers
from fastapi import BackgroundTasks, Body, FastAPI, HTTPException, Response
from pydantic import BaseModel

## Declaring variables

In [3]:
# Prompt for the connection settings without echoing them, and keep them in
# environment variables so no credential is hardcoded in the notebook.
os.environ["ELASTICSEARCH_ENDPOINT"] = getpass("Elasticsearch endpoint: ")
os.environ["ELASTICSEARCH_API_KEY"] = getpass("Elasticsearch api-key: ")

# Patch asyncio to allow nested event loops; presumably needed so the
# `uvicorn.run(...)` call at the end can start inside the notebook's
# already-running loop.
nest_asyncio.apply()

## Instantiating an Elasticsearch client

In [4]:
# Name of the index used by every cell and endpoint in this notebook.
ES_INDEX = "vet-visits"

# Synchronous Elasticsearch client, configured from the environment variables
# set in the previous cell. Its methods return results directly (they are not
# coroutines).
es_client = Elasticsearch(
    hosts=[os.environ["ELASTICSEARCH_ENDPOINT"]],
    api_key=os.environ["ELASTICSEARCH_API_KEY"],
)

## Indexing data

In [None]:
# Create the index with an explicit mapping. Any failure (for example, the
# index already existing) is reported and the notebook keeps going.
try:
    # Free-text field that also exposes an exact-match `.keyword` sub-field.
    text_with_keyword = {
        "type": "text",
        "fields": {"keyword": {"type": "keyword"}},
    }
    index_definition = {
        "mappings": {
            "properties": {
                "breed": {"type": "keyword"},
                "owner_name": text_with_keyword,
                "pet_name": text_with_keyword,
                "species": {"type": "keyword"},
                "vaccination_history": {"type": "keyword"},
                "visit_details": {"type": "text"},
            }
        }
    }

    es_client.indices.create(index=ES_INDEX, body=index_definition)
    print(f"Index '{ES_INDEX}' created.")
except Exception as exc:
    print(f"Error al crear el índice '{ES_INDEX}': {exc}")

In [None]:
def build_data(ndjson_file, index_name):
    """Yield one bulk-index action per JSON document in an NDJSON file.

    Args:
        ndjson_file: Path to a newline-delimited JSON file (one document per
            line).
        index_name: Name of the index each document is targeted at.

    Yields:
        dict: ``{"_index": index_name, "_source": <parsed document>}``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(ndjson_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            # Blank lines (e.g. trailing newlines) are not documents; skip
            # them instead of letting json.loads fail on an empty string.
            if not line:
                continue
            yield {"_index": index_name, "_source": json.loads(line)}


# Stream the documents from the NDJSON file into the index with the bulk
# helper; failures are printed rather than raised.
try:
    actions = build_data("vet-visits.ndjson", ES_INDEX)
    success, errors = helpers.bulk(es_client, actions)
    print(f"{success} documents indexed successfully")

    if errors:
        print("Errors during indexing:", errors)
except Exception as exc:
    print(f"Error: {str(exc)}")

## FastAPI setup

In [9]:
# Application instance that the endpoint decorators below register routes on.
app = FastAPI()

## Pydantic models for the request and response

In [10]:
# Pydantic model for the request body.
#   term: text to look for (matched as a phrase against `visit_details` by the
#         typed search endpoint, and as an exact `pet_name.keyword` value by
#         the delete-by-query endpoint).
#   size: value sent as the Elasticsearch `size` of the search; defaults to 10.
class SearchRequest(BaseModel):
    term: str
    size: int = 10


# Shape of a single hit in the typed search response: only these two fields
# of the stored document are exposed, each defaulting to an empty string.
class SearchHit(BaseModel):
    owner_name: str = ""
    visit_details: str = ""


# Pydantic model for the response of the typed search endpoint.
#   hits:  the hits converted to `SearchHit` objects.
#   total: a count supplied by the endpoint that builds the response.
class SearchResponse(BaseModel):
    hits: List[SearchHit]
    total: int

## Endpoints

In [11]:
@app.get("/ping")
async def ping():
    """Check connectivity to Elasticsearch and report basic cluster health.

    Returns:
        dict: A success message plus the cluster status, number of nodes and
        number of active shards.

    Raises:
        HTTPException: If the health call fails. The status code is taken from
            the underlying error when it has one, otherwise 500.
    """
    try:
        # `es_client` is the synchronous client: its result cannot be awaited
        # and the call blocks. Run it in a worker thread so the event loop
        # stays responsive.
        health = await asyncio.to_thread(es_client.cluster.health)

        return {
            "status": "success",
            "message": "Connected to Elasticsearch",
            "cluster_status": health["status"],
            "number_of_nodes": health["number_of_nodes"],
            "active_shards": health["active_shards"],
        }
    except Exception as e:
        status_code = getattr(e, "status_code", 500)

        raise HTTPException(
            status_code=status_code,
            detail=f"Error connecting to Elasticsearch: {str(e)}",
        ) from e

### Search endpoint without Pydantic models

In [12]:
@app.post("/search")
async def search(query: dict = Body(...)):
    """Forward a raw Elasticsearch query body to the index and return the result.

    Args:
        query: Request body, passed to Elasticsearch unchanged as the search
            body (no validation is applied here).

    Returns:
        The Elasticsearch search response as returned by the client.

    Raises:
        HTTPException: If the search fails. The status code is taken from the
            underlying error when it has one, otherwise 500.
    """
    try:
        # The client is synchronous, so its result is not awaitable; run the
        # blocking call in a worker thread instead of awaiting it directly.
        result = await asyncio.to_thread(
            es_client.search, index=ES_INDEX, body=query
        )

        return result
    except Exception as e:
        status_code = getattr(e, "status_code", 500)

        raise HTTPException(status_code=status_code, detail=str(e)) from e

### Search endpoint with Pydantic models

In [None]:
# Using Pydantic models for request/response validation
# NOTE: a POST "/search" route is also registered by `search` in the previous
# cell. When both cells have been run on the same `app`, the route registered
# first is expected to win; run only one of the two cells (or give them
# distinct paths) to exercise this version.
@app.post("/search", response_model=SearchResponse)
async def search_v3(request: SearchRequest):
    """Phrase-search `visit_details` and return validated, trimmed-down hits.

    Args:
        request: Validated body holding the phrase (`term`) and the maximum
            number of hits to request (`size`).

    Returns:
        SearchResponse: The hits reduced to `owner_name` / `visit_details`,
        and `total` set to the number of hits returned.

    Raises:
        HTTPException: If the search fails. The status code is taken from the
            underlying error when it has one, otherwise 500.
    """
    try:
        query = {
            "query": {"match_phrase": {"visit_details": request.term}},
            "size": request.size,
        }

        # The client is synchronous, so its result is not awaitable; run the
        # blocking call in a worker thread instead of awaiting it directly.
        result = await asyncio.to_thread(
            es_client.search, index=ES_INDEX, body=query
        )

        results = []
        for hit in result["hits"]["hits"]:
            source = hit.get("_source", {})
            # Documents may lack (or hold null for) either field; fall back to
            # the model's empty-string default instead of failing the request.
            results.append(
                SearchHit(
                    owner_name=source.get("owner_name") or "",
                    visit_details=source.get("visit_details") or "",
                )
            )

        return SearchResponse(hits=results, total=len(results))
    except Exception as e:
        status_code = getattr(e, "status_code", 500)

        raise HTTPException(status_code=status_code, detail=str(e)) from e

In [31]:
async def check_task(es_client, task_id, poll_interval=2):
    """Poll an Elasticsearch task until it reports completion.

    Args:
        es_client: Synchronous Elasticsearch client (anything exposing a
            blocking ``tasks.get(task_id=...)``).
        task_id: Identifier of the task to watch.
        poll_interval: Seconds to wait between polls. Defaults to 2.

    Returns:
        None. Progress and errors are printed; errors are not re-raised, so a
        failure here cannot break the background-task runner.
    """
    try:
        while True:
            # The client call blocks and its result is not awaitable, so run
            # it in a worker thread rather than awaiting it directly.
            status = await asyncio.to_thread(es_client.tasks.get, task_id=task_id)
            if status.get("completed", False):
                print(f"Task {task_id} completed.")
                # Here should be the logic to send the email
                break

            await asyncio.sleep(poll_interval)
    except Exception as e:
        print(f"Error checking task {task_id}: {e}")


# Background task endpoint
@app.post("/delete-by-query")
async def delete_by_query(
    request: SearchRequest = Body(...), background_tasks: BackgroundTasks = None
):
    """Start an asynchronous delete-by-query and watch it in the background.

    Deletes documents whose `pet_name.keyword` equals `request.term`. The
    delete is submitted with `wait_for_completion=False`, and when a task id
    comes back, `check_task` is scheduled as a background task to poll it.

    Args:
        request: Validated body; only `term` is used here.
        background_tasks: Background-task collector used to schedule the
            polling once the response has been sent.

    Returns:
        Response: JSON body with a confirmation message and the task id.

    Raises:
        HTTPException: If submitting the delete fails. The status code is
            taken from the underlying error when it has one, otherwise 500.
    """
    try:
        body = {"query": {"term": {"pet_name.keyword": request.term}}}

        # The client is synchronous: awaiting its result directly would raise
        # only after the delete had already been submitted. Run the blocking
        # call in a worker thread instead.
        response = await asyncio.to_thread(
            es_client.delete_by_query,
            index=ES_INDEX,
            body=body,
            wait_for_completion=False,
        )

        task_id = response.get("task")
        # `background_tasks` defaults to None in the signature; only schedule
        # the polling when a collector is actually available.
        if task_id and background_tasks is not None:
            background_tasks.add_task(check_task, es_client, task_id)

        return Response(
            status_code=200,
            content=json.dumps(
                {
                    "message": "Delete by query. The response will be send by email when the task is completed.",
                    "task_id": task_id,
                }
            ),
            media_type="application/json",
        )
    except Exception as e:
        status_code = getattr(e, "status_code", 500)

        raise HTTPException(status_code=status_code, detail=str(e)) from e

In [None]:
# Start the API server on port 8000. Host "0.0.0.0" listens on all network
# interfaces, so the API is reachable from other machines, not just localhost.
# NOTE(review): this call presumably blocks the cell until the server is
# stopped — interrupt the kernel before running the clean-up cell.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

## Clean up

In [None]:
# Remove the demo index. 400/404 responses are tolerated so the cell can be
# re-run even if the index is already gone. Per-request transport options such
# as `ignore=` are deprecated in the 8.x client; `.options(...)` is the
# supported way to pass them.
es_client.options(ignore_status=[400, 404]).indices.delete(index=ES_INDEX)