# Loading Data into the Elasticsearch 
We are going to use python to read the data and load that into *elasticsearch*. We need the the *elasticsearch* python package installed to do that. 

In [1]:
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
from pprint import pprint


## Connecting to Elasticsearch 
es = Elasticsearch( "https://elk-single-node:9200", 
    ca_certs="/home/vagrant/data/elasticsearch/certs/ca/ca.crt", 
    basic_auth=('elastic','Ucsc@1234')
)

model = SentenceTransformer('all-MiniLM-L6-v2')

print( es.info() )

  from .autonotebook import tqdm as notebook_tqdm


{'name': 'elk-single-node', 'cluster_name': 'elk-single-node', 'cluster_uuid': 'Gkdt5h9RSiC2fvIFhQUS7A', 'version': {'number': '8.11.1', 'build_flavor': 'default', 'build_type': 'deb', 'build_hash': '6f9ff581fbcde658e6f69d6ce03050f060d1fd0c', 'build_date': '2023-11-11T10:05:59.421038163Z', 'build_snapshot': False, 'lucene_version': '9.8.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


Let us now define the functions which we need to handle this operation. 

In [13]:
def create_index():
    es.indices.delete(index='my_documents-v3', ignore_unavailable=True)
    es.indices.create(
        index='my_documents-v3',
        mappings={
            'properties': {
            'embedding': {
                    'type': 'dense_vector',
                },
            'elser_embedding': {
                'type': 'sparse_vector',
               },
            }
            },
        settings={
            'index': {
                'default_pipeline': 'elser-ingest-pipeline'
            }
        }
    )


def insert_document(document):
    return es.index(index='my_documents-v3', document={
            **document,
            'embedding': model.encode(document['summary']),
        })

def insert_documents(documents):
    operations = []
    for document in documents:
            operations.append({'index': {'_index': 'my_documents-v3'}})
            operations.append({
                **document,
                'embedding': model.encode(document['summary']),
            })
    return es.bulk(operations=operations)

def reindex():
    import json

    create_index()
    with open('data.json', 'rt') as f:
        documents = json.loads(f.read())
    return insert_documents(documents)

def search(**query_args):
    return es.search(index='my_documents-v3', **query_args)

def retrieve_document(id):
    return es.get(index='my_documents-v3', id=id)

def deploy_elser():
    import time

    # download ELSER v2
    es.ml.put_trained_model(model_id='.elser_model_2',
                                input={'field_names': ['text_field']})
        
    # wait until ready
    while True:
        status = es.ml.get_trained_models(model_id='.elser_model_2',
                                            include='definition_status')
        if status['trained_model_configs'][0]['fully_defined']:
            # model is ready
            break
        time.sleep(1)

        # deploy the model
    es.ml.start_trained_model_deployment(model_id='.elser_model_2')

    # define a pipeline
    es.ingest.put_pipeline(
        id='elser-ingest-pipeline',
        processors=[
            {
                'inference': {
                    'model_id': '.elser_model_2',
                    'input_output': [
                        {
                            'input_field': 'summary',
                            'output_field': 'elser_embedding',
                        }
                    ]
                }
                }
        ]
    )

def deploy_elser_model():
    """Deploy the ELSER v2 model to Elasticsearch."""
    try:
        deploy_elser()
    except Exception as exc:
        print(f'Error: {exc}')
    else:
        print(f'ELSER model deployed.')

def start_elser_v2_model():
    import time 
    # wait until ready
    while True:
        status = es.ml.get_trained_models(model_id='.elser_model_2',
                                            include='definition_status')
        if status['trained_model_configs'][0]['fully_defined']:
            # model is ready
            break
        time.sleep(1)

        # deploy the model
    es.ml.start_trained_model_deployment(model_id='.elser_model_2')

    status = es.ml.get_trained_models(model_id='.elser_model_2')

    print(status)

    stats = es.ml.get_trained_models_stats(model_id='.elser_model_2')

    print(stats)

def get_ml_model_status():
    status = es.ml.get_trained_models(model_id='.elser_model_2')

    print(status)

    stats = es.ml.get_trained_models_stats(model_id='.elser_model_2') 

def create_ingest_pipleline_for_elser_model_2():
    # define a pipeline
    es.ingest.put_pipeline(
        id='elser-ingest-pipeline',
        processors=[
            {
                'inference': {
                    'model_id': '.elser_model_2',
                    'input_output': [
                        {
                            'input_field': 'summary',
                            'output_field': 'elser_embedding',
                        }
                    ]
                }
                }
        ]
    )


Let us now use the reindex function to create an index and load the data

In [5]:
deploy_elser_model()
#reindex()

Error: ApiError(429, 'status_exception', 'Could not start deployment because no ML nodes with sufficient capacity were found')


In [None]:
# Let us try to start the ML Model
start_elser_v2_model()


In [7]:
# Get Model Status
get_ml_model_status()

{'count': 1, 'trained_model_configs': [{'model_id': '.elser_model_2', 'model_type': 'pytorch', 'model_package': {'packaged_model_id': 'elser_model_2', 'model_repository': 'https://ml-models.elastic.co', 'minimum_version': '11.0.0', 'size': 438123914, 'sha256': '2e0450a1c598221a919917cbb05d8672aed6c613c028008fedcd696462c81af0', 'metadata': {}, 'tags': [], 'vocabulary_file': 'elser_model_2.vocab.json'}, 'created_by': 'api_user', 'version': '11.0.0', 'create_time': 1702352714515, 'model_size_bytes': 0, 'estimated_operations': 0, 'license_level': 'platinum', 'description': 'Elastic Learned Sparse EncodeR v2', 'tags': ['elastic'], 'metadata': {}, 'input': {'field_names': ['text_field']}, 'inference_config': {'text_expansion': {'vocabulary': {'index': '.ml-inference-native-000002'}, 'tokenization': {'bert': {'do_lower_case': True, 'with_special_tokens': True, 'max_sequence_length': 512, 'truncate': 'first', 'span': -1}}}}, 'location': {'index': {'name': '.ml-inference-native-000002'}}}]}


In [9]:
# Create a new Ingest Pipeline
create_ingest_pipleline_for_elser_model_2()

In [10]:
# Reindex the Data
reindex()

ObjectApiResponse({'errors': False, 'took': 345, 'ingest_took': 6848, 'items': [{'index': {'_index': 'my_documents-v3', '_id': 'xcqIXIwBHclpjcJpL4tE', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1, 'status': 201}}, {'index': {'_index': 'my_documents-v3', '_id': 'xsqIXIwBHclpjcJpL4tE', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1, 'status': 201}}, {'index': {'_index': 'my_documents-v3', '_id': 'x8qIXIwBHclpjcJpL4tE', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1, 'status': 201}}, {'index': {'_index': 'my_documents-v3', '_id': 'yMqIXIwBHclpjcJpL4tE', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 3, '_primary_term': 1, 'status': 201}}, {'index': {'_index': 'my_documents-v3', '_id': 'ycqIXIwBHclpjcJpL4tE

Let us now try to search for items in the elasicsearch database. 

In [11]:
def extract_filters(query):
    import re 

    filter_regex = r'category:([^\s]+)\s*'
    m = re.search(filter_regex, query)
    if m is None:
        return {}, query  # no filters
    filters = {
        'filter': [{
            'term': {
                'category.keyword': {
                    'value': m.group(1)
                }
            }
        }]
    }
    query = re.sub(filter_regex, '', query).strip()
    
    return filters, query

def handle_search(query):

    filters, parsed_query = extract_filters(query)
    
    results = es.search(
        query={
            'bool': {
                'must': [
                    {
                        'text_expansion': {
                            'elser_embedding': {
                                'model_id': '.elser_model_2',
                                'model_text': parsed_query,
                            }
                        },
                    }
                ],
                **filters,
            }
        },
        size=5,
    )

    #results=results['hits']['hits']
    #total=results['hits']['total']['value']
    #query=query

    print('Searching for Below Query: ')
    print(query)
    print(f'We got hits in the database!')
    pprint(results)
    print('Results are given below...')
    #pprint(total)



In [14]:
query = 'Working from home'

handle_search(query=query)


Searching for Below Query: 
Working from home
We got hits in the database!
ObjectApiResponse({'took': 48, 'timed_out': False, '_shards': {'total': 30, 'successful': 30, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 15, 'relation': 'eq'}, 'max_score': 14.170206, 'hits': [{'_index': 'my_documents-v3', '_id': 'x8qIXIwBHclpjcJpL4tE', '_score': 14.170206, '_ignored': ['content.keyword'], '_source': {'summary': 'Starting May 1, 2023, our hybrid work policy will require employees to work from the office three days a week and two days remotely.', 'updated_at': '2023-05-01', 'created_on': '2023-05-01', 'rolePermissions': ['demo', 'manager'], 'name': 'Wfh Policy Update May 2023', 'elser_embedding': {'weekends': 0.749587, 'purdue': 0.0072095944, 'pt': 0.105417535, 'year': 0.24258894, 'shift': 0.31631547, 'texas': 0.21996601, 'remote': 1.4332536, 'laptop': 0.34197634, 'salary': 0.04841312, 'jan': 0.27189222, 'alaska': 0.115874924, 'freelance': 0.07720333, 'tel': 0.11584308, 'state': 0.25