In [11]:
import numpy as np
import os
import time
import requests
import json

In [5]:
# Base URL of the Elasticsearch node; the ES_HOST environment variable overrides the default.
ES_HOST = os.environ.get("ES_HOST", "http://es01:9200")
# Word list read by index_words_in_batches; each non-empty line becomes one document.
WORDS_FILE = "/app/words.txt"

def wait_for_elasticsearch(max_attempts=None, poll_interval=3):
    """
    Block until a GET on ES_HOST answers with HTTP 200.

    Args:
        max_attempts: Upper bound on the number of probes. ``None`` (the
            default) polls forever, which is the historical behaviour. At
            least one probe is always made.
        poll_interval: Seconds to sleep between two probes.

    Raises:
        TimeoutError: if ``max_attempts`` is set and that many probes failed.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            r = requests.get(ES_HOST, timeout=3)
            if r.status_code == 200:
                print(f"Elasticsearch is up at {ES_HOST}.")
                return
        except requests.exceptions.RequestException:
            # Connection errors / timeouts simply mean "not ready yet".
            pass

        # Give up instead of hanging the whole notebook when a bound was requested.
        if max_attempts is not None and attempts >= max_attempts:
            raise TimeoutError(
                f"Elasticsearch at {ES_HOST} not ready after {attempts} attempt(s)."
            )

        print(f"Waiting for Elasticsearch to be ready at {ES_HOST}...")
        time.sleep(poll_interval)

        
# Block here until Elasticsearch responds, so the following cells can talk to it.
wait_for_elasticsearch()

Elasticsearch is up at http://es01:9200.


In [7]:
def create_autocomplete_index():
    """
    Send the index-creation request for 'autocomplete'.

    The index gets a single field, 'suggest', mapped as type 'completion'.
    The outcome is only reported on stdout; nothing is returned or raised
    for non-success statuses.
    """
    print("Creating the 'autocomplete' index with completion mapping...")

    # Minimal mapping: one completion field with default options.
    completion_field = {"type": "completion"}
    index_body = {"mappings": {"properties": {"suggest": completion_field}}}

    resp = requests.put(f"{ES_HOST}/autocomplete", json=index_body)

    if resp.status_code not in (200, 201):
        # Any other status (e.g. the index already exists) is reported, not raised.
        print(f"Index creation response ({resp.status_code}): {resp.text}")
        return

    print("Index created or updated successfully.")

# Create the plain completion index (default analyzer).
create_autocomplete_index()

Creating the 'autocomplete' index with completion mapping...
Index created or updated successfully.


In [19]:
def create_autocomplete_index2():
    """
    Send the index-creation request for 'autocomplete2'.

    Unlike the plain 'autocomplete' index, the 'suggest' completion field
    here is analysed with a custom 'folding_analyzer' (standard tokenizer,
    then lowercase and asciifolding filters).

    The outcome is only reported on stdout; nothing is returned or raised
    for non-success statuses.
    """
    # The message names the index that is actually created below.
    print("Creating the 'autocomplete2' index with completion mapping...")

    payload = {
        # Custom analyzer: lowercase the tokens and fold accented characters to ASCII.
        "settings": {
            "analysis": {
                "analyzer": {
                    "folding_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase",
                            "asciifolding"
                        ]
                    }
                }
            }
        },
        # Completion field wired to the analyzer defined above.
        "mappings": {
            "properties": {
                "suggest": {
                    "type": "completion",
                    "analyzer": "folding_analyzer",
                    "preserve_separators": True,
                    "preserve_position_increments": True,
                    "max_input_length": 50
                }
            }
        }
    }
    url = f"{ES_HOST}/autocomplete2"
    response = requests.put(url, json=payload)
    if response.status_code in (200, 201):
        print("Index created or updated successfully.")
    else:
        # Any other status (e.g. the index already exists) is reported, not raised.
        print(f"Index creation response ({response.status_code}): {response.text}")

# Create the completion index that uses the lowercase + asciifolding analyzer.
create_autocomplete_index2()

Creating the 'autocomplete' index with completion mapping...
Index created or updated successfully.


In [20]:
def bulk_index_in_batches(index_name, words, batch_size=50000):
    """
    Index ``words`` into ``index_name`` through the Elasticsearch Bulk API,
    sending at most ``batch_size`` documents per request.

    Each word becomes one document of the form ``{"suggest": word}``.
    Failed batches and per-item errors are printed; they do not stop the
    remaining batches from being sent.

    Args:
        index_name: Target index written into every bulk action line.
        words: Sequence of strings (must support ``len`` and slicing).
        batch_size: Number of documents per bulk request; must be positive.

    Raises:
        ValueError: if ``batch_size`` is not positive. Previously such a value
            made the loop spin forever because ``start`` never advanced.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    total_docs = len(words)
    if total_docs == 0:
        # Nothing to index, so no request is made.
        return

    # Every document shares the same action/metadata line; serialise it once.
    action_line = json.dumps({"index": {"_index": index_name}})

    def generate_bulk_payload(batch):
        lines = []
        for word in batch:
            # Action/metadata
            lines.append(action_line)
            # Document body
            lines.append(json.dumps({"suggest": word}))
        # The Bulk API expects the NDJSON body to end with a newline.
        return "\n".join(lines) + "\n"

    # Loop-invariant request settings.
    bulk_url = f"{ES_HOST}/_bulk?refresh=wait_for"
    headers = {"Content-Type": "application/x-ndjson"}

    for start in range(0, total_docs, batch_size):
        end = min(start + batch_size, total_docs)
        payload = generate_bulk_payload(words[start:end])

        response = requests.post(bulk_url, data=payload, headers=headers)

        if response.status_code not in (200, 201):
            print(f"Batch [{start}:{end}] failed with status {response.status_code}")
            print(response.text)
            continue

        # A 200 response can still carry per-item failures, flagged by "errors".
        resp_json = response.json()
        if resp_json.get("errors"):
            print(f"Batch [{start}:{end}] had partial failures:")
            for item in resp_json.get("items", []):
                if "error" in item["index"]:
                    print(json.dumps(item["index"]["error"], indent=2))
        else:
            print(f"Batch [{start}:{end}] indexed successfully.")

def index_words_in_batches(index_name):
    """
    Read WORDS_FILE and bulk-index its entries into ``index_name``.

    Lines are stripped of surrounding whitespace and empty lines are dropped.
    If WORDS_FILE does not exist, a message is printed and nothing is indexed.
    """
    if os.path.isfile(WORDS_FILE):
        with open(WORDS_FILE, "r", encoding="utf-8") as handle:
            stripped_lines = (raw.strip() for raw in handle)
            vocabulary = [entry for entry in stripped_lines if entry]

        # No explicit batch size: bulk_index_in_batches falls back to its own default.
        bulk_index_in_batches(index_name, vocabulary)
    else:
        print("No words.txt file found; skipping.")

# Populate the 'autocomplete2' index from the word list.
index_words_in_batches("autocomplete2")

Batch [0:50000] indexed successfully.
Batch [50000:100000] indexed successfully.
Batch [100000:150000] indexed successfully.
Batch [150000:200000] indexed successfully.
Batch [200000:250000] indexed successfully.
Batch [250000:300000] indexed successfully.
Batch [300000:350000] indexed successfully.
Batch [350000:400000] indexed successfully.
Batch [400000:450000] indexed successfully.
Batch [450000:466550] indexed successfully.


In [18]:
def autocomplete(prefix, index_name="autocomplete"):
    """
    Run a completion-suggester query for ``prefix`` and return the suggested texts.

    Args:
        prefix: Text typed so far.
        index_name: Index to query. Defaults to 'autocomplete' (the index this
            function was previously hard-wired to), so existing calls behave
            the same; pass another name to query e.g. 'autocomplete2'.

    Returns:
        A list of suggestion strings. An empty list is returned when the HTTP
        status is not 200 or when the response does not have the expected
        shape (details are printed in both cases).
    """
    url = f"{ES_HOST}/{index_name}/_search"
    headers = {"Content-Type": "application/json"}

    # Suggest query body
    suggest_query = {
        "suggest": {
            "word-suggest": {
                "prefix": prefix,
                "completion": {
                    "field": "suggest"
                }
            }
        }
    }

    response = requests.post(url, headers=headers, json=suggest_query)
    if response.status_code != 200:
        print(f"Error fetching suggestions. Status: {response.status_code}")
        print(response.text)
        return []

    data = response.json()

    # The suggest results are under data["suggest"]["word-suggest"][0]["options"];
    # each option carries the suggested string under "text".
    try:
        options = data["suggest"]["word-suggest"][0]["options"]
        return [item["text"] for item in options]
    except (KeyError, IndexError):
        print("Unexpected response format:")
        print(json.dumps(data, indent=2))
        return []


# Smoke-test the exact-prefix suggester with a few prefixes.
# NOTE(review): the visible cells only bulk-index words into 'autocomplete2';
# the 'autocomplete' index queried here was presumably populated by an earlier
# run -- confirm before relying on Restart & Run All.
test_prefixes = ["wo", "he", "wor", "xyz"]

for prefix in test_prefixes:
    results = autocomplete(prefix)
    print(f"Prefix: '{prefix}' -> Suggestions: {results}")


Prefix: 'wo' -> Suggestions: ['WO', 'woa', 'woad', 'woad-leaved', 'woad-painted']
Prefix: 'he' -> Suggestions: ['HE', 'he-all', 'he-balsam', 'he-broom', 'he-cabbage-tree']
Prefix: 'wor' -> Suggestions: ['Worcester', 'Worcestershire', 'Word', 'worble', 'word-beat']
Prefix: 'xyz' -> Suggestions: ['xyz']


In [22]:

def fuzzy_autocomplete(index_name, prefix, fuzziness=2):
    """
    Run a fuzzy completion-suggester query for ``prefix`` against ``index_name``.

    Args:
        index_name: Index to query.
        prefix: Text typed so far (may contain typos).
        fuzziness: Value forwarded as the suggester's ``fuzzy.fuzziness``
            option. Defaults to 2, the value this function previously
            hard-coded; the original note here recorded that 3 does not work.

    Returns:
        A list of suggestion strings (duplicates skipped by the suggester).
        An empty list is returned when the HTTP status is not 200 or when the
        response does not have the expected shape (details are printed).
    """
    print(f"Querying prefix='{prefix}' with fuzziness={fuzziness}...")

    query = {
        "suggest": {
            "autocomplete-suggest": {
                "prefix": prefix,
                "completion": {
                    "field": "suggest",
                    "skip_duplicates": True,
                    "fuzzy": {
                        "fuzziness": fuzziness,
                        "transpositions": True,
                        "min_length": 2,    # minimum length before fuzzy
                        "prefix_length": 1  # exact match for first character
                    }
                }
            }
        }
    }

    url = f"{ES_HOST}/{index_name}/_search"
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, json=query, headers=headers)

    if response.status_code != 200:
        print(f"Error searching. Status: {response.status_code}")
        print(response.text)
        return []

    data = response.json()
    try:
        # The path to suggestions -> data["suggest"]["autocomplete-suggest"][0]["options"]
        options = data["suggest"]["autocomplete-suggest"][0]["options"]
        return [opt["text"] for opt in options]
    except (KeyError, IndexError):
        print("Unexpected search response format.")
        print(json.dumps(data, indent=2))
        return []
    

# Try the fuzzy suggester on misspelled ("exampel", "exzmple", "workd"),
# short ("hel") and correctly spelled ("wonderful") prefixes.
# NOTE(review): the recorded output for 'hel' starts with 'H', 'H-bar', ... --
# short prefixes appear to match very loosely at this fuzziness; confirm whether
# that is acceptable.
test_prefixes = ["exampel", "exzmple", "workd", "hel", "wonderful"]
for prefix in test_prefixes:
    suggestions = fuzzy_autocomplete("autocomplete2", prefix)
    print(f"Prefix: '{prefix}' -> Suggestions: {suggestions}")
    print("-" * 60)


Querying prefix='exampel' with fuzziness=2...
Prefix: 'exampel' -> Suggestions: ['examplar', 'example', "example's", 'exampled', 'exampleless']
------------------------------------------------------------
Querying prefix='exzmple' with fuzziness=2...
Prefix: 'exzmple' -> Suggestions: ['examplar', 'example', "example's", 'exampled', 'exampleless']
------------------------------------------------------------
Querying prefix='workd' with fuzziness=2...
Prefix: 'workd' -> Suggestions: ['Worcester', 'Worcestershire', 'Word', 'worble', 'word-beat']
------------------------------------------------------------
Querying prefix='hel' with fuzziness=2...
Prefix: 'hel' -> Suggestions: ['H', 'H-bar', 'H-beam', 'H-blast', 'h.']
------------------------------------------------------------
Querying prefix='wonderful' with fuzziness=2...
Prefix: 'wonderful' -> Suggestions: ['wonderful', 'wonderfuller', 'wonderfully', 'wonderfulness', 'wonderfulnesses']
--------------------------------------------------

In [None]:

# https://stackoverflow.com/questions/53652482/elastic-search-with-fuzziness-more-than-2-characters-distance

# I think fuzzy queries are inappropriate for your case. Fuzziness is a way to handle the small misspellings a human can make while typing a query. The human brain can easily skip over a substituted letter in the middle of a word without losing the overall meaning of the phrase, and we expect similar behavior from a search engine.

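# Try regular partial matching with an ngram analyzer instead:
# (Reviewer note: the snippet below uses the pre-7.0 mapping-type level `my_type`; on Elasticsearch 7+ place `properties` directly under `mappings`, as the index-creation cells above do.)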

#     PUT my_index
#     {
#         "settings": {
#             "analysis": {
#                 "filter": {
#                     "trigrams_filter": {
#                         "type": "ngram",
#                         "min_gram": 3,
#                         "max_gram": 3
#                     }
#                 },
#                 "analyzer": {
#                     "trigrams": {
#                         "type": "custom",
#                         "tokenizer": "standard",
#                         "filter": [
#                             "lowercase",
#                             "trigrams_filter"
#                         ]
#                     }
#                 }
#             }
#         }, 
#         "mappings": {
#             "my_type": {
#                 "properties": {
#                     "my_field": {
#                         "type": "text",
#                         "analyzer": "trigrams"
#                     }
#                 }
#             }
#         }
#     }

