In [None]:
from pyspark.sql import functions as F
from datetime import date
from databricks.sdk import WorkspaceClient

In [None]:
def get_job_run_details():
    """Return the job ID and job run ID of the current notebook execution.

    Both values are read from the notebook context exposed by ``dbutils``.

    Returns:
        tuple: ``(job_id, run_id)``.
            * ``(None, None)`` when the context has no job ID defined
              (presumably an interactive, non-job run -- confirm).
            * ``(job_id, None)`` when a job ID exists but the context tags
              do not contain ``"jobRunId"``.
            * ``(job_id, run_id)`` otherwise.
    """
    context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()

    # jobId() returns an option-like object: check isDefined() before get().
    job_id_option = context.jobId()
    if not job_id_option.isDefined():
        return None, None
    job_id = job_id_option.get()

    # The run ID is only available as a context tag, and may be absent.
    tags = context.tags()
    run_id = tags.apply("jobRunId") if tags.contains("jobRunId") else None

    return job_id, run_id

def _append_with_audit(batch_df, full_table_name, job_id, run_id):
    """Add audit columns to a micro-batch, cast it to strings and append it.

    Args:
        batch_df: DataFrame holding the rows of one micro-batch.
        full_table_name: Name of the table the batch is appended to.
        job_id: Value stored in the ``dp_job_id`` column (may be ``None``).
        run_id: Value stored in the ``dp_run_id`` column (may be ``None``).
    """
    # Add audit columns
    audited_df = (
        batch_df.withColumn("dp_job_id", F.lit(job_id))
        .withColumn("dp_run_id", F.lit(run_id))
        .withColumn("dp_ingestion_time", F.current_timestamp())
    )
    # Cast all columns to string to handle column type mismatch through the years
    string_df = audited_df.select(
        [F.col(c).cast("string") for c in audited_df.columns]
    )
    # Append to the table. mergeSchema has to be set on THIS batch writer:
    # inside foreachBatch the write is an ordinary batch write, so an option
    # set on the streaming writer does not reach it.
    (
        string_df.write.mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(full_table_name)
    )


def add_audit_columns(batch_df, batch_id):
    """foreachBatch-style callback that appends a batch using notebook globals.

    Kept for backward compatibility: the target table and the audit values
    are taken from the notebook-level variables ``full_table_name``,
    ``job_id`` and ``run_id``, which must be defined before this is called.

    Args:
        batch_df: DataFrame holding the rows of one micro-batch.
        batch_id: Micro-batch identifier passed by foreachBatch (unused).
    """
    _append_with_audit(batch_df, full_table_name, job_id, run_id)


def process_bronze(checkpoint_path, file_path, full_table_name, job_id, run_id):
    """Incrementally load parquet files into a table using Auto Loader.

    Reads ``file_path`` as a ``cloudFiles`` stream, and for each micro-batch
    adds the audit columns, casts every column to string and appends the
    result to ``full_table_name``. Blocks until the stream terminates.

    Args:
        checkpoint_path: Location used both as the Auto Loader schema
            location and as the streaming checkpoint location.
        file_path: Location of the parquet files to ingest.
        full_table_name: Name of the table to append to.
        job_id: Job ID written to the ``dp_job_id`` audit column.
        run_id: Run ID written to the ``dp_run_id`` audit column.
    """
    raw_df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .load(file_path)
    )

    def _write_batch(batch_df, batch_id):
        # Bind this call's arguments so the batch write does not depend on
        # same-named notebook globals.
        _append_with_audit(batch_df, full_table_name, job_id, run_id)

    # write to table incrementally
    query = (
        raw_df.writeStream
        .foreachBatch(_write_batch)
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
        .trigger(availableNow=True)  # this means incremental batch
        .start()
    )
    query.awaitTermination()

In [None]:
# Read every notebook widget as a name -> value mapping.
var = dbutils.widgets.getAll()

# Fail fast with a clear message if a parameter used below is missing or
# blank, instead of hitting a NameError at the print or a Spark error later.
required_params = ("checkpoint_path", "file_path", "full_table_name")
missing_params = [
    name
    for name in required_params
    if name not in var.keys() or not str(var[name]).strip()
]
if missing_params:
    raise ValueError(
        f"Missing required notebook parameter(s): {', '.join(missing_params)}"
    )

for key in var.keys():
    # create variables based on the k,v pair of the var dictionary
    # - we could use the dict directly but this will ease things -
    globals()[key] = var[key]

job_id, run_id = get_job_run_details()

print(f"{checkpoint_path = }\n{file_path = }\n{full_table_name = }\n{job_id = }\n{run_id = }")

In [None]:
# Run the incremental bronze load with the notebook-level parameters,
# then report which table was loaded.
process_bronze(
    checkpoint_path=checkpoint_path,
    file_path=file_path,
    full_table_name=full_table_name,
    job_id=job_id,
    run_id=run_id,
)
print(f"Data loaded successfully to {full_table_name}.")

In [None]:
# Run the following queries to optimize the table
# in this example it will run once a week on Fridays

# Get today's date
today = date.today()

# date.weekday() numbers Monday as 0, so Friday is 4. Comparing the number
# avoids depending on the locale-specific day name from strftime("%A").
FRIDAY = 4

# Check if today is Friday
if today.weekday() == FRIDAY:
    queries = [
        # f"ANALYZE TABLE {full_table_name} COMPUTE STATISTICS;", # this is only needed if it is not turned on at the schema level
        f"OPTIMIZE {full_table_name};",
        f"VACUUM {full_table_name};",
    ]
    for query in queries:
        print(query)
        spark.sql(query)
    print("Optimization completed.")
else:
    # Only announce the schedule when the optimization did not run today.
    print("Optimization will run on Friday.")