# Enhance Microsoft Copilot with Elasticsearch

This notebook execute an API that allows you to search for invoices using Elasticsearch generating a Ngrok tunnel to expose the API to the internet. This notebook is based on the article [Enhance Microsoft Copilot with Elasticsearch](https://www.elastic.co/blog/enhance-microsoft-copilot-with-elasticsearch).

In [None]:
%pip install fastapi pyngrok uvicorn nest-asyncio elasticsearch==8 -q

Note: you may need to restart the kernel to use updated packages.


In [4]:
import os
import json
from getpass import getpass
from datetime import datetime

import nest_asyncio
import uvicorn

from fastapi import FastAPI, Query
from pyngrok import conf, ngrok

from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch

## Setup Variables

In [5]:
os.environ["ELASTICSEARCH_ENDPOINT"] = getpass("Elastic Endpoint: ")
os.environ["ELASTICSEARCH_API_KEY"] = getpass("Elastic Api Key: ")
os.environ["NGROK_AUTH_TOKEN"] = getpass("Ngrok Auth Token: ")


INDEX_NAME = "invoices"

## Elasticsearch client

In [9]:
_client = Elasticsearch(
    os.environ["ELASTICSEARCH_ENDPOINT"],
    api_key=os.environ["ELASTICSEARCH_API_KEY"],
)

## Mappings

In [10]:
try:
    _client.indices.create(
        index=INDEX_NAME,
        body={
            "mappings": {
                "properties": {
                    "id": {"type": "keyword"},
                    "file_url": {"type": "keyword"},
                    "issue_date": {"type": "date"},
                    "description": {"type": "text", "copy_to": "semantic_field"},
                    "services": {
                        "type": "object",
                        "properties": {
                            "name": {
                                "type": "text",
                                "copy_to": "semantic_field",
                            },
                            "price": {"type": "float"},
                        },
                    },
                    "total_amount": {
                        "type": "float",
                    },
                    "semantic_field": {"type": "semantic_text"},
                }
            }
        },
    )

    print("index created successfully")
except Exception as e:
    print(
        f"Error creating inference endpoint: {e.info['error']['root_cause'][0]['reason'] }"
    )

  _client.indices.create(


index created successfully


## Ingesting documents to Elasticsearch

In [11]:
with open("invoices_data.json", "r", encoding="utf-8") as f:
    invoices = json.load(f)

In [17]:
def build_data():
    for doc in invoices:
        yield {"_index": INDEX_NAME, "_source": doc}


try:
    success, errors = bulk(_client, build_data())
    print(f"{success} documents indexed successfully")

    if errors:
        print("Errors during indexing:", errors)

except Exception as e:
    print(f"Error: {str(e)}, please wait some seconds and try again.")

15 documents indexed successfully


## Building API

In [14]:
app = FastAPI()

In [15]:
@app.get("/search/semantic")
async def search_semantic(query: str = Query(None)):
    try:
        result = _client.search(
            index=INDEX_NAME,
            query={
                "semantic": {
                    "field": "semantic_field",
                    "query": query,
                }
            },
        )

        hits = result["hits"]["hits"]
        results = [{"score": hit["_score"], **hit["_source"]} for hit in hits]

        return results
    except Exception as e:
        return Exception(f"Error: {str(e)}")


@app.get("/search/by-date")
async def search_by_date(from_date: str = Query(None), to_date: str = Query(None)):
    try:
        from_dt = datetime.strptime(from_date, "%m/%d/%Y %I:%M:%S %p")
        to_dt = datetime.strptime(to_date, "%m/%d/%Y %I:%M:%S %p")

        formatted_from = from_dt.strftime("%d/%m/%Y")
        formatted_to = to_dt.strftime("%d/%m/%Y")

        result = _client.search(
            index=INDEX_NAME,
            query={
                "range": {
                    "issue_date": {
                        "gte": formatted_from,
                        "lte": formatted_to,
                        "format": "dd/MM/yyyy",
                    }
                }
            },
        )

        hits = result["hits"]["hits"]
        results = [hit["_source"] for hit in hits]

        return results
    except Exception as e:
        return Exception(f"Error: {str(e)}")

## Running the API

In [16]:
conf.get_default().auth_token = os.environ["NGROK_AUTH_TOKEN"]
ngrok_tunnel = ngrok.connect(8000)

print("Public URL:", ngrok_tunnel.public_url)

nest_asyncio.apply()
uvicorn.run(app, port=8000)

INFO:     Started server process [20613]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


Public URL: https://83ed-2405-201-e007-4814-41d6-efe2-96dc-533b.ngrok-free.app


t=2025-06-26T21:17:39+0530 lvl=warn msg="Stopping forwarder" name=http-8000-6c871241-e998-4ea0-953c-a0da0b7f7848 acceptErr="failed to accept connection: Listener closed"
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [20613]


## Delete the index

In [7]:
def print_results(results):
    if results.get("acknowledged", False):
        print("DELETED successfully.")

    if "error" in results:
        print(f"ERROR: {results['error']['root_cause'][0]['reason']}")


# Cleanup - Delete Index
result = _client.indices.delete(index=INDEX_NAME, ignore=[400, 404])
print_results(result)

  result = _client.indices.delete(index=INDEX_NAME, ignore=[400, 404])


DELETED successfully.
