# Kafka Consumer

| Nama                    | NRP        |
| ----------------------- | ---------- |
| Dwiyasa Nakula   | 5027221001 |
| Muhammad Afif | 5027221032 |

In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, concat, lit
from pyspark.sql.types import StructType, StringType, FloatType

In [2]:
# Step 1: Build (or reuse) a SparkSession that runs on the local machine.
# NOTE(review): the Kafka connector coordinate is pinned to Scala 2.12 / Spark 3.4.0 —
# confirm it matches the installed PySpark version.
spark = (
    SparkSession.builder
    .appName("SensorDataProcessor")
    # Maven coordinates of the Kafka source package for Spark SQL.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
    # Run in local mode rather than against a cluster.
    .config("spark.master", "local[*]")
    # Pin the driver's advertised host and bind address to localhost.
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "localhost")
    .getOrCreate()
)


In [3]:
# Set the Spark log level to ERROR so verbose log output does not flood the notebook.
spark.sparkContext.setLogLevel("ERROR")

In [4]:
# Step 2: Define a streaming DataFrame that subscribes to the Kafka topic "iot-suhu".
# It is only consumed by the (commented-out) streaming variant further down.
sensor_data = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "157.245.61.228:9092")
    .option("subscribe", "iot-suhu")
    .load()
)

In [5]:
# Step 3: Shape of each JSON message — a string sensor id plus a float "suhu" reading.
schema = (
    StructType()
    .add("sensor_id", StringType())
    .add("suhu", FloatType())
)

In [6]:
# One-off batch read (not a stream) of the same Kafka topic, decoded into typed columns.
batch_data = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "157.245.61.228:9092")
    .option("subscribe", "iot-suhu")
    .load()
    # Turn the Kafka `value` column into a string column named "json".
    .select(col("value").cast("string").alias("json"))
    # Parse the JSON text against `schema`; the result is a struct column "data".
    .select(from_json(col("json"), schema).alias("data"))
    # Flatten the struct into top-level columns.
    .select(col("data.sensor_id"), col("data.suhu"))
)

# Preview the first 10 rows.
batch_data.show(10)

+---------+-----+
|sensor_id| suhu|
+---------+-----+
|       S3|53.34|
|       S3|97.43|
|       S2|64.49|
|       S3|74.17|
|       S1|89.61|
|       S3|94.21|
|       S2|90.11|
|       S2|62.46|
|       S1|64.48|
|       S2|54.66|
+---------+-----+
only showing top 10 rows



## Tampilkan Data perolehan yang difilter

In [7]:
# Step 4: Keep only readings above 80 and render them as text with a "°C" suffix.
alert_df = (
    batch_data
    .filter(col("suhu") > 80)
    # Replace the numeric column with its string form plus the unit.
    .withColumn("suhu", concat(col("suhu").cast("string"), lit("°C")))
)

# Preview the first 10 alert rows.
alert_df.show(10)

+---------+-------+
|sensor_id|   suhu|
+---------+-------+
|       S3|97.43°C|
|       S1|89.61°C|
|       S3|94.21°C|
|       S2|90.11°C|
|       S1|93.02°C|
|       S1|80.06°C|
|       S1|95.39°C|
|       S1|90.36°C|
|       S3|86.37°C|
|       S2|99.39°C|
+---------+-------+
only showing top 10 rows



## Cara Alternatif

Gunakan kode berikut pada Step 4 apabila menggunakan PySpark secara lokal. Google Colab dan Kaggle tidak mendukung tampilan data streaming.

In [8]:
# # NOTE: this cell is intentionally commented out. Uncomment it to process the
# # streaming source `sensor_data` (Step 2) instead of the batch read.
# # Be aware that it rebinds `alert_df`, replacing the batch result of the earlier Step 4 cell.

# # Step 4: Convert Kafka value to structured data
# sensor_df = sensor_data \
#     .selectExpr("CAST(value AS STRING) as json") \
#     .select(from_json(col("json"), schema).alias("data")) \
#     .select("data.sensor_id", "data.suhu")

# # Filter for temperatures above 80°C and add "°C" suffix
# alert_df = sensor_df.filter(sensor_df.suhu > 80)
# alert_df = alert_df.withColumn("suhu", concat(col("suhu").cast("string"), lit("°C")))

In [9]:
# import time

In [11]:
# # Step 5: Display alerts in console every second
# query = alert_df \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .option("truncate", "false") \
#     .trigger(processingTime="1 second") \
#     .start()

# # Option A: use awaitTermination to keep the query running; stop it by interrupting the cell.
# # awaitTermination() blocks, so do not combine it with the sleep/stop lines of Option B.

# query.awaitTermination()

# # Option B: use time.sleep to run the query for a fixed duration only
# # (requires the `import time` cell above to be uncommented as well).

# time.sleep(30)

# # Stop the query after the sleep period
# query.stop()