## Write log-event aggregates to Iceberg without upserting

In [1]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType, LongType

# Define the updated schema
# JSON layout of each Kafka message value. Campaign_ID is parsed but not used
# below; every element of DataEntries becomes one row after explode().
schema = StructType([
    StructField("Campaign_ID", StringType(), True),
    StructField("DataEntries", ArrayType(StructType([
        StructField("DataName", StringType(), True),
        StructField("DataType", StringType(), True),
        StructField("DataValue", StringType(), True),
        StructField("TimeStamp", StringType(), True)
    ])), True),
    StructField("VinNumber", StringType(), True)
])

# Kafka parameters
kafka_bootstrap_servers = "b-2.kafkaclustericeberg.wa9le5.c3.kafka.eu-central-1.amazonaws.com:9092,b-1.kafkaclustericeberg.wa9le5.c3.kafka.eu-central-1.amazonaws.com:9092"
kafka_topic = "topic6"

# Create SparkSession
# NOTE(review): the kernel already provides a session ("SparkSession available
# as 'spark'"), so getOrCreate() returns it; static settings such as executor
# memory and spark.sql.extensions are then likely ignored and have to be set
# when the session is launched — confirm on this cluster.
spark = SparkSession.builder \
    .appName("KafkaToIceberg") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.demo.warehouse", "s3://aws-emr-studio-381492251123-eu-central-1/ICEBERG/") \
    .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.streaming.checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/") \
    .getOrCreate()

# Create a DataFrame representing the stream of input lines from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Convert the value column from Kafka to a string and parse JSON
parsed_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("data"))

# Flatten the structure: one row per DataEntries element
flattened_df = parsed_df.select(
    col("data.VinNumber"),
    explode(col("data.DataEntries")).alias("entry")
).select(
    col("VinNumber"),
    col("entry.DataName").alias("Parameter_Name"),
    col("entry.DataType"),
    col("entry.DataValue"),
    # entry.TimeStamp is interpreted as epoch milliseconds and converted
    # directly. Formatting it with from_unixtime and re-parsing with
    # to_timestamp goes through a session-time-zone string, which drops the
    # milliseconds and is ambiguous during the repeated DST hour.
    expr("timestamp_millis(CAST(entry.TimeStamp AS BIGINT))").alias("TimeStamp")
)

# Add a watermark on the TimeStamp column
watermarked_df = flattened_df.withWatermark("TimeStamp", "10 minutes")

# Perform the aggregation: row count per 30-minute window, VIN and parameter
aggregated_df = watermarked_df.groupBy(
    window(col("TimeStamp"), "30 minutes"),
    col("VinNumber"),
    col("Parameter_Name")
).agg(count("*").alias("Count"))

# Adjust the DataFrame schema to match the Iceberg table schema
result_df = aggregated_df \
    .select(
        col("VinNumber").cast(StringType()).alias("VinNumber"),
        to_timestamp(col("window.start")).alias("Window_Start"),
        to_timestamp(col("window.end")).alias("Window_End"),
        col("Parameter_Name").cast(StringType()).alias("Parameter_Name"),
        col("Count").cast(LongType()).alias("Count")
    )

# Define the foreachBatch function
def process_batch(batch_df, batch_id):
    """Append one micro-batch of windowed counts to demo.uraues_db.events_logs_agg6.

    Called by Structured Streaming's foreachBatch with the micro-batch
    DataFrame and its batch id.

    The query runs in "update" output mode and this function appends, so every
    trigger that changes a window writes another row for it. The table
    therefore accumulates several rows per (window, VinNumber, Parameter_Name).

    Raises:
        Exception: any failure is logged and re-raised. Swallowing it would let
        the engine commit the batch's offsets without the data being written.
    """
    # The batch feeds two actions (show + write); cache it so it is not recomputed.
    batch_df.persist()
    try:
        # Show a sample of the batch data
        batch_df.show(5, truncate=False)

        # Write the batch to Iceberg table
        batch_df.write \
            .format("iceberg") \
            .mode("append") \
            .save("demo.uraues_db.events_logs_agg6")
    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise
    finally:
        batch_df.unpersist()

# Write the aggregated data to the Iceberg table using foreachBatch.
# A fixed per-query checkpoint keeps Kafka offsets and aggregation state across
# restarts. Without one (and without queryName), Spark nests the checkpoint
# under spark.sql.streaming.checkpointLocation in a new random directory on
# every start, so each run re-reads the topic from "earliest".
query = result_df.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("update") \
    .option("checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/events_logs_agg6") \
    .trigger(processingTime='10 seconds') \
    .start()

# Wait for the streaming query to terminate. Interrupting the cell only ends
# the wait, so stop the query explicitly instead of leaving it running.
try:
    query.awaitTermination()
finally:
    query.stop()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
48,application_1726023487946_0049,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


KeyboardInterrupt



## Write SEV aggregates to Iceberg without upserting

In [1]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType, LongType


# Create SparkSession
# NOTE(review): the kernel already provides a session ("SparkSession available
# as 'spark'"), so getOrCreate() returns it; static settings such as executor
# memory and spark.sql.extensions are then likely ignored — confirm.
spark = SparkSession.builder \
    .appName("KafkaToIceberg") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.demo.warehouse", "s3://aws-emr-studio-381492251123-eu-central-1/ICEBERG/") \
    .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.streaming.checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/") \
    .getOrCreate()

# Define the schema
# Element type of IoC.Parameters; parsed but not used by the aggregation below.
# NOTE(review): "ParamterType" must match the producer's JSON key spelling
# exactly, otherwise that field parses as null — confirm.
ioc_parameters_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("InternalParameter", StructType([
        StructField("ParameterName", StringType(), True),
        StructField("ParamterType", StringType(), True),
        StructField("ParameterValue", StringType(), True),
        StructField("ParameterUnit", StringType(), True)
    ]), True)
])

# JSON layout of each SEV message value.
schema = StructType([
    StructField("SEV_ID", StringType(), True),
    StructField("VinNumber", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Name", StringType(), True),
    StructField("Severity", StringType(), True),
    StructField("SEV_Msg", StringType(), True),
    StructField("Origin", StringType(), True),
    StructField("NetworkType", StringType(), True),
    StructField("NetworkID", StringType(), True),
    StructField("IoC", StructType([
        StructField("Parameters", ArrayType(ioc_parameters_schema), True)
    ]), True)
])

# Kafka parameters
kafka_bootstrap_servers = "b-2.kafkaclustericeberg.wa9le5.c3.kafka.eu-central-1.amazonaws.com:9092,b-1.kafkaclustericeberg.wa9le5.c3.kafka.eu-central-1.amazonaws.com:9092"
kafka_topic = "sev_topic2"

# Create a DataFrame representing the stream of input lines from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Convert the value column from Kafka to a string and parse the JSON data
parsed_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

# Keep only the columns the aggregation needs
flattened_df = parsed_df.select(
    col("VinNumber"),
    col("SEV_ID"),
    # NOTE(review): "Timestamp" is parsed as TimestampType, cast back to a
    # number (seconds since epoch) and divided by 1000, i.e. treated as epoch
    # milliseconds. That is only correct if the cast value really counts
    # milliseconds — confirm against the producer.
    # Converting straight to a timestamp avoids the from_unixtime/to_timestamp
    # string round trip, which is ambiguous during the repeated DST hour.
    expr("timestamp_seconds(CAST(TimeStamp AS BIGINT) DIV 1000)").alias("TimeStamp")
)

# Add a watermark on the TimeStamp column. It is applied once: stacking a
# second withWatermark on the same column (previously 10 minutes and then
# 1 hour) leaves the effective delay to Spark's multiple-watermark policy.
watermarked_df = flattened_df.withWatermark("TimeStamp", "1 hour")

# Perform hourly aggregation: SEV count per 1-hour window and VIN
aggregated_df = watermarked_df \
    .groupBy(
        window("TimeStamp", "1 hour"),
        "VinNumber"
    ) \
    .agg(count("SEV_ID").alias("SEV_Count"))

# Function to write data to Iceberg table
def write_to_iceberg(batch_df, batch_id):
    """foreachBatch sink: flatten the window struct and append to demo.uraues_db.sevs_agg2.

    The output columns are WindowStart, WindowEnd, VinNumber and SEV_Count.
    batch_id is part of the foreachBatch signature and is not used here.
    Because the query runs in "update" mode, a window that changes across
    triggers is appended once per trigger.
    """
    target_table = "demo.uraues_db.sevs_agg2"

    # Promote the window bounds to top-level columns; the rest pass through.
    flat_df = batch_df.selectExpr(
        "`window`.start AS WindowStart",
        "`window`.end AS WindowEnd",
        "VinNumber",
        "SEV_Count",
    )

    writer = flat_df.write.format("iceberg").mode("append")
    writer.saveAsTable(target_table)

# Write the aggregated data to Iceberg.
# A fixed per-query checkpoint keeps Kafka offsets and window state across
# restarts. Without one (and without queryName), Spark nests the checkpoint
# under spark.sql.streaming.checkpointLocation in a new random directory on
# every start, so each run re-reads the topic from "earliest".
query = aggregated_df.writeStream \
    .foreachBatch(write_to_iceberg) \
    .outputMode("update") \
    .option("checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/sevs_agg2") \
    .start()

# Wait for the streaming query to terminate. Interrupting the cell only ends
# the wait, so stop the query explicitly instead of leaving it running.
try:
    query.awaitTermination()
finally:
    query.stop()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
55,application_1726023487946_0056,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


KeyboardInterrupt



In [2]:
# Stop any streaming queries still running (interrupting awaitTermination does
# not stop them), then stop the SparkSession itself.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()

VBox()

Exception in thread cell_monitor-2:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.9/site-packages/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in cell_monitor
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
  File "/mnt/notebook-env/lib/python3.9/site-packages/awseditorssparkmonitoringwidget/cellmonitor.py", line 154, in <listcomp>
    job_group_filtered_jobs = [job for job in jobs_data if job['jobGroup'] == str(statement_id)]
KeyError: 'jobGroup'
Interrupted by user


## Upsert SEV aggregates into Iceberg

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType

# Create SparkSession
# NOTE(review): the kernel already provides a session ("SparkSession available
# as 'spark'"), so getOrCreate() returns it; static settings such as executor
# memory and spark.sql.extensions are then likely ignored — confirm.
spark = SparkSession.builder \
    .appName("KafkaToIceberg") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.demo.warehouse", "s3://aws-emr-studio-381492251123-eu-central-1/ICEBERG/") \
    .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.streaming.checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/") \
    .getOrCreate()

# Define the schema
# Element type of IoC.Parameters; parsed but not used by the aggregation below.
# NOTE(review): "ParamterType" must match the producer's JSON key spelling
# exactly, otherwise that field parses as null — confirm.
ioc_parameters_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("InternalParameter", StructType([
        StructField("ParameterName", StringType(), True),
        StructField("ParamterType", StringType(), True),
        StructField("ParameterValue", StringType(), True),
        StructField("ParameterUnit", StringType(), True)
    ]), True)
])

# JSON layout of each SEV message value.
schema = StructType([
    StructField("SEV_ID", StringType(), True),
    StructField("VinNumber", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Name", StringType(), True),
    StructField("Severity", StringType(), True),
    StructField("SEV_Msg", StringType(), True),
    StructField("Origin", StringType(), True),
    StructField("NetworkType", StringType(), True),
    StructField("NetworkID", StringType(), True),
    StructField("IoC", StructType([
        StructField("Parameters", ArrayType(ioc_parameters_schema), True)
    ]), True)
])

# Kafka parameters
kafka_bootstrap_servers = "b-1.kafkaprivatecluster.2vrw32.c3.kafka.eu-central-1.amazonaws.com:9092,b-2.kafkaprivatecluster.2vrw32.c3.kafka.eu-central-1.amazonaws.com:9092"
kafka_topic = "sevs_topic2"

# Create a DataFrame representing the stream of input lines from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Convert the value column from Kafka to a string and parse the JSON data
parsed_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

# Keep only the columns the aggregation needs
flattened_df = parsed_df.select(
    col("VinNumber"),
    col("SEV_ID"),
    # "Timestamp" (parsed as TimestampType) is cast to whole epoch seconds and
    # converted straight back to a timestamp. This avoids the
    # from_unixtime/to_timestamp string round trip, which is ambiguous during
    # the repeated DST hour.
    expr("timestamp_seconds(CAST(TimeStamp AS BIGINT))").alias("TimeStamp")
)

# Add a watermark on the TimeStamp column. It is applied once: stacking a
# second withWatermark on the same column (previously 10 minutes and then
# 1 hour) leaves the effective delay to Spark's multiple-watermark policy.
watermarked_df = flattened_df.withWatermark("TimeStamp", "1 hour")

# Perform hourly aggregation: SEV count per 1-hour window and VIN
aggregated_df = watermarked_df \
    .groupBy(
        window("TimeStamp", "1 hour"),
        "VinNumber"
    ) \
    .agg(count("SEV_ID").alias("SEV_Count"))

def write_to_iceberg(batch_df, batch_id):
    """Upsert one micro-batch of hourly SEV counts into demo.uraues_db.sevs_agg2.

    Rows are keyed by (WindowStart, WindowEnd, VinNumber):
      * a key already in the table gets
        SEV_Count = greatest(existing, incoming) (greatest ignores NULLs);
      * a key not yet in the table is inserted;
      * table rows whose key is absent from the batch are left untouched.

    This uses a row-level MERGE. The table is not read in full, joined, and
    rewritten with saveAsTable(mode="overwrite"): on a catalog table that is a
    CREATE OR REPLACE, which rewrites every row and replaces the table's schema
    and partition spec on every trigger. MERGE INTO requires the Iceberg Spark
    SQL extensions to be active in the session.

    foreachBatch hands over a DataFrame bound to a cloned SparkSession, so the
    temp view is registered on, and the MERGE is run through,
    batch_df.sparkSession (PySpark 3.3+).

    Raises:
        Exception: logged, then re-raised so the streaming query fails rather
        than committing the batch's offsets without writing its data.
    """
    # The batch feeds several actions (count, show, merge); cache it once.
    batch_df.persist()
    try:
        print(f"Processing batch {batch_id}")
        batch_count = batch_df.count()
        print(f"Batch size: {batch_count} records")
        if batch_count == 0:
            return
        batch_df.show(5, truncate=False)

        # Rename the window start and end columns
        source_df = batch_df.select(
            col("window.start").alias("WindowStart"),
            col("window.end").alias("WindowEnd"),
            "VinNumber",
            "SEV_Count"
        )
        source_df.createOrReplaceTempView("sev_updates")

        # <=> (null-safe equality) lets a NULL-key group update its row
        # instead of being re-inserted on every batch.
        batch_df.sparkSession.sql("""
            MERGE INTO demo.uraues_db.sevs_agg2 AS target
            USING sev_updates AS source
            ON  target.WindowStart <=> source.WindowStart
            AND target.WindowEnd   <=> source.WindowEnd
            AND target.VinNumber   <=> source.VinNumber
            WHEN MATCHED THEN
              UPDATE SET target.SEV_Count = greatest(target.SEV_Count, source.SEV_Count)
            WHEN NOT MATCHED THEN
              INSERT (WindowStart, WindowEnd, VinNumber, SEV_Count)
              VALUES (source.WindowStart, source.WindowEnd, source.VinNumber, source.SEV_Count)
        """)

        print(f"Batch {batch_id}: Merged {batch_count} records into the Iceberg table.")

    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise
    finally:
        batch_df.unpersist()

# Write the aggregated data to Iceberg.
# A fixed per-query checkpoint keeps Kafka offsets and window state across
# restarts. It is separate from the append query's checkpoint because this
# query reads a different cluster/topic. Without one, Spark nests the
# checkpoint in a new random directory on every start and re-reads from
# "earliest".
query = aggregated_df.writeStream \
    .foreachBatch(write_to_iceberg) \
    .outputMode("update") \
    .option("checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/sevs_agg2_upsert") \
    .trigger(processingTime='10 seconds') \
    .start()

# Wait for the streaming query to terminate. Interrupting the cell only ends
# the wait, so stop the query explicitly instead of leaving it running.
try:
    query.awaitTermination()
finally:
    query.stop()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6,application_1726118155659_0010,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


KeyboardInterrupt



## Upsert Log Events

In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType, LongType

# Define the updated schema
# JSON layout of each Kafka message value. Campaign_ID is parsed but not used
# below; every element of DataEntries becomes one row after explode().
schema = StructType([
    StructField("Campaign_ID", StringType(), True),
    StructField("DataEntries", ArrayType(StructType([
        StructField("DataName", StringType(), True),
        StructField("DataType", StringType(), True),
        StructField("DataValue", StringType(), True),
        StructField("TimeStamp", StringType(), True)
    ])), True),
    StructField("VinNumber", StringType(), True)
])

# Kafka parameters
kafka_bootstrap_servers = "b-2.kafkaclustericeberg.wa9le5.c3.kafka.eu-central-1.amazonaws.com:9092,b-1.kafkaclustericeberg.wa9le5.c3.kafka.eu-central-1.amazonaws.com:9092"
kafka_topic = "topic8"

# Create SparkSession
# NOTE(review): the kernel already provides a session ("SparkSession available
# as 'spark'"), so getOrCreate() returns it; static settings such as executor
# memory and spark.sql.extensions (needed for MERGE INTO) are then likely
# ignored and must be set when the session is launched — confirm.
spark = SparkSession.builder \
    .appName("KafkaToIceberg") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.demo.warehouse", "s3://aws-emr-studio-381492251123-eu-central-1/ICEBERG/") \
    .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.streaming.checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/") \
    .getOrCreate()

# Create a DataFrame representing the stream of input lines from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

# Convert the value column from Kafka to a string and parse JSON
parsed_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("data"))

# Flatten the structure: one row per DataEntries element
flattened_df = parsed_df.select(
    col("data.VinNumber"),
    explode(col("data.DataEntries")).alias("entry")
).select(
    col("VinNumber"),
    col("entry.DataName").alias("Parameter_Name"),
    col("entry.DataType"),
    col("entry.DataValue"),
    # entry.TimeStamp is interpreted as epoch milliseconds and converted
    # directly. Formatting it with from_unixtime and re-parsing with
    # to_timestamp goes through a session-time-zone string, which drops the
    # milliseconds and is ambiguous during the repeated DST hour.
    expr("timestamp_millis(CAST(entry.TimeStamp AS BIGINT))").alias("TimeStamp")
)

# Add a watermark on the TimeStamp column
watermarked_df = flattened_df.withWatermark("TimeStamp", "10 minutes")

# Perform the aggregation: row count per 30-minute window, VIN and parameter
aggregated_df = watermarked_df.groupBy(
    window(col("TimeStamp"), "30 minutes"),
    col("VinNumber"),
    col("Parameter_Name")
).agg(count("*").alias("Count"))

# Adjust the DataFrame schema to match the Iceberg table schema
result_df = aggregated_df \
    .select(
        col("VinNumber").cast(StringType()).alias("VinNumber"),
        to_timestamp(col("window.start")).alias("Window_Start"),
        to_timestamp(col("window.end")).alias("Window_End"),
        col("Parameter_Name").cast(StringType()).alias("Parameter_Name"),
        col("Count").cast(LongType()).alias("Count")
    )

# Define the foreachBatch function with merge logic
def process_batch(batch_df, batch_id):
    """Upsert one micro-batch of 30-minute parameter counts into demo.uraues_db.events_logs_agg7.

    Rows are keyed by (VinNumber, Window_Start, Window_End, Parameter_Name):
      * a key already in the table gets Count = greatest(existing, incoming)
        (greatest ignores NULLs);
      * a key not yet in the table is inserted;
      * table rows whose key is absent from the batch are left untouched.

    This uses a row-level MERGE. The table is not read in full, joined, and
    rewritten with saveAsTable(mode="overwrite"): on a catalog table that is a
    CREATE OR REPLACE, which rewrites every row and replaces the table's schema
    and partition spec on every trigger. MERGE INTO requires the Iceberg Spark
    SQL extensions to be active in the session.

    foreachBatch hands over a DataFrame bound to a cloned SparkSession, so the
    temp view is registered on, and the MERGE is run through,
    batch_df.sparkSession (PySpark 3.3+).

    Raises:
        Exception: logged, then re-raised so the streaming query fails rather
        than committing the batch's offsets without writing its data.
    """
    # The batch feeds several actions (count, show, merge); cache it once.
    batch_df.persist()
    try:
        print(f"Processing batch {batch_id}")
        batch_count = batch_df.count()
        print(f"Batch size: {batch_count} records")
        if batch_count == 0:
            return
        batch_df.show(5, truncate=False)

        batch_df.createOrReplaceTempView("log_event_updates")

        # <=> (null-safe equality) lets a NULL-key group update its row
        # instead of being re-inserted on every batch.
        batch_df.sparkSession.sql("""
            MERGE INTO demo.uraues_db.events_logs_agg7 AS target
            USING log_event_updates AS source
            ON  target.VinNumber      <=> source.VinNumber
            AND target.Window_Start   <=> source.Window_Start
            AND target.Window_End     <=> source.Window_End
            AND target.Parameter_Name <=> source.Parameter_Name
            WHEN MATCHED THEN
              UPDATE SET target.Count = greatest(target.Count, source.Count)
            WHEN NOT MATCHED THEN
              INSERT (VinNumber, Window_Start, Window_End, Parameter_Name, Count)
              VALUES (source.VinNumber, source.Window_Start, source.Window_End, source.Parameter_Name, source.Count)
        """)

        print(f"Batch {batch_id}: Merged {batch_count} records into the Iceberg table.")

    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise
    finally:
        batch_df.unpersist()

# Write the aggregated data to the Iceberg table using foreachBatch.
# A fixed per-query checkpoint keeps Kafka offsets and window state across
# restarts. Without one (and without queryName), Spark nests the checkpoint
# under spark.sql.streaming.checkpointLocation in a new random directory on
# every start, so each run re-reads the topic from "earliest".
query = result_df.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("update") \
    .option("checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/events_logs_agg7") \
    .trigger(processingTime='10 seconds') \
    .start()

# Wait for the streaming query to terminate. Interrupting the cell only ends
# the wait, so stop the query explicitly instead of leaving it running.
try:
    query.awaitTermination()
finally:
    query.stop()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
64,application_1726023487946_0065,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Write the aggregated data to the Iceberg table using merge
def write_to_iceberg(batch_df, batch_id):
    """Upsert one micro-batch of result_df rows into demo.uraues_db.events_logs_aggregates2.

    Rows are matched on the natural key (VinNumber, Window_Start, Window_End,
    Parameter_Name). Matched rows take the batch's Count; unmatched rows are
    inserted. batch_id is part of the foreachBatch signature and is unused.
    """
    # foreachBatch passes a DataFrame bound to a cloned SparkSession. The temp
    # view must be registered on, and queried through, that same session;
    # the outer `spark` session cannot resolve it.
    batch_session = batch_df.sparkSession
    batch_df.createOrReplaceTempView("updates")

    # result_df has no `id` column, so matching uses the natural key (<=> also
    # matches NULL keys). For inserts, a surrogate id is derived
    # deterministically from that key.
    # NOTE(review): assumes the target `id` column accepts a BIGINT value
    # (xxhash64) — confirm the table's id type.
    batch_session.sql("""
    MERGE INTO demo.uraues_db.events_logs_aggregates2 AS target
    USING (
        SELECT xxhash64(VinNumber, Window_Start, Window_End, Parameter_Name) AS id, *
        FROM updates
    ) AS source
    ON  target.VinNumber      <=> source.VinNumber
    AND target.Window_Start   <=> source.Window_Start
    AND target.Window_End     <=> source.Window_End
    AND target.Parameter_Name <=> source.Parameter_Name
    WHEN MATCHED THEN
      UPDATE SET
        target.Count = source.Count
    WHEN NOT MATCHED THEN
      INSERT (id, VinNumber, Window_Start, Window_End, Parameter_Name, Count)
      VALUES (source.id, source.VinNumber, source.Window_Start, source.Window_End, source.Parameter_Name, source.Count)
    """)

# Start the streaming query with foreachBatch.
# NOTE(review): result_df is not built in this cell; it comes from the
# log-events setup cell earlier in the notebook, so that cell must have run
# (and its own query been stopped) first.
query = result_df.writeStream \
    .foreachBatch(write_to_iceberg) \
    .outputMode("update") \
    .option("checkpointLocation", "s3://aws-emr-studio-381492251123-eu-central-1/stream_checkpoint/events_logs_aggregates2") \
    .start()

# Interrupting the cell only ends the wait; stop the query explicitly so it
# does not keep running in the background.
try:
    query.awaitTermination()
finally:
    query.stop()