In [0]:
# Read the Bronze transactions table into a DataFrame.
bronze_table_name = "fraud_catalog.fraud_schema.bronze_transactions"
bronze_df = spark.table(bronze_table_name)

# Show the first five rows as a quick sanity check.
print("Bronze Table Preview:")
bronze_df.show(n=5)

# Show the column names and their types.
print("Bronze Table Schema:")
bronze_df.printSchema()


Bronze Table Preview:
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
| 308|CASH_OUT|210542.52| C971437370|     58178.91|           0.0|C1933766809|    6402405.93|    6612948.45|      0|             0|
| 308|CASH_OUT|205206.17| C874951884|          0.0|           0.0|C1198885686|     250033.63|      455239.8|      0|             0|
| 308|CASH_OUT|371017.64| C264006982|          0.0|           0.0|C1100020359|     889460.47|    1260478.11|      0|             0|
| 308|CASH_OUT| 37757.55| C898772610|          0.0|           0.0| C359417495|     147008.07|     184765.62|      0|             0|
| 308|CASH_OUT|310953.62|C1003704879|          0.0|   

In [0]:
from pyspark.sql.functions import col, trim

# Build the Silver DataFrame from Bronze.
#
# The order of the steps matters:
#   1. Cast first, so the null check and the `amount > 0` comparison below
#      run on the final column types rather than on whatever Bronze holds.
#      NOTE(review): with ANSI mode off, a value that fails the cast becomes
#      null and is then removed by the null drop for `amount` — confirm the
#      cluster's ANSI setting.
#   2. Trim the account identifiers before deduplicating, so rows that differ
#      only by surrounding whitespace collapse into a single row.
#   3. Drop rows missing a required field, then keep only positive amounts.
#   4. Deduplicate last, on the fully normalized rows (this also means the
#      deduplication works on fewer rows).
double_columns = [
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
]
int_columns = ["isFraud", "isFlaggedFraud"]

typed_df = bronze_df
for column_name in double_columns:
    typed_df = typed_df.withColumn(column_name, col(column_name).cast("double"))
for column_name in int_columns:
    typed_df = typed_df.withColumn(column_name, col(column_name).cast("int"))

silver_df = (
    typed_df
    .withColumn("nameOrig", trim(col("nameOrig")))
    .withColumn("nameDest", trim(col("nameDest")))
    .na.drop(subset=["amount", "nameOrig", "nameDest"])
    .filter(col("amount") > 0)
    .dropDuplicates()
)


In [0]:
# Persist the Silver DataFrame as a Delta table; "overwrite" mode replaces
# whatever the target table currently contains.
silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("fraud_catalog.fraud_schema.silver_transactions")

print("Silver table created successfully!")


Silver table created successfully!


In [0]:
# Sanity-check the saved Silver table, starting with its total row count.
silver_count = spark.sql("SELECT COUNT(*) AS total_rows FROM fraud_catalog.fraud_schema.silver_transactions")
silver_count.show()

# Then display a ten-row sample of the table.
silver_preview = spark.sql("SELECT * FROM fraud_catalog.fraud_schema.silver_transactions LIMIT 10")
silver_preview.show()

+----------+
|total_rows|
+----------+
|   6362604|
+----------+

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|  42| PAYMENT| 14347.57| C169454224|      32702.0|      18354.43| M826816809|           0.0|           0.0|      0|             0|
|  42|CASH_OUT|188336.31| C517661735|          0.0|           0.0| C413194235|     696059.94|     996768.24|      0|             0|
|  42| PAYMENT|  8125.56|C2073018087|       4888.0|           0.0| M475964317|           0.0|           0.0|      0|             0|
|  42|TRANSFER|873638.32| C994720712|      21811.0|           0.0| C726721232|     600098.27|    1473736.59|      0|             0|
|  42|CASH