In [0]:
# ================================
# End-to-End Ingestion Pipeline
# ================================

from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable
from datetime import datetime

# --------------------------------
# Configurations
# --------------------------------
# Storage locations: input files, the curated Delta table, and the Delta error log.
raw_path = "/mnt/raw"
processed_path = "/mnt/processed"
error_log_path = "/mnt/error_logs"

# process_files keeps only rows whose order_date is strictly after this date.
watermark_date = "2024-01-01"

# Columns that data_quality_check requires to be free of NULLs.
required_columns = ["cust_id", "order_value", "order_date"]

# Column names and types that validate_schema compares against and that
# read_file applies to CSV input. Every field is declared nullable.
expected_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("cust_id", StringType()),
        ("order_value", DoubleType()),
        ("order_date", DateType()),
    ]
])

# --------------------------------
# Error Logging
# --------------------------------
def log_error(file_name, issue, detected_schema=None, severity="ERROR"):
    """Append a single error record to the Delta log at ``error_log_path``.

    Args:
        file_name: Name of the file the problem relates to.
        issue: Human-readable description of the problem.
        detected_schema: Optional schema (or column list) observed on the
            file. It is stored via ``str()``, so the default ``None`` is
            recorded as the text ``"None"``.
        severity: Severity label stored with the record.
    """
    column_names = ["file_name", "issue", "severity", "detected_schema", "logged_at"]
    record = (file_name, issue, severity, str(detected_schema), datetime.now())

    # One-row DataFrame; column types are inferred from the tuple values.
    error_df = spark.createDataFrame([record], column_names)
    error_df.write.format("delta").mode("append").save(error_log_path)

# --------------------------------
# File Reader
# --------------------------------
def read_file(file):
    """Load one raw entry into a Spark DataFrame, chosen by its extension.

    Args:
        file: Object exposing ``name`` and ``path`` attributes
            (process_files passes entries returned by ``dbutils.fs.ls``).

    Returns:
        A DataFrame read as CSV (with ``expected_schema`` applied and the
        first line treated as a header) or as Parquet.

    Raises:
        ValueError: If the name ends in neither ``.csv`` nor ``.parquet``.
    """
    # Normalize before matching: extensions may be upper-case ("DATA.CSV"),
    # and directory entries are listed with a trailing "/" (e.g. a
    # Spark-written "orders.parquet/" dataset), which would otherwise be
    # rejected as unsupported.
    normalized_name = file.name.rstrip("/").lower()

    if normalized_name.endswith(".csv"):
        # NOTE(review): with an explicit schema Spark does not infer CSV
        # column types, so validate_schema cannot detect type drift for CSV
        # input - confirm this is intended.
        return (spark.read
                .schema(expected_schema)
                .option("header", True)
                .csv(file.path))

    if normalized_name.endswith(".parquet"):
        return spark.read.parquet(file.path)

    raise ValueError("Unsupported file format")

# --------------------------------
# Schema Validation
# --------------------------------
def validate_schema(df, file_name):
    """Check that ``df`` has every expected column with the expected type.

    Column names are compared case-insensitively and types are compared by
    their ``simpleString()`` form. Columns in ``df`` that are not part of
    ``expected_schema`` are ignored.

    Args:
        df: DataFrame to inspect.
        file_name: Name recorded in the error log when validation fails.

    Returns:
        True when nothing is missing or mismatched; otherwise False, after
        the problems have been written through ``log_error``.
    """
    actual_types = {
        name.lower(): df.schema[name].dataType.simpleString()
        for name in df.columns
    }
    wanted_types = {
        field.name.lower(): field.dataType.simpleString()
        for field in expected_schema.fields
    }

    missing = wanted_types.keys() - actual_types.keys()

    # Maps column -> (found type, expected type) for shared columns that differ.
    mismatched = {}
    for name in wanted_types.keys() & actual_types.keys():
        found, wanted = actual_types[name], wanted_types[name]
        if found != wanted:
            mismatched[name] = (found, wanted)

    if not (missing or mismatched):
        return True

    log_error(file_name,
              f"Schema issues | Missing: {missing} | Mismatched: {mismatched}",
              df.schema)
    return False

# --------------------------------
# Data Quality Validation
# --------------------------------
def data_quality_check(df, file_name):
    """Reject ``df`` if any column in ``required_columns`` contains NULLs.

    Args:
        df: DataFrame to inspect; it must contain every required column.
        file_name: Name recorded in the error log when the check fails.

    Returns:
        True when no required column has NULL values (including when ``df``
        has no rows); otherwise False, after the per-column NULL counts have
        been written through ``log_error``.
    """
    # A single aggregation pass produces one row: column -> number of NULLs.
    null_counts = (
        df.select([
            F.sum(F.col(c).isNull().cast("int")).alias(c)
            for c in required_columns
        ])
        .first()
        .asDict()
    )

    # SUM over zero rows yields NULL, which arrives here as None. Comparing
    # None > 0 raises TypeError, so an empty file used to be reported as an
    # exception instead of simply having no violations.
    issues = {
        c: count
        for c, count in null_counts.items()
        if count is not None and count > 0
    }

    if issues:
        log_error(file_name, f"Null violations: {issues}")
        return False

    return True

# --------------------------------
# Main Processing Function
# --------------------------------
def process_files():
    """Ingest every entry under ``raw_path`` into the Delta table at ``processed_path``.

    For each entry listed by ``dbutils.fs.ls(raw_path)`` the function:

    1. reads it with ``read_file``;
    2. skips it if ``validate_schema`` or ``data_quality_check`` fails
       (those helpers log the reason);
    3. keeps only rows whose ``order_date`` is after ``watermark_date``;
    4. drops duplicate ``(cust_id, order_date)`` rows within that file;
    5. writes the result to ``processed_path`` with ``mergeSchema`` enabled.

    Any exception raised while handling an entry is recorded through
    ``log_error`` and processing continues with the next entry.
    """
    files = dbutils.fs.ls(raw_path)

    for file in files:
        try:
            # Step 1: Read file
            df = read_file(file)

            # Step 2: Schema validation
            if not validate_schema(df, file.name):
                continue

            # Step 3: Data quality check
            if not data_quality_check(df, file.name):
                continue

            # Step 4: Incremental filter
            # Compare case-insensitively, the same way validate_schema does;
            # otherwise a column such as "Order_Date" passes validation and
            # is then reported as missing here.
            # NOTE(review): assumes Spark's default case-insensitive column
            # resolution for the filter below - confirm cluster settings.
            if "order_date" not in {c.lower() for c in df.columns}:
                log_error(file.name,
                          "Missing incremental column 'order_date'",
                          df.columns)
                continue

            df_incremental = df.filter(
                F.col("order_date") > F.lit(watermark_date)
            )

            # head(1) fetches at most one row and avoids converting the
            # DataFrame to an RDD just to test for emptiness.
            if not df_incremental.head(1):
                continue

            # Step 5: Deduplication
            # NOTE: this only removes duplicates inside the current file;
            # rows already present in the target table are not checked.
            df_dedup = df_incremental.dropDuplicates(["cust_id", "order_date"])

            # Step 6: Write to Delta (Schema Evolution Enabled)
            # Append to an existing Delta table; otherwise create it by
            # overwriting whatever is at the target path.
            if DeltaTable.isDeltaTable(spark, processed_path):
                write_mode = "append"
            else:
                write_mode = "overwrite"

            (df_dedup.write
             .format("delta")
             .mode(write_mode)
             .option("mergeSchema", "true")
             .save(processed_path))

            print(f"âœ” Successfully processed {file.name}")

        except Exception as e:
            # Per-file boundary: record the failure and move on to the next file.
            log_error(file.name, str(e))

# --------------------------------
# Execute Pipeline
# --------------------------------
# Top-level call: the whole ingestion runs as soon as this code is executed.
process_files()
