**Connect to Elasticsearch**

In [2]:
from pprint import pprint
from elasticsearch import Elasticsearch 

es = Elasticsearch("http://localhost:9200")

client_info = es.info()

print("Successfully connected to Elasticsearch!")

pprint(client_info.body)

Successfully connected to Elasticsearch!
{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'T1HeaWnRTOqX_BBgREVVbA',
 'name': '64c49e436740',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2025-10-21T10:06:21.288851013Z',
             'build_flavor': 'default',
             'build_hash': '25d88452371273dd27356c98598287b669a03eae',
             'build_snapshot': False,
             'build_type': 'docker',
             'lucene_version': '10.3.1',
             'minimum_index_compatibility_version': '8.0.0',
             'minimum_wire_compatibility_version': '8.19.0',
             'number': '9.2.0'}}


**Create the pipeline**

In [3]:
response = es.ingest.put_pipeline(
    id='lowercase_pipeline',
    description="This pipeline transforms the text to lowercase",
    processors=[
        {
            "lowercase": {"field": "text"}
        },
    ]
)

pprint(response.body)

{'acknowledged': True}


**Get the pipeline**

In [4]:
response = es.ingest.get_pipeline(id="lowercase_pipeline")
pprint(response.body)

{'lowercase_pipeline': {'created_date_millis': 1762187191745,
                        'description': 'This pipeline transforms the text to '
                                       'lowercase',
                        'modified_date_millis': 1762187191745,
                        'processors': [{'lowercase': {'field': 'text'}}]}}


**Delete a pipeline**

In [5]:
response = es.ingest.delete_pipeline(id="lowercase_pipeline")
pprint(response.body)

{'acknowledged': True}


### Simulate a pipeline
The simulate method allows you to give the pipeline fake data just to test if it is working or not. This is usually done before applying the pipeline to your real index and data.

In [6]:
response = es.ingest.put_pipeline(
    id="lowercase_pipeline",
    description="This pipeline will help to transform uppercase to lowercase!",
    processors=[
        {
            "lowercase": {"field": "text"}
        }
    ]
)

pprint(response.body)

{'acknowledged': True}


Inside the docs list, we are providing some test data. After executing the cell, nothing will be indexed. You will just get back how the documents will look like after the transformation.

In [7]:
response = es.ingest.simulate(
    id="lowercase_pipeline", 
    docs=[
        {
            "_index": "my_index", 
            "_id": "1", 
            "_source": {
                "text": "HELLO WORLD"
            }
        }
    ]
)

pprint(response.body)

{'docs': [{'doc': {'_id': '1',
                   '_index': 'my_index',
                   '_ingest': {'timestamp': '2025-11-03T16:35:07.441358777Z'},
                   '_source': {'text': 'hello world'},
                   '_version': '-3'}}]}


### Use the pipeline
Let's read the data and make the text uppercased to see if the lowercase_pipeline will be executed before indexing the documents.

In [None]:
import json 

dummy_data = json.load(open("data/data.json"))

for i, document in enumerate(dummy_data):
    uppercase_text = document["text"].upper()
    document['text'] = uppercase_text
    dummy_data[i] = document

[{'title': 'Sample Title 1',
  'text': 'THIS IS THE FIRST SAMPLE DOCUMENT TEXT.',
  'created_on': '2024-09-22'},
 {'title': 'Sample Title 2',
  'text': 'ELASTICSEARCH MAKES SEARCHING AND ANALYZING LARGE AMOUNTS OF DATA FAST AND EFFICIENT.',
  'created_on': '2024-10-05'},
 {'title': 'Sample Title 3',
  'text': 'DJANGO REST FRAMEWORK SIMPLIFIES API DEVELOPMENT WITH POWERFUL SERIALIZATION TOOLS.',
  'created_on': '2024-11-14'},
 {'title': 'Sample Title 4',
  'text': 'PYTHON PROVIDES A WIDE RANGE OF LIBRARIES FOR DATA ANALYSIS, AUTOMATION, AND BACKEND DEVELOPMENT.',
  'created_on': '2025-01-10'},
 {'title': 'Sample Title 5',
  'text': 'FASTAPI IS AN EXCELLENT CHOICE FOR BUILDING HIGH-PERFORMANCE APIS WITH ASYNC CAPABILITIES. THIS IS A DOCUMENT TOO',
  'created_on': '2025-02-02'}]

In [10]:
es.indices.delete(index="my_index", ignore_unavailable=True)
es.indices.create(index="my_index")

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'my_index'})

Now, we pass the lowercase_pipeline to the bulk method. It will perform the transformations before indexing the documents.

In [11]:
operations = []

for document in dummy_data:
    operations.append({"index": {"_index": "my_index"}}) # action
    operations.append(document) # source

response = es.bulk(operations=operations, pipeline="lowercase_pipeline")
pprint(response.body)

{'errors': False,
 'ingest_took': 401,
 'items': [{'index': {'_id': 'NXOZSpoBX8WfhiD1cwnx',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 0,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 'NnOZSpoBX8WfhiD1cwnx',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 1,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 'N3OZSpoBX8WfhiD1cwnx',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 2,
                      '_shards': {'failed': 

In [13]:
response = es.search(
    index="my_index",
    body={
        "query": {"match_all": {}}
    }
)

hits = response["hits"]["hits"]

for hit in hits:
    print(hit["_source"])

{'created_on': '2024-09-22', 'text': 'this is the first sample document text.', 'title': 'Sample Title 1'}
{'created_on': '2024-10-05', 'text': 'elasticsearch makes searching and analyzing large amounts of data fast and efficient.', 'title': 'Sample Title 2'}
{'created_on': '2024-11-14', 'text': 'django rest framework simplifies api development with powerful serialization tools.', 'title': 'Sample Title 3'}
{'created_on': '2025-01-10', 'text': 'python provides a wide range of libraries for data analysis, automation, and backend development.', 'title': 'Sample Title 4'}
{'created_on': '2025-02-02', 'text': 'fastapi is an excellent choice for building high-performance apis with async capabilities. this is a document too', 'title': 'Sample Title 5'}


### Pipeline failure
**1. Not handling the failure**
#
###### In this scenario, we donâ€™t handle failures with ignore_failure or on_failure. Instead, the pipeline will raise an exception, halting execution of any further processes, and the document will not be indexed.

In [15]:
response = es.ingest.put_pipeline(
    id="pipeline_1",
    description="This pipeline will handle multiple transformation!",
    processors=[
        {
            "lowercase": {
                "field": "text"
            }
        },
        {
            "set": {
                "field": "text",
                "value": "CHANGE BY PIPELINE",
            }
        }
    ]   
)

pprint(response.body)

{'acknowledged': True}


In [16]:
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

response = es.index(
    index="my_index", 
    pipeline="pipeline_1",
    body=document
)

pprint(response.body)

BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'field [text] not present as part of path [text]')

### 2. Handling the failure
To handle the failures, we use ignore_failure or define an on_failure block. With ignore_failure, the pipeline will skip over the failed step and continue executing subsequent processes without interrupting the flow, allowing other documents to be indexed.

Alternatively, with on_failure, we can specify custom error-handling steps, such as logging the error, retrying, or sending notifications, ensuring the pipeline proceeds even if one step encounters an issue.

In [24]:
respose = es.ingest.put_pipeline(
    id="pipeline_2", 
    description="Pipeline with multiple transformations, handling and ignoring failures",
    processors=[
        {
            "lowercase": {
                "field": "text", 
                "on_failure": [
                    {
                        "set": {
                            "field": "text", 
                            "value": "FAILED TO LOWERCASE",
                            "ignore_failure": True,
                        }
                    }
                ]
            }
        },
        {
            "set": {
                "field": "new_field", 
                "value": "ADDED BY Shamim's PIPELINE",
                "ignore_failure": True
            }
        }
    ]
)

pprint(response.body)

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [{'_id': 'NXOZSpoBX8WfhiD1cwnx',
                    '_index': 'my_index',
                    '_score': 1.0,
                    '_source': {'created_on': '2024-09-22',
                                'text': 'this is the first sample document '
                                        'text.',
                                'title': 'Sample Title 1'}},
                   {'_id': 'NnOZSpoBX8WfhiD1cwnx',
                    '_index': 'my_index',
                    '_score': 1.0,
                    '_source': {'created_on': '2024-10-05',
                                'text': 'elasticsearch makes searching and '
                                        'analyzing large amounts of data fast '
                                        'and efficient.',
                                'title': 'Sample Title 2'}},
                   {'_id': 'N3OZSpoBX8WfhiD1cwnx',
                    '_index': 'my_index'

In [25]:
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

response = es.index(
    index='my_index',
    pipeline='pipeline_2',
    body=document
)
pprint(response.body)

{'_id': 'O3MsS5oBX8WfhiD1fgkQ',
 '_index': 'my_index',
 '_primary_term': 1,
 '_seq_no': 6,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_version': 1,
 'result': 'created'}


In [26]:
response = es.search(
    index="my_index", 
    body={
        "query": {
            "match_all": {}
        }
    }
)
hits = response["hits"]["hits"]

for hit in hits:
    print(hit["_source"])

{'created_on': '2024-09-22', 'text': 'this is the first sample document text.', 'title': 'Sample Title 1'}
{'created_on': '2024-10-05', 'text': 'elasticsearch makes searching and analyzing large amounts of data fast and efficient.', 'title': 'Sample Title 2'}
{'created_on': '2024-11-14', 'text': 'django rest framework simplifies api development with powerful serialization tools.', 'title': 'Sample Title 3'}
{'created_on': '2025-01-10', 'text': 'python provides a wide range of libraries for data analysis, automation, and backend development.', 'title': 'Sample Title 4'}
{'created_on': '2025-02-02', 'text': 'fastapi is an excellent choice for building high-performance apis with async capabilities. this is a document too', 'title': 'Sample Title 5'}
{'created_on': '2024-09-25', 'text': 'ADDED BY PIPELINE', 'title': 'Sample Title 4'}
{'created_on': '2024-09-25', 'new_field': "ADDED BY Shamim's PIPELINE", 'text': 'FAILED TO LOWERCASE', 'title': 'Sample Title 4'}
