In [40]:
from elasticsearch import Elasticsearch, helpers
import json

**Docs**
- VIDEO: https://www.youtube.com/watch?v=C-JKcMM6IXE
- Pagination: https://soumilshah1995.blogspot.com/2020/06/elk-pre-margin-0px-border-none-padding.html
- Query Pagination: https://soumilshah1995.blogspot.com/2020/02/elastic-search-query-pagination-in.html
- KNN on ES: https://soumilshah1995.blogspot.com/2020/05/knn-machine-learning-algorithm-on.html
- Basics: https://soumilshah1995.blogspot.com/2020/01/getting-started-with-elastic-search-and.html

#### Setup and Connection

In [5]:
config = {
    "host": "localhhost",
    "port": 9200
}


ELASTIC_PASSWORD = "ivNbJFD-QHXZZZ3KhFPR"
CERT_FINGERPRINT  = "aaef55be51c34aec52942c598356c1bc53b2fd17356efd029368151977b567bb"
CA_CERT_PATH = "D:\Program Files\elasticsearch-8.7.0-windows-x86_64\elasticsearch-8.7.0\config\certs\http_ca.crt"


In [6]:
# Create the client instance
es = Elasticsearch(
    "https://localhost:9200",
    ca_certs=CA_CERT_PATH,
    basic_auth=("elastic", ELASTIC_PASSWORD)
)

# Successful response!
print(es.info())
print(es.ping())


{'name': 'HP-ASHISH', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'bRWf0wh_SzWiIZUytRleiA', 'version': {'number': '8.7.0', 'build_flavor': 'default', 'build_type': 'zip', 'build_hash': '09520b59b6bc1057340b55750186466ea715e30e', 'build_date': '2023-03-27T16:31:09.816451435Z', 'build_snapshot': False, 'lucene_version': '9.5.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}
True


#### Operations

In [7]:
# CREATE INDEX and DELETE
# es.indices.create(index="hello")
# es.indices.delete(index="hello")

In [8]:
# display all existing indexes
existing_indexes = es.indices.get_alias(index="*", pretty=True)
print(f"\nExisting indexes: ")
for index in existing_indexes:
    print(f"-->{index}")


Existing indexes: 
-->.security-7
-->.apm-custom-link
-->.apm-agent-configuration
-->.apm-source-map
-->.kibana-event-log-8.7.0-000001
-->.security-profile-8
-->.kibana_security_session_1
-->hello
-->.kibana_task_manager_8.7.0_001
-->.kibana_8.7.0_001


  existing_indexes = es.indices.get_alias(index="*", pretty=True)


In [9]:
# check if index is available -> throws exception if index not found
check_index = es.search(index="hello")
print(check_index)

{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}


#### Upload sample jsons

To upload multiple docs
1. use generator convert into ELK format
2. helpers.bulk(es, generator_name)


Mappings: GET test_persons/_mapping
SETTINGS: GET test_persons/_settings

1. check in kibana: `GET index_name/_search`

In [13]:
e1={
    "first_name":"Soumil",
    "last_name":"Shah",
    "age": 24,
    "about": "Full stack Software Developers ",
    "interests": ['Youtube','music'],
}

e2={
    "first_name":"nitin",
    "last_name":"Shah",
    "age": 58,
    "about": "Soumil father ",
    "interests": ['Stock','Relax'],
}

In [19]:
es.indices.create(index="test_persons")

In [20]:
docs = [e1, e2]
for id, doc in enumerate(docs):
    # you can pass id param or let ES autogenerate 
    response = es.index(index='test_persons', document=doc)

##### Upload json files using bulk api

In [48]:
# read json
data = ""
with open("./employees.json") as f:
    data = json.load(f)
print(data[0])

{'_id': '643ba51a8e8ded03a708e06e', 'index': 0, 'guid': '595a317e-fb56-4911-9fb7-ea14878f1a99', 'isActive': False, 'balance': '$3,700.46', 'picture': 'http://placehold.it/32x32', 'age': 39, 'eyeColor': 'green', 'name': 'Stafford Chen', 'gender': 'male', 'company': 'NIQUENT', 'email': 'staffordchen@niquent.com', 'phone': '+1 (858) 520-2687', 'address': '140 Grove Street, Hiko, Ohio, 3183', 'about': 'Reprehenderit amet ipsum elit culpa. Aliqua exercitation elit ullamco commodo veniam tempor exercitation elit incididunt voluptate laboris magna aliquip. Quis voluptate sint officia aute sint ipsum non culpa officia ea labore ad eu. Consequat voluptate fugiat ad culpa enim Lorem cillum pariatur deserunt pariatur do tempor.\r\n', 'registered': '2014-06-22T11:32:04 -06:-30', 'latitude': 53.245887, 'longitude': 91.978141, 'tags': ['mollit', 'fugiat', 'dolor', 'tempor', 'id', 'veniam', 'voluptate'], 'friends': [{'id': 0, 'name': 'Gibson Lindsey'}, {'id': 1, 'name': 'Bonnie Stokes'}, {'id': 2, 'n

In [52]:
def elk_generator(json_docs):
    for doc in data:
        yield {
            "_index":"empl_test" , # which index to insert doc into
            "_id": doc.get("_id"),
            "_source": {
                "uuid": doc.get("_id",""),
                "name": doc.get("name",""),
                "address": doc.get("address",""),
                "longitude": doc.get("longitude",0),
                "latitude": doc.get("latitude",0)
                
            }
        }
    

In [55]:
# dtypes for ES: https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html

upload_index_mappings = {
      "properties":{
        "name":{ "type": "text"},
        "address":{"type": "text"},
        "uuid":{"type": "text"},
        "latitude":{"type": "float"},
        "longitude":{ "type": "float"}
      }
}

# create index with mappings
es.indices.create(index="empl_test", mappings=upload_index_mappings)

In [54]:
# bulk import using bulk api
elk_generator_obj = elk_generator(json_docs=data)

try:
    upload_response = helpers.bulk(client=es, actions=elk_generator_obj)
    print(f"SUCCESFULLY INSERTED: \n {upload_response}")
except Exception as e:
    print(e)

SUCCESFULLY INSERTED: 
 (6, [])
