In [0]:
def convert_parquet_to_delta_and_register(datasets: list, catalog: str = "olympics", schema: str = "silver"):
    """
    Reads Parquet files from source_container/folder, writes them as Delta
    to destination_container/folder, and registers them as external tables in Unity Catalog.
    Overwrites existing tables and schema.

    Args:
        datasets (list): List of dictionaries with keys:
                         - folder: name of the folder (subpath)
                         - source: name of the source container (e.g., bronze)
                         - destination: name of the destination container (e.g., silver)
        catalog (str): Unity Catalog name
        schema (str): Unity Catalog schema name
    """
    for dataset in datasets:
        folder = dataset["folder"]
        source_container = dataset["source"]
        destination_container = dataset["destination"]

        source_path = f"abfss://{source_container}@ahmedolympicsdatalake.dfs.core.windows.net/{folder}/"
        destination_path = f"abfss://{destination_container}@ahmedolympicsdatalake.dfs.core.windows.net/{folder}/"
        table_name = folder.lower()
        full_table_name = f"{catalog}.{schema}.{table_name}"

        print(f"\n📥 Reading from: {source_path}")
        try:
            df = spark.read.parquet(source_path)
        except Exception as e:
            print(f"❌ Error reading from {source_path}: {e}")
            continue

        print(f"💾 Writing to: {destination_path} as Delta with schema overwrite")
        try:
            df.write.format("delta")\
                .mode("overwrite")\
                .option("overwriteSchema", "true")\
                .save(destination_path)
            print(f"✅ Delta written to: {destination_path}")
        except Exception as e:
            print(f"❌ Error writing Delta to {destination_path}: {e}")
            continue

        print(f"🗃️ Registering (or replacing) table in Unity Catalog: {full_table_name}")
        try:
            spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
            spark.sql(f"""
                CREATE TABLE {full_table_name}
                USING DELTA
                LOCATION '{destination_path}'
            """)
            print(f"✅ Table registered successfully: {full_table_name}")
        except Exception as e:
            print(f"❌ Error registering table {full_table_name}: {e}")
my_datasets = [
    {"folder": "coaches", "source": "bronze", "destination": "silver"},
    {"folder": "athletes", "source": "bronze", "destination": "silver"},
    {"folder": "events", "source": "bronze", "destination": "silver"},
    {"folder": "nocs", "source": "bronze", "destination": "silver"}
]

convert_parquet_to_delta_and_register(my_datasets)



📥 Reading from: abfss://bronze@ahmedolympicsdatalake.dfs.core.windows.net/coaches/
💾 Writing to: abfss://silver@ahmedolympicsdatalake.dfs.core.windows.net/coaches/ as Delta with schema overwrite
✅ Delta written to: abfss://silver@ahmedolympicsdatalake.dfs.core.windows.net/coaches/
🗃️ Registering (or replacing) table in Unity Catalog: olympics.silver.coaches
✅ Table registered successfully: olympics.silver.coaches

📥 Reading from: abfss://bronze@ahmedolympicsdatalake.dfs.core.windows.net/athletes/
💾 Writing to: abfss://silver@ahmedolympicsdatalake.dfs.core.windows.net/athletes/ as Delta with schema overwrite
✅ Delta written to: abfss://silver@ahmedolympicsdatalake.dfs.core.windows.net/athletes/
🗃️ Registering (or replacing) table in Unity Catalog: olympics.silver.athletes
✅ Table registered successfully: olympics.silver.athletes

📥 Reading from: abfss://bronze@ahmedolympicsdatalake.dfs.core.windows.net/events/
💾 Writing to: abfss://silver@ahmedolympicsdatalake.dfs.core.windows.net/even