In [0]:
from pyspark.sql import functions as F
# =========================================================
# CONFIG
# =========================================================
# Schemas this script reads from (bronze) and writes to (silver).
BRONZE_DB = "slainte_bronze"
SILVER_DB = "slainte_silver"
# Schema-qualified name of the bronze input table.
JIRA_TABLE = ".".join((BRONZE_DB, "jira_table"))
# Output table: bare name and its schema-qualified form.
SILVER_TABLE = "jira_tickets_silver"
SILVER_QUAL = ".".join((SILVER_DB, SILVER_TABLE))
# =========================================================
# LOAD BRONZE
# =========================================================
# Reference the bronze Jira table as a DataFrame (source for everything below).
jira = spark.read.table(JIRA_TABLE)
# =========================================================
# HELPERS
# =========================================================
def parse_ts(col):
    """Parse a text column into a timestamp, trying several layouts in order.

    Each layout is attempted with Spark SQL's ``try_to_timestamp`` and the
    first non-NULL result is kept (``coalesce``).

    Args:
        col: Name of the source column. It is interpolated as a back-quoted
            SQL identifier, so it must not itself contain a backtick.

    Returns:
        A Column holding the parsed timestamp, or NULL when none of the
        layouts produced a value.
    """
    formats = (
        "dd/MMM/yyyy h:mm a",
        "yyyy-MM-dd HH:mm:ss",
        "yyyy-MM-dd'T'HH:mm:ss",
    )
    # Backslash-escape the quotes around the literal T for the SQL string.
    # A doubled quote ('') is not a portable escape: parsers that treat
    # adjacent string literals as concatenation drop the quotes and leave a
    # bare T, which is not a valid datetime pattern letter.
    escaped = (fmt.replace("'", "\\'") for fmt in formats)
    return F.coalesce(
        *[F.expr(f"try_to_timestamp(`{col}`, '{fmt}')") for fmt in escaped]
    )
# =========================================================
# BASE – ONLY CLOSED TICKETS
# =========================================================
# A ticket counts as closed when its status, compared case-insensitively,
# is one of these three values.
_is_closed = F.lower(F.col("status")).isin("closed", "done", "resolved")
# (bronze column, silver column) pairs that are carried over as strings.
_text_columns = [
    F.col(source).cast("string").alias(target)
    for source, target in (
        ("issue_key", "ticket_id"),
        ("priority", "priority"),
        ("status", "status"),
        ("project_key", "project_key"),
    )
]
base = jira.where(_is_closed).select(
    *_text_columns,
    # Timestamps arrive as text; parse_ts yields NULL when nothing matches.
    parse_ts("created").alias("created_at"),
    parse_ts("resolved").alias("resolved_at"),
    F.col("description").cast("string").alias("ticket_description"),
)
# =========================================================
# NORMALIZE PRIORITY (ONLY HIGH / MEDIUM / LOW)
# =========================================================
# Compare on the lower-cased value so the match is case-insensitive.
_priority_lc = F.lower(F.col("priority"))
_priority_label = (
    F.when(
        _priority_lc.isin("highest", "blocker", "critical", "p1", "high"),
        F.lit("High"),
    )
    .when(_priority_lc.isin("medium", "p2"), F.lit("Medium"))
    .when(_priority_lc.isin("low", "lowest", "p3", "p4"), F.lit("Low"))
    # Anything unrecognised (including NULL) falls back to Medium.
    .otherwise(F.lit("Medium"))
)
base = base.withColumn("priority", _priority_label)
# =========================================================
# SLA TARGETS (BUSINESS RULES)
# =========================================================
# Resolution target in hours per priority label. There is no fallback
# branch, so any label other than these three gets a NULL target.
_priority = F.col("priority")
_target_hours = (
    F.when(_priority == "High", F.lit(4.0))
    .when(_priority == "Medium", F.lit(8.0))
    .when(_priority == "Low", F.lit(72.0))
)
with_targets = base.withColumn("resolution_target_hours", _target_hours)
# =========================================================
# METRICS
# =========================================================
# Elapsed time between creation and resolution, in whole seconds.
_elapsed_seconds = F.unix_timestamp("resolved_at") - F.unix_timestamp("created_at")
_actual = F.col("resolution_hours")
_target = F.col("resolution_target_hours")
_over_target = _actual > _target
metrics = (
    with_targets
    .withColumn("resolution_hours", _elapsed_seconds / 3600)
    # NULL when either timestamp (and therefore resolution_hours) is NULL.
    .withColumn("resolution_breach", _over_target)
    # Hours beyond the target; 0.0 when within target. NOTE: a NULL
    # comparison also lands in the fallback branch, so rows with a NULL
    # resolution_hours report 0.0 here while resolution_breach stays NULL.
    .withColumn(
        "breach_hours",
        F.when(_over_target, _actual - _target).otherwise(F.lit(0.0)),
    )
)
# =========================================================
# ASSIGNEE → STABLE PSEUDO-RANDOM REAL PEOPLE
# =========================================================
# Placeholder names assigned to tickets in the silver output.
people = [
    "Ali Ben Salah",
    "Sarah Martin",
    "Youssef Trabelsi",
    "Emma Dubois"
]
people_array = F.array(*[F.lit(p) for p in people])
# Pick the person from a hash of ticket_id rather than F.rand(): the spread
# across people stays pseudo-random, but a given ticket maps to the same
# person on every run, so re-running the job no longer reshuffles assignees.
# pmod keeps the index in [0, len(people)) even for negative hash values.
_assignee_index = F.expr(f"pmod(hash(ticket_id), {len(people)})")
final_df = (
    metrics
    # Bracket indexing is used because getItem() with a Column key is
    # deprecated in PySpark 3.x.
    .withColumn("assignee", people_array[_assignee_index])
    .select(
        "ticket_id",
        "priority",
        "status",
        "project_key",
        "assignee",
        "created_at",
        "resolved_at",
        "resolution_target_hours",
        "resolution_hours",
        "resolution_breach",
        "breach_hours",
        "ticket_description"
    )
)
# =========================================================
# WRITE SILVER
# =========================================================
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_DB}")
# Replace the table's data and schema in a single overwrite instead of
# DROP TABLE + recreate. Dropping first discards the Delta table history
# and leaves no table at all if the subsequent write fails; overwriteSchema
# still allows the column set to change between runs.
(
    final_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(SILVER_QUAL)
)
print(f"✅ FINAL JIRA SILVER TABLE CREATED: {SILVER_QUAL}")
print("Rows:", spark.table(SILVER_QUAL).count())
# =========================================================
# VALIDATION
# =========================================================
# Read back what was written and eyeball it.
_silver = spark.table(SILVER_QUAL)
# Row count per normalized priority label.
_silver.groupBy("priority").count().orderBy("priority").show(truncate=False)
# First ten rows of the SLA-related columns.
_preview_columns = [
    "ticket_id",
    "priority",
    "resolution_hours",
    "resolution_target_hours",
    "breach_hours",
    "resolution_breach",
]
_silver.select(*_preview_columns).show(10, truncate=False)