In [0]:
# Bronze layer: read the table registered under the name `retail_bronze`.
# `spark` and `display` are expected to be supplied by the notebook runtime.
bronze_df = spark.read.table("retail_bronze")

# Print a heading followed by the column names/types of the Bronze frame.
print("📌 Bronze Table Schema:")
bronze_df.printSchema()

# Render only a five-row sample instead of the whole table.
bronze_preview = bronze_df.limit(5)
display(bronze_preview)


In [0]:
# Glob over the Bronze table's `_delta_log` directory; only the `*.json`
# files in that directory are matched.
bronze_log_path = "abfss://data@databricksmths.dfs.core.windows.net/retail/bronze/_delta_log/*.json"

# Parse the matched log files as JSON into a DataFrame (schema is inferred
# by the reader, since none is supplied here).
log_df = spark.read.format("json").load(bronze_log_path)

# Print a heading followed by the inferred structure of the raw log records.
print("📌 Raw Delta Log Schema:")
log_df.printSchema()

# Render a five-row sample of the raw log entries.
log_preview = log_df.limit(5)
display(log_preview)


In [0]:
from delta.tables import DeltaTable

# Handle to the Bronze Delta table, addressed by its storage path rather than
# by a catalog table name.
delta_bronze = DeltaTable.forPath(
    spark,
    "abfss://data@databricksmths.dfs.core.windows.net/retail/bronze",
)

# Fetch the table's transaction history and render it (per the original note:
# who wrote what, and when).
bronze_history = delta_bronze.history()
display(bronze_history)


In [0]:
from pyspark.sql.functions import col, regexp_replace, to_timestamp

# Bronze -> Silver in a single chain:
#   1. cast Quantity and CustomerID to int,
#   2. replace "," with "." in UnitPrice, then cast it to double,
#   3. parse InvoiceDate with the pattern "dd.MM.yy HH:mm",
#   4. rename StockCode -> ProductCode and Description -> ProductDesc,
#   5. drop the `_rescued_data` column,
#   6. keep only rows whose key business columns are all non-null.
# NOTE(review): values that fail a cast or do not match the timestamp pattern
# presumably become null (non-ANSI mode) — only CustomerID is null-filtered
# afterwards; confirm this is intended for Quantity/UnitPrice/InvoiceDate.
silver_df = (
    bronze_df
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("UnitPrice", regexp_replace("UnitPrice", ",", ".").cast("double"))
    .withColumn("CustomerID", col("CustomerID").cast("int"))
    .withColumn("InvoiceDate", to_timestamp("InvoiceDate", "dd.MM.yy HH:mm"))
    .withColumnRenamed("StockCode", "ProductCode")
    .withColumnRenamed("Description", "ProductDesc")
    .drop("_rescued_data")
    # Successive filters: a row survives only if it passes every one of them,
    # i.e. InvoiceNo, ProductCode and CustomerID are all present.
    .where(col("InvoiceNo").isNotNull())
    .where(col("ProductCode").isNotNull())
    .where(col("CustomerID").isNotNull())
)


In [0]:
# Persist the Silver frame to ADLS in Delta format. Mode "overwrite" replaces
# whatever the target path currently holds.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://data@databricksmths.dfs.core.windows.net/retail/silver")
)


In [0]:
%sql
-- Remove any earlier `retail_silver` registration so that the CREATE below
-- does not collide with an existing table of the same name.
-- NOTE(review): drop + create are two separate statements, so the table name
-- is briefly unresolvable between them; presumably any grants/metadata on the
-- old registration are lost as well — confirm this is acceptable.
DROP TABLE IF EXISTS retail_silver;

-- Register the Delta files at the Silver location as table `retail_silver`
-- (per the original note, this registration lives in Unity Catalog).
CREATE TABLE retail_silver USING DELTA
LOCATION 'abfss://data@databricksmths.dfs.core.windows.net/retail/silver';


In [0]:
# Read the Silver data back through its registered table name
# (`retail_silver`) rather than through the storage path.
silver_check = spark.read.table("retail_silver")

# Print a heading followed by the column names/types of the Silver table.
print("📌 Silver Table Schema:")
silver_check.printSchema()

# Render a ten-row sample of the Silver table.
silver_preview = silver_check.limit(10)
display(silver_preview)
