In [1]:
from pyspark.sql import *
from pyspark.sql.functions import* 
from pyspark.sql.types import *
from pyspark.sql.window import Window

StatementMeta(, b874a7a4-1287-4f7e-bf34-b8c90212c621, 3, Finished, Available, Finished)

In [2]:
retailers_df = spark.table("Silver.dbo.retailers")

StatementMeta(, b874a7a4-1287-4f7e-bf34-b8c90212c621, 4, Finished, Available, Finished)

In [3]:
repayments_df = spark.table("Silver.dbo.repayments")

StatementMeta(, b874a7a4-1287-4f7e-bf34-b8c90212c621, 5, Finished, Available, Finished)

In [7]:
transactions_df = spark.table("Silver.dbo.transactions")

StatementMeta(, 68c63d58-8907-48cb-a617-e553f28e3404, 9, Finished, Available, Finished)

In [3]:
# ============================================================================
# GOLD LAYER - ML FEATURE ENGINEERING (FINAL CORRECTED VERSION)
# Purpose: Create ML-ready features.
# FIX APPLIED: 'total_orders' now ignores 0-value transactions.
# ============================================================================

import mlflow
import mlflow.spark
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DoubleType

print("=" * 70)
print("GOLD LAYER - ML FEATURE ENGINEERING")
print("=" * 70)

# ============================================================================
# LOAD TABLES
# ============================================================================

print("\nLoading tables...")
# 1. Transactions (History)
denormalized = spark.table("Silver.dbo.silver_retailer_transactions")

# 2. Retailers Master List (Profiles)
# Essential to capture retailers who exist but have NO history
retailers_master = spark.table("Silver.dbo.retailers")

print(f"✓ Loaded Transactions: {denormalized.count():,} records")
print(f"✓ Loaded Retailers Master: {retailers_master.count():,} records")

# ============================================================================
# 1. TRANSACTION-BASED FEATURES (Aggregated by Retailer)
# ============================================================================

print("\n1. Engineering transaction features...")

transaction_features = denormalized.groupBy("retailer_id").agg(
    # --- THE LOGIC FIX ---
    # We count ONLY transactions that have a real value (> 0).
    # If a row has 0.00 amount, it is ignored here.
    F.count(F.when(F.col("order_amount") > 0, True)).alias("total_orders"),
    
    F.countDistinct("transaction_id").alias("unique_transactions"),
    
    # Amount features
    F.sum("order_amount").alias("total_order_value"),
    F.avg("order_amount").alias("avg_order_value"),
    F.max("order_amount").alias("max_order_value"),
    F.min("order_amount").alias("min_order_value"),
    F.stddev("order_amount").alias("order_value_std"),
    
    # Product features
    F.avg("num_products").alias("avg_products_per_order"),
    F.size(F.array_distinct(F.flatten(F.collect_list("product_categories_array")))).alias("unique_product_categories"),
    F.size(F.array_distinct(F.flatten(F.collect_list("manufacturers_array")))).alias("unique_manufacturers"),
    
    # Date features
    F.min("order_date").alias("first_order_date"),
    F.max("order_date").alias("last_order_date"),
    F.datediff(F.max("order_date"), F.min("order_date")).alias("days_between_first_last"),
)

transaction_features = transaction_features \
    .withColumn("orders_per_month", 
                F.col("total_orders") / (F.col("days_between_first_last") / 30.0))

print(f"✓ Calculated transaction features (Ghost transactions excluded from count)")

# ============================================================================
# 2. PAYMENT-BASED FEATURES (From denormalized table)
# ============================================================================

print("\n2. Engineering payment features...")

payment_features = denormalized.groupBy("retailer_id").agg(
    F.sum("amount_paid").alias("total_amount_paid"),
    F.sum("order_amount").alias("total_amount_due"),
    F.avg("days_late").alias("avg_days_late"),
    F.max("days_late").alias("max_days_late"),
    F.stddev("days_late").alias("days_late_std"),
    F.avg(F.when(F.col("is_on_time"), 1).otherwise(0)).alias("on_time_payment_rate"),
    F.avg(F.when(F.col("is_defaulted"), 1).otherwise(0)).alias("default_rate"),
    F.sum(F.when(F.col("is_defaulted"), 1).otherwise(0)).alias("total_defaults"),
)

payment_features = payment_features \
    .withColumn("payment_rate", F.col("total_amount_paid") / F.col("total_amount_due"))

print(f"✓ Calculated payment features")

# ============================================================================
# 3. GET STATIC RETAILER ATTRIBUTES (From Master List)
# ============================================================================

print("\n3. Extracting retailer attributes from Master Table...")

retailer_attributes = retailers_master \
    .select(
        "retailer_id",
        "owner_age",
        "owner_gender",
        "shop_type",
        "state",
        "urbanization_level",
        "years_in_business",
        "months_in_business",
        "num_employees",
        "has_business_registration",
        "mobile_money_pattern",
        "monthly_mobile_money_txns",
        "credit_segment",
        "credit_limit",
        "is_defaulter",
        "account_age_days"
    ).dropDuplicates(["retailer_id"])

print(f"✓ Extracted attributes for {retailer_attributes.count():,} retailers")

# ============================================================================
# 4. COMBINE ALL FEATURES
# ============================================================================

print("\n4. Combining features (Left Join)...")

# Keep everyone from retailer_attributes. 
# If they have 0 orders (or only ghost orders), the join will return NULLs for transaction columns.
ml_features = retailer_attributes \
    .join(transaction_features, "retailer_id", "left") \
    .join(payment_features, "retailer_id", "left")

# ============================================================================
# 5. CALCULATE DERIVED FEATURES
# ============================================================================

print("\n5. Creating derived features...")

ml_features = ml_features \
    .withColumn("credit_utilization", F.col("avg_order_value") / F.col("credit_limit")) \
    .withColumn("max_credit_utilization", F.col("max_order_value") / F.col("credit_limit")) \
    .withColumn("location_stability_months", F.col("months_in_business")) \
    .withColumn("formality_score", 
                (F.when(F.col("has_business_registration"), 0.5).otherwise(0) +
                 F.when(F.col("num_employees") >= 2, 0.3).otherwise(0) +
                 F.when(F.col("shop_type").isin(["Mini Mart", "Superette"]), 0.2).otherwise(0))) \
    .withColumn("mobile_money_score", 
                F.when(F.col("mobile_money_pattern") == "Heavy User", 1.0)
                 .when(F.col("mobile_money_pattern") == "Regular User", 0.7)
                 .when(F.col("mobile_money_pattern") == "Light User", 0.4)
                 .otherwise(0.1)) \
    .withColumn("digital_footprint_score", 
                (F.col("mobile_money_score") + F.col("formality_score")) / 2)

# ============================================================================
# 6. ENCODE CATEGORICAL VARIABLES
# ============================================================================

print("\n6. Encoding categorical variables...")

ml_features = ml_features \
    .withColumn("gender_encoded", F.when(F.col("owner_gender") == "Female", 1).otherwise(0)) \
    .withColumn("urbanization_encoded", 
                F.when(F.col("urbanization_level") == "Urban", 2)
                 .when(F.col("urbanization_level") == "Peri-Urban", 1)
                 .otherwise(0)) \
    .withColumn("shop_type_encoded", 
                F.when(F.col("shop_type") == "Superette", 4)
                 .when(F.col("shop_type") == "Mini Mart", 3)
                 .when(F.col("shop_type") == "Provision Store", 2)
                 .when(F.col("shop_type") == "Market Stall", 1)
                 .otherwise(0)) \
    .withColumn("has_business_registration_int", 
                F.when(F.col("has_business_registration"), 1).otherwise(0))

# ============================================================================
# 7. CREATE TARGET VARIABLE
# ============================================================================

print("\n7. Creating target variable...")

ml_features = ml_features \
    .withColumn("is_default", 
                # SIMPLIFIED LOGIC:
                # Because we fixed Step 1, 'total_orders' is now truly 0 for ghost users.
                # If it's NULL (from the join) or 0 (from the conditional count), they are NEW.
                F.when((F.col("total_orders") == 0) | (F.col("total_orders").isNull()), None)
                .when(F.col("is_defaulter") == True, 1)   # Known Bad
                .when(F.col("payment_rate") < 0.8, 1)     # Behavioral Bad
                .otherwise(0))                            # Known Good

# ============================================================================
# 8. HANDLE NULLS & SAVE
# ============================================================================

print("\n8. Final cleanup and save...")

ml_features = ml_features.fillna({
    "total_orders": 0,
    "avg_order_value": 0,
    "payment_rate": 0,
    "avg_days_late": 0,
    "on_time_payment_rate": 0,
    "default_rate": 1,
    "credit_utilization": 0,
    "order_value_std": 0
})

ml_features.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_ml_features")

print(f"\n✓ Saved {ml_features.count():,} records to gold_ml_features")

# VERIFICATION
print("Verifying Fix: Do 'Ghost Transaction' users now show 0 orders?")
# RTL_000114 was the user with 1 order but $0 value. They should now be 0.
ml_features.filter(F.col("total_orders") == 0).select(
    "retailer_id", "total_orders", "avg_order_value", "is_default"
).show(5)

print("\n✓ ML feature engineering complete!")

StatementMeta(, 1e5d853e-d8eb-430e-a8bf-1d319b13cefa, 5, Finished, Available, Finished)

GOLD LAYER - ML FEATURE ENGINEERING

Loading tables...
✓ Loaded Transactions: 75,347 records
✓ Loaded Retailers Master: 10,000 records

1. Engineering transaction features...
✓ Calculated transaction features (Ghost transactions excluded from count)

2. Engineering payment features...
✓ Calculated payment features

3. Extracting retailer attributes from Master Table...
✓ Extracted attributes for 10,000 retailers

4. Combining features (Left Join)...

5. Creating derived features...

6. Encoding categorical variables...

7. Creating target variable...

8. Final cleanup and save...

✓ Saved 10,000 records to gold_ml_features
Verifying Fix: Do 'Ghost Transaction' users now show 0 orders?
+-----------+------------+---------------+----------+
|retailer_id|total_orders|avg_order_value|is_default|
+-----------+------------+---------------+----------+
| RTL_000114|           0|            0.0|      NULL|
| RTL_000249|           0|            0.0|      NULL|
| RTL_000311|           0|          

In [4]:
# ============================================================================
# GOLD LAYER - ML FEATURE ENGINEERING
# Purpose: Create ML-ready features from denormalized silver table
# ============================================================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window

print("=" * 70)
print("GOLD LAYER - ML FEATURE ENGINEERING")
print("=" * 70)

# ============================================================================
# LOAD DENORMALIZED TABLE ⭐
# ============================================================================

print("\nLoading denormalized table...")
denormalized = spark.table("Silver.dbo.silver_retailer_transactions")
print(f"✓ Loaded {denormalized.count():,} records")

# ============================================================================
# 1. TRANSACTION-BASED FEATURES (Aggregated by Retailer)
# ============================================================================

print("\n1. Engineering transaction features from denormalized table...")

transaction_features = denormalized.groupBy("retailer_id").agg(
    # Count features
    F.count("*").alias("total_orders"),
    F.countDistinct("transaction_id").alias("unique_transactions"),
    
    # Amount features
    F.sum("order_amount").alias("total_order_value"),
    F.avg("order_amount").alias("avg_order_value"),
    F.max("order_amount").alias("max_order_value"),
    F.min("order_amount").alias("min_order_value"),
    F.stddev("order_amount").alias("order_value_std"),
    
    # Product features
    F.avg("num_products").alias("avg_products_per_order"),
    F.size(F.array_distinct(F.flatten(F.collect_list("product_categories_array")))).alias("unique_product_categories"),
    F.size(F.array_distinct(F.flatten(F.collect_list("manufacturers_array")))).alias("unique_manufacturers"),
    
    # Date features
    F.min("order_date").alias("first_order_date"),
    F.max("order_date").alias("last_order_date"),
    F.datediff(F.max("order_date"), F.min("order_date")).alias("days_between_first_last"),
)

transaction_features = transaction_features \
    .withColumn("orders_per_month", 
                F.col("total_orders") / (F.col("days_between_first_last") / 30.0))

print(f"✓ Calculated transaction features")

# ============================================================================
# 2. PAYMENT-BASED FEATURES (From denormalized table)
# ============================================================================

print("\n2. Engineering payment features from denormalized table...")

payment_features = denormalized.groupBy("retailer_id").agg(
    # Payment amounts
    F.sum("amount_paid").alias("total_amount_paid"),
    F.sum("order_amount").alias("total_amount_due"),
    
    # Timeliness
    F.avg("days_late").alias("avg_days_late"),
    F.max("days_late").alias("max_days_late"),
    F.stddev("days_late").alias("days_late_std"),
    
    # Rates
    F.avg(F.when(F.col("is_on_time"), 1).otherwise(0)).alias("on_time_payment_rate"),
    F.avg(F.when(F.col("is_defaulted"), 1).otherwise(0)).alias("default_rate"),
    F.sum(F.when(F.col("is_defaulted"), 1).otherwise(0)).alias("total_defaults"),
)

payment_features = payment_features \
    .withColumn("payment_rate", F.col("total_amount_paid") / F.col("total_amount_due"))

print(f"✓ Calculated payment features")

# ============================================================================
# 3. GET STATIC RETAILER ATTRIBUTES (From denormalized - take first row)
# ============================================================================

print("\n3. Extracting retailer attributes...")

# Since denormalized has multiple rows per retailer, get distinct retailer info
retailer_attributes = denormalized \
    .select(
        "retailer_id",
        "owner_age",
        "owner_gender",
        "shop_type",
        "state",
        "urbanization_level",
        "years_in_business",
        "months_in_business",
        "num_employees",
        "has_business_registration",
        "mobile_money_pattern",
        "monthly_mobile_money_txns",
        "credit_segment",
        "credit_limit",
        "is_defaulter",
        "account_age_days"
    ).dropDuplicates(["retailer_id"])

print(f"✓ Extracted attributes for {retailer_attributes.count():,} retailers")

# ============================================================================
# 4. COMBINE ALL FEATURES
# ============================================================================

print("\n4. Combining all features...")

ml_features = retailer_attributes \
    .join(transaction_features, "retailer_id", "left") \
    .join(payment_features, "retailer_id", "left")

# ============================================================================
# 5. CALCULATE DERIVED FEATURES (Same as before)
# ============================================================================

print("\n5. Creating derived features...")

ml_features = ml_features \
    .withColumn("credit_utilization", F.col("avg_order_value") / F.col("credit_limit")) \
    .withColumn("max_credit_utilization", F.col("max_order_value") / F.col("credit_limit")) \
    .withColumn("location_stability_months", F.col("months_in_business")) \
    .withColumn("formality_score", 
                (F.when(F.col("has_business_registration"), 0.5).otherwise(0) +
                 F.when(F.col("num_employees") >= 2, 0.3).otherwise(0) +
                 F.when(F.col("shop_type").isin(["Mini Mart", "Superette"]), 0.2).otherwise(0))) \
    .withColumn("mobile_money_score", 
                F.when(F.col("mobile_money_pattern") == "Heavy User", 1.0)
                 .when(F.col("mobile_money_pattern") == "Regular User", 0.7)
                 .when(F.col("mobile_money_pattern") == "Light User", 0.4)
                 .otherwise(0.1)) \
    .withColumn("digital_footprint_score", 
                (F.col("mobile_money_score") + F.col("formality_score")) / 2)

# ============================================================================
# 6. ENCODE CATEGORICAL VARIABLES
# ============================================================================

print("\n6. Encoding categorical variables...")

ml_features = ml_features \
    .withColumn("gender_encoded", F.when(F.col("owner_gender") == "Female", 1).otherwise(0)) \
    .withColumn("urbanization_encoded", 
                F.when(F.col("urbanization_level") == "Urban", 2)
                 .when(F.col("urbanization_level") == "Peri-Urban", 1)
                 .otherwise(0)) \
    .withColumn("shop_type_encoded", 
                F.when(F.col("shop_type") == "Superette", 4)
                 .when(F.col("shop_type") == "Mini Mart", 3)
                 .when(F.col("shop_type") == "Provision Store", 2)
                 .when(F.col("shop_type") == "Market Stall", 1)
                 .otherwise(0)) \
    .withColumn("has_business_registration_int", 
                F.when(F.col("has_business_registration"), 1).otherwise(0))

# ============================================================================
# 7. CREATE TARGET VARIABLE
# ============================================================================

print("\n7. Creating target variable...")

ml_features = ml_features \
    .withColumn("is_default", 
                F.when(F.col("total_orders") == 0, None)  # Label as Unknown (Safe!)
                .when(F.col("is_defaulter") == True, 1)   # Known Bad
                .when(F.col("payment_rate") < 0.8, 1)     # Behavioral Bad
                .otherwise(0))                            # Known Good

# ============================================================================
# 8. HANDLE NULLS & SAVE
# ============================================================================

print("\n8. Final cleanup and save...")

ml_features = ml_features.fillna({
    "total_orders": 0,
    "avg_order_value": 0,
    "payment_rate": 0,
    "avg_days_late": 0,
    "on_time_payment_rate": 0,
    "default_rate": 1,
    "credit_utilization": 0,
    "order_value_std": 0
})

ml_features.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_ml_features")

print(f"\n✓ Saved {ml_features.count():,} retailer feature records to gold_ml_features")

# Show sample
ml_features.select(
    "retailer_id", "total_orders", "avg_order_value", 
    "payment_rate", "credit_utilization", "is_default"
).show(5)

print("\n✓ ML feature engineering complete!")

StatementMeta(, b874a7a4-1287-4f7e-bf34-b8c90212c621, 6, Finished, Available, Finished)

GOLD LAYER - ML FEATURE ENGINEERING

Loading denormalized table...
✓ Loaded 75,347 records

1. Engineering transaction features from denormalized table...
✓ Calculated transaction features

2. Engineering payment features from denormalized table...
✓ Calculated payment features

3. Extracting retailer attributes...
✓ Extracted attributes for 10,000 retailers

4. Combining all features...

5. Creating derived features...

6. Encoding categorical variables...

7. Creating target variable...

8. Final cleanup and save...

✓ Saved 10,000 retailer feature records to gold_ml_features
+-----------+------------+---------------+------------+------------------+----------+
|retailer_id|total_orders|avg_order_value|payment_rate|credit_utilization|is_default|
+-----------+------------+---------------+------------+------------------+----------+
| RTL_000114|           1|            0.0|         0.0|               0.0|         0|
| RTL_000249|           1|            0.0|         0.0|               0

In [8]:
from pyspark.sql import functions as F

# 1. Get the total count of unique retailers in the SOURCE
source_count = retailers_df.select("retailer_id").distinct().count()

# 2. Get the total count of unique retailers in the RESULT (Denormalized)
result_count = denormalized.select("retailer_id").distinct().count()

print(f"Source Unique Retailers: {source_count:,}")
print(f"Result Unique Retailers: {result_count:,}")

# Logic Check
if source_count == result_count:
    print("✅ SUCCESS: 100% of retailers are present.")
else:
    diff = source_count - result_count
    print(f"❌ FAILURE: You lost {diff} retailers during the join.")

StatementMeta(, 68c63d58-8907-48cb-a617-e553f28e3404, 10, Finished, Available, Finished)

Source Unique Retailers: 10,000
Result Unique Retailers: 10,000
✅ SUCCESS: 100% of retailers are present.


In [5]:
# Count how many retailers have NO transactions (transaction_id is Null)
inactive_count = denormalized.filter(F.col("transaction_id").isNull()).count()

print(f"Inactive Retailers Preserved: {inactive_count:,}")

if inactive_count > 0:
    print("✅ SUCCESS: Inactive retailers (Zero Transactions) are included.")
    # Show a sample to prove it
    denormalized.filter(F.col("transaction_id").isNull()) \
        .select("retailer_id", "business_name", "order_amount").show(5)
else:
    print("⚠️ WARNING: No inactive retailers found. Did you use an INNER join?")

StatementMeta(, b874a7a4-1287-4f7e-bf34-b8c90212c621, 7, Finished, Available, Finished)

Inactive Retailers Preserved: 5,333
✅ SUCCESS: Inactive retailers (Zero Transactions) are included.
+-----------+--------------------+------------+
|retailer_id|       business_name|order_amount|
+-----------+--------------------+------------+
| RTL_000002|Stephen's Provisions|           0|
| RTL_000003|        Peace's Shop|           0|
| RTL_000004|  Peter's Provisions|           0|
| RTL_000009|        Grace's Shop|           0|
| RTL_000010|      Andrew's Store|           0|
+-----------+--------------------+------------+
only showing top 5 rows



In [6]:
# Find any ID in 'retailers' that is MISSING from 'denormalized'
missing_data = retailers_df.join(denormalized, "retailer_id", "left_anti")

missing_count = missing_data.count()

if missing_count == 0:
    print("✅ PERFECT: No data lost. The Anti-Join is empty.")
else:
    print(f"❌ CRITICAL: {missing_count} retailers are missing from the final table.")
    print("Here are the IDs that were lost:")
    missing_data.select("retailer_id", "business_name").show(5)

StatementMeta(, b874a7a4-1287-4f7e-bf34-b8c90212c621, 8, Finished, Available, Finished)

✅ PERFECT: No data lost. The Anti-Join is empty.
