# Project 1


# In [0]:
"""
Project 1
@Author: Sammi Jiang
This project demonstrates a slowly changing dimension (SCD) type II procedure from raw data -> bronze -> silver, and a couple of analysis cases are provided.
- For demo purposes, only two files are read and processed here:
    - yelp_academic_dataset_business
    - yelp_academic_dataset_review
"""

# Import session 
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os   # Add import for os to use os.path.exists()
# Input locations of the raw Yelp JSON files (the review path is a glob).
raw_biz_path = "dbfs:/FileStore/tables/yelp/yelp_academic_dataset_business.json"
raw_review_path = "dbfs:/FileStore/tables/yelp/yelp_academic_dataset_review_*.json"
# Spark date pattern used for the SCD effective/expiration date strings.
DATE_FORMAT = "yyyy-MM-dd"
# Sentinel expiration date given to the active ("current") version of a record.
EOW_DATE = "9999-12-31"
# Bronze snapshot and silver archive locations. These are absolute dbfs:/ URIs so
# they name the same files bronze_to_silver() reads from; a relative path would
# instead be resolved against the filesystem's working directory.
SOURCE_PATH = "dbfs:/FileStore/tables/yelp/current_data.json"
DEST_PATH = "dbfs:/FileStore/tables/SCD/archive.json"

def read_data(biz, review):
    """
    Reads the business and review JSON files and returns them as DataFrames.

    @param biz: str
        Path to the business JSON file (address, business_id, categories, attributes, ...).

    @param review: str
        Path (or glob) to the review JSON file(s).

    @return: tuple
        (df_biz, df_review): the flattened business DataFrame with one row per
        (business, category), and the review DataFrame exactly as read.

    @description:
        Every field of the nested ``attributes`` struct is promoted to a top-level
        column. The comma-separated ``categories`` string is split and exploded
        into a single ``category`` column, and ``categories`` is dropped.
        Businesses whose ``categories`` is null produce no rows, because
        ``explode`` emits nothing for a null array.
        The review file is returned unprocessed.
    """
    # Look up (or create) the session explicitly instead of relying on a global
    # ``spark`` that only exists inside a Databricks notebook.
    spark = SparkSession.builder.getOrCreate()

    # "header" is a CSV-only option and has no effect on JSON sources, so it is not set.
    df_biz = spark.read.json(biz)
    attr_cols = [
        F.col(f"attributes.{attr}").alias(attr)
        for attr in df_biz.select("attributes.*").columns
    ]

    df_biz = (
        df_biz.select(
            "address", "business_id", "categories", "city", "hours", "is_open",
            "latitude", "longitude", "name", "postal_code", "review_count",
            "stars", "state", *attr_cols,
        )
        # "A, B, C" -> ["A", "B", "C"] -> one row per category.
        .withColumn("category", F.explode(F.split(F.col("categories"), ", ")))
        .drop("categories")
    )

    df_review = spark.read.json(review)

    return df_biz, df_review

def create_current_raw(df_biz, df_review):
    """
    Joins review and business data and cleans the result.

    @param df_biz: DataFrame
        Business data as returned by read_data (one row per business and category).

    @param df_review: DataFrame
        Review data as returned by read_data.

    @return: DataFrame
        Columns business_id, review_count, review_id, text, date, state, city,
        name and category. Duplicate rows are removed, and any row containing a
        null in these columns is dropped.

    @description:
        Joins on business_id. An inner join is sufficient: a business with no
        matching review would only contribute rows whose review columns are
        null, and dropna() discards those. The inner join therefore yields the
        same rows without building and then throwing away the unmatched ones.
    """
    # Joining on the column name yields a single, unambiguous business_id column.
    view = df_review.join(df_biz, on="business_id", how="inner")

    return (
        view.select(
            "business_id",
            "review_count",
            "review_id",
            "text",
            "date",
            "state",
            "city",
            "name",
            "category",
        )
        .distinct()
        .dropna()
    )

def raw_to_bronze(view, row_limit=10000):
    """
    Transforms raw data to a bronze-level dataset with SCD metadata columns.

    @param view: DataFrame
        The raw, joined business/review data.

    @param row_limit: int or None
        Maximum number of rows to keep (default 10,000, for demo speed).
        Pass None to keep every row.

    @return: DataFrame
        The input plus effective_date (today, as a DATE_FORMAT string),
        expiration_date (EOW_DATE as a DATE_FORMAT string) and current_flag
        (True), optionally capped at row_limit rows.

    @description:
        limit() is applied without an ordering. Spark may therefore pick a
        different subset each time this DataFrame is evaluated.
    """
    df_current = (
        view
        .withColumn("effective_date", F.date_format(F.current_date(), DATE_FORMAT))
        .withColumn("expiration_date", F.date_format(F.lit(EOW_DATE), DATE_FORMAT))
        .withColumn("current_flag", F.lit(True))
    )

    if row_limit is not None:
        df_current = df_current.limit(row_limit)

    return df_current

def bronze_to_silver(
    current_path="dbfs:/FileStore/tables/yelp/current_data.json",
    archive_path="dbfs:/FileStore/tables/SCD/archive.json",
    columns_to_compare=("name",),
):
    """
    Transforms the bronze snapshot to silver by applying SCD type II changes to the archive.

    @param current_path: str
        Location of the current (bronze) JSON snapshot.

    @param archive_path: str
        Location of the historical (silver) JSON archive.

    @param columns_to_compare: iterable of str
        Tracked columns. A business counts as changed if any of these differs
        (null-safely) between the snapshot and its *active* archive version.

    @return: DataFrame
        The archive, in its original column order, with:
        - the active rows of changed businesses expired,
        - new versions of changed businesses appended,
        - rows for businesses never seen before appended.

    @description:
        - Change detection only looks at active (current_flag = true) archive
          rows, so previously expired versions do not re-trigger an update on
          every run.
        - Only active rows of changed businesses are expired (current_flag =
          false, expiration_date = yesterday). Already-expired rows keep their
          original expiration_date.
        - New versions are the snapshot rows of each changed business, taken
          once (semi-join), not once per matching archive row.
        - Dates are produced as DATE_FORMAT strings, like raw_to_bronze.
    """
    import functools
    import operator

    spark = SparkSession.builder.getOrCreate()
    df_current = spark.read.json(current_path)
    historical_data = spark.read.json(archive_path)
    archive_columns = historical_data.columns

    today = F.date_format(F.current_date(), DATE_FORMAT)
    yesterday = F.date_format(F.date_sub(F.current_date(), 1), DATE_FORMAT)

    # Null-safe "value differs" test for each tracked column, OR-ed together.
    # Seeding with lit(False) means an empty column list detects no changes.
    differs = functools.reduce(
        operator.or_,
        [~F.col(f"a.{c}").eqNullSafe(F.col(f"b.{c}")) for c in columns_to_compare],
        F.lit(False),
    )

    # Keys whose tracked values differ from their currently active version.
    active = historical_data.filter(F.col("current_flag"))
    changed_keys = (
        df_current.alias("a")
        .join(active.alias("b"), "business_id", "inner")
        .filter(differs)
        .select("business_id")
        .distinct()
    )

    # Brand-new businesses: in the snapshot but nowhere in the archive.
    inserts = df_current.join(historical_data, "business_id", "left_anti")
    # New versions of changed businesses: each snapshot row for the key, once.
    updates = df_current.join(changed_keys, "business_id", "left_semi")

    # Expire only the active rows of changed businesses. The decision goes into
    # its own column first, because current_flag itself is rewritten next.
    historical_data = (
        historical_data
        .join(changed_keys.withColumn("_changed", F.lit(True)), "business_id", "left")
        .withColumn(
            "_expire",
            F.col("current_flag") & F.coalesce(F.col("_changed"), F.lit(False)),
        )
        .withColumn(
            "current_flag",
            F.when(F.col("_expire"), F.lit(False)).otherwise(F.col("current_flag")),
        )
        .withColumn(
            "expiration_date",
            F.when(F.col("_expire"), yesterday).otherwise(F.col("expiration_date")),
        )
        .select(*archive_columns)
    )

    def _as_active_version(df):
        # Stamp rows as the active version starting today, in archive column order.
        return (
            df.withColumn("effective_date", today)
            .withColumn("expiration_date", F.lit(EOW_DATE))
            .withColumn("current_flag", F.lit(True))
            .select(*archive_columns)
        )

    return (
        historical_data
        .union(_as_active_version(updates))
        .union(_as_active_version(inserts))
    )

def _path_exists(spark, path):
    """
    Return True if Spark can open ``path`` as a JSON dataset.

    ``os.path.exists`` only inspects the driver's local filesystem, so it cannot
    see a dbfs:/ (or any other Hadoop-filesystem) location. Asking Spark to
    resolve the path works for any filesystem Spark is configured for.
    Schema inference reads the data once, which is acceptable at demo scale.
    """
    from pyspark.sql.utils import AnalysisException

    try:
        spark.read.json(path)
    except AnalysisException:
        # Raised for a missing path, and for a location with no readable JSON.
        return False
    return True


def main():
    """
    Run the pipeline end to end: raw -> bronze snapshot -> SCD II silver archive.

    Steps:
    - Write the bronze snapshot to SOURCE_PATH.
    - On the first run (no archive yet), seed DEST_PATH from that snapshot.
    - Merge the snapshot into the archive via bronze_to_silver().
    - Replace the archive with the merged result.
    """
    # Publish the session as the module-level ``spark`` name that the other
    # functions in this file look up. Inside a Databricks notebook,
    # getOrCreate() simply returns the notebook's existing session.
    global spark
    spark = (
        SparkSession.builder
        .master("local")
        .appName("test")
        .getOrCreate()
    )

    df_biz, df_review = read_data(raw_biz_path, raw_review_path)
    view = create_current_raw(df_biz, df_review)
    df_current = raw_to_bronze(view)

    # Write df_current to the source path.
    df_current.write.format("json").mode("overwrite").save(SOURCE_PATH)

    # Seed the archive from the snapshot as written, not by re-evaluating
    # df_current: raw_to_bronze applies an unordered limit(), so a second
    # evaluation could select different rows than the snapshot holds.
    if not _path_exists(spark, DEST_PATH):
        spark.read.json(SOURCE_PATH).write.format("json").mode("overwrite").save(DEST_PATH)

    df_final = bronze_to_silver()

    # df_final is computed lazily from the archive itself, so overwriting
    # DEST_PATH directly would delete the job's own input mid-write. Materialize
    # the result to a staging location first, then copy it over the archive.
    staging_path = DEST_PATH.rstrip("/") + "_staging"
    df_final.write.format("json").mode("overwrite").save(staging_path)
    spark.read.json(staging_path).write.format("json").mode("overwrite").save(DEST_PATH)


if __name__ == "__main__":
    main()

"""
+--------------------+-------------+------------+-------------------+--------------+---------------+--------------------+------------+--------------------+-----+--------------------+
|         business_id|         city|current_flag|               date|effective_date|expiration_date|                name|review_count|           review_id|state|                text|
+--------------------+-------------+------------+-------------------+--------------+---------------+--------------------+------------+--------------------+-----+--------------------+
|GILL0ZkvVXJaNQzYR...|  New Orleans|        true|2012-10-08 19:32:44|    2023-10-12|     9999-12-31|Paris Parker Salo...|         119|cejwyz1Hn1yQ-v2m9...|   LA|I absolutely love...|
|C7ZNfxgDQWinNzRfT...| Philadelphia|        true|2014-03-14 22:11:21|    2023-10-12|     9999-12-31|The Men's Club Ba...|          62|1sZkJCitG9-XSlZQJ...|   PA|Men's clubs is th...|
|bMVgakLiCEA6LWRUE...|         Reno|        true|2015-05-12 17:42:56|    2023-10-12|     9999-12-31|  Express Smog Check|          37|SFy1mkQaiYFrWqoeB...|   NV|I love coming her...|
|dIpTuL2T1L98GQGU1...|        Tampa|        true|2012-03-02 03:16:56|    2023-10-12|     9999-12-31|          The Bricks|         548|dFgGps6gQQbWoKOR6...|   FL|My friend and I v...|
|WJ17PIWEaqrNZ07un...|    Sahuarita|        true|2018-07-17 08:37:06|    2023-10-12|     9999-12-31|Green Valley Mort...|           5|NU4_03KfndoUcE8mD...|   AZ|The only reason I...|
|USekrAG0-4tJUs9V2...|      Dunedin|        true|2016-10-14 01:12:42|    2023-10-12|     9999-12-31|            The Honu|         234|lkdzFSsG-W1v7pP3n...|   FL|Stopped in tonigh...|
|IDtLPgUrqorrpqSLd...|Santa Barbara|        true|2018-07-17 00:20:41|    2023-10-12|     9999-12-31|Helena Avenue Bakery|         389|Q3fPo_x6xKxafAzy1...|   CA|Pricey ( a ham an...|
|MHqeqoJEjsTHivmuX...|  Saint Louis|        true|2013-12-29 01:22:34|    2023-10-12|     9999-12-31|       3500 Winehaus|          40|IrQyuZfiMKRkh3V-0...|   MO|Really great plac...|
|9uHEhLKTVXATCt7JS...| Philadelphia|        true|2010-07-07 16:58:43|    2023-10-12|     9999-12-31|    John's Water Ice|         274|biClS4h8ZNYEVrlc7...|   PA|Calories and diab...|
|tP8wd-9CrI_RkHK42...|       Tucson|        true|2013-01-27 23:05:05|    2023-10-12|     9999-12-31|        Panera Bread|         128|iFRIf-_U8cIW0rC5t...|   AZ|They call a chick...|
|OHzX-ZD9qyoeoxR8Z...| Philadelphia|        true|2014-07-28 00:27:44|    2023-10-12|     9999-12-31|     Khyber Pass Pub|         845|1V0bzX70ZuP7KJXIJ...|   PA|Had a wonderful l...|
|sr-5EY6bmp4jINhea...| Indianapolis|        true|2017-04-08 01:47:10|    2023-10-12|     9999-12-31|The Cake Bake Sho...|         996|8bGJJrY9zhMpKqIkm...|   IN|The experience is...|
|_RwlMTw9uFeOkfX9C...| Philadelphia|        true|2014-01-17 07:22:52|    2023-10-12|     9999-12-31|Dinardo's Famous ...|         256|rihdCbit6O5sGOV9I...|   PA|Thankfully I had ...|
|ADgeB1sfOGbzCR3LI...|  New Orleans|        true|2014-02-05 22:30:59|    2023-10-12|     9999-12-31|      Cafe Reconcile|         152|slE0DDjPefSStdL4D...|   LA|Went to lunch tod...|
|encfXG_jrG1M6gX1I...|        Tampa|        true|2016-11-10 14:28:28|    2023-10-12|     9999-12-31|      Ballyhoo Grill|         262|_nB4sSn0ytRkNEOol...|   FL|Pretty good house...|
|50MNOCMK3OxqR_VL3...| Philadelphia|        true|2013-10-24 15:44:32|    2023-10-12|     9999-12-31|        Sprint Store|          15|q3MLkJC3KI1IO-t6G...|   PA|I went to this sp...|
|mUIBtlWNPD7sz3rGG...| Philadelphia|        true|2008-12-05 14:16:05|    2023-10-12|     9999-12-31|             Kanella|         579|-nwOShwCyytQDHrq9...|   PA|Great for lunch! ...|
|vOgQnvKbE4nMopFTj...|    Nashville|        true|2011-02-13 19:46:20|    2023-10-12|     9999-12-31|Watermark Restaurant|         296|lonLaQxyruKIVuxQ6...|   TN|MY favorite place...|
|hEMNTKjcnhHxw9NRp...|   Norristown|        true|2014-12-17 18:20:29|    2023-10-12|     9999-12-31|       Beagle Tavern|          15|kwUvMHrp0ZZ1PqLS-...|   PA|My days as the du...|
|0pL-eq0ufX5jMDgVA...| Sicklerville|        true|2017-05-13 23:29:39|    2023-10-12|     9999-12-31|              Sakura|          73|Xe__9plcT4VI1oAfv...|   NJ|2nd time here on ...|
+--------------------+-------------+------------+-------------------+--------------+---------------+--------------------+------------+--------------------+-----+--------------------+
only showing top 20 rows
"""



"\n+--------------------+-------------+------------+-------------------+--------------+---------------+--------------------+------------+--------------------+-----+--------------------+\n|         business_id|         city|current_flag|               date|effective_date|expiration_date|                name|review_count|           review_id|state|                text|\n+--------------------+-------------+------------+-------------------+--------------+---------------+--------------------+------------+--------------------+-----+--------------------+\n|GILL0ZkvVXJaNQzYR...|  New Orleans|        true|2012-10-08 19:32:44|    2023-10-12|     9999-12-31|Paris Parker Salo...|         119|cejwyz1Hn1yQ-v2m9...|   LA|I absolutely love...|\n|C7ZNfxgDQWinNzRfT...| Philadelphia|        true|2014-03-14 22:11:21|    2023-10-12|     9999-12-31|The Men's Club Ba...|          62|1sZkJCitG9-XSlZQJ...|   PA|Men's clubs is th...|\n|bMVgakLiCEA6LWRUE...|         Reno|        true|2015-05-12 17:42:56|    2023