## ETL RS silver to gold

### Import libraries

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, StructType, StructField, MapType, DoubleType
from delta.tables import DeltaTable
import ast
import json

### Configuration

In [0]:
# Fully qualified database names; the first part is backtick-quoted
# because it contains hyphens.
SILVER_DATABASE = "`bigdata-and-bi`.silver"
GOLD_DATABASE = "`bigdata-and-bi`.gold"

# Source (Silver) tables read by the streaming queries below
REVIEWS_TABLE = f"{SILVER_DATABASE}.reviews_clean"
ITEMS_TABLE = f"{SILVER_DATABASE}.items_clean"

# Target (Gold) tables written by the foreachBatch merge functions
GOLD_INTERACTIONS_TABLE = f"{GOLD_DATABASE}.star_interactions"
GOLD_ITEMS_TABLE = f"{GOLD_DATABASE}.star_items"

# Checkpoint locations are required for streaming queries.
# Each query gets its own directory so their progress is tracked separately.
INTERACTIONS_CHECKPOINT = "/Volumes/bigdata-and-bi/gold/star_checkpoints/interactions_silver_to_gold"
ITEMS_CHECKPOINT = "/Volumes/bigdata-and-bi/gold/star_checkpoints/items_silver_to_gold"

# NOTE(review): not referenced anywhere in the visible part of this notebook;
# presumably a k-core filtering threshold used elsewhere - confirm or remove.
K_CORE_VALUE = 3

# Ensure the gold database exists (no-op when it is already there)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {GOLD_DATABASE}")

DataFrame[]

### UDFs for Item Processing

In [0]:
# These UDFs parse complex string-formatted columns from the source data

@F.udf(StringType())
def parse_features_udf(features_str: str) -> str:
    """
    Convert the double-encoded 'features' string into a single text line.

    The input is expected to be the string form of a list whose first
    element is itself the string form of a list of features.
    E.g., ["['feat1', 'feat2']"] -> feat1. feat2

    Args:
        features_str: Raw 'features' value from the Silver table (may be None).

    Returns:
        The features joined with ". ", or "" when the value is missing,
        empty, not shaped as described above, or cannot be parsed.
    """
    if not features_str or features_str == "[]":
        return ""
    try:
        # First eval: outer string -> list holding the inner list-string
        outer_list = ast.literal_eval(features_str)
        if not outer_list:
            return ""

        # Second eval: inner list-string -> actual list of features
        inner_list = ast.literal_eval(outer_list[0])

        # A bare string would otherwise be joined character by character,
        # and a dict would be joined by its keys; only accept sequences.
        if not isinstance(inner_list, (list, tuple)):
            return ""

        # str() keeps one non-string element (e.g. a number) from blanking
        # the whole field; None entries carry no text and are skipped.
        return ". ".join(str(item) for item in inner_list if item is not None)
    except Exception:
        # Deliberate best-effort: one malformed row must not fail the batch
        return ""

@F.udf(StringType())
def parse_details_udf(details_str: str) -> str:
    """
    Pull a fixed set of fields out of the 'details' dict-string and
    render them as "Label: value" fragments joined with ". ".

    Returns "" when the input is empty, is not a dict literal, or
    cannot be parsed.
    """
    if not details_str:
        return ""

    # (key looked up in the parsed dict, label used in the output text);
    # the order here is the order of the fragments in the result.
    wanted_fields = (
        ("Language", "Language"),
        ("Dimensions", "Dimensions"),
        ("Paperback", "Pages"),
    )
    try:
        # 'details' arrives as the string representation of a dict
        parsed = ast.literal_eval(details_str)
        if not isinstance(parsed, dict):
            return ""

        return ". ".join(
            f"{label}: {parsed[key]}"
            for key, label in wanted_fields
            if key in parsed
        )
    except Exception:
        # Best-effort parsing: fall back to an empty string on any error
        return ""

### Stream 1: Process and Merge Items

In [0]:
# Log which Silver table feeds which Gold table for the items stream
print(f"Setting up Items stream: {ITEMS_TABLE} -> {GOLD_ITEMS_TABLE}")

def process_items_batch(micro_batch_df, batch_id):
    """
    foreachBatch handler for the items stream.

    Collapses the micro-batch to one row per 'parent_asin', derives the
    cleaned text columns and a 'prompt_text' summary, then upserts the
    result into the Gold items table (creating the table when the merge
    reports that it does not exist).

    Args:
        micro_batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; only used in log messages.
    """
    text_fields = ["title", "description", "features", "categories", "store", "details"]
    numeric_fields = ["average_rating", "price"]

    # --- Aggregate: first non-null value for text-like columns, mean for numbers ---
    aggregations = [F.first(name, ignorenulls=True).alias(name) for name in text_fields]
    aggregations += [F.avg(name).alias(name) for name in numeric_fields]
    per_item_df = micro_batch_df.groupBy("parent_asin").agg(*aggregations)

    # --- Clean: text columns fall back to "", numeric columns stay NULL when missing ---
    def text_or_empty(expr):
        return F.coalesce(expr, F.lit(""))

    cleaned_df = per_item_df.select(
        F.col("parent_asin").alias("item_id"),
        text_or_empty(F.col("title")).alias("title"),
        text_or_empty(F.array_join(F.col("description"), " ")).alias("description"),
        parse_features_udf(F.col("features")).alias("features"),
        text_or_empty(F.array_join(F.col("categories"), " > ")).alias("categories"),
        text_or_empty(F.col("store")).alias("store"),
        parse_details_udf(F.col("details")).alias("details"),
        F.col("average_rating").cast(DoubleType()).alias("average_rating"),
        F.col("price").cast(DoubleType()).alias("price"),
    )

    # --- Build prompt_text: one "<name>: <value>" line per populated column ---
    # A F.when(...) without otherwise() yields NULL when the condition fails,
    # and F.concat_ws skips NULL inputs, so empty/missing fields add no line.
    prompt_lines = [
        F.when(F.col(name) != "", F.concat(F.lit(f"{name}: "), F.col(name)))
        for name in text_fields
    ]
    prompt_lines += [
        F.when(
            F.col(name).isNotNull(),
            F.concat(F.lit(f"{name}: "), F.col(name).cast("string")),
        )
        for name in numeric_fields
    ]
    output_columns = ["item_id", *text_fields, *numeric_fields, "prompt_text"]
    final_items_df = (
        cleaned_df
        .withColumn("prompt_text", F.concat_ws("\n", *prompt_lines))
        .select(*output_columns)
    )

    # --- Upsert into Gold: update rows for known items, insert new ones ---
    try:
        target_table = DeltaTable.forName(spark, GOLD_ITEMS_TABLE)
        merge_builder = target_table.alias("gold").merge(
            final_items_df.alias("silver"),
            "gold.item_id = silver.item_id",
        )
        merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

        merged_rows = final_items_df.count()
        print(f"Items Batch {batch_id}: Merged {merged_rows} records into {GOLD_ITEMS_TABLE}.")
    except Exception as e:
        message = str(e)
        table_missing = (
            "DELTA_MISSING_DELTA_TABLE" in message
            or "DELTA_TABLE_NOT_FOUND" in message
        )
        if not table_missing:
            raise

        # First run: the Gold table does not exist yet, so create it from this batch
        print(f"Items Batch {batch_id}: Gold table not found, creating {GOLD_ITEMS_TABLE}...")
        final_items_df.write.format("delta").mode("overwrite").saveAsTable(GOLD_ITEMS_TABLE)

# Streaming source: the Silver items table
items_stream_df = spark.readStream.table(ITEMS_TABLE)

# Route every micro-batch through process_items_batch. The checkpoint records
# progress between runs, and trigger(availableNow=True) makes the query stop
# on its own instead of running continuously, so the notebook can be run as
# a scheduled job.
items_writer = items_stream_df.writeStream.foreachBatch(process_items_batch)
items_writer = items_writer.option("checkpointLocation", ITEMS_CHECKPOINT)
items_writer = items_writer.trigger(availableNow=True)
items_stream_query = items_writer.start()

print("Items streaming query started (will stop when micro-batch is complete).")

Setting up Items stream: `bigdata-and-bi`.silver.items_clean -> `bigdata-and-bi`.gold.star_items
Items streaming query started (will stop when micro-batch is complete).


### Stream 2: Process and Merge Interactions

In [0]:
# Log which Silver table feeds which Gold table for the interactions stream
print(f"Setting up Interactions stream: {REVIEWS_TABLE} -> {GOLD_INTERACTIONS_TABLE}")

def process_interactions_batch(micro_batch_df, batch_id):
    """
    foreachBatch handler for the interactions stream.

    Projects the review rows of the micro-batch to
    (item_id, user_id, unixReviewTime, rating), removes rows that share the
    same merge key, and upserts the result into the Gold interactions table
    (creating the table when the merge reports that it does not exist).

    Args:
        micro_batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; only used in log messages.

    Raises:
        Exception: Any merge/write error other than "Gold table not found"
            is re-raised so the streaming query fails visibly.
    """
    # Composite key that identifies one interaction in the Gold table
    merge_keys = ["user_id", "item_id", "unixReviewTime"]

    # Select and transform columns, then keep a single row per merge key.
    # Without this, several source rows with the same key either make the
    # Delta MERGE fail (multiple source rows matching one target row) or are
    # all inserted when the key is new / on the first-run overwrite below.
    # Which of the duplicate rows survives is not specified.
    interactions_df = micro_batch_df.select(
        F.col("parent_asin").alias("item_id"),
        F.col("user_id"),
        # Timestamp -> epoch seconds
        F.to_timestamp(F.col("reviewTimestamp")).cast("long").alias("unixReviewTime"),
        F.col("rating").cast(DoubleType())  # Keep original rating
    ).dropDuplicates(merge_keys)

    # --- Merge (Upsert) into the Gold table ---
    # Matching on the composite key (user, item, time) updates an existing
    # interaction instead of inserting it a second time.
    try:
        gold_table = DeltaTable.forName(spark, GOLD_INTERACTIONS_TABLE)

        (gold_table.alias("gold")
         .merge(interactions_df.alias("silver"),
                "gold.user_id = silver.user_id AND " +
                "gold.item_id = silver.item_id AND " +
                "gold.unixReviewTime = silver.unixReviewTime")
         .whenMatchedUpdateAll()  # e.g., if rating was corrected
         .whenNotMatchedInsertAll()  # New interaction
         .execute()
        )
        print(f"Interactions Batch {batch_id}: Merged {interactions_df.count()} records into {GOLD_INTERACTIONS_TABLE}.")
    except Exception as e:
        error_str = str(e)
        if "DELTA_MISSING_DELTA_TABLE" in error_str or "DELTA_TABLE_NOT_FOUND" in error_str:
            # First run: the Gold table does not exist yet, so create it from this batch
            print(f"Interactions Batch {batch_id}: Gold table not found, creating {GOLD_INTERACTIONS_TABLE}...")
            (interactions_df.write.format("delta")
             .mode("overwrite")
             .saveAsTable(GOLD_INTERACTIONS_TABLE))
        else:
            raise
# Streaming source: the Silver reviews table
reviews_stream_df = spark.readStream.table(REVIEWS_TABLE)

# Route every micro-batch through process_interactions_batch, tracking
# progress in the interactions checkpoint; availableNow makes the query
# stop on its own instead of running continuously.
reviews_writer = reviews_stream_df.writeStream.foreachBatch(process_interactions_batch)
reviews_writer = reviews_writer.option("checkpointLocation", INTERACTIONS_CHECKPOINT)
reviews_writer = reviews_writer.trigger(availableNow=True)
reviews_stream_query = reviews_writer.start()

print("Interactions streaming query started (will stop when micro-batch is complete).")

# Block until both queries have terminated (items first, then interactions)
for running_query in (items_stream_query, reviews_stream_query):
    running_query.awaitTermination()

print("Streaming ETL job complete for this run.")

Setting up Interactions stream: `bigdata-and-bi`.silver.reviews_clean -> `bigdata-and-bi`.gold.star_interactions
Interactions streaming query started (will stop when micro-batch is complete).


25/11/11 10:06:40 Spark Server has not sent updates for Streaming Query 0b302b47-669d-404d-97ba-1dbf08bc3aab in 60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is b26caff5-8464-44e4-85f6-7bf78ba48b6f. This is typically not a problem.
25/11/11 10:07:05 Spark Server has not sent updates for Streaming Query 0b302b47-669d-404d-97ba-1dbf08bc3aab in 60 seconds, but the query is still active. Marking query as in-progress. Spark Session ID is b26caff5-8464-44e4-85f6-7bf78ba48b6f. This is typically not a problem.


Streaming ETL job complete for this run.
