In [0]:
# Databricks Notebook: bronze_ingestion_driver (Loop Through All Metadata Entities)
# -------------------------------------------------------------------------------

import sys
import json
import uuid
from datetime import datetime
from pathlib import Path
from pyspark.sql.functions import col, lit
from pyspark.sql.utils import AnalysisException

# Add utility paths
sys.path.append("/dbfs/mnt/scripts/libs")
from parse_dq_rules import parse_dq_rules
from schema_validator import validate_schema
from dq_engine import apply_dq_rules

# -----------------------------------------------------
# 📥 Fetch every active entity definition from the metadata table
# -----------------------------------------------------
# Connection string for the Azure SQL metadata database.
jdbc_url = (
    "jdbc:sqlserver://sql-retailverse-dev-server.database.windows.net:1433;"
    "database=retailverse_metadata_db"
)

# Credentials are pulled from the Databricks secret scope rather than being
# embedded in the notebook.
jdbc_properties = dict(
    user=dbutils.secrets.get(scope="retailverse-secret", key="sql-user"),
    password=dbutils.secrets.get(scope="retailverse-secret", key="sql-password"),
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# The JDBC reader takes a parenthesised, aliased subquery in place of a table
# name, so only the active rows and the needed columns are fetched.
entities_query = (
    "(SELECT entity_name, source_path, dq_rules, partition_column, expected_schema "
    "FROM retailverse_metadata.metadata_control "
    "WHERE active = 1) AS metadata"
)

metadata_df = spark.read.jdbc(
    url=jdbc_url,
    table=entities_query,
    properties=jdbc_properties,
)

# Bring the rows to the driver; the ingestion loop iterates over this list.
metadata_list = metadata_df.collect()

# Nothing to ingest is treated as a hard error.
if not metadata_list:
    raise Exception("❌ No active entities found in metadata table.")

# -----------------------------------------------------
# 🧪 Data-quality rule helpers
# -----------------------------------------------------
# Pattern used by the `email_format` rule.
_EMAIL_PATTERN = r"^[\w\.-]+@[\w\.-]+\.\w+$"


def _dq_failure(rule_type, column):
    """Return ``(condition, reason)`` for a supported DQ rule, else ``None``.

    ``condition`` is a boolean Column that is true for rows violating the
    rule. Every condition is built so that it never evaluates to NULL, which
    means ``condition`` and ``~condition`` split a DataFrame into two
    complementary sets and no row is lost between quarantine and Bronze.

    Args:
        rule_type: Rule name from the metadata (``not_null``,
            ``positive_number`` or ``email_format``).
        column: Name of the column the rule applies to.

    Returns:
        A ``(Column, str)`` tuple, or ``None`` for an unrecognised rule type.
    """
    if rule_type == "not_null":
        return col(column).isNull(), f"{column} is null"

    if rule_type == "positive_number":
        as_double = col(column).cast("double")
        # A NULL cast result covers both missing values and values that are
        # not numeric at all; either way the value is not a positive number.
        return as_double.isNull() | (as_double <= 0), f"{column} is not positive"

    if rule_type == "email_format":
        # rlike() yields NULL for a NULL input, so NULLs are tested explicitly
        # to keep the condition two-valued.
        return (
            col(column).isNull() | ~col(column).rlike(_EMAIL_PATTERN),
            f"{column} failed email_format",
        )

    return None


# -----------------------------------------------------
# 🔁 Loop over each entity for ingestion
# -----------------------------------------------------
for row in metadata_list:
    entity_name = row["entity_name"]
    source_path = row["source_path"]
    dq_rules_raw = row["dq_rules"]
    partition_column = row["partition_column"]
    expected_schema_json = row["expected_schema"]

    print(f"🚀 Ingesting entity: {entity_name}")
    print(f"📂 Source Path: {source_path}")
    print(f"📦 Partition Column: {partition_column}")

    bronze_output_path = f"/mnt/bronze/{entity_name}/"
    quarantine_path = f"/mnt/quarantine/{entity_name}/"

    # Audit logging setup: one audit record is written per entity per run.
    run_id = str(uuid.uuid4())
    start_time = datetime.now()
    status = "SUCCESS"
    row_count = 0
    error_message = ""

    try:
        has_csv = any(f.name.endswith(".csv") for f in dbutils.fs.ls(source_path))
        if not has_csv:
            print(f"⚠️ No CSV files found in: {source_path}")
            # `finally` still runs on `continue`, so an audit row is written;
            # record why this run ingested nothing. The status value is left
            # unchanged so existing consumers of the audit table keep working.
            error_message = f"No CSV files found in: {source_path}"
            continue

        df_raw = spark.read.option("header", True).csv(source_path)

        # Load expected schema and DQ rules
        expected_schema_dict = json.loads(expected_schema_json)
        dq_rules = parse_dq_rules(dq_rules_raw)

        # Validate schema
        df_validated = validate_schema(df_raw, expected_schema_dict)

        # Apply DQ rules in order: rows failing a rule move to `invalid_rows`
        # (tagged with the reason) and are removed from `df_validated`, so
        # each row is reported for the first rule it violates.
        invalid_rows = spark.createDataFrame([], df_validated.schema)
        for rule in dq_rules.get("rules", []):
            failure = _dq_failure(rule.get("rule"), rule.get("column"))
            if failure is None:
                # Unrecognised rule types are ignored.
                continue
            condition, reason = failure
            failed = df_validated.filter(condition).withColumn("error_reason", lit(reason))
            invalid_rows = invalid_rows.unionByName(failed, allowMissingColumns=True)
            df_validated = df_validated.filter(~condition)

        # Quarantine invalid rows; count once and reuse the number for the log
        # message instead of triggering a second Spark job.
        invalid_count = invalid_rows.count()
        if invalid_count > 0:
            invalid_rows.write.mode("append").parquet(quarantine_path)
            print(f"⚠️ Quarantined {invalid_count} invalid rows for entity: {entity_name}")

        # Write valid rows to Bronze, partitioned only when the configured
        # column actually exists in the validated data.
        if partition_column and partition_column in df_validated.columns:
            df_validated.write.mode("overwrite").partitionBy(partition_column).parquet(bronze_output_path)
        else:
            df_validated.write.mode("overwrite").parquet(bronze_output_path)

        row_count = df_validated.count()
        print(f"✅ Ingestion completed for: {entity_name}")

    except Exception as e:
        # A failure for one entity is recorded and must not stop the others.
        status = "FAILED"
        error_message = str(e)
        print(f"❌ Failed ingestion for {entity_name}: {error_message}")

    finally:
        end_time = datetime.now()
        audit_data = [(run_id, entity_name, start_time, end_time, status, row_count, error_message)]
        audit_schema = "run_id STRING, entity_name STRING, start_time TIMESTAMP, end_time TIMESTAMP, status STRING, row_count INT, error_message STRING"
        audit_df = spark.createDataFrame(audit_data, schema=audit_schema)

        # Audit record goes to the SQL metadata database...
        audit_df.write \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", "retailverse_metadata.audit_logs") \
            .option("user", jdbc_properties["user"]) \
            .option("password", jdbc_properties["password"]) \
            .option("driver", jdbc_properties["driver"]) \
            .mode("append") \
            .save()

        # ...and is also appended as Parquet under the logs mount.
        audit_df.write \
            .mode("append") \
            .partitionBy("entity_name", "run_id") \
            .parquet("/mnt/logs/audit_logs/")
   
