In [1]:
#Data analysis libs
import pandas as pd
import numpy as np
from matplotlib import  pyplot as mp

#Other Utilities
import os
from collections import defaultdict
import datetime
from dateutil import parser
import logging
import json

#ETL processing library
import findspark
findspark.init()

from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

**We will use PySpark to process this data. Although the data currently fits in main memory and could be processed with pandas, it may grow in the future and become difficult to process in memory, so PySpark is used for the ETL processing.**

In [None]:
# One local Spark session, reused by every processing function in this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName(name="Yelp Data Lake")
    .getOrCreate()
)

# Configuration is read from config.json in the current working directory.
# It must provide local_paths.landing_zone and local_paths.lake_path.
config_file_path = os.path.join(os.getcwd(), "config.json")
with open(config_file_path) as config_file:
    config_data = json.load(config_file)

# Input directory (raw JSON files) and output directory (parquet tables).
landing_zone = config_data['local_paths']['landing_zone']
lake_path    = config_data['local_paths']['lake_path']

# File names expected inside the landing zone, one per dataset.
Business_dataset_file = "yelp_academic_dataset_business.json"
Checkin_dataset_file  = "yelp_academic_dataset_checkin.json"
photo_dataset_file    = "yelp_academic_dataset_photo.json"
review_dataset_file   = "yelp_academic_dataset_review.json"
tip_dataset_file      = "yelp_academic_dataset_tip.json"
user_dataset_file     = "yelp_academic_dataset_user.json"

In [13]:
def process_business_dataset():
    """Split the business dataset into three flat parquet tables.

    Reads the business JSON file from the landing zone and writes, under the
    lake directory (existing output is overwritten, snappy compression):

    * ``business``           - all columns except ``attributes`` and ``hours``
    * ``business_attribute`` - ``business_id`` plus the fields of ``attributes``
    * ``opening_time``       - ``business_id`` plus the fields of ``hours``

    ``business_id`` is kept in every table so they can be joined again later.
    Rows whose ``attributes`` or ``hours`` are null are NOT filtered out here.
    """
    source_path = os.path.join(landing_zone, Business_dataset_file)
    raw_df = spark.read.json(source_path)

    # The source JSON is nested; "<column>.*" expands a nested column into
    # one top-level column per field so each table is purely tabular.
    tables = [
        ("business", raw_df.drop('attributes', 'hours')),
        ("business_attribute", raw_df.select(["business_id", "attributes.*"])),
        ("opening_time", raw_df.select(["business_id", "hours.*"])),
    ]

    # Parquet keeps the files smaller than the raw JSON.
    for table_name, table_df in tables:
        target_dir = os.path.join(lake_path, table_name)
        table_df.write.parquet(target_dir, mode='overwrite', compression='snappy')

In [14]:
def process_tip_dataset():
    """Convert the tip dataset to parquet without modifying its content.

    The tip JSON file is read from the landing zone and written unchanged to
    ``<lake_path>/tip_files`` (overwrite mode, snappy compression). Older
    records are deliberately kept: they may still be useful for analysis.
    """
    source_path = os.path.join(landing_zone, tip_dataset_file)
    raw_tips = spark.read.json(source_path)

    target_dir = os.path.join(lake_path, "tip_files")
    raw_tips.write.parquet(target_dir, mode='overwrite', compression='snappy')

In [15]:
def process_photo_dataset():
    """Store the photo dataset as parquet, turning empty strings into nulls.

    The goal is to null out blank captions. No column subset is passed to
    ``replace``, so the substitution is not restricted to the caption column.
    Output goes to ``<lake_path>/photo`` (overwrite mode, snappy compression).
    """
    source_path = os.path.join(landing_zone, photo_dataset_file)
    cleaned_df = spark.read.json(source_path).replace('', None)

    target_dir = os.path.join(lake_path, 'photo')
    cleaned_df.write.parquet(target_dir, mode='overwrite', compression='snappy')

In [16]:
def _years_before(day, years):
    """Return ``day`` moved ``years`` calendar years into the past.

    29 February is clamped to 28 February when the target year is not a leap
    year, instead of raising ``ValueError``.
    """
    try:
        return day.replace(year=day.year - years)
    except ValueError:
        # Only 29 Feb can be valid in the source year yet invalid in the target year.
        return day.replace(year=day.year - years, day=28)


def process_review_dataset():
    """Keep the most recent five years of reviews, clean the text, write parquet.

    Steps:
      1. Read the review JSON file from the landing zone.
      2. Find the latest value of ``date`` and keep only reviews strictly
         newer than the same calendar day five years earlier. Older reviews
         say little about how a business is doing today.
      3. Strip newline, tab, backspace, carriage return, backslash, forward
         slash and double-quote characters from ``text``.
      4. Write the result to ``<lake_path>/review`` (overwrite mode, snappy).

    If the latest date is null (no review carries a date), the date filter is
    skipped and the frame is written with only the text clean-up applied.
    """
    review_file = os.path.join(landing_zone, review_dataset_file)
    review_df = spark.read.json(review_file)

    # Latest review date present in the dataset (None when every date is null).
    max_date_rows = review_df.selectExpr('max(date)').rdd.take(1)
    latest_raw = max_date_rows[0][0] if max_date_rows else None

    if latest_raw is not None:
        # Only the leading YYYY-MM-DD part is parsed, so values that also carry
        # a time component ("2018-07-02 11:22:33") are accepted as well.
        max_date = datetime.date.fromisoformat(str(latest_raw)[:10])

        # Same day five years earlier; safe when max_date is 29 February.
        date_filter = _years_before(max_date, 5)
        review_df = review_df.filter(review_df.date > date_filter)

    # Raw string: the escapes are interpreted by the regex engine, not by
    # Python, which avoids the invalid "\/" escape of a normal string literal.
    # \x08 is the backspace character.
    review_df = review_df.withColumn('text', F.regexp_replace('text', r'[\n\t\x08\r\\/"]', ''))

    review_df.write.parquet(os.path.join(lake_path, 'review'), mode='overwrite', compression='snappy')

In [17]:
def process_checkin_dataset():
    """Flatten the check-in dataset and store it as parquet.

    The nested ``time`` column is expanded into one column per field, and the
    columns are then put in a fixed order: ``business_id`` first, followed by
    ``<Day>-<hour>`` for Mon..Sun and hours 0..23 (``Mon-0`` ... ``Sun-23``).
    Every one of those columns must exist after flattening. Output goes to
    ``<lake_path>/checkin`` (overwrite mode, snappy compression).
    """
    source_path = os.path.join(landing_zone, Checkin_dataset_file)
    flat_df = spark.read.json(source_path).select('business_id', 'time.*')

    # Build the target column order explicitly: day-major, hour-minor.
    ordered_columns = ['business_id']
    for day in ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'):
        for hour in range(24):
            ordered_columns.append(day + '-' + str(hour))
    ordered_df = flat_df.select(ordered_columns)

    target_dir = os.path.join(lake_path, 'checkin')
    ordered_df.write.parquet(target_dir, mode='overwrite', compression='snappy')

In [18]:
def process_user_dataset():
    """Convert the user dataset to parquet without modifying its content.

    The user JSON file is read from the landing zone and written as-is to
    ``<lake_path>/user`` (overwrite mode, snappy compression).
    """
    source_path = os.path.join(landing_zone, user_dataset_file)
    raw_users = spark.read.json(source_path)

    target_dir = os.path.join(lake_path, 'user')
    raw_users.write.parquet(target_dir, mode='overwrite', compression='snappy')

In [26]:
if __name__ == "__main__":
    # Run every ETL step so that a fresh kernel (Restart & Run All) rebuilds
    # the complete lake. Each step overwrites its own output directory, so
    # re-running is safe. To refresh a single table, call its function
    # directly in a scratch cell rather than disabling steps here.
    process_business_dataset()
    process_tip_dataset()
    process_photo_dataset()
    process_review_dataset()
    process_checkin_dataset()
    process_user_dataset()

***Compression Ratio (Before Data Lake/ After Data Lake )***

***Landing Zone directory size => 6.84 GB***

***Lake directory size => 3.43 GB***

***That is a compression ratio of almost 50% (the lake is about half the size of the landing zone). Not the best, but it will work OK.***