In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles

# 1. Initialize Spark Session
spark = SparkSession.builder \
    .appName("Project") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# 2. Define GitHub RAW URL
url = "https://raw.githubusercontent.com/cryborg1211/Manufacturing-Detect-Data/refs/heads/main/machine%20failure.csv"

# 3. Add file to Spark Context
spark.sparkContext.addFile(url)

# 4. Read the CSV file
df = spark.read.csv(SparkFiles.get("machine failure.csv"), header=True, inferSchema=True)

# 5. Verify Data Load
print("--- Data Schema ---")
df.printSchema()

print("--- Class Distribution ---")
df.groupBy("Machine failure").count().show()

--- Data Schema ---
root
 |-- UDI: integer (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Air temperature [K]: double (nullable = true)
 |-- Process temperature [K]: double (nullable = true)
 |-- Rotational speed [rpm]: integer (nullable = true)
 |-- Torque [Nm]: double (nullable = true)
 |-- Tool wear [min]: integer (nullable = true)
 |-- Machine failure: integer (nullable = true)
 |-- TWF: integer (nullable = true)
 |-- HDF: integer (nullable = true)
 |-- PWF: integer (nullable = true)
 |-- OSF: integer (nullable = true)
 |-- RNF: integer (nullable = true)

--- Class Distribution ---
+---------------+-----+
|Machine failure|count|
+---------------+-----+
|              0| 9661|
|              1|  339|
+---------------+-----+



In [2]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# --- STEP 2: PREPROCESSING & FEATURE ENGINEERING ---

# 1. Clean Data
cols_to_drop = ["UDI", "Product ID", "TWF", "HDF", "PWF", "OSF", "RNF"]
df_clean = df.drop(*cols_to_drop)

# 2. Define Pipeline Stages
stages = []

# Stage A: Encode 'Type' (L, M, H -> 0, 1, 2)
indexer = StringIndexer(inputCol="Type", outputCol="Type_Index")
stages += [indexer]

# Stage B: Assemble Features into a Vector
feature_cols = ["Type_Index", "Air temperature [K]", "Process temperature [K]",
                "Rotational speed [rpm]", "Torque [Nm]", "Tool wear [min]"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")
stages += [assembler]

# Stage C: Scale Features (Normalize to 0-1 range)
scaler = MinMaxScaler(inputCol="unscaled_features", outputCol="features")
stages += [scaler]

# 3. Build and Run Pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df_clean)
final_data = pipeline_model.transform(df_clean)

# Select only features and label for training
final_data = final_data.select("features", "Machine failure")

# --- STEP 3: ADVANCED MODEL TRAINING WITH HYPERPARAMETER TUNING ---

# 1. Handling Imbalance (Keep this part, it's crucial)
dataset_size = final_data.count()
count_pos = final_data.filter(col("Machine failure") == 1).count()
count_neg = final_data.filter(col("Machine failure") == 0).count()
balancing_ratio = count_neg / count_pos
final_data_weighted = final_data.withColumn("classWeight", when(col("Machine failure") == 1, balancing_ratio).otherwise(1.0))

# Split Data
train_data, test_data = final_data_weighted.randomSplit([0.8, 0.2], seed=42)

# 2. Define the Base Model
rf = RandomForestClassifier(labelCol="Machine failure", featuresCol="features", weightCol="classWeight")

# 3. Create Parameter Grid for Tuning (Optimization)
# I test different combinations: 50 vs 100 trees, Depth 5 vs 10
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# 4. Set up Cross-Validator
# 3-fold cross validation means it trains 3 times on different data chunks to ensure stability
evaluator = BinaryClassificationEvaluator(labelCol="Machine failure", metricName="areaUnderROC")

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3) # Use 3 folds to save time

print("--- Starting Hyperparameter Tuning (Grid Search)---")

cvModel = crossval.fit(train_data)

# 5. Get the Best Model
best_rf_model = cvModel.bestModel
print(f"Best Model Parameters: NumTrees = {best_rf_model.getNumTrees}, MaxDepth = {best_rf_model.getOrDefault('maxDepth')}")

# 6. Make Predictions on Test Data using the Best Model
predictions = best_rf_model.transform(test_data)

# --- EVALUATION---
print("\n--- Optimized Model Evaluation ---")
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC (AUC): {auc:.4f}")

# Confusion Matrix
predictions.groupBy("Machine failure", "prediction").count().show()

# Save the model to a variable named 'rf_model' for Step 4 compatibility
rf_model = best_rf_model

--- Starting Hyperparameter Tuning (Grid Search)---
Best Model Parameters: NumTrees = 100, MaxDepth = 5

--- Optimized Model Evaluation ---
Area Under ROC (AUC): 0.9363
+---------------+----------+-----+
|Machine failure|prediction|count|
+---------------+----------+-----+
|              0|       1.0|  159|
|              1|       0.0|    8|
|              1|       1.0|   62|
|              0|       0.0| 1692|
+---------------+----------+-----+



In [3]:
import time
import os
import shutil
from pyspark.sql.types import StructType
from pyspark.ml import PipelineModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# --- STEP 4: REAL-TIME STREAMING SIMULATION ---

# 1. Create & Save the FULL Model (Preprocessing + Random Forest)
# combine the preprocessing stages with the trained random forest model
full_stages = pipeline_model.stages + [rf_model]
full_model = PipelineModel(stages=full_stages)

model_path = "models/predictive_maintenance_full"
full_model.write().overwrite().save(model_path)
print(f"Full Model saved to {model_path}")

# 2. Prepare Streaming Directory
input_dir = "stream_input"
if os.path.exists(input_dir):
    shutil.rmtree(input_dir) # Clean up old runs
os.makedirs(input_dir)

# 3. Define the Streaming Logic
# Load the saved full model
loaded_model = PipelineModel.load(model_path)

# Reuse the schema from the dataframe we loaded earlier
schema = df.schema

# Create the Streaming DataFrame
streaming_df = spark.readStream \
    .schema(schema) \
    .option("header", "true") \
    .csv(input_dir)

# Apply the model to the streaming data
# Now this will generate the 'prediction' column because RF is included
predictions_stream = loaded_model.transform(streaming_df)

# Select relevant columns for the dashboard
output_stream = predictions_stream.select("UDI", "Air temperature [K]", "Rotational speed [rpm]", "prediction")

# 4. Start the Stream
query = output_stream.writeStream \
    .format("memory") \
    .queryName("failures") \
    .outputMode("append") \
    .start()

print("--- Streaming Pipeline Started. Waiting for data... ---")

# --- 5. Simulate Sensor Data Feed ---
try:
    print("Preparing FULL dataset for streaming (10,000 rows)...")

    # 1. Convert the ENTIRE dataset to Pandas
    # Since I want to simulate the real timeline, I sort by UDI (if available) or keep original order
    full_data = df.orderBy("UDI").toPandas()

    total_rows = len(full_data)
    print(f"Stream loaded with {total_rows} sensor readings.")

    # 2. Large Chunk Size for faster demo
    chunk_size = 100

    for i in range(0, total_rows, chunk_size):
        # Get the batch
        chunk = full_data.iloc[i:i+chunk_size]

        # Create a unique filename for each batch based on timestamp/index
        file_name = f"{input_dir}/batch_{i}.csv"

        # Save batch to the monitored folder
        chunk.to_csv(file_name, index=False)

        # Calculate progress
        percent_complete = ((i + chunk_size) / total_rows) * 100
        print(f"Batch {i//chunk_size + 1} arrived (Rows {i} to {min(i+chunk_size, total_rows)}). Progress: {percent_complete:.1f}%")

        # Wait a short time to simulate high-frequency data ingestion
        time.sleep(1)

        # Query for ALERTS (Prediction = 1)
        # I check if any failure was detected in the current stream buffer
        print("--- Real-time Failures Detected ---")
        alerts = spark.sql("SELECT * FROM failures WHERE prediction = 1.0 AND UDI >= {} AND UDI < {}".format(i, i+chunk_size))

        # If there are alerts, show them. Otherwise print a 'System Normal' message.
        if alerts.count() > 0:
            alerts.show()
        else:
            print("[OK] System Status: Normal")

    # Allow some time for the last batch to be processed
    time.sleep(5)

except KeyboardInterrupt:
    print("Stopping simulation manually...")

finally:
    if 'query' in locals() and query.isActive:
        query.stop()
    print("Stream simulation finished.")

print("The full stream results can be read in Google Colab via: https://colab.research.google.com/drive/1oqeZSsK9BCoEc5GJ6SOsGc2R0U1VUPqY?usp=sharing")

Full Model saved to models/predictive_maintenance_full
--- Streaming Pipeline Started. Waiting for data... ---
Preparing FULL dataset for streaming (10,000 rows)...
Stream loaded with 10000 sensor readings.
Batch 1 arrived (Rows 0 to 100). Progress: 1.0%
--- Real-time Failures Detected ---
+---+-------------------+----------------------+----------+
|UDI|Air temperature [K]|Rotational speed [rpm]|prediction|
+---+-------------------+----------------------+----------+
| 51|              298.9|                  2861|       1.0|
| 70|              298.9|                  1410|       1.0|
| 76|              298.8|                  1379|       1.0|
| 78|              298.8|                  1455|       1.0|
+---+-------------------+----------------------+----------+

Batch 2 arrived (Rows 100 to 200). Progress: 2.0%
--- Real-time Failures Detected ---
+---+-------------------+----------------------+----------+
|UDI|Air temperature [K]|Rotational speed [rpm]|prediction|
+---+-----------------