In [1]:
from pprint import pprint
from elasticsearch import Elasticsearch

# Create the client for the Elasticsearch instance at localhost:9200 and
# fetch the cluster info (cluster name, node name, version details).
es = Elasticsearch(hosts="http://localhost:9200")
client_info = es.info()

print("Connected to Elasticsearch!")
pprint(client_info.body)

Connected to Elasticsearch!
{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'ZLPKoMvyRwO3jn9eeAD8Ug',
 'name': '310ad8ef32ea',
 'tagline': 'You Know, for Search',
 'version': {'build_date': '2024-08-05T10:05:34.233336849Z',
             'build_flavor': 'default',
             'build_hash': '1a77947f34deddb41af25e6f0ddb8e830159c179',
             'build_snapshot': False,
             'build_type': 'docker',
             'lucene_version': '9.11.1',
             'minimum_index_compatibility_version': '7.0.0',
             'minimum_wire_compatibility_version': '7.17.0',
             'number': '8.15.0'}}


# Ingest pipeline

In [5]:
from pprint import pprint

# Register an ingest pipeline made of a single `lowercase` processor that
# targets the `text` field.
response = es.ingest.put_pipeline(
    id="lowercase_pipeline",
    description="This pipeline transforms the text to lowercase",
    processors=[{"lowercase": {"field": "text"}}],
)
pprint(response.body)

{'acknowledged': True}


In [3]:
# Read the pipeline definition back from the cluster to inspect what was stored.
response = es.ingest.get_pipeline(id="lowercase_pipeline")
pprint(response.body)

{'lowercase_pipeline': {'description': 'This pipeline transforms the text to '
                                       'lowercase',
                        'processors': [{'lowercase': {'field': 'text'}}]}}


In [4]:
# Remove the pipeline by id and show the acknowledgement returned by the cluster.
response = es.ingest.delete_pipeline(id="lowercase_pipeline")
pprint(response.body)

{'acknowledged': True}


In [6]:
# Register `lowercase_pipeline` again (it was deleted just above); the
# simulate and bulk calls further down refer to it by id.
response = es.ingest.put_pipeline(
    id="lowercase_pipeline",
    description="This pipeline transforms the text to lowercase",
    processors=[{"lowercase": {"field": "text"}}],
)
pprint(response.body)

{'acknowledged': True}


In [8]:
# Run one sample document through `lowercase_pipeline` with the simulate API
# and print the transformed result.
response = es.ingest.simulate(
    id="lowercase_pipeline",
    docs=[
        {"_index": "my_index", "_id": "1", "_source": {"text": "HELLO WORLD"}},
    ],
)
pprint(response.body)

{'docs': [{'doc': {'_id': '1',
                   '_index': 'my_index',
                   '_ingest': {'timestamp': '2024-11-16T11:58:29.585340553Z'},
                   '_source': {'text': 'hello world'},
                   '_version': '-3'}}]}


In [9]:
import os
import json

# Load the sample documents from ./data/dummy.json. The `with` block closes
# the file handle deterministically, and the encoding is pinned to UTF-8 so
# the result does not depend on the platform's locale default.
dummy_path = os.path.join(os.getcwd(), "data", "dummy.json")
with open(dummy_path, encoding="utf-8") as dummy_file:
    dummy_data = json.load(dummy_file)

# Uppercase every `text` value so the effect of the lowercase pipeline is
# visible once the documents are ingested. Each dict is mutated in place, so
# no write-back into the list is needed.
for document in dummy_data:
    document['text'] = document['text'].upper()

dummy_data

[{'title': 'Sample Title 1',
  'text': 'THIS IS THE FIRST SAMPLE DOCUMENT TEXT.',
  'created_on': '2024-09-22'},
 {'title': 'Sample Title 2',
  'text': 'HERE IS ANOTHER EXAMPLE OF A DOCUMENT.',
  'created_on': '2024-09-23'},
 {'title': 'Sample Title 3',
  'text': 'THE CONTENT OF THE THIRD DOCUMENT GOES HERE.',
  'created_on': '2024-09-24'}]

In [10]:
# Delete `my_index` if it exists (ignore_unavailable tolerates a missing
# index), then create it again; the create response is the cell's output.
es.indices.delete(ignore_unavailable=True, index="my_index")
es.indices.create(index="my_index")

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'my_index'})

In [11]:
# A bulk request body alternates action lines and document sources: each
# document is preceded by an `index` action targeting `my_index`.
operations = []
for document in dummy_data:
    operations.extend(({"index": {"_index": "my_index"}}, document))

# Send everything in one request, routed through the lowercase pipeline.
response = es.bulk(pipeline="lowercase_pipeline", operations=operations)
pprint(response.body)

{'errors': False,
 'ingest_took': 14,
 'items': [{'index': {'_id': 'tsfYNJMBoO_Kuf5qQB_7',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 0,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 't8fYNJMBoO_Kuf5qQB_7',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 1,
                      '_shards': {'failed': 0, 'successful': 1, 'total': 2},
                      '_version': 1,
                      'result': 'created',
                      'status': 201}},
           {'index': {'_id': 'uMfYNJMBoO_Kuf5qQB_7',
                      '_index': 'my_index',
                      '_primary_term': 1,
                      '_seq_no': 2,
                      '_shards': {'failed': 0

In [12]:
# Newly written documents only become searchable after a refresh. Force one
# here so the search below sees the bulk-indexed documents even when the
# notebook is executed top to bottom without pauses (Restart & Run All).
es.indices.refresh(index='my_index')

# Search without a query and print the stored source of each hit.
response = es.search(index='my_index')
hits = response.body['hits']['hits']

for hit in hits:
    print(hit['_source'])

{'title': 'Sample Title 1', 'created_on': '2024-09-22', 'text': 'this is the first sample document text.'}
{'title': 'Sample Title 2', 'created_on': '2024-09-23', 'text': 'here is another example of a document.'}
{'title': 'Sample Title 3', 'created_on': '2024-09-24', 'text': 'the content of the third document goes here.'}


## Handling pipeline failures

In [13]:
# Register a two-step pipeline with no failure handling on either processor:
# first lowercase `text`, then overwrite `text` with a fixed value.
response = es.ingest.put_pipeline(
    id="pipeline_1",
    description="Pipeline with multiple transformations",
    processors=[
        {"lowercase": {"field": "text"}},
        {"set": {"field": "text", "value": "CHANGED BY PIPELINE"}},
    ],
)
pprint(response.body)

{'acknowledged': True}


In [14]:
from elasticsearch import BadRequestError

# This document deliberately has no `text` field, which the `lowercase`
# processor of `pipeline_1` requires.
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

# The request is expected to be rejected with a 400. Catch that specific
# error and display it, so the failure is still shown but the remaining
# cells keep running under Restart & Run All.
try:
    response = es.index(
        index='my_index',
        pipeline='pipeline_1',
        document=document,  # API-specific keyword instead of the legacy `body=`
    )
except BadRequestError as error:
    print(f'{type(error).__name__}: {error}')
else:
    pprint(response.body)

BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'field [text] not present as part of path [text]')

In [15]:
# Same idea as a plain lowercase pipeline, but with failure handling:
#   * if `lowercase` fails, its `on_failure` handler sets `text` to a marker
#     value instead;
#   * the trailing `set` processor adds `new_field` and is flagged with
#     `ignore_failure`.
response = es.ingest.put_pipeline(
    id="pipeline_2",
    description="Pipeline with multiple transformations, handling and ignoring failures",
    processors=[
        {
            "lowercase": {
                "field": "text",
                "on_failure": [
                    {
                        "set": {
                            "field": "text",
                            "value": "FAILED TO LOWERCASE",
                            "ignore_failure": True,
                        },
                    },
                ],
            },
        },
        {
            "set": {
                "field": "new_field",
                "value": "ADDED BY PIPELINE",
                "ignore_failure": True,
            },
        },
    ],
)
pprint(response.body)

{'acknowledged': True}


In [16]:
# Same document as before, still without a `text` field; this time it goes
# through `pipeline_2`, whose `lowercase` processor has an `on_failure`
# handler.
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

# Pass the source via the API-specific `document=` keyword (rather than the
# legacy `body=` alias), matching the keyword style used by the other calls
# in this notebook.
response = es.index(
    index='my_index',
    pipeline='pipeline_2',
    document=document,
)
pprint(response.body)

{'_id': 'ucffNJMBoO_Kuf5qEx_g',
 '_index': 'my_index',
 '_primary_term': 1,
 '_seq_no': 3,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_version': 1,
 'result': 'created'}


In [17]:
# Newly written documents only become searchable after a refresh. Force one
# so the document indexed just above is visible to this search even when the
# cells run back to back (Restart & Run All).
es.indices.refresh(index='my_index')

# Search without a query and print the stored source of each hit.
response = es.search(index='my_index')
hits = response.body['hits']['hits']
for hit in hits:
    print(hit['_source'])

{'title': 'Sample Title 1', 'created_on': '2024-09-22', 'text': 'this is the first sample document text.'}
{'title': 'Sample Title 2', 'created_on': '2024-09-23', 'text': 'here is another example of a document.'}
{'title': 'Sample Title 3', 'created_on': '2024-09-24', 'text': 'the content of the third document goes here.'}
{'text': 'FAILED TO LOWERCASE', 'title': 'Sample Title 4', 'created_on': '2024-09-25', 'new_field': 'ADDED BY PIPELINE'}
