# AWS OpenSearch Demo

In [1]:
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3
import pandas as pd
import json
import time
from tqdm.notebook import tqdm
from tqdm import tqdm
from datetime import datetime

In [2]:
# Source CSV column -> snake_case field name. The keys double as the list of columns to load.
new_names = {
    'Incident Datetime': 'datetime',
    'Row ID': 'row_id',
    'Report Type Description': 'report_type_description',
    'Incident Description': 'incident_description',
    'Incident Category': 'incident_category',
    'Incident Subcategory': 'incident_subcategory',
    'Police District': 'police_district',
    'Latitude': 'latitude',
    'Longitude': 'longitude',
}

# Load only the needed columns, rename them, then discard rows without a usable location.
crimes = (
    pd.read_csv(
        'data/Police_Department_Incident_Reports__2018_to_Present.csv',
        encoding='ISO-8859-1',
        usecols=list(new_names),
    )
    .rename(columns=new_names)
    # Remove bad geolocation data: a coordinate of exactly 0 on either axis.
    .loc[lambda frame: (frame['latitude'] != 0) & (frame['longitude'] != 0)]
    # Rows with a missing latitude or longitude cannot be turned into a point either.
    .dropna(subset=['latitude', 'longitude'])
)
def parse_date(date_str):
    """Convert a 12-hour 'YYYY/MM/DD HH:MM:SS AM/PM' timestamp to ISO-8601 text.

    Args:
        date_str: Timestamp string such as '2023/03/11 02:00:00 PM'.

    Returns:
        The timestamp formatted as 'YYYY-MM-DDTHH:MM:SS' on a 24-hour clock.

    Raises:
        ValueError: If date_str does not match the expected input layout.
    """
    source_layout = '%Y/%m/%d %I:%M:%S %p'
    # The parsed value carries no timezone, so the trailing %z renders as an empty string.
    target_layout = '%Y-%m-%dT%H:%M:%S%z'
    parsed = datetime.strptime(date_str, source_layout)
    return parsed.strftime(target_layout)
# Rewrite every incident timestamp into the ISO-8601 text produced by parse_date.
crimes['datetime'] = crimes['datetime'].map(parse_date)

def combine_lat_long(df):
    """Replace the latitude/longitude columns with a single 'lat,lon' string column.

    The input frame is left untouched; a new frame is returned. This keeps the
    step idempotent and avoids chained-assignment warnings when `df` is itself
    a filtered view of another frame.

    Args:
        df: DataFrame containing 'latitude' and 'longitude' columns.

    Returns:
        A new DataFrame with a trailing 'coordinate' column holding
        '<latitude>,<longitude>' strings, and without the two source columns.

    Raises:
        KeyError: If 'latitude' or 'longitude' is missing from df.
    """
    coordinate = df['latitude'].astype(str) + ',' + df['longitude'].astype(str)
    return df.assign(coordinate=coordinate).drop(columns=['latitude', 'longitude'])
# Collapse latitude/longitude into the single coordinate column, then show the result.
crimes = crimes.pipe(combine_lat_long)
crimes

Unnamed: 0,datetime,row_id,report_type_description,incident_category,incident_subcategory,incident_description,police_district,coordinate
9,2023-03-11T14:00:00,125431804134,Initial,Assault,Simple Assault,Battery,Park,"37.772895177200766,-122.45428511766733"
11,2022-06-27T12:00:00,125439371000,Initial,Lost Property,Lost Property,Lost Property,Central,"37.78735926098589,-122.40822672700406"
13,2023-03-16T17:30:00,125482604134,Initial,Assault,Simple Assault,Battery,Bayview,"37.76228996810526,-122.40132418490649"
33,2023-03-21T15:50:00,125656351040,Initial,Non-Criminal,Non-Criminal,Aided Case,Northern,"37.787037946181535,-122.41827098126804"
61,2021-08-22T09:40:00,106267662071,Initial,Warrant,Other,Probation Search,Northern,"37.79397724418211,-122.42980398313114"
...,...,...,...,...,...,...,...,...
717844,2023-03-22T13:30:00,125685303401,Initial,Robbery,Robbery - Commercial,"Shoplifting, Force against Agent",Mission,"37.75300402962228,-122.40633623830558"
717845,2023-03-22T00:13:00,125665206371,Initial,Larceny Theft,Larceny Theft - Other,"Theft, Other Property, <$50",Northern,"37.78899525864335,-122.44062361999508"
717846,2023-03-21T21:19:00,125664272000,Initial,Non-Criminal,Non-Criminal,Found Property,Ingleside,"37.70880633652071,-122.45266472219632"
717847,2023-03-22T15:28:00,125687306303,Initial,Larceny Theft,Larceny Theft - From Building,"Theft, From Building, $200-$950",Central,"37.79056523652957,-122.40557374633904"


In [3]:
# One plain dict per incident row, keyed by column name.
crimes_documents = crimes.to_dict("records")
# Show the first document as a sanity check.
crimes_documents[0]

{'datetime': '2023-03-11T14:00:00',
 'row_id': 125431804134,
 'report_type_description': 'Initial',
 'incident_category': 'Assault',
 'incident_subcategory': 'Simple Assault',
 'incident_description': 'Battery',
 'police_district': 'Park',
 'coordinate': '37.772895177200766,-122.45428511766733'}

In [15]:
# Build the AWSV4SignerAuth request signer from the credentials boto3 resolves for this session.
region = 'us-east-1'
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region)

# Connection settings for the OpenSearch domain endpoint: HTTPS on 443 with certificate verification.
connection_settings = dict(
    hosts=[{'host': 'search-opensearch-project-z34prjyzqkwuwpshp6uyca6xie.us-east-1.es.amazonaws.com', 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
client = OpenSearch(**connection_settings)

# Round-trip to the cluster to confirm that the connection and signing work.
client.info()

{'name': '380969d9250b4d034c1de4ecc88fb85b',
 'cluster_name': '946540787295:opensearch-project',
 'cluster_uuid': 'fRQaPPnAR7-7MCp02IcPwA',
 'version': {'distribution': 'opensearch',
  'number': '2.5.0',
  'build_type': 'tar',
  'build_hash': 'unknown',
  'build_date': '2023-03-22T09:39:31.819469Z',
  'build_snapshot': False,
  'lucene_version': '9.4.2',
  'minimum_wire_compatibility_version': '7.10.0',
  'minimum_index_compatibility_version': '7.0.0'},
 'tagline': 'The OpenSearch Project: https://opensearch.org/'}

In [12]:
# Uncomment to delete the existing crimes index before re-creating it in the next cell:
#client.indices.delete('crimes')

In [13]:
# Only create the index when it is missing, so re-running this cell is harmless.
if client.indices.exists(index="crimes"):
    print("The crimes index already exists!")
else:
    # Explicit field mapping for the crimes index.
    mapping = {
        "mappings": {
            "properties": {
                "datetime": {"type": "date"},
                "row_id": {"type": "keyword"},
                "report_type_description": {"type": "text"},
                "incident_category": {"type": "keyword"},
                "incident_subcategory": {"type": "keyword"},
                "incident_description": {"type": "text"},
                "police_district": {"type": "keyword"},
                "coordinate": {"type": "geo_point"},
            }
        }
    }
    client.indices.create(index="crimes", body=mapping)
    print("Created crimes index.")

Created crimes index.


In [14]:
chunk_size = 1000 # Number of documents in a chunk.
chunks = [crimes_documents[index:index + chunk_size] for index in range(0, len(crimes_documents), chunk_size)]
failed_items = [] # Per-document failures reported by OpenSearch, accumulated across all chunks.

# Process each chunk
for chunk in tqdm(chunks, desc = 'Processing documents', unit = 'chunk'):
    body_lines = [] # Newline-delimited JSON lines for this chunk: an action line, then the document.

    # Add each document to the bulk insert operation
    for doc in chunk:
        # Using row_id as the document _id makes a re-run overwrite documents instead of duplicating them.
        body_lines.append(json.dumps({'index': {'_index': 'crimes', '_id': doc['row_id']}}))
        # NOTE(review): json.dumps writes a float NaN as the bare token NaN, which is not
        # valid JSON - confirm that no document field can be missing.
        body_lines.append(json.dumps(doc))

    response = client.bulk(body='\n'.join(body_lines) + '\n') # Commence bulk inserts

    # The bulk API does not raise when individual documents are rejected; it sets the
    # top-level 'errors' flag and attaches an 'error' entry to each failed item.
    if response.get('errors'):
        for item in response.get('items', []):
            result = next(iter(item.values())) # Each item is {<action>: {...result...}}
            if 'error' in result:
                failed_items.append(result)

# Report failures once at the end instead of aborting the load part-way through.
if failed_items:
    print(f"{len(failed_items)} documents failed to index. First failure: {failed_items[0]}")

Processing documents: 100%|██████████| 680/680 [02:05<00:00,  5.42chunk/s]
