In [0]:
from datetime import datetime

# Archive landing-zone files that have already been ingested.
# For each configured table, files that are present both in the source folder
# and in the table's `file_name` column are moved to `{archive}/{table}`.
# Failures are printed and published as the "error" task value; the task then
# publishes status "completed".

# === Widget and Secret Inputs ===
container_name = dbutils.widgets.get("container_name")
storage_account_key = dbutils.secrets.get(scope="zillowsecrets", key="storage_account_key")
archive = dbutils.widgets.get("archive")
catalog = dbutils.widgets.get("catalog")
storage_account_name = dbutils.widgets.get("storage_account_name")

# === Configure Spark Access ===
# Account-key authentication for abfss:// paths on this storage account.
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", storage_account_key)

# === Per-Table Configurations ===
# Each entry pairs a source folder (relative to the container root) with the
# table `{catalog}.{schema}.{table}` whose `file_name` column lists the files
# from that folder that have already been loaded.
table_configs = [
    {
        "table": "taxhistory_silver",
        "schema": "taxhistory",
        "source_folder": "raw_data/TaxHistory"
    },
    {
        "table": "pricehistory_silver",
        "schema": "pricehistory",
        "source_folder": "raw_data/PriceHistory"
    },
    {
        "table": "property_silver",
        "schema": "propertyextended",
        "source_folder": "raw_data/propertyExtendedSearch"
    }
]

# === Error Reporting ===
# Error messages raised during this run, in the order they occurred.
errors = []


def _report_error(error_msg):
    """Record *error_msg*, republish all errors so far as the "error" task value, and print it.

    Errors are accumulated in the local ``errors`` list rather than read back
    with ``dbutils.jobs.taskValues.get``. That call is documented to take the
    ``taskKey`` of the task that set the value (it was omitted here before).
    Outside a job run it only returns ``debugValue``, so previously recorded
    messages were lost. The published value keeps the original format: the
    messages joined with newlines.
    """
    errors.append(error_msg)
    dbutils.jobs.taskValues.set(key="error", value="\n".join(errors))
    print(error_msg)


# === Main Processing Loop ===
for config in table_configs:
    table = config["table"]
    schema = config["schema"]
    source_folder = config["source_folder"]

    print(f"\n Processing table: {table} from schema: {schema}")

    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{source_folder}"
    archive_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{archive}/{table}"

    # List the entries currently in the source folder (names only).
    try:
        items = dbutils.fs.ls(source_path)
        loaded_files = [item.name for item in items]
        loaded_files_set = set(loaded_files)
    except Exception as e:
        _report_error(f" Error accessing source folder for {table}: {str(e)}")
        continue

    # Fetch the file names already recorded in the table.
    try:
        df = spark.sql(f"SELECT DISTINCT(file_name) FROM {catalog}.{schema}.{table}")
        table_files_set = {row.file_name for row in df.collect()}
    except Exception as e:
        _report_error(f"Error querying table {catalog}.{schema}.{table}: {str(e)}")
        continue

    # Only files that are still in the folder AND already in the table are
    # archived. Sorting makes the move/log order stable between runs (set
    # iteration order over strings is not).
    files_to_move = sorted(loaded_files_set & table_files_set)

    # Move each file into the per-table archive folder; a failed move is
    # reported and the remaining files are still processed.
    for file_name in files_to_move:
        src_file = f"{source_path}/{file_name}"
        dst_file = f"{archive_path}/{file_name}"
        try:
            dbutils.fs.mv(src_file, dst_file)
            print(f" Moved {file_name} from {source_path} to {archive_path}")
        except Exception as e:
            _report_error(f" Error moving file {file_name}: {str(e)}")

# === Final Task Status Logging ===
# NOTE: "completed" is published even when errors were recorded above; partial
# failures are only visible through the "error" task value.
dbutils.jobs.taskValues.set(key="status", value="completed")



