# datacontract-cli — Unity Catalog demo (v2)

- Uses `dataContractSpecification: 1.2.0` (supported schema) together with `soda-core==3.3.20`.
- Robust YAML path resolution (works in Repos **and** Users workspace).


In [0]:
%pip install -U pip
%pip install --force-reinstall --no-cache-dir 'datacontract-cli[databricks]'

In [0]:
%restart_python

In [0]:
# Quick smoke test: print the CLI version, then run the public example contract
!datacontract --version
!datacontract test https://datacontract.com/examples/orders-latest/datacontract.yaml


In [0]:
# Config for UC targets
import os
from dataclasses import dataclass

@dataclass
class Cfg:
    """Unity Catalog targets used throughout the demo.

    CATALOG and SCHEMA default to the ``DC_DEMO_CATALOG`` / ``DC_DEMO_SCHEMA``
    environment variables (read once, when the class is defined) and fall
    back to ``"DATAC"`` / ``"orders"``.

    VOLUME_PATH is derived from the *instance's* CATALOG and SCHEMA unless a
    non-empty value is passed explicitly, so overriding the catalog or schema
    no longer leaves the path pointing at the default location.
    """

    CATALOG: str = os.getenv("DC_DEMO_CATALOG", "DATAC")
    SCHEMA: str = os.getenv("DC_DEMO_SCHEMA", "orders")
    TABLE: str = "orders_raw"
    # Empty string means "derive from CATALOG/SCHEMA" in __post_init__.
    VOLUME_PATH: str = ""

    def __post_init__(self) -> None:
        """Fill in VOLUME_PATH from the final CATALOG/SCHEMA when not given."""
        if not self.VOLUME_PATH:
            self.VOLUME_PATH = f"/Volumes/{self.CATALOG}/{self.SCHEMA}/contract_audit/"

# Build the config from its defaults and echo it so the resolved targets are visible in the cell output.
cfg = Cfg()
print(cfg)


In [0]:
# Location of the data contract YAML. Set DC_DEMO_YAML_PATH to point the demo
# at a contract stored elsewhere (e.g. under Repos); the default is the
# original hard-coded Users-workspace path.
YAML_PATH = os.getenv(
    "DC_DEMO_YAML_PATH",
    "/Workspace/Users/hadi.farhat@databricks.com/datacontract/datacontracts/orders/datacontract.uc2.yaml",
)
print("YAML:", YAML_PATH)


In [0]:
import datetime
from datetime import timedelta
from pyspark.sql import functions as F
from pyspark.sql import types as T

# --- setup catalog & schema, reset target table ---
# Executed strictly in this order: ensure the schema exists, make the catalog
# and schema the session defaults, then drop any previous copy of the target table.
for _stmt in (
    f"CREATE SCHEMA IF NOT EXISTS {cfg.CATALOG}.{cfg.SCHEMA}",
    f"USE CATALOG {cfg.CATALOG}",
    f"USE {cfg.SCHEMA}",
    f"DROP TABLE IF EXISTS {cfg.CATALOG}.{cfg.SCHEMA}.{cfg.TABLE}",
):
    spark.sql(_stmt)

# Naive UTC timestamp. datetime.utcnow() is deprecated since Python 3.12;
# taking an aware UTC "now" and stripping tzinfo yields the same naive value,
# so the ISO strings built from it still carry no UTC offset.
now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

# Helper: EUR → cents (long)
def cents(x: float) -> int:
    """Convert a currency amount to an integer number of cents.

    The amount is routed through ``Decimal(str(x))`` and rounded half-up, so
    values such as ``1.005`` become ``101`` rather than ``100`` (the result of
    ``round(x * 100)`` due to binary float error and banker's rounding).

    Args:
        x: Amount in major units (e.g. EUR).

    Returns:
        The amount in minor units (cents) as an ``int``.
    """
    # Local import keeps the helper self-contained within this cell.
    from decimal import ROUND_HALF_UP, Decimal

    scaled = Decimal(str(x)) * 100
    return int(scaled.quantize(Decimal("1"), rounding=ROUND_HALF_UP))

# Both sample orders share one processed timestamp, one minute after `now`.
_processed_iso = (now + timedelta(minutes=1)).isoformat()

# Tuple order follows the columns declared in `schema` below; timestamps are
# still ISO strings at this point and are cast when the DataFrame is built.
rows_ok = [
    ("o-1", now.isoformat(), cents(19.99), "1000000001", "a@x.com", _processed_iso),
    ("o-2", (now - timedelta(hours=2)).isoformat(), cents(29.99), "1000000002", "b@x.com", _processed_iso),
]

# (column name, Spark type, nullable)
_order_columns = (
    ("order_id", T.StringType(), False),
    ("order_timestamp", T.StringType(), False),
    ("order_total", T.LongType(), False),  # long in cents
    ("customer_id", T.StringType(), True),
    ("customer_email_address", T.StringType(), False),
    ("processed_timestamp", T.StringType(), False),
)
schema = T.StructType(
    [T.StructField(col_name, col_type, nullable) for col_name, col_type, nullable in _order_columns]
)

# Build the DataFrame, then replace the two ISO-string columns with real timestamps.
df_ok = spark.createDataFrame(rows_ok, schema)
for _ts_col in ("order_timestamp", "processed_timestamp"):
    df_ok = df_ok.withColumn(_ts_col, F.to_timestamp(_ts_col))

# --- write to UC as delta ---
target_fq = f"{cfg.CATALOG}.{cfg.SCHEMA}.{cfg.TABLE}"  # e.g., DATAC.orders.orders_raw
df_ok.write.format("delta").mode("overwrite").saveAsTable(target_fq)

# --- temp view matches the model key in the contract (e.g., 'orders_raw') ---
# Read the table back from the catalog and register it under the bare table name.
_persisted = spark.table(target_fq)
_persisted.createOrReplaceTempView(cfg.TABLE)
print(f"Temp view created -> {cfg.TABLE} => {target_fq}")


In [0]:
# Show every row of the configured target table in the notebook output.
spark.sql(f" SELECT * FROM {cfg.CATALOG}.{cfg.SCHEMA}.{cfg.TABLE}").display()

In [0]:
from datacontract.data_contract import DataContract
import re

# --- Run lint & test ---
# Load the contract from YAML_PATH and hand it the active Spark session.
dc = DataContract(data_contract_file=YAML_PATH, spark=spark)

# Lint the contract and report the overall outcome.
lint = dc.lint()
print(f"Lint result: {lint.result}")

# Execute the contract's tests; the returned run object is converted to a
# dict and flattened into rows further down in this cell.
run = dc.test()

# --- Helper: parse checks into rows ---
def _enum_to_str(v):
    s = str(v)
    m = re.search(r"'([^']+)'", s)
    return m.group(1) if m else s

def checks_to_rows(run_dict):
    rows = []
    for c in run_dict.get("checks", []):
        diag = c.get("diagnostics", {}) or {}
        fail = diag.get("fail") or {}
        thr_parts = []
        if "lessThan" in fail and "greaterThan" in fail:
            thr_parts.append(f"between {fail['greaterThan']} and {fail['lessThan']}")
        else:
            if "lessThan" in fail:
                thr_parts.append(f"< {fail['lessThan']}")
            if "lessThanOrEqual" in fail:
                thr_parts.append(f"≤ {fail['lessThanOrEqual']}")
            if "greaterThan" in fail:
                thr_parts.append(f"> {fail['greaterThan']}")
            if "greaterThanOrEqual" in fail:
                thr_parts.append(f"≥ {fail['greaterThanOrEqual']}")
        thresholds = ", ".join(thr_parts) or ""
        rows.append((
            _enum_to_str(c.get("result")).replace("ResultEnum.", "").lower(),
            c.get("category"),
            c.get("model"),
            c.get("field") or "",
            c.get("name"),
            diag.get("value", ""),
            thresholds
        ))
    return rows

# --- Extract dict form of run ---
# model_dump() is the pydantic v2 API and dict() its v1 counterpart (assumes
# `run` is a pydantic model — TODO confirm). Choosing by capability instead of
# a blanket `except Exception` lets a genuine model_dump() failure surface
# rather than being silently replaced by a shallow attribute dict.
if hasattr(run, "model_dump"):
    run_dict = run.model_dump()
elif hasattr(run, "dict"):
    run_dict = run.dict()
else:
    # Last resort: shallow attribute dict; nested check objects are not converted.
    run_dict = vars(run)

rows = checks_to_rows(run_dict)

# --- Save to Delta ---
result_schema = ["Result", "Category", "Model", "Field", "Check", "Measured", "Thresholds"]

# Every column is a display string. An explicit all-string schema (instead of
# inference) keeps createDataFrame working when `rows` is empty or when a
# column mixes numbers with "" placeholders.
results_struct = T.StructType([T.StructField(col_name, T.StringType(), True) for col_name in result_schema])
string_rows = [tuple(None if value is None else str(value) for value in row) for row in rows]

df_results = spark.createDataFrame(string_rows, results_struct)

target_results_table = f"{cfg.CATALOG}.{cfg.SCHEMA}.orders_quality_results"

(df_results.write
    .mode("overwrite")
    .format("delta")
    # Allow replacing a table left by an earlier run whose column types were inferred.
    .option("overwriteSchema", "true")
    .saveAsTable(target_results_table))

print(f"✅ Results saved to {target_results_table}")

# --- Expose the results through a regular (non-materialized) view ---
# NOTE(review): mv_name is not used in the visible cells; kept in case a later cell references it.
mv_name = f"{cfg.CATALOG}.{cfg.SCHEMA}.orders_quality_mv"

# Fully qualify the view so it is created where it is later queried,
# independent of the session's current catalog/schema.
view_name = f"{cfg.CATALOG}.{cfg.SCHEMA}.v_orders_audit"
spark.sql(f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM {target_results_table}")

print(f"📊 View created: {view_name}")


In [0]:
# Show the audit view, addressed by its fully qualified catalog.schema name.
spark.sql(f"SELECT * FROM {cfg.CATALOG}.{cfg.SCHEMA}.v_orders_audit").display()