**Instructions**

# IoT → Buffer (file-based) → Spark Structured Streaming → Storage → Visualization + ML
Run the cells in order. This notebook simulates a Kafka-style message buffer by writing JSON files to an input folder. Spark Structured Streaming reads those files, aggregates them, and writes the results to Parquet. The ML cell trains an anomaly detector on the stored results. While the streaming query is running, you can stop it from any cell by calling `query.stop()`.


**Install system dependencies and Python packages**

In [2]:
!apt-get update -qq
!apt-get install -y openjdk-11-jdk-headless -qq
!pip uninstall -y dataproc-spark-connect
!pip install -q pyspark==3.5.0 findspark plotly pandas scikit-learn pyarrow


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Found existing installation: dataproc-spark-connect 1.0.1
Uninstalling dataproc-spark-connect-1.0.1:
  Successfully uninstalled dataproc-spark-connect-1.0.1
  Preparing metadata (setup.py) ... done
  Building wheel for pyspark (setup.py) ... done


**Set environment variables and initialize findspark**

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
import findspark
findspark.init()


**Create required directories for simulated stream, output, and checkpoints**

In [4]:
import os
base = "/content/iot_stream_pipeline"
input_dir = os.path.join(base, "input")
output_dir = os.path.join(base, "output_parquet")
checkpoint_dir = os.path.join(base, "checkpoint")
os.makedirs(input_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
os.makedirs(checkpoint_dir, exist_ok=True)
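
When the notebook is re-run, leftovers from an earlier run can be confusing: Structured Streaming keeps its progress in the checkpoint folder, and the Parquet file sink keeps its own metadata log inside the output folder. The following is a minimal sketch, assuming you want every run to start from empty folders. `reset_dirs` is a hypothetical helper that is not part of the original notebook, and it deletes data, so it should only be pointed at scratch folders.

```python
import os
import shutil

def reset_dirs(*dirs):
    """Delete and recreate each directory so a re-run starts from empty folders."""
    for d in dirs:
        shutil.rmtree(d, ignore_errors=True)  # no error if the folder is missing
        os.makedirs(d, exist_ok=True)

# Hypothetical usage with the folders defined in the cell above:
# reset_dirs(input_dir, output_dir, checkpoint_dir)
```

Resetting the checkpoint and output folders together is probably the safer choice, since each holds part of the query's bookkeeping.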


**Data producer: generate and write N JSON files to simulate IoT streaming**

In [22]:
import json
import time
import random
from threading import Thread

def produce_messages(num_messages=200, interval=0.5):
    for i in range(num_messages):
        msg = {
            "device_id": random.randint(1,5),
            "temperature": round(random.uniform(18.0,40.0),2),
            "humidity": round(random.uniform(20.0,80.0),2),
            "timestamp": int(time.time()*1000)
        }
        filename = f"msg_{int(time.time()*1000)}_{i}.json"
        path = os.path.join(input_dir, filename)
        with open(path, "w") as f:
            f.write(json.dumps(msg))
        time.sleep(interval)

producer_thread = Thread(target=produce_messages, kwargs={"num_messages":2000,"interval":0.1}, daemon=True)
producer_thread.start()
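
The producer opens each file directly inside the watched folder, so in principle the stream reader could list a file before its contents are fully written. A common precaution is to write the file somewhere else and rename it into place. The following is a minimal sketch, assuming a staging folder on the same filesystem as `input_dir`. `write_message_atomically` and `staging_dir` are hypothetical names, not part of the original notebook.

```python
import json
import os

def write_message_atomically(msg, target_dir, filename, staging_dir):
    """Write msg as JSON in staging_dir, then move it into target_dir in one step."""
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(target_dir, exist_ok=True)
    tmp_path = os.path.join(staging_dir, filename + ".tmp")
    with open(tmp_path, "w") as f:
        json.dump(msg, f)
    final_path = os.path.join(target_dir, filename)
    # os.replace is atomic when both paths live on the same filesystem
    os.replace(tmp_path, final_path)
    return final_path
```

Inside `produce_messages`, the `open(...)`/`f.write(...)` pair could be swapped for a call such as `write_message_atomically(msg, input_dir, filename, os.path.join(base, "staging"))`, where the `staging` folder name is again an assumption.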


**Create the Spark session (default configuration)**

In [23]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("IoTStreamingPipeline").getOrCreate()


**Define schema, read streaming JSON from input folder, and basic transformations**

In [24]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, LongType
from pyspark.sql.functions import col, window

schema = StructType([
    StructField("device_id", IntegerType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("timestamp", LongType(), True)
])

stream_df = spark.readStream.schema(schema).json(input_dir)
stream_df = stream_df.withColumn("event_time", (col("timestamp")/1000).cast("timestamp"))

stream_df = stream_df.withWatermark("event_time", "20 seconds")

agg_df = stream_df.groupBy(
    window(col("event_time"), "10 seconds"),
    col("device_id")
).avg("temperature", "humidity").select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("device_id"),
    col("avg(temperature)").alias("avg_temperature"),
    col("avg(humidity)").alias("avg_humidity")
)
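
To make the interaction between the 10-second window and the 20-second watermark concrete, here is a small pure-Python model. It is a simplified sketch, not Spark's implementation: it ignores micro-batch timing (Spark only advances the watermark between batches), and the function names are illustrative assumptions.

```python
WINDOW_MS = 10_000      # matches window(..., "10 seconds")
WATERMARK_MS = 20_000   # matches withWatermark(..., "20 seconds")

def tumbling_window(ts_ms, size_ms=WINDOW_MS):
    """Return (start, end) in epoch ms of the tumbling window containing ts_ms."""
    start = ts_ms - (ts_ms % size_ms)
    return start, start + size_ms

def is_finalized(window_end_ms, max_event_ms, delay_ms=WATERMARK_MS):
    """Simplified model: a window can be emitted once the watermark reaches its end."""
    watermark = max_event_ms - delay_ms
    return window_end_ms <= watermark

start, end = tumbling_window(25_000)   # event at t = 25 s falls in [20 s, 30 s)
print(start, end, is_finalized(end, max_event_ms=49_999), is_finalized(end, max_event_ms=50_000))
```

Under this model, a window's row becomes eligible for output in append mode only after events at least 20 seconds newer than the window's end have arrived, which is why a short run produces only a few rows.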


**Start the streaming query to write aggregated results to Parquet, and capture the query in a variable**

In [25]:
query = (
    agg_df.writeStream
    .format("parquet")
    .option("path", output_dir)
    .option("checkpointLocation", checkpoint_dir)
    .outputMode("append")
    .start()
)

**Let the streaming query run for a fixed time**

In [26]:
time.sleep(40)
query.stop()
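
`time.sleep(40)` keeps waiting even if the query has already failed, so an error only surfaces later. The following is a minimal sketch of an alternative, assuming PySpark's `StreamingQuery.awaitTermination(timeout)`, which waits up to the given number of seconds and raises if the query terminated with an error. `run_for` is a hypothetical helper name, not part of the original notebook.

```python
def run_for(query, seconds):
    """Let a streaming query run for up to `seconds`, then stop it even on error."""
    try:
        # With a timeout, awaitTermination returns True if the query ended by itself
        finished = query.awaitTermination(seconds)
    finally:
        query.stop()
    return finished

# Hypothetical usage in place of the sleep/stop pair above:
# run_for(query, 40)
```

The `try`/`finally` ensures the query is stopped even when `awaitTermination` raises, so no background stream is left running in the notebook.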

**Inspect the Parquet output with Spark and show a sample**

In [27]:
result_df = spark.read.parquet(output_dir)
result_df.orderBy("window_start", "device_id").show(20, False)

+-------------------+-------------------+---------+------------------+------------------+
|window_start       |window_end         |device_id|avg_temperature   |avg_humidity      |
+-------------------+-------------------+---------+------------------+------------------+
|2025-12-12 09:23:00|2025-12-12 09:23:10|1        |27.2475           |53.67875          |
|2025-12-12 09:23:00|2025-12-12 09:23:10|2        |31.50333333333333 |60.47666666666667 |
|2025-12-12 09:23:00|2025-12-12 09:23:10|3        |35.57             |24.74             |
|2025-12-12 09:23:00|2025-12-12 09:23:10|4        |30.085            |68.2              |
|2025-12-12 09:23:00|2025-12-12 09:23:10|5        |26.325000000000003|47.275            |
|2025-12-12 09:23:10|2025-12-12 09:23:20|1        |32.62444444444444 |53.797777777777775|
|2025-12-12 09:23:10|2025-12-12 09:23:20|2        |29.356666666666666|39.135000000000005|
|2025-12-12 09:23:10|2025-12-12 09:23:20|3        |27.72666666666667 |44.41166666666667 |
|2025-12-1

**Subplots per Device**

In [28]:
import pandas as pd
pdf = result_df.toPandas()

if pdf.shape[0] == 0:
    print("No processed records found yet. Re-run producer or allow streaming to run longer.")
else:
    pdf["window_start"] = pd.to_datetime(pdf["window_start"])
    import plotly.express as px
    fig = px.line(
        pdf.sort_values("window_start"),  # px.line connects points in row order, so sort by time first
        x="window_start",
        y="avg_temperature",
        facet_row="device_id",
        title="Avg Temperature per Device (10s window)",
        height=900
    )
    fig.show()


**Boxplot per Device**

In [29]:
if pdf.shape[0] > 0:
    import plotly.express as px
    fig = px.box(
        pdf,
        x="device_id",
        y="avg_temperature",
        title="Temperature Distribution per Device (10s Windows)"
    )
    fig.show()
else:
    print("No data available for boxplot.")


**Save processed Parquet as a single consolidated CSV for ML and report artifacts**

In [30]:
artifacts_dir = os.path.join(base, "artifacts")
os.makedirs(artifacts_dir, exist_ok=True)
consolidated_csv = os.path.join(artifacts_dir, "aggregated_results.csv")
pdf.to_csv(consolidated_csv, index=False)
print(consolidated_csv)

/content/iot_stream_pipeline/artifacts/aggregated_results.csv


**ML: train an IsolationForest anomaly detector on avg_temperature and avg_humidity**

In [32]:
from sklearn.ensemble import IsolationForest
import numpy as np

X = pdf[["avg_temperature", "avg_humidity"]].values
if X.shape[0] < 10:
    print("Not enough rows for reliable ML; produce more data and re-run this cell.")
else:
    iso = IsolationForest(contamination=0.10, random_state=42)
    iso.fit(X)
    preds = iso.predict(X)
    pdf["anomaly"] = np.where(preds == -1, 1, 0)
    anomalies = pdf[pdf["anomaly"] == 1]
    print("Anomalies detected:", anomalies.shape[0])
    display(anomalies.head(10))

Anomalies detected: 5


    window_start         window_end           device_id  avg_temperature  avg_humidity  anomaly
7   2025-12-12 09:23:00  2025-12-12 09:23:10  3          35.57            24.74         1
16  2025-12-12 09:23:20  2025-12-12 09:23:30  2          21.338333        45.385        1
24  2025-12-12 09:23:00  2025-12-12 09:23:10  4          30.085           68.2          1
25  2025-12-12 09:23:50  2025-12-12 09:24:00  2          36.6775          51.2575       1
28  2025-12-12 09:24:20  2025-12-12 09:24:30  3          31.398           33.45         1
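
With `contamination=0.10`, the model sets its decision threshold so that roughly 10% of the training rows are labelled as anomalies, whether or not the data contains genuine outliers. Since the simulated readings are uniform random values, the flagged rows are best read as the most isolated points rather than real faults. The following small sketch on synthetic data illustrates the effect. The names `X_demo`, `iso_demo`, and `flagged_fraction`, the seed, and the sample size are illustrative assumptions, not taken from the notebook.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
# Synthetic stand-in for the aggregated features: uniform noise, no injected outliers
X_demo = np.column_stack([
    rng.uniform(18.0, 40.0, size=200),   # temperature-like values
    rng.uniform(20.0, 80.0, size=200),   # humidity-like values
])

iso_demo = IsolationForest(contamination=0.10, random_state=42).fit(X_demo)
# predict returns -1 for rows the model labels as anomalies
flagged_fraction = float(np.mean(iso_demo.predict(X_demo) == -1))
print(f"Flagged fraction: {flagged_fraction:.2f}")
```

If the expected share of anomalies is unknown, scikit-learn also accepts `contamination="auto"`, which uses a fixed score threshold instead of a fixed fraction.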


**Subplots Showing Anomalies**

In [33]:
if "anomaly" in pdf.columns:
    import plotly.express as px
    df_plot = pdf.copy()
    df_plot["anomaly_label"] = df_plot["anomaly"].map({0: "Normal", 1: "Anomaly"})
    fig = px.scatter(
        df_plot,
        x="window_start",
        y="avg_temperature",
        color="anomaly_label",
        facet_row="device_id",
        title="Avg Temperature with Anomalies Highlighted per Device",
        height=900
    )
    fig.show()
else:
    print("No anomaly data found. Run the ML cell first.")


**Plot Only Anomalies**

In [34]:
if "anomaly" in pdf.columns:
    anomalies = pdf[pdf["anomaly"] == 1]
    if anomalies.shape[0] > 0:
        import plotly.express as px
        fig = px.scatter(
            anomalies,
            x="window_start",
            y="avg_temperature",
            color="device_id",
            title="Detected Anomalies Only — All Devices",
            size=[10]*len(anomalies)
        )
        fig.show()
    else:
        print("ML model found no anomalies.")
else:
    print("Run the ML cell to generate anomaly labels first.")


**Save everything to a zip file**

In [35]:
import shutil
# make_archive appends ".zip" and returns the full path of the archive it created
zip_path = shutil.make_archive("/content/iot_pipeline_artifacts", "zip", base)
print(zip_path)


/content/iot_pipeline_artifacts.zip
