In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

In [0]:
# Reuse the active SparkSession if one exists; otherwise build a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

## Read Yelp data

In [0]:
# All raw Yelp inputs live in the same DBFS upload folder; every path is built from
# this one constant so the location only has to change in one place.
DATA_DIR = "dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com"

business_df = spark.read.format("json").load(f"{DATA_DIR}/yelp_academic_dataset_business.json")
tip_df = spark.read.format("json").load(f"{DATA_DIR}/yelp_academic_dataset_tip.json")
checkin_df = spark.read.format("json").load(f"{DATA_DIR}/yelp_academic_dataset_checkin.json")
review_df = spark.read.parquet(f"{DATA_DIR}/review_parquet")
user_df = spark.read.parquet(f"{DATA_DIR}/user.parquet")

## Check data schema

In [0]:
# Print the inferred schema of every raw input, in the order business, checkin, tip, review, user.
for raw_frame in (business_df, checkin_df, tip_df, review_df, user_df):
    raw_frame.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

## Select columns requested by DA team

In [0]:
# Keep only the columns the DA team asked for (select accepts a list of names).
business_df_final = business_df.select(["business_id", "city", "state", "categories", "review_count"])
review_df_final = review_df.select(["review_id", "business_id", "user_id", "date"])

# Preview the first rows of each projection.
business_df_final.show()
review_df_final.show()

+--------------------+--------------+-----+--------------------+------------+
|         business_id|          city|state|          categories|review_count|
+--------------------+--------------+-----+--------------------+------------+
|Pns2l4eNsfO8kk83d...| Santa Barbara|   CA|Doctors, Traditio...|           7|
|mpf3x-BjTdTEA3yCZ...|        Affton|   MO|Shipping Centers,...|          15|
|tUFrWirKiKi_TAnsV...|        Tucson|   AZ|Department Stores...|          22|
|MTSW4McQd7CbVtyjq...|  Philadelphia|   PA|Restaurants, Food...|          80|
|mWMc6_wTdE0EUBKIG...|    Green Lane|   PA|Brewpubs, Breweri...|          13|
|CF33F8-E6oudUQ46H...|  Ashland City|   TN|Burgers, Fast Foo...|           6|
|n_0UpQx1hsNbnPUSl...|     Brentwood|   MO|Sporting Goods, F...|          13|
|qkRM_2X51Yqxk3btl...|St. Petersburg|   FL|Synagogues, Relig...|           5|
|k0hlBqXX-Bt0vf1op...|        Affton|   MO|Pubs, Restaurants...|          19|
|bBDDEgkFA1Otx9Lfe...|     Nashville|   TN|Ice Cream & Froze...|

In [0]:
# Render the selected business columns in the notebook's interactive table view.
business_df_final.display()

business_id,city,state,categories,review_count
Pns2l4eNsfO8kk83dixA6A,Santa Barbara,CA,"Doctors, Traditional Chinese Medicine, Naturopathic/Holistic, Acupuncture, Health & Medical, Nutritionists",7
mpf3x-BjTdTEA3yCZrAYPw,Affton,MO,"Shipping Centers, Local Services, Notaries, Mailbox Centers, Printing Services",15
tUFrWirKiKi_TAnsVWINQQ,Tucson,AZ,"Department Stores, Shopping, Fashion, Home & Garden, Electronics, Furniture Stores",22
MTSW4McQd7CbVtyjqoe9mw,Philadelphia,PA,"Restaurants, Food, Bubble Tea, Coffee & Tea, Bakeries",80
mWMc6_wTdE0EUBKIGXDVfA,Green Lane,PA,"Brewpubs, Breweries, Food",13
CF33F8-E6oudUQ46HnavjQ,Ashland City,TN,"Burgers, Fast Food, Sandwiches, Food, Ice Cream & Frozen Yogurt, Restaurants",6
n_0UpQx1hsNbnPUSlodU8w,Brentwood,MO,"Sporting Goods, Fashion, Shoe Stores, Shopping, Sports Wear, Accessories",13
qkRM_2X51Yqxk3btlwAQIg,St. Petersburg,FL,"Synagogues, Religious Organizations",5
k0hlBqXX-Bt0vf1op7Jr1w,Affton,MO,"Pubs, Restaurants, Italian, Bars, American (Traditional), Nightlife, Greek",19
bBDDEgkFA1Otx9Lfe7BZUQ,Nashville,TN,"Ice Cream & Frozen Yogurt, Fast Food, Burgers, Restaurants, Food",10


## Drop duplicated rows in the business and review tables

In [0]:
# Drop rows that are exact duplicates across all selected columns,
# for both the review and the business projections.
review_df_final_ = review_df_final.dropDuplicates()
business_df_final_ = business_df_final.dropDuplicates()

# Preview the de-duplicated data.
business_df_final_.show()
review_df_final_.show()


+--------------------+--------------+-----+--------------------+------------+
|         business_id|          city|state|          categories|review_count|
+--------------------+--------------+-----+--------------------+------------+
|jaxMSoInw8Poo3XeM...|    Clearwater|   FL|General Dentistry...|          10|
|WKMJwqnfZKsAae75R...|      Edmonton|   AB|Coffee & Tea, Foo...|          40|
|ROeacJQwBeh05Rqg7...|  Philadelphia|   PA| Korean, Restaurants|         205|
|k0hlBqXX-Bt0vf1op...|        Affton|   MO|Pubs, Restaurants...|          19|
|CF33F8-E6oudUQ46H...|  Ashland City|   TN|Burgers, Fast Foo...|           6|
|bBDDEgkFA1Otx9Lfe...|     Nashville|   TN|Ice Cream & Froze...|          10|
|rBmpy_Y1UbBx8ggHl...|        Tucson|   AZ|Automotive, Auto ...|          10|
|il_Ro8jwPlHresjw9...|  Indianapolis|   IN|American (Traditi...|          28|
|0bPLkL0QhhPO5kt1_...|         Largo|   FL|Food, Delis, Ital...|         100|
|UJsufbvfyfONHeWdv...| Land O' Lakes|   FL|Department Stores...|

## Check for duplicates in the primary keys

In [0]:
# Count how often each primary-key value occurs; any value seen more than once is a duplicate.
duplicate_rows_business = business_df_final_.groupBy('business_id').count().filter(col('count') > 1)
duplicate_rows_review = review_df_final_.groupBy('review_id').count().filter(col('count') > 1)

# Check each table on its own: with an if/elif chain, duplicates in business_id
# would prevent duplicates in review_id from ever being reported.
found_duplicates = False
for key_name, duplicate_rows in (("business_id", duplicate_rows_business),
                                 ("review_id", duplicate_rows_review)):
    if duplicate_rows.count() > 0:
        found_duplicates = True
        print("Duplicate rows found in the primary key: {}.".format(key_name))
        duplicate_rows.show()

if not found_duplicates:
    print("No duplicate rows found in the primary key.")

No duplicate rows found in the primary key.


## Check null values in the business and review tables

In [0]:
# Per-column null totals: every null becomes 1 (non-null 0) and the flags are summed per column.
null_counts_business = business_df_final_.select(
    [sum(col(name).isNull().cast("integer")).alias(name) for name in business_df_final_.columns]
)
null_counts_review = review_df_final_.select(
    [sum(col(name).isNull().cast("integer")).alias(name) for name in review_df_final_.columns]
)

# One-row summaries, one per table.
null_counts_business.show()
null_counts_review.show()

+-----------+----+-----+----------+------------+
|business_id|city|state|categories|review_count|
+-----------+----+-----+----------+------------+
|          0|   0|    0|       103|           0|
+-----------+----+-----+----------+------------+

+---------+-----------+-------+----+
|review_id|business_id|user_id|date|
+---------+-----------+-------+----+
|        0|          0|      0|   0|
+---------+-----------+-------+----+



## Fill the Null Values with "N/A"

In [0]:
# Names of the columns that contain at least one null.
# Each one-row null-count table is collected once. Filtering and counting it per column
# would re-run the whole aggregation, and the de-duplication feeding it, once per column.
# A null total (only possible for an empty table) is treated as "no nulls".
subset_business = [name for name, n_null in null_counts_business.first().asDict().items() if n_null]
subset_review = [name for name, n_null in null_counts_review.first().asDict().items() if n_null]

# Fill nulls in those columns with a placeholder.
# NOTE: with a string value, fillna only touches string columns; nulls in numeric
# columns in the subset are left unchanged.
business_df_final_filled = business_df_final_.fillna('N/A', subset=subset_business)
review_df_final_filled = review_df_final_.fillna('N/A', subset=subset_review)

# Show the DataFrames with filled null values.
business_df_final_filled.show()
review_df_final_filled.show()

+--------------------+--------------+-----+--------------------+------------+
|         business_id|          city|state|          categories|review_count|
+--------------------+--------------+-----+--------------------+------------+
|jaxMSoInw8Poo3XeM...|    Clearwater|   FL|General Dentistry...|          10|
|WKMJwqnfZKsAae75R...|      Edmonton|   AB|Coffee & Tea, Foo...|          40|
|ROeacJQwBeh05Rqg7...|  Philadelphia|   PA| Korean, Restaurants|         205|
|k0hlBqXX-Bt0vf1op...|        Affton|   MO|Pubs, Restaurants...|          19|
|CF33F8-E6oudUQ46H...|  Ashland City|   TN|Burgers, Fast Foo...|           6|
|bBDDEgkFA1Otx9Lfe...|     Nashville|   TN|Ice Cream & Froze...|          10|
|rBmpy_Y1UbBx8ggHl...|        Tucson|   AZ|Automotive, Auto ...|          10|
|il_Ro8jwPlHresjw9...|  Indianapolis|   IN|American (Traditi...|          28|
|0bPLkL0QhhPO5kt1_...|         Largo|   FL|Food, Delis, Ital...|         100|
|UJsufbvfyfONHeWdv...| Land O' Lakes|   FL|Department Stores...|

## Check for negative values in the business table

In [0]:
# Column that should only ever hold non-negative counts.
column_name = "review_count"

# True when at least one row holds a non-null value below zero.
has_negative_values = (
    business_df_final_filled
    .filter(col(column_name).isNotNull() & (col(column_name) < 0))
    .count()
) > 0

# Report the outcome.
print(
    ("The column '{}' contains negative values." if has_negative_values
     else "The column '{}' does not contain negative values.").format(column_name)
)

The column 'review_count' does not contain negative values.


In [0]:
%fs ls dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp

path,name,size,modificationTime
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/_SUCCESS,_SUCCESS,0,1688628565000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/_committed_1173459090301467925,_committed_1173459090301467925,2724,1688628565000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/_started_1173459090301467925,_started_1173459090301467925,0,1688628538000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/part-00000-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-280-1-c000.json,part-00000-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-280-1-c000.json,134188930,1688628542000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/part-00001-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-281-1-c000.json,part-00001-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-281-1-c000.json,134190147,1688628542000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/part-00002-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-282-1-c000.json,part-00002-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-282-1-c000.json,134190242,1688628542000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/part-00003-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-283-1-c000.json,part-00003-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-283-1-c000.json,134189627,1688628542000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/part-00004-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-284-1-c000.json,part-00004-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-284-1-c000.json,134189177,1688628546000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/part-00005-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-285-1-c000.json,part-00005-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-285-1-c000.json,134191084,1688628546000
dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp/part-00006-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-286-1-c000.json,part-00006-tid-1173459090301467925-0a91ed2a-cb83-48a9-974c-f874244228ff-286-1-c000.json,134191657,1688628546000


## Function: raw to bronze (`read_and_load_to_bronze`)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

def read_and_load_to_bronze(input_file_1, input_file_2, bronze_table_name_1, bronze_table_name_2):
    """Read the raw Yelp business and review data, clean it, and append it to bronze tables.

    Steps:
      1. keep only the columns requested by the DA team;
      2. drop rows that are exact duplicates;
      3. report primary-key values (business_id / review_id) that occur more than once;
      4. show per-column null counts and fill nulls in the affected columns with 'N/A';
      5. report whether the business review_count column contains negative values;
      6. append both cleaned DataFrames to their bronze tables.

    Args:
        input_file_1 (str): path of the business data, read as JSON.
        input_file_2 (str): path of the review data, read as Parquet.
        bronze_table_name_1 (str): table the cleaned business rows are appended to.
        bronze_table_name_2 (str): table the cleaned review rows are appended to.

    Returns:
        None. The checks print their findings; the data is written with saveAsTable.
    """
    spark = SparkSession.builder.getOrCreate()

    # Read the raw inputs.
    business_raw = spark.read.format("json").load(input_file_1)
    review_raw = spark.read.parquet(input_file_2)

    # 1. Keep only the columns requested by the DA team.
    business_selected = business_raw.select("business_id", "city", "state", "categories", "review_count")
    review_selected = review_raw.select("review_id", "business_id", "user_id", "date")

    # 2. Drop exact-duplicate rows. All later steps use these local frames rather than
    #    any same-named notebook globals.
    business_dedup = business_selected.dropDuplicates()
    review_dedup = review_selected.dropDuplicates()

    # 3. Primary keys must be unique; each table is checked independently.
    _report_duplicate_keys(business_dedup, "business_id")
    _report_duplicate_keys(review_dedup, "review_id")

    # 4. Show null counts and fill the columns that contain nulls.
    business_filled = _fill_null_columns(business_dedup, "N/A")
    review_filled = _fill_null_columns(review_dedup, "N/A")

    # 5. review_count must never be negative.
    _report_negative_values(business_filled, "review_count")

    # 6. Writing goes through DataFrame.write (a DataFrameWriter); DataFrame itself has no .mode().
    #    NOTE(review): "append" adds another copy of every row each time this function is re-run.
    business_filled.write.mode("append").saveAsTable(bronze_table_name_1)
    review_filled.write.mode("append").saveAsTable(bronze_table_name_2)


def _report_duplicate_keys(df, key_column):
    """Print whether any value of `key_column` occurs in more than one row of `df`, and show those values."""
    duplicate_keys = df.groupBy(key_column).count().filter(col("count") > 1)
    if duplicate_keys.count() > 0:
        print("Duplicate rows found in the primary key: {}.".format(key_column))
        duplicate_keys.show()
    else:
        print("No duplicate rows found in the primary key: {}.".format(key_column))


def _fill_null_columns(df, fill_value):
    """Show per-column null counts of `df` and return it with nulls in those columns set to `fill_value`.

    With a string `fill_value`, DataFrame.fillna only fills string columns; nulls in other
    column types are left unchanged.
    """
    null_counts = df.select([sum(col(name).isNull().cast("integer")).alias(name) for name in df.columns])
    null_counts.show()
    # Collect the one-row result once instead of running a separate Spark job per column.
    columns_with_nulls = [name for name, n_null in null_counts.first().asDict().items() if n_null]
    return df.fillna(fill_value, subset=columns_with_nulls)


def _report_negative_values(df, column_name):
    """Print whether `column_name` in `df` holds any negative value (nulls never match)."""
    # One negative row is enough to answer the question, so stop after the first.
    has_negative_values = df.filter(col(column_name) < 0).limit(1).count() > 0
    if has_negative_values:
        print("The column '{}' contains negative values.".format(column_name))
    else:
        print("The column '{}' does not contain negative values.".format(column_name))


# Example usage:
input_file_1 = "dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/yelp_academic_dataset_business.json"
input_file_2 = "dbfs:/FileStore/shared_uploads/sparklesparkle1102@outlook.com/review_parquet"
bronze_table_name_1 = "business_bronze"
bronze_table_name_2 = "review_bronze"

read_and_load_to_bronze(input_file_1, input_file_2, bronze_table_name_1, bronze_table_name_2)

## Create a mapping table between each business and its categories

In [0]:
from pyspark.sql.functions import col, explode, split, trim

# One row per (business_id, category). `categories` holds a comma-separated list such as
# "Restaurants, Food, Bakeries". Splitting on "," alone leaves a leading space on every
# category after the first (" Food"), so each piece is trimmed.
# Businesses whose categories were null carry the 'N/A' placeholder from the fill step.
business_category_df = (
    business_df_final_filled
    .select("business_id", explode(split(col("categories"), ",")).alias("category"))
    .withColumn("category", trim(col("category")))
    # (business_id, category) is intended as this table's key, so keep each pair once.
    .dropDuplicates()
)

# Show the business category table.
business_category_df.show()

+--------------------+-------------------+
|         business_id|           category|
+--------------------+-------------------+
|cVBxfMC4lp3DnocjY...|       Coffee & Tea|
|cVBxfMC4lp3DnocjY...|              Cafes|
|cVBxfMC4lp3DnocjY...|               Pets|
|cVBxfMC4lp3DnocjY...|        Restaurants|
|cVBxfMC4lp3DnocjY...|       Pet Adoption|
|cVBxfMC4lp3DnocjY...|               Food|
|1P_mGUY1PyPq7_Zab...|            Italian|
|1P_mGUY1PyPq7_Zab...|        Restaurants|
|1P_mGUY1PyPq7_Zab...|        Steakhouses|
|LdQUIP4_DPnJjq-Jf...|        Auto Repair|
|LdQUIP4_DPnJjq-Jf...|         Automotive|
|LdQUIP4_DPnJjq-Jf...| Auto Customization|
|jfz_-ngCCAbKUzilu...|        Hair Salons|
|jfz_-ngCCAbKUzilu...|      Hair Stylists|
|jfz_-ngCCAbKUzilu...|      Beauty & Spas|
|jfz_-ngCCAbKUzilu...|       Hair Removal|
|jfz_-ngCCAbKUzilu...|    Eyelash Service|
|jfz_-ngCCAbKUzilu...|        Nail Salons|
|rrdSkNz4YG6eDgOq3...|         Automotive|
|rrdSkNz4YG6eDgOq3...|        Car Dealers|
+----------

## Create a primary key for the mapping table

In [0]:
%sql
-- Declare (business_id, category) as the composite primary key of the business/category
-- mapping table. A business has many categories, so neither column is unique on its own.
--
-- NOTE(review): `business_category_df` is only a Python DataFrame and is not visible to SQL.
-- Save it first, e.g. business_category_df.write.saveAsTable("business_category"), and
-- confirm the table name below. PRIMARY KEY constraints require a Unity Catalog table and
-- NOT NULL key columns, and Databricks treats them as informational (not enforced).
ALTER TABLE business_category ALTER COLUMN business_id SET NOT NULL;
ALTER TABLE business_category ALTER COLUMN category SET NOT NULL;
ALTER TABLE business_category ADD CONSTRAINT business_category_pk PRIMARY KEY (business_id, category);


## Function bronze to silver

I intended to complete this function, but I could not finish all of it before the deadline. 
I plan to turn the bronze tables into silver tables with the following steps:
<br>
1. create a category table which shows the mapping relationship between business_id and category
  <br>This table contains the following columns: 
  <br>business_id:string
  <br>category:string
  <br>prime_key:long
  <br>timestamp:date
2. create a business table
  <br>This table contains the following columns: 
  <br>business_id:string
  <br>city:string
  <br>state:string
  <br>review_count:long
  <br>prime_key:long
  <br>timestamp:date
3. create a review table
  <br>This table contains the following columns:
  <br>review_id:string
  <br>business_id:string
  <br>user_id:string
  <br>date:date
  <br>prime_key:long
  <br>timestamp:date
4. save those tables to database.
5. Optionally, add a column that flags each row as new_entry/edit/delete.

In [0]:
from pyspark.sql.functions import col, current_timestamp, split


def transform_bronze_to_silver(bronze_df):
    """
    Transform the business DataFrame from the bronze layer into the silver layer.

    Expects the columns selected for the business bronze table: business_id, city,
    state, categories (comma-separated string) and review_count.

    Args:
        bronze_df (DataFrame): The business DataFrame read from the bronze layer.

    Returns:
        DataFrame: business_id, city, state, categories (array of strings), review_count,
        plus a `timestamp` column holding the time the transformation ran.
    """
    silver_df = bronze_df.select(
        col("business_id"),
        col("city"),
        col("state"),
        # Column has no .split method; functions.split takes a regex, and this one also
        # swallows the blanks around each comma ("Food, Bakeries" -> ["Food", "Bakeries"]).
        split(col("categories"), r"\s*,\s*").alias("categories"),
        col("review_count"),
    )

    # Stamp every row with the load time.
    return silver_df.withColumn("timestamp", current_timestamp())


# Example usage: the business bronze data is stored as the table "business_bronze".
bronze_df = spark.table("business_bronze")  # Read bronze table
silver_df = transform_bronze_to_silver(bronze_df)  # Transform from bronze to silver
# Overwrite so the cell can be re-run without failing because the path already exists.
silver_df.write.mode("overwrite").parquet("silver_table.parquet")  # Write silver table
