In [None]:
import json

In [None]:
from databricks.labs.lakebridge.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig
)
from databricks.labs.lakebridge.reconcile.recon_config import Table
from databricks.labs.lakebridge.reconcile.trigger_recon_service import TriggerReconService
from databricks.sdk import WorkspaceClient
from databricks.labs.lakebridge import __version__
from dataclasses import dataclass


In [None]:
# Notebook parameters are exposed as text widgets: each one is declared with
# its default and then read back for this run.

# JSON document describing the tables to reconcile (empty by default).
dbutils.widgets.text("recon_config", "")
input_json = dbutils.widgets.get("recon_config")

# Target environment name; falls back to "dev" when the caller supplies none.
dbutils.widgets.text("target_env", "dev")
env = dbutils.widgets.get("target_env")

In [None]:
# Environment-specific schema name (e.g. "lakebridge_recon_dev") that is later
# handed to the reconcile configuration.
results_schema = "lakebridge_recon_" + env

In [None]:
# Fail fast when no reconciliation configuration was supplied. A value made up
# only of whitespace is treated as empty too, so the caller gets this message
# instead of an opaque JSON decoding error.
if not input_json or not input_json.strip():
    raise ValueError(" recon_config parameter is EMPTY")

# Raises json.JSONDecodeError (a ValueError subclass) on malformed JSON.
all_table_configs = json.loads(input_json)

if not all_table_configs:
    raise ValueError(" recon_config JSON is EMPTY")

# The reconciliation loop iterates over one JSON object per table. Iterating a
# JSON object or scalar would yield keys/characters and fail much later with an
# unrelated error, so reject anything that is not a list right here.
if not isinstance(all_table_configs, list):
    raise ValueError(
        " recon_config JSON must be a list of table configs, "
        f"got {type(all_table_configs).__name__}"
    )


In [None]:
@dataclass
class TableRecon:
    """Bundle of the tables to reconcile and the schemas/catalogs they belong to.

    In this notebook one instance is built per entry of ``recon_config`` and
    passed to ``TriggerReconService.trigger_recon`` as ``table_recon``.
    """

    # Filled from the "source_schema" key of a recon_config entry.
    source_schema: str
    # Filled from the "target_catalog" key of a recon_config entry.
    target_catalog: str
    # Filled from the "target_schema" key of a recon_config entry.
    target_schema: str
    # Table pairs (source name, target name, join columns) to compare.
    tables: list[Table]
    # Optional; defaults to None when the caller does not provide it.
    source_catalog: str | None = None

In [None]:
# Workspace client tagged with the Lakebridge product name and the installed
# package version.
ws = WorkspaceClient(product="lakebridge", product_version=__version__)

In [None]:
# Reconcile every configured table pair independently: a failure for one entry
# is reported and the loop carries on with the next entry.
for table_config in all_table_configs:
    # Resolve a printable name up front so that the failure report below can
    # never raise on its own (e.g. when an entry is not a JSON object).
    source_label = (
        table_config.get("source_table", "Unknown")
        if isinstance(table_config, dict)
        else "Unknown"
    )

    try:
        print(
            f"Starting Recon for: "
            f"{table_config['source_table']} -> {table_config['target_table']}"
        )

        reconcile_config = ReconcileConfig(
            data_source=table_config["data_source"],
            # Report type is normalised to lower case before it is passed on.
            report_type=table_config["report_type"].lower(),
            secret_scope=table_config["secret_scope"],
            database_config=DatabaseConfig(
                source_catalog=table_config["source_catalog"],
                source_schema=table_config["source_schema"],
                target_catalog=table_config["target_catalog"],
                target_schema=table_config["target_schema"],
            ),
            # The results schema is a field of the metadata config, not of
            # ReconcileConfig itself; passing `schema=` directly to
            # ReconcileConfig raises TypeError for every table.
            # NOTE(review): only the schema is set here, so the metadata
            # catalog falls back to the library default - pass `catalog=...`
            # explicitly if the results must land in a specific catalog.
            metadata_config=ReconcileMetadataConfig(schema=results_schema),
        )

        table_recon = TableRecon(
            source_schema=table_config["source_schema"],
            target_catalog=table_config["target_catalog"],
            target_schema=table_config["target_schema"],
            tables=[
                Table(
                    source_name=table_config["source_table"],
                    target_name=table_config["target_table"],
                    join_columns=table_config["join_columns"],
                )
            ],
        )

        result = TriggerReconService.trigger_recon(
            ws=ws,
            spark=spark,
            table_recon=table_recon,
            reconcile_config=reconcile_config,
        )

        print(
            f" SUCCESS: {table_config['source_table']} | "
            f"Recon ID: {result.recon_id}"
        )

    except Exception as e:
        # Deliberate best-effort: report the failure and keep processing the
        # remaining tables instead of aborting the whole run.
        print(
            f" FAILED: "
            f"{source_label} - {e}"
        )