In [0]:
# Bronze ingestion settings (edit these two values to point the notebook elsewhere)

# Name of the table the Bronze DataFrame is saved to via saveAsTable.
TARGET_TABLE: str = "retail_project.bronze.promotions"

# Directory that is listed for format detection and then loaded by the reader.
SOURCE_PATH: str = "dbfs:/databricks-datasets/retail-org/promotions/"

In [0]:
# Imports
from pyspark.sql import functions as F

In [0]:
# Detect file format from file extensions (single-format sources only)
#
# Outcome: FILE_FORMAT is set to the one lower-cased extension shared by every
# data file directly under SOURCE_PATH. A ValueError is raised when no data
# files are found, when several extensions are present, or when the extension
# is not one of the supported formats.

files = dbutils.fs.ls(SOURCE_PATH)

# Keep only real data files:
#   - skip names starting with "_" or "." -- Spark treats both as hidden when
#     it lists input files (e.g. _SUCCESS markers, Hadoop ".*.crc" checksums),
#     so they must not influence format detection either;
#   - skip directory entries, which dbutils.fs.ls reports with a trailing "/";
#   - skip names that carry no extension at all.
data_files = [
    f.name.lower()
    for f in files
    if not f.name.startswith(("_", "."))
    and not f.name.endswith("/")
    and "." in f.name
]

if not data_files:
    raise ValueError(f"No data files found under {SOURCE_PATH}")

# Collect unique file extensions (the text after the last dot)
extensions = {name.rsplit(".", 1)[-1] for name in data_files}

# Enforce single-format sources
if len(extensions) != 1:
    raise ValueError(
        f"Mixed or unsupported file types under {SOURCE_PATH}: {extensions}"
    )

FILE_FORMAT = extensions.pop()

# Allow only known formats
if FILE_FORMAT not in {"parquet", "csv", "json", "xml"}:
    raise ValueError(
        f"Unsupported file format '{FILE_FORMAT}' under {SOURCE_PATH}"
    )

print("Detected format:", FILE_FORMAT)

Detected format: csv


In [0]:
# Read raw CSV data (multiline, quoted fields) and add Bronze audit columns

# Every reader option below is CSV-specific. The detection cell can also
# return parquet/json/xml, so stop here rather than parse such files as CSV.
if FILE_FORMAT != "csv":
    raise ValueError(
        f"This cell only reads CSV, but detected format is '{FILE_FORMAT}' "
        f"under {SOURCE_PATH}"
    )

reader = (
    spark.read
         .format("csv")
         .option("header", "true")       # first line holds the column names
         .option("inferSchema", "true")  # let Spark derive column types from the data
         .option("mode", "PERMISSIVE")   # keep malformed rows instead of failing the read
         .option("multiLine", "true")    # fields may span multiple lines
         .option("quote", "\"")          # quoted fields
         .option("escape", "\"")         # a quote inside a quoted field is escaped by a quote
)

df_raw = reader.load(SOURCE_PATH)

# Bronze enrichment (standard): record when and from which file each row was
# read. The file path and size come from Spark's hidden _metadata column.
df_bronze = (
    df_raw
    .withColumn("_read_timestamp", F.current_timestamp())
    .withColumn("_source_path", F.col("_metadata.file_path"))
    .withColumn("_file_size", F.col("_metadata.file_size"))
)

# Preview a few rows and the resulting schema for a quick visual check.
display(df_bronze.limit(10))
df_bronze.printSchema()

promotion_id,promotion_type,dollar_discount,percent_discount,qualifying_products,units_required,free_product_ids,length,valid_from,valid_to,_read_timestamp,_source_path,_file_size
0,percent_discount,,0.03,AVpfjP9uilAPnD_xdy6- AVpfPEx61cnluZ0-gyT9 AVpfMVD-ilAPnD_xW6bu,3,,,2019-01-01T01:00:00.000Z,,2025-12-13T15:55:48.807Z,dbfs:/databricks-datasets/retail-org/promotions/promotions.csv,526
1,percent_discount,,0.05,AVphq88q1cnluZ0-FFwx AVpfZaCp1cnluZ0-kDV9 AVqVGUFCv8e3D1O-ldFF,5,,,2019-01-01T01:00:00.000Z,,2025-12-13T15:55:48.807Z,dbfs:/databricks-datasets/retail-org/promotions/promotions.csv,526
2,percent_discount,,0.07,AVpge6k2LJeJML43OhAl AVpfYKih1cnluZ0-jsHP AVpfeG5oilAPnD_xcTsG  AVpiHEE31cnluZ0-J8jJ,4,,30.0,2019-09-01T01:00:00.000Z,2019-10-01T01:00:00.000Z,2025-12-13T15:55:48.807Z,dbfs:/databricks-datasets/retail-org/promotions/promotions.csv,526


root
 |-- promotion_id: integer (nullable = true)
 |-- promotion_type: string (nullable = true)
 |-- dollar_discount: string (nullable = true)
 |-- percent_discount: double (nullable = true)
 |-- qualifying_products: string (nullable = true)
 |-- units_required: integer (nullable = true)
 |-- free_product_ids: string (nullable = true)
 |-- length: double (nullable = true)
 |-- valid_from: timestamp (nullable = true)
 |-- valid_to: timestamp (nullable = true)
 |-- _read_timestamp: timestamp (nullable = false)
 |-- _source_path: string (nullable = false)
 |-- _file_size: long (nullable = false)



In [0]:
# Save the Bronze DataFrame as a Delta table, replacing data and schema

# Configure the writer first; nothing is written until saveAsTable runs.
bronze_writer = (
    df_bronze.write
    .format("delta")
    .mode("overwrite")                  # full refresh on every run
    .option("overwriteSchema", "true")  # the incoming schema replaces the stored one
)
bronze_writer.saveAsTable(TARGET_TABLE)

print(f"Wrote Bronze table: {TARGET_TABLE}")

Wrote Bronze table: retail_project.bronze.promotions


In [0]:
# Quick validation: print the number of rows now stored in the target table
row_count_query = f"SELECT COUNT(*) AS row_count FROM {TARGET_TABLE}"
row_count_df = spark.sql(row_count_query)
row_count_df.show()

+---------+
|row_count|
+---------+
|        3|
+---------+

