Instalando e importando as bibliotecas necessárias, incluindo a DLT (Delta Live Tables)

In [None]:
# No install is needed here: the `dlt` module imported below is provided by the
# Databricks Delta Live Tables pipeline runtime.
# NOTE: the PyPI package literally named "dlt" is dltHub's unrelated
# "data load tool"; installing it can shadow the runtime `dlt` module.
# For IDE autocompletion outside Databricks only, the stub package is:
#   %pip install databricks-dlt

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
#principal biblioteca da delta live table
import dlt

Create the device and subscription landing folders

In [None]:
# Create the landing-zone folders that receive the raw JSON for each source.
for _landing_dir in (
    "dbfs:/FileStore/tables/pipeline/landing/device",
    "dbfs:/FileStore/tables/pipeline/landing/subscription",
):
    dbutils.fs.mkdirs(_landing_dir)

From the data lake to the bronze tables {device & subscription}

In [None]:
# Data-lake landing zone: glob patterns matching every raw JSON file
# that lands for each source.
json_device_path, json_subscription_path = (
    "dbfs:/FileStore/tables/pipeline/landing/device/*.json",
    "dbfs:/FileStore/tables/pipeline/landing/subscription/*.json",
)

#criou uma dlt table e passou propriedades dela - device
@dlt.table(
    comment="raw json data from data lake.",
    table_properties{"quality": "bronze"}
)
def bronze_device():
    return (spark.read.format("json").load(json_device_path))

# Bronze DLT table for subscriptions, declared with its comment and table properties.
@dlt.table(
    comment="raw json data from data lake.",
    table_properties={"quality": "bronze"},
)
def bronze_subscription():
    """Return the raw subscription records read from ``json_subscription_path``.

    Batch-reads every JSON file matching the landing-zone glob; no
    transformation is applied at the bronze layer.
    """
    return spark.read.format("json").load(json_subscription_path)

From bronze to silver {transformation & enrichment}

In [None]:
@dlt.view(
    comment="enrich and sanitize data"
)
def device_enriched():
    """Project the bronze device records into ``device_``-prefixed columns.

    Each source column is renamed with a ``device_`` prefix so it stays
    distinguishable from subscription columns after the silver-layer join.
    ``device_event_time`` is stamped with the current (processing) timestamp.
    """
    return dlt.read("bronze_device").select(
        col("user_id").alias("device_user_id"),
        col("model").alias("device_model"),
        col("manufacturer").alias("device_manufacturer"),
        col("platform").alias("device_platform"),
        # current_timestamp() is already a Column; wrapping it in lit() is redundant.
        current_timestamp().alias("device_event_time"),
        col("dt_current_timestamp").alias("device_dt_current_timestamp"),
    )
    
@dlt.view(
    comment="enrich and sanitize data"
)
def subscription_enriched():
    """Project the bronze subscription records into ``subscription_``-prefixed columns.

    Each source column is renamed with a ``subscription_`` prefix so it stays
    distinguishable from device columns after the silver-layer join.
    ``subscription_event_time`` is stamped with the current (processing) timestamp.
    """
    return dlt.read("bronze_subscription").select(
        col("user_id").alias("subscription_user_id"),
        col("plan").alias("subscription_plan"),
        col("status").alias("subscription_status"),
        col("payment_method").alias("subscription_payment_method"),
        # The doubled prefix is kept because downstream code selects this exact name.
        col("subscription_term").alias("subscription_subscription_term"),
        col("payment_term").alias("subscription_payment_term"),
        # current_timestamp() is already a Column; wrapping it in lit() is redundant.
        current_timestamp().alias("subscription_event_time"),
        col("dt_current_timestamp").alias("subscription_dt_current_timestamp"),
    )

Silver table {subscriptions}: domain table design

In [None]:
# Silver table: joins the enriched device and subscription views into a
# single subscriptions domain table.
@dlt.table(
    comment="domain subscriptions silver table.",
    table_properties={"quality": "silver"}
)
def silver_subscriptions():
    """Return the inner join of enriched devices and subscriptions by user id.

    The source prefixes (``device_`` / ``subscription_``) are dropped and the
    columns are exposed under their domain names; ``event_time`` and
    ``dt_current_timestamp`` are taken from the subscription side.
    """
    devices = dlt.read("device_enriched")
    subscriptions = dlt.read("subscription_enriched")

    # Inner join: only users present in both sources survive.
    joined = devices.join(
        subscriptions,
        devices.device_user_id == subscriptions.subscription_user_id,
        how="inner",
    )

    return joined.select(
        col("device_user_id").alias("user_id"),
        col("device_model").alias("model"),
        col("device_manufacturer").alias("manufacturer"),
        col("device_platform").alias("platform"),
        col("subscription_plan").alias("plan"),
        col("subscription_status").alias("status"),
        col("subscription_payment_method").alias("payment"),
        col("subscription_subscription_term").alias("commitment"),
        col("subscription_payment_term").alias("term"),
        col("subscription_event_time").alias("event_time"),
        col("subscription_dt_current_timestamp").alias("dt_current_timestamp"),
    )

Gold tables: gold_plans (per-user plans) and gold_plans_agg (plan counts)

In [None]:
# 1
# Gold table: per-user plan details for the analytics team.
@dlt.table(
    comment="gold table for analytics team (full).",
    table_properties={"quality": "gold"}
)
def gold_plans():
    """Return ``user_id``, ``plan``, ``model`` and ``dt_current_timestamp``
    for every row of the silver subscriptions table.
    """
    # Read the upstream dataset with dlt.read, like the silver and view layers,
    # so the pipeline dependency is declared the same way everywhere.
    silver = dlt.read("silver_subscriptions")

    return silver.select(
        col("user_id"),
        col("plan"),
        col("model"),
        col("dt_current_timestamp"),
    )

# 2
# Gold table: number of subscription rows per plan for the analytics team.
@dlt.table(
    comment="gold table for analytics team (plans aggregated by count).",
    table_properties={"quality": "gold"}
)
def gold_plans_agg():
    """Return one row per ``plan`` with a ``count`` column.

    Counts the rows of the silver subscriptions table grouped by ``plan``.
    """
    # Read the upstream dataset with dlt.read, consistent with the other layers.
    silver = dlt.read("silver_subscriptions")

    # groupBy is a DataFrame method; GroupedData.count() adds a "count" column.
    return silver.groupBy("plan").count()

Delta tables: storage location and querying

In [None]:
# List the table folders under the pipeline's storage location.
# Replace <pipeline_id> with the real pipeline ID before running.
pipeline_tables_dir = "dbfs:/pipelines/<pipeline_id>/tables"
display(dbutils.fs.ls(pipeline_tables_dir))

In [None]:
%%sql
-- Query one pipeline table directly by its Delta storage path.
-- Replace <pipeline_id> and <table_name> with real values before running.
SELECT *
FROM delta.`dbfs:/pipelines/<pipeline_id>/tables/<table_name>`