In [0]:
import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
# Load the structured status-page source table once for the views below.
# .cache() marks the DataFrame for in-memory reuse (Spark caching is lazy: it is
# only materialized the first time an action runs over it).
# NOTE(review): catalog/schema are hard-coded to the *_dev environment -- confirm
# this is intended for every deployment target.
dfBasedTable = (
    spark.read
    .table("ds_goc_bronze_dev.ds_goc_silver_dev.structured_statuspage")
    .cache()
)

In [0]:
# Filter the source to valid external subscribers and derive the per-row fields
# (week number, per-email ordering, New/Regular client status) used downstream.
@dlt.view
def base_table():
    """Valid external subscriber rows from the structured status-page table.

    Keeps rows where EmailValidity == 'valid', EmailType == 'outside' and
    BlankStatus == 'valid'. Renames StatusPageName -> productName and
    StatusPageNameCount -> controlProduct, and adds:

    * iso_week_num  -- F.weekofyear(snapshot_date). NOTE(review): this is the
      week within the year only (no year component), so data spanning a year
      boundary wraps (week 52/53 -> week 1); downstream ``week + 1`` logic
      assumes a single year -- TODO confirm.
    * row_num       -- 1-based position of the row among the email's rows,
      ordered by snapshot_date.
    * client_status -- 'New' for every row on the email's earliest
      snapshot_date, 'Regular' otherwise.
    """
    by_email = Window.partitionBy("Email")
    by_email_by_date = by_email.orderBy("snapshot_date")
    return (
        dfBasedTable
        .filter(
            (F.col("EmailValidity") == 'valid')
            & (F.col("EmailType") == 'outside')
            & (F.col("BlankStatus") == 'valid')
        )
        .select(
            "Email", "FirstName", "LastName", "Notification",
            F.col("StatusPageName").alias("productName"),
            F.col("StatusPageNameCount").alias("controlProduct"),
            "CompanyNameSource", "snapshot_date",
            F.weekofyear("snapshot_date").alias("iso_week_num"),
            F.row_number().over(by_email_by_date).alias("row_num"),
            # Compare with the email's first snapshot date instead of using
            # row_number() == 1. When several rows share that date (e.g. one
            # per product), row_number marks only one of them, chosen
            # arbitrarily. The per-week de-duplication downstream may keep a
            # different row, which would label a brand-new customer 'Regular'.
            F.when(
                F.col("snapshot_date") == F.min("snapshot_date").over(by_email),
                'New',
            ).otherwise('Regular').alias("client_status"),
        )
    )

In [0]:
# Per-customer activity span: the first and last week number each email appears in.
@dlt.view
def weekly_presence():
    """One row per Email with first_seen_week / last_seen_week (min / max of iso_week_num)."""
    week = F.col("iso_week_num")
    per_email = dlt.read("base_table").groupBy("Email")
    return per_email.agg(
        F.min(week).alias("first_seen_week"),
        F.max(week).alias("last_seen_week"),
    )

In [0]:
# Detect week gaps: for every row, find the most recent earlier week in which the
# same email appeared, and keep it when at least one whole week was skipped.
@dlt.view
def customer_tracking():
    """base_table plus gap-detection columns.

    Adds:
    * prev_week        -- the largest iso_week_num strictly below this row's
      week for the same Email (null in the email's first week).
    * last_active_week -- prev_week when it is more than one week before the
      current week (the customer skipped at least one week), otherwise null.

    base_table can hold several rows per (Email, iso_week_num), one per
    snapshot/product. A row-based lag() ordered only by iso_week_num would
    often return a row from the same week. Because rows within a week tie in
    that ordering, it is arbitrary which row of the post-gap week would see the
    previous week, and the per-week de-duplication could drop the flag. A range
    frame over the week value instead gives every row of a week the same
    previous *distinct* week.
    """
    # RANGE frame: all rows of the same Email whose week is <= current week - 1.
    earlier_weeks = (
        Window.partitionBy("Email")
        .orderBy("iso_week_num")
        .rangeBetween(Window.unboundedPreceding, -1)
    )
    return (
        dlt.read("base_table")
        .withColumn("prev_week", F.max("iso_week_num").over(earlier_weeks))
        # when() without otherwise() yields null for non-matching rows, including
        # rows whose prev_week is null (first week of the email).
        .withColumn(
            "last_active_week",
            F.when(F.col("prev_week") < F.col("iso_week_num") - 1, F.col("prev_week")),
        )
    )

In [0]:
# Mark rows that follow a gap in a customer's weekly presence as 'Removed'.
@dlt.view
def removed_customers():
    """customer_tracking left-joined to weekly_presence on Email, with status columns.

    * last_seen_week      -- replaced by last_active_week when that is set,
      otherwise the email's overall last_seen_week from weekly_presence.
    * subscription_status -- 'Removed' when last_active_week is set (this row
      comes after at least one skipped week), 'Active' otherwise.
    """
    tracking = dlt.read("customer_tracking")
    presence = dlt.read("weekly_presence")
    after_gap = F.col("last_active_week").isNotNull()
    joined = tracking.join(presence, "Email", "left")
    with_last_seen = joined.withColumn(
        "last_seen_week",
        F.coalesce(F.col("last_active_week"), F.col("last_seen_week")),
    )
    return with_last_seen.withColumn(
        "subscription_status",
        F.when(after_gap, 'Removed').otherwise('Active'),
    )

In [0]:
# Rank each customer's rows within a week; rn == 1 is the representative row
# (the filter itself is applied in clean_subscription_status).
@dlt.view
def earliest_status():
    """removed_customers plus ``rn``, the row's rank within (Email, iso_week_num).

    Rows are ranked by earliest snapshot_date, then shortest productName.
    productName and CompanyNameSource are appended as final tie-breakers so
    the rn == 1 row is reproducible across pipeline runs. Without them,
    row_number() resolves ties (same date, same name length) arbitrarily.
    """
    ranking = (
        Window.partitionBy("Email", "iso_week_num")
        .orderBy("snapshot_date", F.length("productName"), "productName", "CompanyNameSource")
    )
    return dlt.read("removed_customers").withColumn("rn", F.row_number().over(ranking))

In [0]:
# Collapse to one row per customer per week and project the reporting columns.
@dlt.view
def clean_subscription_status():
    """Rows of earliest_status with rn == 1, i.e. one row per (Email, iso_week_num).

    Also adds len_product = length(productName).
    """
    output_columns = [
        "Email", "FirstName", "LastName", "Notification",
        "productName", "controlProduct", "CompanyNameSource",
        "iso_week_num", "client_status", "subscription_status",
        F.length("productName").alias("len_product"),
        "last_seen_week",
    ]
    ranked = dlt.read("earliest_status")
    return ranked.where(F.col("rn") == 1).select(*output_columns)

In [0]:
# Find customer-weeks that have no row for the same customer in the following week.
@dlt.view
def missing_customers():
    """Rows of clean_subscription_status whose Email has no row at iso_week_num + 1.

    This also returns every customer's most recent week, because no later week
    exists for it yet. Two columns are renamed for the union in final_status:
    iso_week_num -> prev_week_num and subscription_status -> prev_status.
    """
    weekly = dlt.read("clean_subscription_status")
    current, following = weekly.alias("p"), weekly.alias("c")
    has_next_week = (
        (F.col("p.Email") == F.col("c.Email"))
        & (F.col("c.iso_week_num") == F.col("p.iso_week_num") + 1)
    )
    # Anti-join keeps only the left-side rows with no match, i.e. no next week.
    unmatched = current.join(following, has_next_week, "left_anti")
    return unmatched.select(
        "Email", "FirstName", "LastName", "Notification", "productName",
        "controlProduct", "CompanyNameSource",
        F.col("iso_week_num").alias("prev_week_num"),
        F.col("subscription_status").alias("prev_status"),
    )

In [0]:
# Append a synthetic 'Removed' row for the first week each customer is absent.
@dlt.view
def final_status():
    """clean_subscription_status plus one synthetic 'Removed' row per absence.

    For each missing_customers row (a week with no following week for that
    Email), this adds a row at iso_week_num = prev_week_num + 1. That row has
    client_status 'Regular', subscription_status 'Removed', a null len_product
    and last_seen_week = prev_week_num. The remaining columns are copied from
    the last week the customer was present.
    """
    present = dlt.read("clean_subscription_status")
    absent = dlt.read("missing_customers").select(
        "Email", "FirstName", "LastName", "Notification", "productName",
        "controlProduct", "CompanyNameSource",
        (F.col("prev_week_num") + 1).alias("iso_week_num"),
        F.lit('Regular').alias("client_status"),
        F.lit('Removed').alias("subscription_status"),
        # Typed null: length() produces an int column, so give the placeholder
        # the same type rather than a NullType column that relies on
        # union-time coercion.
        F.lit(None).cast("int").alias("len_product"),
        F.col("prev_week_num").alias("last_seen_week"),
    )
    return present.unionByName(absent)

In [0]:
# Attach the previous row's values for each customer, then drop synthetic rows
# dated after the latest week present in the source table.
@dlt.view
def change_identify():
    """final_status plus prev_* columns, limited to weeks <= the source's latest week.

    Adds prev_len_product, prev_Email, prev_FirstName, prev_LastName,
    prev_Notification and prev_productName: each is the value from the
    preceding row of the same Email (null on the email's first row).

    Rows with iso_week_num greater than the maximum weekofyear(snapshot_date)
    of the source table are removed. This drops the synthetic week + 1
    'Removed' rows generated after the most recent week. If the source table
    is empty, that maximum is null and every row is filtered out.
    """
    # All prev_* columns share one window, so they all describe the same
    # previous row. The original used different ORDER BY keys for
    # FirstName/LastName, which could point those lags at a different row
    # whenever an Email had more than one row in a week.
    previous_row = (
        Window.partitionBy("Email")
        .orderBy("iso_week_num", "productName", "CompanyNameSource")
    )
    # Evaluated eagerly when the dataset is defined. Reuses the already-loaded
    # source DataFrame instead of re-reading the table through a second
    # hard-coded copy of its name.
    max_week_num = dfBasedTable.agg(F.max(F.weekofyear("snapshot_date"))).collect()[0][0]

    result = dlt.read("final_status")
    for column in ("len_product", "Email", "FirstName", "LastName", "Notification", "productName"):
        result = result.withColumn(f"prev_{column}", F.lag(column).over(previous_row))
    return result.filter(F.col("iso_week_num") <= F.lit(max_week_num))

In [0]:
# Publish the gold table: one row per customer per week with a change label.
@dlt.table(
    name="curated_statuspage",
    comment="This table transform all users for the subscribers on status page of the applications",
    table_properties={"quality": "gold"}
)
def curated_statuspage():
    """Gold output: weekly subscriber rows with a ``change_type`` label.

    change_type is the first matching condition, comparing each row with the
    previous row of the same Email (the prev_* columns from change_identify):
    'First_Name_Changed', 'Last_Name_Changed', 'Notification_Changed',
    'Product_Changed' (both len_product and productName differ), and then
    'Removed' when subscription_status is 'Removed'. Otherwise it is null.
    On a customer's first row the prev_* values are null, so the change
    comparisons evaluate to null and never match.

    There is no Email-change check: prev_Email is lagged within
    partitionBy("Email"), so it always equals Email (or is null) and such a
    check could never fire.

    NOTE(review): 'Product_Changed' also requires the product-name *length* to
    change, so switching between two equally long product names is not
    reported -- confirm that is intended.
    """
    change_type = (
        F.when(F.col("FirstName") != F.col("prev_FirstName"), 'First_Name_Changed')
        .when(F.col("LastName") != F.col("prev_LastName"), 'Last_Name_Changed')
        .when(F.col("Notification") != F.col("prev_Notification"), 'Notification_Changed')
        .when(
            (F.col("len_product") != F.col("prev_len_product"))
            & (F.col("productName") != F.col("prev_productName")),
            'Product_Changed',
        )
        .when(F.col("subscription_status") == 'Removed', 'Removed')
        .otherwise(None)
        .alias("change_type")
    )
    return (
        dlt.read("change_identify")
        .select(
            "Email", "FirstName", "LastName", "Notification", "productName", "controlProduct",
            "CompanyNameSource", "iso_week_num", "client_status", "subscription_status",
            "last_seen_week", change_type,
        )
        # NOTE(review): readers of the materialized table are not guaranteed to
        # see rows in this order; this sort mainly adds cost -- confirm it is needed.
        .orderBy("Email", "iso_week_num")
    )