Це код для стрімінгової агрегоції та фільтрації даних

In [2]:
# from kafka import KafkaProducer
from config import kafka_config

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType, StringType, TimestampNTZType
from pyspark.sql import SparkSession

import os
import json
import time
import random


In [3]:
#  Пакет, необхідний для читання Kafka зі Spark
os.environ[
    'PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 pyspark-shell'

# Створення SparkSession
spark = (SparkSession.builder
         .appName("KafkaStreaming")
         .master("local[*]")
         .getOrCreate())


In [4]:
# Читання потоку даних із Kafka
# Вказівки, як саме ми будемо під'єднуватися, паролі, протоколи
# maxOffsetsPerTrigger - будемо читати 5 записів за 1 тригер.

# Назва топіків
my_name = "vasyliev_v"
sensor_topic_name = f'{my_name}building_sensors_hw6'
alert_topic_name = f'{my_name}_alerting'

# читання даних з топіка
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_config['bootstrap_servers'][0]) \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="VawEzo1ikLtrA8Ug8THa";') \
    .option("subscribe", sensor_topic_name) \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", "1") \
    .load()

In [5]:
# Визначення схеми для JSON,
# оскільки Kafka має структуру ключ-значення, а значення має формат JSON. 
json_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("sensor_id", StringType(), True),
    StructField("temperature", IntegerType(), True),
    StructField("humidity", IntegerType(), True),
])

# підготовка отриманих даних для агрегації
clean_df = df.selectExpr("CAST(key AS STRING) AS key_deserialized", "CAST(value AS STRING) AS value_deserialized", "*") \
    .drop('key', 'value') \
    .withColumnRenamed("key_deserialized", "key") \
    .withColumn("value_json", from_json(col("value_deserialized"), json_schema)) \
    .withColumn("timestamp", from_unixtime(col("value_json.timestamp").cast(DoubleType())).cast("timestamp")) \
    .withColumn("sensor_id", col("value_json.sensor_id")) \
    .withColumn("temperature", col("value_json.temperature")) \
    .withColumn("humidity", col("value_json.humidity")) \
    .drop("value_json", "value_deserialized")

In [6]:
# Виведення даних на екран для тестування зчитування з вхідного топіку
# displaying_df = clean_df.writeStream \
#     .trigger(availableNow=True) \
#     .outputMode("append") \
#     .format("console") \
#     .option("checkpointLocation", "/tmp/checkpoints-9") \
#     .start() \
#     .awaitTermination()

In [7]:
# агрегація та розрахунок середніх значень для вікна агрегації
grouped_df = clean_df.withWatermark("timestamp", "10 seconds").groupBy(
                                                                    window("timestamp", "60 seconds", '30 seconds'),
                                                                    ).agg(
                                                                        mean('temperature').alias('avg_temperature'),
                                                                        mean('humidity').alias('avg_humidity')
                                                                    )

# завантаження умов для алертів з файлу та кешування даних для їх багаторазового використання
alert_df = spark.read.csv('./alerts_conditions.csv', header=True)
alert_df = alert_df.cache()

# крос-джойн агрегованих даних з умовами алертів
cross_join_df = grouped_df.crossJoin(alert_df)

# фільтрація випадків, коли стрімінгові дані відповідают умовам алертів
filtered_df = cross_join_df.filter((
                                        (cross_join_df.avg_temperature > cross_join_df.temperature_min) & (cross_join_df.avg_temperature < cross_join_df.temperature_max)
                                    ) | (
                                           (cross_join_df.avg_humidity > cross_join_df.humidity_min) & (cross_join_df.avg_humidity < cross_join_df.humidity_max)
                                        )
                                    ).withColumn('timestamp', from_unixtime(lit(time.time()).cast(DoubleType())).cast("timestamp")) \
    .drop('id', 'humidity_min', 'humidity_max', 'temperature_min', 'temperature_max')

# перетворення даних на json-структуру для відправки їх у вихідний топік
prepare_to_kafka_df = filtered_df.select(
                                        to_json(struct(
                                                        col("window"), 
                                                        col("avg_temperature"),
                                                        col("avg_humidity"),
                                                        col("code"),
                                                        col("message"),
                                                        col("timestamp"),        
                                                    )).alias("value")
                                        )

In [8]:
# Виведення даних на екран для тестування агрегації та фільтрації

# def process_batch(batch_df, epoch_id):
#     # if batch_df.count() > 0:  # Умова для відправки даних
#     if not batch_df.isEmpty():  # Умова для відправки даних       
#         batch_df.write.format("console").save()

# query = prepare_to_kafka_df.writeStream \
#     .foreachBatch(process_batch) \
#     .trigger(processingTime='15 seconds') \
#     .outputMode("update") \
#     .option("checkpointLocation", "/tmp/checkpoints-14") \
#     .start() \
#     .awaitTermination()
#     # .format("console") \

In [9]:
# надсилання агрегованих та відфільтрованих даних до вихідного топіку


def process_batch(batch_df, epoch_id):
    '''Фільтрація пустих батчів, щоб не перевантажувати вихідний топік'''
    try:
        if not batch_df.isEmpty():  # Перевірка на порожній DataFrame
            batch_df.write.format("kafka") \
                .option("kafka.bootstrap.servers", "77.81.230.104:9092") \
                .option("topic", alert_topic_name) \
                .option("kafka.security.protocol", "SASL_PLAINTEXT") \
                .option("kafka.sasl.mechanism", "PLAIN") \
                .option("kafka.sasl.jaas.config",
                        "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='VawEzo1ikLtrA8Ug8THa';") \
                .save()
    except Exception as e:
        print(f"Помилка під час запису даних у Kafka: {e}")

# Запис оброблених даних у вихідний топік
query = prepare_to_kafka_df.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime='30 seconds') \
    .outputMode("update") \
    .option("checkpointLocation", "/tmp/checkpoints-15") \
    .start() \
    .awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 