In [0]:
# Declare every widget this notebook reads, each with an empty-string default.
# "mappings" is fetched later with dbutils.widgets.get("mappings"); without a
# declaration here that call fails when the notebook is run interactively and
# no value has been supplied for it.
dbutils.widgets.text("JobName", "")
dbutils.widgets.text("JobRunID", "")
dbutils.widgets.text("mappings", "")

In [0]:
# Job identifiers, read from the notebook widgets.
job_name = dbutils.widgets.get("JobName")
job_run_id = dbutils.widgets.get("JobRunID")

# Flip to True to bypass the "mappings" widget and use the literal values below.
manual_override = False
if manual_override:
    # Hand-maintained configuration. This branch produces a dict, whereas the
    # widget branch produces whatever string the widget holds.
    mappings = {
        "RootFolder": "SalesDataLandingZone",
        "TargetCatalog": "mdf2",
        "TargetSchema": "bronze",
        "TargetTable": "Sales",
        "FileSearchTerm": None,
        "BaseUrl": "abfss://mdf2@stgacct14022025.dfs.core.windows.net",
        "WatermarkColumn": "Timestamp",
        "LPWatermarkValue": '1970-01-01T00:00:00.000+00:00',
        "EnableFlag": 1,
        "ColumnList": None,
    }
else:
    mappings = dbutils.widgets.get("mappings")

In [0]:
from pyspark.sql import functions as F, types as T
import json
import re

In [0]:
%run "/Workspace/MDFs/ADLS to Lakehouse/functions/NB_Functions"

In [0]:
# Load CSV files from the landing folder into the target Delta table with
# Auto Loader, then record the outcome through log_audit.
#
# The start time is captured before the try block so the failure handler can
# always pass it to log_audit, even when the very first step fails.
start_time = spark.sql("SELECT CURRENT_TIMESTAMP()").first()[0]

try:
    # The widget yields a JSON string while the manual override yields a dict;
    # only parse when there is something to parse. This also keeps the cell
    # re-runnable after `mappings` has already been decoded.
    if isinstance(mappings, str):
        mappings = json.loads(mappings)

    file_source_abfss_path = f"{mappings['BaseUrl']}/{mappings['RootFolder']}/"

    target_table_name = f"{mappings['TargetCatalog']}.{mappings['TargetSchema']}.{mappings['TargetTable']}"

    autoloader_schema_location = f"{mappings['BaseUrl']}/autoloader/{mappings['TargetCatalog']}_{mappings['TargetSchema']}_{mappings['TargetTable']}/schemalocation"

    autoloader_checkpoint_location = f"{mappings['BaseUrl']}/autoloader/{mappings['TargetCatalog']}_{mappings['TargetSchema']}_{mappings['TargetTable']}/checkpointlocation"

    # Look up the configured column list for this table. The filter is built
    # with column expressions so the table name value is never spliced into
    # SQL text.
    config_table_name = f"{mappings['TargetCatalog']}.{mappings['TargetSchema']}.file_master_config"
    config_row = (
        spark.table(config_table_name)
        .where(
            (F.col("TargetTable") == mappings["TargetTable"])
            & (F.col("EnableFlag") == 1)
        )
        .select(F.coalesce(F.col("ColumnList"), F.lit("")))
        .first()
    )
    if config_row is None:
        raise ValueError(
            f"No enabled row for TargetTable '{mappings['TargetTable']}' in {config_table_name}"
        )
    col_list = config_row[0]

    # Strip quotes and spaces, split on commas, and drop blank entries so a
    # stray or trailing comma does not become an empty column name.
    cols = [
        name
        for name in col_list.replace('"', "").replace(" ", "").split(",")
        if name
    ]

    if not cols:
        # No explicit list configured: keep every column.
        cols = "*"
    else:
        # Always carry the lineage columns added further down.
        cols = cols + ["FilePath", "SourceFile", "InsertTimestamp"]

    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", autoloader_schema_location)
        .option("cloudFiles.includeExistingFiles", "True")
        .option("recursiveFileLookup", "true")
        .option("pathGlobFilter", "*.csv")
        .option("header", "true")
        .option("delimiter", ",")
        .option("quote", '"')
        .option("escape", '"')
        .option("multiLine", "true")
        .option("lineSep", "\n")
        .load(file_source_abfss_path)
    )

    # Remove leading/trailing whitespace from the header names.
    df = df.toDF(*[name.strip() for name in df.columns])

    # NOTE(review): the parsed column is hard-coded as "Timestamp" while the
    # filter below uses mappings['WatermarkColumn']; confirm they are meant to
    # be the same column for every configured table.
    df = (
        df.withColumn("FilePath", F.regexp_replace(F.input_file_name(), "%20", " "))
        .withColumn("SourceFile", F.regexp_extract("FilePath", r"([^/]+$)", 1))
        .withColumn("InsertTimestamp", F.current_timestamp())
        .withColumn(
            "Timestamp",
            F.to_timestamp(
                F.regexp_replace(F.col("Timestamp"), r"[\r\n]+", ""), "yyyy-MM-dd HH:mm:ss"
            ),
        )
        .filter(F.col(mappings['WatermarkColumn']) > F.lit(mappings["LPWatermarkValue"]))
        .select(cols)
    )

    # Empty the target before loading when it already exists.
    if spark.catalog.tableExists(target_table_name):
        spark.sql(f"TRUNCATE TABLE {target_table_name}")

    streaming_query = (
        df.writeStream.format("delta")
        .option("checkpointLocation", autoloader_checkpoint_location)
        .trigger(once=True)
        .toTable(target_table_name)
    )
    # Block until the one-off run finishes so the end time and the 'Success'
    # audit row reflect the completed load, and so a streaming failure is
    # raised here and reaches the handler below.
    streaming_query.awaitTermination()

    end_time = spark.sql("SELECT CURRENT_TIMESTAMP()").first()[0]
    log_audit(
        job_name, job_run_id, 'Success', "", start_time, end_time
    )
except Exception as e:
    if re.search(r"UNABLE_TO_INFER_SCHEMA", str(e)):
        # Treated as "nothing to load": report it and finish without an
        # audit row or a re-raise.
        print("No files found after the specific cutoff time.")
    else:
        end_time = spark.sql("SELECT CURRENT_TIMESTAMP()").first()[0]
        # Cap the message length and swap single quotes for double quotes
        # before handing it to log_audit.
        error_details = str(e)[:1000].replace("'", "\"")
        log_audit(
            job_name, job_run_id, 'Fail', error_details, start_time, end_time
        )
        raise