In [81]:
from multiprocessing.reduction import duplicate
from babel.util import distinct
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, isnan, when, isnull, sum as spark_sum, countDistinct
from pyspark.sql.types import NumericType, TimestampType, DateType
from pyspark.sql.functions import col
from pyspark.sql.functions import col, min, max, count, lag, datediff
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [82]:
# Initialize Spark Session
spark = SparkSession.builder.appName("Predictive Maintenance System").getOrCreate()

#### **Motor maintenance**

In [83]:
from pyspark.shell import spark
from pyspark.sql import functions as F

# Load data from CSV
df = spark.read.csv("C:/Users/KhanhChang/PycharmProjects/Predictive-Maintenance-System-using-Apache-Spark/Maintenance Recommendation System/df_maintenance.csv", header=True, inferSchema=True)

# 1. Phương pháp 1: Tính ngưỡng dựa trên Phân phối Thống kê
stats_df = df.select(
    F.mean("vibration").alias("mean_vibration"),
    F.stddev("vibration").alias("std_vibration"),
    F.mean("noise_level").alias("mean_noise"),
    F.stddev("noise_level").alias("std_noise"),
    F.mean("temperature").alias("mean_temperature"),
    F.stddev("temperature").alias("std_temperature"),
    F.mean("current").alias("mean_current"),
    F.stddev("current").alias("std_current"),
    F.mean("power_output").alias("mean_power_output"),
    F.stddev("power_output").alias("std_power_output")
).collect()[0]

# Xác định ngưỡng cho mỗi chỉ số bằng cách sử dụng 3 lần độ lệch chuẩn
VIBRATION_THRESHOLD_STAT = stats_df['mean_vibration'] + 3 * stats_df['std_vibration']
NOISE_THRESHOLD_STAT = stats_df['mean_noise'] + 3 * stats_df['std_noise']
TEMPERATURE_THRESHOLD_STAT = stats_df['mean_temperature'] + 3 * stats_df['std_temperature']
CURRENT_THRESHOLD_STAT = stats_df['mean_current'] + 3 * stats_df['std_current']
POWER_OUTPUT_THRESHOLD_STAT = stats_df['mean_power_output'] - 3 * stats_df['std_power_output']

# 2. Phương pháp 2: Tính ngưỡng dựa trên Dữ liệu Bảo Trì Lịch Sử (Motor Failure)
failure_stats_df = df.filter(F.col("parts_replaced") == "Motor").select(
    F.mean("vibration").alias("motor_failure_vibration"),
    F.mean("noise_level").alias("motor_failure_noise"),
    F.mean("temperature").alias("motor_failure_temperature"),
    F.mean("current").alias("motor_failure_current"),
    F.mean("power_output").alias("motor_failure_power_output")
).collect()[0]

VIBRATION_THRESHOLD_HIST = failure_stats_df['motor_failure_vibration']
NOISE_THRESHOLD_HIST = failure_stats_df['motor_failure_noise']
TEMPERATURE_THRESHOLD_HIST = failure_stats_df['motor_failure_temperature']
CURRENT_THRESHOLD_HIST = failure_stats_df['motor_failure_current']
POWER_OUTPUT_THRESHOLD_HIST = failure_stats_df['motor_failure_power_output']

# 3. Chọn ngưỡng cuối cùng cho mỗi chỉ số bằng cách lấy giá trị nhỏ hơn từ hai phương pháp
VIBRATION_THRESHOLD = VIBRATION_THRESHOLD_HIST if VIBRATION_THRESHOLD_HIST < VIBRATION_THRESHOLD_STAT else VIBRATION_THRESHOLD_STAT
NOISE_THRESHOLD = NOISE_THRESHOLD_HIST if NOISE_THRESHOLD_HIST < NOISE_THRESHOLD_STAT else NOISE_THRESHOLD_STAT
TEMPERATURE_THRESHOLD = TEMPERATURE_THRESHOLD_HIST if TEMPERATURE_THRESHOLD_HIST < TEMPERATURE_THRESHOLD_STAT else TEMPERATURE_THRESHOLD_STAT
CURRENT_THRESHOLD = CURRENT_THRESHOLD_HIST if CURRENT_THRESHOLD_HIST < CURRENT_THRESHOLD_STAT else CURRENT_THRESHOLD_STAT
POWER_OUTPUT_THRESHOLD = POWER_OUTPUT_THRESHOLD_HIST if POWER_OUTPUT_THRESHOLD_HIST < POWER_OUTPUT_THRESHOLD_STAT else POWER_OUTPUT_THRESHOLD_STAT

# 4. Xác định Motor Failure cho các thiết bị có `maintenance_needed == "Maintenance required"`
df = df.withColumn(
    "maintenance_item",
    F.when(
        (F.col("maintenance_needed") == "Maintenance required") &
        #(F.col("maintenance_type").isin("Repair", "Replacement")) &
        (F.col("parts_replaced") == "Motor") &
        (
            (F.col("vibration") > VIBRATION_THRESHOLD) |
            (F.col("noise_level") > NOISE_THRESHOLD) |
            (F.col("temperature") > TEMPERATURE_THRESHOLD) |
            (F.col("current") > CURRENT_THRESHOLD) |
            (F.col("power_output") < POWER_OUTPUT_THRESHOLD)
        ),
        "Motor"  # Set 'Motor' for rows meeting the condition
    )
)

# Display rows to confirm the new 'maintenance_item' column
# Filter and display rows where maintenance_needed is "Maintenance required"
df.filter(F.col("maintenance_needed") == "Maintenance required") \
  .select('equipment_id', 'vibration', 'noise_level', 'temperature', 'current', 'power_output', 'parts_replaced', 'maintenance_type', 'maintenance_needed', 'maintenance_item') \
  .show()


+------------+----------+-----------+-----------+----------+------------+--------------+----------------+--------------------+----------------+
|equipment_id| vibration|noise_level|temperature|   current|power_output|parts_replaced|maintenance_type|  maintenance_needed|maintenance_item|
+------------+----------+-----------+-----------+----------+------------+--------------+----------------+--------------------+----------------+
|           7|0.51609147|    69.5796|  58.727806|  95.03543|    540.1849|          None|     Replacement|Maintenance required|            NULL|
|           7|0.54974616|   71.41562|   42.89155|101.410324|   520.31616|          None|     Replacement|Maintenance required|            NULL|
|           7|0.46776083|   75.18598|  67.534195|  93.23835|    488.6647|          None|     Replacement|Maintenance required|            NULL|
|           7| 0.5295095|   64.15321|  49.406094|  94.45574|    501.9079|          None|     Replacement|Maintenance required|          

#### **Bearing Maintenance**

In [84]:
from pyspark.shell import spark
from pyspark.sql import functions as F


# 1. Statistical Thresholds
stats_df = df.select(
    F.mean("vibration").alias("mean_vibration"),
    F.stddev("vibration").alias("std_vibration"),
    F.mean("noise_level").alias("mean_noise"),
    F.stddev("noise_level").alias("std_noise"),
    F.mean("temperature").alias("mean_temperature"),
    F.stddev("temperature").alias("std_temperature"),
    F.mean("rotational_speed").alias("mean_rotational_speed"),
    F.stddev("rotational_speed").alias("std_rotational_speed")
).collect()[0]

VIBRATION_THRESHOLD_STAT = stats_df['mean_vibration'] + 3 * stats_df['std_vibration']
NOISE_THRESHOLD_STAT = stats_df['mean_noise'] + 3 * stats_df['std_noise']
TEMPERATURE_THRESHOLD_STAT = stats_df['mean_temperature'] + 3 * stats_df['std_temperature']
ROTATIONAL_SPEED_THRESHOLD_STAT = stats_df['mean_rotational_speed'] - 3 * stats_df['std_rotational_speed']

# 2. Historical Thresholds
failure_stats_df = df.filter(F.col("parts_replaced") == "Bearings").select(
    F.mean("vibration").alias("bearing_failure_vibration"),
    F.mean("noise_level").alias("bearing_failure_noise"),
    F.mean("temperature").alias("bearing_failure_temperature"),
    F.mean("rotational_speed").alias("bearing_failure_rotational_speed")
).collect()[0]

VIBRATION_THRESHOLD_HIST = failure_stats_df['bearing_failure_vibration']
NOISE_THRESHOLD_HIST = failure_stats_df['bearing_failure_noise']
TEMPERATURE_THRESHOLD_HIST = failure_stats_df['bearing_failure_temperature']
ROTATIONAL_SPEED_THRESHOLD_HIST = failure_stats_df['bearing_failure_rotational_speed']

# 3. Final Thresholds (Choose the lower of the two)
VIBRATION_THRESHOLD = VIBRATION_THRESHOLD_HIST if VIBRATION_THRESHOLD_HIST < VIBRATION_THRESHOLD_STAT else VIBRATION_THRESHOLD_STAT
NOISE_THRESHOLD = NOISE_THRESHOLD_HIST if NOISE_THRESHOLD_HIST < NOISE_THRESHOLD_STAT else NOISE_THRESHOLD_STAT
TEMPERATURE_THRESHOLD = TEMPERATURE_THRESHOLD_HIST if TEMPERATURE_THRESHOLD_HIST < TEMPERATURE_THRESHOLD_STAT else TEMPERATURE_THRESHOLD_STAT
ROTATIONAL_SPEED_THRESHOLD = ROTATIONAL_SPEED_THRESHOLD_HIST if ROTATIONAL_SPEED_THRESHOLD_HIST < ROTATIONAL_SPEED_THRESHOLD_STAT else ROTATIONAL_SPEED_THRESHOLD_STAT

# 4. Add Maintenance Types for Bearing Failures
df = df.withColumn(
    "maintenance_item",
    F.when(
        (F.col("maintenance_item").isNull()) &  
        (F.col("maintenance_needed") == "Maintenance required") &
        #(F.col("maintenance_type").isin("Repair", "Replacement")) &
        (F.col("parts_replaced") == "Bearings") &
        (
            (F.col("vibration") > VIBRATION_THRESHOLD) |
            (F.col("noise_level") > NOISE_THRESHOLD) |
            (F.col("temperature") > TEMPERATURE_THRESHOLD) |
            (F.col("rotational_speed") < ROTATIONAL_SPEED_THRESHOLD)
        ),
        "Bearings"  # Set 'Bearing' for matching rows
    ).otherwise(F.col("maintenance_item"))  # Retain existing values
)

# Display rows to verify
df.filter(F.col("maintenance_item") == "Bearings").select(
    'equipment_id', 'maintenance_item', 'vibration', 'noise_level', 'temperature', 'rotational_speed',
    'parts_replaced', 'maintenance_type', 'maintenance_needed'
).show()


+------------+----------------+----------+-----------+-----------+----------------+--------------+----------------+--------------------+
|equipment_id|maintenance_item| vibration|noise_level|temperature|rotational_speed|parts_replaced|maintenance_type|  maintenance_needed|
+------------+----------------+----------+-----------+-----------+----------------+--------------+----------------+--------------------+
|           9|        Bearings| 0.6172446|    76.1919|   69.18309|       1188.6361|      Bearings|         Routine|Maintenance required|
|           9|        Bearings|0.52105445|    76.4475|   66.20828|       1122.9075|      Bearings|         Routine|Maintenance required|
|           9|        Bearings|0.56467915|   69.04776|  56.377506|       1144.0042|      Bearings|         Routine|Maintenance required|
|           9|        Bearings|0.48348257|   68.95828|   64.30161|       1135.6228|      Bearings|         Routine|Maintenance required|
|           9|        Bearings|0.45152876

#### **Seals Maintenance**

In [85]:
from pyspark.shell import spark
from pyspark.sql import functions as F

# 1. Statistical Thresholds
stats_df = df.select(
    F.mean("pressure").alias("mean_pressure"),
    F.stddev("pressure").alias("std_pressure"),
    F.mean("oil_viscosity").alias("mean_oil_viscosity"),
    F.stddev("oil_viscosity").alias("std_oil_viscosity"),
    F.mean("temperature").alias("mean_temperature"),
    F.stddev("temperature").alias("std_temperature")
).collect()[0]

PRESSURE_THRESHOLD_STAT = stats_df['mean_pressure'] - 3 * stats_df['std_pressure']
OIL_VISCOSITY_THRESHOLD_STAT = stats_df['mean_oil_viscosity'] - 3 * stats_df['std_oil_viscosity']
TEMPERATURE_THRESHOLD_STAT = stats_df['mean_temperature'] + 3 * stats_df['std_temperature']

# 2. Historical Thresholds
failure_stats_df = df.filter(F.col("parts_replaced") == "Seals").select(
    F.mean("pressure").alias("seal_failure_pressure"),
    F.mean("oil_viscosity").alias("seal_failure_oil_viscosity"),
    F.mean("temperature").alias("seal_failure_temperature")
).collect()[0]

PRESSURE_THRESHOLD_HIST = failure_stats_df['seal_failure_pressure']
OIL_VISCOSITY_THRESHOLD_HIST = failure_stats_df['seal_failure_oil_viscosity']
TEMPERATURE_THRESHOLD_HIST = failure_stats_df['seal_failure_temperature']

# 3. Final Thresholds
PRESSURE_THRESHOLD = PRESSURE_THRESHOLD_HIST if PRESSURE_THRESHOLD_HIST < PRESSURE_THRESHOLD_STAT else PRESSURE_THRESHOLD_STAT
OIL_VISCOSITY_THRESHOLD = OIL_VISCOSITY_THRESHOLD_HIST if OIL_VISCOSITY_THRESHOLD_HIST < OIL_VISCOSITY_THRESHOLD_STAT else OIL_VISCOSITY_THRESHOLD_STAT
TEMPERATURE_THRESHOLD = TEMPERATURE_THRESHOLD_HIST if TEMPERATURE_THRESHOLD_HIST < TEMPERATURE_THRESHOLD_STAT else TEMPERATURE_THRESHOLD_STAT

# 4. Add Maintenance Types for Seal Failures
df = df.withColumn(
    "maintenance_item",
    F.when(
        (F.col("maintenance_item").isNull()) & 
        (F.col("maintenance_needed") == "Maintenance required") &
        #(F.col("maintenance_type").isin("Repair", "Replacement")) &
        (F.col("parts_replaced") == "Seals") &
        (
            (F.col("pressure") < PRESSURE_THRESHOLD) |
            (F.col("oil_viscosity") < OIL_VISCOSITY_THRESHOLD) |
            (F.col("temperature") > TEMPERATURE_THRESHOLD)
        ),
        "Seals"  # Set 'Seal' for matching rows
    ).otherwise(F.col("maintenance_item"))  # Retain existing values
)

# Display rows to verify
df.filter(F.col("maintenance_item") == "Seals").select(
    'equipment_id', 'maintenance_item', 'pressure', 'oil_viscosity', 'temperature',
    'parts_replaced', 'maintenance_type', 'maintenance_needed'
).show()

+------------+----------------+----------+-------------+-----------+--------------+----------------+--------------------+
|equipment_id|maintenance_item|  pressure|oil_viscosity|temperature|parts_replaced|maintenance_type|  maintenance_needed|
+------------+----------------+----------+-------------+-----------+--------------+----------------+--------------------+
|          14|           Seals|  99.62995|    50.189896|  75.265144|         Seals|          Repair|Maintenance required|
|          14|           Seals|  96.99443|    56.372288|   83.00572|         Seals|          Repair|Maintenance required|
|          14|           Seals|  71.99615|    51.641594|    74.3573|         Seals|          Repair|Maintenance required|
|          14|           Seals| 126.75134|    52.163834|   75.87618|         Seals|          Repair|Maintenance required|
|          14|           Seals| 101.08404|     52.36618|    74.0869|         Seals|          Repair|Maintenance required|
|          14|          

#### **Filters Maintenance**

In [86]:
from pyspark.shell import spark
from pyspark.sql import functions as F

# 1. Statistical Thresholds
stats_df = df.select(
    F.mean("pressure").alias("mean_pressure"),
    F.stddev("pressure").alias("std_pressure"),
    F.mean("oil_viscosity").alias("mean_oil_viscosity"),
    F.stddev("oil_viscosity").alias("std_oil_viscosity"),
    F.mean("temperature").alias("mean_temperature"),
    F.stddev("temperature").alias("std_temperature")
).collect()[0]

PRESSURE_THRESHOLD_STAT = stats_df['mean_pressure'] - 3 * stats_df['std_pressure']
OIL_VISCOSITY_THRESHOLD_STAT = stats_df['mean_oil_viscosity'] - 3 * stats_df['std_oil_viscosity']
TEMPERATURE_THRESHOLD_STAT = stats_df['mean_temperature'] + 3 * stats_df['std_temperature']

# 2. Historical Thresholds
failure_stats_df = df.filter(F.col("parts_replaced") == "Seals").select(
    F.mean("pressure").alias("seal_failure_pressure"),
    F.mean("oil_viscosity").alias("seal_failure_oil_viscosity"),
    F.mean("temperature").alias("seal_failure_temperature")
).collect()[0]

PRESSURE_THRESHOLD_HIST = failure_stats_df['seal_failure_pressure']
OIL_VISCOSITY_THRESHOLD_HIST = failure_stats_df['seal_failure_oil_viscosity']
TEMPERATURE_THRESHOLD_HIST = failure_stats_df['seal_failure_temperature']

# 3. Final Thresholds
PRESSURE_THRESHOLD = PRESSURE_THRESHOLD_HIST if PRESSURE_THRESHOLD_HIST < PRESSURE_THRESHOLD_STAT else PRESSURE_THRESHOLD_STAT
OIL_VISCOSITY_THRESHOLD = OIL_VISCOSITY_THRESHOLD_HIST if OIL_VISCOSITY_THRESHOLD_HIST < OIL_VISCOSITY_THRESHOLD_STAT else OIL_VISCOSITY_THRESHOLD_STAT
TEMPERATURE_THRESHOLD = TEMPERATURE_THRESHOLD_HIST if TEMPERATURE_THRESHOLD_HIST < TEMPERATURE_THRESHOLD_STAT else TEMPERATURE_THRESHOLD_STAT

# 4. Add Maintenance Types for Seal Failures
df = df.withColumn(
    "maintenance_item",
    F.when(
        (F.col("maintenance_item").isNull()) & 
        (F.col("maintenance_needed") == "Maintenance required") &
        #(F.col("maintenance_type").isin("Repair", "Replacement")) &
        (F.col("parts_replaced") == "Filters") &
        (
            (F.col("pressure") < PRESSURE_THRESHOLD) |
            (F.col("oil_viscosity") < OIL_VISCOSITY_THRESHOLD) |
            (F.col("temperature") > TEMPERATURE_THRESHOLD)
        ),
        "Filters"  # Set 'Filters' for matching rows
    ).otherwise(F.col("maintenance_item"))  # Retain existing values
)

# Display rows to verify
df.filter(F.col("maintenance_item") == "Filters").select(
    'equipment_id', 'maintenance_item', 'pressure', 'oil_viscosity', 'temperature',
    'parts_replaced', 'maintenance_type', 'maintenance_needed'
).show()

+------------+----------------+----------+-------------+-----------+--------------+----------------+--------------------+
|equipment_id|maintenance_item|  pressure|oil_viscosity|temperature|parts_replaced|maintenance_type|  maintenance_needed|
+------------+----------------+----------+-------------+-----------+--------------+----------------+--------------------+
|          51|         Filters| 125.24291|     56.16894|  78.490814|       Filters|          Repair|Maintenance required|
|          51|         Filters| 126.68527|     45.31755|   72.34348|       Filters|          Repair|Maintenance required|
|          51|         Filters| 139.52592|     57.66559|   79.29988|       Filters|          Repair|Maintenance required|
|          51|         Filters| 100.21501|     48.95903|  67.453514|       Filters|          Repair|Maintenance required|
|          51|         Filters|101.326904|     43.48537|   69.47191|       Filters|          Repair|Maintenance required|
|          51|         F

#### **Coupling**

In [87]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Assuming df is already loaded and contains the 'maintenance_item' column

# Update only the rows where maintenance_item is null and maintenance_needed == "Maintenance required"
df = df.withColumn(
    "maintenance_item",
    F.when(
        (F.col("maintenance_item").isNull()) & 
        (F.col("maintenance_needed") == "Maintenance required"),
        "Coupling"
    ).otherwise(F.col("maintenance_item"))  # Retain existing values
)

#### **Final Result Summary**

In [88]:
# Calculate the total count of rows
total_count = df.filter(F.col("maintenance_needed") == "Maintenance required").count()

# Calculate the count and percentage for each maintenance type
maintenance_type_percentage = (
    df.filter(F.col("maintenance_needed") == "Maintenance required")
    .groupBy("maintenance_item")
    .agg(F.count("*").alias("count"))
    .withColumn("percentage", (F.col("count") / total_count) * 100)
)

# Show the results
maintenance_type_percentage.show()

+----------------+------+------------------+
|maintenance_item| count|        percentage|
+----------------+------+------------------+
|           Motor| 56384| 7.816585290113649|
|           Seals| 44842| 6.216503220404304|
|        Bearings| 72148| 10.00196856397417|
|        Coupling|512324| 71.02412461287219|
|         Filters| 35640|4.9408183126356855|
+----------------+------+------------------+



In [89]:
# Filter the dataset for rows where maintenance_needed == "Maintenance required"
filtered_df = df.filter(F.col("maintenance_needed") == "Maintenance required")

# Save the filtered dataset to a new CSV file
output_path = "C:/Users/KhanhChang/PycharmProjects/Predictive-Maintenance-System-using-Apache-Spark/Maintenance Recommendation System/dataset_with_maintenance_item.csv"

# Save the filtered dataset locally with overwrite mode
filtered_df.coalesce(1).write.option("header", True).csv(output_path, mode="overwrite")

print(f"Filtered dataset saved successfully to the current working directory as {output_path}")


Filtered dataset saved successfully to the current working directory as C:/Users/KhanhChang/PycharmProjects/Predictive-Maintenance-System-using-Apache-Spark/Maintenance Recommendation System/dataset_with_maintenance_item.csv


In [90]:
# Count rows where maintenance_item is null in the filtered dataset
null_count = filtered_df.filter(F.col("maintenance_item").isNull()).count()

print(f"Number of null values in 'maintenance_item' column: {null_count}")


Number of null values in 'maintenance_item' column: 0
