In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.functions import array, coalesce, explode_outer
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, DateType
from pyspark.sql.functions import current_date, lit, when
from pyspark.sql.functions import struct
from pyspark.sql.functions import desc, rank
from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, rank


# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

In [0]:
# List the contents of the target directory
files = dbutils.fs.ls("/FileStore/shared_uploads/caidanni@seattleu.edu/")

# Print the file paths
for file in files:
    print(file.path)


dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_business.json
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_checkin.json
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_review.json
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_tip.json
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_user.json
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpbronze/
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpbronze1/
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpoutput/
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpoutput1/
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpoutput2/
dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelsliver/


In [0]:
# Define file paths in DBFS
review_file_path = "dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_review.json"
checkin_file_path = "dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_checkin.json"
business_file_path = "dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_business.json"
user_file_path = "dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_user.json"
tip_file_path= "dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelp_academic_dataset_tip.json"



In [0]:
# Read JSON files into DataFrames
review_df = spark.read.json(review_file_path)
checkin_df = spark.read.json(checkin_file_path)
business_df = spark.read.json(business_file_path)
user_df = spark.read.json(user_file_path)
tip_df = spark.read.json(tip_file_path)

In [0]:
# Display 5 rows for dataset and print schema for each dataset
review_df.show(5)
review_df.printSchema()

+--------------------+----------+--------------------+-----+--------------------+------+--------------------+---------+
|         business_id|      date|           review_id|stars|                text|  type|             user_id|    votes|
+--------------------+----------+--------------------+-----+--------------------+------+--------------------+---------+
|9yKzy9PApeiPPOUJE...|2011-01-26|fWKvX83p0-ka4JS3d...|    5|My wife took me h...|review|rLtl8ZkDX5vH5nAx9...|{2, 0, 5}|
|ZRJwVLyzEJq1VAihD...|2011-07-27|IjZ33sJrzXqU-0X6U...|    5|I have no idea wh...|review|0a2KyEL0d3Yb1V6ai...|{0, 0, 0}|
|6oRAC4uyJCsJl1X0W...|2012-06-14|IESLBzqUCLdSzSqm0...|    4|love the gyro pla...|review|0hT2KtfLiobPvh6cD...|{0, 0, 1}|
|_1QQZuf4zZOyFCvXc...|2010-05-27|G-WvGaISbqqaMHlNn...|    5|Rosie, Dakota, an...|review|uZetl9T0NcROGOyFf...|{1, 0, 2}|
|6ozycU1RpktNG2-1B...|2012-01-05|1uJFq2r5QfJG_6ExM...|    5|General Manager S...|review|vYmM4KTsC8ZfQBg-j...|{0, 0, 0}|
+--------------------+----------+-------

In [0]:
checkin_df.show(5)
checkin_df.printSchema()

+--------------------+--------------------+-------+
|         business_id|        checkin_info|   type|
+--------------------+--------------------+-------+
|KO9CpaSPOoqm0iCWm...|{NULL, NULL, NULL...|checkin|
|oRqBAYtcBYZHXA7G8...|{3, NULL, NULL, N...|checkin|
|6cy2C9aBXUwkrh4bY...|{NULL, NULL, 1, N...|checkin|
|D0IB17N66FiyYDCzT...|{NULL, NULL, NULL...|checkin|
|HLQGo3EaYVvAv22bO...|{NULL, NULL, NULL...|checkin|
+--------------------+--------------------+-------+
only showing top 5 rows

root
 |-- business_id: string (nullable = true)
 |-- checkin_info: struct (nullable = true)
 |    |-- 0-0: long (nullable = true)
 |    |-- 0-1: long (nullable = true)
 |    |-- 0-2: long (nullable = true)
 |    |-- 0-3: long (nullable = true)
 |    |-- 0-4: long (nullable = true)
 |    |-- 0-5: long (nullable = true)
 |    |-- 0-6: long (nullable = true)
 |    |-- 1-0: long (nullable = true)
 |    |-- 1-1: long (nullable = true)
 |    |-- 1-2: long (nullable = true)
 |    |-- 1-3: long (nullable = tru

In [0]:
business_df.show(1)
business_df.printSchema()

+--------------------+--------------------+------+--------------------+---------+-----------+--------------------+-------------+----+------------+-----+-----+--------+
|         business_id|          categories|  city|        full_address| latitude|  longitude|                name|neighborhoods|open|review_count|stars|state|    type|
+--------------------+--------------------+------+--------------------+---------+-----------+--------------------+-------------+----+------------+-----+-----+--------+
|rncjoVoEFUJGCUoC1...|[Accountants, Pro...|Peoria|8466 W Peoria Ave...|33.581867|-112.241596|Peoria Income Tax...|           []|true|           3|  5.0|   AZ|business|
+--------------------+--------------------+------+--------------------+---------+-----------+--------------------+-------------+----+------------+-----+-----+--------+
only showing top 1 row

root
 |-- business_id: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)

In [0]:
user_df.show(5)
user_df.printSchema()

+-------------+---------+------------+----+--------------------+---------+
|average_stars|     name|review_count|type|             user_id|    votes|
+-------------+---------+------------+----+--------------------+---------+
|          5.0|      Jim|           6|user|CR2y7yEm4X035ZMzr...|{0, 0, 7}|
|          1.0|    Kelle|           2|user|_9GXoHhdxc30ujPaQ...|{0, 0, 1}|
|          5.0|Stephanie|           2|user|8mM-nqxjg6pT04kwc...|{0, 0, 1}|
|          5.0|        T|           2|user|Ch6CdTR2IVaVANr-R...|{0, 0, 2}|
|          1.0|     Beth|           1|user|NZrLmHRyiHmyT1Jrf...|{0, 0, 0}|
+-------------+---------+------------+----+--------------------+---------+
only showing top 5 rows

root
 |-- average_stars: double (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- type: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- votes: struct (nullable = true)
 |    |-- cool: long (nullable = true)
 |    |-- funny: long

In [0]:
tip_df.show(5)
tip_df.printSchema()

+--------------------+----------+-----+--------------------+----+--------------------+
|         business_id|      date|likes|                text|type|             user_id|
+--------------------+----------+-----+--------------------+----+--------------------+
|cE27W9VPgO88Qxe4o...|2013-04-18|    0|Don't waste your ...| tip|-6rEfobYjMxpUWLNx...|
|mVHrayjG3uZ_RLHkL...|2013-01-06|    0|Your GPS will not...| tip|EZ0r9dKKtEGVx2Cdn...|
|KayYbHCt-RkbGcPdG...|2013-12-03|    0|Great drink speci...| tip|xb6zEQCw9I-Gl0g06...|
|wJr6kSA5dchdgOdwH...|2013-07-22|    0|Sarah rocks! Best...| tip|fvTivrsJoUMYXnOJw...|
|fNGIbpazjTRdXgwRY...|2013-04-22|    0|Decent selection ...| tip|6GrH6gp09pqYykGv8...|
+--------------------+----------+-----+--------------------+----+--------------------+
only showing top 5 rows

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- likes: long (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 

In [0]:
###Flattern each dataset
##review_df

flat_review_df = review_df.select(
    "business_id",
    "date",
    "review_id",
    "stars",
    "text",
    "type",
    "user_id",
    col("votes.cool").alias("votes_cool"),
    col("votes.funny").alias("votes_funny"),
    col("votes.useful").alias("votes_useful")
)
flat_review_df.show(5)


+--------------------+----------+--------------------+-----+--------------------+------+--------------------+----------+-----------+------------+
|         business_id|      date|           review_id|stars|                text|  type|             user_id|votes_cool|votes_funny|votes_useful|
+--------------------+----------+--------------------+-----+--------------------+------+--------------------+----------+-----------+------------+
|9yKzy9PApeiPPOUJE...|2011-01-26|fWKvX83p0-ka4JS3d...|    5|My wife took me h...|review|rLtl8ZkDX5vH5nAx9...|         2|          0|           5|
|ZRJwVLyzEJq1VAihD...|2011-07-27|IjZ33sJrzXqU-0X6U...|    5|I have no idea wh...|review|0a2KyEL0d3Yb1V6ai...|         0|          0|           0|
|6oRAC4uyJCsJl1X0W...|2012-06-14|IESLBzqUCLdSzSqm0...|    4|love the gyro pla...|review|0hT2KtfLiobPvh6cD...|         0|          0|           1|
|_1QQZuf4zZOyFCvXc...|2010-05-27|G-WvGaISbqqaMHlNn...|    5|Rosie, Dakota, an...|review|uZetl9T0NcROGOyFf...|         1|    

In [0]:
# Flatten checkin_df
# Define a list of column names for the checkin hours (0-0, 0-1, ..., 23-6)
checkin_hours = [f"{i}-{j}" for i in range(24) for j in range(7)]

# Select the business_id and the individual checkin hour columns
flat_checkin_df = checkin_df.select("business_id", *[col(f"checkin_info.{hour}").alias(hour) for hour in checkin_hours])

# Show the result
flat_checkin_df.show(1)




+--------------------+----+----+----+----+----+---+----+----+----+----+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+----+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|         business_id| 0-0| 0-1| 0-2| 0-3| 0-4|0-5| 0-6| 1-0| 1-1| 1-2| 1-3| 1-4| 1-5|1-6| 2-0| 2-1| 2-2| 2-3| 2-4| 2-5| 2-6| 3-0| 3-1| 3-2| 3-

In [0]:
#drop the useless information: checkin_infor
# Select only the business_id column
#flat_checkin_df = flat_checkin_df.select("business_id")

# Show the updated DataFrame schema
#flat_checkin_df.printSchema()
#flat_checkin_df.show(5)

In [0]:

# Flatten the business_df
flat_business_df = business_df.select(
    "business_id",
    "city",
    "full_address",
    "latitude",
    "longitude",
    "name",
    "open",
    "review_count",
    "stars",
    "state",
    "type",
    explode_outer(coalesce("categories", array())).alias("category"),
    explode_outer(coalesce("neighborhoods", array())).alias("neighborhood")
)

flat_business_df.show(5)

+--------------------+-------+--------------------+---------+-----------+--------------------+----+------------+-----+-----+--------+--------------------+------------+
|         business_id|   city|        full_address| latitude|  longitude|                name|open|review_count|stars|state|    type|            category|neighborhood|
+--------------------+-------+--------------------+---------+-----------+--------------------+----+------------+-----+-----+--------+--------------------+------------+
|rncjoVoEFUJGCUoC1...| Peoria|8466 W Peoria Ave...|33.581867|-112.241596|Peoria Income Tax...|true|           3|  5.0|   AZ|business|         Accountants|        NULL|
|rncjoVoEFUJGCUoC1...| Peoria|8466 W Peoria Ave...|33.581867|-112.241596|Peoria Income Tax...|true|           3|  5.0|   AZ|business|Professional Serv...|        NULL|
|rncjoVoEFUJGCUoC1...| Peoria|8466 W Peoria Ave...|33.581867|-112.241596|Peoria Income Tax...|true|           3|  5.0|   AZ|business|        Tax Services|      

In [0]:
## Flatten user_df
flat_user_df = user_df.select(
    "average_stars",
    "name",
    "review_count",
    "type",
    "user_id",
    col("votes.cool").alias("user_votes_cool"),
    col("votes.funny").alias("user_votes_funny"),
    col("votes.useful").alias("user_votes_useful")
)
flat_user_df.show(5)

+-------------+---------+------------+----+--------------------+---------------+----------------+-----------------+
|average_stars|     name|review_count|type|             user_id|user_votes_cool|user_votes_funny|user_votes_useful|
+-------------+---------+------------+----+--------------------+---------------+----------------+-----------------+
|          5.0|      Jim|           6|user|CR2y7yEm4X035ZMzr...|              0|               0|                7|
|          1.0|    Kelle|           2|user|_9GXoHhdxc30ujPaQ...|              0|               0|                1|
|          5.0|Stephanie|           2|user|8mM-nqxjg6pT04kwc...|              0|               0|                1|
|          5.0|        T|           2|user|Ch6CdTR2IVaVANr-R...|              0|               0|                2|
|          1.0|     Beth|           1|user|NZrLmHRyiHmyT1Jrf...|              0|               0|                0|
+-------------+---------+------------+----+--------------------+--------

In [0]:
# Join `flat_review_df` with `flat_business_df`
flat_unified_df = flat_review_df.join(
    flat_business_df, 
    flat_review_df["business_id"] == flat_business_df["business_id"], 
    how="inner")
flat_unified_df.show(5)

+--------------------+----------+--------------------+-----+--------------------+------+--------------------+----------+-----------+------------+--------------------+-------+--------------------+----------+------------+------------------+----+------------+-----+-----+--------+------------------+------------+
|         business_id|      date|           review_id|stars|                text|  type|             user_id|votes_cool|votes_funny|votes_useful|         business_id|   city|        full_address|  latitude|   longitude|              name|open|review_count|stars|state|    type|          category|neighborhood|
+--------------------+----------+--------------------+-----+--------------------+------+--------------------+----------+-----------+------------+--------------------+-------+--------------------+----------+------------+------------------+----+------------+-----+-----+--------+------------------+------------+
|9yKzy9PApeiPPOUJE...|2011-01-26|fWKvX83p0-ka4JS3d...|    5|My wife to

In [0]:
#drop the duplicate columns
flat_unified_df = flat_unified_df.drop(flat_review_df["business_id"])

In [0]:
flat_unified_df = flat_unified_df.join(
    tip_df, 
    flat_unified_df["business_id"] == tip_df["business_id"], 
    how="inner")
flat_unified_df.show(5)

+----------+--------------------+-----+--------------------+------+--------------------+----------+-----------+------------+--------------------+----------+--------------------+----------------+-----------------+--------------------+----+------------+-----+-----+--------+-----------+------------+--------------------+----------+-----+--------------------+----+--------------------+
|      date|           review_id|stars|                text|  type|             user_id|votes_cool|votes_funny|votes_useful|         business_id|      city|        full_address|        latitude|        longitude|                name|open|review_count|stars|state|    type|   category|neighborhood|         business_id|      date|likes|                text|type|             user_id|
+----------+--------------------+-----+--------------------+------+--------------------+----------+-----------+------------+--------------------+----------+--------------------+----------------+-----------------+--------------------+-

In [0]:
##drop the duplicate columns
flat_unified_df = flat_unified_df.drop(flat_review_df["business_id"], tip_df["user_id"])

In [0]:
# Join `flat_unified_df` with `flat_user_df`
flat_unified_df = flat_unified_df.join(
    flat_user_df, 
    flat_unified_df["user_id"] == flat_user_df["user_id"], 
    how="inner"
)


In [0]:
#drop the duplicate columns
flat_unified_df = flat_unified_df.drop(flat_user_df["user_id"], flat_review_df["date"],flat_review_df["stars"], tip_df["business_id"])



In [0]:
flat_unified_df = flat_unified_df.join(
    flat_checkin_df,
    flat_unified_df["business_id"] == flat_checkin_df["business_id"],
    how="inner"
)



In [0]:
#drop the duplicate columns
flat_unified_df = flat_unified_df.drop(flat_checkin_df["business_id"], flat_review_df["type"], flat_user_df["type"], flat_review_df["text"],flat_user_df["name"], flat_business_df["review_count"],flat_review_df["type"], flat_business_df["type"])

In [0]:
# Create the path using dbutils.fs.mkdirs()
dbutils.fs.mkdirs("dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpoutput1/")

True

In [0]:
# Save the bronze table
bronze_table_path = "dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpoutput1/"

#Save the DataFrame as a bronze table in Delta Lake format
flat_unified_df.write.format("delta").mode("overwrite").save(bronze_table_path)


In [0]:
#Add SCD2 columns
from pyspark.sql.functions import current_date, lit

# Define the current date
current_date = lit(current_date())

# Add SCD2 columns: valid_from, valid_to, version
flat_unified_df = flat_unified_df.withColumn("valid_from", current_date) \
    .withColumn("valid_to", lit("9999-12-31")).withColumn("version", lit(1))  









In [0]:
# Show the updated DataFrame schema
flat_unified_df.show(5)

+--------------------+--------------------+----------+-----------+------------+--------------------+----------+--------------------+----------------+-----------------+--------------------+----+-----+-----+-----------+------------+----------+-----+--------------------+----+-------------+------------+---------------+----------------+-----------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----

In [0]:
#Convert to Delta Lake
flat_unified_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/shared_uploads/caidanni@seattleu.edu/yelpoutput2/")


In [0]:
#How many reviews are there for each business?
#group by business_id and count
business_review_count = flat_unified_df.groupBy("business_id").count()
#print the result
business_review_count.show()

+--------------------+-----+
|         business_id|count|
+--------------------+-----+
|LzpR_jE6VIutJ08s2...|10824|
|3H2ttTM2aSIaZ6FTj...| 1200|
|5ambRqdTJt9vGwFzV...|85280|
|peA3F-PnIfijYr8Hu...|15725|
|27hfvE5DcOY3J1-MW...|  240|
|aBjWR-Mol58GMsRAd...| 2016|
|EzAen1_nwVuiWPKBj...| 2800|
|w2PHq5wfGuqWmHAru...|   96|
|CO8UjN6WXsDJqo5qJ...|  288|
|s_cKw6m0Fw9jZbobR...| 1170|
|d4GSh1BalxvQCU7zL...|  144|
|U2C1t0sqK_3hMFEs6...|   88|
|T2xiF9kMc_WRH0GeK...|   40|
|Lc3-XLmobyijQpOxH...|  448|
|VfzUmXIgowYKutptf...|   40|
|NX5YHSIyesufcsI7R...| 1914|
|4JbQSLiRKQudjDn9f...|   45|
|nHxi4UjqWOLRidFNn...|  560|
|YowLDN2YEKVJpHv0L...|  306|
|pQH9UvH3a87GoH-sd...|   90|
+--------------------+-----+
only showing top 20 rows



In [0]:
#How many businesses take place in each state and city? What kind of business do they have the most in each state, in each city?
# Group by state and city, count the number of businesses, and find the most common category
state_business_count = flat_unified_df.groupBy("state").agg(F.countDistinct("business_id").alias("num_businesses"))
state_business_count.show(5)
city_business_count = flat_unified_df.groupBy("city").agg(F.countDistinct("business_id").alias("num_businesses"))
city_business_count.show(5)


+-----+--------------+
|state|num_businesses|
+-----+--------------+
|   AZ|          7432|
+-----+--------------+

+------------+--------------+
|        city|num_businesses|
+------------+--------------+
|       Tempe|           780|
|Fountain Hls|             2|
|     Phoenix|          2708|
|   Ahwatukee|             2|
|  Scottsdale|          1309|
+------------+--------------+
only showing top 5 rows



In [0]:

grouped_df = flat_unified_df.groupBy("state", "category").agg(count("*").alias("category_count"))

# Define a window specification to rank categories within each state
window_spec = Window.partitionBy("state").orderBy(col("category_count").desc())

# Rank categories within each state based on category count
ranked_df = grouped_df.withColumn("rank", rank().over(window_spec))
ranked_df.show(5)#show the first 5 rank
# Filter to keep only the most common category for each state
most_common_category_df = ranked_df.filter(col("rank") == 1)

# Show the result
most_common_category_df.show()



+-----+--------------+--------------+----+
|state|      category|category_count|rank|
+-----+--------------+--------------+----+
|   AZ|   Restaurants|       9970993|   1|
|   AZ|American (New)|       2632116|   2|
|   AZ|     Nightlife|       2552078|   3|
|   AZ|          Bars|       2397120|   4|
|   AZ|          Food|       2045160|   5|
+-----+--------------+--------------+----+
only showing top 5 rows

+-----+-----------+--------------+----+
|state|   category|category_count|rank|
+-----+-----------+--------------+----+
|   AZ|Restaurants|       9970993|   1|
+-----+-----------+--------------+----+



In [0]:
#What time do people usually write reviews?
from pyspark.sql.functions import hour

# Extract the hour component from the date column
reviews_df_with_hour = flat_unified_df.withColumn("review_hour", hour("date"))

# Aggregate by hour and count the number of reviews
review_count_by_hour = reviews_df_with_hour.groupBy("review_hour").count().orderBy("review_hour")

# Show the distribution of review times
review_count_by_hour.show()

+-----------+--------+
|review_hour|   count|
+-----------+--------+
|          0|41960577|
+-----------+--------+

