In [0]:
# ADLS Gen2 storage account and the containers used by this pipeline.
storage_account_name = "adcampaigndata"
landing_container = "landing"
bronze_container = "bronze"

# Base paths. Each ends with "/" so sub-paths can be appended directly.
landing_path = f"abfss://{landing_container}@{storage_account_name}.dfs.core.windows.net/"
bronze_path = f"abfss://{bronze_container}@{storage_account_name}.dfs.core.windows.net/"
# Root folder for Auto Loader checkpoints / schema locations. bronze_path
# already ends with "/", so no extra separator is inserted, and the folder is
# named "checkpoints" to match the per-table checkpoint paths built during
# ingestion (`{bronze_path}checkpoints/{folder}/`).
checkpoint_path = f"{bronze_path}checkpoints/"

# Configure Spark to access ADLS with the account key stored in the
# "adls-creds" secret scope (the key is never written into the notebook).
spark.conf.set(
  f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
  dbutils.secrets.get(scope="adls-creds", key="storage-key")
)

# User running the notebook; written into each bronze row as `modified_by`.
current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
print(current_user)


In [0]:
from pyspark.sql.functions import current_timestamp, lit

# The bronze schema must exist before any table can be registered in it.
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")

# Every sub-directory of the landing zone holds the CSV files of one table.
# The directory name (trailing "/" removed) doubles as the table name.
# Directories whose names begin with "checkpoints" or "_" are not data folders
# and are ignored.
csv_folders = dbutils.fs.ls(landing_path)

table_folders = {
    entry.name.strip("/"): entry.name.strip("/")
    for entry in csv_folders
    if entry.isDir() and not entry.name.startswith(("checkpoints", "_"))
}
print("Folders and tables are defined")
# Ingest each landing folder into its own bronze Delta table with Auto Loader,
# then register that Delta location as `bronze.br_<folder>` in the metastore.
# A failing table does not stop the others. Failures are collected and the
# cell raises at the end, so a scheduled run cannot silently report success.
failed_tables = {}

for table_name, folder_name in table_folders.items():
    source_path = f"{landing_path}{folder_name}/"
    bronze_table_path = f"{bronze_path}{folder_name}/"
    # One directory per table, used both as the Auto Loader schema location
    # and as the streaming checkpoint location.
    checkpoint_path = f"{bronze_path}checkpoints/{folder_name}/"

    try:
        # Autoloader for incremental data ingestion
        query = (
            spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "csv")
                .option("header", "true")
                .option("mergeSchema", "true")
                .option("cloudFiles.schemaLocation", checkpoint_path)
                .load(source_path)
                .withColumn("ingestion_timestamp", current_timestamp())
                .withColumn("modified_by", lit(current_user))
                .writeStream
                .format("delta")
                .option("checkpointLocation", checkpoint_path)
                # Schema evolution has to be enabled on the Delta *sink* for
                # new source columns to be added to the target table; the
                # reader-side option alone does not change the table schema.
                .option("mergeSchema", "true")
                .outputMode("append")
                # Run once per job.
                # NOTE(review): Trigger.Once is deprecated in newer Spark
                # releases in favour of trigger(availableNow=True); consider
                # switching once the cluster runtime is confirmed to support it.
                .trigger(once=True)
                .start(bronze_table_path)
        )

        # Block until the single triggered run finishes; a failed run raises
        # here and is handled by the except clause below.
        query.awaitTermination()

        print(f"✅ Auto Loader run completed for table: {table_name}")
        print(f"    - Source path: {source_path}")
        print(f"    - Bronze path: {bronze_table_path}")
        print(f"    - Checkpoint path: {checkpoint_path}")
        print("    - Trigger: Once (for daily scheduling)")

        # Register table in metastore. The identifier is backtick-quoted (with
        # embedded backticks doubled) so folder names containing characters
        # such as "-" or spaces still yield valid SQL.
        bronze_table_name = f"br_{table_name}"
        quoted_table_name = bronze_table_name.replace("`", "``")
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS bronze.`{quoted_table_name}`
            USING DELTA
            LOCATION '{bronze_table_path}'
        """)
        print(f"✅ Registered Hive table: bronze.{bronze_table_name}\n")

    except Exception as e:
        # Best effort per table: remember the failure and move on.
        failed_tables[table_name] = e
        print(f"Error processing table '{table_name}': {e}\n")

if failed_tables:
    # Surface the failures so job schedulers / alerts see a failed run.
    raise RuntimeError(
        f"Bronze ingestion failed for {len(failed_tables)} table(s): "
        + ", ".join(failed_tables)
    )

# Preview every bronze Delta table. Folders whose names start with
# "checkpoints" or "_" are skipped (same rule as the landing-zone scan), and a
# folder that cannot be read as Delta is reported instead of aborting the
# whole preview.
from pyspark.sql.utils import AnalysisException

bronze_tables = dbutils.fs.ls(bronze_path)

for table in bronze_tables:
    dir_name = table.name.strip("/")
    if not table.isDir() or dir_name.lower().startswith(("checkpoints", "_")):
        continue
    try:
        bronze_df = spark.read.format("delta").load(f"{bronze_path}{table.name}")
    except AnalysisException as e:
        print(f"Skipping {table.name}: not a readable Delta table ({e})")
        continue
    print(f"Showing data for table: {table.name}")
    display(bronze_df)