In [None]:
# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark 3.4.1 (fully supported)
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# Extract Spark
!tar -xzf spark-3.4.1-bin-hadoop3.tgz

# Install PySpark helper
!pip install -q findspark pyspark


The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open 'spark-3.4.1-bin-hadoop3.tgz'


In [3]:
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MachineFailurePrediction") \
    .getOrCreate()


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MachineFailurePrediction") \
    .getOrCreate()


In [5]:
#load dataset
df = spark.read.csv("/content/machine failure.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)


root
 |-- UDI: integer (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Air temperature [K]: double (nullable = true)
 |-- Process temperature [K]: double (nullable = true)
 |-- Rotational speed [rpm]: integer (nullable = true)
 |-- Torque [Nm]: double (nullable = true)
 |-- Tool wear [min]: integer (nullable = true)
 |-- Machine failure: integer (nullable = true)
 |-- TWF: integer (nullable = true)
 |-- HDF: integer (nullable = true)
 |-- PWF: integer (nullable = true)
 |-- OSF: integer (nullable = true)
 |-- RNF: integer (nullable = true)

+---+----------+----+-------------------+-----------------------+----------------------+-----------+---------------+---------------+---+---+---+---+---+
|UDI|Product ID|Type|Air temperature [K]|Process temperature [K]|Rotational speed [rpm]|Torque [Nm]|Tool wear [min]|Machine failure|TWF|HDF|PWF|OSF|RNF|
+---+----------+----+-------------------+-----------------------+----------------------+------

In [6]:
#data cleaning
# Drop rows with nulls
df = df.dropna()

# Optional: Treat outliers (Torque < 0 as example)
from pyspark.sql.functions import when

df = df.withColumn("Torque [Nm]", when(df["Torque [Nm]"] < 0, None).otherwise(df["Torque [Nm]"]))
df = df.na.fill(0)


In [17]:
#outliers removal
from pyspark.sql.functions import col

def remove_outliers(df, column):
    q1, q3 = df.approxQuantile(column, [0.25, 0.75], 0.05)
    IQR = q3 - q1
    lower = q1 - 1.5 * IQR
    upper = q3 + 1.5 * IQR
    return df.filter((col(column) >= lower) & (col(column) <= upper))

# Apply on selected numeric columns
columns = [
    "Air temperature [K]", "Process temperature [K]",
    "Rotational speed [rpm]", "Torque [Nm]", "Tool wear [min]"
]

for col_name in columns:
    df = remove_outliers(df, col_name)


In [7]:
#stringindexer + onehotencoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder

indexer = StringIndexer(inputCol="Type", outputCol="Type_Indexed")
encoder = OneHotEncoder(inputCols=["Type_Indexed"], outputCols=["Type_Encoded"])


In [8]:
#bucketizer
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), 40, 60, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="Torque [Nm]", outputCol="Torque_Bucket")


In [9]:
#quantile discretizer
from pyspark.ml.feature import QuantileDiscretizer

discretizer = QuantileDiscretizer(numBuckets=4, inputCol="Tool wear [min]", outputCol="Wear_Discretized")


In [10]:
#transformers:vector assembler + scalers
from pyspark.ml.feature import VectorAssembler, StandardScaler

assembler = VectorAssembler(
    inputCols=["Air temperature [K]", "Process temperature [K]", "Rotational speed [rpm]",
               "Torque [Nm]", "Torque_Bucket", "Wear_Discretized"],
    outputCol="raw_features"
)

scaler = StandardScaler(inputCol="raw_features", outputCol="features")


In [11]:
#window functions
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, row_number

w = Window.partitionBy("Type").orderBy("UDI")
df = df.withColumn("Rolling_Avg_Temp", avg("Air temperature [K]").over(w.rowsBetween(-2, 0)))
df = df.withColumn("RowNum", row_number().over(w))


In [12]:
#ml pipeline + model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

rf = RandomForestClassifier(labelCol="Machine failure", featuresCol="features", numTrees=50)

pipeline = Pipeline(stages=[
    indexer, encoder, bucketizer, discretizer,
    assembler, scaler, rf
])


In [13]:
#train-test split,train model
train, test = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)
predictions = model.transform(test)


In [18]:
#Evaluate model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Accuracy
acc_eval = MulticlassClassificationEvaluator(
    labelCol="Machine failure", predictionCol="prediction", metricName="accuracy")
accuracy = acc_eval.evaluate(predictions)

# Precision
precision_eval = MulticlassClassificationEvaluator(
    labelCol="Machine failure", predictionCol="prediction", metricName="weightedPrecision")
precision = precision_eval.evaluate(predictions)

# AUC (already used before)
auc_eval = BinaryClassificationEvaluator(
    labelCol="Machine failure", rawPredictionCol="rawPrediction")
auc = auc_eval.evaluate(predictions)

print(f" Accuracy: {accuracy:.4f}")
print(f" Precision: {precision:.4f}")
print(f" AUC: {auc:.4f}")


 Accuracy: 0.9620
 Precision: 0.9578
 AUC: 0.9464


In [15]:
#sql + temp views
df.createOrReplaceTempView("machines")

spark.sql("""
    SELECT Type, AVG(`Torque [Nm]`) AS avg_torque
    FROM machines
    GROUP BY Type
""").show()


+----+------------------+
|Type|        avg_torque|
+----+------------------+
|   M|40.017250583917296|
|   L| 39.99659999999998|
|   H| 39.83828514456634|
+----+------------------+



In [16]:
#output sink
predictions.select("Product ID", "Machine failure", "prediction") \
    .write.mode("overwrite").csv("/content/predictions_output.csv", header=True)


In [23]:
# 🚀 Take user input
air_temp = float(input("Enter air temperature [K]: "))
process_temp = float(input("Enter process temperature [K]: "))
rpm = float(input("Enter rotational speed [rpm]: "))
torque = float(input("Enter torque [Nm]: "))
wear = float(input("Enter tool wear [min]: "))
m_type = input("Enter machine type (L/M/H): ")

# 🧾 Create DataFrame from user input
user_input = {
    "Air temperature [K]": air_temp,
    "Process temperature [K]": process_temp,
    "Rotational speed [rpm]": rpm,
    "Torque [Nm]": torque,
    "Tool wear [min]": wear,
    "Type": m_type
}

user_df = spark.createDataFrame([user_input])

# ✅ Predict using the full trained pipeline
user_prediction = model.transform(user_df)

# 🖨️ Show prediction and probabilities
user_prediction.select("prediction", "probability").show()

# 🎯 Friendly output
pred_row = user_prediction.select("prediction", "probability").collect()[0]
pred = int(pred_row["prediction"])
probs = pred_row["probability"]

if pred == 1:
    print(f"\n⚠️ Predicted: Machine will FAIL (Confidence: {probs[1]*100:.2f}%)")
else:
    print(f"\n✅ Predicted: Machine is SAFE (Confidence: {probs[0]*100:.2f}%)")


Enter air temperature [K]: 345
Enter process temperature [K]: 355
Enter rotational speed [rpm]: 2000
Enter torque [Nm]: 75
Enter tool wear [min]: 300
Enter machine type (L/M/H): H
+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       1.0|[0.29759767352565...|
+----------+--------------------+


⚠️ Predicted: Machine will FAIL (Confidence: 70.24%)
