In [None]:
import json 
from dataclasses import dataclass

from databricks.sdk import WorkspaceClient
from databricks.labs.lakebridge import __version__
from databricks.labs.lakebridge.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig
)
from databricks.labs.lakebridge.reconcile.recon_config import (
    Filters
)
from databricks.labs.lakebridge.reconcile.recon_config import Table
from databricks.labs.lakebridge.reconcile.trigger_recon_service import TriggerReconService

In [None]:
dbutils.widgets.text("recon_config", "")
dbutils.widgets.text("target_env", "dev")

table_configs = json.loads(dbutils.widgets.get("recon_config"))
env = dbutils.widgets.get("target_env")

print(f"Loaded {len(table_configs)} table configs for env = {env}")

In [None]:
ws = WorkspaceClient(
    product="lakebridge",
    product_version=__version__
)

In [None]:
@dataclass
class TableRecon:
    source_schema: str
    target_catalog: str
    target_schema: str
    tables: list
    source_catalog: str | None = None

In [None]:
results_schema = f"lakebridge_recon_{env}"

In [None]:
try:
    reconcile_config = ReconcileConfig(
        data_source=table_configs["data_source"],
        report_type=table_configs["report_type"].lower(),
        secret_scope=table_configs["secret_scope"],

        database_config=DatabaseConfig(
            source_catalog=table_configs["source_catalog"],
            source_schema=table_configs["source_schema"],
            target_catalog=table_configs["target_catalog"],
            target_schema=table_configs["target_schema"]
        ),

        metadata_config=ReconcileMetadataConfig(
            catalog=table_configs["target_catalog"],
            schema="lakebridge_recon"
        ))

    table_recon = TableRecon(
        source_schema=table_configs["source_schema"],
        target_catalog=table_configs["target_catalog"],
        target_schema=table_configs["target_schema"],
        tables=[
            Table(
                source_name=table_configs["source_table"],
                target_name=table_configs["target_table"],
                join_columns=table_configs["join_columns"],
                filters=Filters(
                source=f"{table_configs['filters_column']} {table_configs['filter_condition']}",
                target=f"{table_configs['filters_column']} {table_configs['filter_condition']}"
                )
            )
        ]
    )

    result = TriggerReconService.trigger_recon(
        ws=ws,
        spark=spark,
        table_recon=table_recon,
        reconcile_config=reconcile_config
    )
    print(f"Recon Success for {table_configs['source_table']} | Recon ID: {result.recon_id}")
except Exception as e:
    print(f"Recon Failed {e}")