In [0]:
# Load the raw cafe sales records from the Bronze Delta location
bronze_path = "/mnt/bronze/cafe_sales"
bronze_reader = spark.read.format("delta")
df_bronze = bronze_reader.load(bronze_path)

# Report how many raw rows we start with, then preview the first few
initial_count = df_bronze.count()
print(f"Initial row count (Bronze - raw): {initial_count}")
bronze_preview = df_bronze.limit(10)
display(bronze_preview)

Initial row count (Bronze - raw): 10000


transaction_id,item,quantity,price_per_unit,total_spent,payment_method,location,transaction_date
txn_1961373,Coffee,2,2,4,Credit Card,Takeaway,08.09.2023
TXn_4977031,Cake,4,3,12,Cash,In-store,16.05.2023
TXN4271903,Cookie,4,1,ERROR,Credit Card,In-store,19.07.2023
TXN_7034554,Salad,2,5,10,UNKNOWN,UNKNOWN,27.04.2023
TXN3160411,Coffee,2,2,4,Digital Wallet,In-store,11.06.2023
TXN_260 2893,Smoothie,5,4,20,Credit Card,,31.03.2023
TXN_44,UNKNOWN,3,3,9,ERROR,Takeaway,06.10.2023
TXN_6699534,Sandwich,4,4,16,Cash,UNKNOWN,28.10.2023
TXN_4717867,,5,3,15,,Takeaway,28.07.2023
TXN_2064365,Sandwich,5,4,20,,In-store,31.12.2023


In [0]:
from pyspark.sql.functions import col, upper, trim

# Text columns in which every row must carry a usable value
text_columns = ["TRANSACTION_ID", "ITEM", "PAYMENT_METHOD", "LOCATION", "TRANSACTION_DATE"]

# Build a single row-level predicate: a row survives only when every listed
# column is non-NULL and, after trimming and upper-casing, is not one of the
# placeholder tokens ERROR / UNKNOWN / NULL / empty string.
keep_row = None
for column in text_columns:
    value = col(column)
    column_is_valid = value.isNotNull() & ~upper(trim(value)).isin(['ERROR', 'UNKNOWN', 'NULL', ''])
    keep_row = column_is_valid if keep_row is None else keep_row & column_is_valid

df_cleaned = df_bronze if keep_row is None else df_bronze.filter(keep_row)

# Report how many rows survived the text filter and preview them
after_text_filter = df_cleaned.count()
rows_dropped_by_text_filter = initial_count - after_text_filter
print(f"Rows after filtering text columns: {after_text_filter}")
print(f"Rows removed: {rows_dropped_by_text_filter}")
display(df_cleaned.limit(10))

Rows after filtering text columns: 3580
Rows removed: 6420


transaction_id,item,quantity,price_per_unit,total_spent,payment_method,location,transaction_date
txn_1961373,Coffee,2,2,4,Credit Card,Takeaway,08.09.2023
TXn_4977031,Cake,4,3,12,Cash,In-store,16.05.2023
TXN4271903,Cookie,4,1,ERROR,Credit Card,In-store,19.07.2023
TXN3160411,Coffee,2,2,4,Digital Wallet,In-store,11.06.2023
TXN_2548360,Salad,5,5,25,Cash,Takeaway,07.11.2023
TXN_7619095,Sandwich,2,4,8,Cash,In-store,03.05.2023
TXN_284 7255,Salad,3,5,15,Credit Card,In-store,15.11.2023
TXN_69710,Juice,2,3,6,Cash,In-store,24.02.2023
TXN_3709394,Juice,4,3,12,Cash,Takeaway,15.01.2023
TXN_3522028,Smoothie,ERROR,4,20,Cash,In-store,04.04.2023


In [0]:
from pyspark.sql.functions import when, col, upper, trim, regexp_replace
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

numeric_columns = ["quantity", "price_per_unit", "total_spent"]

# Placeholder tokens that mark a numeric cell as unusable
# (compared after trim + upper-case).
bad_numeric_tokens = ['ERROR', 'UNKNOWN', 'NULL', '']

# Step 1: convert the string columns to double. NULLs and placeholder tokens
# become NULL explicitly; everything else goes through the cast.
for col_name in numeric_columns:
    raw_value = col(col_name)
    is_placeholder = raw_value.isNull() | upper(trim(raw_value)).isin(bad_numeric_tokens)
    df_cleaned = df_cleaned.withColumn(
        col_name,
        when(is_placeholder, None).otherwise(raw_value.cast(DoubleType()))
    )

# Step 2: impute a missing value from the other two columns, using
# total_spent = quantity * price_per_unit.
# Each column is still untouched when its own step runs, so "is NULL right now"
# is exactly "was bad after conversion" and no helper flag columns are needed.
quantity = col("quantity")
price_per_unit = col("price_per_unit")
total_spent = col("total_spent")

# quantity = total_spent / price_per_unit (skipped when the price is 0)
df_cleaned = df_cleaned.withColumn(
    "quantity",
    when(
        quantity.isNull() &
        total_spent.isNotNull() &
        price_per_unit.isNotNull() &
        (price_per_unit != 0),
        total_spent / price_per_unit
    ).otherwise(quantity)
)

# price_per_unit = total_spent / quantity (skipped when the quantity is 0)
df_cleaned = df_cleaned.withColumn(
    "price_per_unit",
    when(
        price_per_unit.isNull() &
        total_spent.isNotNull() &
        quantity.isNotNull() &
        (quantity != 0),
        total_spent / quantity
    ).otherwise(price_per_unit)
)

# total_spent = quantity * price_per_unit
df_cleaned = df_cleaned.withColumn(
    "total_spent",
    when(
        total_spent.isNull() &
        quantity.isNotNull() &
        price_per_unit.isNotNull(),
        quantity * price_per_unit
    ).otherwise(total_spent)
)

# Step 3: count the numeric columns that are still NULL after imputation and
# drop rows where two or more remain. A row with exactly one remaining NULL
# (imputation blocked by a zero divisor) is deliberately kept.
remaining_nulls = None
for col_name in numeric_columns:
    still_null = when(col(col_name).isNull(), 1).otherwise(0)
    remaining_nulls = still_null if remaining_nulls is None else remaining_nulls + still_null

df_cleaned = df_cleaned.filter(remaining_nulls < 2)

# Step 4: convert numeric columns back to string type
for col_name in numeric_columns:
    df_cleaned = df_cleaned.withColumn(
        col_name,
        col(col_name).cast(StringType())
    )

# Display row count after numeric filtering and imputation
after_numeric_filter = df_cleaned.count()
print(f"Rows after numeric filtering and imputation: {after_numeric_filter}")
print(f"Rows removed: {after_text_filter - after_numeric_filter}")
display(df_cleaned.limit(10))


Rows after numeric filtering and imputation: 3556
Rows removed: 24


transaction_id,item,quantity,price_per_unit,total_spent,payment_method,location,transaction_date
txn_1961373,Coffee,2.0,2.0,4.0,Credit Card,Takeaway,08.09.2023
TXn_4977031,Cake,4.0,3.0,12.0,Cash,In-store,16.05.2023
TXN4271903,Cookie,4.0,1.0,4.0,Credit Card,In-store,19.07.2023
TXN3160411,Coffee,2.0,2.0,4.0,Digital Wallet,In-store,11.06.2023
TXN_2548360,Salad,5.0,5.0,25.0,Cash,Takeaway,07.11.2023
TXN_7619095,Sandwich,2.0,4.0,8.0,Cash,In-store,03.05.2023
TXN_284 7255,Salad,3.0,5.0,15.0,Credit Card,In-store,15.11.2023
TXN_69710,Juice,2.0,3.0,6.0,Cash,In-store,24.02.2023
TXN_3709394,Juice,4.0,3.0,12.0,Cash,Takeaway,15.01.2023
TXN_3522028,Smoothie,5.0,4.0,20.0,Cash,In-store,04.04.2023


In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Rank the rows inside each TRANSACTION_ID group.
# Ordering a window by its own partition key makes every row in the group tie,
# so row_number() would pick the surviving row arbitrarily and the result could
# differ between runs. Ordering by the remaining columns makes the choice
# deterministic: rows that tie on all of them are identical anyway.
# NOTE(review): IDs are compared as-is; formatting variants such as
# 'txn_...', 'TXN...' (no underscore) or IDs containing a space are treated as
# distinct transactions — confirm whether they should be normalised first.
tie_break_columns = [c for c in df_cleaned.columns if c.upper() != "TRANSACTION_ID"]
window = Window.partitionBy("TRANSACTION_ID").orderBy(
    *(tie_break_columns or ["TRANSACTION_ID"])
)

# Add row number for each TRANSACTION_ID group
df_with_row_num = df_cleaned.withColumn("row_num", row_number().over(window))

# Keep only the first row of each group (row_num = 1)
df_cleaned = df_with_row_num.filter(col("row_num") == 1).drop("row_num")

# Display final row count
final_count = df_cleaned.count()
print(f"Rows after removing duplicates: {final_count}")
print(f"Total rows removed: {initial_count - final_count}")
display(df_cleaned.limit(10))


Rows after removing duplicates: 3556
Total rows removed: 6444


transaction_id,item,quantity,price_per_unit,total_spent,payment_method,location,transaction_date
TXN_2548360,Salad,5.0,5.0,25.0,Cash,Takeaway,07.11.2023
TXN3160411,Coffee,2.0,2.0,4.0,Digital Wallet,In-store,11.06.2023
TXN4271903,Cookie,4.0,1.0,4.0,Credit Card,In-store,19.07.2023
TXN_1000555,Tea,1.0,1.5,1.5,Credit Card,In-store,19.10.2023
TXN_1002457,Cookie,5.0,1.0,5.0,Digital Wallet,Takeaway,29.09.2023
TXN_1004184,Smoothie,1.0,4.0,4.0,Credit Card,In-store,18.05.2023
TXN_1004563,Tea,5.0,1.5,7.5,Credit Card,In-store,28.10.2023
TXN_1005331,Coffee,1.0,2.0,2.0,Digital Wallet,Takeaway,04.11.2023
TXN_1005377,Cake,5.0,3.0,15.0,Digital Wallet,Takeaway,03.06.2023
TXN_1006942,Salad,1.0,5.0,5.0,Credit Card,In-store,30.11.2023


In [0]:
# Target location of the Bronze2 Delta table
bronze2_path = "/mnt/bronze2/cafe_sales"

# Persist the cleaned frame, replacing both the existing data and its schema
bronze2_writer = (
    df_cleaned.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
bronze2_writer.save(bronze2_path)

print("✅ Bronze2 table created/updated with cleaned data")
print(f"Final row count: {final_count}")


✅ Bronze2 table created/updated with cleaned data
Final row count: 3556


In [0]:
# Make the Bronze2 Delta location queryable by name; the statement is a no-op
# when a table called cafe_sales_bronze2 already exists.
register_bronze2_sql = """
    CREATE TABLE IF NOT EXISTS cafe_sales_bronze2
    USING DELTA
    LOCATION '/mnt/bronze2/cafe_sales'
"""
spark.sql(register_bronze2_sql)

print("✅ Table 'cafe_sales_bronze2' registered in catalog")


✅ Table 'cafe_sales_bronze2' registered in catalog


In [0]:
# Invalidate any cached metadata for the registered table
spark.catalog.refreshTable("cafe_sales_bronze2")
print("✅ Catalog cache refreshed for Bronze2")

# Re-read the Delta files straight from the Bronze2 path and report what landed
df_verify = spark.read.format("delta").load(bronze2_path)
verified_row_count = df_verify.count()

print(f"\nVerification - row count in Bronze2: {verified_row_count}")
display(df_verify.limit(20))


✅ Catalog cache refreshed for Bronze2

Verification - row count in Bronze2: 3556


transaction_id,item,quantity,price_per_unit,total_spent,payment_method,location,transaction_date
TXN_2548360,Salad,5.0,5.0,25.0,Cash,Takeaway,07.11.2023
TXN3160411,Coffee,2.0,2.0,4.0,Digital Wallet,In-store,11.06.2023
TXN4271903,Cookie,4.0,1.0,4.0,Credit Card,In-store,19.07.2023
TXN_1000555,Tea,1.0,1.5,1.5,Credit Card,In-store,19.10.2023
TXN_1002457,Cookie,5.0,1.0,5.0,Digital Wallet,Takeaway,29.09.2023
TXN_1004184,Smoothie,1.0,4.0,4.0,Credit Card,In-store,18.05.2023
TXN_1004563,Tea,5.0,1.5,7.5,Credit Card,In-store,28.10.2023
TXN_1005331,Coffee,1.0,2.0,2.0,Digital Wallet,Takeaway,04.11.2023
TXN_1005377,Cake,5.0,3.0,15.0,Digital Wallet,Takeaway,03.06.2023
TXN_1006942,Salad,1.0,5.0,5.0,Credit Card,In-store,30.11.2023
