In [0]:
# Catalog and schema names used to build fully-qualified table names
# throughout this notebook.
catalog_name = "big_data_project"
bronze_schema_name = "bronze"
silver_schema_name = "silver"

# Read both Bronze tables through their three-part names
# (<catalog>.<schema>.<table>); products first, then categories.
bronze_prefix = f"{catalog_name}.{bronze_schema_name}"
products_df, categories_df = (
    spark.table(f"{bronze_prefix}.{bronze_table}")
    for bronze_table in ("amazon_products", "amazon_categories")
)

print("Successfully loaded Bronze tables.")
# Show a few product rows as a quick sanity check on the load.
display(products_df.limit(5))

In [0]:
from pyspark.sql.functions import col, lower, trim, regexp_replace, concat_ws

# 1. Handle missing values: replace nulls with 0 in the listed columns.
#    NOTE(review): na.fill with a numeric value only touches numeric-typed
#    columns; if Bronze stores any of these as strings they are left as-is —
#    confirm the Bronze schema.
products_cleaned_df = products_df.na.fill(0, ["price", "listPrice", "reviews", "boughtInLastMonth"])

# 2. Join products to categories.
#    The key has different names on each side ('category_id' on products,
#    'id' on categories), so an explicit condition is required.
#    A left join keeps every product, even one with no matching category.
joined_df = products_cleaned_df.join(
    categories_df,
    on=products_cleaned_df["category_id"] == categories_df["id"],
    how="left",
)

# 3. Feature engineering: build one lower-cased text field from the product
#    title and its category name, then strip punctuation.
#    - concat_ws skips null inputs, so a product without a category (possible
#      after the left join) still gets its title as searchable_text.
#    - The pattern keeps Unicode letters (\p{L}), combining marks (\p{M}),
#      digits/numbers (\p{N}) and spaces. An ASCII-only class such as
#      [^a-zA-Z0-9 ] would also delete accented and non-Latin characters
#      (e.g. "lancôme" -> "lancme"), which is not "punctuation". For pure
#      ASCII input the result is unchanged.
searchable_text_pattern = r"[^\p{L}\p{M}\p{N} ]"
cleaned_silver_df = joined_df.withColumn(
    "searchable_text",
    regexp_replace(
        lower(concat_ws(" ", col("title"), col("category_name"))),
        searchable_text_pattern,
        "",
    ),
)

# 4. Keep only the columns needed for the final Silver table.
#    Because the join used an explicit condition (not on="col_name"), both
#    'category_id' (products) and 'id' (categories) are present in joined_df;
#    we keep 'category_id' and drop 'id' by not selecting it.
final_silver_df = cleaned_silver_df.select(
    "asin",
    "title",
    "searchable_text",
    "imgUrl",
    "productURL",
    "stars",
    "reviews",
    "price",
    "category_id",
    "category_name",
    "boughtInLastMonth",
)

print("Data cleaning and transformation complete.")
# Preview a few rows of the Silver-shaped output.
display(final_silver_df.limit(10))

In [0]:
# Fully-qualified name (<catalog>.<schema>.<table>) of the Silver table.
silver_table_name = f"{catalog_name}.{silver_schema_name}.products_cleaned"

# Write the cleaned and joined data to the Silver layer as a Delta table.
# - mode("overwrite") replaces the table contents so the notebook can be re-run.
# - overwriteSchema=true also replaces the table schema. Without it, a re-run
#   after the selected columns (or their types) change fails with a Delta
#   schema-mismatch error, because a plain overwrite still enforces the
#   existing schema.
(
    final_silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table_name)
)

print(f"Successfully created Silver table: {silver_table_name}")