# Production Push Setup

This notebook orchestrates production push operations for the Vendor Catalog schema bundle.

Modes:
1. `apply`: apply object SQL, optional base seed SQL, then validate.
2. `drop_except`: drop all views/tables except excluded object names.
3. `truncate_except`: truncate all tables except excluded object names.
4. `rebuild`: preflight write permissions, drop non-excluded objects, then apply+seed+validate.
5. `validate`: validate expected tables/views from SQL object scripts.

Destructive modes are `drop_except`, `truncate_except`, and `rebuild`. They are blocked unless `confirm_destructive=true` or `dry_run=true` is set.


In [None]:
# Notebook parameters. Each call registers a widget with its default value;
# the values are read back and validated further down in this cell.

# Target catalog and schema for every statement the notebook issues.
dbutils.widgets.text("catalog", "a1_dlk")
dbutils.widgets.text("schema", "twvendor")
# Folder holding the numbered .sql scripts. A literal "<repo>" placeholder is
# replaced by resolve_sql_root() using the notebook's own location.
dbutils.widgets.text("sql_root", "/Workspace/Repos/<repo>/setup/production_push/sql")
# Which mode to run; the default is "apply".
dbutils.widgets.dropdown("operation", "apply", ["apply", "drop_except", "truncate_except", "rebuild", "validate"])
# Comma-separated object names that drop_except / truncate_except / rebuild must leave alone.
dbutils.widgets.text("exclude_objects", "schema_version")
# Whether the seed scripts run after the object scripts.
dbutils.widgets.dropdown("include_seed", "true", ["true", "false"])
# Whether 90_create_indexes.sql is included in the object scripts.
dbutils.widgets.dropdown("include_optimize", "true", ["true", "false"])
# When "true", statements routed through execute_sql() are printed instead of executed.
dbutils.widgets.dropdown("dry_run", "false", ["true", "false"])
# Must be "true" (or dry_run must be "true") before a destructive operation is allowed.
dbutils.widgets.dropdown("confirm_destructive", "false", ["false", "true"])


from datetime import datetime, timezone
import uuid


def parse_bool(value: str) -> bool:
    """Interpret a widget value as a boolean flag.

    The value is stringified, trimmed and lower-cased; ``1``, ``true``,
    ``yes``, ``y`` and ``on`` are truthy. Everything else, including ``None``
    and the empty string, is ``False``.
    """
    truthy_spellings = ("1", "true", "yes", "y", "on")
    text = str(value or "")
    return text.strip().lower() in truthy_spellings


def resolve_sql_root(raw_value: str) -> str:
    """Resolve the ``sql_root`` widget value to a concrete folder path.

    A value without the literal ``<repo>`` placeholder is returned trimmed and
    otherwise untouched. When the placeholder is present, the folder of the
    running notebook is looked up through ``dbutils`` and ``<folder>/sql`` is
    returned, prefixed with ``/Workspace`` when the notebook path does not
    already start with it. If the lookup fails for any reason, or yields an
    empty path, the trimmed input is returned unchanged.
    """
    value = str(raw_value or "").strip()
    if "<repo>" not in value:
        return value

    try:
        context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
        notebook_path = str(context.notebookPath().get() or "").strip()
        if not notebook_path:
            return value

        # Drop the notebook's own name to get its containing folder.
        base_dir = notebook_path.rsplit("/", 1)[0]
        if base_dir.startswith("/Workspace/"):
            workspace_dir = base_dir
        elif base_dir.startswith("/"):
            workspace_dir = f"/Workspace{base_dir}"
        else:
            workspace_dir = f"/Workspace/{base_dir}"
        return f"{workspace_dir}/sql"
    except Exception:
        # Best effort only: keep the placeholder value so later checks report it.
        return value


# Read every widget once and normalise it into module-level settings that the
# rest of the notebook uses.
catalog = dbutils.widgets.get("catalog").strip()
schema = dbutils.widgets.get("schema").strip()
raw_sql_root = dbutils.widgets.get("sql_root").strip()
# Replace a "<repo>" placeholder with a path derived from this notebook's location.
sql_root = resolve_sql_root(raw_sql_root)
operation = dbutils.widgets.get("operation").strip().lower()
exclude_objects = dbutils.widgets.get("exclude_objects").strip()
include_seed = parse_bool(dbutils.widgets.get("include_seed"))
include_optimize = parse_bool(dbutils.widgets.get("include_optimize"))
dry_run = parse_bool(dbutils.widgets.get("dry_run"))
confirm_destructive = parse_bool(dbutils.widgets.get("confirm_destructive"))

# Required-parameter checks.
# NOTE(review): `assert` statements are removed when Python runs with -O;
# confirm the notebook runtime never does that, or switch to explicit raises.
assert catalog, "catalog is required"
assert schema, "schema is required"
assert sql_root, "sql_root is required"
assert operation in {"apply", "drop_except", "truncate_except", "rebuild", "validate"}, "invalid operation"

# Safety gate: a destructive mode needs an explicit confirmation unless it is
# only being previewed with dry_run.
if operation in {"drop_except", "truncate_except", "rebuild"} and not confirm_destructive and not dry_run:
    raise AssertionError(
        "Destructive operation blocked. Set confirm_destructive=true (or dry_run=true)."
    )

if operation in {"apply", "rebuild"} and dry_run:
    print("WARNING: dry_run=true means no tables, views, or seed data will be created.")

# Short random identifier and UTC timestamp for this run; run_id is also used
# in the name of the write-permission probe table.
run_id = uuid.uuid4().hex[:12]
run_ts_utc = datetime.now(timezone.utc).isoformat()

# Echo the effective settings so the run log is self-describing.
print(f"run_id={run_id} run_ts_utc={run_ts_utc}")
print(f"catalog={catalog} schema={schema}")
print(
    f"operation={operation} include_seed={include_seed} include_optimize={include_optimize} "
    f"dry_run={dry_run} confirm_destructive={confirm_destructive}"
)
if raw_sql_root != sql_root:
    print(f"sql_root resolved from placeholder: {raw_sql_root} -> {sql_root}")


In [None]:
import re
from pathlib import Path

# Matches the ${CATALOG} and ${SCHEMA} placeholders used inside the SQL scripts.
TOKEN_PATTERN = re.compile(r"\$\{(CATALOG|SCHEMA)\}")

# Run-wide counters updated while SQL files are executed and printed in the
# final summary cell.
execution_stats = {
    "files": [],  # one summary dict per executed SQL file
    "statements_executed": 0,  # statements actually sent to Spark
    "statements_previewed": 0,  # statements only printed because dry_run is on
    "statements_skipped": 0,  # statements whose failure was tolerated
    "statements_failed": 0,  # statements that raised and aborted the run
    "retries": 0,  # statements re-run in a patched or fallback form
}


def render_tokens(sql_text: str) -> str:
    """Substitute ``${CATALOG}`` and ``${SCHEMA}`` with the configured names.

    Only those two placeholders are recognised; all other text is returned
    unchanged.
    """
    replacements = {"CATALOG": catalog, "SCHEMA": schema}

    def substitute(match):
        # Group 1 is the placeholder name without the surrounding ${ }.
        return replacements[match.group(1)]

    return TOKEN_PATTERN.sub(substitute, sql_text)


def split_sql_statements(sql_text: str) -> list[str]:
    """Split a SQL script into individual statements on top-level semicolons.

    A semicolon terminates a statement only when it is outside of:

    * single-quoted strings (a doubled ``''`` is an escaped quote),
    * double-quoted strings,
    * backtick-quoted identifiers,
    * ``--`` line comments, and
    * ``/* ... */`` block comments (nesting is not tracked).

    Quote characters inside comments are ignored as well, so an apostrophe in
    a comment no longer swallows the rest of the script. Comment text is kept
    as part of the statement it belongs to. Fragments that contain only
    whitespace and comments (for example a trailing comment after the last
    semicolon) are dropped instead of being returned as statements.

    Args:
        sql_text: Full script text, already token-rendered.

    Returns:
        The statements in script order, stripped of surrounding whitespace and
        without their terminating semicolon.
    """
    statements: list[str] = []
    current: list[str] = []
    has_code = False  # True once the fragment holds something besides comments/whitespace
    quote = ""  # the quote character currently open, or "" when outside quotes
    idx = 0
    length = len(sql_text)

    while idx < length:
        ch = sql_text[idx]
        nxt = sql_text[idx + 1] if idx + 1 < length else ""

        if quote:
            current.append(ch)
            if ch == quote:
                if nxt == quote:
                    # Doubled quote character: an escaped quote, stay inside.
                    current.append(nxt)
                    idx += 1
                else:
                    quote = ""
            idx += 1
            continue

        if ch == "-" and nxt == "-":
            # Line comment: copy verbatim up to (not including) the newline.
            end = sql_text.find("\n", idx)
            if end == -1:
                end = length
            current.append(sql_text[idx:end])
            idx = end
            continue

        if ch == "/" and nxt == "*":
            # Block comment: copy verbatim through the closing */ (or to EOF).
            end = sql_text.find("*/", idx + 2)
            end = length if end == -1 else end + 2
            current.append(sql_text[idx:end])
            idx = end
            continue

        if ch in ("'", '"', "`"):
            quote = ch
            has_code = True
            current.append(ch)
        elif ch == ";":
            stmt = "".join(current).strip()
            if stmt and has_code:
                statements.append(stmt)
            current = []
            has_code = False
        else:
            if not ch.isspace():
                has_code = True
            current.append(ch)
        idx += 1

    tail = "".join(current).strip()
    if tail and has_code:
        statements.append(tail)

    return statements


def normalize_object_name(token: str) -> str:
    """Reduce a possibly qualified, backtick-quoted name to a bare lowercase name.

    Backticks are removed, only the text after the last dot is kept, and the
    result is trimmed and lower-cased. ``None`` and empty input yield ``""``.
    """
    unquoted = str(token or "").strip().replace("`", "")
    # rpartition returns the whole string as the last element when no dot exists.
    last_segment = unquoted.rpartition(".")[2]
    return last_segment.strip().lower()


def with_default_feature(statement: str) -> str:
    """Return ``statement`` with the Delta column-defaults feature enabled.

    Used to patch a CREATE TABLE statement before it is retried. The table
    property ``'delta.feature.allowColumnDefaults' = 'supported'`` is added in
    one of two ways:

    * if the statement already has a ``TBLPROPERTIES (`` clause, the property
      is inserted as its first entry;
    * otherwise a new ``TBLPROPERTIES`` clause is placed right after the first
      ``USING DELTA`` (any whitespace between the two words, any letter case).

    The statement is returned unchanged when it already sets the property, or
    when it has neither a ``TBLPROPERTIES`` clause nor ``USING DELTA``.
    """
    feature = "'delta.feature.allowColumnDefaults' = 'supported'"

    if "delta.feature.allowcolumndefaults" in statement.lower():
        return statement

    existing_props = re.search(r"(?i)\bTBLPROPERTIES\s*\(", statement)
    if existing_props:
        # Merge into the existing clause; adding a second clause is not an option.
        cut = existing_props.end()
        return f"{statement[:cut]}{feature}, {statement[cut:]}"

    return re.sub(
        r"(?i)USING\s+DELTA",
        f"USING DELTA TBLPROPERTIES ({feature})",
        statement,
        count=1,
    )


def execute_sql(statement: str) -> bool:
    """Execute ``statement`` through Spark, or only print it in dry-run mode.

    Returns:
        ``True`` when the statement was sent to ``spark.sql``; ``False`` when
        ``dry_run`` is set and a preview (at most 180 characters plus ``...``)
        was printed instead.
    """
    if not dry_run:
        spark.sql(statement)
        return True

    preview = statement
    if len(statement) > 180:
        preview = statement[:180] + "..."
    print(f"[dry_run] {preview}")
    return False


def has_permission_error(message: str) -> bool:
    """Tell whether an error message looks like a missing-privilege failure.

    The check is a case-insensitive substring search for a fixed set of
    markers; ``None`` and empty messages never match.
    """
    markers = (
        "insufficient privileges",
        "insufficient_permissions",
        "permission_denied",
        "not authorized",
    )
    text = str(message or "").lower()
    return any(marker in text for marker in markers)


def catalog_and_schema_are_usable() -> bool:
    """Return ``True`` when both ``USE CATALOG`` and ``USE SCHEMA`` succeed.

    Any exception from either statement is treated as "not usable".
    """
    use_statements = (f"USE CATALOG `{catalog}`", f"USE SCHEMA `{schema}`")
    try:
        for use_statement in use_statements:
            spark.sql(use_statement)
    except Exception:
        return False
    return True


def catalog_is_usable() -> bool:
    """Return ``True`` when ``USE CATALOG`` succeeds for the target catalog.

    Any exception from the statement is treated as "not usable".
    """
    try:
        spark.sql(f"USE CATALOG `{catalog}`")
    except Exception:
        return False
    else:
        return True


def execute_sql_statement(statement: str, *, file_name: str, index: int) -> str:
    """Run one SQL statement, recovering from a few known, tolerable failures.

    The statement is passed to ``execute_sql``. If that raises, the error
    message and the upper-cased statement text are checked against these
    cases, in order:

    1. ``CREATE CATALOG IF NOT EXISTS`` fails with a permission error but the
       catalog can be used: skipped.
    2. ``CREATE SCHEMA IF NOT EXISTS`` fails with a permission error but the
       catalog and schema can be used: skipped.
    3. ``OPTIMIZE`` fails because the table or view does not exist: skipped.
    4. ``CREATE TABLE`` with a ``DEFAULT`` fails because the Delta
       column-defaults feature is not enabled: retried with the statement
       patched by ``with_default_feature``.
    5. ``CREATE OR REPLACE VIEW VW_EMPLOYEE_DIRECTORY`` fails because a source
       object is missing or not readable: retried with a fallback definition
       built on ``app_employee_directory``.

    Every outcome updates the ``execution_stats`` counters. A failure of a
    retry or fallback is counted in ``statements_failed`` and reported with
    the file name and statement index, the same way a primary failure is.

    Args:
        statement: SQL text to run.
        file_name: Name of the script the statement came from (for messages).
        index: 1-based position of the statement within that script.

    Returns:
        ``"executed"``, ``"previewed"`` (dry run) or ``"skipped"``.

    Raises:
        RuntimeError: The statement failed and no recovery applied, or the
            recovery attempt itself failed. The original exception is chained.
    """

    def run_and_count(sql: str) -> str:
        # execute_sql returns False in dry-run mode, where it only prints a preview.
        if execute_sql(sql):
            execution_stats["statements_executed"] += 1
            return "executed"
        execution_stats["statements_previewed"] += 1
        return "previewed"

    def skip(tag: str) -> str:
        print(f"[{tag}] {file_name}#{index}")
        execution_stats["statements_skipped"] += 1
        return "skipped"

    def retry(tag: str, replacement: str) -> str:
        print(f"[{tag}] {file_name}#{index}")
        execution_stats["retries"] += 1
        try:
            return run_and_count(replacement)
        except Exception as retry_exc:
            # Report a failed recovery with the same context as a primary failure.
            execution_stats["statements_failed"] += 1
            raise RuntimeError(
                f"Failed in {file_name} statement #{index} after {tag}: {retry_exc}"
            ) from retry_exc

    try:
        return run_and_count(statement)
    except Exception as exc:
        message = str(exc)
        upper = statement.upper()

        if upper.startswith("CREATE CATALOG IF NOT EXISTS") and has_permission_error(message):
            # Lacking CREATE CATALOG is fine as long as the catalog already exists and is usable.
            if catalog_is_usable():
                return skip("skip-create-catalog-no-privilege")

        if upper.startswith("CREATE SCHEMA IF NOT EXISTS") and has_permission_error(message):
            if catalog_and_schema_are_usable():
                return skip("skip-create-schema-no-privilege")

        if upper.startswith("OPTIMIZE ") and ("TABLE_OR_VIEW_NOT_FOUND" in message or "not found" in message.lower()):
            return skip("skip-optimize-missing")

        if upper.startswith("CREATE TABLE") and "DEFAULT" in upper and "WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED" in message:
            return retry("retry-default-feature", with_default_feature(statement))

        if "CREATE OR REPLACE VIEW VW_EMPLOYEE_DIRECTORY" in upper and (
            "TABLE_OR_VIEW_NOT_FOUND" in message
            or "PERMISSION_DENIED" in message
            or "insufficient privileges" in message.lower()
        ):
            # Fallback view over app_employee_directory; columns it does not
            # select from that table are filled with NULLs or constants.
            fallback = f"""
            CREATE OR REPLACE VIEW `{catalog}`.`{schema}`.vw_employee_directory AS
            SELECT
              employee_id,
              lower(coalesce(network_id, login_identifier, email)) AS login_identifier,
              network_id,
              email,
              first_name,
              last_name,
              display_name,
              CASE
                WHEN lower(trim(cast(active_flag AS STRING))) IN ('1', 'true', 't', 'y', 'yes', 'a', 'active') THEN 'A'
                ELSE 'T'
              END AS active_flag,
              CASE
                WHEN lower(trim(cast(active_flag AS STRING))) IN ('1', 'true', 't', 'y', 'yes', 'a', 'active') THEN 'Active'
                ELSE 'Inactive'
              END AS employment_status,
              NULL AS last_action_date,
              NULL AS hire_date,
              NULL AS termination_date,
              NULL AS job_title,
              NULL AS department_name,
              NULL AS manager_level,
              'Unknown' AS hierarchy_tier,
              1 AS default_security_level,
              manager_id
            FROM `{catalog}`.`{schema}`.app_employee_directory
            """.strip()
            return retry("fallback-vw_employee_directory", fallback)

        execution_stats["statements_failed"] += 1
        raise RuntimeError(f"Failed in {file_name} statement #{index}: {exc}") from exc


def execute_sql_file(path: Path) -> None:
    """Render, split and execute every statement of one SQL script.

    Prints a start and a completion line, and appends a per-file summary
    (statement count plus executed / previewed / skipped tallies) to
    ``execution_stats["files"]``.

    Raises:
        FileNotFoundError: ``path`` does not exist or is not a regular file.
    """
    if not (path.exists() and path.is_file()):
        raise FileNotFoundError(f"SQL file not found: {path}")

    script_text = path.read_text(encoding="utf-8")
    statements = split_sql_statements(render_tokens(script_text))
    print(f"Executing {path.name}: statements={len(statements)}")

    tally = {"executed": 0, "previewed": 0, "skipped": 0}
    for position, sql in enumerate(statements, start=1):
        outcome = execute_sql_statement(sql, file_name=path.name, index=position)
        tally[outcome] = tally.get(outcome, 0) + 1

    executed = tally.get("executed", 0)
    previewed = tally.get("previewed", 0)
    skipped = tally.get("skipped", 0)

    summary = {
        "name": path.name,
        "statements": len(statements),
        "executed": executed,
        "previewed": previewed,
        "skipped": skipped,
    }
    execution_stats["files"].append(summary)
    print(
        f"Completed {path.name}: executed={executed} "
        f"previewed={previewed} skipped={skipped}"
    )


In [None]:
# Scripts that create schema objects, listed in execution order.
OBJECT_SQL_FILES = [
    "00_create_v1_schema.sql",
    "01_create_lookup_tables.sql",
    "02_create_core_tables.sql",
    "03_create_assignment_tables.sql",
    "04_create_governance_tables.sql",
    "05_create_functional_parity_bridge.sql",
    "06_create_functional_runtime_compat.sql",
    "07_create_reporting_views.sql",
    "90_create_indexes.sql",
]

# Seed scripts, run after the object scripts when include_seed is set.
SEED_SQL_FILES = [
    "94_seed_critical_reference_data.sql",
    "95_seed_base_security_roles.sql",
    "96_seed_help_center.sql",
]

# The index script is the only one left out when include_optimize is off.
object_sql_files_to_run = [
    file_name
    for file_name in OBJECT_SQL_FILES
    if include_optimize or file_name != "90_create_indexes.sql"
]

# Capture the object name that follows CREATE TABLE / CREATE [OR REPLACE] VIEW.
CREATE_TABLE_RE = re.compile(r"(?is)CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([`\w\.]+)")
CREATE_VIEW_RE = re.compile(r"(?is)CREATE\s+(?:OR\s+REPLACE\s+)?VIEW\s+(?:IF\s+NOT\s+EXISTS\s+)?([`\w\.]+)")


def preflight_sql_files() -> None:
    """Check that ``sql_root`` is a directory holding every script this run needs.

    The required set is ``object_sql_files_to_run`` plus, when ``include_seed``
    is set, ``SEED_SQL_FILES``.

    Raises:
        FileNotFoundError: ``sql_root`` is missing or not a directory, or one
            or more required scripts are absent.
    """
    root = Path(sql_root)
    if not (root.exists() and root.is_dir()):
        raise FileNotFoundError(
            "sql_root does not exist or is not a directory: "
            f"{root}. Set sql_root to your workspace folder ending in /setup/production_push/sql."
        )

    needed = [*object_sql_files_to_run]
    if include_seed:
        needed += SEED_SQL_FILES

    absent = []
    for file_name in needed:
        if not (root / file_name).is_file():
            absent.append(file_name)

    if absent:
        raise FileNotFoundError(
            f"Missing SQL files under {root}: {', '.join(absent)}"
        )

    print(f"Preflight SQL root OK: {root}")


def expected_objects_from_sql() -> tuple[set[str], set[str]]:
    """Collect the table and view names the object scripts are expected to create.

    Every file in ``OBJECT_SQL_FILES`` is token-rendered and scanned with
    ``CREATE_TABLE_RE`` and ``CREATE_VIEW_RE``; matches are reduced to bare
    lowercase names.

    Returns:
        ``(expected_tables, expected_views)``.

    Raises:
        FileNotFoundError: One of the object scripts is missing.
    """
    root = Path(sql_root)
    table_names: set[str] = set()
    view_names: set[str] = set()

    for file_name in OBJECT_SQL_FILES:
        script = root / file_name
        if not (script.exists() and script.is_file()):
            raise FileNotFoundError(f"Required object SQL file not found: {script}")

        sql_text = render_tokens(script.read_text(encoding="utf-8"))
        for raw_name in CREATE_TABLE_RE.findall(sql_text):
            table_names.add(normalize_object_name(raw_name))
        for raw_name in CREATE_VIEW_RE.findall(sql_text):
            view_names.add(normalize_object_name(raw_name))

    return table_names, view_names


# Fail fast on a bad sql_root before any SQL is issued, then derive the
# table/view names that validate_schema() compares against the live schema.
preflight_sql_files()
expected_tables, expected_views = expected_objects_from_sql()
print(f"Expected tables: {len(expected_tables)}")
print(f"Expected views: {len(expected_views)}")


In [None]:
def list_schema_objects() -> tuple[set[str], set[str]]:
    """List the objects currently present in the target schema.

    Queries ``information_schema.tables`` of the target catalog. The schema
    filter is compared in lower case on both sides, so a ``schema`` widget
    value typed with different letter case still finds its objects. Without
    this, a case mismatch returns zero rows and drop/truncate silently does
    nothing.

    Returns:
        ``(tables, views)`` as sets of lowercase object names. Rows whose
        ``table_type`` is ``VIEW`` are views; every other type is counted as
        a table.
    """
    schema_key = schema.lower()
    query = (
        f"SELECT table_name, table_type "
        f"FROM `{catalog}`.information_schema.tables "
        f"WHERE lower(table_schema) = '{schema_key}'"
    )
    rows = spark.sql(query).collect()

    tables: set[str] = set()
    views: set[str] = set()
    for row in rows:
        name = str(row["table_name"] or "").strip().lower()
        if not name:
            continue
        table_type = str(row["table_type"] or "").strip().upper()
        if table_type == "VIEW":
            views.add(name)
        else:
            tables.add(name)

    return tables, views


def print_schema_counts(label: str) -> tuple[set[str], set[str]]:
    """Print the current table and view counts under ``label`` and return both sets."""
    schema_objects = list_schema_objects()
    table_names, view_names = schema_objects
    print(f"{label}: tables={len(table_names)} views={len(view_names)}")
    return table_names, view_names


def parse_excludes(raw_value: str) -> set[str]:
    """Parse the comma-separated ``exclude_objects`` value into bare object names.

    Each entry is reduced to the form the schema listing uses: backticks are
    removed, only the part after the last dot is kept, and the result is
    trimmed and lower-cased. This makes ``schema.table`` or `` `table` `` protect
    the same object as ``table``. Before, such an entry matched nothing and
    the object was dropped or truncated anyway.

    Args:
        raw_value: Widget text; ``None`` and ``""`` mean "no exclusions".

    Returns:
        The set of lowercase object names to leave untouched. Empty entries
        are ignored.
    """
    excludes: set[str] = set()
    for entry in str(raw_value or "").split(","):
        name = entry.replace("`", "").strip()
        if "." in name:
            name = name.rsplit(".", 1)[-1]
        name = name.strip().lower()
        if name:
            excludes.add(name)
    return excludes


def preflight_apply_permissions() -> None:
    """Prove that the session can create, write and drop a table in the target schema.

    Nothing is checked in dry-run mode. Otherwise the catalog and schema are
    selected, then a throwaway Delta table named after ``run_id`` is created,
    written to and dropped.

    Raises:
        AssertionError: The catalog/schema cannot be selected, or any of the
            probe statements fails. The original exception is chained.
    """
    if dry_run:
        print("Skipping write-permission preflight because dry_run=true")
        return

    try:
        for use_statement in (f"USE CATALOG `{catalog}`", f"USE SCHEMA `{schema}`"):
            spark.sql(use_statement)
    except Exception as exc:
        raise AssertionError(
            f"Cannot access target catalog/schema {catalog}.{schema}: {exc}"
        ) from exc

    probe_ref = f"`{catalog}`.`{schema}`.`_prod_push_perm_probe_{run_id}`"
    drop_probe = f"DROP TABLE IF EXISTS {probe_ref}"

    try:
        spark.sql(f"CREATE TABLE {probe_ref} (probe_id STRING) USING DELTA")
        spark.sql(f"INSERT INTO {probe_ref} VALUES ('ok')")
        spark.sql(drop_probe)
        print("Write-permission preflight passed (CREATE/INSERT/DROP).")
    except Exception as exc:
        # Best-effort cleanup so a half-created probe table is not left behind;
        # a cleanup failure must not hide the original error.
        try:
            spark.sql(drop_probe)
        except Exception:
            pass
        raise AssertionError(
            "Write-permission preflight failed. Confirm CREATE TABLE and INSERT privileges on "
            f"{catalog}.{schema}. Original error: {exc}"
        ) from exc


def drop_objects_except(exclude_set: set[str]) -> None:
    """Drop every view, then every table, whose name is not in ``exclude_set``.

    Object counts are printed before and after. Statements go through
    ``execute_sql``, so dry-run mode only previews them.
    """
    tables_before, views_before = print_schema_counts("Before drop_except")

    # Views are processed before tables.
    for view_name in sorted(views_before):
        if view_name not in exclude_set:
            execute_sql(f"DROP VIEW IF EXISTS `{catalog}`.`{schema}`.`{view_name}`")
        else:
            print(f"Skip view (excluded): {view_name}")

    for table_name in sorted(tables_before):
        if table_name not in exclude_set:
            execute_sql(f"DROP TABLE IF EXISTS `{catalog}`.`{schema}`.`{table_name}`")
        else:
            print(f"Skip table (excluded): {table_name}")

    print_schema_counts("After drop_except")


def truncate_tables_except(exclude_set: set[str]) -> None:
    """Truncate every table whose name is not in ``exclude_set``.

    Object counts are printed before and after. Statements go through
    ``execute_sql``, so dry-run mode only previews them.
    """
    tables_before = print_schema_counts("Before truncate_except")[0]
    for table_name in sorted(tables_before):
        if table_name not in exclude_set:
            execute_sql(f"TRUNCATE TABLE `{catalog}`.`{schema}`.`{table_name}`")
        else:
            print(f"Skip table (excluded): {table_name}")
    print_schema_counts("After truncate_except")


def validate_schema() -> None:
    """Compare the live schema with the objects the SQL scripts define.

    Prints the missing tables and views, if any.

    Raises:
        AssertionError: At least one expected table or view is missing.
    """
    live_tables, live_views = list_schema_objects()
    gaps = (
        ("Missing tables:", sorted(expected_tables - live_tables)),
        ("Missing views:", sorted(expected_views - live_views)),
    )

    if not any(names for _, names in gaps):
        print("Schema validation passed.")
        return

    for heading, names in gaps:
        if not names:
            continue
        print(heading)
        for name in names:
            print(f"- {name}")
    raise AssertionError("Schema validation failed.")


def validate_seed_rows() -> None:
    """Check that the seeded tables hold at least the minimum row counts.

    Does nothing in dry-run mode or when seeding is disabled. All tables are
    checked before failing, so the error lists every shortfall.

    Raises:
        AssertionError: One or more tables have fewer rows than required.
    """
    if dry_run or not include_seed:
        return

    # Table name -> minimum acceptable row count.
    minimum_rows = {
        "app_lookup_option": 1,
        "sec_role_definition": 1,
        "vendor_help_article": 1,
    }

    shortfalls: list[str] = []
    for table_name in minimum_rows:
        required = minimum_rows[table_name]
        query = f"SELECT COUNT(*) AS cnt FROM `{catalog}`.`{schema}`.`{table_name}`"
        first_row = spark.sql(query).collect()[0]
        count = int(first_row["cnt"])
        print(f"Seed check {table_name}: rows={count}")
        if count < int(required):
            shortfalls.append(f"{table_name} has {count} rows (expected >= {required})")

    if shortfalls:
        raise AssertionError("Seed validation failed: " + "; ".join(shortfalls))


def apply_bundle(*, verify_permissions: bool = True) -> None:
    """Run the object scripts, optionally the seed scripts, then validate.

    Args:
        verify_permissions: Run the write-permission probe first. The rebuild
            path passes ``False`` because it has already probed.

    Raises:
        AssertionError: The schema has no tables after a real (non-dry-run)
            apply although tables were expected, or schema/seed validation
            fails.
    """
    if verify_permissions:
        preflight_apply_permissions()

    print_schema_counts("Before apply")

    # Object scripts always run first; seed scripts follow when enabled.
    scripts = list(object_sql_files_to_run)
    if include_seed:
        scripts.extend(SEED_SQL_FILES)

    for script_name in scripts:
        execute_sql_file(Path(sql_root) / script_name)

    tables_after, _ = print_schema_counts("After apply (pre-validate)")

    # An empty schema after a real apply points at a setup problem, not at
    # individual missing objects, so report it with a dedicated message.
    if not dry_run and expected_tables and len(tables_after) == 0:
        raise AssertionError(
            "Apply completed but schema is still empty. Check CREATE TABLE permissions, sql_root path, and operation mode."
        )

    # NOTE(review): schema validation also runs in dry-run mode, where nothing
    # was created - confirm that failing on a fresh schema is intended.
    validate_schema()
    validate_seed_rows()


In [None]:
# Names that drop_except / truncate_except / rebuild must leave untouched.
exclude_set = parse_excludes(exclude_objects)

# Echo the effective configuration right before anything is changed.
print("Execution summary:")
print(f"- run_id: {run_id}")
print(f"- operation: {operation}")
print(f"- catalog.schema: {catalog}.{schema}")
print(f"- sql_root: {sql_root}")
print(f"- include_seed: {include_seed}")
print(f"- include_optimize: {include_optimize}")
print(f"- dry_run: {dry_run}")
print(f"- confirm_destructive: {confirm_destructive}")
print(f"- exclude_objects: {sorted(exclude_set)}")

# Dispatch on the selected operation.
if operation == "apply":
    apply_bundle()
elif operation == "rebuild":
    # Probe write permissions BEFORE dropping anything, so a session that
    # cannot recreate objects does not get to drop them first.
    preflight_apply_permissions()
    drop_objects_except(exclude_set)
    # The probe already ran above; skip it inside apply_bundle.
    apply_bundle(verify_permissions=False)
elif operation == "drop_except":
    drop_objects_except(exclude_set)
elif operation == "truncate_except":
    truncate_tables_except(exclude_set)
elif operation == "validate":
    validate_schema()
else:
    raise ValueError(f"Unsupported operation: {operation}")

# Per-file statistics exist only for operations that executed SQL files.
if execution_stats["files"]:
    print("File execution stats:")
    for item in execution_stats["files"]:
        print(
            f"- {item['name']}: statements={item['statements']} "
            f"executed={item['executed']} previewed={item['previewed']} skipped={item['skipped']}"
        )

print(
    "Statement totals: "
    f"executed={execution_stats['statements_executed']} "
    f"previewed={execution_stats['statements_previewed']} "
    f"skipped={execution_stats['statements_skipped']} "
    f"failed={execution_stats['statements_failed']} "
    f"retries={execution_stats['retries']}"
)

# Final object counts are reported only for real (non-dry-run) runs.
if not dry_run:
    tables, views = list_schema_objects()
    print(f"Current schema object counts: tables={len(tables)} views={len(views)}")

print("Operation complete.")
