In [0]:


import json

# Widget that receives the list of categories to process as a JSON array string.
# The default value is used when the notebook is run without a parameter.
dbutils.widgets.text("categories", '["Books", "Electronics"]', "Categories to Process")

# Get the parameter value
categories_param = dbutils.widgets.get("categories")
print(f" Received parameter from ADF: {categories_param}")

# Parse the JSON string into a Python list, failing early with a clear message
# instead of letting a malformed parameter surface later as a confusing error.
try:
    categories = json.loads(categories_param)
except json.JSONDecodeError as err:
    raise ValueError(
        f"'categories' must be a JSON array of strings, got: {categories_param!r}"
    ) from err

# A bare JSON string (e.g. "Books") would otherwise be iterated character by
# character by the processing loop; treat it as a single-category list.
if isinstance(categories, str):
    categories = [categories]

if (
    not isinstance(categories, list)
    or not categories
    or not all(isinstance(name, str) and name.strip() for name in categories)
):
    raise ValueError(
        "'categories' must be a non-empty JSON array of non-empty strings, "
        f"got: {categories_param!r}"
    )

# Drop repeated names (keeping first-seen order) so the same category is not
# read and unioned twice.
categories = list(dict.fromkeys(categories))
print(f" Categories to process: {categories}")





from pyspark.sql import functions as F
from pyspark.sql.types import *

# Storage access for the data lake account.
# The two values below are placeholders and must be replaced with the real
# account name and access key before the notebook can read or write data.
storage_account_name = "your account name"
storage_account_key = "your storage account key"

# Host name of the account's DFS endpoint; shared by the credential setting
# and by both container URIs below.
_dfs_host = storage_account_name + ".dfs.core.windows.net"

# Register the account key on the Spark session for this endpoint.
spark.conf.set("fs.azure.account.key." + _dfs_host, storage_account_key)

# Root URIs of the "bronze" and "silver" containers.
bronze_base_path = "abfss://bronze@" + _dfs_host
silver_base_path = "abfss://silver@" + _dfs_host

print("Configuration complete!")


# Explicit schema applied when reading the bronze metadata JSON files.
# Every field is declared nullable.

# Element type of the "images" array: one struct of string fields per image.
_image_entry = (
    StructType()
    .add("hi_res", StringType(), True)
    .add("thumb", StringType(), True)
    .add("large", StringType(), True)
    .add("variant", StringType(), True)
)

metadata_schema = (
    StructType()
    .add("main_category", StringType(), True)
    .add("title", StringType(), True)
    .add("average_rating", FloatType(), True)
    .add("rating_number", IntegerType(), True)
    .add("features", ArrayType(StringType()), True)
    .add("description", ArrayType(StringType()), True)
    .add("price", FloatType(), True)
    .add("images", ArrayType(_image_entry), True)
    .add("videos", ArrayType(StringType()), True)
    .add("store", StringType(), True)
    .add("categories", ArrayType(StringType()), True)
    # Free-form key/value attributes; individual keys are looked up by name later.
    .add("details", MapType(StringType(), StringType()), True)
    .add("parent_asin", StringType(), True)
    .add("bought_together", ArrayType(StringType()), True)
)

print("Metadata schema defined!")


# Preview a sample of raw metadata before the real processing starts.
# The sample is taken from the first requested category instead of a fixed one,
# so the notebook does not fail on (or scan) a category that was not requested.
# "Electronics" is kept as the fallback when no usable category list is available.
if isinstance(categories, (list, tuple)) and categories:
    sample_category = categories[0]
else:
    sample_category = "Electronics"

sample_path = f"{bronze_base_path}/metadata/{sample_category}"
print(f" Reading sample from: {sample_path}")

# One JSON object per line (multiLine=False), parsed with the explicit schema.
df_meta_sample = spark.read.schema(metadata_schema).json(sample_path, multiLine=False)

print(" Metadata Schema:")
df_meta_sample.printSchema()

print("\n Sample Metadata Records:")
df_meta_sample.show(3, truncate=True)

# NOTE: count() reads the whole sample category; it is informational only.
print(f"\n Total Metadata Records: {df_meta_sample.count():,}")

def clean_metadata(category_name):
    """Read one category's raw metadata from bronze and return it cleaned.

    The raw JSON under ``{bronze_base_path}/metadata/{category_name}`` is read
    with ``metadata_schema``, rows without a ``parent_asin`` are dropped, and
    the remainder is de-duplicated on ``parent_asin`` (which of several
    duplicate rows survives is not specified). Derived columns are then added
    and a fixed set of columns is selected.

    Args:
        category_name: Name of the category folder to read; also written to
            the ``category`` column of every returned row.

    Returns:
        A DataFrame with the columns ``parent_asin``, ``main_category``,
        ``title``, ``average_rating``, ``rating_number``, ``features_text``,
        ``description_text``, ``price_cleaned``, ``store``, ``brand``,
        ``color``, ``size_info``, ``material``, ``product_details_json``,
        ``image_count``, ``video_count``, ``category`` and
        ``processing_timestamp``, in that order.

    Note:
        Two ``count()`` actions are executed for the progress printout, so the
        input is evaluated before the returned frame is ever used.
    """
    print(f"\n{'='*60}")
    print(f" Processing metadata for: {category_name}")
    print(f"{'='*60}")

    # 1. READ raw metadata (one JSON object per line)
    input_path = f"{bronze_base_path}/metadata/{category_name}"
    print(f" Reading from: {input_path}")

    df_raw = spark.read.schema(metadata_schema).json(input_path, multiLine=False)
    raw_count = df_raw.count()
    print(f" Raw metadata records: {raw_count:,}")

    # 2. CLEAN & TRANSFORM
    # Rows without a key are removed first so they never take part in the
    # de-duplication shuffle; the resulting set of keys is the same either way.
    df_cleaned = (
        df_raw
        .filter(F.col("parent_asin").isNotNull())
        .dropDuplicates(["parent_asin"])
        .withColumn("features_text", F.concat_ws(" | ", F.col("features")))
        .withColumn("description_text", F.concat_ws(" ", F.col("description")))
        # Missing and non-positive prices both become 0.0, so the two cases
        # cannot be told apart from this column alone.
        .withColumn(
            "price_cleaned",
            F.when(F.col("price") > 0, F.col("price")).otherwise(F.lit(0.0))
        )
        # size() of a NULL array is -1 or NULL depending on the Spark
        # configuration; greatest() skips NULLs, so both cases become 0.
        .withColumn("image_count", F.greatest(F.size(F.col("images")), F.lit(0)))
        .withColumn("video_count", F.greatest(F.size(F.col("videos")), F.lit(0)))
        .withColumn("category", F.lit(category_name))
        .withColumn("processing_timestamp", F.current_timestamp())
        # Keep the complete details map as a JSON string next to the
        # individually extracted attributes below.
        .withColumn("product_details_json", F.to_json(F.col("details")))
    )

    # 3. EXTRACT selected attributes from the details map (NULL when the key is absent)
    detail_columns = (
        ("brand", "Brand"),
        ("color", "Color"),
        ("size_info", "Size"),
        ("material", "Material"),
    )
    for column_name, detail_key in detail_columns:
        df_cleaned = df_cleaned.withColumn(
            column_name, F.col("details").getItem(detail_key)
        )

    cleaned_count = df_cleaned.count()
    print(f" Cleaned records: {cleaned_count:,}")
    print(f"Removed records: {raw_count - cleaned_count:,}")

    # 4. SELECT final columns
    df_final = df_cleaned.select(
        "parent_asin",
        "main_category",
        "title",
        "average_rating",
        "rating_number",
        "features_text",
        "description_text",
        "price_cleaned",
        "store",
        "brand",
        "color",
        "size_info",
        "material",
        "product_details_json",  # Full details as JSON
        "image_count",
        "video_count",
        "category",
        "processing_timestamp"
    )

    return df_final

# ----------------------------------------------------------------------------
# CELL 5: Process All Categories
# ----------------------------------------------------------------------------

# Fail fast with a clear message: with no categories the loop below would leave
# df_all_metadata as None and the first .count() would raise an AttributeError.
if not categories:
    raise ValueError("No categories to process: 'categories' is empty.")

df_all_metadata = None

# Clean each category and stack the results. A positional union is safe here
# because every frame comes from clean_metadata(), which selects the same
# columns in the same order.
for category in categories:
    df_category = clean_metadata(category)
    df_all_metadata = (
        df_category
        if df_all_metadata is None
        else df_all_metadata.union(df_category)
    )

print(f"\n{'='*60}")
print(f" TOTAL CLEANED METADATA: {df_all_metadata.count():,}")
print(f"{'='*60}")

print("\n Sample Cleaned Metadata:")
df_all_metadata.show(5, truncate=True)

print("\n Metadata by Category:")
df_all_metadata.groupBy("category").count().orderBy(F.desc("count")).show()

# ----------------------------------------------------------------------------
# CELL 6: Write to Silver Layer
# ----------------------------------------------------------------------------

output_path = f"{silver_base_path}/metadata_cleaned"

print(f"\n Writing to Silver layer: {output_path}")

# The set of categories is a run parameter, so a run may cover only some of the
# partitions. A plain overwrite would delete every existing partition under
# output_path; dynamic partition overwrite replaces only the "category"
# partitions present in this run's data and leaves the others in place.
df_all_metadata.write \
    .mode("overwrite") \
    .option("partitionOverwriteMode", "dynamic") \
    .partitionBy("category") \
    .parquet(output_path)

print(" Silver layer write complete!")

# CELL 7: Verify

# Read back what was written and print basic quality checks.
df_silver_verify = spark.read.parquet(output_path)

print("\nVerification:")
print("Check for cleaned price nulls:")

null_price_count = df_silver_verify.filter(F.col("price_cleaned").isNull()).count()
print(f"Null price records: {null_price_count:,}")

print(f" Total metadata records: {df_silver_verify.count():,}")

# price_cleaned uses 0.0 as the placeholder for missing/non-positive prices, so
# the plain average is pulled towards zero. This expression is NULL for those
# rows, and both count() and avg() ignore NULLs.
positive_price = F.when(F.col("price_cleaned") > 0, F.col("price_cleaned"))

print("\n Data Quality Metrics:")
df_silver_verify.select(
    F.count("*").alias("total_products"),
    F.avg("price_cleaned").alias("avg_price"),
    F.count(positive_price).alias("products_with_price"),
    F.avg(positive_price).alias("avg_price_priced_only"),
    F.avg("average_rating").alias("avg_rating"),
    F.sum(F.when(F.col("store").isNotNull(), 1).otherwise(0)).alias("products_with_store")
).show()

print("\n Notebook 2 Complete! Metadata is now in Silver layer.")