In [1]:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import cast, count, when, col, lit
from pyspark.sql.types import BooleanType, IntegerType
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql.functions import size, split

In [2]:
import pandas as pd

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
folder_path = "gs://yelpfrog/landing/yelp_dataset/yelp_academic_dataset_"

# Business Cleaning

In [5]:
business = spark.read.json(f"{folder_path}business.json")

24/10/27 03:56:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [6]:
# Fill in missing values in categories column with General Business
business = business.withColumn("categories", when(col("categories").isNull(), "General Business").otherwise(col("categories")))

In [7]:
# Drop unneeded columns
business = business.drop("attributes", "hours", "latitude", "longitude")

In [8]:
# Change the is_open's datatype to Boolean
business = business.withColumn("is_open", col("is_open").cast(BooleanType()))

In [9]:
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- city: string (nullable = true)
 |-- is_open: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- state: string (nullable = true)



# Checkin Cleaning

In [47]:
checkin = spark.read.json(f"{folder_path}checkin.json")

                                                                                

In [51]:
# Change the date's datatype to timestamp
checkin = checkin.withColumn("date", to_timestamp("date", "yyyy-MM-dd HH:mm:ss"))

In [49]:
checkin.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: timestamp (nullable = true)



# Review Cleaning

In [13]:
review = spark.read.json(f"{folder_path}review.json")

                                                                                

In [14]:
# Change the date's datatype to date
review = review.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

In [15]:
# Add a word_count column
review = review.withColumn("word_count", size(split(col("text"), " ")))

In [16]:
# Drop unneeded columns
# datetime replaces date
review = review.drop("cool", "funny")

In [17]:
review.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- word_count: integer (nullable = false)



# Tip Cleaning

In [18]:
tip = spark.read.json(f"{folder_path}tip.json")

                                                                                

In [19]:
# Change the compliment_count's datatype to Integer
tip = tip.withColumn("compliment_count", col("compliment_count").cast(IntegerType()))

In [20]:
# Change the date's datatype
tip = tip.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

In [21]:
# Add a word_count column
tip = tip.withColumn("word_count", size(split(col("text"), " ")))

In [22]:
tip.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- word_count: integer (nullable = false)



# User Cleaning

In [None]:
user = spark.read.json(f"{folder_path}user.json")



In [24]:
# Drop unneeded columns
user = user.drop('compliment_cool', 'compliment_cute', 'compliment_funny', 'compliment_hot', 
                 'compliment_list', 'compliment_more', 'compliment_note', 'compliment_photos',
                 'compliment_plain', 'compliment_profile', 'compliment_writer', 'cool', 'elite',
                 'friends', 'funny')

In [25]:
# Change the yelping_since's datatype to date
user = user.withColumn("yelp_since", to_date(col("yelping_since"), "yyyy-MM-dd"))

In [26]:
# Change the columns_to_cast columns's datatype to Integer

columns_to_cast = ["average_stars", "fans", "review_count", "useful"]
user = user.select([col(c).cast(IntegerType()).alias(c) for c in columns_to_cast])

In [27]:
user.printSchema()

root
 |-- average_stars: integer (nullable = true)
 |-- fans: integer (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- useful: integer (nullable = true)



# To Parquet

In [39]:
cleaned_folder="gs://yelpfrog/cleaned/"

In [40]:
cleaned_business = f"{cleaned_folder}cleaned_business.parquet"
cleaned_checkin = f"{cleaned_folder}cleaned_checkin.parquet"
cleaned_review = f"{cleaned_folder}cleaned_review.parquet"
cleaned_tip = f"{cleaned_folder}cleaned_tip.parquet"
cleaned_user = f"{cleaned_folder}cleaned_user.parquet"

In [41]:
business.write.parquet(cleaned_business)

                                                                                

In [53]:
# checkin, review, and tip df have dates and datetimes with extra space after the :
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [54]:
checkin.write.parquet(cleaned_checkin)

                                                                                

In [55]:
review.write.parquet(cleaned_review)

                                                                                

In [56]:
tip.write.parquet(cleaned_tip)

                                                                                

In [57]:
user.write.parquet(cleaned_user)

                                                                                