In [0]:
import time
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, unix_timestamp, lit

# Audit-logging cell: run the pipeline logic, time it, and build a one-row
# audit DataFrame (pipeline name, start/end time, duration, status) stamped
# with Spark's current_timestamp() as the ingestion time.

# Initialize Spark session
spark = (
    SparkSession.builder
    .appName("PipelineAuditLogger")
    .getOrCreate()
)

pipeline_name = "sample_pipeline"

# The status is decided by whether the pipeline body raises. A failed run still
# gets an audit row; the original exception is re-raised after the row is built.
status = "Success"
pipeline_error = None

# Step 1: Capture start time
start_time = time.time()

try:
    # Your actual pipeline logic here
    # Example: simple transformation
    data = [("Alice", 25), ("Bob", 30)]
    columns = ["name", "age"]
    df = spark.createDataFrame(data, columns)

    # Just some processing for simulation. show() is an action, so the Spark
    # job runs inside the timed window.
    df_filtered = df.filter(df.age > 20)
    df_filtered.show()
except Exception as exc:
    status = "Failed"
    # `exc` is unbound once the except clause ends, so keep a reference for re-raising.
    pipeline_error = exc

# Step 2: Capture end time (also on failure, so failed runs get a duration)
end_time = time.time()

# Step 3: Compute duration in minutes, rounded to 2 decimal places
duration_minutes = round((end_time - start_time) / 60, 2)

# Step 4: Create audit metadata row
# NOTE(review): fromtimestamp() formats in the driver's local timezone, while
# current_timestamp() below follows the Spark session timezone; confirm they agree.
start_dt = datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S")
end_dt = datetime.fromtimestamp(end_time).strftime("%Y-%m-%d %H:%M:%S")

# An explicit schema pins the audit table's column types. These are the same types
# Spark would infer from the Python values (str -> STRING, float -> DOUBLE).
audit_schema = (
    "pipeline_name STRING, start_time STRING, end_time STRING, "
    "duration_minutes DOUBLE, status STRING"
)
audit_row = [(pipeline_name, start_dt, end_dt, duration_minutes, status)]
audit_df = spark.createDataFrame(audit_row, audit_schema).withColumn(
    "ingestion_time", current_timestamp()
)

# Show the audit log
audit_df.show(truncate=False)

# Surface the pipeline failure only after its audit row exists.
if pipeline_error is not None:
    raise pipeline_error
