The following cell configures a new Spark Catalog called **iceberg_catalog** which is compatible with the Glue Data Catalog and has support for Iceberg tables. The `spark.sql.catalog.<catalog_name>.warehouse` property is used to set up the default location for databases and tables created within the **iceberg_catalog**.

**Action needed:** Update the property `spark.sql.catalog.iceberg_catalog.warehouse` and replace the **S3 bucket** placeholder.


In [None]:
%%configure -f
{   "numExecutors": 20, 
    "executorMemory": "27G",
    "executorCores": 4, 
    "conf": {
         "spark.sql.catalog.iceberg_catalog.warehouse":"s3://<s3bucket>/warehouse/bigdata.db/employeeauto/",
         "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
         "spark.sql.catalog.iceberg_catalog":"org.apache.iceberg.spark.SparkCatalog",
         "spark.sql.catalog.iceberg_catalog.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
         "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
         "spark.dynamicAllocation.enabled": "false",
         "spark.emr-serverless.executor.disk":"50G"
    }
}

**Capture the post-ingestion snapshot so that we can roll back to it later.**

In [None]:
%%local
import pandas as pd
pd.set_option('display.max_colwidth', 10000)

In [None]:
%%sql

SELECT *
    FROM iceberg_catalog.bigdata.employeeauto.snapshots
    ORDER BY committed_at DESC
    LIMIT 5


In [None]:
# The most recent snapshot at this point is the post-ingestion state.
oldest_snapshot = spark.sql("""
    SELECT snapshot_id
    FROM iceberg_catalog.bigdata.employeeauto.snapshots
    ORDER BY committed_at DESC
    LIMIT 1
    """).collect()[0]['snapshot_id']


# Timestamp at which that snapshot most recently became the current one.
current_snapshot_timestamp = spark.sql(f"""
    SELECT made_current_at
    FROM iceberg_catalog.bigdata.employeeauto.history
    WHERE snapshot_id = {oldest_snapshot}
    ORDER BY made_current_at DESC
    """).collect()[0]['made_current_at']
print(f"Post-ingestion snapshot timestamp: {current_snapshot_timestamp}, post-ingestion snapshot id: {oldest_snapshot}")

**The following query deletes the rows whose `start_date` falls within the chosen range and whose seconds value is even. Replace the start (`start`) and end (`end`) placeholders with the dates for your test.**

Example: '2024-10-31 00:00:00'

In [None]:
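# Replace both values with the range for your test, e.g. '2024-10-31 00:00:00'.
start = 'start'
end = 'end'

spark.sql(f"""
DELETE FROM iceberg_catalog.bigdata.employeeauto
WHERE start_date BETWEEN '{start}' AND '{end}'
AND SECOND(start_date) % 2 = 0
""")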
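To preview which rows the `DELETE` predicate selects, the filter can be mirrored in plain Python. This is only a sketch: it assumes `start_date` is a timestamp column, and the names `matches_delete_predicate`, `range_start`, and `range_end` are illustrative and not part of the notebook.

```python
from datetime import datetime, timedelta


def matches_delete_predicate(ts, range_start, range_end):
    """Mirror the WHERE clause: BETWEEN is inclusive on both ends,
    and only timestamps with an even seconds value match."""
    return range_start <= ts <= range_end and ts.second % 2 == 0


# Hypothetical range and sample timestamps, one per second (seconds 0..7).
range_start = datetime(2024, 10, 31, 0, 0, 0)
range_end = datetime(2024, 10, 31, 0, 0, 5)
sample = [range_start + timedelta(seconds=i) for i in range(8)]

matched_seconds = [
    ts.second for ts in sample
    if matches_delete_predicate(ts, range_start, range_end)
]
print(matched_seconds)  # [0, 2, 4]
```

Roughly every other row in the range matches, so the deletes are scattered across many data files rather than concentrated in a few.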


**Now inspect the snapshots table again. The latest snapshot should report many position delete files (remember that this is a merge-on-read (MoR) table).**

In [None]:
%%sql

SELECT *
    FROM iceberg_catalog.bigdata.employeeauto.snapshots
    ORDER BY committed_at DESC
    LIMIT 5
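The delete-file counts live in the `summary` map column of the snapshots table. A minimal sketch for pulling them out is shown here. The key names are the ones Iceberg is understood to write into snapshot summaries, so check them against the `summary` values your own query returns. The helper `delete_counters` and the example values are made up for illustration.

```python
# Summary keys assumed to describe delete files; verify against your table's output.
DELETE_KEYS = (
    "added-delete-files",
    "added-position-delete-files",
    "added-position-deletes",
)


def delete_counters(summary):
    """Return the delete-related counters of one snapshot summary as ints (0 if absent)."""
    return {key: int(summary.get(key, 0)) for key in DELETE_KEYS}


# Made-up summary map; real values come from the snapshots table (stored as strings).
example_summary = {
    "added-delete-files": "12",
    "added-position-delete-files": "12",
    "added-position-deletes": "3400",
}
print(delete_counters(example_summary))

# In the notebook session (assumes `spark` is available):
# rows = spark.sql("""
#     SELECT snapshot_id, operation, summary
#     FROM iceberg_catalog.bigdata.employeeauto.snapshots
#     ORDER BY committed_at DESC
#     LIMIT 5
#     """).collect()
# for row in rows:
#     print(row["snapshot_id"], row["operation"], delete_counters(row["summary"]))
```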


**Roll back to the post-ingestion state.**

In [None]:
# Restore the table to the post-ingestion snapshot captured earlier.
rollback_query = f"""
    CALL iceberg_catalog.system.rollback_to_snapshot(
        'iceberg_catalog.bigdata.employeeauto',
        {oldest_snapshot})
    """
spark.sql(rollback_query)
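Before deleting again, it may be worth confirming that the table's current snapshot is the post-ingestion one. The sketch below assumes that the most recent `made_current_at` entry in the `history` table identifies the current snapshot. The helper `current_snapshot_id` and the sample rows are hypothetical.

```python
from datetime import datetime


def current_snapshot_id(history_rows):
    """Return the snapshot_id of the history entry with the latest made_current_at."""
    latest = max(history_rows, key=lambda row: row["made_current_at"])
    return latest["snapshot_id"]


# Made-up history: ingestion (111), the delete (222), then a return to 111.
example_history = [
    {"made_current_at": datetime(2024, 11, 1, 10, 0), "snapshot_id": 111},
    {"made_current_at": datetime(2024, 11, 1, 10, 5), "snapshot_id": 222},
    {"made_current_at": datetime(2024, 11, 1, 10, 9), "snapshot_id": 111},
]
print(current_snapshot_id(example_history) == 111)  # True

# In the notebook session (assumes `spark` and `oldest_snapshot` from the earlier cells):
# history = spark.sql("""
#     SELECT made_current_at, snapshot_id
#     FROM iceberg_catalog.bigdata.employeeauto.history
#     """).collect()
# assert current_snapshot_id(history) == oldest_snapshot
```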

**DISABLE the Glue optimizers for the table at this point, then run the following query to delete the data again.**

In [None]:
spark.sql("""

DELETE FROM iceberg_catalog.bigdata.employeeauto
WHERE start_date BETWEEN '2024-10-31 00:00:00' AND '2024-11-02 00:00:00'
AND SECOND(start_date) % 2 = 0

""")