In [0]:
%sql
USE CATALOG aws;
USE SCHEMA silver;


In [0]:
%sql
CREATE TABLE IF NOT EXISTS aws.silver.workflow (
    workflow_id STRING NOT NULL,
    workflow_version STRING,
    outcome STRING,
    completed_date TIMESTAMP,
    modified TIMESTAMP,
    status_page_url STRING,
    special_state STRING,
    workflow_state STRING,
    workflow_type STRING,
    workflow_name STRING,
    coordinator_login STRING,
    release_option STRING,
    release_date TIMESTAMP,
    min_authors INT,
    min_reviewers INT,
    min_approvers INT,
    comment STRING,
    target_completion_date TIMESTAMP,
    planned_effective_date TIMESTAMP,
    halt_on_preprocessing_finished BOOLEAN,

    CONSTRAINT pk_workflow PRIMARY KEY (workflow_id)
)
USING DELTA;


In [0]:
%sql
CREATE TABLE IF NOT EXISTS aws.silver.wfdoc (
    workflow_id STRING NOT NULL,
    doc_number STRING NOT NULL,
    doc_part STRING NOT NULL,
    doc_version STRING NOT NULL,
    doc_type STRING NOT NULL,

    url STRING,
    name STRING,
    title STRING,
    workspace_app_name STRING,
    workspace_url STRING,
    target_state STRING,
    release_date TIMESTAMP,
    is_locked BOOLEAN,
    is_virtual_structure BOOLEAN,
    level_in_structure INT,
    render_job_id STRING,
    render_status STRING,
    render_status_changed TIMESTAMP,
    transfer_status STRING,
    transfer_status_changed TIMESTAMP,
    site STRING,
    is_superseded BOOLEAN,
    reason_for_retirement STRING,

    CONSTRAINT pk_wfdoc PRIMARY KEY (
        workflow_id,
        doc_number,
        doc_part,
        doc_version,
        doc_type
    )
)
USING DELTA;


In [0]:
%sql
CREATE TABLE IF NOT EXISTS aws.silver.wfsupersededdoc (
    workflow_id STRING NOT NULL,

    doc_number STRING NOT NULL,
    doc_part STRING NOT NULL,
    doc_version STRING NOT NULL,
    doc_type STRING NOT NULL,

    prev_doc_number STRING NOT NULL,
    prev_doc_part STRING NOT NULL,
    prev_doc_version STRING NOT NULL,
    prev_doc_type STRING NOT NULL,

    prev_name STRING,
    prev_target_state STRING,
    prev_release_date TIMESTAMP,
    prev_is_locked BOOLEAN,
    prev_is_virtual_structure BOOLEAN,
    prev_level_in_structure INT,
    prev_render_job_id STRING,
    prev_render_status_changed TIMESTAMP,
    prev_transfer_status STRING,
    prev_transfer_status_changed TIMESTAMP,
    prev_is_superseded BOOLEAN,

    CONSTRAINT pk_wfsupersededdoc PRIMARY KEY (
        workflow_id,
        doc_number,
        doc_part,
        doc_version,
        doc_type,
        prev_doc_number,
        prev_doc_part,
        prev_doc_version,
        prev_doc_type
    )
)
USING DELTA;


In [0]:
%sql
CREATE TABLE IF NOT EXISTS aws.silver.wfroles (
    workflow_id STRING NOT NULL,
    role STRING NOT NULL,
    user_login STRING NOT NULL,

    user_name STRING,
    user_email STRING,

    CONSTRAINT pk_wfroles PRIMARY KEY (
        workflow_id,
        role,
        user_login
    )
)
USING DELTA;


In [0]:
%sql

TRUNCATE TABLE aws.silver.wftaskresults;
TRUNCATE TABLE aws.silver.wfroles;
TRUNCATE TABLE aws.silver.wfsupersededdoc;
TRUNCATE TABLE aws.silver.wfdoc;
TRUNCATE TABLE aws.silver.workflow;

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import schema_of_xml, from_xml, col

# 1) Clean Bronze (remove BOM, keep only real xml)
bronze_clean = (
    spark.table("aws.bronze.workflow_xml_raw")
      .filter(F.col("value").isNotNull())
      .withColumn("value_clean", F.regexp_replace(F.col("value"), "^\ufeff", ""))
      .filter(F.trim(F.col("value_clean")).startswith("<?xml"))
)

print("bronze_clean rows =", bronze_clean.count())

# 2) Infer schema from 1 sample xml row
sample_xml = bronze_clean.select("value_clean").first()["value_clean"]
xml_schema = schema_of_xml(F.lit(sample_xml))

# 3) Parse
parsed = bronze_clean.withColumn("x", from_xml(col("value_clean"), xml_schema))
print("parsed rows =", parsed.count())

staging = (
    parsed
    .filter(F.col("x").isNotNull())
    .select(
        "_source_file",
        "ingested_at",

        # workflow
        F.col("x.WorkflowId").alias("workflow_id"),
        F.col("x.WorkflowVersion").alias("workflow_version"),
        F.col("x.Outcome").alias("outcome"),
        F.col("x.CompletedDate").alias("completed_date"),
        F.col("x.Modified").alias("modified_ts"),
        F.col("x.StatusPageUrl").alias("status_page_url"),
        F.col("x.WorkflowState").alias("workflow_state"),
        F.col("x.WorkflowType").alias("workflow_type"),
        F.col("x.WorkflowName").alias("workflow_name"),
        F.col("x.WorkflowCoordinator").alias("coordinator_login"),
        F.col("x.ReleaseOption").alias("release_option"),
        F.col("x.ReleaseDate").alias("release_date"),
        F.col("x.MinimumNumberOfAuthors").alias("min_authors"),
        F.col("x.MinimumNumberOfReviewers").alias("min_reviewers"),
        F.col("x.MinimumNumberOfApprovers").alias("min_approvers"),
        F.col("x.Comment").alias("comment"),
        F.col("x.TargetCompletionDate").alias("target_completion_date"),
        F.col("x.PlannedEffectiveDate").alias("planned_effective_date"),
        F.col("x.HaltOnPreProcessingFinished").alias("halt_on_preprocessing_finished"),

        # requestor
        F.col("x.Requestor.LoginName").alias("requestor_login"),
        F.col("x.Requestor.Name").alias("requestor_name"),
        F.col("x.Requestor.Email").alias("requestor_email"),

        # nested
        F.col("x.Documents.WorkflowDocument").alias("documents"),
        F.col("x.Approvers.WorkflowUser").alias("approvers"),
        F.col("x.TaskResults.TaskResult").alias("task_results"),
    )
)

staging_cached = staging.cache()
print("staging rows =", staging_cached.count())

display(
    staging_cached.select(
        "workflow_id","workflow_name","workflow_state","modified_ts","requestor_login"
    ).orderBy("workflow_id")
)

In [0]:
from pyspark.sql import functions as F, Window

workflow_src = (
    staging_cached.select(
        F.col("workflow_id").cast("string").alias("workflow_id"),
        F.col("workflow_version").cast("string").alias("workflow_version"),
        F.col("outcome").cast("string").alias("outcome"),
        F.col("completed_date").cast("timestamp").alias("completed_date"),
        F.col("modified_ts").cast("timestamp").alias("modified"),
        F.col("status_page_url").cast("string").alias("status_page_url"),

        F.lit(None).cast("string").alias("special_state"),  # <-- fix for SpecialState not present

        F.col("workflow_state").cast("string").alias("workflow_state"),
        F.col("workflow_type").cast("string").alias("workflow_type"),
        F.col("workflow_name").cast("string").alias("workflow_name"),
        F.col("coordinator_login").cast("string").alias("coordinator_login"),

        F.col("release_option").cast("string").alias("release_option"),
        F.lit(None).cast("timestamp").alias("release_date"),  # ReleaseDate is often xsi:nil struct

        F.lit(None).cast("int").alias("min_authors"),        # same reason: xsi:nil struct sometimes
        F.lit(None).cast("int").alias("min_reviewers"),
        F.col("min_approvers").cast("int").alias("min_approvers"),

        F.col("comment").cast("string").alias("comment"),
        F.lit(None).cast("timestamp").alias("target_completion_date"),
        F.col("planned_effective_date").cast("timestamp").alias("planned_effective_date"),
        F.col("halt_on_preprocessing_finished").cast("boolean").alias("halt_on_preprocessing_finished"),
    )
    .filter("workflow_id is not null")
)

# keep latest per workflow_id
w = Window.partitionBy("workflow_id").orderBy(F.col("modified").desc_nulls_last())
workflow_src = (
    workflow_src.withColumn("rn", F.row_number().over(w))
                .filter("rn = 1").drop("rn")
)

workflow_src.createOrReplaceTempView("workflow_src")

spark.sql("""
MERGE INTO aws.silver.workflow t
USING workflow_src s
ON t.workflow_id = s.workflow_id
WHEN MATCHED THEN UPDATE SET
  t.workflow_version = s.workflow_version,
  t.outcome = s.outcome,
  t.completed_date = s.completed_date,
  t.modified = s.modified,
  t.status_page_url = s.status_page_url,
  t.special_state = s.special_state,
  t.workflow_state = s.workflow_state,
  t.workflow_type = s.workflow_type,
  t.workflow_name = s.workflow_name,
  t.coordinator_login = s.coordinator_login,
  t.release_option = s.release_option,
  t.release_date = s.release_date,
  t.min_authors = s.min_authors,
  t.min_reviewers = s.min_reviewers,
  t.min_approvers = s.min_approvers,
  t.comment = s.comment,
  t.target_completion_date = s.target_completion_date,
  t.planned_effective_date = s.planned_effective_date,
  t.halt_on_preprocessing_finished = s.halt_on_preprocessing_finished
WHEN NOT MATCHED THEN INSERT *
""")

display(spark.sql("SELECT workflow_id, workflow_name, workflow_state, modified FROM aws.silver.workflow ORDER BY modified DESC"))


workflow_id,workflow_name,workflow_state,modified
a8594c5d-cc74-467f-9368-b8d764b963be,RMD_BBG_[400282136] CARTON COBAS PCR UNI & DUAL SWAB 100 PKT IVD SZ PDPS,Canceled,2024-09-30T18:09:13.040663Z
7a3d0305-49c7-41cc-a2ac-2b8b7de0bbc6,REQ00616C_EN,Authorized,2024-09-30T16:37:58.244345Z
5818805c-3fb4-4522-8887-6f0c2a740804,RMD_BBG_PQS_GTD-new_[DH-04692.01-410]_HB,Authorized,2024-09-30T14:56:35.563803Z
07e80db0-6853-4b9e-87fd-95af8791b2a3,MassSpec TestDev: CTQ Scorecard AB1,ReviewersStep,2024-09-30T14:56:33.786284Z
29cc6880-ba3c-4cbc-a399-14bd463d0cb7,Retire,PreProcessing,2024-09-30T14:53:58.57618Z
5436ea46-9161-4bbe-a0aa-0351a6d6c968,_RIT_CC-HIA Release_2024-9-30,ApproversStep,2024-09-30T14:47:54.621262Z
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,cobas_prime_1.1.1_UA_2.1 all languages,Completed,2024-09-30T14:39:03.894463Z
cf2f2be0-8874-4f5b-83f1-26d064194da8,Val of 315.01.32XF_URS,Canceled,2024-09-30T12:30:02.027074Z
4ea6b9eb-603e-45f6-a949-75087c195bad,DH-02365.01-068B,Completed,2024-09-30T10:04:38.125038Z
6a882e01-f068-4afd-b12f-bbfb4fb3eea6,F_CC_DSRRA_EPV_TNTGen6_2023Jul14_Abschluss,ReviewersStep,2024-09-30T09:54:56.649806Z


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

user_schema = StructType([
    StructField("LoginName", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Email", StringType(), True),
])

task_schema = StructType([
    StructField("Identity", StringType(), True),
    StructField("Role", StringType(), True),
    StructField("Outcome", StringType(), True),
    StructField("SignatureType", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
])

forced_schema = StructType([
    StructField("WorkflowId", StringType(), True),
    StructField("WorkflowName", StringType(), True),
    StructField("WorkflowType", StringType(), True),
    StructField("WorkflowState", StringType(), True),
    StructField("WorkflowVersion", StringType(), True),
    StructField("Outcome", StringType(), True),
    StructField("Modified", TimestampType(), True),
    StructField("CompletedDate", TimestampType(), True),
    StructField("ReleaseOption", StringType(), True),
    StructField("StatusPageUrl", StringType(), True),
    StructField("WorkflowCoordinator", StringType(), True),

    StructField("Requestor", StructType([
        StructField("LoginName", StringType(), True),
        StructField("Name", StringType(), True),
        StructField("Email", StringType(), True),
    ]), True),

    # FORCE THESE AS ARRAYS (this is the key fix)
    StructField("Approvers", StructType([
        StructField("WorkflowUser", ArrayType(user_schema), True)
    ]), True),

    StructField("Reviewers", StructType([
        StructField("WorkflowUser", ArrayType(user_schema), True)
    ]), True),

    StructField("Authors", StructType([
        StructField("WorkflowUser", ArrayType(user_schema), True)
    ]), True),

    StructField("TaskResults", StructType([
        StructField("TaskResult", ArrayType(task_schema), True)
    ]), True),
])


In [0]:
from pyspark.sql import functions as F

wfdoc_src = (
    staging_cached
      .select("workflow_id", F.explode_outer("documents").alias("d"))
      .select(
          "workflow_id",
          F.col("d.DocId.Number").cast("string").alias("doc_number"),
          F.col("d.DocId.Part").cast("string").alias("doc_part"),
          F.col("d.DocId.Version").cast("string").alias("doc_version"),
          F.col("d.DocId.Type").cast("string").alias("doc_type"),

          F.col("d.Url").cast("string").alias("url"),
          F.col("d.Name").cast("string").alias("name"),
          F.col("d.Title").cast("string").alias("title"),
          F.col("d.WorkspaceAppName").cast("string").alias("workspace_app_name"),
          F.col("d.WorkspaceUrl").cast("string").alias("workspace_url"),
          F.col("d.TargetState").cast("string").alias("target_state"),

          F.lit(None).cast("timestamp").alias("release_date"),   # usually xsi:nil struct
          F.col("d.IsLocked").cast("boolean").alias("is_locked"),
          F.col("d.IsVirtualStructure").cast("boolean").alias("is_virtual_structure"),
          F.col("d.LevelInStructure").cast("int").alias("level_in_structure"),
          F.lit(None).cast("string").alias("render_job_id"),
          F.col("d.RenderStatus").cast("string").alias("render_status"),
          F.col("d.RenderStatusChanged").cast("timestamp").alias("render_status_changed"),
          F.col("d.TransferStatus").cast("string").alias("transfer_status"),
          F.col("d.TransferStatusChanged").cast("timestamp").alias("transfer_status_changed"),
          F.col("d.Site").cast("string").alias("site"),
          F.col("d.IsSuperseded").cast("boolean").alias("is_superseded"),
          F.lit(None).cast("string").alias("reason_for_retirement"),
      )
      .filter("workflow_id is not null and doc_number is not null")
)

wfdoc_src.createOrReplaceTempView("wfdoc_src")

spark.sql("""
MERGE INTO aws.silver.wfdoc t
USING wfdoc_src s
ON  t.workflow_id = s.workflow_id
AND t.doc_number  = s.doc_number
AND t.doc_part    = s.doc_part
AND t.doc_version = s.doc_version
AND t.doc_type    = s.doc_type
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

display(spark.sql("""
SELECT workflow_id, COUNT(*) AS doc_rows
FROM aws.silver.wfdoc
GROUP BY workflow_id
ORDER BY doc_rows DESC
"""))


workflow_id,doc_rows
5818805c-3fb4-4522-8887-6f0c2a740804,6
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,6
a8594c5d-cc74-467f-9368-b8d764b963be,2
29cc6880-ba3c-4cbc-a399-14bd463d0cb7,2
7a3d0305-49c7-41cc-a2ac-2b8b7de0bbc6,2
cf2f2be0-8874-4f5b-83f1-26d064194da8,1
5436ea46-9161-4bbe-a0aa-0351a6d6c968,1
4ea6b9eb-603e-45f6-a949-75087c195bad,1
6a882e01-f068-4afd-b12f-bbfb4fb3eea6,1
07e80db0-6853-4b9e-87fd-95af8791b2a3,1


In [0]:
from pyspark.sql.functions import from_xml, col

parsed_for_roles = bronze_clean.withColumn("x", from_xml(col("value_clean"), forced_schema))

staging_roles = (
    parsed_for_roles
      .select(
          F.col("x.WorkflowId").alias("workflow_id"),
          F.col("x.WorkflowCoordinator").alias("coordinator_login"),

          F.col("x.Requestor.LoginName").alias("requestor_login"),
          F.col("x.Requestor.Name").alias("requestor_name"),
          F.col("x.Requestor.Email").alias("requestor_email"),

          F.col("x.Approvers.WorkflowUser").alias("approvers_arr"),
          F.col("x.Reviewers.WorkflowUser").alias("reviewers_arr"),
          F.col("x.Authors.WorkflowUser").alias("authors_arr"),
      )
      .filter("workflow_id is not null")
      .cache()
)

display(staging_roles.select(
    "workflow_id",
    F.size("approvers_arr").alias("approvers_cnt"),
    F.size("reviewers_arr").alias("reviewers_cnt"),
    F.size("authors_arr").alias("authors_cnt"),
).orderBy("workflow_id"))


workflow_id,approvers_cnt,reviewers_cnt,authors_cnt
07e80db0-6853-4b9e-87fd-95af8791b2a3,1,4.0,1.0
29cc6880-ba3c-4cbc-a399-14bd463d0cb7,1,,
4ea6b9eb-603e-45f6-a949-75087c195bad,5,1.0,1.0
5436ea46-9161-4bbe-a0aa-0351a6d6c968,1,,
5818805c-3fb4-4522-8887-6f0c2a740804,3,1.0,
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1,,
6a882e01-f068-4afd-b12f-bbfb4fb3eea6,6,1.0,
7a3d0305-49c7-41cc-a2ac-2b8b7de0bbc6,2,,
a8594c5d-cc74-467f-9368-b8d764b963be,2,1.0,2.0
cf2f2be0-8874-4f5b-83f1-26d064194da8,1,1.0,1.0


In [0]:
roles_requestor = (
    staging_roles
      .select(
          "workflow_id",
          F.lit("Requestor").alias("role"),
          F.col("requestor_login").alias("user_login"),
          F.col("requestor_name").alias("user_name"),
          F.col("requestor_email").alias("user_email"),
      )
      .filter("user_login is not null")
)

roles_coordinator = (
    staging_roles
      .select(
          "workflow_id",
          F.lit("Coordinator").alias("role"),
          F.col("coordinator_login").alias("user_login"),
          F.lit(None).cast("string").alias("user_name"),
          F.lit(None).cast("string").alias("user_email"),
      )
      .filter("user_login is not null")
)

roles_approvers = (
    staging_roles
      .select("workflow_id", F.explode_outer("approvers_arr").alias("u"))
      .select(
          "workflow_id",
          F.lit("Approver").alias("role"),
          F.col("u.LoginName").alias("user_login"),
          F.col("u.Name").alias("user_name"),
          F.col("u.Email").alias("user_email"),
      )
      .filter("user_login is not null")
)

roles_reviewers = (
    staging_roles
      .select("workflow_id", F.explode_outer("reviewers_arr").alias("u"))
      .select(
          "workflow_id",
          F.lit("Reviewer").alias("role"),
          F.col("u.LoginName").alias("user_login"),
          F.col("u.Name").alias("user_name"),
          F.col("u.Email").alias("user_email"),
      )
      .filter("user_login is not null")
)

roles_authors = (
    staging_roles
      .select("workflow_id", F.explode_outer("authors_arr").alias("u"))
      .select(
          "workflow_id",
          F.lit("Author").alias("role"),
          F.col("u.LoginName").alias("user_login"),
          F.col("u.Name").alias("user_name"),
          F.col("u.Email").alias("user_email"),
      )
      .filter("user_login is not null")
)

roles_df = (
    roles_requestor
      .unionByName(roles_coordinator)
      .unionByName(roles_approvers)
      .unionByName(roles_reviewers)
      .unionByName(roles_authors)
      .dropDuplicates(["workflow_id", "role", "user_login"])
)

roles_df.createOrReplaceTempView("wfroles_src")

spark.sql("""
MERGE INTO aws.silver.wfroles t
USING wfroles_src s
ON  t.workflow_id = s.workflow_id
AND t.role        = s.role
AND t.user_login  = s.user_login
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

display(spark.sql("""
SELECT role, COUNT(*) AS rows
FROM aws.silver.wfroles
GROUP BY role
ORDER BY role
"""))


role,rows
Approver,23
Author,5
Coordinator,10
Requestor,8
Reviewer,9


In [0]:
from pyspark.sql import functions as F

bronze_clean = (
    spark.table("aws.bronze.workflow_xml_raw")
      .filter(F.col("value").isNotNull())
      .withColumn("value_clean", F.regexp_replace(F.col("value"), "^\ufeff", ""))  # remove BOM
      .filter(F.trim(F.col("value_clean")).startswith("<?xml"))
)

print("bronze_clean rows =", bronze_clean.count())


bronze_clean rows = 10


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import schema_of_xml, from_xml, col

# 1) Clean Bronze (remove BOM, keep only real xml)
bronze_clean = (
    spark.table("aws.bronze.workflow_xml_raw")
      .filter(F.col("value").isNotNull())
      .withColumn("value_clean", F.regexp_replace(F.col("value"), "^\ufeff", ""))
      .filter(F.trim(F.col("value_clean")).startswith("<?xml"))
)

print("bronze_clean rows =", bronze_clean.count())

# 2) Infer schema from 1 sample xml row
sample_xml = bronze_clean.select("value_clean").first()["value_clean"]
xml_schema = schema_of_xml(F.lit(sample_xml))

# 3) Parse
parsed = bronze_clean.withColumn("x", from_xml(col("value_clean"), xml_schema))
print("parsed rows =", parsed.filter(F.col("x").isNotNull()).count())

# 4) Staging (includes task_results as currently inferred)
staging = (
    parsed
    .filter(F.col("x").isNotNull())
    .select(
        F.col("x.WorkflowId").alias("workflow_id"),
        F.col("x.TaskResults.TaskResult").alias("task_results")
    )
)

staging_cached = staging.cache()
print("staging_cached rows =", staging_cached.count())

display(staging_cached.orderBy("workflow_id"))


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# 1) Rebuild bronze_clean (so no NameError / no dependency on previous cells)
bronze_clean = (
    spark.table("aws.bronze.workflow_xml_raw")
      .filter(F.col("value").isNotNull())
      .withColumn("value_clean", F.regexp_replace(F.col("value"), "^\ufeff", ""))  # remove BOM
      .filter(F.trim(F.col("value_clean")).startswith("<?xml"))
)

print("bronze_clean rows =", bronze_clean.count())

# 2) Forced schema JUST for WorkflowId + TaskResults (keep it minimal and stable)
task_schema = StructType([
    StructField("Identity", StringType(), True),
    StructField("Role", StringType(), True),
    StructField("Outcome", StringType(), True),
    StructField("SignatureType", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
])

forced_tasks_schema = StructType([
    StructField("WorkflowId", StringType(), True),
    StructField("TaskResults", StructType([
        StructField("TaskResult", ArrayType(task_schema), True)
    ]), True),
])

# 3) Parse using forced schema (TaskResult is guaranteed ARRAY)
parsed_tasks = bronze_clean.withColumn("x", F.from_xml(F.col("value_clean"), forced_tasks_schema))

# Optional quick schema check (should show TaskResult as array<struct<...>>)
parsed_tasks.select("x").printSchema()

# 4) Explode task results
task_df = (
    parsed_tasks
      .select(
          F.col("x.WorkflowId").alias("workflow_id"),
          F.explode_outer(F.col("x.TaskResults.TaskResult")).alias("tr")
      )
      .select(
          F.col("workflow_id").cast("string").alias("workflow_id"),
          F.col("tr.Identity").cast("string").alias("identity_login"),
          F.col("tr.Role").cast("string").alias("role"),
          F.col("tr.Outcome").cast("string").alias("outcome"),
          F.col("tr.Timestamp").cast("timestamp").alias("ts"),
          F.col("tr.SignatureType").cast("string").alias("signature_type"),
      )
      .filter(
          F.col("workflow_id").isNotNull() &
          F.col("identity_login").isNotNull() &
          F.col("role").isNotNull() &
          F.col("outcome").isNotNull() &
          F.col("ts").isNotNull()
      )
)

print("task_df rows =", task_df.count())
display(task_df.orderBy("workflow_id", "ts"))

# 5) MERGE into Delta table
task_df.createOrReplaceTempView("wftask_src")

spark.sql("""
MERGE INTO aws.silver.wftaskresults t
USING wftask_src s
ON  t.workflow_id    = s.workflow_id
AND t.identity_login = s.identity_login
AND t.role           = s.role
AND t.outcome        = s.outcome
AND t.ts             = s.ts
WHEN MATCHED THEN UPDATE SET
  t.signature_type = s.signature_type
WHEN NOT MATCHED THEN INSERT (
  workflow_id, identity_login, role, outcome, ts, signature_type
) VALUES (
  s.workflow_id, s.identity_login, s.role, s.outcome, s.ts, s.signature_type
)
""")

display(spark.sql("""
SELECT workflow_id, COUNT(*) AS task_rows
FROM aws.silver.wftaskresults
GROUP BY workflow_id
ORDER BY task_rows DESC
"""))


bronze_clean rows = 10
root
 |-- x: struct (nullable = true)
 |    |-- WorkflowId: string (nullable = true)
 |    |-- TaskResults: struct (nullable = true)
 |    |    |-- TaskResult: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Identity: string (nullable = true)
 |    |    |    |    |-- Role: string (nullable = true)
 |    |    |    |    |-- Outcome: string (nullable = true)
 |    |    |    |    |-- SignatureType: string (nullable = true)
 |    |    |    |    |-- Timestamp: timestamp (nullable = true)

task_df rows = 15


workflow_id,identity_login,role,outcome,ts,signature_type
07e80db0-6853-4b9e-87fd-95af8791b2a3,i:0#.w|rbamouser\muenchv,Author,Approved,2024-09-30T14:56:32.520598Z,Manufacturing Support
4ea6b9eb-603e-45f6-a949-75087c195bad,i:0#.w|rnumdmas\mababanr,Author,Approved,2024-09-18T21:18:59.352093Z,R and D
4ea6b9eb-603e-45f6-a949-75087c195bad,i:0#.w|rnumdmas\leey36,Reviewer,Approved,2024-09-18T21:51:47.171741Z,Documentation
4ea6b9eb-603e-45f6-a949-75087c195bad,i:0#.w|rnumdmas\pollartd,Approver,Approved,2024-09-19T01:09:18.233953Z,Quality Assurance
4ea6b9eb-603e-45f6-a949-75087c195bad,i:0#.w|rbamouser\arsicd,Approver,Approved,2024-09-19T08:30:51.841876Z,Regulatory Affairs
4ea6b9eb-603e-45f6-a949-75087c195bad,i:0#.w|rbamouser\kozarisi,Approver,Approved,2024-09-23T08:28:44.10445Z,Reagent Manufacturing
4ea6b9eb-603e-45f6-a949-75087c195bad,i:0#.w|rnumdmas\demartik,Approver,Approved,2024-09-23T16:46:34.704714Z,R and D
4ea6b9eb-603e-45f6-a949-75087c195bad,i:0#.w|rbamouser\sollerm,Approver,Approved,2024-09-30T08:45:36.579897Z,Quality Control
5818805c-3fb4-4522-8887-6f0c2a740804,i:0#.w|rnumdmas\brizzie,Reviewer,Approved,2024-09-27T14:53:14.94215Z,Manufacturing
5818805c-3fb4-4522-8887-6f0c2a740804,i:0#.w|rnumdmas\nortonb,Approver,Approved,2024-09-27T15:55:34.379311Z,Quality Assurance


workflow_id,task_rows
4ea6b9eb-603e-45f6-a949-75087c195bad,7
5818805c-3fb4-4522-8887-6f0c2a740804,4
7a3d0305-49c7-41cc-a2ac-2b8b7de0bbc6,2
07e80db0-6853-4b9e-87fd-95af8791b2a3,1
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1


In [0]:
%sql
SELECT workflow_id, COUNT(*) AS task_rows
FROM aws.silver.wftaskresults
GROUP BY workflow_id
ORDER BY task_rows DESC;


workflow_id,task_rows
4ea6b9eb-603e-45f6-a949-75087c195bad,7
5818805c-3fb4-4522-8887-6f0c2a740804,4
7a3d0305-49c7-41cc-a2ac-2b8b7de0bbc6,2
07e80db0-6853-4b9e-87fd-95af8791b2a3,1
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1


In [0]:
%sql
SELECT COUNT(*) AS total_task_rows
FROM aws.silver.wftaskresults;


total_task_rows
15


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# --- Forced schema just for Documents + SupersededVersion ---
docid_schema = StructType([
    StructField("Number", LongType(), True),
    StructField("Part", LongType(), True),
    StructField("Type", LongType(), True),
    StructField("Version", LongType(), True),
])

sup_doc_schema = StructType([
    StructField("DocId", docid_schema, True),
    StructField("Name", StringType(), True),
    StructField("TargetState", StringType(), True),
    StructField("ReleaseDate", TimestampType(), True),
    StructField("IsLocked", BooleanType(), True),
    StructField("IsVirtualStructure", BooleanType(), True),
    StructField("LevelInStructure", LongType(), True),
    StructField("RenderJobId", StringType(), True),
    StructField("RenderStatusChanged", TimestampType(), True),
    StructField("TransferStatus", StringType(), True),
    StructField("TransferStatusChanged", TimestampType(), True),
    StructField("IsSuperseded", BooleanType(), True),
])

wfdoc_schema = StructType([
    StructField("DocId", docid_schema, True),
    StructField("SupersededVersion", sup_doc_schema, True),
])

forced_schema_sup = StructType([
    StructField("WorkflowId", StringType(), True),
    StructField("Documents", StructType([
        StructField("WorkflowDocument", ArrayType(wfdoc_schema), True)
    ]), True),
])

from pyspark.sql.functions import from_xml, col

# Re-parse ONLY what we need for superseded docs
parsed_for_sup = bronze_clean.withColumn("x", from_xml(col("value_clean"), forced_schema_sup))

sup_df = (
    parsed_for_sup
      .select(
          F.col("x.WorkflowId").alias("workflow_id"),
          F.explode_outer(F.col("x.Documents.WorkflowDocument")).alias("d")
      )
      # keep only docs that actually have a superseded version docid
      .filter(F.col("d.SupersededVersion.DocId.Number").isNotNull())
      .select(
          F.col("workflow_id").cast("string").alias("workflow_id"),

          # current doc keys
          F.col("d.DocId.Number").cast("string").alias("doc_number"),
          F.col("d.DocId.Part").cast("string").alias("doc_part"),
          F.col("d.DocId.Version").cast("string").alias("doc_version"),
          F.col("d.DocId.Type").cast("string").alias("doc_type"),

          # previous doc keys
          F.col("d.SupersededVersion.DocId.Number").cast("string").alias("prev_doc_number"),
          F.col("d.SupersededVersion.DocId.Part").cast("string").alias("prev_doc_part"),
          F.col("d.SupersededVersion.DocId.Version").cast("string").alias("prev_doc_version"),
          F.col("d.SupersededVersion.DocId.Type").cast("string").alias("prev_doc_type"),

          # prev attributes
          F.col("d.SupersededVersion.Name").cast("string").alias("prev_name"),
          F.col("d.SupersededVersion.TargetState").cast("string").alias("prev_target_state"),
          F.col("d.SupersededVersion.ReleaseDate").cast("timestamp").alias("prev_release_date"),
          F.col("d.SupersededVersion.IsLocked").cast("boolean").alias("prev_is_locked"),
          F.col("d.SupersededVersion.IsVirtualStructure").cast("boolean").alias("prev_is_virtual_structure"),
          F.col("d.SupersededVersion.LevelInStructure").cast("int").alias("prev_level_in_structure"),
          F.col("d.SupersededVersion.RenderJobId").cast("string").alias("prev_render_job_id"),
          F.col("d.SupersededVersion.RenderStatusChanged").cast("timestamp").alias("prev_render_status_changed"),
          F.col("d.SupersededVersion.TransferStatus").cast("string").alias("prev_transfer_status"),
          F.col("d.SupersededVersion.TransferStatusChanged").cast("timestamp").alias("prev_transfer_status_changed"),
          F.col("d.SupersededVersion.IsSuperseded").cast("boolean").alias("prev_is_superseded"),
      )
      .filter("workflow_id is not null and doc_number is not null and prev_doc_number is not null")
)

print("sup_df rows =", sup_df.count())
display(sup_df.orderBy("workflow_id", "doc_number").limit(50))

sup_df.createOrReplaceTempView("wfsup_src")

spark.sql("""
MERGE INTO aws.silver.wfsupersededdoc t
USING wfsup_src s
ON  t.workflow_id      = s.workflow_id
AND t.doc_number       = s.doc_number
AND t.doc_part         = s.doc_part
AND t.doc_version      = s.doc_version
AND t.doc_type         = s.doc_type
AND t.prev_doc_number  = s.prev_doc_number
AND t.prev_doc_part    = s.prev_doc_part
AND t.prev_doc_version = s.prev_doc_version
AND t.prev_doc_type    = s.prev_doc_type
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

display(spark.sql("""
SELECT workflow_id, COUNT(*) AS superseded_rows
FROM aws.silver.wfsupersededdoc
GROUP BY workflow_id
ORDER BY superseded_rows DESC
"""))


sup_df rows = 14


workflow_id,doc_number,doc_part,doc_version,doc_type,prev_doc_number,prev_doc_part,prev_doc_version,prev_doc_type,prev_name,prev_target_state,prev_release_date,prev_is_locked,prev_is_virtual_structure,prev_level_in_structure,prev_render_job_id,prev_render_status_changed,prev_transfer_status,prev_transfer_status_changed,prev_is_superseded
07e80db0-6853-4b9e-87fd-95af8791b2a3,1200000564262,0,4,2,1200000564262,0,3,2,2.2.3a08_CTQ_Scorecard_AB1.xlsx,Effective,,False,False,,,,,,True
4ea6b9eb-603e-45f6-a949-75087c195bad,1200000614310,0,3,2,1200000614310,0,2,2,DH-02365.01-068B_DNA_Master_Stab.docx,Effective,,False,False,,,2024-09-30T09:46:09.957834Z,UpdateConfirmed,2024-09-30T09:46:37.445518Z,True
5436ea46-9161-4bbe-a0aa-0351a6d6c968,1200000326602,0,29,4,1200000326602,0,28,4,_RIT_CC-HIA_Release_2024-8-29.xlsx,Effective,,False,False,,,,,,True
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1200000681077,0,2,4,1200000681077,0,1,4,cobas_prime_1.1_UA2.0.0.1981-stand-alone_hu.zip,Effective,,False,False,,,2024-09-30T14:15:56.225232Z,Superseded,,True
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1200000681084,0,2,4,1200000681084,0,1,4,cobas_prime_1.1_UA2.0.0.1981-stand-alone_ro.zip,Effective,,False,False,,,2024-09-30T14:15:56.287733Z,Superseded,,True
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1200000681086,0,2,4,1200000681086,0,1,4,cobas_prime_1.1_UA2.0.0.1981-stand-alone_lv.zip,Effective,,False,False,,,2024-09-30T14:15:55.943979Z,Superseded,,True
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1200000681089,0,2,4,1200000681089,0,1,4,cobas_prime_1.1_UA2.0.0.1981-stand-alone_bg.zip,Effective,,False,False,,,2024-09-30T14:15:56.022135Z,Superseded,,True
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1200000681091,0,2,4,1200000681091,0,1,4,cobas_prime_1.1_UA2.0.0.1981-stand-alone_cs.zip,Effective,,False,False,,,2024-09-30T14:15:56.084602Z,Superseded,,True
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,1200000681092,0,2,4,1200000681092,0,1,4,cobas_prime_1.1_UA2.0.0.1981-stand-alone_el.zip,Effective,,False,False,,,2024-09-30T14:15:56.162731Z,Superseded,,True
6a882e01-f068-4afd-b12f-bbfb4fb3eea6,1200000688296,0,2,4,1200000688296,0,1,4,F_CC_DSRRA_EPV_TNTGen6_2023Jul14_01.docx,Effective,,False,False,,,,,,True


workflow_id,superseded_rows
5ecba99e-010f-46ee-a8a2-0b3b84e7b01f,6
a8594c5d-cc74-467f-9368-b8d764b963be,2
7a3d0305-49c7-41cc-a2ac-2b8b7de0bbc6,2
4ea6b9eb-603e-45f6-a949-75087c195bad,1
6a882e01-f068-4afd-b12f-bbfb4fb3eea6,1
5436ea46-9161-4bbe-a0aa-0351a6d6c968,1
07e80db0-6853-4b9e-87fd-95af8791b2a3,1
