In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, to_date, dayofweek, hour, lag, mean, lit, when,avg
from pyspark.sql.window import Window
import json
from pyspark.sql.functions import rand, isnan, when
from pyspark.sql.types import FloatType
from pyspark.sql.functions import rand, round
from keras.callbacks import TensorBoard
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense

from sklearn.preprocessing import MinMaxScaler
import numpy as np
import datetime
from tensorflow.keras.utils import to_categorical

# Initialize a SparkSession
spark = SparkSession.builder.appName("PredictiveMaintenance").getOrCreate()

# Load your dataset into a PySpark DataFrame
df = spark.read.csv('/Users/mac/Downloads/predictive.csv', header=True, inferSchema=True)

# Assuming 'date_insertion' is the correct timestamp column based on your dataset schema
# Adjust the following transformations accordingly:

# Extract JSON fields function
def extract_from_json(column, key):
    try:
        json_data = json.loads(column.replace("'", "\""))
        return json_data.get(key, None)
    except:
        return None

# Registering the UDF
extract_from_json_udf = udf(extract_from_json)

# Step 1: Extracting 'oil_value' and 'fuel_liters'
df = df.withColumn("oil_value", extract_from_json_udf(col("details"), lit("oil_value")))
df = df.withColumn("fuel_liters", extract_from_json_udf(col("details"), lit("fuel_liters")))

# Step 2: Creating time-based features
df = df.withColumn("date_insertion", to_date(col("date_insertion")))
df = df.withColumn("day_of_week", dayofweek(col("date_insertion")))
df = df.withColumn("hour_of_day", hour(col("date_insertion")))



# Step 4: Aggregate readings on a daily basis
daily_avg_df = df.groupBy("thing_id", "date_insertion").agg(mean("power_supply_voltage").alias("daily_avg_voltage"))
df = df.join(daily_avg_df, ["thing_id", "date_insertion"], "left")

# Step 5: Create binary indicator for 'engine_status'
df = df.withColumn("engine_alert", when(col("engine_status") == "Abnormal", 1).otherwise(0))

# Define a UDF to generate random values within a range
def random_value(min_value, max_value):
    return (rand() * (max_value - min_value) + min_value).cast(FloatType())

# random_value_udf = udf(random_value, FloatType())

# Set min and max values for 'oil_value' and 'fuel_liters'
oil_value_min, oil_value_max = 0, 4
fuel_liters_min, fuel_liters_max = 0, 60

# Replace null values with random numbers
# Replace null values with random numbers and round to 1 decimal place
df = df.withColumn("oil_value", when(df['oil_value'].isNull(), round((rand() * (oil_value_max - oil_value_min) + oil_value_min), 1)).otherwise(df['oil_value']))
df = df.withColumn("fuel_liters", when(df['fuel_liters'].isNull(), round((rand() * (fuel_liters_max - fuel_liters_min) + fuel_liters_min), 1)).otherwise(df['fuel_liters']))

# Step 6: Generate interaction features
# df = df.withColumn("voltage_current_interaction", col("power_supply_voltage") * col("battery_current"))

# Step 3: Calculating rate of change for 'battery_current'
windowSpec = Window.partitionBy("thing_id").orderBy("date_insertion")
df = df.withColumn("battery_current_change", col("power_supply_voltage") - lag("power_supply_voltage", 1).over(windowSpec))


df = df.select("thing_id", "date_insertion", "speed", "total_km", "engine_status", "power_supply_voltage" ,"oil_value", "fuel_liters",  "battery_current_change", "daily_avg_voltage")


from pyspark.sql import Window
from pyspark.sql.functions import lag, avg, stddev

# Define a window
window = Window.orderBy('date_insertion').rowsBetween(-9, 0)  # assuming 'date_insertion' is your time column

# Calculate rolling averages and standard deviations
# df = df.withColumn('speed_avg', avg(df['speed']).over(window))
# df = df.withColumn('oil_value_std', stddev(df['oil_value']).over(window))

# Calculate changes between consecutive readings
# df = df.withColumn('speed_change', df['speed'] - lag(df['speed']).over(Window.orderBy('date_insertion')))
df = df.withColumn('fuel_change', df['fuel_liters'] - lag(df['fuel_liters']).over(Window.orderBy('date_insertion')))

# Step 7: Calculate rolling average


# Define a Window specification
# windowSpec = Window.orderBy('date_insertion').rowsBetween(-4, 0)  # 5 rows including current row

# Calculate rolling average
# df = df.withColumn('oil_quality_rolling_avg', avg(df['oil_value']).over(windowSpec))

# Show the first 5 rows of the DataFrame
df.show()



24/04/30 12:38:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/30 12:38:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/30 12:38:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/30 12:38:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/30 12:38:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/30 12:38:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/04/30 1

+--------+--------------+-----+---------+-------------+--------------------+---------+-----------+----------------------+-----------------+-------------------+
|thing_id|date_insertion|speed| total_km|engine_status|power_supply_voltage|oil_value|fuel_liters|battery_current_change|daily_avg_voltage|        fuel_change|
+--------+--------------+-----+---------+-------------+--------------------+---------+-----------+----------------------+-----------------+-------------------+
|     629|    2024-02-28|    0|334365360|            1|               22.25|      3.3|       58.3|                  NULL|          21.3035| 1.7999999999999972|
|     629|    2024-02-28|    1|334365360|            1|               22.18|      4.0|       30.0|  -0.07000000000000028|          21.3035|-28.299999999999997|
|     629|    2024-02-28|    1|334365364|            1|                22.2|      0.6|       50.7|  0.019999999999999574|          21.3035| 20.700000000000003|
|     629|    2024-02-28|    0|334365367