In [0]:
import sys

# Make modules two directories above the working directory importable
# (presumably so the project-local `bussiness` package resolves -- confirm).
# Guarded so that re-running this cell does not keep appending duplicates.
if "../../" not in sys.path:
    sys.path.append("../../")

In [0]:
from pyspark.sql.functions import (
    col, lit, to_date, to_timestamp, row_number,
    countDistinct, first, sum as sql_sum, when, coalesce,
    max as sql_max, min as sql_min, current_date, current_timestamp
)
from pyspark.sql.window import Window
from delta.tables import DeltaTable

from bussiness.customer import bv_customer_360_schema

In [0]:
# Run parameters: the partition column used in the source-table filters and
# the partition to process, written as a compact YYYYMMDD string.
partition_name = "partition_date"
partition_date = "20250828"

In [0]:
# Re-shape the compact YYYYMMDD parameter into YYYY-MM-DD, the form compared
# against the partition column in the SQL filters below.
part_year, part_month, part_day = partition_date[:4], partition_date[4:6], partition_date[6:]
partition_date_fmt = f"{part_year}-{part_month}-{part_day}"

# Source tables: customer hub, customer satellite, customer-policy link.
sat_cust_info_table = "ctl_central_published.sc_sv_dv.sat_customer_personinfo"
hub_cust_table = "ctl_central_published.sc_sv_dv.hub_customer_daily"
link_cust_pol_table = "ctl_central_published.sc_sv_dv.link_customer_policy"

# Source tables: policy details and policy status events.
sat_pol_det = "ctl_central_published.sc_sv_dv.sat_policy_details"
sat_pol_event = "ctl_central_published.sc_sv_dv.sat_policy_status_event"

# Target table that the final merge writes to.
output_table = "ctl_central_published.sc_sv_bv_customer.customer_360"

# Customer

In [0]:
# Hub rows for the partition being processed.
today_cust_sql = f"""
                        select * 
                        from {hub_cust_table}
                        where {partition_name} = '{partition_date_fmt}'
                       """
today_cust = spark.sql(today_cust_sql)

In [0]:
# Rows of the person-info satellite flagged is_current, narrowed to the
# attributes carried into the customer view.
cust_det_sql = f"""
                        select * 
                        from {sat_cust_info_table}
                        where is_current = TRUE
                       """
cust_det_cols = ["customer_id","full_name","dob","email","phone","city"]
cust_det = spark.sql(cust_det_sql).select(*cust_det_cols)

In [0]:
# Left join keeps every hub customer of the partition, including those
# without a current person-info row.
hub_side = today_cust.alias('h')
sat_side = cust_det.alias('s')
today_cust_det = hub_side.join(sat_side, on="customer_id", how="left")

# NOTE(review): this renders full_name / email / phone in the notebook
# output -- clear outputs before sharing or committing the notebook.
today_cust_det.display()

# Policy

In [0]:
# Rows of the policy-details satellite flagged is_current.
sat_pol_det_sql = f"""
                        select * 
                        from {sat_pol_det}
                        where is_current = TRUE
                       """
sat_pol_det_changed = spark.sql(sat_pol_det_sql)
sat_pol_det_changed.display()

In [0]:
# Status events of the processed partition.
# The DataFrame gets its own name: `sat_pol_event` holds the table-name string
# from the configuration cell, and rebinding it to a DataFrame made this cell
# fail on re-run (the f-string would interpolate a DataFrame repr into the SQL).
pol_event_today = spark.sql(f"""
                        select * 
                        from {sat_pol_event}
                        where {partition_name} = '{partition_date_fmt}'
                       """)

# Rank each policy's events newest-first by status_ts and keep the top one.
# NOTE(review): events sharing the same status_ts for a policy are ranked in
# an unspecified order -- add a tie-breaker column if that can happen.
w_policy = Window.partitionBy("policy_number").orderBy(col("status_ts").desc())
latest_status = (pol_event_today
    .withColumn("rn", row_number().over(w_policy))
    .where(col("rn") == lit(1))
    .select("policy_number", col("status").alias("current_status"), col("status_ts").alias("current_status_ts")))

# Add the coverage dates from the current policy details; the left join keeps
# policies that have an event but no current details row.
cur_status = latest_status.alias('cur').join(sat_pol_det_changed.alias('s') , on="policy_number", how="left")\
    .select('cur.*', 's.start_date', 's.end_date')

cur_status.display()

# Link customer policy

In [0]:
# Full customer-policy link table; no partition filter is applied here.
link_cust_pol_sql = f"""
                        select * 
                        from {link_cust_pol_table}
                       """
link_cust_pol = spark.sql(link_cust_pol_sql)
link_cust_pol.display()

# Customer Link Policy

In [0]:
# One row per (customer, linked policy); the left join keeps customers that
# have no link row, leaving their policy_number null.
cust_with_links = today_cust_det.alias("h").join(
    link_cust_pol.alias("l"), on="customer_id", how="left"
)

# Keep all customer-side columns plus the link's policy number and record
# source, and expose _ingest_ts under the name first_seen_ts.
today_cust_pol = (
    cust_with_links
    .select('h.*', 'l.policy_number', 'l.record_source')
    .withColumnRenamed("_ingest_ts", "first_seen_ts")
)
today_cust_pol.display()

# Join Policy

In [0]:
# A policy counts as active when its latest status is ACTIVE or REINSTATED.
# When current_status is null (the left join found no status row), isin()
# yields null and coalesce falls back to checking whether today lies inside
# the [start_date, end_date] window.
# NOTE(review): the fallback uses current_date(), not the processed partition
# date -- confirm this is intended when re-processing an older partition.
status_says_active = col("current_status").isin(["ACTIVE","REINSTATED"])
inside_coverage = current_date().between(col("start_date"), col("end_date"))
policy_is_active_expr = coalesce(status_says_active, inside_coverage)

# NOTE(review): this cell reads and rebinds today_cust_pol, so re-running it
# without first re-running the previous cell joins cur_status a second time.
today_cust_pol = (
    today_cust_pol.alias('h')
    .join(cur_status.alias('s'), on="policy_number", how="left")
    .withColumnRenamed('current_status_ts', 'last_status_ts')
    .withColumn("policy_is_active", policy_is_active_expr)
)

today_cust_pol.display()

# Roll up

In [0]:
# Collapse the per-policy rows to one row per customer.
# Customer-level attributes are taken with first(); the policy columns are
# aggregated into a distinct count, an active count and the latest status time.
customer_attr_cols = ["full_name", "dob", "email", "phone", "city", "first_seen_ts"]

# A null policy_is_active takes the otherwise() branch and is counted as 0.
active_flag = when(col("policy_is_active"), lit(1)).otherwise(lit(0))

roll_aggs = [first(col(attr)).alias(attr) for attr in customer_attr_cols] + [
    countDistinct("policy_number").alias("policies_total"),
    sql_sum(active_flag).cast("int").alias("policies_active"),
    sql_max(col("last_status_ts")).alias("last_status_ts"),
]

roll = today_cust_pol.groupBy("customer_id").agg(*roll_aggs)
roll.display()

In [0]:
# Create the target as an empty Delta table on first run so that the merge in
# the next cell always has a table to write into.
if not spark.catalog.tableExists(output_table):
    # Use a separate name for the schema object: assigning the result back to
    # `bv_customer_360_schema` would replace the imported factory function, and
    # a later run of this cell would then try to call the schema itself.
    customer_360_schema = bv_customer_360_schema()
    (
        spark.createDataFrame([], customer_360_schema)
        .write
        .format("delta")
        .saveAsTable(output_table)
    )

In [0]:
# Upsert the per-customer roll-up into the target table, keyed on customer_id:
# matched rows are updated from the source, unmatched rows are inserted.
# NOTE(review): whenMatchedUpdateAll also overwrites first_seen_ts with the
# value computed in this run -- confirm that is intended for a column named
# "first seen"; otherwise exclude it from the matched update.
target = DeltaTable.forName(spark, output_table)
upsert = (
    target.alias("t")
    .merge(roll.alias("s"), "t.customer_id = s.customer_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
)
upsert.execute()