<a href="https://colab.research.google.com/github/LathaAlagar/latha2809/blob/main/23BIT050_Real_Time_Data_Streaming_Challenge_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import pyspark
print(pyspark.__version__)


3.4.1


In [3]:
!pip install -q scikit-learn joblib pandas


In [4]:
import random
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from joblib import dump

def generate_row():
    temp = random.uniform(20, 45)
    hum = random.uniform(20, 90)
    vib = random.uniform(0, 6)
    # simple synthetic anomaly rule (for demo)
    label = 1 if (vib > 4.0 and temp > 35) else 0
    return [temp, hum, vib, label]

# Create dataset
rows = [generate_row() for _ in range(5000)]
df = pd.DataFrame(rows, columns=["temperature","humidity","vibration","label"])

X = df[["temperature","humidity","vibration"]]
y = df["label"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

clf = RandomForestClassifier(n_estimators=30, random_state=42)
clf.fit(X_train, y_train)

print("Train score:", clf.score(X_train, y_train))
print("Test score:", clf.score(X_test, y_test))

# Save model
MODEL_PATH = "/content/sensor_rf_model.joblib"
dump(clf, MODEL_PATH)
print("Saved model to:", MODEL_PATH)


Train score: 1.0
Test score: 1.0
Saved model to: /content/sensor_rf_model.joblib


In [5]:
# Cell 3: Create directories and a base simulated stream file
import os
import pandas as pd
import random
from datetime import datetime

input_dir = "/content/stream_input"
output_dir = "/content/stream_output"
checkpoint_dir = "/content/chkpt_stream"

os.makedirs(input_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
os.makedirs(checkpoint_dir, exist_ok=True)

def generate_sensor_record():
    return {
        "sensor_id": random.choice(["s1","s2","s3"]),
        "temperature": round(random.uniform(20, 45), 2),
        "humidity": round(random.uniform(20, 90), 2),
        "vibration": round(random.uniform(0, 6), 2),
        "timestamp": pd.Timestamp.now()
    }

# initial microbatch file (will be picked up by Spark as first microbatch)
initial_batch = pd.DataFrame([generate_sensor_record() for _ in range(20)])
initial_path = os.path.join(input_dir, "initial_batch.csv")
initial_batch.to_csv(initial_path, index=False)
print("Wrote initial microbatch:", initial_path)
initial_batch.head()


Wrote initial microbatch: /content/stream_input/initial_batch.csv


Unnamed: 0,sensor_id,temperature,humidity,vibration,timestamp
0,s3,27.97,32.65,3.54,2025-10-16 10:34:42.774885
1,s2,37.2,70.85,1.45,2025-10-16 10:34:42.774943
2,s1,39.96,32.58,0.56,2025-10-16 10:34:42.774956
3,s3,31.95,69.43,4.3,2025-10-16 10:34:42.774965
4,s3,25.8,71.19,2.15,2025-10-16 10:34:42.774974


In [6]:
# Cell 4: Start Spark Structured Streaming to read CSVs in input_dir and apply model predictions
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, to_timestamp
from joblib import load
import pandas as pd
import os

# Create Spark session
spark = SparkSession.builder \
    .appName("ColabStreamingML") \
    .master("local[*]") \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Input schema
schema = StructType([
    StructField("sensor_id", StringType(), True),
    StructField("temperature", DoubleType(), True),
    StructField("humidity", DoubleType(), True),
    StructField("vibration", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

MODEL_PATH = "/content/sensor_rf_model.joblib"

# Define foreachBatch processing function
def process_batch(batch_df, batch_id):
    """
    This function runs for each microbatch.
    We load the joblib model inside the function (safe), convert to pandas,
    apply model.predict, and print + save the result.
    """
    # Important: don't crash if batch is empty
    if batch_df.count() == 0:
        print(f"[batch {batch_id}] empty")
        return

    # Convert to pandas
    pdf = batch_df.toPandas()
    # Ensure features exist and fill NA if any
    features = pdf[["temperature","humidity","vibration"]].fillna(0.0)
    # Load model inside the batch (safe for pickling/closure)
    model = load(MODEL_PATH)
    preds = model.predict(features)
    pdf["prediction"] = preds
    # Add human-readable timestamp for output
    pdf["processed_at"] = pd.Timestamp.now()
    # Print a small sample
    print(f"\n=== Microbatch {batch_id} (rows={len(pdf)}) ===")
    print(pdf.head(10).to_string(index=False))
    # Append to CSV output directory for record
    out_path = os.path.join("/content/stream_output", f"batch_{batch_id}.csv")
    pdf.to_csv(out_path, index=False)
    print(f"Wrote batch_{batch_id}.csv with {len(pdf)} rows")

# Build streaming DataFrame
streaming_df = (spark.readStream
                .schema(schema)
                .option("maxFilesPerTrigger", 1)   # process one file per microbatch
                .csv(input_dir))

# convert timestamp column to TimestampType if read as string (we set schema so should be timestamp)
# streaming_df = streaming_df.withColumn("timestamp", to_timestamp(col("timestamp")))

# Start the query with foreachBatch
query = (streaming_df.writeStream
         .outputMode("append")
         .foreachBatch(process_batch)
         .option("checkpointLocation", checkpoint_dir)
         .start())

print("⏳ Spark streaming started. Waiting for microbatches... (CTRL-C in Colab to interrupt)")


⏳ Spark streaming started. Waiting for microbatches... (CTRL-C in Colab to interrupt)


In [7]:
# Cell 5: Simulate new microbatches arriving (write 5 new CSV files, 2-second gaps)
import time
import pandas as pd
import random
from datetime import datetime
from IPython.display import clear_output

def simulate_and_write(batch_idx, nrows=12):
    rows = []
    for _ in range(nrows):
        rows.append({
            "sensor_id": random.choice(["s1","s2","s3"]),
            "temperature": round(random.uniform(20, 45), 2),
            "humidity": round(random.uniform(20, 90), 2),
            "vibration": round(random.uniform(0, 6), 2),
            "timestamp": pd.Timestamp.now()
        })
    df_new = pd.DataFrame(rows)
    path = os.path.join(input_dir, f"batch_{batch_idx}.csv")
    df_new.to_csv(path, index=False)
    clear_output(wait=True)
    print(f"📤 New microbatch written: {path}  (rows={len(df_new)})")

# Simulate 5 microbatches
for i in range(5):
    simulate_and_write(i+1, nrows=12)
    time.sleep(2)   # wait so streaming job processes each file as a microbatch

print("✅ Simulation finished: 5 microbatches generated.")


📤 New microbatch written: /content/stream_input/batch_5.csv  (rows=12)
✅ Simulation finished: 5 microbatches generated.


In [8]:
# Cell 6: Stop the streaming query and Spark session
# NOTE: run this after you have processed the batches you want (or if you want to stop)
query.stop()
spark.stop()
print("✅ Streaming stopped and Spark session terminated.")


✅ Streaming stopped and Spark session terminated.


In [9]:
# Cell 7: List the processed batch files and show a sample
import glob
import pandas as pd
files = sorted(glob.glob("/content/stream_output/*.csv"))
print("Processed batch files:", files)
if files:
    sample = pd.read_csv(files[-1])
    print("\nSample rows from last batch file:")
    display(sample.head())


Processed batch files: []
