In [0]:
# dbutils.library.restartPython()

# ⬇️ 1. Clean up sys.path – ONE project root only
import importlib
import pathlib
import sys

# Workspace checkout of the project; the `src.*` imports below resolve against it.
# NOTE(review): user-specific absolute path – anyone else running this notebook
# has to edit it; consider deriving it from the notebook location instead.
root = pathlib.Path("/Workspace/Users/tom_corbin1@hotmail.com/ETL-framework")
root_entry = str(root)

# Drop every entry that ends in the src tree (a trailing "/" is ignored, so
# ".../src/" is caught as well) together with any copy of the project root left
# behind by an earlier run of this cell. Non-string entries are left untouched.
# Slice assignment mutates the existing list, so other references to sys.path
# keep seeing the cleaned-up version.
stale_suffixes = ("src", "src/delta_engine")
sys.path[:] = [
    entry
    for entry in sys.path
    if not (
        isinstance(entry, str)
        and (
            entry.rstrip("/") == root_entry
            or entry.rstrip("/").endswith(stale_suffixes)
        )
    )
]

# Put the project root right at the front – exactly once, however often the
# cell is re-run
sys.path.insert(0, root_entry)

# ⬇️ 2. Nuke the negative import cache
importlib.invalidate_caches()


import pyspark.sql.types as T

from src.delta_engine.desired.builders import build_desired_catalog
from src.delta_engine.engine import Engine, OrchestratorOptions
from src.delta_engine.models import Column, Table
from src.delta_engine.state.ports import Aspect, SnapshotPolicy

In [0]:
# Desired definition of dev.silver.demo_table: two commented columns, keyed on `id`.
demo_columns = [
    Column("id", T.IntegerType(), is_nullable=False, comment="pk"),
    Column("name", T.StringType(), is_nullable=True, comment="name"),
]
demo_table = Table(
    catalog_name="dev",
    schema_name="silver",
    table_name="demo_table",
    columns=demo_columns,
    comment="Demo table",
    properties={"delta.columnMapping.mode": "name"},
    primary_key=["id"],
)

# Every table definition that is fed into the desired catalog.
tables = [demo_table]

# Aspects selected for this run: schema, comments, properties and primary key.
selected_aspects = frozenset(
    (Aspect.SCHEMA, Aspect.COMMENTS, Aspect.PROPERTIES, Aspect.PRIMARY_KEY)
)

# Options passed to the engine run: permissive snapshot policy, execution
# switched on, and validation errors treated as failures.
options = OrchestratorOptions(
    aspects=selected_aspects,
    snapshot_policy=SnapshotPolicy.PERMISSIVE,
    execute=True,
    fail_on_validation_errors=True,
)

from pprint import pprint

# Turn the declared tables into a desired catalog and run the engine on it.
# NOTE(review): `spark` is not defined in the visible cells – presumably the
# session object injected by the Databricks runtime; confirm.
desired_catalog = build_desired_catalog(tables)
engine = Engine(spark)
report = engine.run(desired_catalog, options)

# Headline figures first: number of planned actions and the validation verdict.
planned_actions = report.plan.actions
print("Actions:", len(planned_actions))
print("Validation OK?", report.validation.ok)

# Then the complete report, attribute by attribute.
pprint(vars(report))

In [0]:
# NOTE(review): this cell mirrors the structure printed by `pprint(report.__dict__)`
# in the previous cell (keys "plan", "snapshot", "validation") and looks like
# captured output kept for reference – confirm. Plan, AlignTable,
# FullyQualifiedTableName, SetTableProperties, SnapshotResult, CatalogState,
# TableState, ColumnState, PrimaryKeyState, ValidationReport, IntegerType and
# StringType are not imported in the cells shown here, so evaluating this cell as
# code would presumably raise NameError; consider moving it to markdown/output.
{
    # One planned action: only table properties are set, every other field is
    # None or empty.
    "plan": Plan(
        actions=(
            AlignTable(
                table=FullyQualifiedTableName(catalog="dev", schema="silver", table="demo_table"),
                add_columns=None,
                drop_columns=None,
                alter_nullability=(),
                set_column_comments=None,
                set_table_comment=None,
                set_table_properties=SetTableProperties(
                    properties={"delta.columnMapping.mode": "name"}
                ),
                add_primary_key=None,
                drop_primary_key=None,
            ),
        )
    ),
    # Observed state of the table, keyed by its back-quoted fully qualified name.
    # NOTE(review): the snapshot lists a `new_column` column and a
    # `delta.enableDeletionVectors` property that the `tables` definition in the
    # previous cell does not declare, while the plan above has drop_columns=None –
    # confirm that leaving undeclared columns/properties in place is intended.
    "snapshot": SnapshotResult(
        state=CatalogState(
            tables={
                "`dev`.`silver`.`demo_table`": TableState(
                    catalog_name="dev",
                    schema_name="silver",
                    table_name="demo_table",
                    exists=True,
                    columns=(
                        ColumnState(
                            name="id", data_type=IntegerType(), is_nullable=False, comment="pk"
                        ),
                        ColumnState(
                            name="name", data_type=StringType(), is_nullable=True, comment="name"
                        ),
                        ColumnState(
                            name="new_column",
                            data_type=StringType(),
                            is_nullable=True,
                            comment="new",
                        ),
                    ),
                    table_comment="Demo table",
                    table_properties={
                        "delta.columnMapping.mode": "name",
                        "delta.enableDeletionVectors": "true",
                    },
                    primary_key=PrimaryKeyState(
                        name="pk_dev_silver_demo_table__id", columns=("id",)
                    ),
                )
            }
        ),
        warnings=(),
    ),
    # No diagnostics were recorded for this run.
    "validation": ValidationReport(diagnostics=()),
}