# Using the Elasticsearch Inference API with Hugging Face models

This notebook demonstrates how to use the Elasticsearch Inference API with Hugging Face models to build a question-answering system. It is based on the blog post [Using Elasticsearch Inference API along Hugging Face models](https://www.elastic.co/search-labs/blog/elasticsearch-inference-api-and-hugging-face).

In [1]:
%pip install requests elasticsearch python-dotenv -q


Note: you may need to restart the kernel to use updated packages.


## Installing dependencies and importing packages

In [2]:
import os
import json
import requests

from dotenv import load_dotenv
from elasticsearch import Elasticsearch, helpers

load_dotenv()

True

## Setting up environment variables

Configure API keys and URLs for Elasticsearch and Hugging Face, along with the index name and inference endpoint identifier.

In [6]:
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")
ELASTICSEARCH_URL = os.getenv("ELASTICSEARCH_URL")
HUGGING_FACE_API_KEY = os.getenv("HUGGING_FACE_API_KEY")
HUGGING_FACE_INFERENCE_ENDPOINT_URL = os.getenv("HUGGING_FACE_INFERENCE_ENDPOINT_URL")


INDEX_NAME = "blog-posts"
INFERENCE_ENDPOINT_ID = "hugging-face-mistral-7b-instruct"
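
Because `os.getenv` returns `None` for any variable that is not set, a missing key only surfaces later as an authentication or connection error. The sketch below is one way to check the configuration up front. The helper `find_missing_vars` is a hypothetical addition, not part of the original notebook; the variable names are the ones used in the cell above.

```python
import os

# Variable names taken from the cell above.
REQUIRED_VARS = [
    "ELASTICSEARCH_API_KEY",
    "ELASTICSEARCH_URL",
    "HUGGING_FACE_API_KEY",
    "HUGGING_FACE_INFERENCE_ENDPOINT_URL",
]


def find_missing_vars(names, env=None):
    """Return the names that are unset or empty in the given environment.

    Hypothetical helper: `env` defaults to os.environ and can be replaced
    with a plain dict for testing.
    """
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = find_missing_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```

Running this right after `load_dotenv()` makes it clear whether the `.env` file was picked up before any request is sent.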

## Elasticsearch Python client

Initialize the Elasticsearch client using the configured URL and API key.

In [4]:
es_client = Elasticsearch(ELASTICSEARCH_URL, api_key=ELASTICSEARCH_API_KEY)

## Hugging Face completions inference endpoint setup

Create an Elasticsearch inference endpoint that connects to the Hugging Face model for generating responses based on blog articles.

In [17]:
try:
    resp = es_client.inference.put(
        task_type="chat_completion",
        inference_id=INFERENCE_ENDPOINT_ID,
        body={
            "service": "hugging_face",
            "service_settings": {
                "api_key": HUGGING_FACE_API_KEY,
                "url": HUGGING_FACE_INFERENCE_ENDPOINT_URL,
            },
        },
    )

    print(
        "Chat completion inference endpoint created successfully:",
        resp["inference_id"],
    )
except Exception as e:
    print(f"Error creating chat completion inference endpoint: {e}")

Chat completion inference endpoint created successfully: hugging-face-mistral-7b-instruct


### Creating index mapping

Define field types and properties for the blog articles index.

In [36]:
try:
    mapping = {
        "mappings": {
            "properties": {
                "id": {"type": "keyword"},
                "title": {
                    "type": "text",
                    "copy_to": "semantic_field",
                    "fields": {"keyword": {"type": "keyword"}},
                },
                "author": {"type": "keyword", "copy_to": "semantic_field"},
                "category": {"type": "keyword", "copy_to": "semantic_field"},
                "content": {"type": "text", "copy_to": "semantic_field"},
                "date": {"type": "date"},
                "semantic_field": {"type": "semantic_text"},
            }
        }
    }

    es_client.indices.create(index=INDEX_NAME, body=mapping)
    print(f"Index {INDEX_NAME} created successfully")
except Exception as e:
    print(f"Error creating index: {e}")

Index blog-posts created successfully
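
The mapping relies on `copy_to` to funnel several fields into the single `semantic_text` field, so one semantic query can match on title, author, category, and content together. As a quick sanity check, the sketch below walks a mapping dictionary and lists the fields that feed a given target. The helper `fields_copied_to` is a hypothetical name introduced here for illustration; the mapping is repeated from the cell above so the block runs on its own.

```python
def fields_copied_to(mapping, target):
    """Return the names of fields whose copy_to points at `target`."""
    properties = mapping["mappings"]["properties"]
    names = []
    for name, definition in properties.items():
        copy_to = definition.get("copy_to", [])
        # copy_to may be a single field name or a list of names
        if isinstance(copy_to, str):
            copy_to = [copy_to]
        if target in copy_to:
            names.append(name)
    return names


# Same mapping as in the notebook cell above.
mapping = {
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            "title": {
                "type": "text",
                "copy_to": "semantic_field",
                "fields": {"keyword": {"type": "keyword"}},
            },
            "author": {"type": "keyword", "copy_to": "semantic_field"},
            "category": {"type": "keyword", "copy_to": "semantic_field"},
            "content": {"type": "text", "copy_to": "semantic_field"},
            "date": {"type": "date"},
            "semantic_field": {"type": "semantic_text"},
        }
    }
}

print(fields_copied_to(mapping, "semantic_field"))
# prints ['title', 'author', 'category', 'content']
```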


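### Indexing the dataset

Load the blog articles from `dataset.json` and index them with the bulk helper.

In [37]: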
def build_data(json_file, index_name):
    with open(json_file, "r") as f:
        data = json.load(f)

    for doc in data:
        action = {"_index": index_name, "_source": doc}
        yield action


try:
    success, failed = helpers.bulk(
        es_client,
        build_data("dataset.json", INDEX_NAME),
    )
    print(f"{success} documents indexed successfully")

    if failed:
        print(f"Errors: {failed}")
except Exception as e:
    print(f"Error: {str(e)}")

15 documents indexed successfully
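
The actions above carry no `_id`, so Elasticsearch generates one for each document and re-running the cell indexes the same articles again. A possible variation, sketched below, reuses each article's own `id` field as the document `_id` so that a second run overwrites instead of duplicating. `build_actions` is a hypothetical helper and the sample documents are invented for illustration; the assumption is that every article in `dataset.json` carries a unique `id`, as the search results later in the notebook suggest.

```python
def build_actions(docs, index_name):
    """Yield bulk actions, using each document's `id` as `_id` when present."""
    for doc in docs:
        action = {"_index": index_name, "_source": doc}
        if "id" in doc:
            # A stable _id makes re-indexing idempotent
            action["_id"] = str(doc["id"])
        yield action


# Invented sample documents, shaped like the notebook's search results.
sample_docs = [
    {"id": "1", "title": "First post", "category": "tutorial"},
    {"title": "Post without an id", "category": "news"},
]

for action in build_actions(sample_docs, "blog-posts"):
    print(action)
```

The generator can be passed to `helpers.bulk` in place of `build_data(...)` once the documents have been loaded with `json.load`.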


## Semantic search function

Define a function that retrieves relevant articles by running a `semantic` query against `semantic_field`.


In [None]:
def search_articles(query_text, index_name=INDEX_NAME, size=5):
    try:
        query = {
            "query": {
                "semantic": {
                    "field": "semantic_field",
                    "query": query_text,
                }
            },
            "size": size,
        }

        response = es_client.search(index=index_name, body=query)
        hits = response["hits"]["hits"]

        return hits
    except Exception as e:
        print(f"Semantic search error: {str(e)}")
        return []
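
`search_articles` returns the raw hits, which are verbose to read. The sketch below condenses them into short rows of score, title, and category for a quick look at what was retrieved. `summarize_hits` is a hypothetical helper, and the sample hits are abridged from the search output shown later in this notebook (the first hit has no `title` field, hence the `N/A` fallback).

```python
def summarize_hits(hits):
    """Return (score, title, category) tuples for a list of search hits."""
    rows = []
    for hit in hits:
        source = hit.get("_source", {})
        score = hit.get("_score") or 0.0  # _score can be None in some queries
        rows.append(
            (
                round(score, 2),
                source.get("title", "N/A"),
                source.get("category", "N/A"),
            )
        )
    return rows


# Abridged from the notebook's own search output.
sample_hits = [
    {"_score": 9.997918, "_source": {"id": "2", "category": "security"}},
    {
        "_score": 8.438947,
        "_source": {
            "id": "5",
            "title": "Known risks when migrating from version 1.x to 2.0",
            "category": "tutorial",
        },
    },
]

for row in summarize_hits(sample_hits):
    print(row)
```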

### Streaming function for real-time responses

Send messages to the Elasticsearch inference endpoint's `_stream` API and parse the server-sent events (SSE) it returns, yielding the model's response text as it arrives.

In [23]:
def stream_chat_completion(messages: list, inference_id: str = INFERENCE_ENDPOINT_ID):

    url = f"{ELASTICSEARCH_URL}/_inference/chat_completion/{inference_id}/_stream"
    payload = {"messages": messages}
    headers = {
        "Authorization": f"ApiKey {ELASTICSEARCH_API_KEY}",
        "Content-Type": "application/json",
    }

    try:
        response = requests.post(url, json=payload, headers=headers, stream=True)
        response.raise_for_status()

        for line in response.iter_lines(decode_unicode=True):
            if line:
                line = line.strip()

                # Handle Server-Sent Events format
                # Skip event lines like "event: message"
                if line.startswith("event:"):
                    continue

                # Process data lines
                if line.startswith("data: "):
                    data_content = line[6:]  # Remove "data: " prefix

                    # Skip empty data or special markers
                    if not data_content.strip() or data_content.strip() == "[DONE]":
                        continue

                    try:
                        chunk_data = json.loads(data_content)

                        # Extract the content from the response structure
                        if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
                            choice = chunk_data["choices"][0]
                            if "delta" in choice and "content" in choice["delta"]:
                                content = choice["delta"]["content"]
                                if content:  # Only yield non-empty content
                                    yield content

                    except json.JSONDecodeError as json_err:
                        # If JSON parsing fails, log the error but continue
                        print(f"\nJSON decode error: {json_err}")
                        print(f"Problematic data: {data_content}")
                        continue

    except requests.exceptions.RequestException as e:
        yield f"Error: {str(e)}"


print("✅ Streaming function defined!")

✅ Streaming function defined!
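
The parsing logic inside `stream_chat_completion` can be tried without a live endpoint by separating it from the HTTP call. The sketch below applies the same rules (skip `event:` lines, strip the `data:` prefix, ignore `[DONE]`, read `choices[0].delta.content`) to a list of strings. `extract_content` is a hypothetical helper, and the sample lines are invented to mimic the event format the function above expects, not captured from a real response.

```python
import json


def extract_content(lines):
    """Yield the text deltas found in an iterable of SSE lines."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # skips blank lines and "event: ..." lines
        payload = line[len("data:"):].strip()
        if not payload or payload == "[DONE]":
            continue
        try:
            chunk = json.loads(payload)
        except json.JSONDecodeError:
            continue  # ignore malformed chunks in this sketch
        if not isinstance(chunk, dict):
            continue
        choices = chunk.get("choices") or []
        if choices:
            content = choices[0].get("delta", {}).get("content")
            if content:
                yield content


# Invented sample events in the format handled above.
sample_lines = [
    "event: message",
    'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    "",
    'data: {"choices": [{"delta": {"content": ", world"}}]}',
    'data: {"choices": [{"delta": {}}]}',
    "data: not-json",
    "data: [DONE]",
]

print("".join(extract_content(sample_lines)))  # prints "Hello, world"
```

Keeping the parser separate from the request makes it straightforward to unit test against recorded or hand-written event streams.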


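The `ask_question_streaming` function combines the semantic search with the streaming chat completion: it retrieves the relevant articles, builds a prompt from them, and prints the model's answer as it streams.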

In [None]:
def ask_question_streaming(user_question, index_name=INDEX_NAME, max_articles=5):

    # Search for relevant articles
    articles = search_articles(user_question, index_name, size=max_articles)

    if not articles:
        print("No relevant articles found for your question.")
        return

    print("=" * 80)
    print(f"Semantic search results: {json.dumps(articles, indent=2)}")
    print("=" * 80)

    # Build context with found articles
    context = "Relevant articles found:\n\n"
    for i, article in enumerate(articles, 1):
        source = article.get("_source", article)
        context += f"Article {i}:\n"
        context += f"Title: {source.get('title', 'N/A')}\n"
        context += f"Author: {source.get('author', 'N/A')}\n"
        context += f"Category: {source.get('category', 'N/A')}\n"
        context += f"Date: {source.get('date', 'N/A')}\n"
        context += f"Content: {source.get('content', 'N/A')}\n\n"

    # Build the prompt for the model
    system_prompt = """You are an expert assistant that helps answer questions about blog articles.
    Based on the provided articles, answer the user's question clearly and accurately.
    If the information is not available in the articles, clearly indicate so.
    Cite relevant articles when appropriate."""

    user_prompt = f"""User question: {user_question}

    {context}

    Please answer the user's question based on the information from the provided articles."""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    print(f"Question: {user_question}\n")
    print("=" * 80)
    print("Response (streaming):\n")

    # Stream the response
    for chunk in stream_chat_completion(messages):
        print(chunk, end="", flush=True)

    print("\n" + "=" * 80)
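
The context built above includes each article's full content, which can grow large when many or long articles are retrieved. One option, sketched below, is to build the context in a separate function that caps the content length per article. `build_context` and its `max_content_chars` parameter are hypothetical and not part of the original notebook; the limit of 1000 characters is an arbitrary illustrative value, not a recommendation for any particular model.

```python
def build_context(hits, max_content_chars=1000):
    """Build the prompt context from search hits, truncating long content."""
    parts = ["Relevant articles found:\n"]
    for i, hit in enumerate(hits, 1):
        source = hit.get("_source", hit)
        content = source.get("content", "N/A")
        if len(content) > max_content_chars:
            # Cut the text and mark the truncation
            content = content[:max_content_chars].rstrip() + "..."
        parts.append(
            f"Article {i}:\n"
            f"Title: {source.get('title', 'N/A')}\n"
            f"Author: {source.get('author', 'N/A')}\n"
            f"Category: {source.get('category', 'N/A')}\n"
            f"Date: {source.get('date', 'N/A')}\n"
            f"Content: {content}\n"
        )
    return "\n".join(parts)


# Invented hit used only to show the truncation.
example_hits = [{"_source": {"author": "Security Team", "content": "a" * 20}}]
print(build_context(example_hits, max_content_chars=5))
```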

## Usage example

Ask which articles mention risks, vulnerabilities, or warnings.

In [None]:
ask_question_streaming(
    "Are there any articles that mention risks, vulnerabilities, or warnings?"
)

Semantic search results: [
  {
    "_index": "blog-posts",
    "_id": "Knww6poBDe8qGaqI7gFQ",
    "_score": 9.997918,
    "_source": {
      "id": "2",
      "author": "Security Team",
      "date": "2025-11-02",
      "category": "security",
      "content": "We have identified a critical vulnerability in the authentication system that could allow unauthorized access. The vulnerability affects users using JWT tokens issued before October 15th. We recommend updating immediately to SDK version 3.2.1. All affected tokens have been automatically revoked. Please regenerate your access credentials."
    }
  },
  {
    "_index": "blog-posts",
    "_id": "LXww6poBDe8qGaqI7gFQ",
    "_score": 8.438947,
    "_source": {
      "id": "5",
      "title": "Known risks when migrating from version 1.x to 2.0",
      "author": "Laura Perez",
      "date": "2025-11-05",
      "category": "tutorial",
      "content": "If you're planning to migrate from version 1.x to 2.0, there are several important ris

## Cleanup

Delete the index and the inference endpoint so they stop consuming resources once the workflow is complete.

In [35]:
# Cleanup - Delete Index
es_client.indices.delete(index=INDEX_NAME)
print(f"Index {INDEX_NAME} deleted")

Index blog-posts deleted


In [None]:
# Cleanup - Delete Inference Endpoint
es_client.inference.delete(inference_id=INFERENCE_ENDPOINT_ID)
print(f"Inference endpoint {INFERENCE_ENDPOINT_ID} deleted")

ObjectApiResponse({'acknowledged': True})