#### import packages

In [1]:
import pandas as pd
import json
import itertools
import sqlite3

#### define variables

In [2]:
#folder that holds the five Yelp academic dataset JSON files
yelp_json_dir = 'C:/Users/Gebruiker/Jupyter/yelp_dataset/JSON'

#full path of every dataset file
business_json = yelp_json_dir + '/yelp_academic_dataset_business.json'
checkin_json = yelp_json_dir + '/yelp_academic_dataset_checkin.json'
review_json = yelp_json_dir + '/yelp_academic_dataset_review.json'
tip_json = yelp_json_dir + '/yelp_academic_dataset_tip.json'
user_json = yelp_json_dir + '/yelp_academic_dataset_user.json'

#SQLite3 connection (path is relative to the working directory) and its cursor
conn = sqlite3.connect('SQLITE3/yelp_database.sqlite3')
c = conn.cursor()

### I) BUSINESS DATA

#### 0) load & normalize JSON file

In [3]:
#open file as txt and read it line per line into a list;
#the context manager closes the file handle as soon as the lines are read
with open(business_json, 'r', encoding='utf-8') as business_file:
    business_lines = business_file.readlines()

#every line ends with a "\n" newline, so strip it before parsing;
#lines that are empty after stripping are skipped because they are not valid JSON
stripped_business_lines = (line.strip() for line in business_lines)
list_business_JSON_objects = [json.loads(text) for text in stripped_business_lines if text]

#create a normalized DF out of the JSON object list
#(max_level=1 flattens only the first nesting level into dotted column names)
business_df = pd.json_normalize(list_business_JSON_objects, max_level=1)

#drop empty columns
business_df = business_df.drop(['hours','attributes'], axis=1)

#column list for later use
complete_df_column_list = business_df.columns.tolist()

business_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 160585 entries, 0 to 160584
Data columns (total 58 columns):
 #   Column                                 Non-Null Count   Dtype  
---  ------                                 --------------   -----  
 0   business_id                            160585 non-null  object 
 1   name                                   160585 non-null  object 
 2   address                                160585 non-null  object 
 3   city                                   160585 non-null  object 
 4   state                                  160585 non-null  object 
 5   postal_code                            160585 non-null  object 
 6   latitude                               160585 non-null  float64
 7   longitude                              160585 non-null  float64
 8   stars                                  160585 non-null  float64
 9   review_count                           160585 non-null  int64  
 10  is_open                                160585 non-null  

#### 1) Main businesses table / general (level 1) attributes table / hours table

In [None]:
#the original DF is split into 3 tables;
#first the "main" one, holding the top-level business columns:
business_main_columns = [
    'business_id',
    'name',
    'address',
    'city',
    'state',
    'postal_code',
    'latitude',
    'longitude',
    'stars',
    'review_count',
    'is_open',
    'categories',
]
#independent copy, so later changes do not touch business_df
business_main_df = business_df[business_main_columns].copy()

business_main_df.info()

In [None]:
#) (re)create the business_main table with business_id as PK, then fill it
create_business_main_sql = """CREATE TABLE business_main(
                business_id text PRIMARY KEY
                ,name text
                ,address text
                ,city text
                ,state text
                ,postal_code text
                ,latitude real
                ,longitude real
                ,stars real
                ,review_count integer
                ,is_open integer
                ,categories text)"""

#drop any previous version first, committing after each statement
for ddl_statement in ('DROP TABLE IF EXISTS business_main', create_business_main_sql):
    c.execute(ddl_statement)
    conn.commit()

#append the DF rows to the freshly created table
business_main_df.to_sql(name='business_main', con=conn, if_exists='append', index=False)
conn.commit()

In [4]:
#one with general attributes:

#fetch relevant columns: business_id first, then every flattened "attributes.*" column
attributes_column_list = ['business_id'] + [x for x in complete_df_column_list if x.startswith('attributes.')]
general_attributes_df = business_df[attributes_column_list]

#drop 5 columns with nested objects (will be extracted later)
general_attributes_df = general_attributes_df.drop(['attributes.BusinessParking','attributes.Ambience','attributes.GoodForMeal','attributes.BestNights','attributes.Music'], axis=1)

#remove the leading "attributes." from column names for use in SQL;
#the pattern is anchored and the dot escaped, with regex=True passed explicitly,
#so the result does not depend on the pandas default for the regex argument
general_attributes_df.columns = general_attributes_df.columns.str.replace(r'^attributes\.', '', regex=True)

general_attributes_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 160585 entries, 0 to 160584
Data columns (total 35 columns):
 #   Column                      Non-Null Count   Dtype 
---  ------                      --------------   ----- 
 0   business_id                 160585 non-null  object
 1   RestaurantsTableService     19400 non-null   object
 2   WiFi                        59017 non-null   object
 3   BikeParking                 76480 non-null   object
 4   BusinessAcceptsCreditCards  120177 non-null  object
 5   RestaurantsReservations     45607 non-null   object
 6   WheelchairAccessible        29370 non-null   object
 7   Caters                      40140 non-null   object
 8   OutdoorSeating              50128 non-null   object
 9   RestaurantsGoodForGroups    45381 non-null   object
 10  HappyHour                   15237 non-null   object
 11  BusinessAcceptsBitcoin      17593 non-null   object
 12  RestaurantsPriceRange2      92442 non-null   object
 13  HasTV                       4

  general_attributes_df.columns = general_attributes_df.columns.str.replace('attributes.', '')


In [5]:
#) (re)create the staging table for the general attributes, fill it, then index the FK column
create_attributes_staging_sql = """CREATE TABLE attributes_general_staging (
                    "business_id" TEXT
                    ,"RestaurantsTableService" integer
                    ,"WiFi" TEXT
                    ,"BikeParking" integer
                    ,"BusinessAcceptsCreditCards" integer
                    ,"RestaurantsReservations" integer
                    ,"WheelchairAccessible" integer
                    ,"Caters" integer
                    ,"OutdoorSeating" integer
                    ,"RestaurantsGoodForGroups" integer
                    ,"HappyHour" integer
                    ,"BusinessAcceptsBitcoin" integer
                    ,"RestaurantsPriceRange2" integer
                    ,"HasTV" integer
                    ,"Alcohol" TEXT
                    ,"DogsAllowed" integer
                    ,"RestaurantsTakeOut" integer
                    ,"NoiseLevel" TEXT
                    ,"RestaurantsAttire" TEXT
                    ,"RestaurantsDelivery" integer
                    ,"GoodForKids" integer
                    ,"ByAppointmentOnly" integer
                    ,"AcceptsInsurance" integer
                    ,"HairSpecializesIn" TEXT
                    ,"GoodForDancing" integer
                    ,"BYOB" integer
                    ,"CoatCheck" integer
                    ,"Smoking" TEXT
                    ,"DriveThru" integer
                    ,"BYOBCorkage" TEXT
                    ,"Corkage" integer
                    ,"RestaurantsCounterService" integer
                    ,"AgesAllowed" TEXT
                    ,"DietaryRestrictions" TEXT
                    ,"Open24Hours" integer
                    ,CONSTRAINT fk_business_id  
                    FOREIGN KEY (business_id)  
                    REFERENCES business_main(business_id))"""

#drop any previous version first, committing after each statement
for ddl_statement in ('DROP TABLE IF EXISTS attributes_general_staging', create_attributes_staging_sql):
    c.execute(ddl_statement)
    conn.commit()

#append the DF rows; the DF column names must match the table columns
general_attributes_df.to_sql(name='attributes_general_staging', con=conn, if_exists='append', index=False)
conn.commit()

#index on the FK column, created after the load
c.execute('CREATE INDEX IDX_attributes_general_staging on attributes_general_staging(business_id)')
conn.commit()

In [None]:
#and one with hours:

#fetch relevant columns: business_id first, then every flattened "hours.*" column
hours_column_list = ['business_id'] + [x for x in complete_df_column_list if x.startswith('hours.')]
hours_df = business_df[hours_column_list]

#remove the leading "hours." from column names for use in SQL;
#the pattern is anchored and the dot escaped, with regex=True passed explicitly,
#so the result does not depend on the pandas default for the regex argument
hours_df.columns = hours_df.columns.str.replace(r'^hours\.', '', regex=True)

hours_df.info()

In [None]:
#) create and fill SQLite3 table (including FK index)
c.execute('DROP TABLE IF EXISTS business_hours')
conn.commit()

c.execute("""CREATE TABLE business_hours(
                business_id text
                ,Monday text
                ,Tuesday text
                ,Wednesday text
                ,Thursday text
                ,Friday text
                ,Saturday text
                ,Sunday text
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))""")
conn.commit()

#load the rows first ...
hours_df.to_sql('business_hours', conn, if_exists='append', index = False)
conn.commit()

#... and build the FK index afterwards, in one pass over the loaded rows
#(same order as used for attributes_general_staging)
c.execute('CREATE INDEX IDX_business_hours on business_hours(business_id)')
conn.commit()

#### level 2 normalization
> manual action required (instead of raising the max_level attribute in (1)) because the business dataset contains nested object columns with invalid JSON strings; this applies to the following columns: attributes.BusinessParking / attributes.Ambience / attributes.GoodForMeal / attributes.BestNights / attributes.Music

#### 2) attributes.BusinessParking

In [None]:
#A) the clean-up: rewrite the strings into valid JSON
#   (single -> double quotes, True/False -> true/false, None -> null);
#   the cleaned text is written back to business_df
parking_text = business_df['attributes.BusinessParking']
for pattern, replacement in (("'", '"'), ('True', 'true'), ('False', 'false'), ('None', 'null')):
    parking_text = parking_text.replace(pattern, replacement, regex=True)
business_df['attributes.BusinessParking'] = parking_text

#B) the normalization: missing values (NaN/None) and values that do not parse
#   to a JSON object (e.g. a bare null) become an empty dict, so every business
#   keeps exactly one row at the same position as in business_df
parking_dicts = []
for cell in business_df['attributes.BusinessParking']:
    parsed = {} if pd.isna(cell) else json.loads(cell)
    parking_dicts.append(parsed if isinstance(parsed, dict) else {})
BusinessParking_df = pd.json_normalize(parking_dicts)

#C) merge with business_id PK (row positions line up, so merge on the index)
BusinessParking_df = pd.merge(business_df['business_id'], BusinessParking_df, left_index=True, right_index=True)

BusinessParking_df.info()

In [None]:
#D) create and fill SQLite3 table (including FK index)
c.execute('DROP TABLE IF EXISTS attributes_BusinessParking')
conn.commit()

c.execute("""CREATE TABLE attributes_BusinessParking(
                business_id text
                ,garage integer
                ,street integer
                ,validated integer
                ,lot integer
                ,valet integer
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))""")
conn.commit()

#load the rows first ...
BusinessParking_df.to_sql('attributes_BusinessParking', conn, if_exists='append', index = False)
conn.commit()

#... and build the FK index afterwards, in one pass over the loaded rows
#(same order as used for attributes_general_staging)
c.execute('CREATE INDEX IDX_attributes_BusinessParking on attributes_BusinessParking(business_id)')
conn.commit()

#### 3) attributes.Ambience

In [None]:
#A) the clean-up: rewrite the strings into valid JSON
#   (single -> double quotes, True/False -> true/false, None -> null);
#   the cleaned text is written back to business_df
ambience_text = business_df['attributes.Ambience']
for pattern, replacement in (("'", '"'), ('True', 'true'), ('False', 'false'), ('None', 'null')):
    ambience_text = ambience_text.replace(pattern, replacement, regex=True)
business_df['attributes.Ambience'] = ambience_text

#B) the normalization: missing values (NaN/None) and values that do not parse
#   to a JSON object (e.g. a bare null) become an empty dict, so every business
#   keeps exactly one row at the same position as in business_df
ambience_dicts = []
for cell in business_df['attributes.Ambience']:
    parsed = {} if pd.isna(cell) else json.loads(cell)
    ambience_dicts.append(parsed if isinstance(parsed, dict) else {})
Ambience_df = pd.json_normalize(ambience_dicts)

#C) merge with business_id PK (row positions line up, so merge on the index)
Ambience_df = pd.merge(business_df['business_id'], Ambience_df, left_index=True, right_index=True)

Ambience_df.info()

In [None]:
#D) create and fill SQLite3 table (including FK index)
c.execute('DROP TABLE IF EXISTS attributes_Ambience')
conn.commit()

c.execute("""CREATE TABLE attributes_Ambience(
                business_id text
                ,touristy integer
                ,hipster integer
                ,romantic integer
                ,divey integer
                ,intimate integer
                ,trendy integer
                ,upscale integer
                ,classy integer
                ,casual integer
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))""")
conn.commit()

#load the rows first ...
Ambience_df.to_sql('attributes_Ambience', conn, if_exists='append', index = False)
conn.commit()

#... and build the FK index afterwards, in one pass over the loaded rows
#(same order as used for attributes_general_staging)
c.execute('CREATE INDEX IDX_attributes_Ambience on attributes_Ambience(business_id)')
conn.commit()

#### 4) attributes.GoodForMeal

In [None]:
#A) the clean-up: rewrite the strings into valid JSON
#   (single -> double quotes, True/False -> true/false, None -> null);
#   the cleaned text is written back to business_df
meal_text = business_df['attributes.GoodForMeal']
for pattern, replacement in (("'", '"'), ('True', 'true'), ('False', 'false'), ('None', 'null')):
    meal_text = meal_text.replace(pattern, replacement, regex=True)
business_df['attributes.GoodForMeal'] = meal_text

#B) the normalization: missing values (NaN/None) and values that do not parse
#   to a JSON object (e.g. a bare null) become an empty dict, so every business
#   keeps exactly one row at the same position as in business_df
meal_dicts = []
for cell in business_df['attributes.GoodForMeal']:
    parsed = {} if pd.isna(cell) else json.loads(cell)
    meal_dicts.append(parsed if isinstance(parsed, dict) else {})
GoodForMeal_df = pd.json_normalize(meal_dicts)

#C) merge with business_id PK (row positions line up, so merge on the index)
GoodForMeal_df = pd.merge(business_df['business_id'], GoodForMeal_df, left_index=True, right_index=True)

GoodForMeal_df.info()

In [None]:
#D) create and fill SQLite3 table (including FK index)
c.execute('DROP TABLE IF EXISTS attributes_GoodForMeal')
conn.commit()

c.execute("""CREATE TABLE attributes_GoodForMeal(
                business_id text
                ,dessert integer
                ,latenight integer
                ,lunch integer
                ,dinner integer
                ,brunch integer
                ,breakfast integer
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))""")
conn.commit()

#load the rows first ...
GoodForMeal_df.to_sql('attributes_GoodForMeal', conn, if_exists='append', index = False)
conn.commit()

#... and build the FK index afterwards, in one pass over the loaded rows
#(same order as used for attributes_general_staging)
c.execute('CREATE INDEX IDX_attributes_GoodForMeal on attributes_GoodForMeal(business_id)')
conn.commit()

#### 5) attributes.BestNights

In [None]:
#A) the clean-up: rewrite the strings into valid JSON
#   (single -> double quotes, True/False -> true/false, None -> null);
#   the cleaned text is written back to business_df
best_nights_text = business_df['attributes.BestNights']
for pattern, replacement in (("'", '"'), ('True', 'true'), ('False', 'false'), ('None', 'null')):
    best_nights_text = best_nights_text.replace(pattern, replacement, regex=True)
business_df['attributes.BestNights'] = best_nights_text

#B) the normalization: missing values (NaN/None) and values that do not parse
#   to a JSON object (e.g. a bare null) become an empty dict, so every business
#   keeps exactly one row at the same position as in business_df
best_nights_dicts = []
for cell in business_df['attributes.BestNights']:
    parsed = {} if pd.isna(cell) else json.loads(cell)
    best_nights_dicts.append(parsed if isinstance(parsed, dict) else {})
BestNights_df = pd.json_normalize(best_nights_dicts)

#C) merge with business_id PK (row positions line up, so merge on the index)
BestNights_df = pd.merge(business_df['business_id'], BestNights_df, left_index=True, right_index=True)

BestNights_df.info()

In [None]:
#D) create and fill SQLite3 table (PK on business_id, plus FK to business_main)
c.execute('DROP TABLE IF EXISTS attributes_BestNights')
conn.commit()

c.execute("""CREATE TABLE attributes_BestNights(
                business_id text PRIMARY KEY
                ,monday integer
                ,tuesday integer
                ,friday integer
                ,wednesday integer
                ,thursday integer
                ,sunday integer
                ,saturday integer
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))""")
conn.commit()

#load the rows first ...
BestNights_df.to_sql('attributes_BestNights', conn, if_exists='append', index = False)
conn.commit()

#... and build the extra index afterwards, in one pass over the loaded rows
#(same order as used for attributes_general_staging)
#NOTE(review): business_id is already the PRIMARY KEY here, so this index looks
#   redundant; kept so the index name stays available - confirm before removing
c.execute('CREATE INDEX IDX_attributes_BestNights on attributes_BestNights(business_id)')
conn.commit()

#### 6) attributes.Music

In [None]:
#A) the clean-up: rewrite the strings into valid JSON
#   (single -> double quotes, True/False -> true/false, None -> null);
#   the cleaned text is written back to business_df
music_text = business_df['attributes.Music']
for pattern, replacement in (("'", '"'), ('True', 'true'), ('False', 'false'), ('None', 'null')):
    music_text = music_text.replace(pattern, replacement, regex=True)
business_df['attributes.Music'] = music_text

#B) the normalization: missing values (NaN/None) and values that do not parse
#   to a JSON object (e.g. a bare null) become an empty dict, so every business
#   keeps exactly one row at the same position as in business_df
music_dicts = []
for cell in business_df['attributes.Music']:
    parsed = {} if pd.isna(cell) else json.loads(cell)
    music_dicts.append(parsed if isinstance(parsed, dict) else {})
Music_df = pd.json_normalize(music_dicts)

#C) merge with business_id PK (row positions line up, so merge on the index)
Music_df = pd.merge(business_df['business_id'], Music_df, left_index=True, right_index=True)

Music_df.info()

In [None]:
#D) create and fill SQLite3 table (PK on business_id, plus FK to business_main)
c.execute('DROP TABLE IF EXISTS attributes_Music')
conn.commit()

c.execute("""CREATE TABLE attributes_Music(
                business_id text PRIMARY KEY
                ,dj integer
                ,background_music integer
                ,no_music integer
                ,jukebox integer
                ,live integer
                ,video integer
                ,karaoke integer
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))""")
conn.commit()

#load the rows first ...
Music_df.to_sql('attributes_Music', conn, if_exists='append', index = False)
conn.commit()

#... and build the extra index afterwards, in one pass over the loaded rows
#(same order as used for attributes_general_staging)
#NOTE(review): business_id is already the PRIMARY KEY here, so this index looks
#   redundant; kept so the index name stays available - confirm before removing
c.execute('CREATE INDEX IDX_attributes_Music on attributes_Music(business_id)')
conn.commit()

### II) USER DATA

In [None]:
#no nested objects in this file, so pd.read_json can load it directly
#(lines=True: the file holds one JSON document per line)
user_df = pd.read_json(path_or_buf=user_json, lines=True)

#show columns, dtypes and non-null counts
user_df.info()

In [None]:
#) create and fill SQLite3 table (including PK index)
c.execute('DROP TABLE IF EXISTS user')
conn.commit()

#friends is declared text instead of integer
#NOTE(review): the Yelp user file appears to store friends as a text list of
#   user_ids, not a count - confirm against the dataset documentation
c.execute("""CREATE TABLE user(
                user_id text PRIMARY KEY
                ,name text
                ,review_count integer
                ,yelping_since text
                ,useful integer
                ,funny integer
                ,cool integer
                ,elite text
                ,friends text
                ,fans integer
                ,average_stars real
                ,compliment_hot integer
                ,compliment_more integer
                ,compliment_profile integer
                ,compliment_cute integer
                ,compliment_list integer
                ,compliment_note integer
                ,compliment_plain integer
                ,compliment_cool integer
                ,compliment_funny integer
                ,compliment_writer integer
                ,compliment_photos integer)""")
conn.commit()

#append the DF rows; the DF column names must match the table columns
user_df.to_sql('user', conn, if_exists='append', index = False)
conn.commit()

### III) CHECK-IN DATA

In [None]:
#no nested objects in this file, so pd.read_json can load it directly
#(lines=True: the file holds one JSON document per line)
checkin_df = pd.read_json(path_or_buf=checkin_json, lines=True)

#show columns, dtypes and non-null counts
checkin_df.info()

In [None]:
#) create and fill SQLite3 table (including FK index; this table has no PK)
c.execute('DROP TABLE IF EXISTS checkin')
conn.commit()

c.execute("""CREATE TABLE checkin(
                business_id text
                ,date text
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))""")
conn.commit()

#load the rows first ...
checkin_df.to_sql('checkin', conn, if_exists='append', index = False)
conn.commit()

#... and build the FK index afterwards, in one pass over the loaded rows
#(same order as used for attributes_general_staging)
c.execute('CREATE INDEX IDX_checkin on checkin(business_id)')
conn.commit()

### IV) REVIEW DATA
>dataset contains no nested objects; however, a split is required as the file is too large for local memory

In [None]:
#) (re)create the empty review table: review_id as PK, FKs to user and business_main
create_review_sql = """CREATE TABLE review(
                review_id text PRIMARY KEY
                ,user_id text
                ,business_id text
                ,stars real
                ,useful integer
                ,funny integer
                ,cool integer
                ,text text
                ,date text
                ,CONSTRAINT fk_user_id  
                FOREIGN KEY (user_id)  
                REFERENCES user(user_id)
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))"""

#drop any previous version first, committing after each statement
for ddl_statement in ('DROP TABLE IF EXISTS review', create_review_sql):
    c.execute(ddl_statement)
    conn.commit()

In [None]:
#read the whole file into a list of lines; the context manager closes the
#file handle as soon as the lines are read
with open(review_json, 'r', encoding='utf-8') as review_file:
    f = review_file.readlines()

In [None]:
#chunk A: lines 0 up to (not including) 2000000; blank lines are skipped
stripped_lines_A = (raw.strip() for raw in itertools.islice(f, 0, 2000000))
list_cache_A = [json.loads(text) for text in stripped_lines_A if text]
review_df_A = pd.json_normalize(list_cache_A, max_level=1)

In [None]:
#show columns, dtypes and non-null counts of review_df_A
review_df_A.info()

In [None]:
#append review_df_A to the existing review table and commit
review_df_A.to_sql(name='review', con=conn, if_exists='append', index=False)
conn.commit()

In [None]:
#chunk B: lines 2000000 up to (not including) 4000000; blank lines are skipped
stripped_lines_B = (raw.strip() for raw in itertools.islice(f, 2000000, 4000000))
list_cache_B = [json.loads(text) for text in stripped_lines_B if text]
review_df_B = pd.json_normalize(list_cache_B, max_level=1)

In [None]:
#show columns, dtypes and non-null counts of review_df_B
review_df_B.info()

In [None]:
#append review_df_B to the existing review table and commit
review_df_B.to_sql(name='review', con=conn, if_exists='append', index=False)
conn.commit()

In [None]:
#chunk C: lines 4000000 up to (not including) 6000000; blank lines are skipped
stripped_lines_C = (raw.strip() for raw in itertools.islice(f, 4000000, 6000000))
list_cache_C = [json.loads(text) for text in stripped_lines_C if text]
review_df_C = pd.json_normalize(list_cache_C, max_level=1)

In [None]:
#show columns, dtypes and non-null counts of review_df_C
review_df_C.info()

In [None]:
#append review_df_C to the existing review table and commit
review_df_C.to_sql(name='review', con=conn, if_exists='append', index=False)
conn.commit()

In [None]:
#chunk D: everything from line 6000000 to the end of the file;
#stop=None (instead of a hard-coded 9999999) means no trailing lines are
#silently dropped, however long the file is
list_cache_D = []
for line in itertools.islice(f, 6000000, None):
    line = line.strip()
    #skip blank lines, they are not valid JSON
    if not line:
        continue
    list_cache_D.append(json.loads(line))
review_df_D = pd.json_normalize(list_cache_D, max_level=1)

In [None]:
#show columns, dtypes and non-null counts of review_df_D
review_df_D.info()

In [None]:
#append review_df_D to the existing review table and commit
review_df_D.to_sql(name='review', con=conn, if_exists='append', index=False)
conn.commit()

In [None]:
#FK indexes on the review table, created after all chunks are loaded
c.execute('CREATE INDEX IDX_review_business_id on review(business_id)')
#this index covers user_id only, so it is named after that column
#(was IDX_review_user_business, which suggested a composite index)
c.execute('CREATE INDEX IDX_review_user_id on review(user_id)')
conn.commit()

### V) TIP DATA

In [None]:
#no nested objects in this file, so pd.read_json can load it directly
#(lines=True: the file holds one JSON document per line)
tip_df = pd.read_json(path_or_buf=tip_json, lines=True)

#show columns, dtypes and non-null counts
tip_df.info()

In [None]:
#) (re)create the tip table (FKs to user and business_main), fill it, then index it
create_tip_sql = """CREATE TABLE tip(
                user_id text
                ,business_id text
                ,text text
                ,date text
                ,compliment_count integer
                ,CONSTRAINT fk_user_id  
                FOREIGN KEY (user_id)  
                REFERENCES user(user_id)
                ,CONSTRAINT fk_business_id  
                FOREIGN KEY (business_id)  
                REFERENCES business_main(business_id))"""

#drop any previous version first, committing after each statement
for ddl_statement in ('DROP TABLE IF EXISTS tip', create_tip_sql):
    c.execute(ddl_statement)
    conn.commit()

#append the DF rows; the DF column names must match the table columns
tip_df.to_sql(name='tip', con=conn, if_exists='append', index=False)
conn.commit()

#indexes are created after the load: one per FK column plus a combined one
tip_index_statements = (
    'CREATE INDEX IDX_tip_user_id on tip(user_id)',
    'CREATE INDEX IDX_tip_business_id on tip(business_id)',
    'CREATE INDEX IDX_tip_user_business on tip(user_id,business_id)',
)
for index_statement in tip_index_statements:
    c.execute(index_statement)
conn.commit()