# In [0]:
import dlt
from pyspark.sql.functions import col, when, to_date, udf, current_timestamp
from pyspark.sql.types import StringType

# 1. Streaming source table
@dlt.table(name="opportunity_raw_streaming")
@dlt.expect_or_drop("valid_lead", "Lead_Name IS NOT NULL")
def opportunity_raw_streaming():
    # Ingests CSV files as a stream, stamps each row with a processing time,
    # and normalises the flag and date columns. The "valid_lead" expectation
    # declared above is attached with expect_or_drop on "Lead_Name IS NOT NULL".

    # Use Auto Loader ("cloudFiles") so newly arrived files are detected
    # automatically.
    csv_reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", "dbfs:/tmp/j4_schema_checkpoint")
        .option("header", "true")
    )
    stream = csv_reader.load("dbfs:/tmp/j4_databrick_streaming/")
    stream = stream.withColumn("processing_time", current_timestamp())

    # Convert the boolean fields: only the exact string "TRUE" maps to True;
    # every other value falls through to the otherwise(False) branch.
    for flag_name in ("Closed_Opportunity", "Active_Opportunity", "Latest_Status_Entry"):
        is_true = col(flag_name) == "TRUE"
        stream = stream.withColumn(flag_name, when(is_true, True).otherwise(False))

    # Convert the date fields, parsed with the "M/d/yyyy" pattern.
    for date_name in ("Date", "Target_Close"):
        stream = stream.withColumn(date_name, to_date(col(date_name), "M/d/yyyy"))

    return stream

# 2. Masking function for PII
def mask_text(text):
    """Mask a value down to its first character followed by ``"***"``.

    Falsy inputs (``None``, the empty string, ``0``, ...) are returned
    unchanged. Any other input is converted with ``str`` and only the first
    character of that string is kept.
    """
    if not text:
        return text
    first_char = str(text)[0]
    return f"{first_char}***"

# Wrap mask_text as a Spark UDF returning a string column.
mask_udf = udf(mask_text, returnType=StringType())

# 3. Enriched streaming table
@dlt.table(name="opportunity_enriched_streaming")
def opportunity_enriched_streaming():
    # Adds a "Big_Deal" column: Forecasted_Monthly_Revenue compared against
    # 100000 with a strict greater-than.
    is_big_deal = col("Forecasted_Monthly_Revenue") > 100000
    # Note: read_stream rather than read.
    return dlt.read_stream("opportunity_raw_streaming").withColumn("Big_Deal", is_big_deal)

# 4. Masked streaming table
@dlt.table(name="opportunity_masked_streaming")
def opportunity_masked_streaming():
    # Replaces the Salesperson and Lead_Name columns with the output of
    # mask_udf applied to each of them.
    # Note: read_stream rather than read.
    masked = dlt.read_stream("opportunity_enriched_streaming")
    for pii_column in ("Salesperson", "Lead_Name"):
        masked = masked.withColumn(pii_column, mask_udf(col(pii_column)))
    return masked

# 5. Final streaming table with partitioning
@dlt.table(name="opportunity_final_streaming", partition_cols=["Region"])
def opportunity_final_streaming():
    # Passes the masked table through unchanged; the table declaration above
    # sets partition_cols to ["Region"].
    # Note: read_stream rather than read.
    masked_stream = dlt.read_stream("opportunity_masked_streaming")
    return masked_stream
