# Consumer

## Read data with kafka consumer

In [None]:
from confluent_kafka import Consumer

from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv("../.env")

# Configuração do consumidor
kafka_broker = "kafka-cpc.certi.org.br:31289"
topic_name = "iot-temperature"

consumer_config = {
    'bootstrap.servers': kafka_broker,
    'group.id': 'my-iot-group',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_config)

consumer.subscribe([topic_name])

print("Monitorando mensagens do Kafka... Pressione Ctrl+C para parar.")

try:
    while True:
        msg = consumer.poll(1.0)  # Aguarda por mensagens

        if msg is None:
            continue
        if msg.error():
            print(f"Erro: {msg.error()}")
            continue

        print(f"Recebido: {msg.value().decode('utf-8')} de {msg.topic()} [{msg.partition()}]")

except KeyboardInterrupt:
    print("Parando o consumidor...")
finally:
    consumer.close()


Monitorando mensagens do Kafka... Pressione Ctrl+C para parar.
Recebido: {"timestamp": "2025-01-10T20:34:59.638812+00:00", "temperature": 27.03} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:35:12.644942+00:00", "temperature": 24.56} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:36:06.444787+00:00", "temperature": 24.77} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:36:33.454420+00:00", "temperature": 24.27} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:36:37.455829+00:00", "temperature": 23.64} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:37:02.465321+00:00", "temperature": 22.92} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:37:12.469133+00:00", "temperature": 25.18} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:37:16.470609+00:00", "temperature": 29.98} de iot-temperature [8]
Recebido: {"timestamp": "2025-01-10T20:37:44.480873+00:00", "temperature": 25.06} de iot-temperat

## Read data with spark structured streaming

Create Delta Lake table with defined schema and change data feed enabled

In [1]:
from pyspark.sql.types import StructType, StringType, DoubleType
from pyspark.sql.functions import from_json, col
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType
import os

from dotenv import load_dotenv

load_dotenv()

os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4,org.apache.kafka:kafka-clients:3.9.0,org.apache.spark:spark-avro_2.12:3.5.1 pyspark-shell"

# Initialize Spark session with Delta Lake and MinIO support
spark = (SparkSession.builder \
    .appName("DeltaLakeWithMinIO") \
    ## Delta
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    #Hive Catalog
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    ## Optimize Delta
    .config("delta.autoOptimize.optimizeWrite", "true") \
    .config("delta.autoOptimize.autoCompact", "true") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    ## MinIO
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("MINIO_ENDPOINT")) \
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("MINIO_ACCESS_KEY")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("MINIO_SECRET_KEY")) \
    .config('spark.hadoop.fs.s3a.attempts.maximum', "3") \
    .config('spark.hadoop.fs.s3a.connection.timeout', "10000") \
    .config('spark.hadoop.fs.s3a.connection.establish.timeout', "5000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3n.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate())

your 131072x1 screen size is bogus. expect trouble
25/02/23 13:17:11 WARN Utils: Your hostname, CPC-12806 resolves to a loopback address: 127.0.1.1; using 172.26.242.248 instead (on interface eth0)
25/02/23 13:17:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/arthur/streaming-pipeline/.venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/arthur/.ivy2/cache
The jars for the packages stored in: /home/arthur/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
io.delta#delta-spark_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-92d8f84b-f5be-4761-aa59-20b0b2299982;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.4 in central
	found org.apache.ha

In [3]:
from pyspark.sql.types import StructType, StringType, DoubleType

# Define the schema for the processed table
processed_schema = StructType() \
    .add("timestamp", StringType()) \
    .add("temperature", DoubleType())

# Create an empty DataFrame with the schema
empty_processed_df = spark.createDataFrame([], processed_schema)

# Write the empty DataFrame to create the Delta table with CDF enabled
empty_processed_df.write.format("delta") \
    .option("path", "s3a://lakehouse/delta/raw_iot_data") \
    .option("delta.enableChangeDataFeed", "true") \
    .mode("overwrite") \
    .save()

25/01/24 16:19:10 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/01/24 16:19:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [4]:
spark.sql("SELECT COUNT(*) FROM delta.`s3a://lakehouse/delta/raw_iot_data`;").show()



+--------+
|count(1)|
+--------+
|       0|
+--------+



                                                                                

## Read Kafka Stream

In [5]:
from pyspark.sql.functions import current_timestamp

# Kafka Configuration
kafka_broker = "kafka-cpc.certi.org.br:31289"
topic_name = "iot-temperature"

# Define the schema for the JSON data
schema = StructType() \
    .add("timestamp", StringType()) \
    .add("temperature", DoubleType())

# Read data from Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .option("startingOffsets", "earliest") \
    .load()

# Deserialize Kafka value (JSON string) into columns
parsed_stream = kafka_stream.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

parsed_stream_with_timestamp = parsed_stream.withColumn("landing_timestamp", current_timestamp())

# Output the parsed stream for verification
parsed_stream_with_timestamp.printSchema()


root
 |-- timestamp: string (nullable = true)
 |-- temperature: double (nullable = true)
 |-- landing_timestamp: timestamp (nullable = false)



## Write Delta Stream

In [None]:
# Define the path to the raw Delta table
raw_delta_path = "s3a://lakehouse/delta/raw_iot_data"

# Write Kafka stream to Delta table
raw_stream_query = parsed_stream_with_timestamp.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3a://lakehouse/delta/checkpoints/raw_iot_data") \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .start(raw_delta_path)

print("Streaming Kafka data into Delta Lake (raw_iot_data)...")

# Monitor the query progress
import time

while raw_stream_query.isActive:
    print(raw_stream_query.lastProgress)  # Shows the latest progress info
    time.sleep(5)  # Updates every 5 seconds


25/01/24 16:21:16 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Streaming Kafka data into Delta Lake (raw_iot_data)...
None


25/01/24 16:21:19 WARN OffsetSeqMetadata: Updating the value of conf 'spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan' in current session from 'false' to 'true'.
25/01/24 16:21:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/01/24 16:21:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/01/24 16:21:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


None


                                                                                

{'id': 'c4c5e95a-a077-482c-8787-76257b98db8f', 'runId': '1c2439ac-09d5-4711-aa50-7bd59450db68', 'name': None, 'timestamp': '2025-01-24T19:21:17.708Z', 'batchId': 165, 'numInputRows': 15, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 1.618646811265782, 'durationMs': {'addBatch': 5462, 'commitOffsets': 1523, 'getBatch': 1, 'queryPlanning': 136, 'triggerExecution': 9267}, 'stateOperators': [], 'sources': [{'description': 'KafkaV2[Subscribe[iot-temperature]]', 'startOffset': {'iot-temperature': {'8': 370, '11': 373, '2': 395, '5': 422, '14': 406, '13': 368, '4': 387, '7': 372, '1': 370, '10': 384, '9': 380, '3': 398, '12': 433, '6': 392, '0': 376}}, 'endOffset': {'iot-temperature': {'8': 370, '11': 374, '2': 397, '5': 422, '14': 407, '13': 368, '4': 389, '7': 374, '1': 371, '10': 386, '9': 380, '3': 398, '12': 436, '6': 392, '0': 377}}, 'latestOffset': None, 'numInputRows': 15, 'inputRowsPerSecond': 0.0, 'processedRowsPerSecond': 1.618646811265782}], 'sink': {'description': 'DeltaSi