In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col
from time import time
import sys
import os

In [0]:
from utils.common_util import normalize_columns

In [0]:
# Record the wall-clock start time (seconds since the epoch, via time.time) so the
# final cell can report the notebook's total run time.
start_time = time()

In [0]:
# Databricks volume locations used by this ingestion job:
#   source_path     - landing directory Auto Loader reads the product CSV files from
#   checkpoint_path - streaming checkpoint directory, also used as the Auto Loader
#                     schema location (cloudFiles.schemaLocation)
source_path, checkpoint_path = (
    "/Volumes/fz_catalog/landing/products/",
    "/Volumes/fz_catalog/landing/_checkpoints/products_bronze/",
)

In [0]:
# Pre-flight check: list the CSV files currently present in the landing directory.
# The extension match is case-insensitive because the Auto Loader read below sets no
# pathGlobFilter. It will therefore also ingest names such as "PRODUCTS.CSV", and
# such files must not trigger the "no CSV files" exit.
# NOTE(review): this lists *every* CSV in the directory, including files Auto Loader has
# already ingested (it does not consult the checkpoint). It therefore only short-circuits
# on a folder with no CSVs at all, not on "no new files".
csv_files = [f for f in os.listdir(source_path) if f.lower().endswith(".csv")]

# Exit notebook if no CSV files are found.
# NOTE(review): dbutils.notebook.exit ends the run with a successful status despite the
# "Error:" prefix; a caller has to inspect the returned string to detect this case.
if not csv_files:
    dbutils.notebook.exit("Error: No CSV files found in the source directory.")

In [0]:
# Read the landing CSV files incrementally with Auto Loader (cloudFiles).
# - Column typing: Auto Loader's option is cloudFiles.inferColumnTypes. The plain CSV
#   reader option `inferSchema` previously set here is not used by Auto Loader, so
#   every column was being inferred as STRING.
#   NOTE(review): a schema already persisted at checkpoint_path is presumably reused
#   as-is, so existing deployments may keep STRING columns until that schema is reset.
# - schemaEvolutionMode="rescue": the stored schema is not evolved. Data for new or
#   unexpected columns, and values that don't match the inferred type, are captured
#   in the rescued data column (`_rescued_data` by default, per the Auto Loader docs).
try:
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")  # infer real column types, not all STRING
        .option("header", "true")            # first row holds the column names
        .option("quote", '"')                # fields may be wrapped in double quotes
        .option("escape", '"')               # "" inside a quoted field is a literal quote
        .option("cloudFiles.schemaLocation", checkpoint_path)  # where the inferred schema is stored
        .option("cloudFiles.schemaEvolutionMode", "rescue")    # new fields go to the rescued data column
        .load(source_path)
    )
except Exception as e:
    # NOTE(review): dbutils.notebook.exit ends the run with a *successful* status and
    # returns this string to any caller; re-raise instead if a read failure should
    # fail the job.
    dbutils.notebook.exit(f"Error reading stream: {e}")

In [0]:
# Normalize column names in the DataFrame using the project utility
# utils.common_util.normalize_columns (its implementation is not visible here).
# NOTE(review): confirm it leaves the rescued-data column intact, since that is where
# new CSV fields land in "rescue" schema-evolution mode.
df_renamed = normalize_columns(df)

In [0]:
# Attach lineage/audit columns taken from the file-source `_metadata` column:
#   input_file_name        <- name of the CSV file the row was read from
#   file_modification_time <- that file's modification timestamp
# NOTE(review): assumes normalize_columns still exposes `_metadata` on its output — confirm.
df_with_audit = (
    df_renamed
    .withColumn("input_file_name", col("_metadata.file_name"))
    .withColumn("file_modification_time", col("_metadata.file_modification_time"))
)

In [0]:
# Append the audited stream to the bronze Delta table fz_catalog.bronze.products.
# trigger(availableNow=True) processes the data available when the query starts and then
# stops, so this cell behaves like a batch load.
try:
    products_query = (
        df_with_audit.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .toTable("fz_catalog.bronze.products")
    )
    # toTable() only *starts* the streaming query and returns a StreamingQuery handle.
    # Blocking here has three effects:
    #   (a) failures during processing are raised inside this try;
    #   (b) the run time measured in the next cells covers the actual load;
    #   (c) the notebook does not reach its final exit cell while the stream is still writing.
    products_query.awaitTermination()
except Exception as e:
    # NOTE(review): dbutils.notebook.exit ends the run with a successful status, so the
    # `raise` below is presumably never reached; drop the exit if a failed write should
    # fail the job.
    dbutils.notebook.exit(f"Error writing stream: {e}")
    raise

In [0]:
# Capture the wall-clock end time; the final cell subtracts start_time from it to
# report how long the notebook ran.
end_time = time()

In [0]:
# Finish the run, returning the elapsed wall-clock time as the notebook's exit value.
elapsed_seconds = end_time - start_time
dbutils.notebook.exit(f"Run Time: {elapsed_seconds} seconds")

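Assumption: the source schema may change over time. Because `cloudFiles.schemaEvolutionMode` is set to `rescue`, a new column is not added to the table schema; its values are stored in the rescued data column (`_rescued_data`) instead.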