# Semantic Search + NER Demo

Built following the excellent [blog post by Camille Corti-Georgiou](https://www.elastic.co/search-labs/blog/articles/developing-an-elastic-search-app-with-streamlit-semantic-search-and-named-entity-extraction).


## Import Dependencies


In [None]:
import csv
import getpass
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from getpass import getpass

from elasticsearch import Elasticsearch, exceptions
from elasticsearch.helpers import parallel_bulk
from kaggle.api.kaggle_api_extended import KaggleApi
from tqdm.auto import tqdm  # auto selects notebook GUI if in Jupyter

## Download dataset from Kaggle


In [None]:
# Configure the Kaggle API credentials.
# NOTE: the name `getpass` is bound to the *function* by `from getpass import getpass`,
# which shadows the earlier `import getpass` module import. It must therefore be
# called directly; `getpass.getpass(...)` would raise AttributeError here.
os.environ["KAGGLE_USERNAME"] = getpass("Enter your Kaggle username:")
os.environ["KAGGLE_KEY"] = getpass("Enter your Kaggle key:")

# Initialize and authenticate the API client
# (presumably via the KAGGLE_USERNAME / KAGGLE_KEY variables set above).
api = KaggleApi()
api.authenticate()

# Define the dataset path
dataset = "gpreda/bbc-news"

# Define download path
download_path = "../data"  # Adjust this path as needed

# Download and unzip the dataset files into download_path
api.dataset_download_files(dataset, path=download_path, unzip=True)

print(f"Dataset downloaded to: {download_path}")

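## Use Docker to Upload and Start the Custom NER ML Model

Export `CLOUD_ID` and `API_KEY` in your shell before running the command below. `eland_import_hub_model` uploads the Hugging Face NER model to your cluster, and `--start` deploys it.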

```sh
docker run -it --rm docker.elastic.co/eland/eland:latest \
    eland_import_hub_model \
        --cloud-id $CLOUD_ID \
        --es-api-key $API_KEY \
        --hub-model-id "elastic/distilbert-base-uncased-finetuned-conll03-english" \
        --task-type ner \
        --start
```


## Set up Elasticsearch Client


In [None]:
# Create the client instance.
# Both credentials are read interactively with getpass, so they are not echoed
# or written into the notebook. For local development, pass
# hosts=["http://localhost:9200"] instead of cloud_id / api_key.
es_client = Elasticsearch(
    cloud_id=getpass("Elastic Cloud ID: "),
    api_key=getpass("Elastic Api Key: "),
)

# Sanity-check the connection by printing the cluster info
cluster_info = es_client.info()
print(cluster_info)

## Download and Deploy ELSER Model

### Helper functions to delete the model and check its status


In [None]:
def delete_existing_model(es_client: "Elasticsearch", model_id: str) -> bool:
    """
    Delete a trained machine learning model so it can be re-created from scratch.

    Force-deletes the model identified by ``model_id``. A model that does not
    exist is not treated as an error: in both cases the caller may proceed to
    create the model.

    Args:
        es_client (Elasticsearch): Client used to talk to the cluster.
        model_id (str): The unique identifier of the trained model to delete.

    Returns:
        bool: True if Elasticsearch acknowledged the deletion or the model does
        not exist; False if the delete call returned a response that was not
        acknowledged.

    Raises:
        Any error other than ``elasticsearch.exceptions.NotFoundError`` raised
        by the delete call propagates to the caller unchanged.
    """
    try:
        # force=True also removes a model that is still deployed or referenced
        # by an ingest pipeline (see the delete-trained-model API docs).
        response = es_client.ml.delete_trained_model(model_id=model_id, force=True)
    except exceptions.NotFoundError:
        # Nothing to delete, so the caller can go ahead and create the model
        print("Model doesn't exist, but we will proceed with creating one")
        return True

    if response.get("acknowledged"):
        print("Model deleted successfully, we will proceed with creating one")
        return True

    print(f"There was an issue deleting the model: {response}")
    return False


def check_model_download_status(
    es_client: "Elasticsearch",
    model_id: str,
    poll_interval: float = 5,
    timeout: "float | None" = None,
) -> bool:
    """
    Wait until a trained model's definition is fully downloaded.

    Polls ``get_trained_models`` (with ``include="definition_status"``) and
    checks ``fully_defined`` on the first returned model config.

    Args:
        es_client (Elasticsearch): Client used to talk to the cluster.
        model_id (str): ID of the trained model to wait for.
        poll_interval (float): Seconds to sleep between polls (default 5).
        timeout (float | None): Maximum number of seconds to wait. ``None``
            (the default) waits indefinitely.

    Returns:
        bool: True once the model is fully defined; False if ``timeout``
        elapsed first.
    """
    # monotonic() is immune to wall-clock adjustments, so the deadline is reliable
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        status = es_client.ml.get_trained_models(
            model_id=model_id, include="definition_status"
        )

        # Check if the model is fully defined and ready to be deployed
        if status["trained_model_configs"][0]["fully_defined"]:
            print("ELSER Model is downloaded and ready to be deployed.")
            return True

        print("ELSER Model is still downloading and not ready to be deployed.")

        if deadline is not None and time.monotonic() >= deadline:
            print("Timed out waiting for the ELSER Model download.")
            return False

        time.sleep(poll_interval)


def check_model_deployment_status(
    es_client: "Elasticsearch",
    model_id: str,
    poll_interval: float = 5,
    timeout: "float | None" = None,
) -> bool:
    """
    Wait until a trained model's deployment reports the ``started`` state.

    Polls ``get_trained_models_stats`` and reads
    ``deployment_stats.state`` from the first returned stats entry.

    Args:
        es_client (Elasticsearch): Client used to talk to the cluster.
        model_id (str): ID of the trained model whose deployment to wait for.
        poll_interval (float): Seconds to sleep between polls (default 5).
        timeout (float | None): Maximum number of seconds to wait. ``None``
            (the default) waits indefinitely.

    Returns:
        bool: True once the deployment has started; False if ``timeout``
        elapsed first.

    Raises:
        KeyError: if the stats response has no ``deployment_stats`` (e.g. no
            deployment exists for the model).
    """
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        status = es_client.ml.get_trained_models_stats(
            model_id=model_id,
        )

        if status["trained_model_stats"][0]["deployment_stats"]["state"] == "started":
            print("ELSER Model has been successfully deployed.")
            return True

        print("ELSER Model is currently being deployed.")

        if deadline is not None and time.monotonic() >= deadline:
            print("Timed out waiting for the ELSER Model deployment.")
            return False

        time.sleep(poll_interval)

### Deploy the ELSER model


In [None]:
# ID of the ELSER model this notebook (re)creates and deploys
model_id = ".elser_model_2_linux-x86_64"

# Remove any previous copy first; only (re)create the model when the delete
# succeeded or there was nothing to delete.
can_create_model = delete_existing_model(es_client, model_id)

if can_create_model:
    # Registering the model configuration makes Elasticsearch download the
    # model if it is not already present.
    es_client.ml.put_trained_model(
        model_id=model_id,
        input={"field_names": ["text_field"]},
    )

    # Block until the model definition is fully available
    check_model_download_status(es_client, model_id)

Once the model is downloaded, we can deploy it to our ML node. Run the following cell to start the deployment.


In [None]:
# Deployment settings for the ELSER model
deployment_settings = dict(
    wait_for="starting",
    queue_capacity=10000,
    number_of_allocations=1,
    threads_per_allocation=2,
)

try:
    es_client.ml.start_trained_model_deployment(
        model_id=model_id, **deployment_settings
    )
except exceptions.BadRequestError as err:
    # Presumably raised when the model is already deployed; report it and
    # fall through to the status check rather than aborting the notebook.
    print(err)

# Block until the deployment reports that it has started
check_model_deployment_status(es_client, model_id)

## Create Elasticsearch Index & Ingest Pipeline


## Define the ingest pipeline processors and index mapping

First, we define the JSON for the ingest pipeline processors and the index mapping.


In [None]:
# Ingest pipeline processors, applied in order:
#   1. parse the `pubDate` string into a date (the date processor writes to
#      `@timestamp` by default);
#   2-3. run ELSER over `title` and `description`, storing the sparse tokens in
#      `ml-elser-title` / `ml-elser-description`;
#   4. run the NER model over `title`, writing its output under `ml.ner`;
#   5. group the extracted entities by `class_name` into the `tags` object.
#
# The ELSER processors use `model_id`, the model deployed earlier in this
# notebook. A hard-coded ".elser_model_2" is a different model ID from
# ".elser_model_2_linux-x86_64" and may not be deployed on the cluster.
es_news_pipeline_processors = [
    {"date": {"field": "pubDate", "formats": ["EEE, dd MMM yyyy HH:mm:ss zzz"]}},
    {
        "inference": {
            "model_id": model_id,
            "input_output": [
                {"input_field": "title", "output_field": "ml-elser-title"}
            ],
            "description": f'Runs {model_id} and stores resulting tokens in "ml-elser-title"',
        }
    },
    {
        "inference": {
            "model_id": model_id,
            "input_output": [
                {"input_field": "description", "output_field": "ml-elser-description"}
            ],
            "description": f'Runs {model_id} and stores resulting tokens in "ml-elser-description"',
        }
    },
    {
        "inference": {
            # NER model uploaded with eland (hub ID with "/" replaced by "__")
            "model_id": "elastic__distilbert-base-uncased-finetuned-conll03-english",
            "target_field": "ml.ner",
            "field_map": {"title": "text_field"},
        }
    },
    {
        "script": {
            "lang": "painless",
            # Only build tags when the NER step produced entities
            "if": "return ctx['ml']['ner'].containsKey('entities')",
            "source": "Map tags = new HashMap(); for (item in ctx['ml']['ner']['entities']) { if (!tags.containsKey(item.class_name)) tags[item.class_name] = new HashSet(); tags[item.class_name].add(item.entity);} ctx['tags'] = tags;",
        }
    },
]

In [None]:
def _keyword_field():
    """Return a fresh keyword mapping whose values longer than 256 chars are not indexed."""
    return {"type": "keyword", "ignore_above": 256}


# Mapping for the NER output that the inference processor writes under `ml.ner`
_ner_mapping = {
    "properties": {
        "entities": {
            "properties": {
                "class_name": _keyword_field(),
                "class_probability": {"type": "float"},
                "end_pos": {"type": "long"},
                "entity": _keyword_field(),
                "start_pos": {"type": "long"},
            }
        },
        "model_id": _keyword_field(),
        "predicted_value": _keyword_field(),
    }
}

es_news_index_mapping = {
    # NOTE(review): nothing in this notebook creates a `content_embedding` field;
    # this exclusion looks like a leftover. Confirm before relying on it.
    "_source": {"excludes": ["content_embedding"]},
    "properties": {
        "@timestamp": {"type": "date"},
        "@version": _keyword_field(),
        "description": {"type": "text"},
        "ml": {"properties": {"ner": _ner_mapping}},
        # ELSER output fields
        "ml-elser-description": {"type": "sparse_vector"},
        "ml-elser-title": {"type": "sparse_vector"},
        "pubDate": {
            "type": "date",
            "format": "EEE, dd MMM yyyy HH:mm:ss z",
            "ignore_malformed": True,
        },
        # One keyword field per NER entity class
        "tags": {
            "properties": {
                label: _keyword_field() for label in ("LOC", "MISC", "ORG", "PER")
            }
        },
        "title": {"type": "text"},
        "url": _keyword_field(),
    },
}

Now, create the ingest pipeline and the search index.


In [None]:
# Register (or overwrite) the ingest pipeline built from the processors defined above
pipeline_id = "search-demos-bbc.semantic.ner-pipeline"
pipeline_description = "Ingest pipeline for Semantic Ner Search Demo using BBC dataset"

es_client.ingest.put_pipeline(
    id=pipeline_id,
    description=pipeline_description,
    processors=es_news_pipeline_processors,
)

In [None]:
es_index = "search-demos-bbc.semantic.ner"

# Start from a clean slate: drop any previous copy of the index (no error if it
# does not exist), then recreate it. Both calls use `es_index` so the deleted
# and created index names cannot drift apart.
es_client.indices.delete(index=es_index, ignore_unavailable=True)
es_client.indices.create(
    index=es_index,
    # Documents indexed without an explicit pipeline run through the NER/ELSER pipeline
    settings={"index": {"default_pipeline": "search-demos-bbc.semantic.ner-pipeline"}},
    mappings=es_news_index_mapping,
)

## Process the CSV data and index into Elasticsearch


In [None]:
# Path to your dataset file
dataset_path = "../data/bbc_news.csv"

# Read every CSV row into a dict keyed by the header row.
# The csv module requires files to be opened with newline="" so that quoted
# fields containing line breaks are parsed correctly.
with open(dataset_path, mode="r", encoding="utf-8", newline="") as file:
    data = list(csv.DictReader(file))

# 'data' is now a list of dictionaries, one per CSV row.
# Print the first five rows to spot-check the parse.
print(data[:5])

### Some helper functions for a smooth ingest experience


In [None]:
def generate_actions(documents: list):
    """
    Stream documents one at a time as bulk-ingest actions.

    Each document is passed through unchanged; the bulk helper supplies the
    target index, so no action metadata is added here.

    Args:
        documents (list): Documents (dicts) to be indexed.

    Yields:
        dict: The next document, in input order.
    """
    yield from documents


def send_to_elasticsearch(
    es_client: Elasticsearch,
    es_index: str,
    documents: list,
    chunk_size: int = 500,
    thread_count: int = 4,
) -> str:
    """
    Upload documents to an Elasticsearch index with ``parallel_bulk``.

    Per-document indexing failures are printed and counted as failures instead
    of aborting the upload. Any other exception raised during the bulk upload
    is printed (with its ``errors`` detail, if present) and swallowed, so the
    caller always gets a summary string back.

    Args:
        es_client (Elasticsearch): The Elasticsearch client.
        es_index (str): The name of the Elasticsearch index.
        documents (list): A list of dictionaries to index.
        chunk_size (int): The number of documents to send in each bulk request.
        thread_count (int): Number of worker threads used by ``parallel_bulk``
            (4 matches the helper's own default).

    Returns:
        str: A message indicating the number of successfully indexed documents.
    """
    num_documents = len(documents)
    successes = 0

    # The context manager closes the progress bar even if the upload fails
    with tqdm(unit="docs", total=num_documents) as progress:
        try:
            for ok, info in parallel_bulk(
                client=es_client,
                index=es_index,
                actions=generate_actions(documents),
                chunk_size=chunk_size,
                thread_count=thread_count,
                # Yield (False, info) for failed documents instead of raising
                # BulkIndexError on the first one, so one bad document does
                # not stop the rest of the batch from being reported.
                raise_on_error=False,
                request_timeout=360,
            ):
                progress.update(1)

                if ok:
                    successes += 1
                else:
                    # `info` maps the bulk op type ("index" here) to the item result
                    item = info.get("index", info)
                    print(f"A document failed to index: {item.get('error', item)}")

        except Exception as e:
            # Best-effort: report the failure and fall through to the summary
            print("An Elasticsearch error occurred:", e)
            if hasattr(e, "errors"):
                for error_detail in e.errors:
                    print(error_detail)

    return f"Indexed {successes}/{num_documents} documents."

### The actual data ingest


In [None]:
# Ingest tuning knobs.
# NOTE(review): in the visible notebook only `batch_size` is consumed (by the
# upload call below); num_workers, thread_count and chunk_size are not
# referenced anywhere else. Confirm before tuning them.
num_workers, thread_count, chunk_size, batch_size = 8, 16, 1000, 500

In [None]:
def upload_in_batches(es_client, es_index, documents, batch_size=500):
    """
    Index ``documents`` into ``es_index`` in consecutive slices of ``batch_size``.

    Each slice is sent with ``send_to_elasticsearch``, which also uses
    ``batch_size`` as its bulk chunk size, and the per-batch summary it
    returns is printed. An empty ``documents`` list sends nothing.

    Args:
        es_client (Elasticsearch): The Elasticsearch client.
        es_index (str): Target index name.
        documents (list): Documents (dicts) to index.
        batch_size (int): Number of documents per batch; must be positive.

    Raises:
        ValueError: if ``batch_size`` is not positive. Without this check the
            batching loop would never advance and so never terminate.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    for start_index in range(0, len(documents), batch_size):
        # Slicing clamps at the end of the list, so the last batch may be shorter
        current_batch = documents[start_index : start_index + batch_size]

        result = send_to_elasticsearch(
            es_client=es_client,
            es_index=es_index,
            documents=current_batch,
            # By document count, each batch fits in a single bulk chunk
            chunk_size=batch_size,
        )

        # Report the outcome of each batch upload
        print(result)


# Ingest the whole dataset, one batch at a time
upload_in_batches(
    es_client=es_client,
    es_index=es_index,
    documents=data,
    batch_size=batch_size,
)