# Databricks Delta governance streaming demo

Drive Structured Streaming writes against Unity Catalog tables while governance tracks compatibility across contract versions.

## 1. Configure parameters
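Databricks widgets or environment variables (named `DC43_DEMO_<PARAMETER>` in upper case) supply table identifiers, checkpoint roots, and dataset metadata. A non-empty widget value takes precedence over the environment.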

In [None]:

import os

def _resolve_param(name: str, *, default: str | None = None, label: str | None = None) -> str | None:
    env_key = f"DC43_DEMO_{name.upper()}"
    if 'dbutils' in globals():
        try:
            if label is None:
                label = name
            dbutils.widgets.text(name, os.environ.get(env_key, default) or '', label)
            value = dbutils.widgets.get(name)
            if value:
                return value
        except Exception as exc:  # pragma: no cover - widgets unavailable
            print(f'Falling back to environment for {name}: {exc}')
    return os.environ.get(env_key, default)

# Optional service-client configuration; None when neither a widget nor the
# environment supplies a value (no default is given).
CONFIG_PATH = _resolve_param('config_path', label='Service config (optional)')

# Unity Catalog coordinates of the streaming target table.
CATALOG = _resolve_param('catalog', label='Unity Catalog', default='main')
SCHEMA = _resolve_param('schema', label='Schema', default='governed_demo')
TABLE = _resolve_param('table', label='Table name', default='orders_stream')

# Governance identifiers and the checkpoint location.
DATASET_ID = _resolve_param('dataset_id', default='governed.analytics.orders_streaming')
DATA_PRODUCT_ID = _resolve_param('data_product_id', default='dp.analytics.orders')
OUTPUT_PORT = _resolve_param('output_port', default='orders_stream')
CONTRACT_ID = _resolve_param('contract_id', default='contracts.analytics.orders')
CHECKPOINT_ROOT = _resolve_param('checkpoint_root', default='dbfs:/tmp/dc43/checkpoints')

# Boolean switches: '1', 'true' or 'yes' (case-insensitive) turn the flag on.
_auto_version_text = _resolve_param(
    'auto_dataset_version',
    label='Auto dataset versions?',
    default='true',
)
AUTO_DATASET_VERSION = (_auto_version_text or '').lower() in {'1', 'true', 'yes'}
_enforce_text = _resolve_param('enforce', default='false')
ENFORCE = (_enforce_text or '').lower() in {'1', 'true', 'yes'}

# Echo the resolved settings so the notebook output records what was used.
print('Configuration summary:')
for key in (
    'CONFIG_PATH',
    'CATALOG',
    'SCHEMA',
    'TABLE',
    'DATASET_ID',
    'DATA_PRODUCT_ID',
    'OUTPUT_PORT',
    'CONTRACT_ID',
    'AUTO_DATASET_VERSION',
    'CHECKPOINT_ROOT',
    'ENFORCE',
):
    print('  {} = {}'.format(key, globals()[key]))


## 2. Initialise Spark and service clients

In [None]:

from pyspark.sql import SparkSession

# Obtain the session via the builder's getOrCreate() and report its version.
spark = SparkSession.builder.getOrCreate()
print('Using Spark {}'.format(spark.version))


In [None]:

from dc43_service_clients.bootstrap import load_service_clients

# Load the service clients; CONFIG_PATH may be None when no config was supplied.
suite = load_service_clients(CONFIG_PATH)

# The demo needs all three services; stop at the first one that is missing.
if any(
    getattr(suite, service) is None
    for service in ('contract', 'data_product', 'governance')
):
    raise RuntimeError('Contract, data product, and governance services are required')


## 3. Prepare Unity Catalog targets

In [None]:

# Three-part name of the streaming target: <catalog>.<schema>.<table>.
table_name = "{}.{}.{}".format(CATALOG, SCHEMA, TABLE)

# Make sure the schema exists before anything is written to it.
# NOTE(review): identifiers are interpolated into the SQL text unquoted.
spark.sql("CREATE SCHEMA IF NOT EXISTS {}.{}".format(CATALOG, SCHEMA))
print('Streaming into {} with checkpoints at {}'.format(table_name, CHECKPOINT_ROOT))


## 4. Build contract revisions

In [None]:
from dc43_integrations.examples.databricks_delta_versioning_support import (
    VersionedWriteSpec,
    build_contract,
    ensure_active_data_product,
    register_contracts,
)

# Three revisions of the same contract. Only the version, the allowed
# currencies and the discount flag vary; everything else is shared.
contracts = [
    build_contract(
        version=revision,
        contract_id=CONTRACT_ID,
        table_name=table_name,
        catalog=CATALOG,
        schema=SCHEMA,
        allowed_currencies=currencies,
        include_discount=with_discount,
    )
    for revision, currencies, with_discount in (
        ('0.1.0', ['EUR', 'USD'], False),
        ('0.2.0', ['EUR', 'USD'], True),
        ('0.3.0', ['EUR', 'USD', 'GBP'], True),
    )
]

# Register every revision with the contract service.
register_contracts(suite.contract, contracts)

# Bind each revision to the output port in turn; the product version
# returned for the last revision is the one kept for the streaming writes.
product_version = None
for contract in contracts:
    product_version = ensure_active_data_product(
        data_product_service=suite.data_product,
        data_product_id=DATA_PRODUCT_ID,
        port_name=OUTPUT_PORT,
        contract=contract,
        physical_location=table_name,
    ).version
DATA_PRODUCT_VERSION = product_version
print('Registered {} contract revisions under {}'.format(len(contracts), CONTRACT_ID))


## 5. Define streaming dataset versions

In [None]:

# One write spec per contract revision, in the same order as ``contracts``.
# Each entry pairs the dataset version used when AUTO_DATASET_VERSION is off
# with the sample rows for that revision; with the flag on, the version is
# passed as None.
writes = [
    VersionedWriteSpec(
        contract=contracts[position],
        dataset_version=None if AUTO_DATASET_VERSION else pinned_version,
        rows=batch,
    )
    for position, (pinned_version, batch) in enumerate(
        (
            (
                '1.0.0',
                [
                    dict(order_id=1, customer_id=101, order_ts='2024-01-01T10:00:00Z',
                         amount=125.5, currency='EUR'),
                    dict(order_id=2, customer_id=102, order_ts='2024-01-02T11:15:00Z',
                         amount=75.0, currency='USD'),
                ],
            ),
            (
                '1.1.0',
                [
                    dict(order_id=1, customer_id=101, order_ts='2024-02-01T09:00:00Z',
                         amount=135.0, currency='EUR', discount_rate=0.05),
                    dict(order_id=2, customer_id=102, order_ts='2024-02-02T09:30:00Z',
                         amount=80.0, currency='USD', discount_rate=0.10),
                ],
            ),
            (
                '2.0.0',
                [
                    dict(order_id=1, customer_id=101, order_ts='2024-03-01T08:30:00Z',
                         amount=140.0, currency='EUR', discount_rate=0.08),
                    dict(order_id=2, customer_id=102, order_ts='2024-03-02T12:45:00Z',
                         amount=82.5, currency='USD', discount_rate=0.05),
                    dict(order_id=3, customer_id=103, order_ts='2024-03-03T14:10:00Z',
                         amount=210.0, currency='GBP', discount_rate=0.15),
                ],
            ),
        )
    )
]
print('Prepared {} streaming dataset versions'.format(len(writes)))


## 6. Stream governed dataset versions

In [None]:

from dc43_integrations.examples.databricks_delta_versioning_support import (
    drain_streaming_queries,
    extract_streaming_queries,
    write_streaming_dataset_version,
)

# results: (dataset version, validation status, governance status) per write.
# status_payloads: the raw status objects, handed to
# extract_streaming_queries once every write has been issued.
results = []
status_payloads = []
for spec in writes:
    validation, status = write_streaming_dataset_version(
        spark=spark,
        spec=spec,
        dataset_id=DATASET_ID,
        data_product_id=DATA_PRODUCT_ID,
        output_port=OUTPUT_PORT,
        table_name=table_name,
        governance_service=suite.governance,
        enforce=ENFORCE,
        data_product_version=DATA_PRODUCT_VERSION,
        checkpoint_root=CHECKPOINT_ROOT,
    )
    # The status object may lack a ``status`` attribute: the summary row
    # then records None, while the printed line shows 'n/a'.
    results.append((spec.dataset_version, validation.status, getattr(status, 'status', None)))
    status_payloads.append(status)
    print(
        '- Streamed {} for {}@{}: validation={} governance={}'.format(
            spec.dataset_version,
            spec.contract.id,
            spec.contract.version,
            validation.status,
            getattr(status, 'status', 'n/a'),
        )
    )

# Pull the query handles out of the collected payloads and drain them.
query_handles = extract_streaming_queries(status_payloads)
drain_streaming_queries(query_handles)
results


## 7. Inspect Delta table history

In [None]:

from dc43_integrations.examples.databricks_delta_versioning_support import describe_delta_history

# Fetch the history records of the target table and print one line per
# record; fields missing from a record are shown as None.
delta_history = describe_delta_history(spark, table_name)
for record in delta_history:
    print(
        'version={} timestamp={} operation={}'.format(
            record.get('version'),
            record.get('timestamp'),
            record.get('operation'),
        )
    )
delta_history


## 8. Render the compatibility matrix

In [None]:

from dc43_integrations.examples.databricks_delta_versioning_support import (
    collect_status_matrix,
    render_markdown_matrix,
)

# Collect the governance status entries for the dataset and render them.
entries = collect_status_matrix(suite.governance, dataset_id=DATASET_ID)
markdown = render_markdown_matrix(entries)

# Use displayHTML when the notebook provides it; otherwise, or if displaying
# fails for any reason, fall back to plain printing.
try:
    if 'displayHTML' not in globals():
        print(markdown)
    else:
        displayHTML('<pre>{}</pre>'.format(markdown))
except Exception:
    print(markdown)
markdown
