In [2]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit, when, broadcast
from delta.tables import DeltaTable 
from functools import reduce

# Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("IncrementalMotorDataProcessing")
    .getOrCreate()
)

# Delta tables read by the notebook:
#   main_table   - "raw_to_coverage"
#   temp_table   - "raw_to_coverage_temp" (compared against main_table to find changes)
#   target_table - "A6_MOTOR_EARNED_PREMIUM" (the table later cells append to)
main_table, temp_table, target_table = (
    spark.read.format("delta").table(table_name)
    for table_name in (
        "raw_to_coverage",
        "raw_to_coverage_temp",
        "A6_MOTOR_EARNED_PREMIUM",
    )
)

# Columns carried from the source tables through the change-detection steps.
common_columns = [
    "data_system",
    "v_uid",
    "p_code",
    "p_product_code",
    "std_p_branch",
    "std_p_department",
    "std_p_code_cooperation_type",
    "std_p_customer_name",
    "p_salestaff",
    "coverage",
    "std_v_date_issue",
    "std_v_date_expiry",
    "PREMIUM_written",
    "UWE_written",
    "CAT_written",
]

# Aliased handles so both sides of a self-shaped join can be told apart.
main_table_alias = main_table.alias("main")
temp_table_alias = temp_table.alias("temp")


StatementMeta(, f45ce3e8-b51d-4a77-8f7a-ba8763d2f0bc, 4, Finished, Available, Finished)

In [3]:

# Composite key that identifies one row in main_table, temp_table and target_table.
key_columns = ["v_uid", "data_system", "coverage"]

# Every change set is projected onto this single ordered column list so the
# unions below line up by name. Previously inserts/deletes kept each table's
# native column order while updates used the common_columns order, and the
# positional union() could silently put values under the wrong column.
change_columns = [c for c in common_columns if c in main_table.columns]

# Insert: keys present in main_table but missing from temp_table.
inserts_df = main_table.join(
    temp_table,
    key_columns,
    "left_anti"
).select(*change_columns)

# Update: keys present in both tables where at least one non-key column differs.
all_columns = [c for c in main_table.columns if c not in key_columns]
# eqNullSafe treats NULL vs NULL as equal and NULL vs value as different, so
# its negation is a null-aware "values differ" test.
update_conditions = [
    ~main_table_alias[c].eqNullSafe(temp_table_alias[c])
    for c in all_columns
]

updates_df = main_table_alias.join(
    temp_table_alias,
    key_columns,
    "inner"
).filter(
    # lit(False) seeds the fold so an empty condition list yields "no updates"
    # instead of reduce() raising TypeError.
    reduce(lambda a, b: a | b, update_conditions, lit(False))
).select(
    *[col(f"main.{c}").alias(c) for c in change_columns]
)

# Delete: keys present in temp_table but missing from main_table.
deletes_df = temp_table.join(
    main_table,
    key_columns,
    "left_anti"
).select(*change_columns)

# Rows whose earned amounts must be (re)computed, and rows whose existing
# target entries must be reversed.
changed_add_df = inserts_df.unionByName(updates_df)
changed_revert_df = deletes_df.unionByName(updates_df)

# cache() is lazy; the count() right after each one materialises the cache.
changed_revert_df.cache()
changed_revert_df.count()
changed_add_df.cache()
changed_add_df.count()

# Target rows belonging to any key that has to be reversed.
filtered_target = target_table.join(
    changed_revert_df.select(*key_columns).distinct(),
    key_columns,
    "left_semi"
)

filtered_target.cache()
filtered_target.count()


StatementMeta(, f45ce3e8-b51d-4a77-8f7a-ba8763d2f0bc, 5, Finished, Available, Finished)

2380

In [5]:

# Columns taken from the change set (left side of the join).
revert_side_columns = [
    "data_system",
    "v_uid",
    "p_code",
    "p_product_code",
    "std_p_branch",
    "std_p_department",
    "std_p_code_cooperation_type",
    "std_p_customer_name",
    "p_salestaff",
    "coverage",
]

# Columns taken from the existing target rows (right side of the join).
target_side_columns = [
    "std_v_date_issue",
    "std_v_date_expiry",
    "earn_date",
    "EARNED_PREMIUM",
    "EARNED_UWE",
    "EARNED_CAT",
    "is_reverted",
]

# Pair every changed key with its existing target rows.
# The join is "inner": a changed key with no target rows has nothing to
# reverse, and a "left" join would emit a row whose target-side columns are
# all NULL, which the following cell would then append to the target table.
changed_with_premium = changed_revert_df.join(
    broadcast(filtered_target),
    ["v_uid", "data_system", "coverage"],
    "inner"
).select(
    *[changed_revert_df[c] for c in revert_side_columns],
    *[filtered_target[c] for c in target_side_columns]
)

StatementMeta(, f45ce3e8-b51d-4a77-8f7a-ba8763d2f0bc, 7, Finished, Available, Finished)

In [6]:
# Every earned amount must be negated for the reversal rows to cancel the
# originals. Previously only EARNED_PREMIUM was negated, so EARNED_UWE and
# EARNED_CAT were appended a second time with their original sign.
earned_amount_columns = ["EARNED_PREMIUM", "EARNED_UWE", "EARNED_CAT"]

# Reverse only rows that are still active. Rows already flagged is_reverted = 1
# were cancelled by an earlier run; reversing them again just duplicates history.
active_rows = changed_with_premium.filter(col("is_reverted") == 0)

reversed_df = active_rows.select(
    *[col(c) for c in changed_with_premium.columns if c not in earned_amount_columns],
    *[(col(c) * -1).alias(c) for c in earned_amount_columns]
)

reversed_df.write.format("delta").mode("append").saveAsTable("A6_MOTOR_EARNED_PREMIUM")

# Distinct keys whose active target rows (the originals and the reversal rows
# appended just above) must now be flagged as reverted.
revert_keys_df = changed_revert_df.select("v_uid", "data_system", "coverage").distinct()

if revert_keys_df.head(1):
    # MERGE against the key DataFrame instead of collecting every key to the
    # driver and splicing it into one SQL string: no quoting/injection issues
    # for keys containing "'", and no statement that grows with the key count.
    DeltaTable.forName(spark, "A6_MOTOR_EARNED_PREMIUM").alias("t").merge(
        revert_keys_df.alias("k"),
        "t.v_uid = k.v_uid AND t.data_system = k.data_system "
        "AND t.coverage = k.coverage AND t.is_reverted = 0"
    ).whenMatchedUpdate(
        set={"is_reverted": lit(1)}
    ).execute()
else:
    print("⚠️ Không có dòng nào để cập nhật.")


StatementMeta(, f45ce3e8-b51d-4a77-8f7a-ba8763d2f0bc, 8, Finished, Available, Finished)

In [7]:

# Issue/expiry date pair used as the join key against the earned-rate table.
date_key = ["std_v_date_issue", "std_v_date_expiry"]

# Distinct date pairs that occur in the rows to be (re)added.
date_df = changed_add_df.select(*date_key).distinct()

# Keep only the earned-rate rows for those date pairs; cache() is lazy, so the
# count() below materialises it.
earned_rate_table = spark.read.format("delta").table("A6_MOTOR_DATE_EARNED_RATE")
earned_data_filtered = earned_rate_table.join(date_df, date_key, "inner").cache()
earned_data_filtered.count()

# Left join: rows without a matching earned-rate entry are kept and get a NULL
# EARNED_RATE, which is treated as 0 below.
joined_change_data = changed_add_df.join(earned_data_filtered, date_key, "left")

target_columns = target_table.columns

# Rate shared by all earned amounts; a missing rate counts as 0.
earned_rate = coalesce(col("EARNED_RATE"), lit(0))

# Earned amount = rate * corresponding written amount.
final_change_data = joined_change_data
for earned_name, written_name in (
    ("EARNED_PREMIUM", "PREMIUM_written"),
    ("EARNED_UWE", "UWE_written"),
    ("EARNED_CAT", "CAT_written"),
):
    final_change_data = final_change_data.withColumn(
        earned_name, earned_rate * col(written_name)
    )

# Freshly computed rows start out as not reverted.
final_change_data = final_change_data.withColumn("is_reverted", lit(0))

# Cast both date columns to DATE before projecting onto the target's columns.
for date_name in date_key:
    final_change_data = final_change_data.withColumn(
        date_name, col(date_name).cast("DATE")
    )

final_change_data = final_change_data.select(*target_columns)

final_change_data.write.format("delta").mode("append").saveAsTable("A6_MOTOR_EARNED_PREMIUM")

StatementMeta(, f45ce3e8-b51d-4a77-8f7a-ba8763d2f0bc, 9, Finished, Available, Finished)