# Basic Setup.

In [2]:
#-Importing the modules-#
import csv
import warnings
from typing import Any, Dict, Generator, Iterable

import numpy as np
import pandas as pd
from elasticsearch import helpers, Elasticsearch, ElasticsearchWarning

#-Silencing every ElasticsearchWarning emitted by the client for the rest of the session-#
warnings.simplefilter("ignore", category = ElasticsearchWarning)


In [3]:
#-Creating a dataframe from the reporters CSV-#
df = pd.read_csv("reporters.csv")

#-Creating the Elasticsearch object connected to our localhost instance-#
#-request_timeout is in seconds (600 s = 10 minutes per request)-#
host = "http://localhost:9200/"
es = Elasticsearch(hosts = host, request_timeout = 600)


In [4]:
#-Testing the connection: ping() returns False instead of raising, so fail fast here-#
#-rather than letting later cells error out with less obvious messages-#
is_connected = es.ping()
assert is_connected, f"Could not reach Elasticsearch at {host}"
is_connected


True

# Data cleaning / pre-processing before uploading the data.

In [5]:
#-Checking the number of records before processing-#
print(f"Records before processing: {df.shape[0]}.\n")

#-Dropping all records that are not associated with any outlet-#
#-The count is computed outside the f-string: reusing double quotes inside a-#
#-double-quoted f-string is a SyntaxError on Python versions before 3.12-#
missing_outlet_count = df["outlet_id"].isna().sum()
print(f"Records without outlet_id: {missing_outlet_count}.\n")
df = df.dropna(subset = ["outlet_id"])

#-Making the outlet_id as int (pandas keeps the column as float while it holds NaN)-#
df["outlet_id"] = df["outlet_id"].astype(int)

#-Replacing the remaining NaN values with None so they are sent as JSON null-#
df = df.replace(np.nan, None)

#-Checking the number of records after processing-#
print(f"Records after processing: {df.shape[0]}.")


Records before processing: 1000.

Records without outlet_id: 330.

Records after processing: 670.


# Preparing the data for ingestion.

In [6]:
#-Converting the dataframe into a list with one dict per row-#
records = df.to_dict(orient = "records")

In [7]:
#-Generator function to process the list of rows-#
def doc_generator(
    records: Iterable[Dict[str, Any]],
    index: str = "tbl_reporters",
    id_field: str = "reporter_id",
) -> Generator[Dict[str, Any], None, None]:
    """Turn row dicts into Elasticsearch bulk actions.

    Args:
        records: Row dicts, e.g. the output of ``df.to_dict("records")``.
        index: Target index written into each action's ``_index``.
        id_field: Key whose value becomes the document ``_id``; it is
            removed from the yielded ``_source``.

    Yields:
        One ``{"_index", "_id", "_source"}`` dict per input row.

    Raises:
        KeyError: (while iterating) if a row has no ``id_field`` key.
    """
    for row in records:
        #-Shallow copy so pop() below does not alter the original dataset-#
        source = dict(row)
        doc_id = source.pop(id_field)
        yield {"_index": index, "_id": doc_id, "_source": source}


In [8]:
#-Building a lazy generator over the records (nothing is processed until it is iterated)-#
processed_records = doc_generator(records)

#-Pulling out and displaying just the first processed document-#
first_document = next(processed_records)
first_document


{'_index': 'tbl_reporters',
 '_id': 85382,
 '_source': {'name': 'Michael Griffiths',
  'outlet_id': 16039,
  'city': None,
  'state': None,
  'country_code': 'GB',
  'associations': '111071, 18745, 57818',
  'topics': 'Crime, Law, News, Crime And Justice, European Union',
  'twitter_description': 'News editor and covering sanctions for Global Investigations Review.  in . Also runs @FARAupdates. Still some UK corporate crime stuff. DMs open.',
  'pitch': None,
  'last_updated': '2024-08-22 07:16:54.000',
  'active': False}}

# Importing the data in bulk.

In [9]:
#-Deleting the index first to show that bulk ingestion creates it if it does not exist-#
#-ignore_unavailable=True keeps this from failing when the index is already absent-#
response = es.indices.delete(index = "tbl_reporters", ignore_unavailable = True)
print(response)

#-Showing the index is deleted: exists() asks about this one index directly-#
#-instead of downloading every index definition with get(index="*")-#
print(bool(es.indices.exists(index = "tbl_reporters")))


{'acknowledged': True}
False


In [10]:
#-Bulk-indexing every processed record through the helpers module-#
actions = doc_generator(records)
response = helpers.bulk(client = es, actions = actions)
print(response)


(670, [])


# Importing the data with an explicit datatype mapping.

In [11]:
#-Deleting the tbl_reporters index (no error if it is already absent)-#
response = es.indices.delete(index = "tbl_reporters", ignore_unavailable = True)
print(response)

#-Confirming that it is deleted with a direct existence check on this index only-#
print(bool(es.indices.exists(index = "tbl_reporters")))


{'acknowledged': True}
False


In [12]:
#-Available Elasticsearch field datatypes, kept here as a quick reference-#
common_types = "binary, boolean, keyword, constant_keyword, wildcard, long, double, date, date_nanos"
object_types = "object, flattened, nested, join"
structured_types = "long_range, double_range, date_range, ip_range, ip, version, murmur3"
aggregate_types = "aggregate_metric_double, histogram"
text_types = "text, match_only_text, annotated_text, completion, search_as_you_type, semantic_text, token_count"
document_types = "dense_vector, sparse_vector, rank_feature, rank_features"
spatial_types = "geo_point, geo_shape, point, shape"
other_types = "percolator"

#-Settings dict object: one primary shard and no replicas-#
#-(presumably because the localhost instance is a single node - confirm)-#
settings = {
    "number_of_shards": 1,
    "number_of_replicas": 0,
}

#-Mappings dict object: only outlet_id is mapped explicitly, other fields are mapped dynamically-#
#-outlet_id is an integer identifier (cast to int during cleaning), so map it as long, not double-#
mappings = {
    "properties": {
        "outlet_id": {
            "type": "long"
        }
    }
}

#-Full index configuration (settings + mappings) used when creating the index-#
config = {
    "settings": settings,
    "mappings": mappings,
}

In [13]:
#-Creating the index with the above settings and mappings-#
#-config is unpacked into the settings= / mappings= keyword arguments, which-#
#-elasticsearch-py 8+ documents in preference to the generic body= parameter-#
response = es.indices.create(index = "tbl_reporters", **config)
print(response)


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'tbl_reporters'}


In [14]:
#-Re-ingesting the records, this time into the explicitly mapped index-#
mapped_actions = doc_generator(records)
response = helpers.bulk(client = es, actions = mapped_actions)
print(response)


(670, [])


# Ingesting large CSV files.

In [15]:
#-This approach is for large dataset files with limited preprocessing-#
#-Rows are streamed from disk one at a time, so the whole file never has to fit in memory-#
def doc_generator(filename: str, index: str = "tbl_reporters") -> Generator[Dict[str, Any], None, None]:
    """Stream a CSV file as Elasticsearch bulk actions, one row at a time.

    Every value is yielded as the raw string read from the file (empty cells
    become ``""``); no type conversion or cleaning is applied.

    Args:
        filename: Path to a CSV file whose header row contains ``reporter_id``.
        index: Target index written into each action's ``_index``.

    Yields:
        ``{"_index", "_id", "_source"}`` dicts; ``_id`` is the row's
        ``reporter_id`` string, which is removed from ``_source``.

    Raises:
        KeyError: (while iterating) if the header has no ``reporter_id`` column.
    """
    #-newline="" lets the csv module handle line breaks inside quoted fields itself-#
    #-utf-8-sig strips a leading byte-order mark so the first header name stays intact-#
    with open(filename, "r", encoding = "utf-8-sig", newline = "") as csv_file:
        for row in csv.DictReader(csv_file):
            #-DictReader builds a fresh dict per row, so popping from it is safe-#
            yield {
                "_index": index,
                "_id": row.pop("reporter_id"),
                "_source": row,
            }

#-Streaming the CSV rows straight into Elasticsearch in bulk-#
csv_actions = doc_generator("reporters.csv")
response = helpers.bulk(client = es, actions = csv_actions)
print(response)


(1000, [])


In [17]:
#-Checking the ingested data with a query-less search, shown as a plain dict-#
es.search(index = "tbl_reporters").body


{'took': 2,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 1000, 'relation': 'eq'},
  'max_score': 1.0,
  'hits': [{'_index': 'tbl_reporters',
    '_id': '85382',
    '_score': 1.0,
    '_source': {'name': 'Michael Griffiths',
     'outlet_id': '16039',
     'city': '',
     'state': '',
     'country_code': 'GB',
     'associations': '111071, 18745, 57818',
     'topics': 'Crime, Law, News, Crime And Justice, European Union',
     'twitter_description': 'News editor and covering sanctions for Global Investigations Review.  in . Also runs @FARAupdates. Still some UK corporate crime stuff. DMs open.',
     'pitch': '',
     'last_updated': '2024-08-22 07:16:54.000',
     'active': 'false'}},
   {'_index': 'tbl_reporters',
    '_id': '1308596',
    '_score': 1.0,
    '_source': {'name': 'Muhammad Adil',
     'outlet_id': '',
     'city': 'Peshawar',
     'state': 'Khyber Pakhtunkhwa',
     'country_code': 'PK',
    