- Створення програми, яка буде читати дані з одного Kafka-топіку за допомогою Spark Streaming, робити просте перетворення й записувати в інший топік.

- Валідація обробки даних з попереднього пункту за допомогою додаткового Consumer.

# Налаштування середовища

In [1]:
import os

os.environ["PYTHONUNBUFFERED"] = "1"
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/4.0.0/libexec"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["SPARK_HOME"] + "/bin:" + os.environ["PATH"]

In [2]:
os.environ["PATH"]

'/opt/homebrew/opt/openjdk@17/bin:/opt/homebrew/Cellar/apache-spark/4.0.0/libexec/bin:/Users/aleksejkitajskij/Desktop/goit-de-hw-06/.venv/bin:/opt/homebrew/opt/apache-spark/libexec/bin:/opt/anaconda3/bin:/opt/anaconda3/condabin:/Users/aleksejkitajskij/fam_terminal:/opt/homebrew/opt/openjdk@17/bin:/opt/homebrew/bin:/opt/homebrew/sbin:/Library/Frameworks/Python.framework/Versions/3.12/bin:/Library/Frameworks/Python.framework/Versions/3.10/bin:/Library/Frameworks/Python.framework/Versions/3.11/bin:/usr/local/bin:/System/Cryptexes/App/usr/bin:/usr/bin:/bin:/usr/sbin:/sbin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/local/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/bin:/var/run/com.apple.security.cryptexd/codex.system/bootstrap/usr/appleinternal/bin:/usr/local/share/dotnet:~/.dotnet/tools'

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType
from pyspark.sql import SparkSession
from configs import kafka_config

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = ('--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.1 pyspark-shell')

# Створення сессії

In [5]:
# Створення SparkSession
spark = (SparkSession.builder
         .appName("KafkaStreaming")
         .master("local[*]")
         .getOrCreate())

:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/4.0.0/libexec/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/aleksejkitajskij/.ivy2.5.2/cache
The jars for the packages stored in: /Users/aleksejkitajskij/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-324d9ed1-4927-4c12-9f77-a9d2b523e420;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in cen

# Визначення схем

In [6]:
data_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("temperature", IntegerType(), True),
    StructField("humidity", IntegerType(), True)
])

alerts_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("humidity_min", IntegerType(), True),
    StructField("humidity_max", IntegerType(), True),
    StructField("temperature_min", IntegerType(), True),
    StructField("temperature_max", IntegerType(), True),
    StructField("code", IntegerType(), True),
    StructField("message", StringType(), True)
])

# Підключення і зчитування даних

In [7]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_config['bootstrap_servers'][0]) \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="VawEzo1ikLtrA8Ug8THa";') \
    .option("subscribe", "barka_in") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "5") \
    .load()

alerts_df = spark.read.csv("alerts_conditions.csv", header=True, schema=alerts_schema)

In [8]:
parsed = df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value_str") \
    .withColumn("value_json", from_json(col("value_str"), data_schema)) \
    .withColumn("timestamp", to_timestamp(col("value_json.timestamp"))) \
    .withColumn("temperature", col("value_json.temperature")) \
    .withColumn("humidity", col("value_json.humidity")) \
    .drop("value_str", "value_json")

# агрегування та фільтрація

In [9]:
agg = (parsed
       .withWatermark("timestamp", "10 seconds")
       .groupBy(window(col("timestamp"), "1 minute", "30 seconds"))
       .agg(
           avg(col("temperature")).alias("avg_temperature"),
           avg(col("humidity")).alias("avg_humidity")
       )
       .select(
           col("window.start").alias("window_start"),
           col("window.end").alias("window_end"),
           "avg_temperature",
           "avg_humidity"
       )
)

In [10]:
joined = agg.crossJoin(alerts_df)

In [11]:
# Сухо
alerts_dry = joined.filter(
    (col("avg_humidity") < col("humidity_min"))
).select("window_start","window_end","avg_temperature","avg_humidity","id","code","message")

# Вологo
alerts_wet = joined.filter(
    (col("avg_humidity") > col("humidity_max"))
).select("window_start","window_end","avg_temperature","avg_humidity","id","code","message")

# Холодно
alerts_cold = joined.filter(
    (col("avg_temperature") < col("temperature_min"))
).select("window_start","window_end","avg_temperature","avg_humidity","id","code","message")

# Гаряче
alerts_hot = joined.filter(
    (col("avg_temperature") > col("temperature_max"))
).select("window_start","window_end","avg_temperature","avg_humidity","id","code","message")


alerts_dry = alerts_dry.withColumn("trigger_type", lit("dry"))
alerts_wet = alerts_wet.withColumn("trigger_type", lit("wet"))
alerts_cold = alerts_cold.withColumn("trigger_type", lit("cold"))
alerts_hot = alerts_hot.withColumn("trigger_type", lit("hot"))

alerts_triggered = alerts_dry.union(alerts_wet).union(alerts_cold).union(alerts_hot)

# Запис алертів

In [None]:
alerts_triggered.selectExpr("CAST(id AS STRING) AS key",
                            """to_json(struct(*)) AS value""") \
    .writeStream \
    .trigger(processingTime='20 seconds') \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "77.81.230.104:9092") \
    .option("topic", "barka_out") \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='VawEzo1ikLtrA8Ug8THa';") \
    .option("checkpointLocation", "/tmp/checkpoints-10003") \
    .start() \
    .awaitTermination()

25/08/26 11:32:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/08/26 11:32:51 WARN HDFSBackedStateStoreProvider: The state for version 26 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/08/26 11:32:51 WARN HDFSBackedStateStoreProvider: The state for version 26 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/08/26 11:32:51 WARN HDFSBackedStateStoreProvider: The state for version 26 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/08/26 11:32:51 WARN HDFSBackedStateStoreProvider: The state for version 26 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the fir

KeyboardInterrupt: 

                                                                                