In [0]:
from pyspark.sql import functions as F

def fact_course_progress(catalog_name):
    """Build the course-progress fact table and persist it as a Delta table.

    Reads the course-completion and program-overview tables from
    ``{catalog_name}.cleaned_data`` and four dimension tables from
    ``{catalog_name}.schema``, derives progress records from their left-joined
    rows, attaches the fiscal year of each record's begin and completion
    dates, and overwrites ``{catalog_name}.schema.fact_course_progress``.

    Relies on a ``spark`` session being reachable as a global name
    (presumably provided by the notebook runtime -- confirm for other callers).

    Args:
        catalog_name: Catalog prefix used for every table read or written.

    Returns:
        The DataFrame that was written to the fact table.
    """
    # Source tables. The aliases are what the qualified column references
    # further down ("acc.…", "apo.…", "dc.…", "dpp.…") resolve against.
    completions = spark.read.table(f"{catalog_name}.cleaned_data.analytics_course_completions").alias("acc")
    overview = spark.read.table(f"{catalog_name}.cleaned_data.analytics_program_overview").alias("apo")
    course_dim = spark.read.table(f"{catalog_name}.schema.dim_course_progress").alias("dc")
    program_dim = spark.read.table(f"{catalog_name}.schema.dim_program_progress").alias("dpp")
    dfyc = spark.read.table(f"{catalog_name}.schema.dim_fy_completed")
    dfyb = spark.read.table(f"{catalog_name}.schema.dim_fy_began")

    # Frequently used column references.
    acc_user = F.col("acc.User ID")
    acc_course = F.col("acc.Course ID")
    enrolled_at = F.col("acc.Time enrolled")
    completed_at = F.col("acc.Time completed")
    due_at = F.col("acc.Completion Due Date")
    apo_program = F.col("apo.Program ID")
    assigned_at = F.col("dpp.time_assigned")

    # Date-truncated views of the timestamps and the placeholder values.
    enrolled_on = F.to_date(enrolled_at)
    completed_on = F.to_date(completed_at)
    assigned_on = F.to_date(assigned_at)
    unknown_date = F.lit("1900-01-01").cast("date")
    no_key = F.lit(-1)
    no_key_big = F.lit(-1).cast("bigint")

    # Every lookup is a left join, so each completion row survives even when
    # the program overview or a dimension has no match.
    joined = (
        completions
        .join(
            overview,
            (acc_course == F.col("apo.Course ID")) & (acc_user == F.col("apo.User ID")),
            how="left",
        )
        .join(
            course_dim,
            (acc_course == F.col("dc.course_id")) & (acc_user == F.col("dc.user_id")),
            how="left",
        )
        .join(
            program_dim,
            (apo_program == F.col("dpp.program_id")) & (F.col("apo.User ID") == F.col("dpp.user_id")),
            how="left",
        )
    )

    # Keep a row only if at least one of the three timestamps is present.
    has_any_timestamp = (
        enrolled_at.isNotNull() | assigned_at.isNotNull() | completed_at.isNotNull()
    )

    # date_completed: completion date, or the placeholder when not completed.
    completed_or_unknown = F.when(completed_at.isNull(), unknown_date).otherwise(completed_on)

    # date_began: prefer the enrolment time, then the program assignment time,
    # then the completion time; a completion that precedes the chosen start
    # wins. A comparison against a null timestamp is not true, so those cases
    # fall through to the `otherwise` branch.
    began_from_enrolment = (
        F.when(completed_at < enrolled_at, completed_on).otherwise(enrolled_on)
    )
    began_from_assignment = (
        F.when(completed_at < assigned_at, completed_on).otherwise(assigned_on)
    )
    began_without_enrolment = (
        F.when(assigned_at.isNull(), completed_or_unknown).otherwise(began_from_assignment)
    )
    date_began = (
        F.when(enrolled_at.isNull(), began_without_enrolment).otherwise(began_from_enrolment)
    )

    # days_difference: null unless both enrolment and completion are known.
    days_difference = (
        F.when(completed_at.isNull() | enrolled_at.isNull(), None)
        .otherwise(F.datediff(completed_on, enrolled_on))
    )

    # isoverdue: 1 only for uncompleted rows whose due date is before today.
    still_open_past_due = completed_at.isNull() & (F.to_date(due_at) < F.current_date())
    isoverdue = (
        F.when(due_at.isNull(), F.lit(0))
        .when(still_open_past_due, F.lit(1))
        .otherwise(F.lit(0))
    )

    progress_records = joined.filter(has_any_timestamp).select(
        acc_user.alias("user_id"),
        F.coalesce(apo_program, no_key).alias("program_id"),
        acc_course.alias("course_id"),
        no_key_big.alias("scorm_id"),
        no_key.alias("cert_id"),
        no_key_big.alias("certification_user_id"),
        F.coalesce(F.col("dpp.program_user_id"), no_key).alias("program_user_id"),
        F.coalesce(F.col("dc.course_user_id"), no_key).alias("course_user_id"),
        no_key.alias("event_id"),
        no_key.alias("session_id"),
        F.lit(1).alias("limit_to_course_progress"),
        # Carried on the intermediate frame; not part of the final projection.
        F.col("acc.Include in Dashboard"),
        date_began.alias("date_began"),
        completed_or_unknown.alias("date_completed"),
        days_difference.alias("days_difference"),
        isoverdue.alias("isoverdue"),
    )

    # Attach fiscal years via inclusive date-range matches on each FY dimension.
    completed_in_fy = F.col("date_completed").between(dfyc["fy_start_date"], dfyc["fy_end_date"])
    began_in_fy = F.col("date_began").between(dfyb["fy_start_date"], dfyb["fy_end_date"])

    output_columns = [
        "user_id", "program_id", "course_id", "scorm_id", "cert_id", "certification_user_id",
        "program_user_id", "course_user_id", "event_id", "session_id",
        "date_began", "date_completed", "days_difference",
        dfyc["fy_completed"], dfyb["fy_began"],
        "isoverdue", "limit_to_course_progress",
    ]
    result = (
        progress_records
        .join(dfyc, completed_in_fy, how="left")
        .join(dfyb, began_in_fy, how="left")
        .select(*output_columns)
    )

    # Replace the existing fact table with the freshly computed result.
    output_path = f"{catalog_name}.schema.fact_course_progress"
    result.write.format("delta").mode("overwrite").saveAsTable(output_path)

    return result