# Project 1 – databricks

## Overview
 My company would like some queryable datasets instead of looking into these raw yelp data.

 I will read the raw data, then join and clean the data to create a unified dataset that is queryable. Finally, based on some real-world business questions, I will query the unified dataset and obtain results.

## Raw to Bronze

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.functions import col

# set Spark session
spark = SparkSession.builder \
    .appName("Yelp Data Processing") \
    .getOrCreate()


# read each json file
def read_json(file_path):
    infer_schema = "false"
    first_row_is_header = "false"
    delimiter = ","
    file_type = "json"
    df = spark.read.format(file_type) \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load(file_path)
    print(file_path)
    print(df.columns)
    print()
    return df

# row to bronze function, read all the datas and union them
def raw_to_bronze(file_paths):
    # read the columns of tip
    df_tip = read_json(file_paths[0]).select(
            "user_id",
            "business_id",
            "text",
            "date",
            "compliment_count"
        )
    #rename the columns
    df_tip = df_tip.withColumnRenamed("text", "tip_text") \
               .withColumnRenamed("date", "tip_date")

    # read the columns of checkin
    df_checkin = read_json(file_paths[1])
    df_checkin = df_checkin.withColumn("business_id", regexp_replace(col("business_id"), "-", ""))
    df_checkin = df_checkin.select(
        "date",
        "business_id"
    )

    # read the columns of business
    df_business = read_json(file_paths[2]).select(
        "business_id",
        "name",
        "address",
        "city",
        "state",
        "postal_code",
        "latitude",
        "longitude",
        "Stars",
        "Review_count",
        "is_open",
        "attributes",
        "categories",
        "hours"
    )
    #rename the columns
    df_business = df_business.withColumnRenamed("review_count", "business_review_count") \
        .withColumnRenamed("stars", "business_stars") \
        .withColumnRenamed("name", "business_name") 
    df_review_1 = read_json(file_paths[3])
    df_review_2 = read_json(file_paths[4])
    df_review_3 = read_json(file_paths[5])
    df_user_1 = read_json(file_paths[6])
    df_user_2 = read_json(file_paths[7])

    # read the columns of review
    df_review = df_review_1.union(df_review_2).union(df_review_3).select(
        "review_id",
        "user_id",
        "business_id",
        "stars",
        "useful",
        "funny",
        "cool",
        "text",
        "date"
    )
    #rename the columns
    df_review = df_review.withColumnRenamed("cool", "review_cool") \
                     .withColumnRenamed("text", "review_text") \
                     .withColumnRenamed("date", "review_date") \
                     .withColumnRenamed("funny", "review_funny") \
                     .withColumnRenamed("stars", "review_stars")

    # delete df_review_1、df_review_2和df_review_3
    df_review_1.unpersist()
    df_review_2.unpersist()
    df_review_3.unpersist()

    # read the columns of user
    df_user = df_user_1.union(df_user_2).select(
        "user_id",
        "name",
        "review_count",
        "yelping_since",
        "useful",
        "funny",
        "cool",
        "elite",
        "friends"
    )
    #rename the columns
    df_user = df_user.withColumnRenamed("cool", "user_cool") \
                 .withColumnRenamed("funny", "user_funny") \
                 .withColumnRenamed("useful", "user_useful") \
                 .withColumnRenamed("review_count", "user_review_count") \
                .withColumnRenamed("name", "user_name") 

    # delete df_user_1、df_user_2
    df_user_1.unpersist()
    df_user_2.unpersist()

    df_all = df_business.join(df_review, "business_id", "left_outer") \
    .join(df_checkin, "business_id", "left_outer") \
    .join(df_user, "user_id", "left_outer") \
    .join(df_tip, ["user_id","business_id"], "left_outer") 

    #return df_tip, df_checkin, df_business, df_review, df_user
    return df_all

# the file paths
file_paths = ["/FileStore/tables/yelp_academic_dataset_tip.json", \
            "/FileStore/tables/yelp_academic_dataset_checkin.json", \
            "/FileStore/tables/yelp_academic_dataset_business.json", \
            "/FileStore/tables/yelp_academic_dataset_review_1.json", \
            "/FileStore/tables/yelp_academic_dataset_review_2.json", \
            "/FileStore/tables/yelp_academic_dataset_review_3.json", \
            "/FileStore/tables/yelp_academic_dataset_user_1.json", \
            "/FileStore/tables/yelp_academic_dataset_user_2.json" ]

df_bronze = raw_to_bronze(file_paths)

/FileStore/tables/yelp_academic_dataset_tip.json
['business_id', 'compliment_count', 'date', 'text', 'user_id']

/FileStore/tables/yelp_academic_dataset_checkin.json
['business_id', 'date']

/FileStore/tables/yelp_academic_dataset_business.json
['address', 'attributes', 'business_id', 'categories', 'city', 'hours', 'is_open', 'latitude', 'longitude', 'name', 'postal_code', 'review_count', 'stars', 'state']

/FileStore/tables/yelp_academic_dataset_review_1.json
['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id']

/FileStore/tables/yelp_academic_dataset_review_2.json
['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id']

/FileStore/tables/yelp_academic_dataset_review_3.json
['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id']

/FileStore/tables/yelp_academic_dataset_user_1.json
['average_stars', 'compliment_cool', 'compliment_cute', 'compliment_funny', 'compliment_hot', 'c

## Bronze to Silver

In [0]:
from pyspark.sql.functions import col, when, lit
from delta.tables import DeltaTable

# save bronze delta table to disk
bronze_delta_path = "/FileStore/tables/yelp_bronze_delta"
df_bronze.write.format("delta").mode("overwrite").save(bronze_delta_path)
# read Delta table
df_bronze_delta = spark.read.format("delta").load(bronze_delta_path)


In [0]:
from pyspark.sql import functions as F

#bronze_to_silver to do data cleaning
def bronze_to_silver(df_bronze):
    #remove the useless columns
    drop_columns = ["address", "attributes", "hours", "date", "tip_text", "user_useful", "user_funny", "user_cool", \
        "friends", "user_review_count", "yelping_since", "elite"]
    df_silver = df_bronze.drop(*drop_columns)
    # filter is_open = 0 
    df_silver = df_silver.filter(F.col("is_open") != 0)

    # filter is_open、business_id、review_stars、user_id is and some other columns is NULL
    df_silver = df_silver.filter((F.col("is_open").isNotNull()) &
                                    (F.col("business_id").isNotNull()) &
                                    (F.col("review_stars").isNotNull()) &
                                    (F.col("user_id").isNotNull()) &
                                    (F.col("state").isNotNull()) &
                                    (F.col("city").isNotNull()))
    #remove the duplicated rows
    df_silver = df_silver.dropDuplicates()
    return df_silver

df_silver = bronze_to_silver(df_bronze)


In [0]:
# save silver delta table to disk
silver_delta_path = "/FileStore/tables/yelp_silver_delta"
df_silver.write.format("delta").mode("overwrite").save(silver_delta_path)
# read Delta table
df_silver_delta = spark.read.format("delta").load(silver_delta_path)

## Query for Some Business Questions

In [0]:
from pyspark.sql import functions as F

df_user_count = df_silver.groupBy("business_id", "business_name") \
                                .agg(F.countDistinct("user_id").alias("unique_user_count")) \
                                .orderBy(F.desc("unique_user_count"))

# show the result
df_user_count.show(30)


+--------------------+--------------------+-----------------+
|         business_id|       business_name|unique_user_count|
+--------------------+--------------------+-----------------+
|_ab50qdWOk0DdB6XO...|   Acme Oyster House|             7568|
|ac1AeYqs8Z4_e2X5M...|        Oceana Grill|             7401|
|GXFMD0Z4jEVZBCsbP...|Hattie B’s Hot Ch...|             6093|
|ytynqOUb3hjKeJfRj...|Reading Terminal ...|             5721|
|oBNrLz4EDhiscSlbO...|Ruby Slipper - Ne...|             5193|
|iSRTaT9WngzB8JJ2Y...| Mother's Restaurant|             5185|
|VQcCL9PiNL_wkGf-u...|         Royal House|             5070|
|_C7QiQQc47AOEv4PE...|  Commander's Palace|             4877|
|GBTPC53ZrG1ZBY3DT...|                Luke|             4554|
|6a4gLLFSgr-Q6CZXD...|              Cochon|             4422|
|PP3BBaVxZLcJU54uP...|Pat's King of Steaks|             4250|
|1b5mnK8bMnnju_cvU...| Biscuit Love: Gulch|             4207|
|I_3LMZ_1m2mzR0oLI...|  Pappy's Smokehouse|             3999|
|VaO-VW3

a. How many businesses take place in each state?

In [0]:
df_state_count = df_silver.groupBy("state") \
                                 .agg(F.countDistinct("business_id").alias("unique_business_count")) \
                                 .orderBy(F.desc("unique_business_count"))

# show the result
df_state_count.show()

+-----+---------------------+
|state|unique_business_count|
+-----+---------------------+
|   PA|                26289|
|   FL|                21540|
|   TN|                 9600|
|   IN|                 8946|
|   MO|                 8363|
|   AZ|                 8108|
|   LA|                 7676|
|   NJ|                 7031|
|   NV|                 6277|
|   AB|                 4346|
|   CA|                 4065|
|   ID|                 3783|
|   DE|                 1894|
|   IL|                 1765|
|   TX|                    4|
|   MA|                    2|
|   WA|                    2|
|   CO|                    1|
|   UT|                    1|
|   MI|                    1|
+-----+---------------------+
only showing top 20 rows



b. How many businesses take place in each city?

In [0]:
df_city_count = df_silver.groupBy("city") \
                                 .agg(F.countDistinct("business_id").alias("unique_business_count")) \
                                 .orderBy(F.desc("unique_business_count"))

# show the result
df_city_count.show()

+----------------+---------------------+
|            city|unique_business_count|
+----------------+---------------------+
|    Philadelphia|                10542|
|          Tucson|                 7533|
|           Tampa|                 7219|
|    Indianapolis|                 5894|
|       Nashville|                 5398|
|            Reno|                 4762|
|     New Orleans|                 4649|
|        Edmonton|                 3916|
|     Saint Louis|                 3403|
|   Santa Barbara|                 3020|
|           Boise|                 2461|
|      Clearwater|                 1815|
|          Sparks|                 1378|
|Saint Petersburg|                 1367|
|        Metairie|                 1336|
|      Wilmington|                 1192|
|        Franklin|                 1135|
|       St. Louis|                 1054|
|  St. Petersburg|                  916|
|        Meridian|                  900|
+----------------+---------------------+
only showing top

c. What kind of business do they have the most in each state?

In [0]:
from pyspark.sql.window import Window
# Group by state, and count the number of each type of business.
df_business_state_count = df_silver.groupBy("state",  "categories") \
                                    .agg(F.countDistinct("business_id").alias("business_count")) \
                                    .orderBy("state",  F.desc("business_count"))
# most common business
windowSpec = Window.partitionBy("state").orderBy(F.desc("business_count"))
df_most_common_state_business = df_business_state_count.withColumn("rank", F.rank().over(windowSpec)) \
                                          .filter(F.col("rank") == 1) \
                                          .select("state", "categories", "business_count")

# show the result
df_most_common_state_business.show()



+-----+--------------------+--------------+
|state|          categories|business_count|
+-----+--------------------+--------------+
|   AZ|Restaurants, Mexican|            58|
|   CA|Restaurants, Mexican|            21|
|   CA|Auto Repair, Auto...|            21|
|   ID|Beauty & Spas, Na...|            27|
|   LA|Beauty & Spas, Na...|            48|
|   MI|Oil Change Statio...|             1|
|   NJ|  Restaurants, Pizza|            82|
|   NV|Restaurants, Mexican|            27|
|   NV|Automotive, Auto ...|            27|
|   NV|Mexican, Restaurants|            27|
|   AB|Beauty & Spas, Na...|            34|
|   CO|Photographers, Ev...|             1|
|   DE|  Restaurants, Pizza|            26|
|   FL|Beauty & Spas, Na...|           150|
|   HI|Cosmetic Surgeons...|             1|
|   IL|Nail Salons, Beau...|            20|
|   IN|Nail Salons, Beau...|            89|
|   MA|Home Services, Ho...|             1|
|   MA|Local Services, S...|             1|
|   MO|Beauty & Spas, Na...|    

d. What kind of business do they have the most in each city?

In [0]:
# Group by city, and count the number of each type of business.
df_business_city_count = df_silver.groupBy("city", "categories") \
                                    .agg(F.countDistinct("business_id").alias("business_count")) \
                                    .orderBy("city", F.desc("business_count"))

# most common business
windowSpec = Window.partitionBy("city").orderBy(F.desc("business_count"))

df_most_common_city_business = df_business_city_count.withColumn("rank", F.rank().over(windowSpec)) \
                                          .filter(F.col("rank") == 1) \
                                          .select("city", "categories", "business_count")

# 显示结果
df_most_common_city_business.show()

+------------+--------------------+--------------+
|        city|          categories|business_count|
+------------+--------------------+--------------+
| AB Edmonton|Hair Salons, Beau...|             1|
|    Abington|Hair Salons, Beau...|             3|
|       Afton|   Food Trucks, Food|             1|
|Apollo Beach|Beauty & Spas, Na...|             3|
|Bargersville|Event Planning & ...|             1|
|Bargersville|American (New), S...|             1|
|Bargersville|Vietnamese, Sushi...|             1|
|        Bear| Pets, Veterinarians|             1|
|        Bear|Movers, Home Serv...|             1|
|        Bear|Home Services, Re...|             1|
|        Bear|Apartments, Home ...|             1|
|        Bear|Beauty & Spas, Ma...|             1|
| Beech Grove|Karaoke, American...|             1|
| Beech Grove|Barbers, Beauty &...|             1|
| Beech Grove|Pubs, Bars, Night...|             1|
| Beech Grove|   Shopping, Jewelry|             1|
| Beech Grove|Cupcakes, Bakerie

e. What time do people usually write reviews?

In [0]:
# extract hour from 
df_review_hour = df_silver.withColumn("hour", F.hour("review_date"))

# grouby by hour, count the number of unique review_id
df_hour_count = df_review_hour.groupBy("hour") \
                              .agg(F.countDistinct("review_id").alias("review_count")) \
                              .orderBy(F.desc("review_count"))

# show the result
df_hour_count.show()


+----+------------+
|hour|review_count|
+----+------------+
|  19|      384770|
|  18|      384574|
|   0|      381556|
|   1|      377021|
|  20|      376172|
|  23|      373278|
|  21|      366103|
|  17|      364696|
|  22|      363110|
|   2|      342747|
|  16|      335389|
|  15|      296573|
|   3|      279770|
|  14|      248207|
|   4|      204469|
|  13|      184845|
|   5|      133916|
|  12|      113063|
|   6|       82762|
|  11|       60108|
+----+------------+
only showing top 20 rows



f. What Are the Average Review Stars of Each Business?

In [0]:
df_review_stars_avg = df_silver.groupBy("business_id", "business_name") \
                                       .agg(F.avg("review_stars").alias("avg_review_stars")) \
                                       .orderBy(F.desc("avg_review_stars"))

# show the result
df_review_stars_avg.show()


+--------------------+--------------------+----------------+
|         business_id|       business_name|avg_review_stars|
+--------------------+--------------------+----------------+
|oDnnlCOYpkMZwOv0o...|Truong & Company ...|             5.0|
|cA3FPFwFxDe58HXUG...|Meranova Bed & Br...|             5.0|
|vt0Hqc2iDd2YrIc-k...|Wright & Feusier ...|             5.0|
|kSKyOrJELZz_AzGkv...|        Mattias Blom|             5.0|
|e-5mSfiD_aT32KgzN...| Second Wind Massage|             5.0|
|_Hgl8YKcQG0DjC9FD...|Bay Area Endodont...|             5.0|
|et9os3PW4Tm5SB9eB...|        The Wellnest|             5.0|
|HL2NuAsbNZx9hgPEp...|Santa Barbara Tra...|             5.0|
|jNR888ACyw4Xoqjx-...|Amber Light Voice...|             5.0|
|dSrJFTreQg2feow9Y...|    A-1 Auto Service|             5.0|
|nHENI2LyfkixJJrSG...|Lafayette Square ...|             5.0|
|yRDO7_CCE9iaDxepL...|  Evan Hathaway, DDS|             5.0|
|R3ljmQ_pHq1PnRzuW...|TriStar Building ...|             5.0|
|dQWgXOG5KScufoiS6...|We