# Week 12 Lab: Stream Processing & Real-time Analytics

**Course:** Big Data Analytics  
**Topic:** Stream Processing, Apache Kafka Simulation, Spark Structured Streaming  

---
### Description
In this lab we simulate stream processing in Python, covering:
- An IoT data stream generator
- Windowed aggregations
- Real-time anomaly detection
- A Kafka producer-consumer simulation built on `queue.Queue`
- Spark Structured Streaming concepts

In [None]:
# Install dependencies
!pip install pyspark kafka-python --quiet

# Core imports
import time
import random
import queue
import threading
import json
from datetime import datetime, timedelta
from collections import deque

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation

print('Libraries loaded successfully!')

## 1. Simulating a Data Stream with a Python Generator

We build a generator that simulates IoT sensors: on each round it emits one record per sensor, then pauses for `delay` seconds (0.1 by default).
Each record contains: timestamp, sensor_id, temperature, humidity.
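
Because such a generator never terminates on its own, the consumer decides how many records to pull. The following is a minimal sketch of that pattern, assuming a hypothetical `tiny_sensor_stream` helper (not part of the lab code) and using `itertools.islice` to take a bounded slice:

```python
import itertools
import random


def tiny_sensor_stream(sensor_ids):
    """Hypothetical infinite generator: one reading per sensor per round."""
    while True:
        for sensor_id in sensor_ids:
            yield {
                'sensor_id': sensor_id,
                'temperature': round(random.gauss(25, 1.5), 2),
            }


# islice pulls exactly five records, then stops asking the generator for more
first_five = list(itertools.islice(tiny_sensor_stream(['A', 'B']), 5))
sensor_order = [r['sensor_id'] for r in first_five]
print(sensor_order)  # ['A', 'B', 'A', 'B', 'A']
```

Nothing is computed until a record is requested, which is what lets a generator stand in for an unbounded stream.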

In [None]:
def iot_sensor_stream(num_sensors=5, delay=0.1):
    """Generator that simulates IoT sensor data stream."""
    sensor_ids = [f'SENSOR_{i:03d}' for i in range(1, num_sensors + 1)]
    base_temps = {s: random.uniform(20, 35) for s in sensor_ids}
    base_humidity = {s: random.uniform(40, 70) for s in sensor_ids}

    while True:
        for sensor_id in sensor_ids:
            # Simulate slight drift and noise
            temperature = base_temps[sensor_id] + random.gauss(0, 1.5)
            humidity = base_humidity[sensor_id] + random.gauss(0, 3)
            # Occasionally inject anomaly
            if random.random() < 0.03:
                temperature += random.choice([-15, 20])  # spike/drop

            record = {
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
                'sensor_id': sensor_id,
                'temperature': round(temperature, 2),
                # Float bounds keep the clamped value a float (Spark's DoubleType rejects ints)
                'humidity': round(max(0.0, min(100.0, humidity)), 2)
            }
            yield record
        time.sleep(delay)

# Collect 100 records from the stream
print('Collecting 100 records from IoT sensor stream...')
stream = iot_sensor_stream(num_sensors=5, delay=0.01)  # fast for demo
records = []
for i, record in enumerate(stream):
    records.append(record)
    if i >= 99:
        break

df_stream = pd.DataFrame(records)
df_stream['timestamp'] = pd.to_datetime(df_stream['timestamp'])
print(f'Collected {len(df_stream)} records')
print(df_stream.head(10))

## 2. Windowed Aggregations on Stream Data

Windowing splits a stream into time segments so that aggregates (mean, max, min) can be computed per segment.
Here we use a tumbling window (fixed, non-overlapping 10-second bins) and a rolling window (the last 10 records per sensor) to compute moving statistics.
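
To make the difference between the two window types concrete, here is a small sketch on six made-up events spaced one second apart. The data and the 2-second window size are assumptions chosen so the results can be checked by hand:

```python
from datetime import datetime, timedelta

import pandas as pd

# Six illustrative events, one per second, with values 1..6
start = datetime(2024, 1, 15, 9, 0, 0)
events = pd.DataFrame({
    'event_time': [start + timedelta(seconds=i) for i in range(6)],
    'value': [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
})

# Tumbling: each event belongs to exactly one fixed 2-second bin
tumbling_mean = events.groupby(events['event_time'].dt.floor('2s'))['value'].mean()

# Rolling: each event gets its own window covering the 2 seconds
# up to and including that event
rolling_mean = events.set_index('event_time')['value'].rolling('2s').mean()

print(tumbling_mean.tolist())  # [1.5, 3.5, 5.5]
print(rolling_mean.tolist())   # [1.0, 1.5, 2.5, 3.5, 4.5, 5.5]
```

The tumbling aggregation produces one row per bin, while the rolling aggregation produces one row per event.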

In [None]:
# Assign synthetic timestamps spaced 0.1s apart for windowing demo
base_time = datetime(2024, 1, 15, 9, 0, 0)
df_stream['event_time'] = [base_time + timedelta(seconds=i * 0.1) for i in range(len(df_stream))]

# --- Tumbling window (10-second bins) ---
# Note: 100 records spaced 0.1 s apart span just under 10 s, so they all fall into one bin.
df_stream['window_10s'] = df_stream['event_time'].dt.floor('10s')
tumbling_agg = df_stream.groupby(['window_10s', 'sensor_id']).agg(
    avg_temp=('temperature', 'mean'),
    max_temp=('temperature', 'max'),
    min_temp=('temperature', 'min'),
    count=('temperature', 'count')
).round(2).reset_index()

print('=== Tumbling Window Aggregations (10-second bins) ===')
print(tumbling_agg.head(15))

# --- Rolling window (last 10 records per sensor) ---
df_stream_sorted = df_stream.sort_values(['sensor_id', 'event_time'])
df_stream_sorted['rolling_avg_temp'] = (
    df_stream_sorted.groupby('sensor_id')['temperature']
    .transform(lambda x: x.rolling(window=10, min_periods=1).mean())
).round(2)

print('\n=== Rolling Average Temperature (window=10) ===')
print(df_stream_sorted[['event_time', 'sensor_id', 'temperature', 'rolling_avg_temp']].head(20))

# Visualize rolling average for one sensor
sensor_data = df_stream_sorted[df_stream_sorted['sensor_id'] == 'SENSOR_001']
plt.figure(figsize=(12, 4))
plt.plot(sensor_data['event_time'], sensor_data['temperature'], alpha=0.5, label='Raw Temperature')
plt.plot(sensor_data['event_time'], sensor_data['rolling_avg_temp'], linewidth=2, label='Rolling Avg (10 records)')
plt.xlabel('Event Time')
plt.ylabel('Temperature (°C)')
plt.title('SENSOR_001: Raw vs Rolling Average Temperature')
plt.legend()
plt.tight_layout()
plt.show()

## 3. Real-time Anomaly Detection

Simple threshold-based anomaly detection: a record is flagged as an anomaly when its temperature deviates from the rolling mean by more than `threshold_sigma` rolling standard deviations (2.5 by default in the code below).
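
The same idea can be applied one record at a time, which is closer to how a live stream would be handled. The sketch below is an illustrative variant, not the lab's implementation: the function name `flag_anomalies` and the sample readings are assumptions, and each value is compared against the *previous* values only, whereas the pandas version in this section includes the current value in its window.

```python
import statistics
from collections import deque


def flag_anomalies(values, window=5, min_periods=3, threshold_sigma=2.5):
    """Return one boolean per value: True if it is far from the recent history."""
    history = deque(maxlen=window)  # keeps only the last `window` values
    flags = []
    for value in values:
        is_anomaly = False
        if len(history) >= min_periods:
            mean = statistics.fmean(history)
            std = statistics.stdev(history)  # sample standard deviation
            if std > 0:
                is_anomaly = abs(value - mean) / std > threshold_sigma
        flags.append(is_anomaly)
        history.append(value)  # the value joins the history after being scored
    return flags


readings = [20.0, 20.5, 19.5, 20.2, 19.8, 40.0, 20.1]
flags = flag_anomalies(readings)
print(flags)  # [False, False, False, False, False, True, False]
```

Only the spike to 40.0 is flagged. The reading that follows it is not, because the spike has already entered the history and inflated the standard deviation.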

In [None]:
def detect_anomalies_streaming(df, sensor_col='sensor_id', value_col='temperature',
                                window=20, threshold_sigma=2.5):
    """Detect anomalies using rolling Z-score (threshold-based)."""
    df = df.sort_values([sensor_col, 'event_time']).copy()

    grp = df.groupby(sensor_col)[value_col]
    df['rolling_mean'] = grp.transform(lambda x: x.rolling(window, min_periods=3).mean())
    df['rolling_std']  = grp.transform(lambda x: x.rolling(window, min_periods=3).std())
    df['z_score']      = ((df[value_col] - df['rolling_mean']) / df['rolling_std'].replace(0, 1)).abs()
    df['is_anomaly']   = df['z_score'] > threshold_sigma
    return df

df_anomaly = detect_anomalies_streaming(df_stream_sorted)

anomalies = df_anomaly[df_anomaly['is_anomaly']]
print(f'Total records: {len(df_anomaly)}')
print(f'Anomalies detected: {len(anomalies)} ({len(anomalies)/len(df_anomaly)*100:.1f}%)')
print('\nAnomaly records:')
print(anomalies[['event_time', 'sensor_id', 'temperature', 'rolling_mean', 'z_score']].to_string())

# Visualize anomalies
fig, ax = plt.subplots(figsize=(14, 5))
for sensor_id, grp in df_anomaly.groupby('sensor_id'):
    # Label each line so the legend is populated even when no anomalies are found
    ax.plot(grp['event_time'], grp['temperature'], alpha=0.4, linewidth=0.8, label=sensor_id)

if len(anomalies) > 0:
    ax.scatter(anomalies['event_time'], anomalies['temperature'],
               color='red', zorder=5, s=80, label=f'Anomaly ({len(anomalies)})', marker='X')

ax.set_title('Real-time Anomaly Detection — All Sensors')
ax.set_xlabel('Event Time')
ax.set_ylabel('Temperature (°C)')
ax.legend()
plt.tight_layout()
plt.show()

## 4. Kafka Producer-Consumer Simulation with a Queue

We simulate the Kafka producer-consumer pattern with Python's `queue.Queue`.
The producer publishes messages to a topic; consumers read and process them.
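
A keyed message is routed to a partition by hashing its key, so all messages with the same key land in the same partition and keep their relative order. The sketch below illustrates this with `zlib.crc32`. That choice is an assumption made for reproducibility: Kafka's Java client hashes key bytes with murmur2, and CRC32 is swapped in here only because it ships with Python. The helper name `partition_for` is hypothetical.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Map a key to a partition with CRC32, which is stable across runs."""
    return zlib.crc32(key.encode('utf-8')) % num_partitions


keys = ['SENSOR_001', 'SENSOR_002', 'SENSOR_001', 'SENSOR_003', 'SENSOR_002']
partitions = defaultdict(list)
for seq, key in enumerate(keys):
    # seq records the publish order so per-key ordering can be inspected
    partitions[partition_for(key, 3)].append((seq, key))

for idx in sorted(partitions):
    print(idx, partitions[idx])
```

Ordering is guaranteed only within a partition, so choosing the sensor ID as the key preserves the order of each sensor's readings but not the global order across sensors.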

In [None]:
import queue
import threading

# Kafka-like Topic implementation
class KafkaTopic:
    def __init__(self, name, num_partitions=3):
        self.name = name
        self.partitions = [queue.Queue() for _ in range(num_partitions)]
        self.num_partitions = num_partitions

    def publish(self, message, key=None):
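        # By default, hash() of a str is salted per process, so the key-to-partition
        # mapping is stable within one run but can differ between runs.
        partition_idx = hash(key or '') % self.num_partitions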
        self.partitions[partition_idx].put({'key': key, 'value': message,
                                             'timestamp': datetime.now().isoformat()})

    def consume(self, partition_idx=0, timeout=0.05):
        try:
            return self.partitions[partition_idx].get(timeout=timeout)
        except queue.Empty:
            return None

# Producer function
produced_messages = []
consumed_messages = []

def producer(topic, n_messages=30):
    sensor_gen = iot_sensor_stream(num_sensors=3, delay=0)
    for i in range(n_messages):
        record = next(sensor_gen)
        topic.publish(record, key=record['sensor_id'])
        produced_messages.append(record)
    print(f'[Producer] Published {n_messages} messages to topic "{topic.name}"')

# Consumer function
def consumer(topic, consumer_id, partitions):
    count = 0
    for _ in range(100):  # poll iterations
        for p in partitions:
            msg = topic.consume(p)
            if msg:
                consumed_messages.append({'consumer': consumer_id, 'message': msg})
                count += 1
    print(f'[Consumer {consumer_id}] Consumed {count} messages from partitions {partitions}')

# Create topic and simulate pub/sub
sensor_topic = KafkaTopic('sensor-data', num_partitions=3)

t_producer = threading.Thread(target=producer, args=(sensor_topic, 30))
t_consumer1 = threading.Thread(target=consumer, args=(sensor_topic, 'C1', [0, 1]))
t_consumer2 = threading.Thread(target=consumer, args=(sensor_topic, 'C2', [2]))

t_producer.start(); t_consumer1.start(); t_consumer2.start()
t_producer.join(); t_consumer1.join(); t_consumer2.join()

print(f'\nTotal produced: {len(produced_messages)}')
print(f'Total consumed: {len(consumed_messages)}')
c1_count = sum(1 for m in consumed_messages if m['consumer'] == 'C1')
c2_count = sum(1 for m in consumed_messages if m['consumer'] == 'C2')
print(f'Consumer C1 consumed: {c1_count} | Consumer C2 consumed: {c2_count}')
print('\nSample consumed messages:')
for m in consumed_messages[:3]:
    print(f"  {m['consumer']}: {m['message']}")

## 5. Spark Structured Streaming (Micro-batch Simulation)

Spark Structured Streaming treats streaming data as a table that keeps growing.
Here we simulate micro-batch processing with PySpark by running a batch aggregation on successive slices of the collected records.
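
The "growing table" idea can be sketched without Spark: keep running state per key, fold each micro-batch into it, and emit only the keys whose result changed. This is a plain-Python illustration under assumed names (`process_micro_batch`, `state`) and made-up data. It is not Spark's API, though the emitted rows resemble what Spark's "update" output mode would report.

```python
def process_micro_batch(state, batch):
    """Fold one batch into running (count, total) state; return changed averages."""
    changed = set()
    for sensor_id, temperature in batch:
        count, total = state.get(sensor_id, (0, 0.0))
        state[sensor_id] = (count + 1, total + temperature)
        changed.add(sensor_id)
    # Report the running average only for sensors seen in this batch
    return {s: state[s][1] / state[s][0] for s in sorted(changed)}


state = {}
batches = [
    [('S1', 20.0), ('S2', 30.0)],
    [('S1', 22.0)],
    [('S2', 34.0), ('S2', 32.0)],
]
outputs = [process_micro_batch(state, batch) for batch in batches]
for batch_id, out in enumerate(outputs):
    print(batch_id, out)
# 0 {'S1': 20.0, 'S2': 30.0}
# 1 {'S1': 21.0}
# 2 {'S2': 32.0}
```

Unlike this sketch, the PySpark loop in this section aggregates each batch independently and carries no state from one batch to the next.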

In [None]:
!pip install pyspark --quiet

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Initialize Spark
spark = SparkSession.builder \
    .appName('StreamProcessingDemo') \
    .master('local[2]') \
    .config('spark.sql.shuffle.partitions', '4') \
    .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
print(f'Spark version: {spark.version}')

# Simulate micro-batch: process stream records in batches
BATCH_SIZE = 20
print('\n=== Micro-batch Streaming Simulation ===')
for batch_id in range(3):
    # Each micro-batch = subset of stream records
    batch_records = records[batch_id * BATCH_SIZE:(batch_id + 1) * BATCH_SIZE]
    schema = StructType([
        StructField('timestamp', StringType(), True),
        StructField('sensor_id', StringType(), True),
        StructField('temperature', DoubleType(), True),
        StructField('humidity', DoubleType(), True)
    ])
    batch_df = spark.createDataFrame(batch_records, schema=schema)
    result = batch_df.groupBy('sensor_id').agg(
        F.round(F.avg('temperature'), 2).alias('avg_temp'),
        F.round(F.max('temperature'), 2).alias('max_temp'),
        F.count('*').alias('count')
    ).orderBy('sensor_id')
    print(f'\n--- Batch {batch_id} ({len(batch_records)} records) ---')
    result.show()

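# Demonstrate the readStream API concept (Kafka source); printed only, not executed
print('=== Structured Streaming API Concept (Kafka Source) ===')
print("""
# In a real Spark Structured Streaming job
# (the Kafka source needs the spark-sql-kafka connector on the classpath):

raw = spark.readStream \\
    .format("kafka") \\
    .option("kafka.bootstrap.servers", "localhost:9092") \\
    .option("subscribe", "sensor-data") \\
    .load()

# Parse the JSON payload carried in Kafka's binary `value` column
# (`schema` is the StructType defined for the micro-batches above)
parsed = raw \\
    .select(F.from_json(F.col("value").cast("string"), schema).alias("r")) \\
    .select("r.*") \\
    .withColumn("event_time", F.to_timestamp("timestamp"))

# Aggregate per 5-minute event-time window, tolerating 10 minutes of lateness
result = parsed \\
    .withWatermark("event_time", "10 minutes") \\
    .groupBy(F.window("event_time", "5 minutes"), "sensor_id") \\
    .agg(F.avg("temperature").alias("avg_temp"))

# Write to console
query = result.writeStream \\
    .outputMode("update") \\
    .format("console") \\
    .trigger(processingTime="30 seconds") \\
    .start()
query.awaitTermination()
""")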
spark.stop()

## 6. Real-time Dashboard Analytics (Simulation)

We simulate a real-time dashboard by plotting a snapshot of metrics computed from the streaming data.
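
A live dashboard would typically recompute its metrics from a bounded buffer of the most recent records rather than from the full history. The sketch below shows one way to do that, assuming a hypothetical `DashboardBuffer` class and made-up temperatures:

```python
from collections import deque


class DashboardBuffer:
    """Hypothetical helper: keeps the latest `maxlen` records for snapshots."""

    def __init__(self, maxlen):
        self.records = deque(maxlen=maxlen)  # oldest records drop off automatically

    def add(self, record):
        self.records.append(record)

    def snapshot(self):
        temps = [r['temperature'] for r in self.records]
        if not temps:
            return {'count': 0, 'avg_temp': None, 'max_temp': None}
        return {
            'count': len(temps),
            'avg_temp': sum(temps) / len(temps),
            'max_temp': max(temps),
        }


buffer = DashboardBuffer(maxlen=3)
for temp in [20.0, 22.0, 24.0, 26.0]:
    buffer.add({'temperature': temp})
latest = buffer.snapshot()
print(latest)  # {'count': 3, 'avg_temp': 24.0, 'max_temp': 26.0}
```

The first reading (20.0) has already been evicted by the time the snapshot is taken, so memory use stays constant however long the stream runs.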

In [None]:
# Simulate streaming dashboard snapshots
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
fig.suptitle('Real-time Sensor Dashboard (Snapshot)', fontsize=14, fontweight='bold')

colors = plt.cm.tab10(np.linspace(0, 1, 5))

# 1. Temperature time series per sensor
ax = axes[0, 0]
for i, (sensor_id, grp) in enumerate(df_anomaly.groupby('sensor_id')):
    ax.plot(range(len(grp)), grp['temperature'], label=sensor_id, color=colors[i], alpha=0.8)
ax.set_title('Temperature Over Time'); ax.set_xlabel('Record #'); ax.set_ylabel('Temp (°C)')
ax.legend(fontsize=7)

# 2. Average temp per sensor (bar chart)
ax = axes[0, 1]
avg_temps = df_anomaly.groupby('sensor_id')['temperature'].mean()
ax.bar(avg_temps.index, avg_temps.values, color=colors)
ax.set_title('Average Temperature per Sensor'); ax.set_ylabel('Avg Temp (°C)')
ax.tick_params(axis='x', rotation=30)

# 3. Humidity distribution
ax = axes[0, 2]
df_anomaly['humidity'].hist(bins=20, ax=ax, color='steelblue', edgecolor='white')
ax.set_title('Humidity Distribution'); ax.set_xlabel('Humidity (%)')

# 4. Anomaly count per sensor
ax = axes[1, 0]
anomaly_counts = df_anomaly.groupby('sensor_id')['is_anomaly'].sum()
ax.bar(anomaly_counts.index, anomaly_counts.values, color='tomato')
ax.set_title('Anomaly Count per Sensor'); ax.set_ylabel('Count')
ax.tick_params(axis='x', rotation=30)

# 5. Scatter: temperature vs humidity
ax = axes[1, 1]
normal = df_anomaly[~df_anomaly['is_anomaly']]
anom   = df_anomaly[df_anomaly['is_anomaly']]
ax.scatter(normal['temperature'], normal['humidity'], s=15, alpha=0.5, label='Normal')
if len(anom) > 0:
    ax.scatter(anom['temperature'], anom['humidity'], s=60, color='red', marker='X', label='Anomaly')
ax.set_title('Temperature vs Humidity'); ax.set_xlabel('Temp'); ax.set_ylabel('Humidity')
ax.legend(fontsize=8)

# 6. Records per window
ax = axes[1, 2]
window_counts = df_anomaly.groupby('window_10s').size()
ax.bar(range(len(window_counts)), window_counts.values, color='mediumseagreen')
ax.set_title('Records per 10s Window'); ax.set_xlabel('Window #'); ax.set_ylabel('Record Count')

plt.tight_layout()
plt.savefig('streaming_dashboard.png', dpi=100, bbox_inches='tight')
plt.show()
print('Dashboard saved as streaming_dashboard.png')

## Lab Assignments

Complete the following tasks and submit this notebook:

1. **Task 1: Multi-sensor Windowing.** Modify the windowed aggregation code to use a
   *sliding window* (size 15 records, step 5 records) and compute the 95th percentile of
   temperature per sensor per window.

2. **Task 2: Adaptive Anomaly Detection.** Implement anomaly detection that uses the
   **IQR (Interquartile Range)** instead of the Z-score. Compare the number of anomalies
   detected by the two methods.

3. **Task 3: Multi-topic Kafka Simulation.** Extend the Kafka simulation to cover
   two separate topics (`sensor-temperature` and `sensor-humidity`). Add one Consumer Group
   that consumes from both topics.

4. **Task 4: Real-time Alert System.** Write a function `alert_system(record)` that checks
   each IoT record and raises an alert (printed to the console) if:
   - Temperature > 40°C or < 0°C
   - Humidity < 20% or > 90%
   - The temperature changes by more than 10°C between consecutive records
   
   Integrate it with the stream generator and display an alert summary after 200 records.
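
As a possible starting point for Tasks 1 and 2 (a sketch, not a reference solution), the two building blocks are count-based sliding windows and IQR fences. The helper names `sliding_windows` and `iqr_bounds`, the choice to drop incomplete trailing windows, and the conventional multiplier `k=1.5` are all assumptions:

```python
import numpy as np


def sliding_windows(values, size, step):
    """Yield full windows of `size` items, advancing `step` items each time."""
    for start in range(0, len(values) - size + 1, step):
        yield values[start:start + size]


def iqr_bounds(values, k=1.5):
    """Return (lower, upper) fences: Q1 - k*IQR and Q3 + k*IQR."""
    q1, q3 = np.percentile(values, [25, 75])
    iqr = q3 - q1
    return float(q1 - k * iqr), float(q3 + k * iqr)


# Twelve items, windows of 5 advancing by 3: overlapping windows, no partial tail
windows = list(sliding_windows(list(range(12)), size=5, step=3))
print([(w[0], w[-1]) for w in windows])  # [(0, 4), (3, 7), (6, 10)]

# Values outside the fences would be treated as anomalies
lower, upper = iqr_bounds([1, 2, 3, 4, 5, 6, 7, 8, 100])
print(lower, upper)  # -3.0 13.0
```

Because the step is smaller than the window size, consecutive windows overlap, which is what distinguishes a sliding window from a tumbling one.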