In [84]:
import random
from datetime import datetime, timedelta
import pandas as pd
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel

In [85]:
# Suppress warnings generated by code
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments and does nothing."""
    pass


# These statements must run at cell level (not inside ``warn``), otherwise the
# no-op is never installed and the filter is never applied.
warnings.warn = warn
warnings.filterwarnings('ignore')

In [86]:
# Mimic real-life data for machine learning

def generate_sensor_data(machines, historical_rows, seed=None):
    """
    Generate synthetic generator sensor readings.

    One row is produced per machine per reading. Readings of a machine are
    spaced 5 minutes apart: the first one lies ``5 * historical_rows`` minutes
    before the moment of the call and the series moves forward from there.
    Roughly 5% of the rows are marked ``"failed"`` with a random failure
    reason, and the sensor tied to that reason is shifted accordingly.

    Errors are not swallowed: an invalid argument (e.g. a non-integer count)
    raises instead of printing a message and returning ``None``.

    :param machines: Number of machines; equipment ids are "001", "002", ...
    :param historical_rows: Number of readings to generate per machine.
    :param seed: Optional seed passed to ``random.seed`` so the random sensor
        values are reproducible. Timestamps still depend on the current time.
    :return: pandas DataFrame with ``machines * historical_rows`` rows.
    """
    if seed is not None:
        random.seed(seed)

    # Take "now" once so every machine shares the same 5-minute grid.
    series_start = datetime.now() - timedelta(minutes=5 * historical_rows)

    machine_data = []
    for index in range(1, machines + 1):
        equipment_id = f"{index:03}"
        for row in range(historical_rows):
            # Step forward 5 minutes per reading, starting from the past
            timestamp = series_start + timedelta(minutes=5 * row)
            last_maintenance_date = timestamp - timedelta(days=random.randint(15, 30))  # Random maintenance date in the past
            last_failure_date = timestamp - timedelta(days=random.randint(1, 14))  # Random failure date in the past
            status = "running"
            failure_reason = None  # Stays None unless a failure is simulated below

            # Randomly simulate failure
            if random.random() < 0.05:  # 5% chance of failure
                status = "failed"
                last_failure_date = timestamp
                last_maintenance_date = timestamp - timedelta(days=random.randint(15, 30))  # Random maintenance date in the past

                # Determine failure reason
                failure_reason = random.choice(["over_pressure", "high_temperature", "low_fuel"])

            # Generate sensor data features
            voltage = random.randint(360, 440)
            current = random.randint(100, 800)
            frequency = random.uniform(55, 59)
            power_output = random.randint(5000000, 8000000)
            fuel_consumption = random.randint(50, 250)
            engine_speed = random.randint(1500, 2500)
            oil_pressure = random.randint(30, 70)
            coolant_temperature = random.uniform(70, 90)
            exhaust_gas_temperature = random.uniform(250, 550)
            battery_voltage = random.randint(24, 36)
            generator_load = random.randint(30, 80)
            engine_hours = random.randint(100, 800)
            vibration = random.randint(10, 100)
            coolant_level = random.uniform(2, 8)
            fuel_level = random.uniform(10, 40)

            # If status is failed, shift the one feature tied to the failure reason
            if status == "failed":
                if failure_reason == "over_pressure":
                    oil_pressure += random.randint(30, 50)
                elif failure_reason == "high_temperature":
                    coolant_temperature += random.uniform(30, 40)
                elif failure_reason == "low_fuel":
                    fuel_level -= random.uniform(2, 5)

            # NOTE(review): despite the name, this is the number of whole days
            # elapsed since the last maintenance date.
            remaining_useful_life = ((timestamp - last_maintenance_date).total_seconds() / 3600) // 24

            machine_data.append({"equipment_id": equipment_id,
                                 "timestamp": timestamp,
                                 "status": status,
                                 "last_maintenance_date": last_maintenance_date,
                                 "last_failure_date": last_failure_date,
                                 "remaining_useful_life": remaining_useful_life,
                                 "failure_reason": failure_reason,
                                 "voltage": voltage,
                                 "current": current,
                                 "frequency": frequency,
                                 "power_output": power_output,
                                 "fuel_consumption": fuel_consumption,
                                 "engine_speed": engine_speed,
                                 "oil_pressure": oil_pressure,
                                 "coolant_temperature": coolant_temperature,
                                 "exhaust_gas_temperature": exhaust_gas_temperature,
                                 "battery_voltage": battery_voltage,
                                 "generator_load": generator_load,
                                 "engine_hours": engine_hours,
                                 "vibration": vibration,
                                 "coolant_level": coolant_level,
                                 "fuel_level": fuel_level})

    return pd.DataFrame(machine_data)


# Build the data set: 20 machines with 2000 readings each
generator_data = generate_sensor_data(machines=20, historical_rows=2000)

In [87]:
# Rows without a simulated failure have no failure reason; label them "NA"
generator_data = generator_data.fillna({'failure_reason': "NA"})

In [88]:
# Persist the generated data. The DataFrame index is written too (pandas
# default), as an unnamed first column that Spark later reads as `_c0`.
generator_data.to_csv("generator_sensor_data.csv", index=True)

In [90]:
# Start (or reuse) the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName('predictive maintenance for generators')
    .getOrCreate()
)

In [91]:
# Load the generated CSV into a Spark DataFrame: the first line supplies the
# column names and the column types are inferred from the data
df = spark.read.csv(
    "generator_sensor_data.csv",
    header=True,
    inferSchema=True,
)

In [92]:
# Print each column's name and the data type Spark inferred for it
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- equipment_id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- status: string (nullable = true)
 |-- last_maintenance_date: timestamp (nullable = true)
 |-- last_failure_date: timestamp (nullable = true)
 |-- remaining_useful_life: double (nullable = true)
 |-- failure_reason: string (nullable = true)
 |-- voltage: integer (nullable = true)
 |-- current: integer (nullable = true)
 |-- frequency: double (nullable = true)
 |-- power_output: integer (nullable = true)
 |-- fuel_consumption: integer (nullable = true)
 |-- engine_speed: integer (nullable = true)
 |-- oil_pressure: integer (nullable = true)
 |-- coolant_temperature: double (nullable = true)
 |-- exhaust_gas_temperature: double (nullable = true)
 |-- battery_voltage: integer (nullable = true)
 |-- generator_load: integer (nullable = true)
 |-- engine_hours: integer (nullable = true)
 |-- vibration: integer (nullable = true)
 |-- coolant_level: double (nullable = true)
 |-- fuel_level: double (nullable = true)

In [93]:
# Report the dimensions of the loaded data
rows, columns = df.count(), len(df.columns)
print(f"Number of rows: {rows} \nNumber of columns: {columns}")

Number of rows: 40000 
Number of columns: 23


In [61]:
# Drop any possible duplicates.
# `_c0` is the row index written by pandas; it is unique per row, so it must
# be removed first or no two rows could ever compare equal. Dropping a column
# that is not present is a no-op, so this is safe if the index is not written.
df = df.drop("_c0").dropDuplicates()

In [62]:
# Remove every row that contains a null in any column
df = df.dropna(how="any")

In [94]:
# Round the continuous sensor readings to 2 decimal places; the results go
# into new "<name>_rounded" columns alongside the originals
from pyspark.sql import functions as F

for column_name in ('frequency', 'coolant_temperature', 'exhaust_gas_temperature',
                    'coolant_level', 'fuel_level'):
    df = df.withColumn(f"{column_name}_rounded", F.round(df[column_name], 2))

In [95]:
# Remove the unrounded originals now that rounded copies exist
columns_to_drop = [
    'frequency',
    'coolant_temperature',
    'exhaust_gas_temperature',
    'coolant_level',
    'fuel_level',
]
df = df.drop(*columns_to_drop)

In [96]:
# Give the rounded columns their original names back
for base_name in ("frequency", "coolant_temperature", "exhaust_gas_temperature",
                  "coolant_level", "fuel_level"):
    df = df.withColumnRenamed(f"{base_name}_rounded", base_name)


In [97]:
# Keep only the model inputs plus the target column (`failure_reason`)
needed_columns = [
    'remaining_useful_life', 'voltage', 'current', 'frequency',
    'power_output', 'fuel_consumption', 'engine_speed', 'oil_pressure',
    'coolant_temperature', 'exhaust_gas_temperature', 'battery_voltage',
    'generator_load', 'engine_hours', 'vibration', 'coolant_level',
    'fuel_level',
    'failure_reason',
]
df = df.select(*needed_columns)

In [80]:
# Encode the string target `failure_reason` as a numeric `label` column
indexer = StringIndexer(
    inputCol="failure_reason",
    outputCol="label",
)

In [98]:
# Combine the numeric input columns into a single `features` vector column
numerical_columns = [
    'remaining_useful_life', 'voltage', 'current', 'frequency',
    'power_output', 'fuel_consumption', 'engine_speed', 'oil_pressure',
    'coolant_temperature', 'exhaust_gas_temperature', 'battery_voltage',
    'generator_load', 'engine_hours', 'vibration', 'coolant_level',
    'fuel_level',
]
assembler = VectorAssembler(
    inputCols=numerical_columns,
    outputCol='features',
)

In [99]:
# Standardise the assembled feature vector. The library defaults are spelled
# out: scale to unit standard deviation, do not centre on the mean.
scaler = StandardScaler(
    withMean=False,
    withStd=True,
    inputCol="features",
    outputCol="scaled_features",
)


In [100]:
# Logistic regression classifier trained on the scaled features to predict `label`
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
)

In [105]:
# Chain the stages in execution order: label indexing -> feature assembly ->
# scaling -> classifier
pipeline = Pipeline(
    stages=[
        indexer,
        assembler,
        scaler,
        lr,
    ]
)

In [102]:
# 70% of the rows for training, 30% for testing; the seed is fixed
training_data, testing_data = df.randomSplit(weights=[0.7, 0.3], seed=98)

In [107]:
# Fit all pipeline stages (indexer, assembler, scaler, logistic regression)
# on the training split
pipeline_model = pipeline.fit(training_data)

In [108]:
# Apply the fitted pipeline to the held-out testing split to obtain predictions
predictions = pipeline_model.transform(testing_data)

In [109]:
# Evaluate model performance on the test predictions with four metrics.
# NOTE(review): about 95% of generated rows carry the "NA" reason, so these
# scores are likely dominated by that class - confirm per-class results.
metric_names = ('accuracy', 'weightedPrecision', 'weightedRecall', 'f1')
metric_values = []
for metric in metric_names:
    evaluator = MulticlassClassificationEvaluator(
        labelCol='label',
        predictionCol='prediction',
        metricName=metric,
    )
    metric_values.append(evaluator.evaluate(predictions))

accuracy, precision, recall, f1_score = metric_values

print(f"accuracy: {accuracy}\nprecision: {precision}\nrecall: {recall}\nf1_score: {f1_score}")

accuracy: 0.9826086956521739
precision: 0.9672850613443253
recall: 0.9826086956521739
f1_score: 0.9748191185257198


In [None]:
# Save the fitted pipeline model.
# `write` is a method on ML models (it returns the writer), so it must be
# called. `overwrite()` lets the notebook be re-run when "pn_model" already exists.
pipeline_model.write().overwrite().save("pn_model")