# Step 1: Imports, Configuration and Setup

In [1]:
import json
import time
import random
from datetime import datetime, timezone
import threading
import logging
import colorlog
import warnings

from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer, KafkaConsumer

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

In [2]:
# Configure logging levels and suppress warnings
warnings.filterwarnings("ignore")

# Suppress Py4J and Spark logs
logging.getLogger('py4j').setLevel(logging.ERROR)
logging.getLogger('org.apache.spark').setLevel(logging.ERROR)

# Configure colored logging
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S',
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
))
logger = logging.getLogger('KafkaLogger')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Step 2: Kafka Configuration

In [3]:
kafka_config = {
    "bootstrap_servers": ['77.81.230.104:9092'],
    "username": 'admin',
    "password": 'VawEzo1ikLtrA8Ug8THa',
    "security_protocol": 'SASL_PLAINTEXT',
    "sasl_mechanism": 'PLAIN'
}

# Common configurations for producers and consumers
common_consumer_config = {
    'bootstrap_servers': kafka_config['bootstrap_servers'],
    'security_protocol': kafka_config['security_protocol'],
    'sasl_mechanism': kafka_config['sasl_mechanism'],
    'sasl_plain_username': kafka_config['username'],
    'sasl_plain_password': kafka_config['password'],
    'value_deserializer': lambda m: json.loads(m.decode('utf-8')),
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
}

common_producer_config = {
    'bootstrap_servers': kafka_config['bootstrap_servers'],
    'security_protocol': kafka_config['security_protocol'],
    'sasl_mechanism': kafka_config['sasl_mechanism'],
    'sasl_plain_username': kafka_config['username'],
    'sasl_plain_password': kafka_config['password'],
    'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
}

In [4]:
# Unique identifier
unique_id = 'eod_goit_de_hw_06'

# Topic names
topic_building_sensors = f'building_sensors_{unique_id}'
topic_alerts = f'alerts_{unique_id}'

# Alerts conditions file
alerts_conditions_file = 'alerts_conditions.csv'

# Step 3: Create Kafka Topics if They Do Not Exist


In [5]:
# Define topic names
topics = [topic_building_sensors, topic_alerts]

# Create KafkaAdminClient
try:
    logger.info("Creating KafkaAdminClient...")
    admin_client = KafkaAdminClient(
        bootstrap_servers=kafka_config['bootstrap_servers'],
        security_protocol=kafka_config['security_protocol'],
        sasl_mechanism=kafka_config['sasl_mechanism'],
        sasl_plain_username=kafka_config['username'],
        sasl_plain_password=kafka_config['password'],
        client_id='admin_client'
    )
    logger.info("KafkaAdminClient created successfully.")
except Exception as e:
    logger.critical(f"Error creating KafkaAdminClient: {e}")
    raise

# Get list of existing topics
try:
    existing_topics = admin_client.list_topics()
    logger.info("Successfully retrieved list of existing topics.")
except Exception as e:
    logger.error(f"Error retrieving list of topics: {e}")
    raise

# Create only topics that do not exist
topic_list = []
for topic_name in topics:
    if topic_name not in existing_topics:
        topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        topic_list.append(topic)
    else:
        logger.info(f"Topic '{topic_name}' already exists, skipping creation.")

if topic_list:
    try:
        logger.info("Attempting to create new topics...")
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
        logger.info("Topics created successfully:")
        for topic in topic_list:
            logger.info(f"- {topic.name}")
    except Exception as e:
        logger.error(f"Error creating topics: {e}")
else:
    logger.info("All topics already exist, no need to create new ones.")

[32m2024-11-23 20:07:59 - KafkaLogger - INFO - Creating KafkaAdminClient...[0m
[32m2024-11-23 20:08:00 - KafkaLogger - INFO - KafkaAdminClient created successfully.[0m
[32m2024-11-23 20:08:00 - KafkaLogger - INFO - Successfully retrieved list of existing topics.[0m
[32m2024-11-23 20:08:00 - KafkaLogger - INFO - Topic 'building_sensors_eod_goit_de_hw_06' already exists, skipping creation.[0m
[32m2024-11-23 20:08:00 - KafkaLogger - INFO - Topic 'alerts_eod_goit_de_hw_06' already exists, skipping creation.[0m
[32m2024-11-23 20:08:00 - KafkaLogger - INFO - All topics already exist, no need to create new ones.[0m


# Step 4: Sensor Simulation Function with Extended Data Range

In [None]:
# Function to generate temperature values
def generate_temperature():
    if random.random() < 0.8:  # 80% of the time, generate realistic data
        return random.uniform(-50, 50)
    else:  # 20% of the time, generate extreme values
        return random.uniform(-300, 300)

# Sensor simulation function
def sensor_simulation(sensor_id, topic_name):
    producer = KafkaProducer(**common_producer_config)
    try:
        while True:
            # Generate temperature and humidity
            temperature = generate_temperature()
            humidity = random.uniform(0, 100)
            timestamp = datetime.now(timezone.utc).isoformat()

            # Create message
            data = {
                "sensor_id": sensor_id,
                "timestamp": timestamp,
                "temperature": round(temperature, 2),
                "humidity": round(humidity, 2),
            }

            # Send to Kafka
            producer.send(topic_name, value=data)
            logger.info(f"Sent message from sensor {sensor_id}: {data}")

            time.sleep(5)
    except KeyboardInterrupt:
        logger.warning(f"Sensor simulation {sensor_id} stopped.")
    finally:
        producer.close()

In [None]:
# Start simulation for multiple sensors
num_sensors = 3
sensor_ids = random.sample(range(1000, 9999), num_sensors)

logger.info(f"Starting {num_sensors} sensor simulations with IDs: {sensor_ids}")

sensor_threads = []
for sensor_id in sensor_ids:
    sensor_thread = threading.Thread(target=sensor_simulation, args=(sensor_id, topic_building_sensors))
    sensor_thread.daemon = True
    sensor_thread.start()
    sensor_threads.append(sensor_thread)

[32m2024-11-23 20:08:00 - KafkaLogger - INFO - Starting 3 sensor simulations with IDs: [4373, 4351, 7337][0m


# Step 5: Initialize Spark Session

In [8]:
spark = SparkSession \
    .builder \
    .appName("KafkaSparkStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

# Set Spark log level to ERROR to reduce logs
spark.sparkContext.setLogLevel("ERROR")

[32m2024-11-23 20:08:00 - KafkaLogger - INFO - Sent message from sensor 4373: {'sensor_id': 4373, 'timestamp': '2024-11-23T19:08:00.876923+00:00', 'temperature': 4.22, 'humidity': 92.63}[0m
[32m2024-11-23 20:08:00 - KafkaLogger - INFO - Sent message from sensor 7337: {'sensor_id': 7337, 'timestamp': '2024-11-23T19:08:00.881755+00:00', 'temperature': -17.88, 'humidity': 49.45}[0m
[32m2024-11-23 20:08:00 - KafkaLogger - INFO - Sent message from sensor 4351: {'sensor_id': 4351, 'timestamp': '2024-11-23T19:08:00.881709+00:00', 'temperature': 10.41, 'humidity': 82.49}[0m
[32m2024-11-23 20:08:05 - KafkaLogger - INFO - Sent message from sensor 4373: {'sensor_id': 4373, 'timestamp': '2024-11-23T19:08:05.922389+00:00', 'temperature': -15.85, 'humidity': 27.8}[0m
[32m2024-11-23 20:08:05 - KafkaLogger - INFO - Sent message from sensor 7337: {'sensor_id': 7337, 'timestamp': '2024-11-23T19:08:05.932681+00:00', 'temperature': -11.19, 'humidity': 24.62}[0m
[32m2024-11-23 20:08:05 - KafkaLo

24/11/23 20:08:01 WARN Utils: Your hostname, nord-laptop resolves to a loopback address: 127.0.1.1; using 192.168.50.104 instead (on interface wlp0s20f3)
24/11/23 20:08:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/nord/.ivy2/cache
The jars for the packages stored in: /home/nord/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3327f2c2-e496-4d99-8d21-5c0568c1a33c;1.0
	confs: [default]


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 243ms :: artifacts dl 8ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#h

# Step 6: Define Schema and Read Stream from Kafka

In [9]:
# Define schema for sensor data
sensor_schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", DoubleType(), True)
])

In [10]:
# Kafka options
kafka_options = {
    "kafka.bootstrap.servers": ','.join(kafka_config['bootstrap_servers']),
    "subscribe": topic_building_sensors,
    "startingOffsets": "latest",
    "kafka.security.protocol": kafka_config['security_protocol'],
    "kafka.sasl.mechanism": kafka_config['sasl_mechanism'],
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_config["username"]}" password="{kafka_config["password"]}";'
}

# Read streaming data from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .options(**kafka_options) \
    .load()

# Convert value from bytes to string and parse JSON
df = df.selectExpr("CAST(value AS STRING) as json_string") \
       .select(F.from_json(F.col("json_string"), sensor_schema).alias("data")) \
       .select("data.*")

# Ensure timestamp is in TimestampType
df = df.withColumn("timestamp", F.to_timestamp("timestamp"))

# Print schema for debugging (can be commented out later)
df.printSchema()

root
 |-- sensor_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- temperature: double (nullable = true)
 |-- humidity: double (nullable = true)



# Step 7: Load Alert Conditions

In [11]:
alerts_conditions_df = spark.read.csv(alerts_conditions_file, header=True, inferSchema=True)

# Replace -999 with null
alerts_conditions_df = alerts_conditions_df.replace(-999, None)

# Cast types
alerts_conditions_df = alerts_conditions_df.select(
    F.col("id").cast("int"),
    F.col("humidity_min").cast("double"),
    F.col("humidity_max").cast("double"),
    F.col("temperature_min").cast("double"),
    F.col("temperature_max").cast("double"),
    F.col("code").cast("string"),
    F.col("message").cast("string")
)

# Show alert conditions
alerts_conditions_df.show()

+---+------------+------------+---------------+---------------+----+-------------+
| id|humidity_min|humidity_max|temperature_min|temperature_max|code|      message|
+---+------------+------------+---------------+---------------+----+-------------+
|  1|         0.0|        40.0|           NULL|           NULL| 101| It's too dry|
|  2|        60.0|       100.0|           NULL|           NULL| 102| It's too wet|
|  3|        NULL|        NULL|         -300.0|           30.0| 103|It's too cold|
|  4|        NULL|        NULL|           40.0|          300.0| 104| It's too hot|
+---+------------+------------+---------------+---------------+----+-------------+



# Step 7: Compute Averages Using Sliding Window

In [12]:
windowed_df = df \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(
        F.window(F.col("timestamp"), "1 minute", "30 seconds")
    ) \
    .agg(
        F.avg("temperature").alias("t_avg"),
        F.avg("humidity").alias("h_avg")
    )

# Flatten the window column
windowed_df = windowed_df.select("window.*", "t_avg", "h_avg")

# Round average values for readability
windowed_df = windowed_df \
    .withColumn("t_avg", F.round("t_avg", 2)) \
    .withColumn("h_avg", F.round("h_avg", 2))


# Step 8: Cross Join with Alert Conditions and Filter Data

In [13]:
joined_df = windowed_df.crossJoin(alerts_conditions_df)

# Define filter conditions
conditions = (
    ( (F.col("temperature_min").isNull() | (F.col("t_avg") >= F.col("temperature_min"))) ) &
    ( (F.col("temperature_max").isNull() | (F.col("t_avg") <= F.col("temperature_max"))) ) &
    ( (F.col("humidity_min").isNull() | (F.col("h_avg") >= F.col("humidity_min"))) ) &
    ( (F.col("humidity_max").isNull() | (F.col("h_avg") <= F.col("humidity_max"))) )
)

# Apply filtering to get alerts
alerts_df = joined_df.filter(conditions)

# Add current timestamp
alerts_df = alerts_df.withColumn("timestamp", F.current_timestamp())

# Select necessary columns and structure data with correct field names
alerts_df = alerts_df.select(
    F.struct(
        F.col("start").cast("string").alias("start"),
        F.col("end").cast("string").alias("end")
    ).alias("window"),
    F.round("t_avg", 2).alias("t_avg"),
    F.round("h_avg", 2).alias("h_avg"),
    "code",
    "message",
    F.col("timestamp").cast("string")
)

# Step 10: Write Alerts to Kafka


In [14]:
# Function to convert DataFrame to JSON string
def to_json_string(df):
    return df.select(F.to_json(F.struct("*")).alias("value"))

kafka_output_options = {
    "kafka.bootstrap.servers": ','.join(kafka_config['bootstrap_servers']),
    "topic": topic_alerts,
    "kafka.security.protocol": kafka_config['security_protocol'],
    "kafka.sasl.mechanism": kafka_config['sasl_mechanism'],
    "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_config["username"]}" password="{kafka_config["password"]}";'
}

# Write alerts to Kafka topic
def start_kafka_query():
    kafka_query = to_json_string(alerts_df) \
        .writeStream \
        .outputMode("append") \
        .format("kafka") \
        .options(**kafka_output_options) \
        .option("checkpointLocation", "/tmp/checkpoints") \
        .start()
    kafka_query.awaitTermination()

logger.info("Started streaming query to write alerts to Kafka topic.")

[32m2024-11-23 20:08:07 - KafkaLogger - INFO - Started streaming query to write alerts to Kafka topic.[0m


# Step 9: Log Only Alerts to Console

In [15]:
# Function to log only alerts
def log_only_alerts(batch_df, batch_id):
    print(f"Processing batch {batch_id}")
    if not batch_df.isEmpty():
        print("====== ALERTS ======")
        batch_df.show(truncate=False)
    else:
        print("No alerts in this batch.")


# Start streaming query to process alerts and log to console
def start_alerts_query():
    alerts_query = (
        alerts_df.writeStream.outputMode("append").foreachBatch(log_only_alerts).start()
    )
    alerts_query.awaitTermination()


# Start the streaming queries in separate threads
def start_streaming_queries():
    kafka_thread = threading.Thread(target=start_kafka_query)
    alerts_thread = threading.Thread(target=start_alerts_query)
    kafka_thread.daemon = True
    alerts_thread.daemon = True
    kafka_thread.start()
    alerts_thread.start()
    return kafka_thread, alerts_thread


kafka_thread, alerts_thread = start_streaming_queries()
logger.info("Started streaming queries in separate threads.")

[32m2024-11-23 20:08:07 - KafkaLogger - INFO - Started streaming queries in separate threads.[0m


Exception in thread Thread-12 (start_kafka_query):
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/home/nord/.local/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/usr/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_2685103/4221190351.py", line 22, in start_kafka_query
  File "/home/nord/.local/lib/python3.12/site-packages/pyspark/sql/streaming/query.py", line 221, in awaitTermination
    return self._jsq.awaitTermination()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nord/.local/lib/python3.12/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/home/nord/.local/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted

In [16]:
def consume_alerts():

    consumer = KafkaConsumer(
        topic_alerts,
        bootstrap_servers=kafka_config["bootstrap_servers"],
        security_protocol=kafka_config["security_protocol"],
        sasl_mechanism=kafka_config["sasl_mechanism"],
        sasl_plain_username=kafka_config["username"],
        sasl_plain_password=kafka_config["password"],
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
        auto_offset_reset="latest",
        enable_auto_commit=True,
    )

    logger.info("Starting Kafka consumer to read alerts...")
    try:
        for message in consumer:
            logger.info("Received alert:")
            logger.info(json.dumps(message.value, indent=2))
    except KeyboardInterrupt:
        logger.warning("Kafka consumer stopped.")
    finally:
        consumer.close()

# Start the consumer in a separate thread
consumer_thread = threading.Thread(target=consume_alerts)
consumer_thread.daemon = True  # Set thread as daemon
consumer_thread.start()


In [17]:
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    logger.warning("Streaming processing stopped by user.")
    spark.stop()
    logger.info("Spark session stopped.")

[32m2024-11-23 20:08:07 - KafkaLogger - INFO - Starting Kafka consumer to read alerts...[0m
[32m2024-11-23 20:08:19 - KafkaLogger - INFO - Received alert:[0m
[32m2024-11-23 20:08:19 - KafkaLogger - INFO - {
  "window": {
    "start": "2024-11-23 20:05:00",
    "end": "2024-11-23 20:06:00"
  },
  "t_avg": 5.78,
  "h_avg": 45.83,
  "code": "103",
  "message": "It's too cold",
  "timestamp": "2024-11-23 20:08:14.094"
}[0m
[32m2024-11-23 20:08:24 - KafkaLogger - INFO - Received alert:[0m
[32m2024-11-23 20:08:24 - KafkaLogger - INFO - {
  "window": {
    "start": "2024-11-23 20:05:30",
    "end": "2024-11-23 20:06:30"
  },
  "t_avg": 0.73,
  "h_avg": 50.41,
  "code": "103",
  "message": "It's too cold",
  "timestamp": "2024-11-23 20:08:19.773"
}[0m
[32m2024-11-23 20:08:25 - KafkaLogger - INFO - Received alert:[0m
[32m2024-11-23 20:08:25 - KafkaLogger - INFO - {
  "window": {
    "start": "2024-11-23 20:06:00",
    "end": "2024-11-23 20:07:00"
  },
  "t_avg": -3.81,
  "h_avg": 5

24/11/23 20:09:46 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 282, writer: org.apache.spark.sql.kafka010.KafkaStreamingWrite@3b1c4cbb] is aborting.
24/11/23 20:09:46 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 282, writer: org.apache.spark.sql.kafka010.KafkaStreamingWrite@3b1c4cbb] aborted.
24/11/23 20:09:47 ERROR MicroBatchExecution: Query [id = 6cf6b445-953b-4ad7-8cad-134dd5cdd50a, runId = 627292d6-965e-41b1-b7fd-72e1b83ad395] terminated with error
java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job 92 cancelled because SparkContext was shut down
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:212)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegen

Processing batch 0
No alerts in this batch.
Processing batch 1
No alerts in this batch.
Processing batch 2
No alerts in this batch.
Processing batch 3
No alerts in this batch.
Processing batch 4
No alerts in this batch.
Processing batch 5
No alerts in this batch.
Processing batch 6
+------------------------------------------+-----+-----+----+-------------+-----------------------+
|window                                    |t_avg|h_avg|code|message      |timestamp              |
+------------------------------------------+-----+-----+----+-------------+-----------------------+
|{2024-11-23 20:07:30, 2024-11-23 20:08:30}|12.68|49.24|103 |It's too cold|2024-11-23 20:08:57.321|
+------------------------------------------+-----+-----+----+-------------+-----------------------+

Processing batch 7
No alerts in this batch.
Processing batch 8
No alerts in this batch.
