### Connect to Elasticsearch

In [None]:
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

print(es.ping())

True


### Create an Index

#### What is an Index in Elasticsearch?
An index in Elasticsearch is similar to a database in a relational database system. It is a collection of documents that share similar characteristics. Each document is stored as a JSON object and has a unique identifier.

In [2]:
index_name = "test_index"

if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name)
    print(f"Index '{index_name}' created.")
else:
    print(f"Index '{index_name}' already exists.")

Index 'test_index' created.


In [3]:
doc = {
    "name": "John Doe",
    "age": 30,
    "occupation": "Software Engineer",
    "location": "San Francisco"
}

res = es.index(index=index_name, id=1, document=doc)
print(res)

{'_index': 'test_index', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}


In [4]:
query = {
    "query": {
        "match": {
            "occupation": "Software Engineer"
        }
    }
}

res = es.search(index=index_name, body=query)
print(res)


{'took': 16, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 0.5753642, 'hits': [{'_index': 'test_index', '_id': '1', '_score': 0.5753642, '_source': {'name': 'John Doe', 'age': 30, 'occupation': 'Software Engineer', 'location': 'San Francisco'}}]}}


In [5]:
update_query = {
    "doc": {
        "age": 31
    }
}

es.update(index=index_name, id=1, body=update_query)

ObjectApiResponse({'_index': 'test_index', '_id': '1', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1})

In [6]:
es.delete(index=index_name, id=1)


ObjectApiResponse({'_index': 'test_index', '_id': '1', '_version': 3, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 2, '_primary_term': 1})

In [7]:
aggregation_query = {
    "size": 0,
    "aggs": {
        "average_age": {
            "avg": {
                "field": "age"
            }
        }
    }
}

res = es.search(index=index_name, body=aggregation_query)
print(res)

{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}, 'aggregations': {'average_age': {'value': None}}}


In [8]:
from elasticsearch.helpers import bulk

# Bulk inserting multiple documents
actions = [
    {"_index": index_name, "_id": i, "_source": {"name": f"User {i}", "age": 25 + i, "occupation": "Developer"}}
    for i in range(2, 10)
]

bulk(es, actions)


(8, [])

### What is a Shard in Elasticsearch?
A shard is a subdivision of an index. Each index is split into multiple shards for better performance and fault tolerance.

Primary Shards: Store the actual data.
Replica Shards: Backup copies of primary shards for redundancy.
Each shard is an independent Lucene index that can be stored on a different node in a cluster.

Example: How Shards Improve Performance
Imagine we have 1 million documents in an index. Instead of storing all of them in a single server, we can distribute them across multiple shards, which can then be placed on different servers (nodes).

Creating an Index with Shards and Replicas

In [20]:
index_settings = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    }
}

index_name = "crime_data"

if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)  # Delete existing index to avoid conflicts
    print(f"Deleted existing index: {index_name}")

es.indices.create(index=index_name, body=index_settings)
print(f"Created index: {index_name}")

Deleted existing index: crime_data
Created index: crime_data


In [24]:
import pandas as pd
from elasticsearch.helpers import bulk

df = pd.read_csv("Crime_Data.csv")
df.fillna("", inplace=True)

actions = [
    {
        "_index": index_name,
        "_id": row["DR_NO"],
        "_source": row.to_dict()
    }
    for _, row in df.iterrows()
]

success, failed = bulk(es, actions)


  df.fillna("", inplace=True)


In [33]:
import time

query = {
    "query": {
        "match": {
            "Crime_Type": "Vehicle Stolen"
        }
    }
}

start_time = time.time()
res_sharded = es.search(index="crime_data", body=query)
sharded_time = time.time() - start_time
print(f"Sharded Index Query Time: {sharded_time:.4f} seconds")


Sharded Index Query Time: 0.0041 seconds


In [29]:
print(res_sharded)

{'took': 5, 'timed_out': False, '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}
