In [1]:
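# Silver layer notebook: reads the Bronze sales, product and region tables,
# stores rejected records, cleans the data and writes Silver Delta tables
# together with lakehouse metadata, a transformation log and a data quality log.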

from datetime import datetime
from pyspark.sql import Row

# Single metadata record describing the Silver Lakehouse
metadata_silver = [
    Row(
        lakehouse_name="silver_pro",  # Update if needed
        layer="Silver",
        created_on=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        created_by="your.email@company.com",
    )
]

# Turn the record into a one-row Spark DataFrame
df_meta_silver = spark.createDataFrame(metadata_silver)

# Persist the metadata as a Delta table inside the Silver Lakehouse,
# replacing whatever was stored there before
df_meta_silver.write.save(
    "abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/lakehouse_metadata",
    format="delta",
    mode="overwrite",
)

# Print the stored row as a visual confirmation
df_meta_silver.show()

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 3, Finished, Available, Finished)

+--------------+------+-------------------+--------------------+
|lakehouse_name| layer|         created_on|          created_by|
+--------------+------+-------------------+--------------------+
|    silver_pro|Silver|2025-04-05 10:48:37|your.email@compan...|
+--------------+------+-------------------+--------------------+



In [2]:
# Load the three Bronze Delta tables that feed the Silver layer.

# Sales
df_sal = spark.read.load(
    "abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/bronze_pro.Lakehouse/Tables/bronze_sal",
    format="delta",
)

# Product
df_prod = spark.read.load(
    "abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/bronze_pro.Lakehouse/Tables/bronze_prod",
    format="delta",
)

# Region
df_reg = spark.read.load(
    "abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/bronze_pro.Lakehouse/Tables/bronze_reg",
    format="delta",
)


StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 4, Finished, Available, Finished)

In [3]:
# "For debugging" here means capturing and storing problematic or rejected records
# so that you (or others) can analyze them later in order to:
#   - understand why they failed
#   - fix data issues at the source or during transformations
#   - ensure data transparency and traceability

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 5, Finished, Available, Finished)

In [5]:
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import Row
from datetime import datetime

# -----------------------------------
# Step 2: Filter Rejected Records
# -----------------------------------

# Rejected Sales: missing, non-numeric or negative SalesAmount.
# The amount is cast to double explicitly before it is tested, so the check
# does not depend on an implicit comparison cast of the raw column. If the
# Bronze column is string-typed (schema not visible here — TODO confirm),
# values that cannot be parsed as a number become NULL and are captured as
# rejected instead of silently matching neither the rejected nor the clean rule.
# The stored rows keep the original, un-cast SalesAmount for analysis.
sales_amount = col("SalesAmount").cast("double")
df_sales_rejected = df_sal.filter(sales_amount.isNull() | (sales_amount < 0))

# Rejected Product: Missing ProductKey or Standard_Cost
# NOTE(review): rows with a NULL Standard_Cost are stored here, but the
# cleaning step only removes NULL ProductKey rows — confirm that products
# without a cost are meant to remain in the clean table.
df_product_rejected = df_prod.filter(
    col("ProductKey").isNull() | col("Standard_Cost").isNull()
)

# Rejected Region: Null SalesTerritoryKey or Country
# NOTE(review): rows with a NULL Country are stored here, while the cleaning
# step keeps them and fills Country with "Unknown" — confirm this overlap is intended.
df_region_rejected = df_reg.filter(
    col("SalesTerritoryKey").isNull() | col("Country").isNull()
)

# Write rejected records to Silver Layer (each run replaces the previous snapshot)
df_sales_rejected.write.format("delta").mode("overwrite").save("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/rejected_sales")
df_product_rejected.write.format("delta").mode("overwrite").save("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/rejected_product")
df_region_rejected.write.format("delta").mode("overwrite").save("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/rejected_region")

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 7, Finished, Available, Finished)

In [6]:
# -----------------------------------
# Step 3: Clean the Data
# -----------------------------------

# Clean Sales
# Order of operations matters here:
#   1. SalesAmount is cast to double first, so validation runs on the typed
#      value instead of relying on an implicit comparison cast.
#   2. Invalid rows (NULL / negative amount) are removed BEFORE de-duplication.
#      dropDuplicates keeps an arbitrary row per key, so de-duplicating first
#      could keep an invalid row and then filter it out, losing the sales line
#      even though a valid duplicate existed.
df_sales_clean = (
    df_sal
    .withColumn("SalesAmount", col("SalesAmount").cast("double"))
    .filter(col("SalesAmount").isNotNull() & (col("SalesAmount") >= 0))
    .dropDuplicates(["SalesOrderNumber", "ProductKey"])
    .withColumn("OrderDate", col("OrderDate").cast("date"))
)

# Clean Product: drop rows without a key, keep one row per ProductKey and
# replace missing descriptive attributes with "Unknown".
df_product_clean = (
    df_prod
    .filter(col("ProductKey").isNotNull())
    .dropDuplicates(["ProductKey"])
    .fillna({
        "Color": "Unknown",
        "Subcategory": "Unknown",
        "Category": "Unknown"
    })
)

# Clean Region: drop rows without a key, keep one row per SalesTerritoryKey
# and replace missing descriptive attributes with "Unknown".
df_region_clean = (
    df_reg
    .filter(col("SalesTerritoryKey").isNotNull())
    .dropDuplicates(["SalesTerritoryKey"])
    .fillna({
        "Region": "Unknown",
        "Country": "Unknown",
        "Group": "Unknown"
    })
)

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 8, Finished, Available, Finished)

In [7]:
# -----------------------------------
# Step 4: Write Clean Data to Silver
# -----------------------------------

# Each cleaned DataFrame replaces the content of its Silver Delta table.
(
    df_sales_clean.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/silver_sales")
)
(
    df_product_clean.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/silver_product")
)
(
    df_region_clean.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/silver_region")
)


StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 9, Finished, Available, Finished)

In [9]:
# -----------------------------------
# Step 5: Log Transformation Rules
# -----------------------------------

# One log row per table: (table name, description of the rules applied).
# The timestamp is taken at the moment each row is built.
transformation_log = [
    Row(table_name=name, transformation_applied=rule, timestamp=str(datetime.now()))
    for name, rule in (
        ("sales", "Drop duplicates, remove null/negative SalesAmount, cast types"),
        ("product", "Drop duplicates, fill nulls in category fields"),
        ("region", "Drop duplicates, fill nulls in region fields"),
    )
]

df_trans_log = spark.createDataFrame(transformation_log)

# Append to the log table so entries from earlier runs are kept.
df_trans_log.write.save(
    "abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/silver_transformation_log",
    format="delta",
    mode="append",
)


StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 11, Finished, Available, Finished)

In [10]:

# Read the transformation log back through the Delta reader, from the same
# location the log is written to. Reading a single part-*.parquet file is
# fragile: part file names are generated per write, and because the log is
# appended to, one file holds only a portion of the table and bypasses the
# Delta transaction log.
df = spark.read.format("delta").load("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/silver_transformation_log")
# df is now a Spark DataFrame containing every row of the silver_transformation_log Delta table.
display(df)

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f5c0d57a-74e9-4255-80aa-763e93140417)

In [12]:
from pyspark.sql import Row
from pyspark.sql.functions import col, sum as _sum
from datetime import datetime

def log_data_quality(table_name, df, primary_key):
    """Append one row of data quality metrics for ``df`` to the quality log table.

    The logged row contains the table name, the total record count, the number
    of duplicate rows with respect to ``primary_key``, the log time, and one
    ``<column>_nulls`` counter per column of ``df``.

    Args:
        table_name: Label stored in the ``table_name`` column of the log row.
        df: Spark DataFrame to profile.
        primary_key: Column name, or list/tuple of column names, that should
            uniquely identify a row. A single string behaves as before.

    Returns:
        None. The metrics are appended to the ``data_quality_log1`` Delta table.
    """
    # Accept a single column name as well as a composite key.
    key_columns = [primary_key] if isinstance(primary_key, str) else list(primary_key)

    # Count NULLs in each column with a single aggregation pass.
    null_sums = df.agg(
        *[_sum(col(c).isNull().cast("int")).alias(f"{c}_nulls") for c in df.columns]
    ).collect()[0].asDict()
    # sum() over zero rows yields NULL; store 0 instead so an empty DataFrame
    # still produces a row whose column types can be inferred.
    null_counts = {name: (value or 0) for name, value in null_sums.items()}

    # Count the rows once and reuse the value for the duplicate calculation.
    record_count = df.count()
    duplicate_count = record_count - df.dropDuplicates(key_columns).count()

    # Prepare a flat row with the general metrics followed by the null counts
    row_data = {
        "table_name": table_name,
        "record_count": record_count,
        "duplicate_count": duplicate_count,
        "log_time": str(datetime.now())
    }
    row_data.update(null_counts)

    # Create DataFrame with a single row of metrics
    quality_log = spark.createDataFrame([Row(**row_data)])

    # Write to delta (append mode). mergeSchema lets rows from tables with
    # different columns add their own "<column>_nulls" fields to the log table.
    quality_log.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .save("abfss://FabricTrainingWorkspace@onelake.dfs.fabric.microsoft.com/silver_pro.Lakehouse/Tables/data_quality_log1")



StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 14, Finished, Available, Finished)

In [13]:
# Record quality metrics for each cleaned table: (log label, DataFrame, key column).
# NOTE(review): sales is de-duplicated on (SalesOrderNumber, ProductKey) in the
# cleaning step, so counting duplicates on SalesOrderNumber alone may report
# multi-line orders as duplicates — confirm the intended key.
for _label, _frame, _key in (
    ("silver_sales", df_sales_clean, "SalesOrderNumber"),
    ("silver_product", df_product_clean, "ProductKey"),
    ("silver_region", df_region_clean, "SalesTerritoryKey"),
):
    log_data_quality(_label, _frame, _key)

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 15, Finished, Available, Finished)

In [20]:
# NOTE: everything in this cell is commented out and does not run.
# It is a disabled draft that would write the three cleaned DataFrames to the
# gold.sales, gold.product and gold.region tables through the "sqlanalytics"
# format, overwriting the targets. Kept for reference only.

# # Sales
# df_sales_clean.write \
#     .format("sqlanalytics") \
#     .option("url", "pstif7vh7kne3o72pus3obd3ie-57eh7tffavqefhydxiyvypi2pe.datawarehouse.fabric.microsoft.com") \
#     .option("dbtable", "gold.sales") \
#     .mode("overwrite") \
#     .save()

# # Product
# df_product_clean.write \
#     .format("sqlanalytics") \
#     .option("url", "pstif7vh7kne3o72pus3obd3ie-57eh7tffavqefhydxiyvypi2pe.datawarehouse.fabric.microsoft.com") \
#     .option("dbtable", "gold.product") \
#     .mode("overwrite") \
#     .save()

# # Region
# df_region_clean.write \
#     .format("sqlanalytics") \
#     .option("url", "pstif7vh7kne3o72pus3obd3ie-57eh7tffavqefhydxiyvypi2pe.datawarehouse.fabric.microsoft.com") \
#     .option("dbtable", "gold.region") \
#     .mode("overwrite") \
#     .save()


StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 22, Finished, Available, Finished)

In [22]:
# Save the cleaned product data as the Delta table "silver_product2",
# replacing both the existing rows and the existing schema.
df_product_clean.write.saveAsTable(
    "silver_product2",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 24, Finished, Available, Finished)

In [23]:
# Save the cleaned region data as the Delta table "silver_region3",
# replacing both the existing rows and the existing schema.
df_region_clean.write.saveAsTable(
    "silver_region3",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 25, Finished, Available, Finished)

In [24]:
# Save the cleaned sales data as the Delta table "silver_sale2",
# replacing both the existing rows and the existing schema.
df_sales_clean.write.saveAsTable(
    "silver_sale2",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)

StatementMeta(, 6d67c808-d512-4b59-afe1-02c5bcbf2838, 26, Finished, Available, Finished)