In [0]:
%sql
-- Registry of the data-quality rules applied while promoting bronze data to silver.
CREATE TABLE IF NOT EXISTS knowledgehub_lakehouse.silver.dq_rule_registry (
  rule_id STRING,
  dataset STRING,
  rule_name STRING,
  rule_description STRING,
  severity STRING,         -- BLOCKER / WARNING / INFO
  action STRING,           -- QUARANTINE / FLAG / ALLOW
  owner STRING,
  created_ts TIMESTAMP
)
USING DELTA;

-- Seed the registry idempotently: a plain INSERT would add a second copy of
-- every rule each time this cell is re-run. MERGE only inserts rule_ids that
-- are not registered yet and leaves existing rows (and their created_ts) alone.
MERGE INTO knowledgehub_lakehouse.silver.dq_rule_registry AS tgt
USING (
  SELECT * FROM VALUES
    -- DOCS rules
    ('DOC_001','docs','Missing doc_id','doc_id is null or empty','BLOCKER','QUARANTINE','data_engineering'),
    ('DOC_002','docs','Missing doc_type','doc_type is null or empty','BLOCKER','QUARANTINE','data_engineering'),
    ('DOC_003','docs','Published doc has empty text','status=Published and doc_text empty','BLOCKER','QUARANTINE','compliance'),
    ('DOC_004','docs','Published doc missing confidentiality','status=Published and confidentiality null','BLOCKER','QUARANTINE','compliance'),

    -- EVENTS rules
    ('EVT_001','events','Missing event_id','event_id is null','BLOCKER','QUARANTINE','data_engineering'),
    ('EVT_002','events','Invalid result_count','result_count < 0','WARNING','QUARANTINE','data_engineering'),
    ('EVT_003','events','Missing event_ts','event_ts is null','BLOCKER','QUARANTINE','data_engineering')
  AS seed(rule_id, dataset, rule_name, rule_description, severity, action, owner)
) AS src
ON tgt.rule_id = src.rule_id
WHEN NOT MATCHED THEN INSERT
  (rule_id, dataset, rule_name, rule_description, severity, action, owner, created_ts)
  VALUES
  (src.rule_id, src.dataset, src.rule_name, src.rule_description, src.severity, src.action, src.owner, current_timestamp());


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Raw documents as landed in the bronze layer.
docs = spark.table("knowledgehub_lakehouse.bronze.docs_raw")

# Standardise the categorical columns and derive a normalised version string.
docs_std = (
    docs
    # Trim + upper-case the categorical columns so later comparisons are exact.
    # (Upper-casing already canonicalises POLICY / DRAFT / PUBLISHED / ARCHIVED,
    # so no further per-value rewriting is needed for those.)
    .withColumn("doc_type", F.upper(F.trim("doc_type")))
    .withColumn("status", F.upper(F.trim("status")))
    .withColumn("confidentiality", F.upper(F.trim("confidentiality")))
    .withColumn("department", F.upper(F.trim("department")))

    # Version handling: strip everything but digits and dots, then split into
    # major / minor parts.
    .withColumn("version_clean", F.regexp_replace("version", "[^0-9.]", ""))
    .withColumn("version_major", F.split("version_clean", "\\.").getItem(0).cast("int"))
    # F.get returns NULL for an out-of-range index, i.e. when there is no minor part.
    .withColumn("version_minor", F.get(F.split("version_clean", "\\."), 1).cast("int"))
    # A missing minor part is normalised to 0, so "v2" becomes "2.0".
    # NOTE(review): concat_ws skips NULLs, so an unparseable version yields "0".
    .withColumn(
        "version_norm",
        F.concat_ws(".", F.col("version_major"), F.coalesce(F.col("version_minor"), F.lit(0)))
    )

    # Collapse punctuated spellings such as "S.O.P", "S-O-P" or "S O P" to "SOP".
    # Only non-alphanumeric separators are accepted between the letters; a bare
    # "." wildcard would also rewrite unrelated words like "STOMP".
    .withColumn(
        "doc_type",
        F.regexp_replace(F.col("doc_type"), r"S[^A-Z0-9]O[^A-Z0-9]P\.?", "SOP")
    )

    # NULL confidentiality is replaced by the sentinel "UNKNOWN".
    .withColumn(
        "confidentiality",
        F.coalesce(F.col("confidentiality"), F.lit("UNKNOWN"))
    )

    # Expand the abbreviation OPS only when it is a whole word, so values that
    # merely contain it (e.g. "DEVOPS") are left untouched.
    .withColumn("department", F.regexp_replace(F.col("department"), r"\bOPS\b", "OPERATIONS"))
)

In [0]:
# De-duplicate on (doc_id, doc_title, version_norm): within each key keep the
# row with the latest updated_ts, breaking ties with the latest ingest_ts.
dedupe_order = [F.col("updated_ts").desc(), F.col("ingest_ts").desc()]
w_dedupe = (
    Window
    .partitionBy("doc_id", "doc_title", "version_norm")
    .orderBy(*dedupe_order)
)

docs_ranked = docs_std.withColumn("rn", F.row_number().over(w_dedupe))
docs_dedup = docs_ranked.filter("rn = 1").drop("rn")

docs_dedup.display()

In [0]:
# Rank the versions of each document, highest version first.
# The window is partitioned by doc_id only: partitioning by version_norm as well
# would put every (already de-duplicated) row in its own partition, making every
# row rank 1 and the flag True for all versions.
w_latest = (
    Window
    .partitionBy("doc_id")
    .orderBy(
        F.col("version_major").desc_nulls_last(),
        # A missing minor part counts as 0, matching how version_norm is built.
        F.coalesce(F.col("version_minor"), F.lit(0)).desc(),
        # Tie-breaker when two rows of one document share the same version.
        F.col("updated_ts").desc_nulls_last(),
    )
)

# current_version_flag is True only for the highest-ranked row of each doc_id.
docs_latest = docs_dedup.withColumn(
    "current_version_flag",
    F.row_number().over(w_latest) == 1
)


In [0]:
docs_quarantine = docs_latest.filter(
    (F.col("status") == "PUBLISHED") & (F.col("doc_text").isNull()) |
    (F.col("doc_id").isNull()) |
    (F.col("doc_title").isNull()) |
    (F.col("doc_type").isNull()) |
    ((F.col("status") == "PUBLISHED") & (F.col("confidentiality").isNull()))
)

In [0]:
docs_clean = docs_latest.subtract(docs_quarantine)

In [0]:
docs_clean.write.mode("append").format("delta").option("mergeSchema", "true").saveAsTable("knowledgehub_lakehouse.silver.docs_clean")

In [0]:
docs_quarantine = docs_quarantine.withColumn(
    "reject_reason",
    F.when(F.col("doc_id").isNull(), F.lit("DOC_002"))
     .when(F.col("doc_title").isNull(), F.lit("DOC_002"))
     .when(F.col("doc_type").isNull(), F.lit("DOC_002"))
     .when((F.col("status") == "PUBLISHED") & F.col("doc_text").isNull(), F.lit("DOC_001"))
     .when((F.col("status") == "PUBLISHED") & F.col("confidentiality").isNull(), F.lit("DOC_003")))
docs_quarantine.write.mode("overwrite").saveAsTable("knowledgehub_lakehouse.quarantine.docs_rejected")

In [0]:
%sql
-- Rejected documents per ingest day and reject reason, oldest day first.
SELECT
  CAST(rejected.ingest_ts AS DATE) AS dt,
  rejected.reject_reason,
  COUNT(*) AS rejected_count
FROM knowledgehub_lakehouse.quarantine.docs_rejected AS rejected
GROUP BY dt, rejected.reject_reason
ORDER BY dt;


In [0]:
from pyspark.sql import functions as F
# Raw access events from bronze.
events = spark.table("knowledgehub_lakehouse.bronze.access_events_raw")

# Trim and upper-case the categorical columns so comparisons downstream are exact.
events_std = events
for categorical_col in ("action", "client_type"):
    events_std = events_std.withColumn(
        categorical_col, F.upper(F.trim(categorical_col))
    )

In [0]:
from pyspark.sql.window import Window
# One row per event_id: keep the most recently ingested copy.
# NOTE(review): rows whose event_id is NULL all share one partition, so only a
# single one of them survives this step.
w_evt = Window.partitionBy("event_id").orderBy(F.col("ingest_ts").desc())

events_ranked = events_std.withColumn("rn", F.row_number().over(w_evt))
events_dedup = events_ranked.filter("rn = 1").drop("rn")

In [0]:
# Distinct ids of the documents that made it into silver.
docs_ids = spark.table("knowledgehub_lakehouse.silver.docs_clean").select("doc_id").distinct()

# Carry an explicit marker through the join. After a join on the column name
# only one doc_id column remains in the result, so testing the right-hand key
# for NULL depends on how the engine resolves that hidden column; the marker is
# unambiguous: it is NULL exactly when no silver document matched.
docs_lookup = docs_ids.withColumn("_doc_exists", F.lit(True))

events_enriched = (
    events_dedup
    .join(docs_lookup, "doc_id", "left")
    # 1 = the event's doc_id has no match in silver, 0 = it has one.
    # NOTE(review): events with a NULL doc_id never match and are flagged 1 too.
    .withColumn(
        "orphan_doc_flag",
        F.when(F.col("_doc_exists").isNull(), F.lit(1)).otherwise(F.lit(0))
    )
    .drop("_doc_exists")
)

In [0]:
# A search counts as successful when it returned at least one result.
# Any other row (including NULL action or NULL result_count) gets 0.
is_search = F.col("action") == "SEARCH"
has_results = F.col("result_count") > 0

events_final = events_enriched.withColumn(
    "search_success_flag",
    F.when(is_search & has_results, F.lit(1)).otherwise(F.lit(0)),
)

In [0]:
# Replace the silver access-events table with the freshly computed frame.
(
    events_final
    .write
    .mode("overwrite")
    .saveAsTable("knowledgehub_lakehouse.silver.access_events_clean")
)

In [0]:
%sql
-- Event counts split by orphan_doc_flag.
SELECT
  evt.orphan_doc_flag,
  COUNT(*) AS events
FROM knowledgehub_lakehouse.silver.access_events_clean AS evt
GROUP BY evt.orphan_doc_flag;


In [0]:
from pyspark.sql import functions as F
# Re-read the three document tables to measure how many rows each holds.
docs_bronze = spark.table("knowledgehub_lakehouse.bronze.docs_raw")
docs_clean = spark.table("knowledgehub_lakehouse.silver.docs_clean")
docs_rejected = spark.table("knowledgehub_lakehouse.quarantine.docs_rejected")

# One metrics row for the "docs" dataset, stamped with today's date and the
# current timestamp.
metric_columns = ["dataset", "bronze_count", "silver_count", "quarantine_count"]
metric_row = (
    "docs",
    docs_bronze.count(),
    docs_clean.count(),
    docs_rejected.count(),
)

dq_metrics = (
    spark.createDataFrame([metric_row], metric_columns)
    .withColumn("metric_date", F.current_date())
    .withColumn("created_ts", F.current_timestamp())
)

# Append to the running history of daily metrics.
(
    dq_metrics.write
    .mode("append")
    .format("delta")
    .saveAsTable("knowledgehub_lakehouse.silver.dq_metrics_daily")
)

# Show the history, newest entry first.
metrics_history = spark.table("knowledgehub_lakehouse.silver.dq_metrics_daily")
display(metrics_history.orderBy(F.col("created_ts").desc()))

In [0]:
%sql
-- Inspect the version flag of every silver document, ordered by id and version.
SELECT
  d.doc_id,
  d.version_norm,
  d.current_version_flag
FROM knowledgehub_lakehouse.silver.docs_clean AS d
ORDER BY d.doc_id, d.version_norm;

In [0]:
%sql
-- Enable column mapping to support DROP COLUMN
ALTER TABLE knowledgehub_lakehouse.silver.docs_clean
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

-- Now drop the raw version column and the intermediate parsing columns.
-- IF EXISTS keeps this cell re-runnable: without it the statement fails as soon
-- as the columns have already been removed by an earlier run.
ALTER TABLE knowledgehub_lakehouse.silver.docs_clean
DROP COLUMNS IF EXISTS (version, version_clean, version_major, version_minor);

In [0]:
%sql
-- Number of cleaned access events that carry no doc_id at all.
SELECT COUNT(*)
FROM knowledgehub_lakehouse.silver.access_events_clean
WHERE doc_id IS NULL;

In [0]:
# Inputs for the recovery pass: the rejected documents and the current silver table.
quarantine_docs = spark.table("knowledgehub_lakehouse.quarantine.docs_rejected")
silver_docs = spark.table("knowledgehub_lakehouse.silver.docs_clean")

In [0]:
from pyspark.sql import functions as F
# Lookup of one non-null doc_text per doc_id, taken from silver.
# NOTE(review): dropDuplicates keeps an arbitrary row per doc_id, so when a
# document has several texts the chosen one is not deterministic.
silver_with_text = silver_docs.filter(F.col("doc_text").isNotNull())

doc_text_lookup = (
    silver_with_text
    .select("doc_id", F.col("doc_text").alias("doc_text_lookup"))
    .dropDuplicates(["doc_id"])
)

In [0]:
# Attach the candidate replacement text to each quarantined row; rows without a
# matching doc_id in the lookup keep a NULL doc_text_lookup.
quarantine_enriched = quarantine_docs.join(doc_text_lookup, on="doc_id", how="left")

In [0]:
# Back-fill doc_text from the lookup only where the quarantined row has none,
# then discard the helper column.
backfilled_text = F.coalesce(F.col("doc_text"), F.col("doc_text_lookup"))

quarantine_fixed = (
    quarantine_enriched
    .withColumn("doc_text", backfilled_text)
    .drop("doc_text_lookup")
)


In [0]:
recovered_docs = quarantine_fixed.filter(
    (F.col("doc_text").isNotNull()) &
    (F.col("doc_id").isNotNull()) &
    (F.col("doc_title").isNotNull()) &
    (F.col("doc_type").isNotNull()) &
    ~(
        (F.col("status") == "PUBLISHED") &
        (F.col("confidentiality").isNull())
    )
)


In [0]:
still_quarantined = quarantine_fixed.filter(
    F.col("doc_text").isNull()
)

In [0]:
# Align recovered rows with the silver schema before they are appended:
# reject_reason only makes sense in quarantine, and the raw/intermediate version
# columns are removed from silver.docs_clean elsewhere in this notebook, so
# appending them with mergeSchema would add them back.
# DataFrame.drop ignores names that are not present, so this is safe either way.
recovered_docs_clean = recovered_docs.drop(
    "reject_reason",
    "version",
    "version_clean",
    "version_major",
    "version_minor",
)


In [0]:
# Add the recovered documents to silver, letting the table schema evolve if needed.
recovered_writer = (
    recovered_docs_clean.write
    .mode("append")
    .option("mergeSchema", "true")
)
recovered_writer.saveAsTable("knowledgehub_lakehouse.silver.docs_clean")


In [0]:
# Replace the quarantine table with the rows that are still rejected.
# NOTE(review): still_quarantined is derived from this same table, so the
# overwrite reads from and writes to one table in a single job.
(
    still_quarantined.write
    .mode("overwrite")
    .saveAsTable("knowledgehub_lakehouse.quarantine.docs_rejected")
)

In [0]:
%sql
-- Total number of rows currently in the silver documents table.
SELECT COUNT(*)
FROM knowledgehub_lakehouse.silver.docs_clean;