# 初期設定

In [5]:
%session_id_prefix native-iceberg-dataframe-
%glue_version 5.0
%idle_timeout 60
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Setting session ID prefix to native-iceberg-dataframe-
Setting Glue version to: 5.0
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
The following configurations have been updated: {'--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions', '--datalake-formats': 'iceberg'}


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit


# Spark session configuration.
# NOTE: in Glue Interactive Sessions the session is started with the %%configure
# arguments (including spark.sql.extensions), so getOrCreate() presumably returns
# that existing session. The glue_catalog.* settings register an Iceberg catalog
# named "glue_catalog" backed by the AWS Glue Data Catalog, with S3 as storage.
spark = SparkSession.builder \
    .appName("GlueNotebook-Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://waptest20241208/warehouse/") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()

# Load the raw March 2024 yellow-taxi Parquet files.
rawDataPath = "s3://waptest20241208/raw_data/"
# rawDataPath already ends with "/"; strip it before joining so the resulting
# S3 path does not contain "//".
auditDf = spark.read.format("parquet").load(f"{rawDataPath.rstrip('/')}/nyc_tlc/yellow/mar2024/")

# Iceberg branch settings: the production table and the branch that stages this load.
prodTable = "glue_catalog.nyc.yellow_taxi_trips"
auditBranch = "dataAudit_202403"

# Enable Write-Audit-Publish (WAP) on the table.
spark.sql(f"ALTER TABLE {prodTable} SET TBLPROPERTIES ('write.wap.enabled'='true')")
# CREATE OR REPLACE keeps this cell re-runnable: a branch left over from an
# aborted run is reset to the table's current snapshot instead of failing with
# "branch already exists".
spark.sql(f"ALTER TABLE {prodTable} CREATE OR REPLACE BRANCH {auditBranch}")
# Route this session's writes (and unqualified reads such as spark.table) to the audit branch.
spark.conf.set("spark.wap.branch", auditBranch)

Trying to create a Glue session for the kernel.
Session Type: glueetl
Idle Timeout: 60
Session ID: 59ce8257-b22a-439a-92c5-79dda5f66a79
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
--datalake-formats iceberg
Waiting for session 59ce8257-b22a-439a-92c5-79dda5f66a79 to get into ready status...
Session 59ce8257-b22a-439a-92c5-79dda5f66a79 has been created.



# Write

In [2]:
# Stamp every row with constant year/month columns (2024 / 3) for the March load.
auditDf = (
    auditDf
    .withColumn("month", lit(3))
    .withColumn("year", lit(2024))
)

# Append the rows to the table. Because spark.wap.branch was set during setup,
# the commit lands on the audit branch rather than main.
auditDf.writeTo(prodTable).append()




# 中間データの確認（Auditブランチと mainブランチの比較）

In [3]:
# Row counts per (year, month), sorted so both tables print in a stable order
# and can be compared line by line (groupBy alone does not guarantee row order).

# Audit branch: spark.wap.branch is set, so an unqualified read targets it.
(
    spark.table(prodTable)
    .groupBy("year", "month")
    .count()
    .orderBy("year", "month")
    .show()
)

# main branch, read explicitly: it should not contain the staged data yet.
(
    spark.read.option("BRANCH", "main").table(prodTable)
    .groupBy("year", "month")
    .count()
    .orderBy("year", "month")
    .show()
)

+----+-----+-------+
|year|month|  count|
+----+-----+-------+
|2024|    2|3007526|
|2024|    3|3582628|
+----+-----+-------+

+----+-----+-------+
|year|month|  count|
+----+-----+-------+
|2024|    2|3007526|
+----+-----+-------+


# Audit/Publish

In [4]:
from pyspark.sql.functions import col

# Load the staged data from the audit branch.
auditData = spark.read.option("BRANCH", auditBranch).table(prodTable)

# Audit: confirm the partition written in the Write step actually exists before
# publishing. Checking a month that is already on main (as a month == 2 filter
# would) never inspects the staged rows and always passes.
# These values must match the year/month literals used when writing.
auditYear, auditMonth = 2024, 3
monthDf = auditData.filter((col("year") == auditYear) & (col("month") == auditMonth))

if monthDf.isEmpty():
    # Skip publishing; main is left untouched.
    # TODO: send a notification (e.g. via Amazon SNS) instead of only printing.
    print(f"No data found for year = {auditYear}, month = {auditMonth}. Skipping registration (notification not implemented).")
else:
    # Publish: fast-forward main to the head of the audit branch.
    spark.sql(f"""CALL glue_catalog.system.fast_forward('{prodTable}', 'main', '{auditBranch}')""")
    print("Data registered successfully.")


DataFrame[branch_updated: string, previous_ref: bigint, updated_ref: bigint]
Data registered successfully.


# データの登録（mainブランチへの反映）を確認

In [5]:
# Auditブランチ
spark.table(prodTable).groupBy("year", "month").count().show()

# mainブランチ
spark.read.option("BRANCH", "main").table(prodTable).groupBy("year", "month").count().show()

+----+-----+-------+
|year|month|  count|
+----+-----+-------+
|2024|    2|3007526|
|2024|    3|3582628|
+----+-----+-------+

+----+-----+-------+
|year|month|  count|
+----+-----+-------+
|2024|    2|3007526|
|2024|    3|3582628|
+----+-----+-------+


# 後処理（WAP設定と Auditブランチの削除）

In [6]:
# Tear down the WAP setup.
# First stop routing this session's reads/writes to the audit branch; otherwise
# spark.wap.branch keeps pointing at a branch that is about to be dropped.
spark.conf.unset("spark.wap.branch")

# IF EXISTS keeps the cleanup re-runnable when the property or branch is already gone.
spark.sql(f"ALTER TABLE {prodTable} UNSET TBLPROPERTIES IF EXISTS ('write.wap.enabled')")

spark.sql(f"ALTER TABLE {prodTable} DROP BRANCH IF EXISTS {auditBranch}")

DataFrame[]


In [7]:
%stop_session

Stopping session: 59ce8257-b22a-439a-92c5-79dda5f66a79
Stopped session.
