**CONFIG** — load the `store_sales` entry from `configs/tables.json`.

In [0]:
import json

# Load the per-table ingest metadata and select the store_sales entry.
# The config location is a widget so the notebook is not tied to a single user's
# repo checkout; the default keeps the original location.
dbutils.widgets.text(
    "config_path",
    "/Workspace/Repos/parth.b2109@gmail.com/modastyle-dataeng/configs/tables.json",
)
config_path = dbutils.widgets.get("config_path")
with open(config_path, "r", encoding="utf-8") as f:
    meta = json.load(f)

table_conf = meta["store_sales"]

# Fail fast, with the offending file named, if any key the ingest cell reads is absent.
_required_keys = {
    "source_path",
    "raw_path",
    "bronze_path",
    "bad_records_path",
    "primary_key",
    "partition_column",
}
_missing_keys = _required_keys - set(table_conf)
if _missing_keys:
    raise KeyError(
        f"store_sales config in {config_path} is missing keys: {sorted(_missing_keys)}"
    )


In [0]:
# Session-wide read tolerance: skip unreadable or vanished input files instead of
# failing the whole job.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
# The bad-records location depends on ingest_date, which is not defined until the
# ingest cell reads the widget, so it is configured in that cell rather than here
# (referencing ingest_date here fails with NameError on a fresh kernel).


In [0]:
# ingest_raw_and_bronze_store_sales.py
# One run ingests the store_sales source CSVs for a single ingest_date:
#   1. copy the rows to the raw zone as CSV        (raw_path/dt=<ingest_date>)
#   2. write the rows plus lineage columns as Delta (bronze_path/dt=<ingest_date>)
#   3. append one summary row to the Delta audit log.
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, current_timestamp, lit
import datetime
import uuid

spark = SparkSession.builder.getOrCreate()

# --- PARAMETERS ---
# Either pass this as widget or edit here for quick run
dbutils.widgets.text("ingest_date", datetime.date.today().strftime("%Y-%m-%d"))
ingest_date = dbutils.widgets.get("ingest_date")

# ingest_date is interpolated into every output path, so only accept a canonical
# YYYY-MM-DD date (strptime alone would also accept non-padded forms like "2024-1-5").
try:
    _parsed_ingest_date = datetime.datetime.strptime(ingest_date, "%Y-%m-%d").date()
except ValueError as exc:
    raise ValueError(f"ingest_date must be YYYY-MM-DD, got {ingest_date!r}") from exc
if _parsed_ingest_date.strftime("%Y-%m-%d") != ingest_date:
    raise ValueError(f"ingest_date must be YYYY-MM-DD, got {ingest_date!r}")

# Paths
source_base   = table_conf["source_path"]
raw_base      = table_conf["raw_path"]
bronze_base   = table_conf["bronze_path"]
bad_base      = table_conf["bad_records_path"]
# NOTE(review): pk_cols and partition_col are read but not used below; bronze is laid
# out as one Delta table per dt= directory rather than one table partitioned by
# partition_col — confirm this is intended.
pk_cols       = table_conf["primary_key"]
partition_col = table_conf["partition_column"]

# Derived
raw_target = f"{raw_base}/dt={ingest_date}"
bronze_target = f"{bronze_base}/dt={ingest_date}"
# Bad records for this run live under the configured bad-records root.
# NOTE(review): assumes bad_records_path in tables.json is the per-table root
# (e.g. /mnt/modastyle/bad_records/store_sales) — confirm.
bad_records_path = f"{bad_base}/{ingest_date}"

run_id = str(uuid.uuid4())

print(f"Run: {run_id} | ingest_date: {ingest_date}")
print("Source:", source_base)
print("Raw target:", raw_target)
print("Bronze target:", bronze_target)
print("Bad records path:", bad_records_path)

# --- READ SOURCE (flexible: folder or files) ---
# Every column is read as STRING (no schema inference) and malformed rows are kept
# (PERMISSIVE). badRecordsPath is a data source option, so it is set on the reader.
# NOTE(review): source_base is not scoped by ingest_date, so every run appears to
# re-read the whole source folder — confirm the source is cleared between runs.
try:
    df_src = (spark.read
              .option("header", "true")
              .option("inferSchema", "false")
              .option("mode", "PERMISSIVE")
              .option("badRecordsPath", bad_records_path)
              .csv(source_base))
except Exception:
    print(f"Failed to read source at {source_base}. Exiting.")
    raise

rows_in = df_src.count()
print("Rows in source:", rows_in)

# --- WRITE RAW (just copy CSV rows as-is into raw partition) ---
# Rows are re-serialised as CSV with a header into the raw partition.
(df_src.write
    .mode("overwrite")
    .option("header", "true")
    .csv(raw_target))

print("Raw write complete.")

# --- BASIC CLEANING FOR BRONZE ---
# Add lineage/metadata columns so every bronze row can be traced to its file and run.
df_bronze = (df_src
             .withColumn("source_file", input_file_name())
             .withColumn("ingest_ts", current_timestamp())
             .withColumn("ingest_date", lit(ingest_date))
             .withColumn("run_id", lit(run_id))
             .withColumn("layer", lit("bronze")))

# Overwrite this date's Delta location (created on first write).
(df_bronze.write
    .format("delta")
    .mode("overwrite")
    .save(bronze_target))

rows_written = spark.read.format("delta").load(bronze_target).count()
print("Rows written to bronze:", rows_written)

# --- BAD RECORDS ---
# Checked only after the writes: count() on CSV may not parse columns it does not
# need, so malformed rows are reliably detected only once full rows are materialised.
# This counts entries under bad_records_path (Spark writes one timestamped folder per
# action that hit bad data), not individual bad rows.
bad_count = 0
try:
    bad_count = len(dbutils.fs.ls(bad_records_path))
    print(f"Bad record files: {bad_count}")
except Exception as exc:
    # A missing folder is the normal "no bad records" case; report the error type so
    # anything unexpected is still visible.
    print(f"No bad records folder found at {bad_records_path} ({type(exc).__name__}).")

# --- SIMPLE AUDIT LOG (append to delta audit table) ---
audit = spark.createDataFrame([(
    run_id, "store_sales", source_base, raw_target, bronze_target,
    rows_in, rows_written, bad_count, datetime.datetime.now().isoformat()
)], schema=["run_id","table","source_path","raw_path","bronze_path","rows_in","rows_written","bad_count","audit_ts"])

audit_table_path = "/mnt/modastyle/_audit/ingest_runs"

# Appending creates the Delta table on the first run, so no pre-creation is needed.
(audit.write
     .format("delta")
     .mode("append")
     .save(audit_table_path))

print("Audit appended to:", audit_table_path)


In [0]:
# Maintenance only: recursively delete the whole store_sales bronze location (all
# dt= directories). Guarded so that "Run All" does not wipe the bronze data the
# ingest cell just wrote; set RESET_BRONZE = True deliberately and run this cell alone.
RESET_BRONZE = False
BRONZE_STORE_SALES_PATH = "/mnt/modastyle/bronze/store_sales"

if RESET_BRONZE:
    dbutils.fs.rm(BRONZE_STORE_SALES_PATH, recurse=True)
    print("Removed:", BRONZE_STORE_SALES_PATH)