In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, explode, lit, lower, regexp_replace, split, trim
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

import os
from pathlib import Path

# Directory layout: the project root is the parent of the current working
# directory; raw inputs live in <root>/data/raw and outputs in <root>/data/cleaned.
project_dir = Path.cwd().parent
data_dir = project_dir.joinpath('data')
raw_data_dir, cleaned_data_dir = (data_dir.joinpath(sub) for sub in ('raw', 'cleaned'))

In [None]:
# List every raw JSON file with its size in (decimal) gigabytes.
print("File Name", '\t\t\t\t', 'Size in GB')
for json_path in raw_data_dir.glob('*.json'):
    size_gb = round(json_path.stat().st_size / 1e9, 2)
    print(json_path.name, ' \t', size_gb, 'GB')

In [None]:
# Reuse the active Spark session if one exists, otherwise start one for this notebook.
spark = (
    SparkSession.builder
    .appName('YelpDataEda')
    .getOrCreate()
)

### Wrangling Business Data (`yelp_academic_dataset_business.json`)

In [None]:
# Explicit schema for the business JSON file, so Spark does not have to infer one.
# Only the fields listed here are loaded; any other keys in the raw records are ignored.
# latitude/longitude/stars are JSON numbers, so they are typed DoubleType. With
# StringType, Spark would load them as text, and comparisons or aggregations on
# them would be lexicographic.
business_schema = StructType([
    StructField("business_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postal_code", StringType(), True),  # kept as text: codes may have leading zeros/letters
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("stars", DoubleType(), True),
    StructField("review_count", IntegerType(), True),
    StructField("is_open", IntegerType(), True),
    StructField("categories", StringType(), True),  # one comma-separated string per business
    StructField("attributes", StructType([
        StructField("Alcohol", StringType(), True),
        # NOTE(review): in some releases of the Yelp dataset, Ambience and
        # BusinessParking are stored as Python-dict-like *strings*, not JSON
        # objects. If so, these StructType fields will not parse. Confirm
        # against the raw file.
        StructField("Ambience", StructType([
            StructField("casual", StringType(), True),
            StructField("classy", StringType(), True),
            StructField("divey", StringType(), True),
            StructField("hipster", StringType(), True),
            StructField("intimate", StringType(), True),
            StructField("romantic", StringType(), True),
            StructField("touristy", StringType(), True),
            StructField("trendy", StringType(), True),
            StructField("upscale", StringType(), True)
        ]), True),
        StructField("BikeParking", StringType(), True),
        StructField("BusinessAcceptsCreditCards", StringType(), True),
        StructField("BusinessParking", StructType([
            StructField("garage", StringType(), True),
            StructField("lot", StringType(), True),
            StructField("street", StringType(), True),
            StructField("valet", StringType(), True)
        ]), True),
        StructField("GoodForKids", StringType(), True),
        StructField("HasTV", StringType(), True),
        StructField("NoiseLevel", StringType(), True),
        StructField("OutdoorSeating", StringType(), True),
        StructField("RestaurantsAttire", StringType(), True),
        StructField("RestaurantsDelivery", StringType(), True),
        StructField("RestaurantsGoodForGroups", StringType(), True),
        StructField("RestaurantsPriceRange2", StringType(), True),
        StructField("RestaurantsReservations", StringType(), True),
        StructField("RestaurantsTakeOut", StringType(), True),
        StructField("WiFi", StringType(), True)
    ]), True),
    # Opening hours per weekday, kept as the raw strings from the file.
    StructField("hours", StructType([
        StructField("Monday", StringType(), True),
        StructField("Tuesday", StringType(), True),
        StructField("Wednesday", StringType(), True),
        StructField("Thursday", StringType(), True),
        StructField("Friday", StringType(), True),
        StructField("Saturday", StringType(), True),
        StructField("Sunday", StringType(), True)
    ]), True)
])

In [None]:
# Load the raw business records using the explicit schema defined above.
business_df = (
    spark.read
    .schema(business_schema)
    .json(str(raw_data_dir / 'yelp_academic_dataset_business.json'))
)

In [None]:
# Flatten the nested `attributes` and `hours` structs into top-level columns,
# then drop the original struct columns.
#
# Output column -> dotted path inside `attributes`. Field names match
# `business_schema` exactly (e.g. `GoodForKids`, `RestaurantsGoodForGroups`), so
# resolution does not rely on spark.sql.caseSensitive being false.
attribute_columns = {
    "attr_alcohol": "Alcohol",
    "attr_bike_parking": "BikeParking",
    "attr_business_accepts_credit_cards": "BusinessAcceptsCreditCards",
    "attr_good_for_kids": "GoodForKids",
    "attr_has_tv": "HasTV",
    "attr_noise_level": "NoiseLevel",
    "attr_outdoor_seating": "OutdoorSeating",
    "attr_restaurants_attire": "RestaurantsAttire",
    "attr_restaurants_delivery": "RestaurantsDelivery",
    "attr_restaurants_good_for_groups": "RestaurantsGoodForGroups",
    "attr_restaurants_price_range2": "RestaurantsPriceRange2",
    "attr_restaurants_reservations": "RestaurantsReservations",
    "attr_restaurants_takeout": "RestaurantsTakeOut",
    "attr_wifi": "WiFi",
    "attr_ambience_casual": "Ambience.casual",
    "attr_ambience_classy": "Ambience.classy",
    "attr_ambience_divey": "Ambience.divey",
    "attr_ambience_hipster": "Ambience.hipster",
    "attr_ambience_intimate": "Ambience.intimate",
    "attr_ambience_romantic": "Ambience.romantic",
    "attr_ambience_touristy": "Ambience.touristy",
    "attr_ambience_trendy": "Ambience.trendy",
    "attr_ambience_upscale": "Ambience.upscale",
    "attr_business_parking_garage": "BusinessParking.garage",
    "attr_business_parking_lot": "BusinessParking.lot",
    "attr_business_parking_street": "BusinessParking.street",
    "attr_business_parking_valet": "BusinessParking.valet",
}

# Output column -> weekday field inside `hours`.
hours_columns = {
    "hours_monday": "Monday",
    "hours_tuesday": "Tuesday",
    "hours_wednesday": "Wednesday",
    "hours_thursday": "Thursday",
    "hours_friday": "Friday",
    "hours_saturday": "Saturday",
    "hours_sunday": "Sunday",
}

# One select builds a single projection, instead of ~34 chained withColumn calls
# that each add a projection to the plan. Keeping "*" first and dropping the
# structs afterwards preserves the original column order.
business_df = business_df.select(
    "*",
    *[col(f"attributes.{path}").alias(name) for name, path in attribute_columns.items()],
    *[col(f"hours.{day}").alias(name) for name, day in hours_columns.items()],
).drop("attributes", "hours")

In [None]:
# Preview the raw `categories` strings. Each is one comma-separated list; the
# following cells explode it to one row per category (long form) and pivot it
# back to one column per category (wide form).
business_df.select("categories").show(n=5, truncate=False)

In [None]:
# Split the comma-separated `categories` string into an array column `category`.
business_df = business_df.withColumn("category", split("categories", ", "))

# Ordered (Java regex, replacement) rules that turn a raw Yelp label such as
# "American (New)" or "Tapas/Small Plates" into a snake_case column-name suffix.
# Separators are collapsed only after every substitution has run, so no doubled
# or leading/trailing underscores survive.
category_name_rules = [
    (r"'", ""),          # "Children's" -> "Childrens"
    (r"[()]", ""),       # "American (New)" -> "American New"
    (r"/", " or "),      # "Tapas/Small Plates" -> "Tapas or Small Plates"
    (r"&", " "),         # "&" is dropped as a separator, not spelled out as "and"
    (r"[\s_-]+", "_"),   # collapse runs of whitespace/underscores/hyphens into one "_"
    (r"^_+|_+$", ""),    # strip separators left at either end
]


def normalize_category(category):
    """Return a Column mapping a raw category label to ``cat_<snake_case_name>``.

    ``category`` is a column name or Column holding one raw label. The rules in
    ``category_name_rules`` are applied in order; the result is lower-cased
    and prefixed with ``cat_``.
    """
    for pattern, replacement in category_name_rules:
        category = regexp_replace(category, pattern, replacement)
    return concat(lit("cat_"), lower(category))


# Explode to one row per (business, category) and normalize each label.
# Businesses whose `categories` is null produce no rows here.
exploded_categories = (
    business_df
    .select("business_id", explode("category").alias("category"), "categories")
    .withColumn("category", normalize_category("category"))
)

In [None]:
# Each row is now one (business, normalized category) pair; the raw
# `categories` string is shown alongside for comparison.
exploded_categories.select("business_id", "category", "categories").show(n=10, truncate=False)

In [None]:
# exploded_categories.toPandas().to_csv(str(raw_data_dir / 't_categories.csv'), index=False)

In [None]:
# Sanity-check column names and types of the exploded, normalized frame
# before pivoting on `category`.
exploded_categories.printSchema()

In [None]:
# Long -> wide: one row per business and one column per normalized category,
# holding that category's row count for the business. Category/business
# combinations with no rows come out of the pivot as null and are set to 0.
pivoted_df = (
    exploded_categories
    .groupBy("business_id")
    .pivot("category")
    .count()
    .fillna(0)
)
pivoted_df.show(n=5, truncate=False)

In [None]:
# Join the pivoted category counts back onto every business.
# Businesses whose `categories` is null have no rows in `pivoted_df`, so the
# left join leaves all of their cat_* columns null. Fill only those category
# columns with 0 to match the zero-filled pivot. Other numeric columns
# (stars, review_count, ...) are left untouched.
category_columns = [name for name in pivoted_df.columns if name != "business_id"]
joined_df = (
    business_df
    .join(pivoted_df, on="business_id", how="left")
    .na.fill(0, subset=category_columns)
)
joined_df.show(5, False)

In [None]:
# The raw `categories` string and the intermediate `category` array are
# superseded by the cat_* columns, so drop both.
joined_df = joined_df.drop("category", "categories")
joined_df.show(n=5, truncate=False)

In [None]:
# Convert joined_df to pandas and write it to CSV (note: toPandas() collects every row onto the driver)
# joined_df.toPandas().to_csv(str(cleaned_data_dir / 'business.csv'), index=False)

In [None]:
# review_df = spark.read.json(str(raw_data_dir / 'yelp_academic_dataset_review.json'))
# user_df = spark.read.json(str(raw_data_dir / 'yelp_academic_dataset_user.json'))