In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, DateType

# ---------------------------------------------------------
# 1. LOAD SILVER TABLES
# ---------------------------------------------------------

po_silver = spark.table("abc.abc_dw_silver.abc_dw_sl_pur_ord")
pr_silver = spark.table("abc.abc_dw_silver.abc_dw_sl_pr_req")

# ---------------------------------------------------------
# 2. FULL OUTER JOIN PR + PO DATASETS
# ---------------------------------------------------------

# Rename lastchangedatetime in both tables before join to avoid duplicate column names
#po_silver = po_silver.withColumnRenamed("lastchangedatetime", "po_lastchangedatetime")
#pr_silver = pr_silver.withColumnRenamed("lastchangedatetime", "pr_lastchangedatetime")

df_gold = (
    pr_silver.alias("pr")
    .join(
        po_silver.alias("po"),
        (F.col("pr_purchaserequisition") == F.col("po_purchaserequisition")) &
        (F.col("pr_itemnumber") == F.col("po_itemnumber")),
        "full"
    )
)

# Note: After the join, columns are accessible as:
#   - "purchaserequisition", "pr_itemnumber", "pr_creationdate", ...
#   - "purchaseorder", "po_createdon", "po_approvaldate", ...
# BUT because we used aliases, Spark will auto-resolve the final column names.
# To be explicit and avoid confusion we can reference them without alias prefixes
# where Spark has flattened them.

# ---------------------------------------------------------
# 3. COMPUTE PR-to-PO AGEING (CALENDAR DAYS)
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "pr_to_po_ageing",
    F.when(
        (F.col("pr_purchaserequisition").isNotNull()) &
        (F.col("po_purchaserequisition").isNotNull()) &
        (F.col("pr_creationdate").isNotNull()) &
        (F.col("po_createdon").isNotNull()) &
        (F.col("pr_approvalstatus") == "Approved"),
        F.when(
            F.expr("""
                aggregate(
                  filter(
                    sequence(pr_creationdate, po_createdon, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('Sat', 'Sun')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """) < 0,
            0
        ).otherwise(
            F.expr("""
                aggregate(
                  filter(
                    sequence(pr_creationdate, po_createdon, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('Sat', 'Sun')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """)
        )
    ).otherwise(None)
)



# ---------------------------------------------------------
# 6. SLA BREACH FLAG (EXISTING: PR→PO > 5 DAYS)
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "sla_breach_flag",
    F.when(F.col("pr_to_po_ageing") > 5, "YES")
     .when(F.col("pr_to_po_ageing").isNull(), None)
     .otherwise("NO")
)

# ---------------------------------------------------------
# 7. PR CYCLE TIME (BUSINESS DAYS) & SLA BREACH > 2 DAYS
#     PR cycle = pr_creationdate → pr_approveddate
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "pr_cycle_time_bd",
    F.expr("""
        CASE
          WHEN pr.pr_creationdate IS NOT NULL
               AND pr.pr_approveddate IS NOT NULL THEN
            aggregate(
              filter(
                sequence(pr_creationdate, pr_approveddate, interval 1 day),
                d -> date_format(d, 'E') NOT IN ('Sat', 'Sun')
              ),
              0,
              (acc, x) -> acc + 1
            )
          ELSE NULL
        END
    """)
)

df_gold = df_gold.withColumn(
    "pr_cycle_sla_breach_flag",
    F.when(F.col("pr_cycle_time_bd") > 2, "YES")
     .when(F.col("pr_cycle_time_bd").isNull(), None)
     .otherwise("NO")
)

# ---------------------------------------------------------
# 8. PO CYCLE TIME (BUSINESS DAYS) & SLA BREACH > 2 DAYS
#     PO cycle = po_createdon → po_approvaldate
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "po_cycle_time_bd",
    F.expr("""
        CASE
          WHEN po.po_createdon IS NOT NULL
               AND po.po_approvaldate IS NOT NULL THEN
            aggregate(
              filter(
                sequence(to_date(po_createdon), po_approvaldate, interval 1 day),
                d -> date_format(d, 'E') NOT IN ('Sat', 'Sun')
              ),
              0,
              (acc, x) -> acc + 1
            )
          ELSE NULL
        END
    """)
)

df_gold = df_gold.withColumn(
    "po_cycle_sla_breach_flag",
    F.when(F.col("po_cycle_time_bd") > 2, "YES")
     .when(F.col("po_cycle_time_bd").isNull(), None)
     .otherwise("NO")
)

# ---------------------------------------------------------
# 9. PO-ONLY AND PR-ONLY HANDLING
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "record_type",
    F.when(
        (F.col("pr_purchaserequisition").isNotNull()) & (F.col("purchaseorder").isNotNull()),
        "PR_PO_MATCHED"
    )
    .when(
        (F.col("pr_purchaserequisition").isNotNull()) & (F.col("purchaseorder").isNull()),
        "PR_ONLY"
    )
    .when(
        (F.col("prpr_purchaserequisition").isNull()) & (F.col("po.purchaseorder").isNotNull()),
        "PO_ONLY"
    )
)

# ---------------------------------------------------------
# 10. ADD GOLD LOAD DATE
# ---------------------------------------------------------

df_gold = df_gold.withColumn("gold_load_date", F.current_date()) \
                 .withColumn("gold_load_timestamp", F.current_timestamp())

# ---------------------------------------------------------
# 11. WRITE GOLD TABLE
# ---------------------------------------------------------

gold_table = "abc.abc_dw_gold.abc_dw_gl_procument_cycle_tine"

(
    df_gold.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(gold_table)
)

print("Gold Layer Loaded Successfully with business-day PR & PO cycle SLA flags.")


In [0]:
from pyspark.sql import functions as F

# ---------------------------------------------------------
# 1. LOAD SILVER TABLES
# ---------------------------------------------------------

# Rename silver_load_timestamp in both source tables before the join
po_silver = (
    spark.table("abc.abc_dw_silver.abc_dw_sl_pur_ord")
    .withColumnRenamed("recordskipindicator", "po_recordskipindicator")
    .withColumnRenamed("silver_load_date", "po_silver_load_date")
    .withColumnRenamed("silver_load_timestamp", "po_silver_load_timestamp")
)
pr_silver = (
    spark.table("abc.abc_dw_silver.abc_dw_sl_pr_req")
    .withColumnRenamed("recordskipindicator", "pr_recordskipindicator")
    .withColumnRenamed("silver_load_date", "pr_silver_load_date")
    .withColumnRenamed("silver_load_timestamp", "pr_silver_load_timestamp")
)

# Continue with the rest of your code unchanged
# ---------------------------------------------------------
# 2. FULL OUTER JOIN PR + PO
#    Join keys:
#      PR : pr_purchaserequisition, pr_itemnumber
#      PO : po_purchaserequisition, po_itemnumber
# ---------------------------------------------------------

# 2. FULL OUTER JOIN PR + PO
df_gold = (
    pr_silver.alias("pr")
    .join(
        po_silver.alias("po"),
        (F.col("pr.pr_purchaserequisition") == F.col("po.po_purchaserequisition")) &
        (F.col("pr.pr_itemnumber") == F.col("po.po_itemnumber")),
        "full"
    )
)

# After the join, column names are flattened:
#   pr_purchaserequisition, pr_itemnumber, pr_creationdate, pr_approveddate, ...
#   po_purchaserequisition, po_itemnumber, po_createdon, po_approvaldate, ...

# ---------------------------------------------------------
# 3. PR→PO AGEING (BUSINESS DAYS)
#     pr_approvaldate → po_createdon
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "pr_to_po_ageing",
    F.when(
        (F.col("pr_purchaserequisition").isNotNull()) &
        (F.col("po_purchaserequisition").isNotNull()) &
        (F.col("pr_approveddate").isNotNull()) &
        (F.col("po_createdon").isNotNull()) &
        (F.col("pr_approvalstatus") == "Approved"),
        F.when(
            F.expr("""
                aggregate(
                  filter(
                    sequence(pr_approveddate, po_createdon, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('fri','Sat')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """) < 0,
            0
        ).otherwise(
            F.expr("""
                aggregate(
                  filter(
                    sequence(pr_approveddate, po_createdon, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('fri','Sat')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """)
        )
    ).otherwise(None)
)

# ---------------------------------------------------------
# 4. PR APPROVAL AGEING (BUSINESS DAYS)
#     pr_creationdate → pr_approveddate
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "pr_approval_ageing",
    F.when(
        (F.col("pr_creationdate").isNotNull()) &
        (F.col("pr_approveddate").isNotNull()),
        F.when(
            F.expr("""
                aggregate(
                  filter(
                    sequence(pr_creationdate, pr_approveddate, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('fri','Sat')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """) < 0,
            0
        ).otherwise(
            F.expr("""
                aggregate(
                  filter(
                    sequence(pr_creationdate, pr_approveddate, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('fri','Sat')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """)
        )
    ).otherwise(None)
)


# ---------------------------------------------------------
# 5. PO APPROVAL AGEING (BUSINESS DAYS)
#     po_createdon → po_approvaldate
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "po_approval_ageing",
    F.when(
        (F.col("po_createdon").isNotNull()) &
        (F.col("po_approvaldate").isNotNull()),
        F.when(
            F.expr("""
                aggregate(
                  filter(
                    sequence(po_createdon, po_approvaldate, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('fri','Sat')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """) < 0,
            0
        ).otherwise(
            F.expr("""
                aggregate(
                  filter(
                    sequence(po_createdon, po_approvaldate, interval 1 day),
                    d -> date_format(d, 'E') NOT IN ('fri','Sat')
                  ),
                  0,
                  (acc, x) -> acc + 1
                )
            """)
        )
    ).otherwise(None)
)


# ---------------------------------------------------------
# 6. SLA BREACH FLAGS (BUSINESS DAYS)
# ---------------------------------------------------------

# PR→PO SLA: > 5 business days
df_gold = df_gold.withColumn(
    "sla_breach_flag",
    F.when(F.col("pr_to_po_ageing") > 5, "YES")
     .when(F.col("pr_to_po_ageing").isNull(), None)
     .otherwise("NO")
)

# PR cycle SLA: > 2 business days
df_gold = df_gold.withColumn(
    "pr_cycle_sla_breach_flag",
    F.when(F.col("pr_approval_ageing") > 2, "YES")
     .when(F.col("pr_approval_ageing").isNull(), None)
     .otherwise("NO")
)

# PO cycle SLA: > 2 business days
df_gold = df_gold.withColumn(
    "po_cycle_sla_breach_flag",
    F.when(F.col("po_approval_ageing") > 2, "YES")
     .when(F.col("po_approval_ageing").isNull(), None)
     .otherwise("NO")
)

# ---------------------------------------------------------
# 7. RECORD TYPE CLASSIFICATION
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "record_type",
    F.when(
        (F.col("pr_purchaserequisition").isNotNull()) &
        (F.col("po_purchaserequisition").isNotNull()),
        "PR_PO_MATCHED"
    )
    .when(
        (F.col("pr_purchaserequisition").isNotNull()) &
        (F.col("po_purchaserequisition").isNull()),
        "PR_ONLY"
    )
    .when(
        (F.col("pr_purchaserequisition").isNull()) &
        (F.col("po_purchaserequisition").isNotNull()),
        "PO_ONLY"
    )
)

# ---------------------------------------------------------
# 8. GOLD METADATA
# ---------------------------------------------------------

df_gold = df_gold.withColumn("gold_load_date", F.current_date()) \
                 .withColumn("gold_load_timestamp", F.current_timestamp())

# ---------------------------------------------------------
# 9. WRITE GOLD TABLE
# ---------------------------------------------------------

gold_table = "abc.abc_dw_gold.abc_dw_gl_pr_po_kpi"

(
    df_gold.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(gold_table)
)

print("Gold Layer Loaded Successfully with ALL KPIs in business days (incl. PR approval ageing).")


[0;31m---------------------------------------------------------------------------[0m
[0;31mUnknownException[0m                          Traceback (most recent call last)
File [0;32m<command-7050310056558027>, line 228[0m
[1;32m    218[0m [38;5;66;03m# ---------------------------------------------------------[39;00m
[1;32m    219[0m [38;5;66;03m# 9. WRITE GOLD TABLE[39;00m
[1;32m    220[0m [38;5;66;03m# ---------------------------------------------------------[39;00m
[1;32m    222[0m gold_table [38;5;241m=[39m [38;5;124m"[39m[38;5;124mabc.abc_dw_gold.abc_dw_gl_pr_po_kpi[39m[38;5;124m"[39m
[1;32m    224[0m (
[1;32m    225[0m     df_gold[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)
[1;32m    226[0m     [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)
[1;32m    227[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124moverwriteSchema[39m[38;5;124m"[39m, [3

In [0]:
from pyspark.sql import functions as F

# ---------------------------------------------------------
# 1. LOAD SILVER TABLES
# ---------------------------------------------------------

po_silver = (
    spark.table("abc.abc_dw_silver.abc_dw_sl_pur_ord")
    .withColumnRenamed("recordskipindicator", "po_recordskipindicator")
    .withColumnRenamed("silver_load_date", "po_silver_load_date")
    .withColumnRenamed("silver_load_timestamp", "po_silver_load_timestamp")
)

pr_silver = (
    spark.table("abc.abc_dw_silver.abc_dw_sl_pr_req")
    .withColumnRenamed("recordskipindicator", "pr_recordskipindicator")
    .withColumnRenamed("silver_load_date", "pr_silver_load_date")
    .withColumnRenamed("silver_load_timestamp", "pr_silver_load_timestamp")
)

# ---------------------------------------------------------
# 2. FULL OUTER JOIN PR + PO
#    Join keys:
#      PR : pr_purchaserequisition, pr_itemnumber
#      PO : po_purchaserequisition, po_itemnumber
# ---------------------------------------------------------

df_gold = (
    pr_silver.alias("pr")
    .join(
        po_silver.alias("po"),
        (F.col("pr.pr_purchaserequisition") == F.col("po.po_purchaserequisition")) &
        (F.col("pr.pr_itemnumber") == F.col("po.po_itemnumber")),
        "full"
    )
)

# After join:
#   pr_purchaserequisition, pr_itemnumber, pr_creationdate, pr_approveddate, ...
#   po_purchaserequisition, po_itemnumber, po_createdon, po_approvaldate, ...

# ---------------------------------------------------------
# 3. PR→PO AGEING (BUSINESS DAYS, Sun–Thu)
#     pr_approveddate → po_createdon
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "pr_to_po_ageing",
    F.when(
        (F.col("pr_purchaserequisition").isNotNull()) &
        (F.col("po_purchaserequisition").isNotNull()) &
        (F.col("pr_approveddate").isNotNull()) &
        (F.col("po_createdon").isNotNull()) &
        (F.col("pr_approvalstatus") == "Approved") &
        (F.col("po_createdon") >= F.col("pr_approveddate")),
        F.size(
            F.expr("""
                filter(
                  sequence(pr_approveddate, po_createdon),
                  d -> dayofweek(d) BETWEEN 1 AND 5
                )
            """)
        )
    ).otherwise(None)
)

# ---------------------------------------------------------
# 4. PR APPROVAL AGEING (BUSINESS DAYS, Sun–Thu)
#     pr_creationdate → pr_approveddate
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "pr_approval_ageing",
    F.when(
        (F.col("pr_creationdate").isNotNull()) &
        (F.col("pr_approveddate").isNotNull()) &
        (F.col("pr_approveddate") >= F.col("pr_creationdate")),
        F.size(
            F.expr("""
                filter(
                  sequence(pr_creationdate, pr_approveddate),
                  d -> dayofweek(d) BETWEEN 1 AND 5
                )
            """)
        )
    ).otherwise(None)
)

# ---------------------------------------------------------
# 5. PO APPROVAL AGEING (BUSINESS DAYS, Sun–Thu)
#     po_createdon → po_approvaldate
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "po_approval_ageing",
    F.when(
        (F.col("po_createdon").isNotNull()) &
        (F.col("po_approvaldate").isNotNull()) &
        (F.col("po_approvaldate") >= F.col("po_createdon")),
        F.size(
            F.expr("""
                filter(
                  sequence(po_createdon, po_approvaldate),
                  d -> dayofweek(d) BETWEEN 1 AND 5
                )
            """)
        )
    ).otherwise(None)
)

# ---------------------------------------------------------
# 6. SLA BREACH FLAGS (BUSINESS DAYS)
# ---------------------------------------------------------

# PR→PO SLA: > 5 business days
df_gold = df_gold.withColumn(
    "sla_breach_flag",
    F.when(F.col("pr_to_po_ageing") > 5, "YES")
     .when(F.col("pr_to_po_ageing").isNull(), None)
     .otherwise("NO")
)

# PR cycle SLA: > 2 business days
df_gold = df_gold.withColumn(
    "pr_cycle_sla_breach_flag",
    F.when(F.col("pr_approval_ageing") > 2, "YES")
     .when(F.col("pr_approval_ageing").isNull(), None)
     .otherwise("NO")
)

# PO cycle SLA: > 2 business days
df_gold = df_gold.withColumn(
    "po_cycle_sla_breach_flag",
    F.when(F.col("po_approval_ageing") > 2, "YES")
     .when(F.col("po_approval_ageing").isNull(), None)
     .otherwise("NO")
)

# ---------------------------------------------------------
# 7. RECORD TYPE CLASSIFICATION
# ---------------------------------------------------------

df_gold = df_gold.withColumn(
    "record_type",
    F.when(
        (F.col("pr_purchaserequisition").isNotNull()) &
        (F.col("po_purchaserequisition").isNotNull()),
        "PR_PO_MATCHED"
    )
    .when(
        (F.col("pr_purchaserequisition").isNotNull()) &
        (F.col("po_purchaserequisition").isNull()),
        "PR_ONLY"
    )
    .when(
        (F.col("pr_purchaserequisition").isNull()) &
        (F.col("po_purchaserequisition").isNotNull()),
        "PO_ONLY"
    )
)

# ---------------------------------------------------------
# 8. GOLD METADATA
# ---------------------------------------------------------

df_gold = df_gold.withColumn("gold_load_date", F.current_date()) \
                 .withColumn("gold_load_timestamp", F.current_timestamp())

# ---------------------------------------------------------
# 9. WRITE GOLD TABLE
# ---------------------------------------------------------

gold_table = "abc.abc_dw_gold.abc_dw_gl_pr_po_kpi"

(
    df_gold.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(gold_table)
)

print("Gold Layer Loaded Successfully with ALL KPIs in business days (Sun–Thu).")


Gold Layer Loaded Successfully with ALL KPIs in business days (Sun–Thu).
