<a href="https://colab.research.google.com/github/P-Brundha/info/blob/main/23BIT012_Realtimestreaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================================
# Install dependencies (run once in Colab)
# ============================================================
!pip install -q pyspark scikit-learn joblib pandas

# ============================================================
# Imports
# ============================================================
import os, random, time, glob
import pandas as pd
from datetime import datetime
from joblib import dump, load
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# ============================================================
# 1) Train the model on synthetic data and print train/test accuracy
# ============================================================
def generate_row():
    """Return one synthetic training sample.

    The sample is a list ``[temperature, humidity, vibration, label]`` where
    the three readings are drawn uniformly from 20-45, 20-90 and 0-6
    respectively, and ``label`` is 1 when vibration exceeds 4.0 while
    temperature exceeds 35, otherwise 0.
    """
    temperature = random.uniform(20, 45)
    humidity = random.uniform(20, 90)
    vibration = random.uniform(0, 6)
    # A sample is flagged only when both readings are high.
    is_flagged = vibration > 4.0 and temperature > 35
    return [temperature, humidity, vibration, int(is_flagged)]

# Build a 5000-row synthetic training set from generate_row().
rows = [generate_row() for _ in range(5000)]
df = pd.DataFrame(
    rows,
    columns=["temperature", "humidity", "vibration", "label"],
)

# Features are the three sensor readings; the target is the binary label.
X = df[["temperature", "humidity", "vibration"]]
y = df["label"]

# Hold out 20% of the rows for evaluation; the seed keeps the split stable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)
clf = RandomForestClassifier(n_estimators=30, random_state=42)
clf.fit(X_train, y_train)

# Mean accuracy on the training and held-out splits.
train_score = clf.score(X_train, y_train)
test_score = clf.score(X_test, y_test)

# Persist the fitted model so the streaming handler can load it from disk.
MODEL_PATH = "/content/sensor_rf_model.joblib"
dump(clf, MODEL_PATH)

print("✅ Model training completed.")
print(f"Train score: {train_score:.4f}")
print(f"Test score:  {test_score:.4f}")
print("Saved model to:", MODEL_PATH)

# ============================================================
# 2) Prepare streaming directories and an initial microbatch
# ============================================================
import shutil

# Working directories for the file-based stream: CSV files dropped into
# input_dir are read by Spark, per-microbatch results are written to
# output_dir, and chkpt_dir holds the streaming query's checkpoint.
input_dir  = "/content/input_stream"
output_dir = "/content/output_stream"
chkpt_dir  = "/content/stream_checkpoints"

for d in (input_dir, output_dir, chkpt_dir):
    # Start every run from a clean slate. Leftover output files from an
    # earlier run would satisfy the "wait for outputs" check immediately,
    # and a leftover checkpoint makes the query resume with old batch ids
    # and treat previously seen input paths as already processed.
    shutil.rmtree(d, ignore_errors=True)
    os.makedirs(d, exist_ok=True)

def gen_sensor_record():
    """Return one simulated sensor reading as a dict.

    Keys, in order: ``sensor_id`` (one of "s1", "s2", "s3"), ``temperature``
    (20-45), ``humidity`` (20-90), ``vibration`` (0-6) -- each numeric value
    rounded to two decimals -- and ``timestamp`` (the current pandas
    Timestamp).
    """
    sensor = random.choice(["s1","s2","s3"])
    # Draw the three readings in a fixed order: temperature, humidity, vibration.
    temperature, humidity, vibration = (
        round(random.uniform(low, high), 2)
        for low, high in ((20, 45), (20, 90), (0, 6))
    )
    record = {"sensor_id": sensor}
    record["temperature"] = temperature
    record["humidity"] = humidity
    record["vibration"] = vibration
    record["timestamp"] = pd.Timestamp.now()
    return record

# Seed the input directory with one 20-row file before the stream starts,
# so Spark has something to pick up first.
initial_path = os.path.join(input_dir, "initial_batch.csv")
initial_records = [gen_sensor_record() for _ in range(20)]
initial_batch = pd.DataFrame(initial_records)
initial_batch.to_csv(initial_path, index=False)
print("📥 Wrote initial microbatch:", initial_path)

# ============================================================
# 3) Start Spark Structured Streaming
# ============================================================
# Local Spark session using all available cores; only warnings and errors
# are logged.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("StreamingSensorML")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Explicit schema for the incoming CSV files (file streams need one up
# front). Every column is nullable.
schema = StructType(
    [StructField("sensor_id", StringType(), True)]
    + [
        StructField(reading, DoubleType(), True)
        for reading in ("temperature", "humidity", "vibration")
    ]
    + [StructField("timestamp", TimestampType(), True)]
)

# Microbatch handler used with foreachBatch
def handle_microbatch(batch_df, batch_id):
    """Score one microbatch with the saved model and write it to a CSV file.

    Args:
        batch_df: Spark DataFrame holding the rows of this microbatch.
        batch_id: Identifier Spark assigns to the microbatch; used in the
            log lines and in the output file name.

    Side effects:
        Prints a short summary and writes ``batch_<batch_id>.csv`` into
        ``output_dir``. Empty microbatches are skipped and produce no file.
    """
    feature_cols = ["temperature", "humidity", "vibration"]

    # Materialize the batch exactly once. Checking emptiness on the Spark
    # side first would evaluate the batch a second time, and small
    # microbatches are expected here.
    pdf = batch_df.toPandas()
    if pdf.empty:
        print(f"[microbatch {batch_id}] empty, skipping.")
        return

    # Impute missing readings with 0 so the model can score every row, but
    # leave non-feature columns alone: a blanket fillna(0) would put the
    # integer 0 into the sensor_id and timestamp columns.
    pdf[feature_cols] = pdf[feature_cols].fillna(0)

    model = load(MODEL_PATH)
    pdf["prediction"] = model.predict(pdf[feature_cols])
    pdf["processed_at"] = pd.Timestamp.now()

    # Print a compact sample of the scored rows
    print(f"\n=== Microbatch {batch_id} (rows={len(pdf)}) ===")
    print(pdf.head(10).to_string(index=False))

    # Save per-microbatch output
    out_path = os.path.join(output_dir, f"batch_{batch_id}.csv")
    pdf.to_csv(out_path, index=False)
    print(f"Wrote {os.path.basename(out_path)} with {len(pdf)} rows")

# Build streaming DataFrame and query
stream_df = (spark.readStream
             .schema(schema)
             # The input files are written by pandas with a header row;
             # without this option the header line is parsed as a data row
             # (sensor_id == "sensor_id", every other column null).
             .option("header", True)
             # Read one file per trigger so each file is its own microbatch.
             .option("maxFilesPerTrigger", 1)
             .csv(input_dir))

# Hand each microbatch to handle_microbatch; progress is tracked in chkpt_dir.
query = (stream_df.writeStream
         .outputMode("append")
         .foreachBatch(handle_microbatch)
         .option("checkpointLocation", chkpt_dir)
         .start())

print("⏳ Spark streaming started. Waiting for microbatches...")

# ============================================================
# 4) Simulate incoming data: write NUM_BATCHES input files, one every 2 seconds
# ============================================================
from IPython.display import clear_output

def simulate_and_write(batch_idx, nrows=10):
    """Write one simulated input file into the watched input directory.

    Args:
        batch_idx: Number used in the file name (``batch_<batch_idx>.csv``).
        nrows: How many sensor records to generate for the file.

    The file is first written under a hidden temporary name and then renamed
    into place, so the running stream never sees a partially written CSV.
    """
    df_new = pd.DataFrame([gen_sensor_record() for _ in range(nrows)])
    path = os.path.join(input_dir, f"batch_{batch_idx}.csv")

    # Spark's file source expects files to appear atomically. Names starting
    # with "." are skipped by its file listing, and os.replace within one
    # directory is an atomic rename.
    tmp_path = os.path.join(input_dir, f".batch_{batch_idx}.csv.tmp")
    df_new.to_csv(tmp_path, index=False)
    os.replace(tmp_path, path)

    # Replace the cell's previous output with the latest status line.
    clear_output(wait=True)
    print(f"📥 Wrote new batch file: {path}  ({len(df_new)} rows)")

NUM_BATCHES = 5
ROWS_PER_BATCH = 10
PAUSE_BETWEEN_BATCHES_SEC = 2

for batch_number in range(1, NUM_BATCHES + 1):
    simulate_and_write(batch_number, nrows=ROWS_PER_BATCH)
    # Pause between files so Spark processes one file per microbatch.
    time.sleep(PAUSE_BETWEEN_BATCHES_SEC)

print(f"✅ {NUM_BATCHES} microbatches generated and processed.")

# ============================================================
# 5) Wait for streaming to process files (polling) then stop gracefully
#    We wait until we see at least NUM_BATCHES output files or timeout.
# ============================================================
def wait_for_outputs(expected, timeout_sec=30, out_dir=None, poll_sec=1):
    """Poll for ``batch_*.csv`` files until enough exist or the timeout ends.

    Args:
        expected: Minimum number of matching files to wait for.
        timeout_sec: Maximum time to wait, in seconds. With a value of 0 or
            less the directory is checked exactly once.
        out_dir: Directory to watch; defaults to the module-level
            ``output_dir``.
        poll_sec: Pause between checks, in seconds.

    Returns:
        The sorted list of matching file paths found by the last check. It
        may hold fewer than ``expected`` entries if the timeout was reached.
    """
    if out_dir is None:
        out_dir = output_dir
    pattern = os.path.join(out_dir, "batch_*.csv")

    # A monotonic clock is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_sec
    while True:
        # Always look at least once, and look again after the final sleep,
        # so the returned list is never undefined or stale.
        files = sorted(glob.glob(pattern))
        if len(files) >= expected:
            return files
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return files
        time.sleep(max(0.0, min(poll_sec, remaining)))

# Every input file becomes its own microbatch (maxFilesPerTrigger=1) and
# each non-empty microbatch writes one output file, so the stream is done
# once the initial batch plus the NUM_BATCHES simulated ones have appeared.
expected_outputs = NUM_BATCHES + 1

# Wait up to 30s for expected number of processed batch files
processed_files = wait_for_outputs(expected_outputs, timeout_sec=30)
if len(processed_files) < expected_outputs:
    print(f"⚠️ Only {len(processed_files)} of {expected_outputs} output files appeared before the timeout.")

# Give a tiny extra delay to ensure no mid-processing stop
time.sleep(2)

# Stop query & Spark gracefully
if query.isActive:
    query.stop()
spark.stop()
print("🛑 Streaming stopped and Spark session closed.")

# ============================================================
# 6) Show the processed batch files and sample rows from the last one
# ============================================================
# List every per-microbatch output file that was written.
processed_files = sorted(glob.glob(os.path.join(output_dir, "batch_*.csv")))
print("📂 Processed batch files:", processed_files)
if processed_files:
    # Pick the most recently written file. The list is sorted by name, which
    # would put batch_10.csv before batch_2.csv, so its final element is not
    # necessarily the last batch.
    latest_file = max(processed_files, key=os.path.getmtime)
    sample = pd.read_csv(latest_file)
    print("\nSample rows from last batch file:")
    display(sample.head())


📥 Wrote new batch file: /content/input_stream/batch_5.csv  (10 rows)
✅ 5 microbatches generated and processed.
🛑 Streaming stopped and Spark session closed.
📂 Processed batch files: ['/content/output_stream/batch_0.csv', '/content/output_stream/batch_1.csv', '/content/output_stream/batch_2.csv', '/content/output_stream/batch_3.csv', '/content/output_stream/batch_4.csv', '/content/output_stream/batch_5.csv', '/content/output_stream/batch_6.csv']

Sample rows from last batch file:


Unnamed: 0,sensor_id,temperature,humidity,vibration,timestamp,prediction,processed_at
0,sensor_id,0.0,0.0,0.0,0,0,2025-10-16 11:11:40.516401
1,s1,34.28,78.74,0.57,2025-10-16 11:11:34.743743,0,2025-10-16 11:11:40.516401
2,s1,35.73,34.37,4.44,2025-10-16 11:11:34.743789,1,2025-10-16 11:11:40.516401
3,s3,28.3,83.97,0.79,2025-10-16 11:11:34.743820,0,2025-10-16 11:11:40.516401
4,s3,40.21,63.02,4.84,2025-10-16 11:11:34.743831,1,2025-10-16 11:11:40.516401
