In [6]:
%idle_timeout 10
%glue_version 5.0
%worker_type G.1X
%number_of_workers 2
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg",
  "--additional-python-modules": "awswrangler==3.9.1"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 10 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 2
The following configurations have been updated: {'--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions', '--datalake-formats': 'iceberg', '--additional-python-modules': 'awswrangler==3.9.1'}


In [52]:
import awswrangler as wr
import boto3
from pyspark.sql import DataFrame, SparkSession
from datetime import datetime
import uuid




In [81]:
# Athena
athena_workgroup = "primary"

# Glue/S3: Bucket, Database, Table
now_string = datetime.now().strftime("%Y%m%d%H%M%S")
s3_bucket = f"wap-demo-{now_string}" # replace with your own bucket name
catalog_name = "glue_catalog"
database_name = f"_wap_demo_{now_string}"
table_name = "my_iceberg_table_athena"
full_table_name = f"{catalog_name}.{database_name}.{table_name}"

print(f"{s3_bucket=}")
print(f"{catalog_name=}")
print(f"{database_name=}")
print(f"{table_name=}")
print(f"{full_table_name=}")

s3_bucket='dsc-wap-athena-20250118183836'
catalog_name='glue_catalog'
database_name='_wap_demo_20250118183836'
table_name='my_iceberg_table_athena'
full_table_name='glue_catalog._wap_demo_20250118183836.my_iceberg_table_athena'


In [82]:
spark = SparkSession.builder \
    .config("spark.sql.session.timeZone", "UTC") \
    .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"s3://{s3_bucket}/{catalog_name}/") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config(f"spark.sql.catalog.{catalog_name}.glue.skip-name-validation", "true") \
    .getOrCreate()




# Infrastructure Setup

In [83]:
# Create Bucket
s3_client = boto3.client('s3')
response = s3_client.create_bucket(Bucket=s3_bucket, CreateBucketConfiguration={'LocationConstraint': "eu-central-1"})
print(response)

{'ResponseMetadata': {'RequestId': 'V3E31JMBEMPCDCJ4', 'HostId': 'DcDzEUSd0TEo/1hKk+LmkjIDGvulsSiuRi5DiLiyR4yFdBHJm3sEyza/ydehArD/Si7bgy17GkE=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'DcDzEUSd0TEo/1hKk+LmkjIDGvulsSiuRi5DiLiyR4yFdBHJm3sEyza/ydehArD/Si7bgy17GkE=', 'x-amz-request-id': 'V3E31JMBEMPCDCJ4', 'date': 'Sat, 18 Jan 2025 18:39:01 GMT', 'location': 'http://dsc-wap-athena-20250118183836.s3.amazonaws.com/', 'content-length': '0', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'Location': 'http://dsc-wap-athena-20250118183836.s3.amazonaws.com/'}


In [42]:
# Create Database
response = wr.athena.start_query_execution(
    sql=f"CREATE DATABASE IF NOT EXISTS {database_name}",
    wait=True,
    workgroup=athena_workgroup,
)
print(response)




In [43]:
# Create Table
response = wr.athena.start_query_execution(
    sql=f"""
    CREATE TABLE IF NOT EXISTS {database_name}.{table_name} (
        id INT,
        name STRING,
        age INT
    )
    LOCATION 's3://{s3_bucket}/{database_name}/{table_name}/'
    TBLPROPERTIES (
      'table_type'='ICEBERG',
      'format'='parquet',
      'write_compression'='snappy'
    )""",
    wait=True,
    workgroup=athena_workgroup,
)
print(response)




## watch out: branches
- Initialize 'main' branch, otherwise the table would have no branch whatsoever,
- you need a base branch to branch off of for WAP though.
- When not explicitly creating a branch, Iceberg will create a default one which is
- also called 'main' when you insert data for the first time.

In [44]:
# List existing Iceberg table branches
# This is purely informative, it could be removed or logged as INFO in a real world scenario
spark.sql(f"SELECT * FROM {full_table_name}.refs").show()

+----+----+-----------+-----------------------+---------------------+----------------------+
|name|type|snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+----+-----------+-----------------------+---------------------+----------------------+
+----+----+-----------+-----------------------+---------------------+----------------------+


In [45]:
spark.sql(f"ALTER TABLE {full_table_name} CREATE BRANCH IF NOT EXISTS main")

DataFrame[]


In [46]:
spark.sql(f"SELECT * FROM {full_table_name}.refs").show()

+----+------+-------------------+-----------------------+---------------------+----------------------+
|name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+------+-------------------+-----------------------+---------------------+----------------------+
|main|BRANCH|7464855188022954752|                   NULL|                 NULL|                  NULL|
+----+------+-------------------+-----------------------+---------------------+----------------------+


# get some data and transform it

## Extract

In [47]:
def read_data(spark: SparkSession) -> DataFrame:
    # sample data
    return spark.createDataFrame(data=[
        (1, "Alice", 28),
        (2, "Bob", 34),
        (3, "Charlie", 23)
    ], schema=["id", "name", "age"])


df = read_data(spark=spark)
df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 28|
|  2|    Bob| 34|
|  3|Charlie| 23|
+---+-------+---+


## Transform

In [48]:
def transform(df: DataFrame) -> DataFrame:
    return df.filter(df.age > 25)

transformed_df = transform(df=df)
transformed_df.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 28|
|  2|  Bob| 34|
+---+-----+---+


# WAP

## WAP Write

In [49]:
# WAP: Write
# Note: 
#  This setting is only enabled temporarily for the WAP pattern.
#  It could be enabled permanently as far as Spark and Iceberg are concerned.
#  When it is set, you can't use the Athena query SHOW CREATE TABLE will throw an error. 
#  An Athena SELECT query on the Iceberg table would still work even with this setting set though.
#  But anyways, we clean it up in the finally block to enable the SHOW CREATE TABLE query again.
spark.sql(f"ALTER TABLE {full_table_name} SET TBLPROPERTIES ('write.wap.enabled'='true')")

DataFrame[]


## Write new data into a temporary branch   

In [53]:
def generate_branch_name(prefix: str = "branch") -> str:
    return f"{prefix}_{uuid.uuid4().hex[:6]}"

audit_branch_name = generate_branch_name(prefix="audit_branch")
print(f"generated branch: {audit_branch_name}")

generated branch: audit_branch_cdd0da


In [55]:
spark.sql(f"SELECT * FROM {full_table_name}.refs").show()

+----+------+-------------------+-----------------------+---------------------+----------------------+
|name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+----+------+-------------------+-----------------------+---------------------+----------------------+
|main|BRANCH|7464855188022954752|                   NULL|                 NULL|                  NULL|
+----+------+-------------------+-----------------------+---------------------+----------------------+


In [57]:
spark.sql(f"ALTER TABLE {full_table_name} DROP BRANCH IF EXISTS {audit_branch_name}")
spark.sql(f"ALTER TABLE {full_table_name} CREATE BRANCH {audit_branch_name}")
spark.sql(f"SELECT * FROM {full_table_name}.refs").show()

+-------------------+------+-------------------+-----------------------+---------------------+----------------------+
|               name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+-------------------+------+-------------------+-----------------------+---------------------+----------------------+
|audit_branch_cdd0da|BRANCH|7464855188022954752|                   NULL|                 NULL|                  NULL|
|               main|BRANCH|7464855188022954752|                   NULL|                 NULL|                  NULL|
+-------------------+------+-------------------+-----------------------+---------------------+----------------------+


### writing data into audit_branch

In [58]:
(df.write
    .format("iceberg")
    .mode("append")
    .option("branch", audit_branch_name)
    .save(path=full_table_name))




### only audit_branch has the new data

In [59]:
spark.read \
    .format("iceberg") \
    .load(path=full_table_name).show()

+---+----+---+
| id|name|age|
+---+----+---+
+---+----+---+


In [60]:
spark.read \
    .format("iceberg") \
    .option("branch", audit_branch_name) \
    .load(path=full_table_name).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 28|
|  2|    Bob| 34|
|  3|Charlie| 23|
+---+-------+---+


# WAP: Audit

Audit temporary branch

**Note:**
- This is for demonstration, in a real world scenario you would want to do a more complex audit.
    - You could for example refactor this function and inject a test suite to run on the branch_df.
    - You could also differentiate between the severity of failures, i.e. "warning" or "failing" checks.

**Note:**
- The Audit is very likely the most interesting part from a business value and analytics perspective.
- Getting the business rules right is where you should focus your attention during development.

In [61]:
branch_df = spark.read \
    .format("iceberg") \
    .option("branch", audit_branch_name) \
    .load(path=full_table_name)




In [68]:
audit_case_1 = branch_df.count() == df.count()
audit_case_2 = branch_df.count() > 0
audit_passed = audit_case_1 and audit_case_2

if audit_passed:
    print("data quality checks passed")
else:
    print("data quality checks failed")

data quality checks passed


# WAP: Publish
- On the happy path all checks passed
- Publish changes from temporary branch to main branch

In [69]:
# Fast-forward merge: [audit_branch] -> [main] 
spark.sql(f"CALL {catalog_name}.system.fast_forward('{full_table_name}', 'main', '{audit_branch_name}')")

DataFrame[branch_updated: string, previous_ref: bigint, updated_ref: bigint]


In [70]:
spark.read \
    .format("iceberg") \
    .load(path=full_table_name).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 28|
|  2|    Bob| 34|
|  3|Charlie| 23|
+---+-------+---+


In [71]:
spark.read \
    .format("iceberg") \
    .option("branch", audit_branch_name) \
    .load(path=full_table_name).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 28|
|  2|    Bob| 34|
|  3|Charlie| 23|
+---+-------+---+


In [72]:
# The Audit is done. Thus the audit branch has served its purpose and can be deleted.
# Note:
#  This is explicitly NOT part of the finally block, because you might want to analyze
#  the data in the audit branch in case of data quality check failures and only delete it afterwards.
#  This is especially true in case computing the results is expensive.
#  You could also argue against this decision though, for example in case you need
#  to avoid manual interventions in prod altogether, or in case you know you won't analyze results anyway.
#  Being able to look at the faulty results is generally something you want though.
spark.sql(f"ALTER TABLE {full_table_name} DROP BRANCH {audit_branch_name}")

DataFrame[]


In [73]:
spark.read \
    .format("iceberg") \
    .load(path=full_table_name).show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  1|  Alice| 28|
|  2|    Bob| 34|
|  3|Charlie| 23|
+---+-------+---+


In [74]:
# The temporary audit branch doesn't exist anymore so we expect this to fail
spark.read \
    .format("iceberg") \
    .option("branch", audit_branch_name) \
    .load(path=full_table_name).show()

Py4JJavaError: An error occurred while calling o424.load.
: org.apache.iceberg.exceptions.ValidationException: Cannot use branch (does not exist): audit_branch_cdd0da
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.iceberg.spark.source.SparkTable.<init>(SparkTable.java:135)
	at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:902)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:172)
	at org.apache.spark.sql.connector.catalog.CatalogV2Util$.getTable(CatalogV2Util.scala:355)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:140)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.$anonfun$lookupAndLoadDataSource$1(DataSourceV2Utils.scala:168)
	at scala.Option.flatMap(Option.scala:271)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.lookupAndLoadDataSource(DataSourceV2Utils.scala:166)
	at org.apache.spark.sq

# Und der Fehlerfall?

In [65]:
# WAP: Don't publish
# Because when a check failed we know there's an issue with the data
# Note:
#  In a real world scenario you would want to do a more complex Data Quality Check failure handling,
#  i.e. construct an audit report, send an email to the team, to the consumers, log the error, etc.
print("Audit failed. Not publishing changes.")

Audit failed. Not publishing changes.


# Cleanup

In [75]:
spark.sql(f"ALTER TABLE {full_table_name} UNSET TBLPROPERTIES ('write.wap.enabled')")

DataFrame[]


In [77]:
wr.athena.start_query_execution(
    sql=f"DROP DATABASE IF EXISTS {database_name} CASCADE",
    wait=True,
    workgroup=athena_workgroup,
)

{'QueryExecutionId': 'eab34f9a-308f-4986-a5b2-62fa7f94d46e', 'Query': 'DROP DATABASE IF EXISTS _wap_demo_2025_01_18_18_23_23 CASCADE', 'StatementType': 'DDL', 'ResultConfiguration': {'OutputLocation': 's3://dev-athena-results/results/eab34f9a-308f-4986-a5b2-62fa7f94d46e.txt', 'EncryptionConfiguration': {'EncryptionOption': 'SSE_KMS', 'KmsKey': 'alias/key-bucket-default'}}, 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}}, 'QueryExecutionContext': {}, 'Status': {'State': 'SUCCEEDED', 'SubmissionDateTime': datetime.datetime(2025, 1, 18, 18, 34, 16, 262000, tzinfo=tzlocal()), 'CompletionDateTime': datetime.datetime(2025, 1, 18, 18, 34, 17, 755000, tzinfo=tzlocal())}, 'Statistics': {'EngineExecutionTimeInMillis': 1415, 'DataScannedInBytes': 0, 'TotalExecutionTimeInMillis': 1493, 'QueryQueueTimeInMillis': 36, 'ServicePreProcessingTimeInMillis': 17, 'ServiceProcessingTimeInMillis': 25, 'ResultReuseInformation': {'ReusedPreviousResult': False}}, 'WorkGroup': '

In [None]:
s3_client.delete_bucket(Bucket=s3_bucket)