In [9]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import (
    col,
    window,
    sum as _sum,
    count,
    countDistinct,
    approx_count_distinct,
    current_timestamp
)

# Object-store connection settings.
# Credentials are taken from the environment when present so real secrets never
# have to be committed with the notebook; the fallbacks keep the previous
# hardcoded values, so behavior is unchanged when the variables are unset.
S3_ENDPOINT = "http://minio:9000"
S3_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin")
S3_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin")

# Spark session wired to:
#   * S3A (Hadoop filesystem) pointing at the MinIO endpoint, path-style access
#   * an Iceberg catalog named "iceberg" backed by the Hive metastore, using
#     S3FileIO against the same endpoint and the s3a://warehouse/ warehouse
spark = (
    SparkSession.builder
    .appName("spark_kafka")
    .config("spark.cores.max", "1")
    .config("spark.executor.memory", "1g")
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hive")
    .config("spark.sql.catalog.iceberg.uri", "thrift://hive-metastore:9083")
    .config("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://warehouse/")
    .config("spark.sql.catalog.iceberg.s3.endpoint", S3_ENDPOINT)
    .getOrCreate()
)

# Set the log level to WARN to reduce noise from Spark internals (optional).
spark.sparkContext.setLogLevel("WARN")

In [26]:
# DDL for the gold-layer revenue table: one row per window / product / price,
# partitioned by the day of the window start. IF NOT EXISTS makes the cell
# safe to re-run.
REVENUE_1M_DDL = """
CREATE TABLE IF NOT EXISTS iceberg.gold.revenue_1m (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    product_id STRING,
    price LONG,
    order_count LONG,
    revenue DOUBLE,
    event_count LONG,
    update_ts TIMESTAMP
)
PARTITIONED BY (days(window_start));
"""
spark.sql(REVENUE_1M_DDL)


DataFrame[]

In [25]:
# Remove the legacy 5-minute table. IF EXISTS keeps this cell idempotent:
# re-running it after the table is already gone no longer raises.
spark.sql('drop table if exists iceberg.gold.revenue_5m')

DataFrame[]

In [27]:
# Streaming read of the silver-layer user event table through Iceberg.
silver_df = spark.readStream.format("iceberg").table("iceberg.silver.user_event")

# Keep only purchase events and declare a 2-minute watermark on event_time,
# which the windowed aggregation downstream relies on in append mode.
is_purchase = col("event_type") == "purchase"
purchase_df = silver_df.where(is_purchase).withWatermark("event_time", "2 minutes")

# Grouping keys: a tumbling 1-minute window over event_time, plus product and price.
# NOTE(review): the variable is still called revenue_5m_df although the window
# is one minute; the name is kept because other cells may reference it.
event_window = window(col("event_time"), "1 minutes")
group_keys = [event_window, col("product_id"), col("price")]

# Aggregates computed per group.
revenue_total = _sum("price").alias("revenue")
purchase_events = count("*").alias("event_count")
# Approximate number of distinct user_id values, exposed as order_count.
distinct_buyers = approx_count_distinct("user_id").alias("order_count")

revenue_5m_df = purchase_df.groupBy(*group_keys).agg(
    revenue_total,
    purchase_events,
    distinct_buyers,
)

# Project the aggregate into the column order and types of
# iceberg.gold.revenue_1m.
#
# `price` is part of the grouping key and a declared column of the target
# table, so it must be carried through: without it the sink column is never
# populated and rows for the same product/window at different prices cannot
# be told apart.
# NOTE(review): price is cast to bigint to match the table's LONG column;
# confirm the silver-layer price has no fractional part, otherwise the cast
# truncates it.
final_df = (
    revenue_5m_df
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("product_id"),
        col("price").cast("bigint"),
        col("order_count").cast("bigint"),
        col("revenue").cast("double"),
        col("event_count").cast("bigint"),
        # Processing-time stamp recording when the row was produced.
        current_timestamp().alias("update_ts")
    )
)

# =========================
# 4. Write to Iceberg Gold
# =========================
# Append-mode streaming write into the gold revenue table, triggered once a
# minute. The checkpoint location holds the query's progress and aggregation
# state, so it must stay stable across restarts of this query.
query = (
    final_df.writeStream
    .format("iceberg")
    .outputMode("append")
    .option(
        "checkpointLocation",
        "s3a://warehouse/checkpoints/gold/revenue_1m"
    )
    .trigger(processingTime='1 minute')
    .toTable("iceberg.gold.revenue_1m")
)

# Block until the stream terminates. Interrupting the kernel only unblocks
# this Python call; the streaming query itself would keep running in the
# background, so stop it explicitly on the way out (whether we leave through
# an interrupt, a query failure, or normal termination).
try:
    query.awaitTermination()
finally:
    query.stop()

25/12/16 10:07:23 WARN StateStore: Loaded state store provider in loadTimeMs=3655 for storeId=StateStoreId[ checkpointRootLocation=s3a://warehouse/checkpoints/gold/revenue_1m/state, operatorId=0, partitionId=188, storeName=default ] and queryRunId=51cec904-629a-4b41-88b4-306b8cb496e8
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/local/lib/python3.10/socket.py", line 717, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

25/12/16 10:07:26 WARN StateStore: Loaded state store provider in loadTimeMs=3516 for storeId=StateStoreId[ checkpointRootLocation=s3a://warehouse/checkpoints/gold/revenue_1m/state, operatorId=0, partitionId=187, storeName=default ] and queryRunId=51cec904-629a-4b41-88b4-306b8cb496e8
