In [0]:
%sql
-- Select the catalog and schema that unqualified table names resolve against in this session.
USE CATALOG maven_catalog;
USE SCHEMA bronze_schema;

In [0]:
# 1. Load Bronze Layer Data
# Pin the session catalog, then read the raw customers Delta table
# through its fully qualified three-part name.
spark.sql("USE CATALOG maven_catalog")
bronze_df = (
    spark.read
    .format("delta")
    .table("maven_catalog.maven_market_landing.customers")
)

# Render the raw rows for a quick visual inspection.
display(bronze_df)

In [0]:

# 2. Data Cleaning (Silver Preparation)
# `col` is imported in this cell so that it runs on a fresh kernel
# (Restart & Run All) without relying on an import made further down.
from pyspark.sql.functions import col

# Built in one expression from `bronze_df`, so re-running the cell is idempotent.
clean_df = (
    bronze_df
    # Drop rows that have no customer identifier.
    .filter(col("customer_id").isNotNull())
    # Drop rows that repeat the same (customer_id, first_name) pair.
    # NOTE(review): a customer_id that appears with different first_name values
    # still yields several rows here; confirm whether the key should be
    # customer_id alone.
    .dropDuplicates(["customer_id", "first_name"])
)

In [0]:
# 3. Create User-Level Feature Table
# The functions module is imported under an alias so that Spark's `max` / `sum`
# do not shadow the Python builtins for the rest of the notebook.
from pyspark.sql import functions as F

# Aggregate the cleaned rows (`clean_df`), not the raw bronze rows, so that
# records with a null customer_id and the duplicates removed during cleaning
# do not leak into the Silver feature table.
feature_df = clean_df.groupBy("customer_id").agg(
    # Demographic & Financial Features
    # NOTE(review): avg() only yields a value if yearly_income is numeric;
    # confirm the column type in the bronze table.
    F.avg("yearly_income").alias("avg_yearly_income"),
    F.max("total_children").alias("total_children"),
    F.max("num_children_at_home").alias("children_at_home"),

    # Metadata and Categorical Features
    F.max("_fivetran_synced").alias("last_data_sync"),
    F.countDistinct("occupation").alias("unique_occupations"),

    # Validation flag: number of cleaned rows that share this customer_id
    F.count("*").alias("record_count"),
)

display(feature_df)

In [0]:
# 4. Feature Quality Validation
from pyspark.sql.functions import col, count, when

# Customers whose aggregated record_count shows more than one source row.
has_multiple_records = col("record_count") > 1
dup_check = feature_df.filter(has_multiple_records)
print(f"Duplicate Customer IDs Found: {dup_check.count()}")

# One output column per feature column, each holding the number of nulls in it:
# when() produces a non-null value only for null cells, and count() skips nulls.
null_counts = feature_df.select(
    [
        count(when(col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in feature_df.columns
    ]
)

display(null_counts)

In [0]:

# 5. Save as Silver Layer (Delta Table)
# Persist the feature table as a Delta table, replacing any existing contents.
silver_writer = feature_df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("silver.user_features")