#Install Java and Spark

In [26]:

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar -xzf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark


#Set Environment

In [27]:
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

findspark.init()


#Start Spark Session

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Step 1: Start Spark session
spark = SparkSession.builder.appName("FailurePrediction").getOrCreate()

#Load Data


In [29]:
df = spark.read.csv("/content/ID Fan Clean Trends.csv", header=True, inferSchema=True)
df.printSchema()

# Clean column names: remove problematic characters
cleaned_columns = [col_name.replace(" ", "_")
                             .replace(",", "")
                             .replace(".", "")
                             .replace("(", "")
                             .replace(")", "")
                             for col_name in df.columns]
df = df.toDF(*cleaned_columns)

df.show(5)


root
 |-- datetime: timestamp (nullable = true)
 |-- WEIGHTED COAL FLOW, (t/h): double (nullable = true)
 |-- GENERATOR POWER, (MW): double (nullable = true)
 |-- MAIN STEAM FLOW, (t/h): double (nullable = true)
 |-- CONCENTRATION OF DUST, (mg/m3): double (nullable = true)
 |-- ID FAN B LUBE OIL STATION OUTLET DIRVE OIL PRESS., (Mpa): double (nullable = true)
 |-- ID FAN A VARIABLE BLADE POSITION, (%): double (nullable = true)
 |-- ID FAN B VARIABLE BLADE POSITION, (%): double (nullable = true)
 |-- A IDF DMD, (%): double (nullable = true)
 |-- B IDF DMD, (%): double (nullable = true)
 |-- ID FAN A CURRENT, (A): double (nullable = true)
 |-- ID FAN B CURRENT, (A): double (nullable = true)
 |-- ID FAN A INLET FLUE GAS PRESS., (kPa): double (nullable = true)
 |-- ID FAN A OUTLET FLUE GAS PRESS., (kPa): double (nullable = true)
 |-- ID FAN B INLET FLUE GAS PRESS., (kPa): double (nullable = true)
 |-- ID FAN B OUTLET FLUE GAS PRESS., (kPa): double (nullable = true)
 |-- ID FAN A INLET FLUE

#Cleaning Data

In [30]:
#Setting Date Time Index in Spark
df = df.withColumn("datetime", to_timestamp("datetime", "M/d/yyyy H:mm"))



#Machine Learning

In [31]:
# Filter columns
drop_cols = ["Target_Label", "GENERATOR POWER, (MW)", "datetime"]
feature_cols = [col for col in df.columns if col not in drop_cols + ["Target_Class"]]

#Creating train/test Splits
failure_start = "2024-09-01 10:03:00"
failure_end = "2024-09-02 10:03:00"

failure_df = df.filter((col("datetime") >= failure_start) & (col("datetime") <= failure_end))
normal_df = df.filter(~((col("datetime") >= failure_start) & (col("datetime") <= failure_end)))

failure_count = failure_df.count()
half = failure_count // 2

failure_train = failure_df.limit(half)
failure_test = failure_df.subtract(failure_train)

normal_split = normal_df.randomSplit([0.8, 0.2], seed=42)
normal_train, normal_test = normal_split

train_df = failure_train.union(normal_train)
test_df = failure_test.union(normal_test)

# Drop datetime from train and test sets
train_df = train_df.drop("datetime")
test_df = test_df.drop("datetime")


In [32]:
#Create ML Pipeline

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_vec")
scaler = StandardScaler(inputCol="features_vec", outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="Target_Class", numTrees=100)

pipeline = Pipeline(stages=[assembler, scaler, rf])
model = pipeline.fit(train_df)

# Prediction
predictions = model.transform(test_df)


In [33]:
# Evaluating the Model

evaluator = MulticlassClassificationEvaluator(
    labelCol="Target_Class", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"✅ Accuracy: {accuracy:.4f}")

# Optional: Show confusion matrix (summary)
print("\n🔍 Confusion Matrix:")
predictions.groupBy("Target_Class", "prediction").count().orderBy("Target_Class", "prediction").show()


✅ Accuracy: 0.9977

🔍 Confusion Matrix:
+------------+----------+-----+
|Target_Class|prediction|count|
+------------+----------+-----+
|           0|       0.0| 5216|
|           1|       0.0|   12|
|           1|       1.0|   60|
+------------+----------+-----+

