

# **Distributed Search & Databases in Big Data with Elasticsearch**
Distributed search spreads data retrieval across multiple nodes of a Big Data system, which makes it possible to handle large and growing datasets. Elasticsearch implements this approach by storing JSON documents in indices whose inverted-index structures make full-text searches fast. Each index is split into shards distributed across nodes, and each shard can have replicas: sharding spreads storage and query load, while replication provides fault tolerance and high availability (replicas can also serve search requests). Elasticsearch also offers a flexible JSON-based Query DSL for complex queries and supports bulk data ingestion through the Bulk API. In this context, indexing, MapReduce for distributed batch processing, and NoSQL databases (which offer horizontal scalability and schema flexibility) all play key roles. Designing a distributed search system therefore requires careful consideration of scalability, availability, consistency, security, and ongoing monitoring to maintain performance and reliability.
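The toy sketch below ties together two ideas from the paragraph above: it builds an inverted index (the term-to-documents structure behind fast full-text search) using map, shuffle, and reduce phases. It is illustrative only. Elasticsearch builds its inverted indexes with Apache Lucene inside each shard, not with MapReduce, and it applies much richer text analysis than the lowercase word split assumed here.

```python
import re
from collections import defaultdict
from itertools import chain

def map_phase(doc_id, text):
    """Map: emit one (term, doc_id) pair per token in the document."""
    return [(term, doc_id) for term in re.findall(r'\w+', text.lower())]

def shuffle_phase(pairs):
    """Shuffle: group every emitted doc_id under its term."""
    grouped = defaultdict(list)
    for term, doc_id in pairs:
        grouped[term].append(doc_id)
    return grouped

def reduce_phase(grouped):
    """Reduce: collapse each term's doc_ids into a sorted, de-duplicated postings list."""
    return {term: sorted(set(ids)) for term, ids in grouped.items()}

docs = {
    1: 'Elasticsearch stores JSON documents',
    2: 'Shards split an index across nodes',
    3: 'Replicas copy shards for fault tolerance',
}

# In a real cluster each map call could run on a different node; here they run sequentially.
pairs = chain.from_iterable(map_phase(doc_id, text) for doc_id, text in docs.items())
inverted_index = reduce_phase(shuffle_phase(pairs))

print(inverted_index['shards'])  # → [2, 3]
```

A query for a term then only needs to read that term's postings list instead of scanning every document.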


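```python
# Installing the Elasticsearch Python client (its major version is best kept the same as the
# server's, e.g. pip install "elasticsearch>=8,<9" for an 8.x cluster):
!pip install elasticsearch
```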



```python
# Importing all the necessary libraries and resources:
from elasticsearch import Elasticsearch, helpers
import pandas as pd
import os
```

## **Example: Distributed Search with Elasticsearch**
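The following example uses Elasticsearch, one of the most popular distributed search engines. Search is performed in a distributed manner: the node that receives a query acts as the coordinator, forwards the query to one copy (primary or replica) of every relevant shard, and each shard searches and scores its own documents in parallel. The coordinating node then merges the partial results, sorting them by relevance score by default or by fields such as price, optionally narrowed by filters (for example, products on promotion or within a price range), and returns the final ranked list. This happens in near real time: newly indexed documents typically become searchable within about one second, the default refresh interval.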
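The scatter-gather flow described above can be sketched in plain Python. This is a simplified model, not Elasticsearch's implementation: documents are routed with `hash(_routing) % number_of_primary_shards` (the real formula uses murmur3 and an extra routing factor; `zlib.crc32` is a stand-in here), the score is raw term frequency instead of BM25, and `NUM_PRIMARY_SHARDS`, `route`, `build_shards`, `search_shard`, and `distributed_search` are hypothetical names.

```python
import heapq
import zlib
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple

NUM_PRIMARY_SHARDS = 3  # fixed when an index is created

def route(doc_id: str) -> int:
    """Choose a primary shard from the routing value (the document id by default).
    Simplified: crc32 stands in for the murmur3 hash Elasticsearch actually uses."""
    return zlib.crc32(doc_id.encode('utf-8')) % NUM_PRIMARY_SHARDS

def build_shards(docs: Dict[str, str]) -> List[Dict[str, str]]:
    """Place every document on the shard chosen by route()."""
    shards: List[Dict[str, str]] = [{} for _ in range(NUM_PRIMARY_SHARDS)]
    for doc_id, text in docs.items():
        shards[route(doc_id)][doc_id] = text
    return shards

def search_shard(shard: Dict[str, str], term: str, k: int) -> List[Tuple[int, str]]:
    """Query phase on one shard: score local documents and keep only the local top k.
    Score = term frequency, a deliberately simple stand-in for BM25."""
    scored = [(text.lower().split().count(term), doc_id) for doc_id, text in shard.items()]
    return heapq.nlargest(k, [hit for hit in scored if hit[0] > 0])

def distributed_search(shards: List[Dict[str, str]], term: str, k: int = 2) -> List[Tuple[int, str]]:
    """Coordinating node: scatter the query to all shards in parallel, then merge."""
    with ThreadPoolExecutor() as pool:
        partial_results = list(pool.map(lambda shard: search_shard(shard, term, k), shards))
    merged = [hit for hits in partial_results for hit in hits]
    return heapq.nlargest(k, merged)

products = {
    'p1': 'cheap phone with cheap case',
    'p2': 'phone charger',
    'p3': 'cheap cheap cheap cable',
    'p4': 'laptop bag',
}
shards = build_shards(products)
print(distributed_search(shards, 'cheap'))  # → [(3, 'p3'), (2, 'p1')]
```

Each shard only needs to return its local top k, because the global top k is always contained in the union of the per-shard top-k lists; that keeps the data sent to the coordinator small no matter how many documents each shard holds.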

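```python
# Connecting to a local node; the URL and 'elastic'/'elastic' credentials are placeholders.
es = Elasticsearch(
    'http://localhost:9200',
    basic_auth=('elastic', 'elastic')
)
# ping() returns False (instead of raising) when no node answers at that address:
if not es.ping():
    print('Elasticsearch is not reachable at http://localhost:9200')
```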
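Hard-coding credentials in a notebook is risky, and the introduction lists security as a design concern. A minimal sketch that reads the connection settings from environment variables instead; the variable names `ES_URL`, `ES_USER`, and `ES_PASSWORD` and the helper `es_settings` are hypothetical choices, not a convention of the Elasticsearch client.

```python
import os
from typing import Any, Dict, Mapping, Optional

def es_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build keyword arguments for Elasticsearch(...) from (hypothetical) environment variables."""
    env = os.environ if env is None else env
    settings: Dict[str, Any] = {'hosts': env.get('ES_URL', 'http://localhost:9200')}
    user, password = env.get('ES_USER'), env.get('ES_PASSWORD')
    if user and password:
        # Only send basic auth when both parts are configured.
        settings['basic_auth'] = (user, password)
    return settings

print(es_settings({'ES_URL': 'https://es.example:9200', 'ES_USER': 'u', 'ES_PASSWORD': 'p'}))
```

In the notebook this would be used as `es = Elasticsearch(**es_settings())`, with the variables set outside the notebook (for example, via Colab secrets or the shell).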

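```python
# Folder containing the Parquet files to index ('./' is the current working directory):
parquet_path = './'
index_name = 'test'  # replace 'test' with your index name

# Defining the function:
def index_parquet_files(client, folder=parquet_path, index=index_name):
    """Bulk-index every row of every .parquet file in `folder` as one document."""
    for filename in os.listdir(folder):
        if not filename.endswith('.parquet'):
            continue

        # Reading the Parquet file:
        df = pd.read_parquet(os.path.join(folder, filename))

        # Replacing missing values (NaN/NaT) with None so they are sent as JSON null;
        # NaN is not valid JSON and Elasticsearch typically rejects it:
        df = df.astype(object).where(pd.notna(df), None)

        # Converting the DataFrame to one dictionary (document) per row:
        records = df.to_dict(orient='records')

        # Preparing the actions for the Bulk API (a generator avoids a second full copy):
        actions = ({'_index': index, '_source': record} for record in records)

        # Inserting the data; bulk() returns (number of successes, list of errors):
        success, _ = helpers.bulk(client, actions)
        print(f'{filename}: {success} documents indexed into "{index}"')

# Executing the function:
index_parquet_files(es)
```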
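`helpers.bulk` hides the wire format. The sketch below shows roughly what an indexing request to the `_bulk` endpoint looks like: newline-delimited JSON (NDJSON) in which each action line (`{"index": {...}}`) is followed by the document source, and the whole body ends with a newline. The helpers `clean` and `to_bulk_ndjson` are hypothetical; they only build the request body and send nothing.

```python
import json
import math
from typing import Any, Dict, Iterable, List

def clean(record: Dict[str, Any]) -> Dict[str, Any]:
    """Replace top-level float NaN (pandas' marker for missing numbers) with None -> JSON null."""
    return {key: (None if isinstance(value, float) and math.isnan(value) else value)
            for key, value in record.items()}

def to_bulk_ndjson(index: str, records: Iterable[Dict[str, Any]]) -> str:
    """Build a Bulk API request body: one action line plus one source line per document."""
    lines: List[str] = []
    for record in records:
        lines.append(json.dumps({'index': {'_index': index}}))   # action/metadata line
        lines.append(json.dumps(clean(record), allow_nan=False))  # document source line
    return '\n'.join(lines) + '\n'  # the Bulk API requires a trailing newline

body = to_bulk_ndjson('test', [{'name': 'pen', 'price': 1.5},
                               {'name': 'ink', 'price': float('nan')}])
print(body)
```

`allow_nan=False` makes `json.dumps` raise instead of silently emitting the non-standard `NaN` token, so any missing value that slipped past `clean` fails loudly before reaching the cluster.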

```python
# Defining the index to be searched:
index = 'test'

# Defining the search query:
query = {
    'query': {
        'match_all': {}  # matches every document, each with a score of 1.0
    },
    'size': 200  # return at most 200 hits (the default is 10)
}
```
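`match_all` returns everything. A more realistic Query DSL request combines a scored full-text clause with unscored filters, such as the price or promotion criteria mentioned earlier. The sketch below only builds the request dictionary; the field names `name` (text), `price` (numeric), and `on_sale` (boolean) and the helper `product_query` are hypothetical and must match your own index mapping.

```python
from typing import Any, Dict, List, Optional

def product_query(text: str, max_price: Optional[float] = None,
                  on_sale_only: bool = False, size: int = 20) -> Dict[str, Any]:
    """Build a bool query: 'must' clauses affect the score, 'filter' clauses only include/exclude."""
    filters: List[Dict[str, Any]] = []
    if max_price is not None:
        filters.append({'range': {'price': {'lte': max_price}}})
    if on_sale_only:
        filters.append({'term': {'on_sale': True}})

    bool_query: Dict[str, Any] = {'must': [{'match': {'name': text}}]}
    if filters:
        bool_query['filter'] = filters  # filter context: not scored, cacheable

    return {
        'query': {'bool': bool_query},
        'sort': [{'_score': 'desc'}, {'price': 'asc'}],  # relevance first, then cheapest
        'size': size,
    }

print(product_query('phone', max_price=500, on_sale_only=True))
```

Because the returned keys (`query`, `sort`, `size`) are also keyword arguments of the 8.x client's `search()`, it could be sent with `es.search(index=index, **product_query('phone', max_price=500))`.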

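```python
# Executing the query; the dict's keys are passed as keyword arguments because
# the 8.x Python client deprecates the `body` parameter:
response = es.search(index=index, **query)

# Printing the source document of each hit:
for hit in response['hits']['hits']:
    print(hit['_source'])
```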

When this notebook was executed, no Elasticsearch node was listening on `localhost:9200`, so the search request failed with `ConnectionError: ... Failed to establish a new connection: [Errno 111] Connection refused`. To run the example, start an Elasticsearch node (locally or as a hosted cluster) and point the client at its address and credentials before executing the cells above.
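The query above returns at most 200 hits, and `from` + `size` paging is capped by the `index.max_result_window` setting (10,000 by default). The usual way to walk further is `search_after`: sort on fields that uniquely identify each document and pass the last hit's `sort` values into the next request. The sketch below is an assumption-laden outline: `iter_all_hits` and `fake_search` are hypothetical, `fake_search` is an in-memory stand-in for `es.search` that supports ascending sorts only, and for results that stay consistent while the index changes, Elasticsearch recommends pairing `search_after` with a point in time (PIT), which is omitted here.

```python
from typing import Any, Callable, Dict, Iterator, List

def iter_all_hits(search: Callable[..., Dict[str, Any]], index: str,
                  query: Dict[str, Any], sort: List[Dict[str, str]],
                  page_size: int = 200) -> Iterator[Dict[str, Any]]:
    """Yield every matching hit, one page at a time, using search_after."""
    search_after = None
    while True:
        kwargs: Dict[str, Any] = {'index': index, 'query': query,
                                  'sort': sort, 'size': page_size}
        if search_after is not None:
            kwargs['search_after'] = search_after
        hits = search(**kwargs)['hits']['hits']
        yield from hits
        if len(hits) < page_size:
            return  # last (possibly empty) page reached
        search_after = hits[-1]['sort']  # sort values of the last hit on this page

# In-memory stand-in for es.search (ascending sorts only), for trying the loop offline:
DOCS = [{'sku': f's{i}', 'price': p} for i, p in enumerate([5, 3, 9, 3, 1])]

def fake_search(index, query, sort, size, search_after=None):
    fields = [next(iter(clause)) for clause in sort]

    def sort_key(doc):
        return [doc[field] for field in fields]

    docs = sorted(DOCS, key=sort_key)
    if search_after is not None:
        docs = [d for d in docs if sort_key(d) > list(search_after)]
    return {'hits': {'hits': [{'_source': d, 'sort': sort_key(d)} for d in docs[:size]]}}

all_hits = list(iter_all_hits(fake_search, 'test', {'match_all': {}},
                              sort=[{'price': 'asc'}, {'sku': 'asc'}], page_size=2))
print([hit['_source']['sku'] for hit in all_hits])  # → ['s4', 's1', 's3', 's0', 's2']
```

Against a real cluster, the same loop would be called as `iter_all_hits(es.search, index, {'match_all': {}}, sort=[{'price': 'asc'}, {'sku': 'asc'}])`, assuming hypothetical `price` and keyword `sku` fields, where `sku` breaks ties between documents with equal prices.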