Perform a Bulk API

In [1]:
# Connected to the elasticsearch 
from pprint import pprint 
from elasticsearch import Elasticsearch 
from config.development import Config

es = Elasticsearch(
    Config.ES_HOST, 
    verify_certs=True, 
    basic_auth=(Config.ES_USERNAME, Config.ES_PASSWORD)
)

client = es.info()
print("Connected to the elasticsearch ✔")
pprint(client.body)

Connected to the elasticsearch ✔
{'cluster_name': 'elasticsearch',
 'cluster_uuid': 'tdYmEtALQyuf5oSJGl74OQ',
 'name': 'david-server',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2025-06-19T01:37:57.847711500Z',
             'build_flavor': 'default',
             'build_hash': '580aff1a0064ce4c93293aaab6fcc55e22c10d1c',
             'build_snapshot': False,
             'build_type': 'deb',
             'lucene_version': '8.11.3',
             'minimum_index_compatibility_version': '6.0.0-beta1',
             'minimum_wire_compatibility_version': '6.8.0',
             'number': '7.17.29'}}


In [2]:
# Make an index 
es.indices.delete(index="my_index", ignore_unavailable=True)
es.indices.create(index="my_index")

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'my_index'})

In [3]:
import json
from tqdm import tqdm

# Make a document from dummy_data.json
document_ids = []
dummy_data = json.load(open("../data/dummy_data.json"))

for document in tqdm(dummy_data, total=len(dummy_data)):
    response = es.index(index="my_index", body=document)
    document_ids.append(response["_id"])

100%|██████████| 5/5 [00:00<00:00, 30.73it/s]


In [4]:
document_ids

['0u4e8ZkBtfwvgmKSPsDe',
 '0-4e8ZkBtfwvgmKSP8Ad',
 '1O4e8ZkBtfwvgmKSP8A2',
 '1e4e8ZkBtfwvgmKSP8BP',
 '1u4e8ZkBtfwvgmKSP8Bo']

In [5]:
# Update the index document 
response = es.update(
    index="my_index", 
    id = document_ids[0], 
    script = {
        "source": "ctx._source.title = params.title", 
        "params": {
            "title": "Testing new data"
        }
    }
)

In [6]:
# Testing to response in here 
response = es.get(index="my_index", id=document_ids[0])
pprint(response.body)

{'_id': '0u4e8ZkBtfwvgmKSPsDe',
 '_index': 'my_index',
 '_primary_term': 1,
 '_seq_no': 5,
 '_source': {'content': 'This is sample content first entry',
             'created_on': '2023-10-01',
             'title': 'Testing new data'},
 '_type': '_doc',
 '_version': 2,
 'found': True}


In [7]:
# Update with the testing in second index 
response = es.update(
    index="my_index", 
    id = document_ids[1], 
    script = {
        "source": "ctx._source.new_field = 'Dummy data fields'"
    }
)
response

ObjectApiResponse({'_index': 'my_index', '_type': '_doc', '_id': '0-4e8ZkBtfwvgmKSP8Ad', '_version': 2, 'result': 'updated', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 6, '_primary_term': 1})

In [8]:
# Testing and response data 
response = es.get(index="my_index", id=document_ids[2])
pprint(response.body)

{'_id': '1O4e8ZkBtfwvgmKSP8A2',
 '_index': 'my_index',
 '_primary_term': 1,
 '_seq_no': 2,
 '_source': {'content': 'This is sample content third entry',
             'created_on': '2023-10-03',
             'title': 'Sample title 3'},
 '_type': '_doc',
 '_version': 1,
 'found': True}


Bulk API -> We executeed each operation one at time, with each action requiring a separate API call. This approach is slow and ineficient. Now, let's see how to accomplisih the same task using the bulk API.

In [9]:
# Test for perform bulk api 
es.indices.delete(index="my_index", ignore_unavailable=True)
es.indices.create(index="my_index")

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'my_index'})

In [10]:
# Perform the elasticsearch bulk API
response = es.bulk(
    operations=[
        # Operation 1
        {
            "index": {
                "_id": 1,
                "_index": "my_index"
            }
        },
        # Source
        {
            "title": "Sample title 1",
            "content": "This is sample content first entry",
            "created_on": "2023-10-01"
        },

        # Operation 2
        {
            "index": {
                "_id": 2,
                "_index": "my_index"
            }
        },
        # source
        {
            "title": "Sample title 2",
            "content": "This is sample content second entry",
            "created_on": "2023-10-02"
        },

        # Operation 3
        {
            "index": {
                "_id": 3,
                "_index": "my_index"
            }
        },
        # Source
        {
            "title": "Sample title 3",
            "content": "This is sample content third entry",
            "created_on": "2023-10-03"
        },

        # Operation 4
        {
            "update": {
                "_id": 1,
                "_index": "my_index"
            }
        },
        # source
        {
            "doc": {
                "title": "Updated the title in the first index"
            }
        },

        # Operation 5
        {
            "update": {
                "_id": 2,
                "_index": "my_index"
            }
        }, 
        # source 
        {
            "doc": {
                "newfield": "Add new field in the document"
            }
        }, 
        
        # Operation 6
        {
            "delete": {
                "_id": 3, 
                "_index": "my_index"
            }
        }, 
        
    ]
)

In [11]:
from pprint import pprint 

pprint(es.info().body)

{'cluster_name': 'elasticsearch',
 'cluster_uuid': 'tdYmEtALQyuf5oSJGl74OQ',
 'name': 'david-server',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2025-06-19T01:37:57.847711500Z',
             'build_flavor': 'default',
             'build_hash': '580aff1a0064ce4c93293aaab6fcc55e22c10d1c',
             'build_snapshot': False,
             'build_type': 'deb',
             'lucene_version': '8.11.3',
             'minimum_index_compatibility_version': '6.0.0-beta1',
             'minimum_wire_compatibility_version': '6.8.0',
             'number': '7.17.29'}}


In [12]:
# Perform to return all the data query 
from pprint import pprint 

# pprint(es.info().body)
doc = {
    "size": 10000, 
    "query": {
        "match_all": {}
    }
}

response = es.search(index="my_index", body=doc)
pprint(response.body)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 0}},
 'timed_out': False,
 'took': 0}
