In [None]:
!pip install elasticsearch

In [None]:
# Connect to the Elasticsearch server.
# The URL can be overridden with the ES_URL environment variable and defaults to a
# local node. If the cluster needs credentials, read them from the environment too
# instead of hard-coding them in the notebook.
import os
from pprint import pprint

from elasticsearch import Elasticsearch

ES_URL = os.environ.get('ES_URL', 'http://localhost:9200')

es = Elasticsearch(ES_URL)
client_info = es.info()  # fails here if the server cannot be reached
print('Connected to Elasticsearch!')
pprint(client_info.body)

In [None]:
# Check whether the index exists.
# Elasticsearch index names must be lowercase, so a mixed-case name such as
# 'AfricaGenderData' can never refer to a real index.
response = es.indices.exists(index='africagenderdata')
response.body  # True when the index exists, False otherwise

In [None]:
# Using the search API: count the documents matched by match_all in a single
# index, in every index matching a wildcard pattern, and in all indices.
# Elasticsearch index names must be lowercase ('AfricaGenderData' is not valid).
# NOTE: hits.total.value is a lower bound once it reaches 10,000 unless the body
# also sets "track_total_hits": True.


def count_all_documents(index):
    """Run a match_all search against `index` (a name, pattern or '_all').

    Returns the hits.total.value reported by Elasticsearch.
    """
    result = es.search(
        index=index,
        body={
            "query": {"match_all": {}}
        }
    )
    return result['hits']['total']['value']


n_hits = count_all_documents('africagenderdata')
print(f"Found {n_hits} documents in africagenderdata")

# Search in all indices whose name starts with "index"
# (alternative: index='index_1,index_2')
n_hits = count_all_documents('index*')
print(f"Found {n_hits} documents in all indexes with name starting with 'index'")

# Search in all available indices
n_hits = count_all_documents('_all')
print(f"Found {n_hits} documents in all indexes")

In [None]:
# Look for documents created on a specific date (exact term query).
# NOTE(review): assumes "created_on" is mapped as a date/keyword field; a term query
# is not analyzed, so it will not behave as expected on an analyzed text field.
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "term": {
                "created_on": "2024-09-22"
            }
        }
    }
)

n_hits = response['hits']['total']['value']
print(f"Found {n_hits} documents in the index")

In [None]:
# Show the hit list returned by the most recent search.
# Each hit's stored document is under its '_source' key.
retrieved_documents = (
    response['hits']
    ['hits']
)
retrieved_documents


In [None]:
# Retrieve documents whose "text" field contains the term "document"
# (full-text match query, so the search term is analyzed before matching).
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "match": {
                "text": "document"
            }
        }
    }
)

n_hits = response['hits']['total']['value']
print(f"Found {n_hits} documents in the index")

In [None]:
# Retrieve documents created on or before a given date ("lte" is inclusive).
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "range": {
                "created_on": {
                    "lte": "2024-09-23"  # less than or equal to
                }
            }
        }
    }
)

n_hits = response['hits']['total']['value']
print(f"Found {n_hits} documents in africagenderdata")

In [None]:
# Combine multiple criteria: "text" must match "education" AND "created_on"
# must fall inside the date range (here a single day, since gte == lte).
# The range clause sits in "filter": it only includes/excludes documents and
# does not need to influence the relevance score, unlike the match clause.
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "bool": {
                "must": [
                    {
                        "match": {
                            "text": "education"
                        }
                    }
                ],
                "filter": [
                    {
                        "range": {
                            "created_on": {
                                "gte": "2024-09-24",  # greater than or equal to
                                "lte": "2024-09-24"   # less than or equal to
                            }
                        }
                    }
                ]
            }
        }
    }
)

n_hits = response['hits']['total']['value']
print(f"Found {n_hits} documents in africagenderdata")

In [None]:
# Retrieve 10 documents starting with the 11th result: "from" is a zero-based
# offset, so from=10 skips the first 10 hits.
# Elasticsearch caps from + size at index.max_result_window (10,000 by default);
# use search_after (below) for deep pagination.
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "match_all": {}
        },
        "size": 10,
        "from": 10
    },
)

for hit in response['hits']['hits']:
    print(hit['_source'])

In [None]:
# Add a timeout to the search. Check response['timed_out'] to see whether the
# 10s limit was hit.
# NOTE(review): this query matches on a "message" field while the other cells use
# "text"; confirm which field actually exists in the index mapping.
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "match": {
                "message": "search keyword"
            }
        },
        "timeout": "10s"
    },
)

response.body

In [None]:
# Aggregation: average of the "age" field over all documents.
# "size": 0 skips fetching hits, since only the aggregation result is used.
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "match_all": {}
        },
        "size": 0,
        "aggs": {
            "avg_age": {
                "avg": {
                    "field": "age"
                }
            }
        }
    }
)

# The value comes back as None when no matching document has an "age" value.
average_age = response['aggregations']['avg_age']['value']
if average_age is None:
    print("Average Age: no documents with an 'age' value")
else:
    print(f"Average Age: {average_age}")

In [None]:
# Combine a query, an aggregation, a timeout and from/size paging in one request.
# The max_price aggregation is computed over every document matching the query,
# not only over the 5 hits of the returned page.
# NOTE(review): "message" and "price" fields are assumed to exist; other cells use "text".
response = es.search(
    index='africagenderdata',
    body={
        "query": {
            "match": {
                "message": "important keyword"
            }
        },
        "aggs": {
            "max_price": {
                "max": {
                    "field": "price"
                }
            }
        },
        "size": 5,
        "from": 20,
        "timeout": "5s"
    },
)

for hit in response['hits']['hits']:
    print(hit['_source'])

# None when no matching document has a "price" value
max_price = response['aggregations']['max_price']['value']
if max_price is None:
    print("Max Price: no documents with a 'price' value")
else:
    print(f"Max Price: {max_price}")

In [None]:
# Vector (kNN) search: embed the question, then ask for the k nearest
# neighbours of that vector in the "embedding" field of my_index.
from pprint import pprint

# get_embedding() is not defined anywhere in this notebook. It must be provided
# before this cell runs, and must return a vector with the same dimensionality
# as the "embedding" field of my_index (TODO: confirm against the mapping).
if 'get_embedding' not in globals():
    raise NameError(
        "get_embedding(text) is not defined: define an embedding function "
        "before running the vector search cell"
    )

query = "What is a black hole?"
embedded_query = get_embedding(query)  # embed the query

result = es.search(
    index='my_index',
    knn={
        "field": "embedding",
        "query_vector": embedded_query,
        "num_candidates": 5,  # candidate pool; Elasticsearch requires this to be >= k
        "k": 3,  # number of nearest neighbours to return
    }
)

n_documents = result.body["hits"]["total"]["value"]
print(f"Found {n_documents} documents")

In [None]:
# Load documents into the index in bulk.
# TODO: replace this placeholder with the real list of documents (dicts) to index.
documents = []

from tqdm.auto import tqdm

# The bulk API takes an action line followed by the document source, for each
# document. Index names must be lowercase.
operations = []
for doc in tqdm(documents, total=len(documents)):
    operations.append({'index': {'_index': 'africagenderdata'}})
    operations.append(doc)

if operations:
    response = es.bulk(operations=operations)
    # True when at least one document failed; details are in response['items']
    pprint(response.body["errors"])

    # Refresh so the new documents are immediately visible to search and count
    es.indices.refresh(index='africagenderdata')

    count = es.count(index='africagenderdata')["count"]
    print(f"Indexed {count} documents")
else:
    # An empty bulk request is rejected by Elasticsearch, so skip it
    print("No documents to index; skipping the bulk request")

In [None]:
# Pagination with from/size: the first page of 10 hits, newest first.
# The second sort key ("id") breaks ties between equal timestamps.
# NOTE(review): assumes "timestamp" and "id" are sortable fields (date/numeric/
# keyword, not analyzed text); other cells use "created_on", so confirm against
# the index mapping.
response = es.search(
    index='africagenderdata',
    body={
        "from": 0,  # zero-based offset of the first hit
        "size": 10,  # page size
        "sort": [
            {"timestamp": "desc"},
            {"id": "desc"}
        ]
    }
)

hits = response["hits"]["hits"]
for hit in hits:
    print(f"ID: {hit['_source'].get('id')}")

In [None]:
# search_after, step 1: fetch the first page with an explicit sort (required for
# search_after). Each hit carries its sort values in hit['sort']; the last hit's
# values are the cursor used to request the next page.
response = es.search(
    index='africagenderdata',
    body={
        "size": 10,  # number of results per page
        "sort": [
            {"timestamp": "desc"},
            {"id": "desc"}
        ]
    }
)

hits = response["hits"]["hits"]
for hit in hits:
    print(f"ID: {hit['_source'].get('id')}")
    print(f"Sort values: {hit['sort']}")
    print()

In [None]:
# search_after, step 2: fetch the page that follows the last hit of the
# previous page. The sort must be identical to the one used for that page.
# Re-running this cell advances one more page, because `hits` is reassigned.
if not hits:
    print("The previous page was empty; there is nothing to continue from.")
else:
    last_sort_values = hits[-1]["sort"]
    response = es.search(
        index='africagenderdata',
        body={
            "size": 10,
            "sort": [
                {"timestamp": "desc"},
                {"id": "desc"}
            ],
            "search_after": last_sort_values
        }
    )

    hits = response["hits"]["hits"]
    for hit in hits:
        print(f"ID: {hit['_source'].get('id')}")
        print(f"Sort values: {hit['sort']}")
        print()

In [None]:
# Ingest pipeline lifecycle: create a pipeline that lowercases the "text" field,
# read its definition back, then delete it again.
from pprint import pprint

# 1) create
response = es.ingest.put_pipeline(
    id='lowercase_pipeline',
    description='This pipeline transforms the text to lowercase',
    processors=[{"lowercase": {"field": "text"}}],
)
pprint(response.body)

# 2) fetch the stored definition
response = es.ingest.get_pipeline(id='lowercase_pipeline')
pprint(response.body)

# 3) delete
response = es.ingest.delete_pipeline(id='lowercase_pipeline')
pprint(response.body)

In [None]:
# Create the lowercase pipeline, test it with the simulate API, then apply it
# to a bulk insert.
response = es.ingest.put_pipeline(
    id='lowercase_pipeline',
    description='This pipeline transforms the text to lowercase',
    processors=[
        {
            "lowercase": {
                "field": "text"
            }
        }
    ]
)
pprint(response.body)

# Simulate: run a sample document through the pipeline and show the result
response = es.ingest.simulate(
    id='lowercase_pipeline',
    docs=[
        {
            "_index": "my_index",
            "_id": "1",
            "_source": {
                "text": "HELLO WORLD"
            }
        }
    ]
)
pprint(response.body)

# Sample documents for the bulk insert (replace with real data as needed).
dummy_data = [
    {"title": "Sample Title 1", "text": "HELLO WORLD", "created_on": "2024-09-22"},
    {"title": "Sample Title 2", "text": "Mixed Case TEXT", "created_on": "2024-09-23"},
]

# Apply the pipeline to every document in a bulk insert
operations = []
for doc in dummy_data:
    operations.append({'index': {'_index': 'my_index'}})
    operations.append(doc)

response = es.bulk(operations=operations, pipeline='lowercase_pipeline')
pprint(response.body)

In [None]:
# Pipeline failure handling.
from elasticsearch import ApiError

# pipeline_1: lowercase "text", then overwrite it with a constant.
# Neither processor handles failures.
response = es.ingest.put_pipeline(
    id='pipeline_1',
    description='Pipeline with multiple transformations',
    processors=[
        {
            "lowercase": {
                "field": "text",
            }
        },
        {
            "set": {
                "field": "text",
                "value": "CHANGED BY PIPELINE",
            }
        },
    ]
)
pprint(response.body)

# This document has no "text" field, so the lowercase processor of pipeline_1
# fails and the index request is rejected. Catch the error so the failure is
# shown and the rest of the cell (and a "Run All") still executes.
document = {
    'title': 'Sample Title 4',
    'created_on': '2024-09-25',
}

try:
    response = es.index(
        index='my_index',
        pipeline='pipeline_1',
        document=document
    )
    pprint(response.body)
except ApiError as err:
    print(f"Indexing with pipeline_1 failed as expected: {err}")

# pipeline_2 handles failures instead of aborting:
#  - if lowercasing fails, on_failure sets "text" to a marker value
#  - ignore_failure lets processing continue even if that processor errors
response = es.ingest.put_pipeline(
    id='pipeline_2',
    description='Pipeline with multiple transformations, handling and ignoring failures',
    processors=[
        {
            "lowercase": {
                "field": "text",
                "on_failure": [
                    {
                        "set": {
                            "field": "text",
                            "value": "FAILED TO LOWERCASE",
                            "ignore_failure": True,
                        }
                    }
                ]
            }
        },
        {
            "set": {
                "field": "new_field",
                "value": "ADDED BY PIPELINE",
                "ignore_failure": True,
            }
        },
    ]
)
pprint(response.body)

In [None]:
# Ingest processors: a sample document plus a pipeline that exercises one
# processor per transformation; processors run in the listed order.
document = {
    "price": "100.50",  # a string, converted to float by the pipeline
    "old_name": "old_value",
    "description": "<p>This is a description with HTML.</p>",
    "username": "UserNAME",
    "category": "books",
    "title": "   Example Title with Whitespace   ",
    "tags": "tag1,tag2,tag3",
    "temporary_field": "This field should be removed"
}

pipeline_body = {
    "description": "Pipeline to demonstrate various ingest processors",
    "processors": [
        {"convert": {"field": "price", "type": "float", "ignore_missing": True}},
        {"rename": {"field": "old_name", "target_field": "new_name"}},
        {"set": {"field": "status", "value": "active"}},
        {"html_strip": {"field": "description"}},
        {"lowercase": {"field": "username"}},
        {"uppercase": {"field": "category"}},
        {"trim": {"field": "title"}},
        {"split": {"field": "tags", "separator": ","}},
        {"remove": {"field": "temporary_field"}},
        # runs after "split", so "tags" is already an array here
        {"append": {"field": "tags", "value": ["new_tag"]}},
    ],
}

# NOTE(review): the id contains a space; an id such as "example_pipeline" would
# avoid URL-encoding surprises.
pipeline_id = "example pipeline"
es.ingest.put_pipeline(id=pipeline_id, body=pipeline_body)
print(f"Pipeline '{pipeline_id}' created successfully!")

## Pipeline processors used above

- **Convert**: changes the data type of a field (here `price`, from string to float).
- **Rename**: changes the name of a field.
- **Set**: assigns a specified value to a field.
- **HTML Strip**: strips HTML tags from a field's content.
- **Lowercase**: transforms the text in a field to lowercase.
- **Uppercase**: transforms the text in a field to uppercase.
- **Trim**: removes whitespace from the beginning and end of a field's value.
- **Split**: divides the field content into an array using the given separator (here a comma, `,`).
- **Remove**: deletes a field from the document.
- **Append**: adds one or more values to an array field.



In [None]:
# Recreate my_index from scratch, then index the sample document through the
# processors pipeline defined above.
from pprint import pprint

# Drop any previous copy (no error if it is missing), then create it empty.
es.indices.delete(index='my_index', ignore_unavailable=True)
es.indices.create(index='my_index')

# The pipeline transforms the document before it is stored.
response = es.index(
    index="my_index",
    document=document,
    pipeline=pipeline_id,
)
pprint(response.body)

In [None]:
# Multiple filters: documents whose color is "yellow" AND whose brand is "adidas".
# Filter clauses only include/exclude documents; they do not affect scoring.
# NOTE(review): term queries are not analyzed. If "color"/"brand" are text fields,
# only lowercase single-token values match; the next cell uses ".keyword" sub-fields.
response = es.search(
    index="my_index",
    body={
        "query": {
            "bool": {
                "filter": [
                    {
                        "term": {
                            "color": "yellow"
                        }
                    },
                    {
                        "term": {
                            "brand": "adidas"
                        }
                    }
                ]
            }
        },
    },
)

# hits.total is the number of matching documents; the hits list holds only
# the returned page (10 by default).
total_hits = response.body['hits']['total']['value']
hits = response.body['hits']['hits']
print(f"Found {total_hits} documents (showing {len(hits)})")

In [None]:
# Aggregations combined with filters:
#  - the query keeps only "gucci" documents, and the aggregations run on that set
#  - "colors" counts documents per color
#  - "color_red" narrows to red documents and counts their models
#  - post_filter restricts the returned hits to red items without narrowing the
#    "colors" counts
response = es.search(
    index="my_index",
    body={
        "query": {"bool": {"filter": {"term": {"brand": "gucci"}}}},
        "aggs": {
            "colors": {"terms": {"field": "color.keyword"}},
            "color_red": {
                "filter": {"term": {"color.keyword": "red"}},
                "aggs": {"models": {"terms": {"field": "model.keyword"}}},
            },
        },
        "post_filter": {"term": {"color": "red"}},
        "size": 20,
    },
)
pprint(response.body)

In [None]:
# Buckets of the "colors" terms aggregation from the previous search
# (one bucket per distinct color).
colors_aggregation = (
    response.body['aggregations']
    ['colors']
    ['buckets']
)
pprint(colors_aggregation)

In [None]:
# Model buckets nested under the "color_red" filter aggregation
# (models of the red documents only).
color_red_aggregation = (
    response.body['aggregations']
    ['color_red']
    ['models']
    ['buckets']
)
pprint(color_red_aggregation)