In [1]:
# Use the %pip magic so the package is installed into the environment of the running kernel.
%pip install elasticsearch==7.6.0

Collecting Elasticsearch==7.6
  Downloading elasticsearch-7.6.0-py2.py3-none-any.whl.metadata (7.3 kB)
Downloading elasticsearch-7.6.0-py2.py3-none-any.whl (88 kB)
   ---------------------------------------- 0.0/88.7 kB ? eta -:--:--
   --------- ------------------------------ 20.5/88.7 kB 640.0 kB/s eta 0:00:01
   --------------------------- ------------ 61.4/88.7 kB 656.4 kB/s eta 0:00:01
   ---------------------------------------- 88.7/88.7 kB 834.6 kB/s eta 0:00:00
Installing collected packages: Elasticsearch
Successfully installed Elasticsearch-7.6.0


In [3]:
import os
from getpass import getpass

from elasticsearch import Elasticsearch
from elasticsearch import helpers

In [4]:
# Create the Elasticsearch client.
def get_client(hosts: list, user: str = None, password: str = None):
    """Build an Elasticsearch client for ``hosts``.

    Args:
        hosts: Host URLs handed to the ``Elasticsearch`` constructor.
        user: Optional user name for HTTP authentication.
        password: Optional password for HTTP authentication.

    Returns:
        An ``Elasticsearch`` client. HTTP authentication is configured only
        when both ``user`` and ``password`` are truthy; otherwise a plain
        client is returned.
    """
    # Guard clause: without a complete credential pair, connect anonymously.
    if not (user and password):
        return Elasticsearch(hosts)

    # NOTE(review): certificate verification and its warning are switched off
    # here; confirm this is acceptable for the target cluster.
    return Elasticsearch(
        hosts,
        http_auth=(user, password),
        verify_certs=False,
        ssl_show_warn=False,
    )

In [None]:
# Create an index table.
def create(client: Elasticsearch, index: str, dimension: int = 2):
    """Create a vector index with a single ``my_vector`` field and print the response.

    Args:
        client: Elasticsearch client used to issue the create-index request.
        index: Name of the index to create.
        dimension: Number of elements in every vector stored in ``my_vector``.
            Defaults to 2, the length of the sample vectors this notebook
            writes and queries; pass another value for different data.

    Raises:
        ValueError: If ``dimension`` is not a positive integer.
    """
    if not isinstance(dimension, int) or dimension < 1:
        raise ValueError("dimension must be a positive integer, got {!r}".format(dimension))

    # Index mapping information
    index_mapping = {
        "settings": {
            "index": {
                "vector": "true",  # Enable the vector feature.
                "number_of_shards": 1,  # Set the number of index shards as needed.
                "number_of_replicas": 0,  # Set the number of index replicas as needed.
            }
        },
        "mappings": {
            "properties": {
                "my_vector": {
                    "type": "vector",
                    # Must equal the length of the vectors that will be indexed.
                    "dimension": dimension,
                    "indexing": True,
                    "algorithm": "GRAPH",
                    "metric": "euclidean"
                }
            }
        }
    }
    res = client.indices.create(index=index, body=index_mapping)
    print("create index result: ", res)

In [5]:
# Write data.
def write(client: Elasticsearch, index: str, vecs: list, bulk_size=500):
    """Bulk-index vectors into ``my_vector`` in batches, then refresh the index.

    Args:
        client: Elasticsearch client used for the bulk and refresh requests.
        index: Name of the target index.
        vecs: Vectors to index; each one becomes the ``my_vector`` value of a
            new document.
        bulk_size: Maximum number of documents handed to each
            ``helpers.bulk`` call.

    Raises:
        ValueError: If ``bulk_size`` is smaller than 1.
    """
    if bulk_size < 1:
        raise ValueError("bulk_size must be at least 1, got {!r}".format(bulk_size))

    for start in range(0, len(vecs), bulk_size):
        actions = [
            {
                "_index": index,
                "my_vector": vec,
                # Other fields can be added if necessary.
            }
            for vec in vecs[start: start + bulk_size]
        ]
        # raise_on_error=False makes helpers.bulk hand back the per-document
        # failures instead of raising on the first one, so the error branch
        # below is actually reachable.
        success, errors = helpers.bulk(
            client,
            actions,
            request_timeout=3600,
            raise_on_error=False,
        )
        if errors:
            print("write bulk failed with errors: ", errors)  # Handle the error as needed.
        else:
            print("write bulk {} docs success".format(success))
    # Refresh once after all batches have been submitted.
    client.indices.refresh(index=index, request_timeout=3600)

In [7]:
# Query a vector index.
def search(client: Elasticsearch, index: str, query: list, size: int):
    """Run a ``vector`` query against ``my_vector`` and print the raw response.

    Args:
        client: Elasticsearch client used to issue the search request.
        index: Name of the index to query.
        query: Query vector placed in the request's ``vector`` clause.
        size: Used both as the request's ``size`` and as the clause's ``topk``.
    """
    # Query statement. Select an appropriate query method.
    # Built inside-out: the per-field clause first, then the request envelope.
    vector_clause = {"vector": query, "topk": size}
    request = {
        "size": size,
        "query": {"vector": {"my_vector": vector_clause}},
    }
    response = client.search(index=index, body=request)
    print("search index result: ", response)

In [8]:
# Delete an index.
def delete(client: Elasticsearch, index: str):
    """Delete ``index`` through ``client`` and print the response.

    Args:
        client: Elasticsearch client used to issue the delete request.
        index: Name of the index to delete.
    """
    print("delete index result: ", client.indices.delete(index=index))

In [16]:
# For a non-security cluster, run the following:
# es_client = get_client(hosts=['http://192.168.2.114:9200'])

# For a security cluster with HTTPS enabled, run the following.
# Connection details come from environment variables so that no credentials
# are stored in the notebook:
#   ES_HOSTS    - comma-separated host URLs (defaults to the sample cluster below)
#   ES_USER     - user name (defaults to 'admin')
#   ES_PASSWORD - password; prompted for interactively when unset
# NOTE(review): a password was previously committed in this cell; treat it as
# compromised and rotate it.
ES_HOSTS = [
    host.strip()
    for host in os.environ.get("ES_HOSTS", "https://159.138.111.237:9200").split(",")
    if host.strip()
]
ES_USER = os.environ.get("ES_USER", "admin")
ES_PASSWORD = os.environ.get("ES_PASSWORD") or getpass("Elasticsearch password: ")
es_client = get_client(hosts=ES_HOSTS, user=ES_USER, password=ES_PASSWORD)

In [23]:
# Name of the index that the remaining cells create, fill, query and delete.
index_name = "my_index"

# Create the index on the cluster.
create(client=es_client, index=index_name)

create index result:  {'acknowledged': True, 'shards_acknowledged': True, 'index': 'my_index'}


In [24]:
# Sample vectors to index, one document per inner list.
data = [
    [1.0, 1.0],
    [2.0, 2.0],
    [3.0, 3.0],
]
write(client=es_client, index=index_name, vecs=data)


write bulk 3 docs success


In [26]:
# Look up the single nearest neighbour of the query vector.
query_vector = [1.0, 1.0]
search(client=es_client, index=index_name, query=query_vector, size=1)

search index result:  {'took': 2, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'my_index', '_type': '_doc', '_id': 'JhMEZZQB_Mdea1bj7QSg', '_score': 1.0, '_source': {'my_vector': [1.0, 1.0]}}]}}


In [20]:
# Clean up: remove the index created earlier in this notebook.
delete(client=es_client, index=index_name)

delete index result:  {'acknowledged': True}


In [None]:
# For a non-security cluster, run the following:
# es_client = get_client(hosts=['http://192.168.2.114:9200'])

# For a security cluster with HTTPS enabled, run the following:
# es_client = get_client(hosts=['https://x.x.x.x:9200', 'https://x.x.x.x:9200'], user='xxxxx', password='xxxxx')

# For a security cluster with HTTPS disabled, run the following:
# es_client = get_client(hosts=['http://x.x.x.x:9200', 'http://x.x.x.x:9200'], user='xxxxx', password='xxxxx')