In [0]:
# Read the raw customer records from the bronze Delta location.
bronze_path = "/Volumes/databricks_training/delta_demo/bronze/customer_bronze/"
bronze_df = (
    spark.read
    .format("delta")
    .load(bronze_path)
)

# Show the incoming column names and types before any cleaning is applied.
bronze_df.printSchema()


In [0]:
from pyspark.sql.functions import col, to_date, regexp_replace ,expr

# Build the customer silver DataFrame from the bronze customer data.
#
# Row-level validation (non-null key, e-mail containing "@") runs BEFORE
# de-duplication. dropDuplicates() does not guarantee which of several rows
# sharing a customer_id survives, so de-duplicating first could keep a row with
# an unusable e-mail and then discard that customer entirely, even though a
# valid row for the same customer_id was available.
silver_df = (
    bronze_df
    .filter(col("customer_id").isNotNull())
    # A NULL email makes contains() evaluate to NULL, so those rows are dropped too.
    .filter(col("email").contains("@"))
    .dropDuplicates(["customer_id"])
    .withColumn("geography_id", col("geography_id").cast("int"))
    # try_to_date returns NULL rather than raising when a value does not parse.
    .withColumn("registration_date",
                expr("try_to_date(registration_date, 'yyyy-MM-dd')"))
    # Keep only the digits of the phone number.
    .withColumn("phone",
                regexp_replace(col("phone"), "[^0-9]", ""))
)

silver_df.printSchema()

In [0]:
# Persist the cleaned customers to the silver Delta location.
silver_path = "/Volumes/databricks_training/delta_demo/silver/customers_silver"

# Full replace of the target; overwriteSchema allows the stored schema to be
# replaced together with the data.
customers_writer = (
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
customers_writer.save(silver_path)

silver_df.printSchema()

In [0]:
# Read the raw product records from the bronze Delta location.
b_path = "/Volumes/databricks_training/delta_demo/bronze/product_bronze/"
b_df = (
    spark.read
    .format("delta")
    .load(b_path)
)

from pyspark.sql.functions import col, to_date, regexp_replace ,coalesce, lit,trim,upper

# Build the product silver DataFrame from the bronze product data.
#
# All row-level validation (non-null key, positive price, non-negative cost)
# runs BEFORE de-duplication. dropDuplicates() does not guarantee which of
# several rows sharing a product_id survives, so de-duplicating first could keep
# a row with an invalid price/cost and then discard that product entirely, even
# though a valid row for the same product_id was available.
s_df = (
    b_df
    .filter(col("product_id").isNotNull())
    .filter(col("price").isNotNull() & (col("price") > 0))
    .filter(col("cost").isNotNull() & (col("cost") >= 0))
    .dropDuplicates(["product_id"])
    .withColumn("supplier_id", col("supplier_id").cast("long"))
    # Normalise text columns: trim whitespace, upper-case the category.
    .withColumn("product_name", trim(col("product_name")))
    .withColumn("category", upper(trim(col("category"))))
)


# Persist the cleaned products to the silver Delta location.
s_path = "/Volumes/databricks_training/delta_demo/silver/product_silver"

# Full replace of the target. overwriteSchema is set, as in the other silver
# writes in this notebook, so that re-running after a column type change (for
# example the supplier_id cast) replaces the stored schema instead of failing
# on a schema mismatch.
(
    s_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(s_path)
)

s_df.printSchema()

In [0]:
# Read the raw order records from the bronze Delta location.
# Note: this reassigns bronze_path, which an earlier cell pointed at the
# customer bronze data.
bronze_path = "/Volumes/databricks_training/delta_demo/bronze/order_bronze"
orders_bronze_df = (
    spark.read
    .format("delta")
    .load(bronze_path)
)
orders_bronze_df.printSchema()


In [0]:

from pyspark.sql.functions import col, coalesce, to_date, lit ,expr

# Build the orders silver DataFrame from the bronze order data.
#
# Row-level validation (non-null key, positive quantity and unit price) runs
# BEFORE de-duplication. dropDuplicates() does not guarantee which of several
# rows sharing an order_id survives, so de-duplicating first could keep an
# invalid row and then discard the order entirely, even though a valid row for
# the same order_id was available.
orders_silver_df = (
    orders_bronze_df
    .filter(col("order_id").isNotNull())
    # NULL quantity / unit_price compare to NULL and are dropped as well.
    .filter(col("quantity") > 0)
    .filter(col("unit_price") > 0)
    .dropDuplicates(["order_id"])
    .withColumn("customer_id", col("customer_id").cast("long"))
    # try_to_date returns NULL rather than raising when a value does not parse.
    .withColumn(
        "order_date",
        expr("try_to_date(order_date, 'yyyy-MM-dd')")
    )
    # Missing discount / shipping cost are replaced with 0.
    .withColumn("discount", coalesce(col("discount"), lit(0)))
    .withColumn("shipping_cost", coalesce(col("shipping_cost"), lit(0)))
)

orders_silver_df.printSchema()


In [0]:
# Persist the cleaned orders to the silver Delta location.
# Note: this reassigns silver_path, which an earlier cell pointed at the
# customers silver data.
silver_path = "/Volumes/databricks_training/delta_demo/silver/order_silver"

# Full replace of the target; overwriteSchema allows the stored schema to be
# replaced together with the data.
orders_writer = (
    orders_silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
orders_writer.save(silver_path)

# Render the written DataFrame in the notebook output.
orders_silver_df.display()