In [0]:
dbutils.widgets.text("pipeline_name", "")
dbutils.widgets.text("run_id", "")
dbutils.widgets.text("status", "")
dbutils.widgets.text("pipeline_start_time", "")
dbutils.widgets.text("pipeline_end_time", "")


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType
from pyspark.sql.functions import monotonically_increasing_id, current_timestamp

# Retrieve parameters passed from ADF using widgets
pipeline_name = dbutils.widgets.get("pipeline_name")
run_id = dbutils.widgets.get("run_id")
status = dbutils.widgets.get("status")
pipeline_start_time = dbutils.widgets.get("pipeline_start_time")
pipeline_end_time = dbutils.widgets.get("pipeline_end_time")

# Define the schema explicitly
schema = StructType([
    StructField("pipeline_name", StringType(), True),
    StructField("run_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("pipeline_start_time", StringType(), True),
    StructField("pipeline_end_time", StringType(), True),
    StructField("ingestion_timestamp", TimestampType(), True),
    StructField("log_id", LongType(), True)
])

# Create a DataFrame with the log details (without log_id)
log_df = spark.createDataFrame(
    [(pipeline_name, run_id, status, pipeline_start_time, pipeline_end_time, None, None)],
    schema=schema
)

# Add a unique log_id using monotonically_increasing_id() and set ingestion_timestamp
log_df = log_df.withColumn("log_id", monotonically_increasing_id())
log_df = log_df.withColumn("ingestion_timestamp", current_timestamp())

# Write the record to the Delta log table without mergeSchema
log_df.write.format("delta").mode("append").saveAsTable("cyberassesmentdatabricks.logs.ingestion_log")


In [0]:
%sql
select * from cyberassesmentdatabricks.logs.ingestion_log

pipeline_name,run_id,status,pipeline_start_time,pipeline_end_time,ingestion_timestamp,log_id
Json_to_Delta_Lake_Delta_Load_go,70999fe8-d53f-4496-94b3-d562d48cc98b,"""Success""",2025-02-27T04:44:42.4132504Z,2025-02-27T04:46:36.3986751Z,2025-02-27T04:46:46.16502Z,25769803776
Json_to_Delta_Lake_Delta_Load_go,1290dd10-32a3-4002-a2ee-d039b1531516,"""Failed""",2025-02-27T04:40:51.5155542Z,2025-02-27T04:41:13.1704777Z,2025-02-27T04:41:23.365753Z,25769803776
Json_to_Delta_Lake_Delta_Load_go,e113e9ed-33d7-487d-b1b6-e85a6b5376e3,"""Failed""",2025-02-27T04:38:30.2202462Z,2025-02-27T04:39:08.0664357Z,2025-02-27T04:39:19.142175Z,25769803776
Json_to_Delta_Lake_Delta_Load_go,6803c4d5-4783-4dde-a476-833bc0156be2,"""Failed""",2025-02-27T03:46:59.687477Z,2025-02-27T03:47:38.7885247Z,2025-02-27T03:47:50.114716Z,25769803776
Json_to_delta,36b6ff86-89b0-4126-b0de-155fb19d72ed,"""Success""",2025-02-25T22:46:14.6475586Z,2025-02-25T22:46:16.9468542Z,2025-02-25T22:46:38.248343Z,25769803776
Json_to_delta,69b90915-3436-4c71-96be-430733c2ffdc,"""Success""",2025-02-25T22:47:28.1123015Z,2025-02-25T22:48:36.6674754Z,2025-02-25T22:48:46.041289Z,25769803776
