In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DateType
import json
import time
from datetime import datetime

In [0]:
# Thresholds and patterns for the data-quality checks.
DQ_CONFIG = {
    # Lower-case letter groups separated by dots, e.g. "en" or "en.m".
    "domain_code_regex": r"^[a-z]+(\.[a-z]+)*$",
    # Alert when more than 5% of a critical column is null.
    "max_null_pct": 0.05,
    # Minimum number of input rows for the run to be considered non-empty.
    "min_rows_expected": 1
}

# Column layout of the cleaned data.
# count_views is LongType to agree with the cast("long") applied in
# enforce_and_cast_schema; it was previously declared IntegerType here.
EXPECTED_SCHEMA = StructType([
    StructField("domain_code", StringType(), True),
    StructField("count_views", LongType(), True),
    StructField("total_response_size", LongType(), True),
    StructField("event_timestamp", TimestampType(), True),
    StructField("event_date", DateType(), True),
])

In [0]:
# ---------------------------
# 1) Parse raw DataFrame 
# ---------------------------

def parse_raw_df(bronze_table) -> DataFrame:
    """
    Read raw projectviews text files and parse them into typed columns.

    Each input line is split on single spaces into at most four parts; lines
    that do not yield exactly four parts, or that start with two double
    quotes, are dropped. Parts 0, 2 and 3 become ``domain_code``,
    ``count_views`` (int) and ``total_response_size`` (long).

    ``event_timestamp`` and ``event_date`` are derived from the source file
    path, which must contain ``projectviews-YYYYMMDD-HHMMSS``; when the path
    does not match, both columns are null.

    Args:
        bronze_table: Location of the raw text files passed to
            ``spark.read.text``. Previously this argument was ignored and the
            notebook-level ``bronze_path`` global was read instead.

    Returns:
        DataFrame with columns ``domain_code``, ``count_views``,
        ``total_response_size``, ``event_timestamp`` and ``event_date``,
        repartitioned by ``event_date``.

    Note:
        Relies on the notebook-provided ``spark`` session.
    """
    # Capture group 1 is the date digits, group 2 the time digits.
    file_name_pattern = r"projectviews-(\d{8})-(\d{6})"

    parsed = (
        spark
        .read
        .text(bronze_table)
        .filter(~F.col("value").startswith('""'))
        .withColumn("split", F.split(F.col("value"), " ", 4))
        .filter(F.size("split") == 4)
        .select(
            F.col("split")[0].alias("domain_code"),
            F.col("split")[2].cast("int").alias("count_views"),
            F.col("split")[3].cast("long").alias("total_response_size"),
            F.col("_metadata.file_path").alias("file_path")
        )
        .withColumn("date_str", F.regexp_extract(F.col("file_path"), file_name_pattern, 1))
        .withColumn("time_str", F.regexp_extract(F.col("file_path"), file_name_pattern, 2))
        # regexp_extract yields "" on no match; keep null instead of an empty string.
        .withColumn("datetime_concat",
                   F.when((F.col("date_str") != "") & (F.col("time_str") != ""),
                         F.concat_ws("", F.col("date_str"), F.col("time_str")))
                   .otherwise(None))
        # Only hand well-formed 14-digit strings to to_timestamp.
        .withColumn("event_timestamp",
                   F.when(F.col("datetime_concat").isNotNull() & (F.length(F.col("datetime_concat")) == 14),
                         F.to_timestamp(F.col("datetime_concat"), "yyyyMMddHHmmss"))
                   .otherwise(None))
        .withColumn("event_date", F.to_date(F.col("event_timestamp")))
        .drop("date_str", "time_str", "datetime_concat", "file_path")
    )

    parsed = parsed.repartition("event_date")

    return parsed

# ---------------------------
# 2) Enforce schema + casts
# ---------------------------

def enforce_and_cast_schema(df: DataFrame) -> DataFrame:
    """
    Ensure the five pipeline columns exist and carry their target types.

    Any of ``domain_code``, ``count_views``, ``total_response_size``,
    ``event_timestamp`` or ``event_date`` that is absent is appended as an
    all-null column of its target type. All five columns are then cast to
    string / long / long / timestamp / date respectively. Other columns are
    left untouched.

    Args:
        df: Input DataFrame.

    Returns:
        DataFrame with the five columns present and cast.
    """
    target_types = {
        "domain_code": "string",
        "count_views": "long",
        "total_response_size": "long",
        "event_timestamp": "timestamp",
        "event_date": "date",
    }

    # Add missing columns as nulls of the *target* type. Previously every
    # non-string column was added as an integer null, which the to_date call
    # below cannot convert for event_date.
    existing = set(df.columns)
    for name, dtype in target_types.items():
        if name not in existing:
            df = df.withColumn(name, F.lit(None).cast(dtype))

    df = (
        df
        .withColumn("domain_code", F.col("domain_code").cast("string"))
        .withColumn("count_views", F.col("count_views").cast("long"))
        .withColumn("total_response_size", F.col("total_response_size").cast("long"))
        .withColumn("event_timestamp", F.col("event_timestamp").cast("timestamp"))
        .withColumn("event_date", F.to_date(F.col("event_date")))
    )
    return df

# ---------------------------
# 3) DQ checks
# ---------------------------

def run_dq_checks(df: DataFrame, config: dict = DQ_CONFIG, spark=None, quarantine_path: str = None) -> dict:
    """
    Run the data-quality checks on ``df`` and split it into valid and quarantined rows.

    Checks performed:
      - presence of the expected columns;
      - null/blank count and ratio for each expected column;
      - ``domain_code`` null or not matching ``config["domain_code_regex"]``;
      - negative ``count_views`` or ``total_response_size``;
      - total row count against ``config["min_rows_expected"]`` (default 1).

    Only the domain and negative-value checks move rows to quarantine; the
    other results are reported but do not filter rows. An expected column
    that is missing is treated as all-null for every check.

    Args:
        df: DataFrame to validate.
        config: Settings dict; must contain ``"domain_code_regex"``.
        spark: Session used to build the report DataFrame. When omitted,
            the session attached to ``df`` is used.
        quarantine_path: Not used by this function; kept so existing callers
            keep working.

    Returns:
        dict with keys:
          - ``"valid_df"``: rows of ``df`` that fail neither quarantine check;
          - ``"quarantine_df"``: failing rows plus a ``dq_reason`` column
            (``"invalid_domain"`` or ``"negative_values"``), de-duplicated on
            the whole row. A row failing both checks appears once per reason;
          - ``"dq_report"``: one-row DataFrame with the JSON-encoded report
            (``dq_report_json``) and ``execution_timestamp``.
    """
    # Fall back to the DataFrame's own session so the default spark=None works.
    session = spark if spark is not None else df.sparkSession

    expected_cols = ["domain_code", "count_views", "total_response_size", "event_timestamp", "event_date"]
    present_cols = set(df.columns)
    missing_cols = [c for c in expected_cols if c not in present_cols]

    def _col_or_null(name, dtype):
        # A missing column is evaluated as a typed null literal instead of
        # raising an unresolved-column error.
        return F.col(name) if name in present_cols else F.lit(None).cast(dtype)

    def _is_blank(name):
        # Null, or empty/whitespace-only once rendered as a string.
        return F.col(name).isNull() | (F.trim(F.col(name).cast("string")) == "")

    domain = _col_or_null("domain_code", "string")
    views = _col_or_null("count_views", "long")
    size = _col_or_null("total_response_size", "long")

    # Both predicates are never null, so they can be negated safely below.
    invalid_domain_cond = domain.isNull() | (~domain.rlike(config["domain_code_regex"]))
    # A null measure is "not negative", as it was under the original filter.
    negative_cond = F.coalesce((views < 0) | (size < 0), F.lit(False))
    failing_cond = invalid_domain_cond | negative_cond

    # Collect every metric in a single pass instead of one count() job per check.
    agg_exprs = [
        F.count(F.lit(1)).alias("total_rows"),
        F.count(F.when(invalid_domain_cond, 1)).alias("invalid_domain"),
        F.count(F.when(negative_cond, 1)).alias("negative_values"),
        F.count(F.when(~failing_cond, 1)).alias("valid_rows"),
    ]
    agg_exprs.extend(
        F.count(F.when(_is_blank(c), 1)).alias(f"null__{c}")
        for c in expected_cols if c in present_cols
    )
    stats = df.agg(*agg_exprs).first()

    total_rows = stats["total_rows"]
    dq_report = {"total_rows": total_rows, "checks": {}}

    # 3.1 Schema presence (columns)
    dq_report["checks"]["missing_columns"] = missing_cols

    # 3.2 Nulls and completeness
    null_counts = {}
    null_pct = {}
    for c in expected_cols:
        if c in present_cols:
            cnt = stats[f"null__{c}"]
            null_counts[c] = cnt
            null_pct[c] = cnt / total_rows if total_rows > 0 else None
        else:
            # A column that does not exist counts as null on every row.
            null_counts[c] = total_rows
            null_pct[c] = 1.0
    dq_report["checks"]["null_counts"] = null_counts
    dq_report["checks"]["null_pct"] = null_pct

    # 3.3 Domain regex validity
    invalid_domain_cnt = stats["invalid_domain"]
    dq_report["checks"]["invalid_domain_count"] = invalid_domain_cnt

    # 3.4 Non-negative ranges
    negative_cnt = stats["negative_values"]
    dq_report["checks"]["negative_values_count"] = negative_cnt

    # 3.5 Minimal row count
    dq_report["checks"]["min_rows_ok"] = total_rows >= config.get("min_rows_expected", 1)

    # Build quarantine_df: one tagged copy of the failing rows per failed check.
    failing_frames = []
    if invalid_domain_cnt > 0:
        failing_frames.append(df.filter(invalid_domain_cond).withColumn("dq_reason", F.lit("invalid_domain")))
    if negative_cnt > 0:
        failing_frames.append(df.filter(negative_cond).withColumn("dq_reason", F.lit("negative_values")))

    if failing_frames:
        quarantine_df = failing_frames[0]
        for frame in failing_frames[1:]:
            quarantine_df = quarantine_df.unionByName(frame, allowMissingColumns=True)
        # No ingest id is available, so de-duplicate on the whole row.
        quarantine_df = quarantine_df.dropDuplicates()
        quarantine_rows = quarantine_df.count()
    else:
        # Empty frame with the same columns as the non-empty case.
        quarantine_df = df.limit(0).withColumn("dq_reason", F.lit(None).cast("string"))
        quarantine_rows = 0

    # valid_df is selected with the same predicates that define quarantine.
    # The earlier row-hash anti-join never matched, because the quarantine
    # hash also covered dq_reason, so invalid rows were left in valid_df.
    valid_df = df.filter(~failing_cond)

    dq_report["counts"] = {
        "total_rows": total_rows,
        "valid_rows": stats["valid_rows"],
        "quarantine_rows": quarantine_rows
    }

    dq_report_df = session.createDataFrame(
        [(json.dumps(dq_report), datetime.now())],
        ["dq_report_json", "execution_timestamp"]
    )

    return {
        "valid_df": valid_df,
        "quarantine_df": quarantine_df,
        "dq_report": dq_report_df
    }


# ---------------------------
# 4) Write outputs (Delta tables) + save DQ report
# ---------------------------
def write_outputs(valid_df: DataFrame, quarantine_df: DataFrame, dq_report_df: DataFrame,
                  silver_path: str, quarantine_path: str, dq_report_path: str,
                  partition_by: list = None):
    """
    Persist the DQ results as Delta tables, overwriting previous contents.

    Args:
        valid_df: Rows to write to the silver table.
        quarantine_df: Rejected rows; written only when it has at least one row.
        dq_report_df: DQ report rows.
        silver_path: Table name passed to ``saveAsTable`` for ``valid_df``.
        quarantine_path: Table name passed to ``saveAsTable`` for ``quarantine_df``.
        dq_report_path: Table name passed to ``saveAsTable`` for ``dq_report_df``.
        partition_by: Partition columns for the silver table. ``None`` (the
            default) means ``["event_date"]``.
    """
    # None stands in for the default so no mutable list is shared across calls.
    partition_cols = ["event_date"] if partition_by is None else partition_by

    # Silver table: Delta, overwrite, partitioned.
    (valid_df
     .write
     .format("delta")
     .option("mergeSchema", "true")
     .mode("overwrite")
     .partitionBy(*partition_cols)
     .saveAsTable(silver_path)
    )

    # Quarantine table: Delta, overwrite. head(1) tests for "at least one row"
    # without counting the whole DataFrame.
    # NOTE(review): when there are no quarantined rows the table is left as
    # written by an earlier run -- confirm that is the intended behaviour.
    if quarantine_df.head(1):
        (quarantine_df
         .write
         .option("mergeSchema", "true")
         .format("delta")
         .mode("overwrite")
         .saveAsTable(quarantine_path)
        )

    # DQ report table: Delta, overwrite.
    (dq_report_df
     .write
     .format("delta")
     .option("mergeSchema", "true")
     .mode("overwrite")
     .saveAsTable(dq_report_path)
    )

In [0]:
# Source location and target tables for this run.
bronze_path = "/Volumes/projectviews/bronze/raw_files"
silver_table = "projectviews.silver.projectviews_clean"
quarantine_path = "projectviews.silver.projectviews_quarantine"
dq_report_path = "projectviews.silver.dq_reports"

# Raw text files -> typed columns -> data-quality split.
parsed = parse_raw_df(bronze_path)
parsed_check = enforce_and_cast_schema(parsed)
results = run_dq_checks(parsed_check, config=DQ_CONFIG, spark=spark)

# Unpack the three result frames under their notebook-level names.
valid_df, quarantine_df, dq_report_df = (
    results[key] for key in ("valid_df", "quarantine_df", "dq_report")
)

# Persist the clean rows, the rejected rows and the report.
write_outputs(
    valid_df=valid_df,
    quarantine_df=quarantine_df,
    dq_report_df=dq_report_df,
    silver_path=silver_table,
    quarantine_path=quarantine_path,
    dq_report_path=dq_report_path,
)

In [0]:
%sql
-- Number of distinct (domain_code, event_date) groups in the silver table.
WITH daily_totals AS (
  SELECT
    domain_code,
    event_date,
    SUM(count_views) AS count_views,
    SUM(total_response_size) AS total_response_size
  FROM projectviews.silver.projectviews_clean
  GROUP BY domain_code, event_date
)

SELECT COUNT(*) FROM daily_totals