In [0]:
from pyspark.sql.functions import current_timestamp, current_user,expr

def load_and_append_dedup(
    source_table: str,
    target_table: str,
    *,
    dedup_columns: "list[str] | None" = None,
    mode: str = "overwrite",
):
    """
    Load a source table, drop duplicate rows, stamp ingestion metadata, and
    write the result to a target Delta table.

    Note: despite the function name, the default write mode is "overwrite",
    i.e. the target table's contents are replaced on every call. Pass
    mode="append" to append instead.

    Args:
        source_table (str): Name of the source table to read from.
        target_table (str): Name of the Delta table to write to.
        dedup_columns (list[str] | None): Columns that define a duplicate row.
            None or an empty list (the default) compares all columns.
        mode (str): Spark DataFrameWriter save mode passed to ``.mode()``,
            e.g. "overwrite" or "append". Defaults to "overwrite".

    Side effects:
        Writes ``target_table`` in Delta format via ``saveAsTable``. Two
        columns are added before writing: ``ingested_timestamp``
        (``current_timestamp()``) and ``ingested_by`` (``current_user()``).
    """
    # Relies on the notebook-provided global `spark` session.
    src_df = spark.table(source_table)

    # An empty subset is not forwarded to Spark; fall back to full-row
    # comparison so that [] behaves the same as None.
    if dedup_columns:
        deduped_df = src_df.dropDuplicates(list(dedup_columns))
    else:
        deduped_df = src_df.dropDuplicates()

    # Add ingestion metadata columns.
    result_df = (
        deduped_df
        .withColumn("ingested_timestamp", current_timestamp())
        .withColumn("ingested_by", current_user())
    )

    result_df.write.format("delta").mode(mode).saveAsTable(target_table)

In [0]:
def insert_audit_record(
    usecase_id,
    workspace_url,
    workspace_id,
    job_params,
    start_time,
    triggered_time,
    status,
    *,
    audit_table: str = "workflow_meta.audit_table",
):
    """
    Append one audit record (metadata plus timing information) to the audit
    Delta table.

    Args:
        usecase_id (str): Identifier of the use case being audited.
        workspace_url (str): URL of the workspace; anything after the first
            '?' is dropped when building ``job_run_url``.
        workspace_id (str): Workspace identifier (stored in the
            ``workspaceid`` column).
        job_params (str): Job parameters; appended as the last path segment
            of ``job_run_url``.
        start_time (str): Time when the job started (ISO format).
        triggered_time (str): Time when the job was triggered (ISO format).
        status (str): Status of the workflow run.
        audit_table (str): Fully qualified name of the target audit table.
            Defaults to "workflow_meta.audit_table".

    Derived columns:
        job_run_url: ``<workspace_url without query string>/<job_params>``.
        start_time / trigger_time: the ISO strings parsed to timestamps.
        waiting_time_in_ms: milliseconds from trigger_time to start_time.
        total_time: seconds from start_time to the time of insertion.
        ingested_timestamp / ingested_by: ``current_timestamp()`` /
            ``current_user()``.

    Only the columns that exist in ``audit_table`` are written, in that
    table's column order; the write is a Delta append.
    """
    # NOTE(review): the column is named `workspaceid` (no underscore), which
    # presumably matches the audit table schema — confirm before renaming.
    record_df = spark.createDataFrame(
        [[usecase_id, workspace_url, workspace_id, job_params, start_time, triggered_time, status]],
        "usecase_id  string,workspace_url string,workspaceid string, job_params string, start_time string, triggered_time string, status string",
    )
    record_df = (
        record_df
        .withColumn("job_run_url", expr("concat_ws('/',split(workspace_url,r'\\?')[0],job_params)"))
        .withColumn("start_time", expr("to_timestamp(start_time)"))
        .withColumn("trigger_time", expr("to_timestamp(triggered_time)"))
        .selectExpr(
            "*",
            # Diff the parsed timestamp column rather than the raw string so
            # both operands are explicitly timestamps.
            "cast(timestampdiff(MILLISECOND, trigger_time, start_time) as double) waiting_time_in_ms",
            "cast(timestampdiff(SECOND, start_time, current_timestamp()) as double) as total_time",
            "current_timestamp() as ingested_timestamp",
            "current_user() as ingested_by",
        )
    )
    # Project onto the target table's columns (and order) so extra helper
    # columns such as `triggered_time` are dropped if the table lacks them.
    target_columns = spark.table(audit_table).columns
    record_df.select(target_columns).write.format("delta").mode("append").saveAsTable(audit_table)