In [1]:
from pyspark.sql.functions import col

# ---- Yelp businesses ---------------------------------------------------------
business_filename = "gs://my-bigdata-project-cf/landing/yelp_academic_dataset_business.json"
business_sdf = spark.read.json(business_filename)
business_sdf.printSchema()

# Give business-level columns a "business_" prefix so they stay distinguishable
# after the joins, then discard location/metadata fields not used downstream.
business_sdf = (
    business_sdf
    .withColumnRenamed('stars', 'business_stars')
    .withColumnRenamed('name', 'business_name')
    .withColumnRenamed('review_count', 'business_review_count')
    .withColumnRenamed('categories', 'business_categories')
    .drop('city', 'hours', 'is_open', 'latitude', 'longitude',
          'postal_code', 'state', 'address', 'attributes')
)

# ---- Yelp users --------------------------------------------------------------
users_filename = "gs://my-bigdata-project-cf/landing/yelp_academic_dataset_user.json"
users_sdf = spark.read.json(users_filename)
users_sdf.printSchema()
# Keep user_id unchanged as the join key; prefix every other column with "user_".
users_sdf = users_sdf.select(
    col("user_id").alias("user_id"),
    *[col(c).alias("user_" + c) for c in users_sdf.columns if c != "user_id"],
)

# ---- Yelp reviews ------------------------------------------------------------
reviews_filename = "gs://my-bigdata-project-cf/landing/yelp_academic_dataset_review.json"
reviews_sdf = spark.read.json(reviews_filename)
reviews_sdf.printSchema()
# Disambiguate the per-review rating and drop the review id / date fields.
reviews_sdf = reviews_sdf.withColumnRenamed('stars', 'review_stars').drop('review_id', 'date')

# ---- Combine: reviews -> businesses -> users (default inner joins on the keys) ----
yelp_sdf = (
    reviews_sdf
    .join(business_sdf, "business_id")
    .join(users_sdf, "user_id")
)

yelp_sdf.printSchema()

24/12/06 09:24:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

                                                                                

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)





root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- business_categories: string (nullable = true)
 |-- business_name: string (nullable = true)
 |-- business_review_count: long (nullable = true)
 |-- business_stars: double (nullable = true)
 |-- user_average_stars: double (nullable = true)
 |-- user_compliment_cool: long (nullable = true)
 |-- user_compliment_cute: long (nullable = true)
 |-- user_compliment_funny: long (nullable

                                                                                

In [2]:
# Columns kept for exploration: the two join keys, the review itself, and the
# rating / volume signals for the review, user and business.
selected_columns = [
    "user_id",
    "business_id",
    "review_stars",
    "text",
    "user_average_stars",
    "business_stars",
    "business_review_count",
    "useful",
    "user_review_count",
    "business_categories",
]

analysis_df = yelp_sdf.select(*selected_columns)
analysis_df.printSchema()
analysis_df.show(n=5)

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- user_average_stars: double (nullable = true)
 |-- business_stars: double (nullable = true)
 |-- business_review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_review_count: long (nullable = true)
 |-- business_categories: string (nullable = true)



[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+--------------------+------------+--------------------+------------------+--------------+---------------------+------+-----------------+--------------------+
|             user_id|         business_id|review_stars|                text|user_average_stars|business_stars|business_review_count|useful|user_review_count| business_categories|
+--------------------+--------------------+------------+--------------------+------------------+--------------+---------------------+------+-----------------+--------------------+
|--6lqYpHZCBbpW5z2...|mUI4DJagyUyu76qnR...|         1.0|Honestly was only...|              3.27|           3.0|                   97|     0|                9|Restaurants, Bars...|
|--RJK834fiQXm21Vp...|aIoUwpy5ZFQXUDxWM...|         1.0|There are new own...|               2.5|           3.5|                  146|     0|                1|Seafood, Diners, ...|
|--UizzbnQlZg7bEv2...|wCluBbW9nzS7MEMFl...|         5.0|Love their shrimp...|              4.25|    

                                                                                

In [3]:
# Number of joined review rows, i.e. reviews matched to both a business and a user
# by the inner joins above. As the cell's last expression, the count is displayed.
analysis_df.count()

                                                                                

6990247

In [4]:
from pyspark.sql import functions as F

# Per-column NULL count: cast each isNull() flag to 0/1 and sum it over all rows.
# Spark functions are accessed through the F namespace instead of
# `from pyspark.sql.functions import sum`. The bare import would shadow Python's
# built-in sum() for the rest of the session, and using F.col also keeps this cell
# independent of the `col` imported in another cell.
null_counts = analysis_df.select([
    F.sum(F.col(column).isNull().cast("int")).alias(column)
    for column in analysis_df.columns
])
null_counts.show()



+-------+-----------+------------+----+------------------+--------------+---------------------+------+-----------------+-------------------+
|user_id|business_id|review_stars|text|user_average_stars|business_stars|business_review_count|useful|user_review_count|business_categories|
+-------+-----------+------------+----+------------------+--------------+---------------------+------+-----------------+-------------------+
|      0|          0|           0|   0|                 0|             0|                    0|     0|                0|                689|
+-------+-----------+------------+----+------------------+--------------+---------------------+------+-----------------+-------------------+



                                                                                

In [5]:
from pyspark.sql.functions import col

# Features carried into the cleaned dataset (the analysis view minus "useful").
selected_columns = [
    "user_id", "business_id", "review_stars", "text",
    "user_average_stars", "business_stars", "business_review_count",
    "user_review_count", "business_categories"
]

# Businesses with fewer reviews than this are treated as too sparse to keep.
MIN_BUSINESS_REVIEWS = 20

# Cleaning rules:
#  - drop rows with a NULL in any selected column (in the null-count check above,
#    only business_categories had NULLs)
#  - keep only businesses with at least MIN_BUSINESS_REVIEWS reviews
#  - drop users whose profile reports zero reviews
# NOTE(review): df_cleaned is never persisted, so each later action (show, counts,
# stats, write) recomputes the JSON reads and joins. Consider .persist() if
# cluster memory allows.
df_cleaned = (
    yelp_sdf.select(selected_columns)
    .na.drop()
    .filter(col("business_review_count") >= MIN_BUSINESS_REVIEWS)
    .filter(col("user_review_count") != 0)
)

df_cleaned.printSchema()
# A bare df_cleaned.count() mid-cell runs a full Spark job whose result is never
# displayed; print it so the work is not wasted.
print(f"Rows after cleaning: {df_cleaned.count():,}")
df_cleaned.show()

root
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- review_stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- user_average_stars: double (nullable = true)
 |-- business_stars: double (nullable = true)
 |-- business_review_count: long (nullable = true)
 |-- user_review_count: long (nullable = true)
 |-- business_categories: string (nullable = true)





+--------------------+--------------------+------------+--------------------+------------------+--------------+---------------------+-----------------+--------------------+
|             user_id|         business_id|review_stars|                text|user_average_stars|business_stars|business_review_count|user_review_count| business_categories|
+--------------------+--------------------+------------+--------------------+------------------+--------------+---------------------+-----------------+--------------------+
|--3Hl2oAvTPlq-f7K...|BD1FU6xsYPtbQZ8pX...|         2.0|The service and f...|              2.73|           4.0|                  539|               11|Indian, Restauran...|
|--3Hl2oAvTPlq-f7K...|hodLyDkWXAosXLSTK...|         5.0|This is a very we...|              2.73|           4.5|                  243|               11|Restaurants, Indi...|
|--3Hl2oAvTPlq-f7K...|h0wHeh0hTt6Us4W-1...|         5.0|This is a very we...|              2.73|           4.0|                  583|  

                                                                                

In [6]:
from pyspark.sql.functions import countDistinct

# Exact number of distinct businesses and users remaining after cleaning.
unique_counts = df_cleaned.agg(
    *[
        countDistinct(id_column).alias(alias)
        for id_column, alias in (
            ("business_id", "unique_businesses"),
            ("user_id", "unique_users"),
        )
    ]
)
unique_counts.show()



+-----------------+------------+
|unique_businesses|unique_users|
+-----------------+------------+
|            61919|     1805451|
+-----------------+------------+



                                                                                

In [9]:
# business_id is an identifier with very high cardinality (tens of thousands of
# distinct values in the unique-count check), not a low-variance column. It is
# dropped so downstream models cannot memorise individual businesses;
# business_categories remains as a coarser, less overfit-prone stand-in.
# Re-running is safe: Spark's drop() ignores a column that is already absent.
df_cleaned = df_cleaned.drop('business_id')
assert 'business_id' not in df_cleaned.columns

In [8]:
from pyspark.sql.functions import min, max, avg, stddev, round
# NOTE: this import rebinds min, max and round to Spark column functions for the
# rest of the notebook session, shadowing Python's built-ins. It is kept unchanged
# because other cells may rely on these bindings.


def summarize_column(sdf, column):
    """Return a one-row Spark DataFrame of summary statistics for ``column``.

    Output columns are ``min_<column>``, ``max_<column>``, and
    ``avg_<column>`` / ``stddev_<column>``, the last two rounded to 2 decimals.
    The result is lazy: nothing runs until an action such as ``show()``.
    """
    return sdf.select(
        min(column).alias(f"min_{column}"),
        max(column).alias(f"max_{column}"),
        round(avg(column), 2).alias(f"avg_{column}"),
        round(stddev(column), 2).alias(f"stddev_{column}"),
    )


# One summary per numeric feature, kept under the same variable names as before.
df_review_stars_stats = summarize_column(df_cleaned, "review_stars")
df_user_average_stars_stats = summarize_column(df_cleaned, "user_average_stars")
df_business_stars_stats = summarize_column(df_cleaned, "business_stars")
df_business_review_count_stats = summarize_column(df_cleaned, "business_review_count")
df_user_review_count_stats = summarize_column(df_cleaned, "user_review_count")

# Print each label followed by its table, in the same order as the summaries above.
for label, stats_sdf in [
    ("Review Stars", df_review_stars_stats),
    ("User Average Stars", df_user_average_stars_stats),
    ("Business Stars", df_business_stars_stats),
    ("Business Review Count", df_business_review_count_stats),
    ("User Review Count", df_user_review_count_stats),
]:
    print(f"{label} Statistics:")
    stats_sdf.show()

Review Stars Statistics:


                                                                                

+----------------+----------------+----------------+-------------------+
|min_review_stars|max_review_stars|avg_review_stars|stddev_review_stars|
+----------------+----------------+----------------+-------------------+
|             1.0|             5.0|            3.78|               1.44|
+----------------+----------------+----------------+-------------------+

User Average Stars Statistics:


                                                                                

+----------------------+----------------------+----------------------+-------------------------+
|min_user_average_stars|max_user_average_stars|avg_user_average_stars|stddev_user_average_stars|
+----------------------+----------------------+----------------------+-------------------------+
|                   1.0|                   5.0|                  3.77|                     0.83|
+----------------------+----------------------+----------------------+-------------------------+

Business Stars Statistics:


                                                                                

+------------------+------------------+------------------+---------------------+
|min_business_stars|max_business_stars|avg_business_stars|stddev_business_stars|
+------------------+------------------+------------------+---------------------+
|               1.0|               5.0|              3.78|                  0.7|
+------------------+------------------+------------------+---------------------+

Business Review Count Statistics:


                                                                                

+-------------------------+-------------------------+-------------------------+----------------------------+
|min_business_review_count|max_business_review_count|avg_business_review_count|stddev_business_review_count|
+-------------------------+-------------------------+-------------------------+----------------------------+
|                       20|                     7568|                   420.44|                       773.6|
+-------------------------+-------------------------+-------------------------+----------------------------+

User Review Count Statistics:




+---------------------+---------------------+---------------------+------------------------+
|min_user_review_count|max_user_review_count|avg_user_review_count|stddev_user_review_count|
+---------------------+---------------------+---------------------+------------------------+
|                    1|                17473|               124.09|                  354.43|
+---------------------+---------------------+---------------------+------------------------+



                                                                                

In [10]:
from pyspark.sql.functions import length, min as spark_min, max as spark_max

# Shortest and longest review text (Spark's length() counts characters).
# The aggregates are imported under explicit aliases. This cell therefore neither
# depends on min/max having been rebound by an earlier cell nor shadows the builtins.
text_length_stats = df_cleaned.withColumn("text_length", length("text")).select(
    spark_min("text_length").alias("min_text_length"),
    spark_max("text_length").alias("max_text_length"),
)

# Displaying the text length statistics
text_length_stats.show()



+---------------+---------------+
|min_text_length|max_text_length|
+---------------+---------------+
|              1|           5000|
+---------------+---------------+



                                                                                

In [11]:
# Persist the cleaned dataset as Parquet under the bucket's "cleaned" prefix,
# replacing any output left by a previous run.
output_path = "gs://my-bigdata-project-cf/cleaned/clean_df.parquet"

(
    df_cleaned.write
    .mode("overwrite")
    .parquet(output_path)
)

                                                                                