In [0]:
# Per-user suffix that namespaces every location this notebook uses under /tmp.
my_suffix = 'fernando_my_demo_dlt'

# Directory the customer files are read from.
path_customers = f"/tmp/{my_suffix}/customers/"

# Directory the order files are read from.
path_orders = f"/tmp/{my_suffix}/orders/"

In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

@dlt.create_view(comment="customers")
def customers():
    """Expose the customer files under ``path_customers`` as the ``customers`` view.

    The files are parsed as CSV with the first line treated as the header.
    No schema or other reader option is supplied here.

    Returns:
        A batch DataFrame over the customer CSV files.
    """
    csv_reader = spark.read.option("header", True)
    return csv_reader.csv(path_customers)

In [0]:
@dlt.create_table(
    comment="raw sales orders",
    table_properties={
        "myCompanyPipeline.quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def sales_orders_raw():
    """Ingest the JSON order files under ``path_orders`` as a stream.

    The ``cloudFiles`` streaming source is configured for JSON input with
    column-type inference enabled, and its schema location is set to the
    notebook's ``/tmp/<my_suffix>`` directory.

    Returns:
        A streaming DataFrame over the order files.
    """
    # Schema tracking location handed to the cloudFiles source.
    schema_dir = "/tmp/{}".format(my_suffix)

    order_stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.schemaLocation", schema_dir)
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
    )
    return order_stream.load(path_orders)

In [0]:
@dlt.create_table(
    comment = "cleaned sales",
    partition_cols = ["order_date"],
    table_properties = {
        "myCompanyPipeline.quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
# Rows whose order_number is NULL are dropped. Use dlt.expect_or_fail to abort
# the update instead, or dlt.expect to keep the rows and only record the violation.
@dlt.expect_or_drop("valid_order_number", "order_number IS NOT NULL")
def sales_orders_cleaned():
    """Build the cleaned sales-order table from the raw orders and the customers view.

    The raw orders are read as a stream so that this table is processed
    incrementally and stays append-only; the gold table reads it with
    ``dlt.read_stream``, which cannot consume a dataset that is fully
    recomputed on every update. The customers view is read in batch,
    making this a stream-static left join on ``customer_id`` and
    ``customer_name``.

    ``order_datetime`` is converted with ``from_unixtime`` and cast to
    TIMESTAMP, and ``order_date`` (the partition column) is derived from it.

    Returns:
        A streaming DataFrame restricted to the selected columns.
    """
    orders = dlt.read_stream("sales_orders_raw")
    customer_dim = dlt.read("customers")

    # Left join keeps every order even when no matching customer row exists.
    enriched = orders.join(customer_dim, ["customer_id", "customer_name"], "left")

    order_ts = F.from_unixtime(F.col("order_datetime")).cast("TIMESTAMP")
    return (
        enriched
        .withColumn("order_datetime", order_ts)
        # Derived from the converted timestamp written by the previous step.
        .withColumn("order_date", F.col("order_datetime").cast("DATE"))
        .select(
            "customer_id",
            "customer_name",
            "number_of_line_items",
            "order_datetime",
            "order_date",
            "order_number",
            "ordered_products",
            "state",
            "city",
            "lon",
            "lat",
            "units_purchased",
            "loyalty_segment",
        )
    )

In [0]:
@dlt.create_table(
    comment = "agg sales orders in CA",
    table_properties = {
        "myCompanyPipeline.quality": "gold",
        "pipelines.autoOptimize.managed": "true"
    }
)
def sales_order_in_ca():
    """Aggregate sales and quantity for orders whose ``state`` is 'CA'.

    Each entry of ``ordered_products`` is exploded into its own row. Rows
    are then grouped by order date, city, customer and the product's
    ``curr`` field (exposed as ``currency``), summing ``price`` into
    ``sales`` and ``qty`` into ``quantity``.

    Spark functions are referenced through the ``F`` alias so that ``sum``
    is unambiguously the Spark aggregate rather than the Python builtin.

    Returns:
        A streaming DataFrame with one row per
        (order_date, city, customer_id, customer_name, currency).
    """
    # NOTE(review): a streaming read needs sales_orders_cleaned to be
    # append-only -- confirm the upstream table is itself built from a stream.
    ca_orders = dlt.read_stream("sales_orders_cleaned").where("state == 'CA'")

    line_items = ca_orders.select(
        ca_orders.city,
        ca_orders.order_date,
        ca_orders.customer_id,
        ca_orders.customer_name,
        F.explode(ca_orders.ordered_products).alias("ordered_products_explode"),
    )

    # One exploded product entry; its fields are accessed by name below.
    item = line_items.ordered_products_explode

    sales_by_customer = line_items.groupBy(
        line_items.order_date,
        line_items.city,
        line_items.customer_id,
        line_items.customer_name,
        item.curr.alias("currency"),
    ).agg(
        F.sum(item.price).alias("sales"),
        F.sum(item.qty).alias("quantity"),
    )

    return sales_by_customer