In [None]:
kafka-topics.sh --create --topic sensor-suhu --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

In [None]:
pip install kafka-python
pip install pyspark

# Simulasi Data Sensor Suhu ke Kafka

In [None]:
from kafka import KafkaProducer
import json
import time
import random

# Simulation settings.
# MAX_ROUNDS bounds the loop so this cell finishes and "Run All" can reach the
# Spark cell; set it to None to keep producing until the kernel is interrupted.
KAFKA_BOOTSTRAP = 'localhost:9092'
TOPIC = 'sensor-suhu'
SUHU_MIN, SUHU_MAX = 60, 100  # inclusive bounds passed to randint
INTERVAL_S = 1                # pause between rounds (one reading per sensor per round)
MAX_ROUNDS = 30
SEED = 42

rng = random.Random(SEED)  # seeded so the simulated readings are reproducible

# Each message value is a dict, JSON-encoded to UTF-8 bytes by the serializer.
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

sensor_ids = ['S1', 'S2', 'S3']

try:
    rounds_done = 0
    while MAX_ROUNDS is None or rounds_done < MAX_ROUNDS:
        for sensor_id in sensor_ids:
            data = {
                "sensor_id": sensor_id,
                "suhu": rng.randint(SUHU_MIN, SUHU_MAX),
            }
            producer.send(TOPIC, value=data)
            print(f"Data terkirim: {data}")
        rounds_done += 1
        time.sleep(INTERVAL_S)
except KeyboardInterrupt:
    print("Producer berhenti.")
finally:
    # send() is asynchronous: push any still-buffered records to the broker
    # before closing the connection.
    producer.flush()
    producer.close()


# Olah Data dengan PySpark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Inisialisasi Spark session
import pyspark

# The Kafka source is not bundled with the pip `pyspark` wheel, so fetch the
# spark-sql-kafka connector matching the installed Spark version.
# Assumes the standard pip builds: Spark 4.x uses Scala 2.13, Spark 3.x uses 2.12.
# spark.jars.packages only applies when the session is first created; restart
# the kernel if a SparkSession already exists.
_spark_major = int(pyspark.__version__.split(".")[0])
_scala_version = "2.13" if _spark_major >= 4 else "2.12"
KAFKA_CONNECTOR = (
    f"org.apache.spark:spark-sql-kafka-0-10_{_scala_version}:{pyspark.__version__}"
)

SUHU_AMBANG = 80  # °C; readings strictly above this are printed as alerts

# Initialise the Spark session.
spark = (
    SparkSession.builder
    .appName("SensorSuhuConsumer")
    .master("local[*]")
    .config("spark.jars.packages", KAFKA_CONNECTOR)
    .getOrCreate()
)

# Schema of the JSON payload produced by the producer cell.
schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("suhu", IntegerType(), True)
])

# Read from Kafka. Streaming queries default to startingOffsets="latest", which
# would skip readings produced before this query started (e.g. while the
# producer cell was running), so start from the earliest retained offset.
raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sensor-suhu")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers the payload as the binary `value` column: cast it to a string,
# parse it as JSON and flatten the struct into columns.
readings_df = (
    raw_df
    .selectExpr("CAST(value AS STRING) as json")
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
)

# Keep only readings above the threshold.
filtered_df = readings_df.filter(col("suhu") > SUHU_AMBANG)

# Print the over-threshold readings to the console as warnings.
query = (
    filtered_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

try:
    query.awaitTermination()
except KeyboardInterrupt:
    # Without this, the streaming query keeps running in the background after
    # an interrupt, and re-running the cell would start a second one.
    query.stop()
