In [0]:
from pyspark.sql.functions import col, sum, year
import dlt

In [0]:
from pyspark.sql.functions import col, row_number, date_format, coalesce, datediff, current_date, when, lit
from pyspark.sql.window import Window
import dlt

@dlt.table(
    name="gold_opportunities_with_latest_change",
    comment="Combines silver_crm_opportunities with latest change_date, stage, closing month, and overdue status for valid stage changes."
)
def gold_opportunities_with_latest_change():
    """Enrich each opportunity with its most recent valid stage change.

    Reads ``silver_crm_opportunities`` and ``silver_crm_field_history`` and
    left-joins each opportunity (on ``opportunity_id`` == ``crm_id``) to the
    newest ``stage_id`` history row whose ``new_value`` is one of the
    recognised stage names.

    Columns added to the opportunity columns:
        closing_month: month name ("MMMM") of ``expected_closing_date``,
            falling back to ``create_date``.
        latest_change_date: ``change_date`` of the newest valid stage change.
        latest_stage: ``new_value`` of that same history row.
        is_overdue: True when more than 14 days separate today from the
            latest change date (or ``create_date`` when there is none).
        days_overdue: that same day count in full, not the excess over 14.

    Nulls remaining in ``latest_change_date``, ``latest_stage``,
    ``closing_month`` and ``days_overdue`` are replaced by placeholder values.

    Returns:
        The enriched DataFrame, one output row per joined opportunity row.
    """
    stage_names = ["New", "New Stage", "Qualified", "Proposition", "Won", "Lost"]

    opportunities = dlt.read("silver_crm_opportunities")
    history = dlt.read("silver_crm_field_history")

    # Keep only stage transitions into a recognised stage for a known record.
    is_valid_stage_change = (
        col("crm_id").isNotNull()
        & (col("field_info") == "stage_id")
        & col("new_value").isin(stage_names)
    )

    # Rank each record's stage changes newest-first; rank 1 is the latest.
    # Rows sharing the same change_date are ranked in no defined order.
    newest_first = Window.partitionBy("crm_id").orderBy(col("change_date").desc())

    latest_stage_change = (
        history
        .filter(is_valid_stage_change)
        .withColumn("rn", row_number().over(newest_first))
        .filter(col("rn") == 1)
        .select(
            col("crm_id"),
            col("change_date").alias("latest_change_date"),
            col("new_value").alias("latest_stage"),
        )
    )

    closing_reference_date = coalesce(col("expected_closing_date"), col("create_date"))
    with_closing_month = opportunities.withColumn(
        "closing_month",
        date_format(closing_reference_date, "MMMM"),
    )

    # Days elapsed since the last valid stage change, or since creation when
    # the opportunity has no such change. Shared by both overdue columns.
    days_since_last_activity = datediff(
        current_date(),
        coalesce(col("latest_change_date"), col("create_date")),
    )

    joined = with_closing_month.join(
        latest_stage_change,
        with_closing_month["opportunity_id"] == latest_stage_change["crm_id"],
        "left",
    ).drop("crm_id")

    # Both overdue columns are derived BEFORE the placeholder fill below, so
    # the 1970 sentinel date never feeds into the day count.
    with_overdue = (
        joined
        .withColumn(
            "is_overdue",
            when(days_since_last_activity > 14, lit(True)).otherwise(lit(False)),
        )
        .withColumn("days_overdue", days_since_last_activity)
    )

    placeholders = {
        "latest_change_date": "1970-01-01 00:00:00",
        "latest_stage": "Unknown",
        "closing_month": "Unknown",
        "days_overdue": 0,
    }
    return with_overdue.fillna(placeholders)