In [1]:
!pip install pandas scikit-learn joblib pyspark==3.3.0



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr, when, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from typing import Iterator
import joblib
import pandas as pd
import logging

In [3]:
# Konfigurasi logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)

In [4]:
# Membuat spark session
spark = SparkSession.builder \
    .appName("RealtimeAnomalyDetection") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

log.info("SparkSession berhasil dibuat.")

:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fab1f4d6-030e-43ae-9232-631eab6efc5c;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: reso

25/07/14 14:14:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2025-07-14 14:14:33,850 - INFO - SparkSession berhasil dibuat.


In [5]:
# SEL 4: Definisikan Fungsi Inferensi untuk .mapInPandas
# Fungsi ini akan diterapkan pada setiap partisi data.

# Definisikan urutan fitur yang sama persis seperti saat pelatihan
feature_columns = ['vibration', 'acoustic', 'temperature', 'current', 'imf_1', 'imf_2', 'imf_3']
model_path = '/home/jovyan/work/anomaly_detection_model.pkl'

def predict_anomalies(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """
    Fungsi ini memuat model sekali per partisi dan melakukan prediksi pada
    seluruh potongan DataFrame, menghindari error serialisasi.
    """
    # Muat model HANYA SEKALI per partisi/pekerja
    model = joblib.load(model_path)
    
    # Loop melalui setiap potongan DataFrame dalam partisi
    for pdf in iterator:
        # Pilih kolom fitur dari DataFrame input
        X = pdf[feature_columns]
        # Buat kolom prediksi baru di dalam DataFrame Pandas
        pdf['anomaly_prediction'] = model.predict(X)
        # Kembalikan DataFrame yang sudah diperkaya dengan prediksi
        yield pdf

log.info("Fungsi inferensi untuk .mapInPandas telah dibuat.")

2025-07-14 14:14:36,355 - INFO - Fungsi inferensi untuk .mapInPandas telah dibuat.


In [6]:
# Mendefinisikan Skema
raw_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("machine_id", StringType(), True),
    StructField("vibration", DoubleType(), True),
    StructField("acoustic", DoubleType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("current", DoubleType(), True),
    StructField("IMF_1", DoubleType(), True),
    StructField("IMF_2", DoubleType(), True),
    StructField("IMF_3", DoubleType(), True),
    StructField("label", IntegerType(), True)
])

In [7]:
# Membaca dari topik Kafka
raw_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "raw_sensor_data") \
    .option("startingOffsets", "earliest") \
    .load()

log.info("Berhasil terhubung ke aliran data Kafka.")

2025-07-14 14:14:51,630 - INFO - Berhasil terhubung ke aliran data Kafka.


In [8]:
raw_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
# SEL 6: Pipeline Transformasi dan Inferensi Real-time
try:
    # 1. Parsing data JSON dari Kafka
    parsed_df = raw_stream_df \
        .select(from_json(col("value").cast("string"), raw_schema).alias("data")) \
        .select("data.*")

    # 2. Transformasi dasar
    transformed_df = parsed_df \
        .withColumn("event_timestamp", to_timestamp(col("timestamp"))) \
        .withColumn("machine_id", col("machine_id").cast(IntegerType()))

    # --- PERBAIKAN: Terapkan fungsi inferensi menggunakan .mapInPandas ---
    # Definisikan skema output setelah fungsi diterapkan
    output_schema = transformed_df.schema.add("anomaly_prediction", "integer")
    
    # Terapkan fungsi ke setiap partisi
    result_with_predictions_df = transformed_df.mapInPandas(predict_anomalies, schema=output_schema)

    # 3. Membuat kolom status akhir berdasarkan hasil deteksi anomali
    final_df = result_with_predictions_df \
        .withColumn("status",
            when(col("anomaly_prediction") == -1, "Anomaly Detected")
            .otherwise("Normal")
        )

    log.info("Pipeline transformasi dan inferensi telah dibangun.")
    final_df.printSchema()
except Exception as e:
    log.error("Gagal membangun pipeline Spark.", exc_info=True)
    final_df = None

2025-07-14 14:14:58,177 - ERROR - Gagal membangun pipeline Spark.
Traceback (most recent call last):
  File "/tmp/ipykernel_8300/906051681.py", line 18, in <module>
    result_with_predictions_df = transformed_df.mapInPandas(predict_anomalies, schema=output_schema)
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/sql/pandas/map_ops.py", line 91, in mapInPandas
    udf_column = udf(*[self[col] for col in self.columns])
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/sql/pandas/map_ops.py", line 91, in <listcomp>
    udf_column = udf(*[self[col] for col in self.columns])
                       ~~~~^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1965, in __getitem__
    jc = self._jdf.apply(item)
         ^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/py4j/j

In [None]:
# SEL 7: Menulis Hasil ke Konsol (Untuk Debugging)
if final_df:
    console_query = final_df \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", "false") \
        .start()
    
    log.info("Query untuk menampilkan hasil ke konsol telah dimulai.")

In [None]:
# SEL 8: Menulis Hasil ke Topik Kafka Baru (Untuk Downstream)
if final_df:
    kafka_output_df = final_df.select(expr("to_json(struct(*)) AS value"))
    
    kafka_query = kafka_output_df \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "clean_data_with_anomaly") \
        .option("checkpointLocation", "/tmp/spark_checkpoints/anomaly_writer") \
        .start()
        
    log.info("Query untuk menulis hasil ke Kafka telah dimulai.")

In [1]:
# SEL 1: Instalasi Library yang Diperlukan
!pip install pandas scikit-learn joblib pyspark==3.3.0



In [2]:
# SEL 2: Impor Library
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr, when, to_timestamp, struct, pandas_udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import joblib
import pandas as pd
import logging
from typing import Iterator

In [3]:
# Konfigurasi logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)

In [4]:
# SEL 3: Membuat SparkSession
spark = SparkSession.builder \
    .appName("RealtimeAnomalyDetection") \
    .master("spark://spark-master:7077") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

log.info("SparkSession berhasil dibuat.")

:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10aa1847-2852-4316-aaca-74b87a5b0542;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: reso

25/07/14 14:26:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2025-07-14 14:27:00,474 - INFO - SparkSession berhasil dibuat.


In [8]:
# SEL 4: Definisikan Fungsi Prediksi Menggunakan PANDAS UDF (Implementasi Andal)

# Definisikan urutan fitur dan path model
feature_columns = ['vibration', 'acoustic', 'temperature', 'current', 'imf_1', 'imf_2', 'imf_3']
model_path = '/home/jovyan/work/anomaly_detection_model.pkl'

# Gunakan decorator @pandas_udf untuk mendefinisikan fungsi
@pandas_udf(IntegerType())
def detect_anomaly_udf(*cols: pd.Series) -> pd.Series:
    """
    Fungsi ini menerima beberapa kolom Pandas, memuat model, 
    dan mengembalikan satu kolom (Series) hasil prediksi.
    Ini adalah cara yang paling andal untuk inferensi model scikit-learn di Spark.
    """
    # --- PERBAIKAN UTAMA: Muat model di dalam UDF ---
    # Ini memastikan setiap worker Spark memiliki salinan modelnya sendiri
    # dan menghindari semua masalah serialisasi (PicklingError).
    model = joblib.load(model_path)
    
    # Menggabungkan semua kolom input menjadi satu DataFrame Pandas
    X = pd.concat(cols, axis=1)
    X.columns = feature_columns # Beri nama kolom yang benar
    
    # Lakukan prediksi
    predictions = model.predict(X)
    
    # Kembalikan hasilnya sebagai sebuah Series Pandas
    return pd.Series(predictions)

log.info("Pandas UDF untuk deteksi anomali berhasil dibuat.")

2025-07-14 14:29:38,214 - INFO - Pandas UDF untuk deteksi anomali berhasil dibuat.


In [9]:
# SEL 5: Definisikan Skema dan Baca Aliran Data dari Kafka
raw_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("machine_id", StringType(), True),
    StructField("vibration", DoubleType(), True),
    StructField("acoustic", DoubleType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("current", DoubleType(), True),
    StructField("imf_1", DoubleType(), True),
    StructField("imf_2", DoubleType(), True),
    StructField("imf_3", DoubleType(), True),
    StructField("label", IntegerType(), True)
])

raw_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "raw_sensor_data_v2") \
    .option("startingOffsets", "earliest") \
    .load()

log.info("Berhasil terhubung ke aliran data Kafka.")

2025-07-14 14:29:53,718 - INFO - Berhasil terhubung ke aliran data Kafka.


In [11]:
# SEL 6: Pipeline Transformasi dan Inferensi Real-time
try:
    # 1. Parsing data JSON dari Kafka
    parsed_df = raw_stream_df \
        .select(from_json(col("value").cast("string"), raw_schema).alias("data")) \
        .select("data.*")

    # 2. Transformasi dasar dan inferensi model
    transformed_df = parsed_df \
        .withColumn("event_timestamp", to_timestamp(col("timestamp"))) \
        .withColumn("machine_id", col("machine_id").cast(IntegerType())) \
        .withColumn("anomaly_prediction", detect_anomaly_udf(*[col(c) for c in feature_columns]))

    # 3. Membuat kolom status akhir berdasarkan hasil deteksi anomali
    final_df = transformed_df \
        .withColumn("status",
            when(col("anomaly_prediction") == -1, "Anomaly Detected")
            .otherwise("Normal")
        )

    log.info("Pipeline transformasi dan inferensi telah dibangun.")
    final_df.printSchema()
except Exception as e:
    log.error("Gagal membangun pipeline Spark.", exc_info=True)
    final_df = None

Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/pyspark/serializers.py", line 458, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/opt/conda/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 692, in reducer_override
    return self._function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 565, in _function_reduce
    return self._dynamic_function_reduce(obj)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/pyspark/clou

In [None]:
# SEL 7: Menulis Hasil ke Konsol (Untuk Debugging)
if final_df:
    console_query = final_df \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", "false") \
        .start()
    
    log.info("Query untuk menampilkan hasil ke konsol telah dimulai.")

In [None]:
# SEL 8: Menulis Hasil ke Topik Kafka Baru (Untuk Downstream)
if final_df:
    kafka_output_df = final_df.select(expr("to_json(struct(*)) AS value"))
    
    kafka_query = kafka_output_df \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "clean_data_with_anomaly") \
        .option("checkpointLocation", "/tmp/spark_checkpoints/anomaly_writer") \
        .start()
        
    log.info("Query untuk menulis hasil ke Kafka telah dimulai.")