In [13]:
import pandas as pd


In [14]:
data = pd.read_csv('sensor_data.csv')


In [15]:
data.head()

Unnamed: 0,UDI,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min],Machine failure,TWF,HDF,PWF,OSF,RNF
0,1,M14860,M,298.1,308.6,1551,42.8,0,0,0,0,0,0,0
1,2,L47181,L,298.2,308.7,1408,46.3,3,0,0,0,0,0,0
2,3,L47182,L,298.1,308.5,1498,49.4,5,0,0,0,0,0,0
3,4,L47183,L,298.2,308.6,1433,39.5,7,0,0,0,0,0,0
4,5,L47184,L,298.2,308.7,1408,40.0,9,0,0,0,0,0,0


In [16]:
!pip install pyspark



In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType, IntegerType



In [18]:
# Spark session
spark = SparkSession.builder \
    .appName("PredictiveMaintenance") \
    .getOrCreate()


In [19]:
columns = ["UDI", "Product_ID", "Type", "Air_temperature", "Process_temperature", "Rotational_speed",
           "Torque", "Tool_wear", "Machine_failure", "TWF", "HDF", "PWF", "OSF", "RNF"]

df = spark.createDataFrame(data, columns)

In [20]:


df = df.withColumn("Air_temperature", df["Air_temperature"].cast(DoubleType()))
df = df.withColumn("Process_temperature", df["Process_temperature"].cast(DoubleType()))
df = df.withColumn("Rotational_speed", df["Rotational_speed"].cast(IntegerType()))
df = df.withColumn("Torque", df["Torque"].cast(DoubleType()))
df = df.withColumn("Tool_wear", df["Tool_wear"].cast(IntegerType()))
df = df.withColumn("Machine_failure", df["Machine_failure"].cast(IntegerType()))

df.printSchema()
df.show(5)



root
 |-- UDI: long (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Air_temperature: double (nullable = true)
 |-- Process_temperature: double (nullable = true)
 |-- Rotational_speed: integer (nullable = true)
 |-- Torque: double (nullable = true)
 |-- Tool_wear: integer (nullable = true)
 |-- Machine_failure: integer (nullable = true)
 |-- TWF: long (nullable = true)
 |-- HDF: long (nullable = true)
 |-- PWF: long (nullable = true)
 |-- OSF: long (nullable = true)
 |-- RNF: long (nullable = true)

+---+----------+----+---------------+-------------------+----------------+------+---------+---------------+---+---+---+---+---+
|UDI|Product_ID|Type|Air_temperature|Process_temperature|Rotational_speed|Torque|Tool_wear|Machine_failure|TWF|HDF|PWF|OSF|RNF|
+---+----------+----+---------------+-------------------+----------------+------+---------+---------------+---+---+---+---+---+
|  1|    M14860|   M|          298.1|              308.6|  

In [21]:
assembler = VectorAssembler(inputCols=[
    "Air_temperature", "Process_temperature", "Rotational_speed",
    "Torque", "Tool_wear"
], outputCol="features")

data_with_features = assembler.transform(df)

train, test = data_with_features.randomSplit([0.8, 0.2])

In [22]:
rf = RandomForestClassifier(labelCol="Machine_failure", featuresCol="features", numTrees=10)

model = rf.fit(train)

predictions = model.transform(test)

evaluator = MulticlassClassificationEvaluator(labelCol="Machine_failure", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy}")

Test Accuracy: 0.9659442724458205


In [23]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="Machine_failure", featuresCol="features")

model = lr.fit(train)

predictions = model.transform(test)

evaluator = MulticlassClassificationEvaluator(labelCol="Machine_failure", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy (Logistic Regression): {accuracy}")


Test Accuracy (Logistic Regression): 0.9654282765737874


In [24]:
spark.stop()