In [1]:
!pip3 install pyspark

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Using cached pyspark-3.5.3.tar.gz (317.3 MB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Collecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (pyproject.toml): started
  Building wheel for pyspark (pyproject.toml): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840672 sha256=57346ed42f665123fdac65a936685622896f1cb7afd545c9b49a6362fb488ef4
  Stored in directory:

In [1]:
import datetime
import os
import uuid

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

from configs import kafka_config

In [2]:

# The Kafka connector artifacts should carry the same version as the Spark
# release in use, so the version is read from the installed pyspark package
# instead of being hard-coded. This cell runs ahead of the one that creates the
# SparkSession, which is when the --packages list is picked up.
spark_version = pyspark.__version__
kafka_packages = ",".join([
    f"org.apache.spark:spark-streaming-kafka-0-10_2.12:{spark_version}",
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version}",
])
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {kafka_packages} pyspark-shell"


In [3]:

# Extra session settings applied on top of the app name and local master.
spark_settings = {
    "spark.sql.debug.maxToStringFields": "200",
    "spark.sql.columnNameLengthThreshold": "200",
}

# Build (or reuse) a SparkSession that runs locally on all available cores.
session_builder = SparkSession.builder.appName("KafkaStreaming").master("local[*]")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()


In [4]:

# Static table of alert rules; the first CSV line supplies the column names.
# No schema is given, so every column is read as a string.
alerts_df = spark.read.options(header=True).csv("./alerts_conditions.csv")


In [5]:
# Preview up to five alert rules to sanity-check the loaded CSV.
alerts_df.show(n=5)

+---+------------+------------+---------------+---------------+----+-------------+
| id|humidity_min|humidity_max|temperature_min|temperature_max|code|      message|
+---+------------+------------+---------------+---------------+----+-------------+
|  1|           0|          40|           -999|           -999| 101| It's too dry|
|  2|          60|         100|           -999|           -999| 102| It's too wet|
|  3|        -999|        -999|           -300|             30| 103|It's too cold|
|  4|        -999|        -999|             40|            300| 104| It's too hot|
+---+------------+------------+---------------+---------------+----+-------------+



In [6]:

# Sliding-window parameters for the aggregation: each window spans one minute
# and a new window starts every 30 seconds.
window_duration, sliding_interval = "1 minute", "30 seconds"

In [7]:

# Kafka credentials come from the environment so that no secret is stored in
# the notebook. KAFKA_USERNAME falls back to "admin"; KAFKA_PASSWORD is required.
# NOTE(review): the password that used to be committed here should be rotated.
kafka_username = os.environ.get("KAFKA_USERNAME", "admin")
kafka_password = os.environ.get("KAFKA_PASSWORD")
if not kafka_password:
    raise RuntimeError(
        "Set the KAFKA_PASSWORD environment variable before starting the stream."
    )

kafka_jaas_config = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="{kafka_username}" password="{kafka_password}";'
)

# Streaming source: subscribe to the sensor topic over SASL/PLAIN, starting from
# the earliest offsets and reading at most 300 offsets per trigger.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_config['bootstrap_servers'][0])
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .option("subscribe", "building_sensors_volodymyr17")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", "300")
    .load()
)

In [8]:

# Schema of the JSON payload carried in the Kafka message value.
# All four fields are nullable; the timestamp is read as a string and is
# converted to a real timestamp later in the pipeline.
json_schema = (
    StructType()
    .add("sensor_id", IntegerType(), True)
    .add("timestamp", StringType(), True)
    .add("temperature", IntegerType(), True)
    .add("humidity", IntegerType(), True)
)

In [9]:

# Step 1: decode the raw Kafka key/value bytes to strings while keeping all the
# other Kafka columns; the decoded key takes over the name "key".
decoded_df = (
    df.selectExpr(
        "CAST(key AS STRING) AS key_deserialized",
        "CAST(value AS STRING) AS value_deserialized",
        "*",
    )
    .drop('key', 'value')
    .withColumnRenamed("key_deserialized", "key")
)

# Step 2: parse the JSON payload and replace the "timestamp" column with the
# event time taken from the payload (epoch seconds -> timestamp).
event_time = from_unixtime(col("value_json.timestamp").cast(DoubleType())).cast("timestamp")
parsed_df = (
    decoded_df
    .withColumn("value_json", from_json(col("value_deserialized"), json_schema))
    .withColumn("timestamp", event_time)
)

# Step 3: average temperature and humidity per sliding window, with a
# 10-second watermark on the event time.
avg_stats = (
    parsed_df
    .withWatermark("timestamp", "10 seconds")
    .groupBy(window(col("timestamp"), window_duration, sliding_interval))
    .agg(
        avg("value_json.temperature").alias("t_avg"),
        avg("value_json.humidity").alias("h_avg"),
    )
    # The aggregation output holds only window, t_avg and h_avg, so this drop
    # has nothing to remove; it is kept to leave the pipeline unchanged.
    .drop("topic")
)

In [10]:

# Pair every windowed average with every alert rule; the rows that actually
# trigger a rule are filtered out of this product in the next step.
all_alerts = avg_stats.crossJoin(other=alerts_df)

In [11]:

# A rule fires for a window when the averaged temperature OR the averaged
# humidity lies strictly inside that rule's (min, max) range. A metric whose
# min and max are both -999 in the rules table can never satisfy the strict
# inequalities, so it is effectively disabled for that rule.
# One OR filter replaces the former unionAll of two filters over the same
# stream: the aggregation is evaluated once, and a rule that matches on both
# metrics yields a single row instead of two identical ones.
alert_condition = (
    "(t_avg > temperature_min AND t_avg < temperature_max) "
    "OR (h_avg > humidity_min AND h_avg < humidity_max)"
)

valid_alerts = (
    all_alerts
    .where(alert_condition)
    # current_timestamp() is evaluated by Spark while the query runs, whereas
    # lit(str(datetime.datetime.now())) froze the moment this cell was executed
    # into every alert. The cast keeps the column a string, as before.
    .withColumn("timestamp", current_timestamp().cast("string"))
    .drop("id", "humidity_min", "humidity_max", "temperature_min", "temperature_max")
)

In [12]:

# For debugging: print the intermediate result.
# displaying_df = valid_alerts.writeStream \
#     .trigger(processingTime='10 seconds') \
#     .outputMode("update") \
#     .format("console") \
#     .start() \
#     .awaitTermination()

# UDF that returns a fresh random UUID string on every call; it fills the
# "key" column of the outgoing records. uuid4() is random, so the UDF is marked
# non-deterministic: Spark assumes UDFs are deterministic by default and may
# otherwise re-evaluate or deduplicate the call during optimization.
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

In [13]:

# Columns serialized into the JSON message value, in output order.
alert_fields = ("window", "t_avg", "h_avg", "code", "message", "timestamp")
alert_payload = struct(*[col(field_name) for field_name in alert_fields])

# Shape each alert as a (key, value) record: a generated UUID as the key and
# the alert details encoded as a JSON string as the value.
keyed_alerts = valid_alerts.withColumn("key", uuid_udf())
prepare_to_kafka_df = keyed_alerts.select(
    col("key"),
    to_json(alert_payload).alias("value"),
)

In [None]:
# For debugging: print the intermediate result to the console every 10 seconds.
# start() and awaitTermination() are called separately: awaitTermination()
# returns None, so chaining them left no handle to the running query. Despite
# its name, displaying_df now holds the StreamingQuery returned by start().
displaying_df = (
    valid_alerts.writeStream
    .trigger(processingTime='10 seconds')
    .outputMode("update")
    .format("console")
    .start()
)

try:
    # Blocks the cell until the query ends or the kernel is interrupted.
    displaying_df.awaitTermination()
finally:
    # Stop the query on the way out so it does not keep running in the
    # background after the cell is interrupted.
    displaying_df.stop()

In [None]:

# query = prepare_to_kafka_df.writeStream \
#     .trigger(processingTime='30 seconds') \
#     .outputMode("update") \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "77.81.230.104:9092") \
#     .option("topic", "avg_alerts") \
#     .option("kafka.security.protocol", "SASL_PLAINTEXT") \
#     .option("kafka.sasl.mechanism", "PLAIN") \
#     .option("kafka.sasl.jaas.config",
#             "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='" + os.environ["KAFKA_PASSWORD"] + "';") \
#     .option("checkpointLocation", "/tmp/checkpoints-7") \
#     .start() \
#     .awaitTermination()
