# Bulk APIs

In [2]:
from pprint import pprint
from elasticsearch import Elasticsearch

In [3]:
# Connect to the Elasticsearch node on localhost and fetch its cluster info.
es = Elasticsearch(hosts="http://localhost:9200")
client_info = es.info()

In [5]:
# Confirmation banner, then the details returned by es.info().
print("Connected to ElasticSearch !")
cluster_details = client_info.body
pprint(cluster_details)

Connected to ElasticSearch !
{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'XoT9Xm1xR3O6L3zfFdu5nQ',
 'name': 'd8153502b8b9',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2023-02-13T09:35:20.314882762Z',
             'build_flavor': 'default',
             'build_hash': '2d58d0f136141f03239816a4e360a8d17b6d8f29',
             'build_snapshot': False,
             'build_type': 'docker',
             'lucene_version': '9.4.2',
             'minimum_index_compatibility_version': '7.0.0',
             'minimum_wire_compatibility_version': '7.17.0',
             'number': '8.6.2'}}


In [8]:
# Start the bulk API demo from a clean index: remove any earlier copy of
# bulk_api_index (ignore_unavailable=True so a missing index is not an error),
# then create it again.
es.indices.delete(
    index="bulk_api_index",
    ignore_unavailable=True,
)

# Last expression of the cell, so the create response is displayed.
es.indices.create(
    index="bulk_api_index",
)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'bulk_api_index'})

In [16]:
# Three sample documents used as the source payloads of the bulk request.
docs = [
    {"field1": "value11", "field2": "value12", "field3": 3},
    {"field1": "value21", "field2": "value22", "field3": 3},
    {"field1": "value31", "field2": "value32", "field3": 3},
]

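- A document can be changed in two ways: with the `index` action, whose payload is the document itself (`source format`; the stored document is replaced as a whole), or with the `update` action, whose payload wraps the changed fields as `{"doc": {...}}` (`doc format`; only the given fields are merged into the stored document).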

In [17]:
# Bulk API: index, delete and re-index documents of bulk_api_index in a single
# request. `operations` is a flat list in which every action line is followed
# by its source line, except `delete`, which takes no source.
bulk_res = es.bulk(
    # Return only once the changes are visible to search, so search-based
    # calls issued right after this cell (e.g. es.count) see the new documents.
    refresh="wait_for",
    operations=[
        # action 1: index a document with _id "1"
        {
            "index": {
                "_index": "bulk_api_index",
                "_id": "1",
            }
        },
        # data for action 1 in source format
        docs[0],
        # action 2: index a document with _id "2"
        {
            "index": {
                "_index": "bulk_api_index",
                "_id": "2",
            }
        },
        # data for action 2 in source format
        docs[1],
        # action 3: index a document without specifying _id
        # (Elasticsearch generates one; it is reported in the response item)
        {
            "index": {
                "_index": "bulk_api_index",
            }
        },
        # data for action 3 in source format
        docs[2],
        # action 4: delete the document with _id "1" (no source line follows)
        {
            "delete": {
                "_index": "bulk_api_index",
                "_id": "1",
            }
        },
        # action 5: index _id "2" again.
        # NOTE: `index` on an existing _id replaces the stored document with
        # the source below, so field1/field2 written by action 2 are not kept.
        # Use the `update` action with a {"doc": {...}} payload to merge fields
        # into the existing document instead.
        {
            "index": {
                "_index": "bulk_api_index",
                "_id": "2",
            }
        },
        # data for action 5 in source format
        {
            "field3": 134,
            "field4": "hey adding an extra field in the doc",
        },
    ],
)


In [18]:
# Top-level `errors` flag of the bulk response.
bulk_res.body["errors"]

False

In [19]:
# Full bulk response: one entry in `items` per action, in request order.
bulk_res.body

{'took': 95,
 'errors': False,
 'items': [{'index': {'_index': 'bulk_api_index',
    '_id': '1',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 2,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'bulk_api_index',
    '_id': '2',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 3,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'bulk_api_index',
    '_id': '_ajeNpUBUdIER6xMfbEd',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 4,
    '_primary_term': 1,
    'status': 201}},
  {'delete': {'_index': 'bulk_api_index',
    '_id': '1',
    '_version': 2,
    'result': 'deleted',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 5,
    '_primary_term': 1,
    'status': 200}},
  {'index': {'_index': 'bulk_api_index',
    '_i

In [23]:
# Read back the document stored under _id "2" after the bulk request.
doc_2_res = es.get(index="bulk_api_index", id="2")
doc_2_res.body

{'_index': 'bulk_api_index',
 '_id': '2',
 '_version': 2,
 '_seq_no': 6,
 '_primary_term': 1,
 'found': True,
 '_source': {'doc': {'field3': 134,
   'field4': 'hey adding an extra field in the doc'}}}

In [24]:
# The third bulk action (position 2 in `items`) indexed a document without an
# explicit _id, so Elasticsearch generated one. Take that id from the bulk
# response rather than hard-coding it: a generated id is different on every
# run, and a stale literal makes this cell fail on a fresh execution.
auto_generated_id = bulk_res.body["items"][2]["index"]["_id"]
es.get(index="bulk_api_index", id=auto_generated_id).body

{'_index': 'bulk_api_index',
 '_id': '_ajeNpUBUdIER6xMfbEd',
 '_version': 1,
 '_seq_no': 4,
 '_primary_term': 1,
 'found': True,
 '_source': {'doc': {'field1': 'value31', 'field2': 'value32', 'field3': 3}}}

In [26]:
# Number of documents currently searchable in bulk_api_index.
count_res = es.count(index="bulk_api_index")
count_res.body["count"]

2

In [None]:
# Bulk `update` action in doc format, used as an upsert on _id "4".
update_as_upsert = es.bulk(
    operations=[
        # action line: update the document with _id "4"
        {
            "update": {
                "_index": "bulk_api_index",
                "_id": "4",
            }
        },
        # payload line for the update action
        {
            # Boolean flag (a real bool, not the string "true"): when _id "4"
            # does not exist yet, `doc` is inserted as the new document.
            "doc_as_upsert": True,
            "doc": {
                "field1": "field 1 value",
                "field2": "field 2 value",
            },
        },
    ]
)

In [31]:
# Top-level `errors` flag of the upsert bulk response.
update_as_upsert.body["errors"]

False

In [32]:
# Full response of the upsert bulk request (a single `update` item).
update_as_upsert.body

{'took': 5,
 'errors': False,
 'items': [{'update': {'_index': 'bulk_api_index',
    '_id': '4',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 7,
    '_primary_term': 1,
    'status': 201}}]}

In [30]:
# Read back the document stored under _id "4" by the upsert.
doc_4_res = es.get(index="bulk_api_index", id="4")
doc_4_res.body

{'_index': 'bulk_api_index',
 '_id': '4',
 '_version': 1,
 '_seq_no': 7,
 '_primary_term': 1,
 'found': True,
 '_source': {'field1': 'field 1 value', 'field2': 'field 2 value'}}