In [39]:
import os

import pandas as pd
from dotenv import load_dotenv, find_dotenv
from elasticsearch import Elasticsearch
from elasticsearch.helpers import BulkIndexError

# Load the .env file located by find_dotenv() so the environment lookups in the
# next cell can see its values; the call's return value is what the cell displays.
load_dotenv(find_dotenv())

True

In [40]:
# Elastic Cloud credentials are read from the environment; each name is None
# when its variable is not set.
cloud_id, api_key = (os.environ.get(var_name) for var_name in ("CLOUD_ID", "API_KEY"))

In [41]:
# Client for the deployment identified by `cloud_id`, authenticated with `api_key`.
es = Elasticsearch(cloud_id=cloud_id, api_key=api_key)

In [42]:
# Connectivity check: the response echoes the cluster name/UUID and server version,
# so review this output before sharing the notebook.
es.info()

ObjectApiResponse({'name': 'instance-0000000001', 'cluster_name': '9caa7647389a4f10a248732811d4a4ca', 'cluster_uuid': '8ogLGj2ZT7ywFoZnNQLygQ', 'version': {'number': '8.15.2', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '98adf7bf6bb69b66ab95b761c9e5aadb0bb059a3', 'build_date': '2024-09-19T10:06:03.564235954Z', 'build_snapshot': False, 'lucene_version': '9.11.1', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

In [43]:
# Read the business records, then show the column index followed by the first
# record as a plain dict to get a feel for the schema.
source_path = "../yelp_data/mo_business.json"
df = pd.read_json(source_path)
print(df.columns)
sample_row = df.iloc[0]
print(sample_row.to_dict())

Index(['business_id', 'name', 'address', 'city', 'state', 'postal_code',
       'latitude', 'longitude', 'stars', 'review_count', 'is_open',
       'attributes', 'categories', 'hours'],
      dtype='object')
{'business_id': 'mpf3x-BjTdTEA3yCZrAYPw', 'name': 'The UPS Store', 'address': '87 Grasso Plaza Shopping Center', 'city': 'Affton', 'state': 'MO', 'postal_code': '63123', 'latitude': 38.551126, 'longitude': -90.335695, 'stars': 3.0, 'review_count': 15, 'is_open': 1, 'attributes': {'BusinessAcceptsCreditCards': 'True'}, 'categories': 'Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services', 'hours': {'Monday': '0:0-0:0', 'Tuesday': '8:0-18:30', 'Wednesday': '8:0-18:30', 'Thursday': '8:0-18:30', 'Friday': '8:0-18:30', 'Saturday': '8:0-14:0'}}


In [44]:
# Column dtypes and non-null counts of the raw frame, before any cleaning.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10913 entries, 0 to 10912
Data columns (total 14 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   business_id   10913 non-null  object 
 1   name          10913 non-null  object 
 2   address       10913 non-null  object 
 3   city          10913 non-null  object 
 4   state         10913 non-null  object 
 5   postal_code   10913 non-null  object 
 6   latitude      10913 non-null  float64
 7   longitude     10913 non-null  float64
 8   stars         10913 non-null  float64
 9   review_count  10913 non-null  int64  
 10  is_open       10913 non-null  int64  
 11  attributes    10047 non-null  object 
 12  categories    10907 non-null  object 
 13  hours         9450 non-null   object 
dtypes: float64(3), int64(2), object(9)
memory usage: 1.2+ MB


In [45]:
def clean_boolean(value):
    """Normalise a loosely-typed flag to ``True``, ``False`` or ``None``.

    Args:
        value: A string such as ``'True'`` / ``'False'``, a real bool or
            number, ``None``, or a float NaN.

    Returns:
        ``None`` when the value is missing (``None``, NaN, or a blank /
        ``'None'`` / ``'null'`` / ``'nan'`` string); for any other string,
        ``True`` only if it reads ``'true'`` (case-insensitive, surrounding
        whitespace ignored); otherwise ``bool(value)``.
    """
    import math

    if value is None:
        return None
    # bool(float('nan')) is True, which would turn a missing cell into "yes".
    if isinstance(value, float) and math.isnan(value):
        return None
    if isinstance(value, str):
        text = value.strip().lower()
        # A stringified null means "unknown", not "False".
        if text in ('', 'none', 'null', 'nan'):
            return None
        return text == 'true'
    return bool(value)

In [46]:
## fillna
def _empty_dict_if_missing(cell):
    """Swap a missing cell for an empty dict; pass every other value through."""
    return {} if pd.isna(cell) else cell


def _credit_card_flag_only(attrs):
    """Reduce an attributes dict to its cleaned BusinessAcceptsCreditCards flag."""
    if not isinstance(attrs, dict):
        return {}
    return {'BusinessAcceptsCreditCards': clean_boolean(attrs.get('BusinessAcceptsCreditCards'))}


# Missing attributes become {} first, so the second pass always sees a dict for them.
df['attributes'] = df['attributes'].apply(_empty_dict_if_missing).apply(_credit_card_flag_only)
df['hours'] = df['hours'].apply(_empty_dict_if_missing)
df['categories'] = df['categories'].fillna("Unknown")

## dtype mapping
df['is_open'] = df['is_open'].astype(bool)
for text_column in ('business_id', 'name', 'postal_code'):
    df[text_column] = df[text_column].astype(str)

In [47]:
# Re-inspect dtypes and non-null counts after the cleaning cell above.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10913 entries, 0 to 10912
Data columns (total 14 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   business_id   10913 non-null  object 
 1   name          10913 non-null  object 
 2   address       10913 non-null  object 
 3   city          10913 non-null  object 
 4   state         10913 non-null  object 
 5   postal_code   10913 non-null  object 
 6   latitude      10913 non-null  float64
 7   longitude     10913 non-null  float64
 8   stars         10913 non-null  float64
 9   review_count  10913 non-null  int64  
 10  is_open       10913 non-null  bool   
 11  attributes    10913 non-null  object 
 12  categories    10913 non-null  object 
 13  hours         10913 non-null  object 
dtypes: bool(1), float64(3), int64(1), object(9)
memory usage: 1.1+ MB


In [49]:
index_name = "business_data"


def _typed(es_type):
    """Return a fresh single-field mapping entry of the given Elasticsearch type."""
    return {"type": es_type}


_WEEKDAYS = (
    "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday",
)

# Explicit index mapping. `location` is a geo_point field declared alongside the
# raw latitude/longitude floats; `attributes` and `hours` are object fields.
mapping = {
    "mappings": {
        "properties": {
            "business_id": _typed("keyword"),
            "name": _typed("text"),
            "address": _typed("text"),
            "city": _typed("keyword"),
            "state": _typed("keyword"),
            "postal_code": _typed("keyword"),
            "location": _typed("geo_point"),
            "latitude": _typed("float"),
            "longitude": _typed("float"),
            "stars": _typed("float"),
            "review_count": _typed("integer"),
            "is_open": _typed("boolean"),
            "attributes": {
                "properties": {"BusinessAcceptsCreditCards": _typed("boolean")}
            },
            "categories": _typed("text"),
            "hours": {
                "properties": {day: _typed("text") for day in _WEEKDAYS}
            },
        }
    }
}

## Create the business index

In [50]:
from elasticsearch.helpers import bulk

In [51]:
# Only create the index (with the explicit mapping) when it is not already
# present, and report which path was taken.
if es.indices.exists(index=index_name):
    print(f"Index '{index_name}' already exists.")
else:
    es.indices.create(index=index_name, body=mapping)
    print(f"Index '{index_name}' created successfully!")

Index 'business_data' created successfully!


In [52]:
def upload_to_elastic(df, index_name):
    """Yield one bulk-index action per row of ``df``.

    Args:
        df: DataFrame with at least ``business_id``, ``latitude`` and
            ``longitude`` columns.
        index_name: Name of the target index, copied into every action.

    Yields:
        dict: An action with ``_index``, ``_id`` (the row's ``business_id``)
        and ``_source`` (all columns of the row plus a ``location`` entry of
        the form ``{'lat': ..., 'lon': ...}``).

    Raises:
        KeyError: If one of the required columns is absent.
    """
    # to_dict(orient="records") converts column by column, so every value keeps
    # its own column's dtype. iterrows() packs each row into a single Series and
    # can upcast (e.g. ints become floats in an all-numeric frame); it is also
    # considerably slower. Note this builds the full list of records up front.
    for row_data in df.to_dict(orient="records"):
        row_data['location'] = {
            'lat': row_data['latitude'],
            'lon': row_data['longitude'],
        }
        yield {
            "_index": index_name,
            "_id": row_data['business_id'],
            "_source": row_data,
        }

# Stream every row of `df` into the index through the bulk helper and print
# whatever it returns.
try:
    response = bulk(es, upload_to_elastic(df, index_name))
    print("Data uploaded successfully:", response)
except BulkIndexError as e:
    # The exception message only summarises the failure; the per-document
    # details are carried on `e.errors`, so show the first few of them.
    print("Error uploading data:", e)
    for failure in e.errors[:5]:
        print("  ", failure)
except Exception as e:
    # Anything else (connection, auth, serialisation, ...) is still reported
    # without stopping the notebook.
    print("Error uploading data:", e)

Data uploaded successfully: (10913, [])


## Keyword search

In [53]:
# Full-text `match` query against the `name` field.
search_query = {"query": {"match": {"name": "pizza"}}}

response = es.search(index="business_data", body=search_query)

# Print each returned business name with its relevance score.
for hit in response['hits']['hits']:
    business = hit['_source']
    print(f"Business Name: {business['name']}, Score: {hit['_score']}")

Business Name: Imo's Pizza, Score: 4.2314672
Business Name: Domino's Pizza, Score: 4.2314672
Business Name: Elicia's Pizza, Score: 4.2314672
Business Name: Reavis Pizza, Score: 4.2314672
Business Name: Imo's Pizza, Score: 4.2314672
Business Name: Imo's Pizza, Score: 4.2314672
Business Name: Marco's Pizza, Score: 4.2314672
Business Name: Jet's Pizza, Score: 4.2314672
Business Name: Pizza Hut, Score: 4.2314672
Business Name: Pizza Hut, Score: 4.2314672


In [56]:
# Eyeball the coordinate columns (presumably to pick the bounding-box corners
# used in the geo query below — confirm).
df['latitude'], df['longitude']

(0        38.551126
 1        38.627695
 2        38.565165
 3        38.613576
 4        38.617272
            ...    
 10908    38.572572
 10909    38.715094
 10910    38.660185
 10911    38.641988
 10912    38.631909
 Name: latitude, Length: 10913, dtype: float64,
 0       -90.335695
 1       -90.340465
 2       -90.321087
 3       -90.322383
 4       -90.212784
            ...    
 10908   -90.333021
 10909   -90.455972
 10910   -90.225247
 10911   -90.261655
 10912   -90.444269
 Name: longitude, Length: 10913, dtype: float64)

## Geo-spatial search

In [57]:
# Corners of the search rectangle: north-west and south-east.
top_left = {"lat": 38.71, "lon": -90.45}
bottom_right = {"lat": 38.55, "lon": -90.21}

# Keep only documents whose `location` geo_point lies inside the rectangle.
bounding_box = {"top_left": top_left, "bottom_right": bottom_right}
search_query = {"query": {"geo_bounding_box": {"location": bounding_box}}}

response = es.search(index="business_data", body=search_query)

for hit in response['hits']['hits']:
    business = hit['_source']
    print(f"Business Name: {business['name']}, Location: {business['location']}")

Business Name: The UPS Store, Location: {'lat': 38.551126, 'lon': -90.335695}
Business Name: Famous Footwear, Location: {'lat': 38.627695, 'lon': -90.340465}
Business Name: Tsevi's Pub And Grill, Location: {'lat': 38.5651648, 'lon': -90.3210868}
Business Name: TKO DJs, Location: {'lat': 38.6135761, 'lon': -90.3223828}
Business Name: Ricardo's Italian Cafe, Location: {'lat': 38.6172724, 'lon': -90.2127841}
Business Name: Sunset 44 Bistro, Location: {'lat': 38.5832229, 'lon': -90.4071872}
Business Name: Budweiser Brewery Experience, Location: {'lat': 38.6001971, 'lon': -90.2135377}
Business Name: Passport Health, Location: {'lat': 38.6361966, 'lon': -90.3457542}
Business Name: McDonald's, Location: {'lat': 38.6124951839, 'lon': -90.2219420671}
Business Name: Eddie's Southtown Donuts, Location: {'lat': 38.5857552, 'lon': -90.2768425}
