In [1]:
from elasticsearch import Elasticsearch, helpers
import pandas as pd
import json
import time
import warnings

In [2]:
warnings.filterwarnings('ignore')

In [3]:
# URL of the CSV file
csv_url = "https://data.cityofnewyork.us/resource/h9gi-nx95.csv"

In [4]:
# Load the CSV file directly from the URL
df = pd.read_csv(csv_url)

In [5]:
df.head()

Unnamed: 0,crash_date,crash_time,borough,zip_code,latitude,longitude,location,on_street_name,off_street_name,cross_street_name,...,contributing_factor_vehicle_2,contributing_factor_vehicle_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,collision_id,vehicle_type_code1,vehicle_type_code2,vehicle_type_code_3,vehicle_type_code_4,vehicle_type_code_5
0,2021-09-11T00:00:00.000,2:39,,,,,,WHITESTONE EXPRESSWAY,20 AVENUE,,...,Unspecified,,,,4455765,Sedan,Sedan,,,
1,2022-03-26T00:00:00.000,11:45,,,,,,QUEENSBORO BRIDGE UPPER,,,...,,,,,4513547,Sedan,,,,
2,2022-06-29T00:00:00.000,6:55,,,,,,THROGS NECK BRIDGE,,,...,Unspecified,,,,4541903,Sedan,Pick-up Truck,,,
3,2021-09-11T00:00:00.000,9:35,BROOKLYN,11208.0,40.667202,-73.8665,"\n, \n(40.667202, -73.8665)",,,1211 LORING AVENUE,...,,,,,4456314,Sedan,,,,
4,2021-12-14T00:00:00.000,8:13,BROOKLYN,11233.0,40.683304,-73.917274,"\n, \n(40.683304, -73.917274)",SARATOGA AVENUE,DECATUR STREET,,...,,,,,4486609,,,,,


In [6]:
# Select a subset of columns for simplicity
df = df[['collision_id', 'crash_date', 'crash_time', 'borough', 'latitude', 'longitude', 'on_street_name', 'number_of_persons_injured', 'number_of_persons_killed']]


In [7]:
# Convert the crash_date to datetime format
df['crash_date'] = pd.to_datetime(df['crash_date'])

In [8]:
json_str = df.to_json(orient='records', date_format='iso')
# print(json_str)


In [9]:
# Save it as a JSON file
df.to_json('nyc_traffic_incidents.json', orient='records', date_format = 'iso', lines=True)

In [10]:
df.head()

Unnamed: 0,collision_id,crash_date,crash_time,borough,latitude,longitude,on_street_name,number_of_persons_injured,number_of_persons_killed
0,4455765,2021-09-11,2:39,,,,WHITESTONE EXPRESSWAY,2,0
1,4513547,2022-03-26,11:45,,,,QUEENSBORO BRIDGE UPPER,1,0
2,4541903,2022-06-29,6:55,,,,THROGS NECK BRIDGE,0,0
3,4456314,2021-09-11,9:35,BROOKLYN,40.667202,-73.8665,,0,0
4,4486609,2021-12-14,8:13,BROOKLYN,40.683304,-73.917274,SARATOGA AVENUE,0,0


In [23]:
INDEX_BODY = {
    "mappings": {
        "properties": {
            "collision_id": {"type": "long"},
            "crash_date": {
                "type": "date",
                "format": "yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSSZ||yyyy-MM-dd"
            },
            "crash_time": {"type": "text"},
            "borough": {"type": "text"},
            "latitude": {"type": "float"},
            "longitude": {"type": "float"},
            "on_street_name": {"type": "text"},
            "number_of_persons_injured": {"type": "integer"},
            "number_of_persons_killed": {"type": "integer"}
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    }
}


In [12]:
es = Elasticsearch(
    [{'host': 'localhost', 'port': 9200, 'scheme': 'http'}],
    basic_auth=('elastic', '123456')
)

In [13]:
def preprocess_data(file_path):
    processed_data = []
    with open(file_path, 'r') as f:
        for line in f:
            doc = json.loads(line)
            # Remove fields with NaN values
            doc_clean = {k: v for k, v in doc.items() if pd.notna(v)}
            processed_data.append(doc_clean)
    return processed_data


In [14]:
def create_index(index):
    try:
        cr = es.indices.create(index=index, body=INDEX_BODY)
        print(f"Index '{index}' created successfully.")
    except Exception as e:
        print(f"Index creation error: {e}")

In [15]:
def insert_data_by_bulk(data, index):
    actions = [
        {
            '_index': index,
            '_source': doc
        }
        for doc in data
    ]
    
    try:
        success, failed = helpers.bulk(es, actions, stats_only=False, raise_on_error=False)
        print(f"Bulk indexing result: {success} succeeded, {len(failed)} failed")
        for error in failed:
            print(error)
    except Exception as e:
        print("Indexing Error:", e)

In [33]:
# Function to insert data by bulk
def insert_data_by_bulk_1(data, index):
    actions = [
        {
            "_index": index,
            "_source": doc
        }
        for doc in data
    ]

    try:
        # Use helpers.bulk to insert data in bulk
        response = helpers.bulk(es, actions)
        print(f"Indexed {response[0]} documents")
    except Exception as e:
        print(f"Error bulk indexing data: {e}")

In [47]:
def search_data_by_bulk(index):
    # res_data = es.search(index=index, body={"from": 0, "size": 10, "query": {"match": {'Type': 'Accident'}}})
    res_data = es.search(index=index, body={"from": 0, "size": 10, "query": {"match_all" : {} }})
    print("Search result:", res_data)
    return res_data

In [57]:
def search_data_by_bulk_1(index):
    query = {
        "query": {
            "range": {
                "number_of_persons_injured": {
                    "gt": 2  # Greater than 2
                }
            }
        }
    }
    res_data = es.search(index=index, body=query)
    print("Search result:", res_data)
    return res_data['hits']


In [27]:
index = "sales_index_new5"
# create_index(index)

index_new = es.indices.create(index=index, body= INDEX_BODY)


In [34]:
data = preprocess_data('nyc_traffic_incidents.json')

# print(data)

In [41]:
insert_data_by_bulk(data, index)

Bulk indexing result: 1000 succeeded, 0 failed


In [58]:
if __name__ == "__main__":
    # index = "sales_index_3"
    # create_index(index)
    # data = preprocess_data(json_str)
    # insert_data_by_bulk_1(data, index_name)
    search_data_by_bulk_1(index)

Search result: {'took': 6, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 23, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'sales_index_new5', '_id': 'I2WAKZABII3UBHuKmeYO', '_score': 1.0, '_source': {'collision_id': 4486991, 'crash_date': '2021-12-14T00:00:00.000', 'crash_time': '20:03', 'borough': 'BROOKLYN', 'latitude': 40.65068, 'longitude': -73.95881, 'number_of_persons_injured': 4, 'number_of_persons_killed': 0}}, {'_index': 'sales_index_new5', '_id': 'JGWAKZABII3UBHuKmeYO', '_score': 1.0, '_source': {'collision_id': 4486284, 'crash_date': '2021-12-14T00:00:00.000', 'crash_time': '1:28', 'on_street_name': 'MEEKER AVENUE', 'number_of_persons_injured': 3, 'number_of_persons_killed': 0}}, {'_index': 'sales_index_new5', '_id': 'ZWWAKZABII3UBHuKmeYO', '_score': 1.0, '_source': {'collision_id': 4485523, 'crash_date': '2021-12-08T00:00:00.000', 'crash_time': '19:30', 'on_street_name': 'OCEAN PARKWAY', 'num

In [2]:
es = Elasticsearch(
    [{'host': 'localhost', 'port': 9200, 'scheme': 'http'}],
    basic_auth=('elastic', '123456')
)

In [3]:
df = pd.read_csv("C:\\Users\\Sridevi\\Downloads\\supermarket_sales - Sheet1.csv")

In [4]:
df.head()

Unnamed: 0,Invoice ID,Branch,City,Customer type,Gender,Product line,Unit price,Quantity,Tax 5%,Total,Date,Time,Payment,cogs,gross margin percentage,gross income,Rating
0,750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,548.9715,1/5/2019,13:08,Ewallet,522.83,4.761905,26.1415,9.1
1,226-31-3081,C,Naypyitaw,Normal,Female,Electronic accessories,15.28,5,3.82,80.22,3/8/2019,10:29,Cash,76.4,4.761905,3.82,9.6
2,631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,340.5255,3/3/2019,13:23,Credit card,324.31,4.761905,16.2155,7.4
3,123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,489.048,1/27/2019,20:33,Ewallet,465.76,4.761905,23.288,8.4
4,373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,634.3785,2/8/2019,10:37,Ewallet,604.17,4.761905,30.2085,5.3


In [13]:
data = df[:10]


data.columns

Index(['Invoice ID', 'Branch', 'City', 'Customer type', 'Gender',
       'Product line', 'Unit price', 'Quantity', 'Tax 5%', 'Total', 'Date',
       'Time', 'Payment', 'cogs', 'gross margin percentage', 'gross income',
       'Rating'],
      dtype='object')

In [14]:
required_columns = ['Invoice ID', 'Branch', 'City', 'Customer type', 'Gender',
                    'Product line', 'Unit price', 'Quantity', 'Tax 5%', 'Total', 'Date']

In [15]:
df = df[required_columns]

In [17]:
df.head()

Unnamed: 0,Invoice ID,Branch,City,Customer type,Gender,Product line,Unit price,Quantity,Tax 5%,Total,Date
0,750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,548.9715,2019-05-01
1,226-31-3081,C,Naypyitaw,Normal,Female,Electronic accessories,15.28,5,3.82,80.22,2019-08-03
2,631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,340.5255,2019-03-03
3,123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,489.048,NaT
4,373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,634.3785,2019-08-02


In [27]:
# Convert date column to the correct format using .loc
df.loc[:, 'Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y', errors='coerce')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[:, 'Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y', errors='coerce')
  df.loc[:, 'Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y', errors='coerce')


In [28]:
index_name = "sales_index2"

mapping = {
    "mappings": {
        "properties": {
            "Invoice ID": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "Branch": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "City": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "Customer type": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "Gender": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "Product line": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "Unit price": {
                "type": "float"
            },
            "Quantity": {
                "type": "integer"
            },
            "Tax 5%": {
                "type": "float"
            },
            "Total": {
                "type": "float"
            },
            "Date": {
                "type": "date",
                "format": "d/M/yyyy"
            }
        }
    }
}




In [29]:
# Create the index with the updated mapping
es.indices.create(index=index_name, body=mapping, ignore=400)

  es.indices.create(index=index_name, body=mapping, ignore=400)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'sales_index2'})

In [30]:
# Function to generate actions for bulk indexing
def generate_actions(df, index_name):
    for _, row in df.iterrows():
        yield {
            "_index": index_name,
            "_source": row.dropna().to_dict(),  # Drop any NaN values to avoid indexing issues
        }

# Generate actions
actions = list(generate_actions(df, index_name))

In [31]:
print(actions)

[{'_index': 'sales_index2', '_source': {'Invoice ID': '750-67-8428', 'Branch': 'A', 'City': 'Yangon', 'Customer type': 'Member', 'Gender': 'Female', 'Product line': 'Health and beauty', 'Unit price': 74.69, 'Quantity': 7, 'Tax 5%': 26.1415, 'Total': 548.9715, 'Date': Timestamp('2019-05-01 00:00:00')}}, {'_index': 'sales_index2', '_source': {'Invoice ID': '226-31-3081', 'Branch': 'C', 'City': 'Naypyitaw', 'Customer type': 'Normal', 'Gender': 'Female', 'Product line': 'Electronic accessories', 'Unit price': 15.28, 'Quantity': 5, 'Tax 5%': 3.82, 'Total': 80.22, 'Date': Timestamp('2019-08-03 00:00:00')}}, {'_index': 'sales_index2', '_source': {'Invoice ID': '631-41-3108', 'Branch': 'A', 'City': 'Yangon', 'Customer type': 'Normal', 'Gender': 'Male', 'Product line': 'Home and lifestyle', 'Unit price': 46.33, 'Quantity': 7, 'Tax 5%': 16.2155, 'Total': 340.5255, 'Date': Timestamp('2019-03-03 00:00:00')}}, {'_index': 'sales_index2', '_source': {'Invoice ID': '123-19-1176', 'Branch': 'A', 'City'

In [25]:
# Perform bulk indexing with error handling
try:
    helpers.bulk(es, actions)
# except exceptions.BulkIndexError as e:
#     # Print detailed error information
#     print(f"Bulk indexing failed for {len(e.errors)} documents.")
#     for error in e.errors:
#         print(error)

SyntaxError: unexpected EOF while parsing (3948286325.py, line 8)

In [32]:
helpers.bulk(es, actions)

BulkIndexError: 229 document(s) failed to index.

In [33]:
es.indices.refresh(index=index_name)

ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}})

In [34]:
# Count the documents in the index
doc_count = es.count(index=index_name)['count']
print(f'Document count in {index_name}: {doc_count}')

Document count in sales_index2: 271
