In [1]:
import pandas as pd
from dotenv import load_dotenv, find_dotenv
from elasticsearch import Elasticsearch
import os

load_dotenv(find_dotenv())

True

In [2]:
cloud_id = os.environ.get("CLOUD_ID")
api_key = os.environ.get("API_KEY")

In [3]:
es = Elasticsearch(cloud_id=cloud_id, api_key=api_key)

In [4]:
es.info()

ObjectApiResponse({'name': 'instance-0000000001', 'cluster_name': '9caa7647389a4f10a248732811d4a4ca', 'cluster_uuid': '8ogLGj2ZT7ywFoZnNQLygQ', 'version': {'number': '8.15.2', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '98adf7bf6bb69b66ab95b761c9e5aadb0bb059a3', 'build_date': '2024-09-19T10:06:03.564235954Z', 'build_snapshot': False, 'lucene_version': '9.11.1', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

In [5]:
df = pd.read_json("../yelp_data/mo_business.json")
print(df.columns)
print(df.iloc[0].to_dict())

Index(['business_id', 'name', 'address', 'city', 'state', 'postal_code',
       'latitude', 'longitude', 'stars', 'review_count', 'is_open',
       'attributes', 'categories', 'hours'],
      dtype='object')
{'business_id': 'mpf3x-BjTdTEA3yCZrAYPw', 'name': 'The UPS Store', 'address': '87 Grasso Plaza Shopping Center', 'city': 'Affton', 'state': 'MO', 'postal_code': '63123', 'latitude': 38.551126, 'longitude': -90.335695, 'stars': 3.0, 'review_count': 15, 'is_open': 1, 'attributes': {'BusinessAcceptsCreditCards': 'True'}, 'categories': 'Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services', 'hours': {'Monday': '0:0-0:0', 'Tuesday': '8:0-18:30', 'Wednesday': '8:0-18:30', 'Thursday': '8:0-18:30', 'Friday': '8:0-18:30', 'Saturday': '8:0-14:0'}}


In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10913 entries, 0 to 10912
Data columns (total 14 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   business_id   10913 non-null  object 
 1   name          10913 non-null  object 
 2   address       10913 non-null  object 
 3   city          10913 non-null  object 
 4   state         10913 non-null  object 
 5   postal_code   10913 non-null  object 
 6   latitude      10913 non-null  float64
 7   longitude     10913 non-null  float64
 8   stars         10913 non-null  float64
 9   review_count  10913 non-null  int64  
 10  is_open       10913 non-null  int64  
 11  attributes    10047 non-null  object 
 12  categories    10907 non-null  object 
 13  hours         9450 non-null   object 
dtypes: float64(3), int64(2), object(9)
memory usage: 1.2+ MB


In [8]:
## fillna
df['attributes'] = df['attributes'].fillna({})
df['hours'] = df['hours'].fillna({})
df['categories'] = df['categories'].fillna("Unknown")

## dtype mapping
df['is_open'] = df['is_open'].astype(bool)
df['business_id'] = df['business_id'].astype(str)
df['name'] = df['name'].astype(str)
df['postal_code'] = df['postal_code'].astype(str)

In [9]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10913 entries, 0 to 10912
Data columns (total 14 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   business_id   10913 non-null  object 
 1   name          10913 non-null  object 
 2   address       10913 non-null  object 
 3   city          10913 non-null  object 
 4   state         10913 non-null  object 
 5   postal_code   10913 non-null  object 
 6   latitude      10913 non-null  float64
 7   longitude     10913 non-null  float64
 8   stars         10913 non-null  float64
 9   review_count  10913 non-null  int64  
 10  is_open       10913 non-null  bool   
 11  attributes    10047 non-null  object 
 12  categories    10913 non-null  object 
 13  hours         9450 non-null   object 
dtypes: bool(1), float64(3), int64(1), object(9)
memory usage: 1.1+ MB


In [6]:
index_name = "business_data"

mapping = {
    "mappings": {
        "properties": {
            "business_id": {"type": "keyword"},
            "name": {"type": "text"},
            "address": {"type": "text"},
            "city": {"type": "keyword"},
            "state": {"type": "keyword"},
            "postal_code": {"type": "keyword"},
            "latitude": {"type": "float"},
            "longitude": {"type": "float"},
            "stars": {"type": "float"},
            "review_count": {"type": "integer"},
            "is_open": {"type": "boolean"},
            "attributes": {
                "properties": {
                    "BusinessAcceptsCreditCards": {"type": "boolean"}
                }
            },
            "categories": {"type": "text"},
            "hours": {
                "properties": {
                    "Monday": {"type": "text"},
                    "Tuesday": {"type": "text"},
                    "Wednesday": {"type": "text"},
                    "Thursday": {"type": "text"},
                    "Friday": {"type": "text"},
                    "Saturday": {"type": "text"},
                    "Sunday": {"type": "text"}
                }
            }
        }
    }
}

## create business index

In [8]:
from elasticsearch.helpers import bulk

In [7]:
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, body=mapping)
    print(f"Index '{index_name}' created successfully!")
else:
    print(f"Index '{index_name}' already exists.")

Index 'business_data' created successfully!


In [None]:
def upload_to_elastic(df, index_name):
    for _, row in df.iterrows():
        yield {
            "_index": index_name,
            "_id": row['business_id'],
            "_source": row.to_dict()
        }

for i, doc in enumerate(upload_to_elastic(df, index_name)):
    try:
        es.index(index=doc['_index'], id=doc['_id'], document=doc['_source'])
    except:
        print(i)

0
1
2
3
4
5
6
7
8
9
10
11
12
14
15
16
17
18
19
20
21
22
23
25
27
28
29
30
31
33
34
35
37
39
40
42
44
45
46
48
49
50
51
52
53
54
55
56
57
58
60
61
62
63
64
65
66
67
69
70
71
72
74
75
76
77
78
79
80
