## Load the Full Dataset

In [None]:
# Load the raw IoT sensor CSV straight from S3.
# `header=True` takes column names from the first row; `inferSchema=True`
# lets Spark derive column types from the data instead of reading
# every column as a string.
raw_iot_csv_path = "s3://hqpsusu-ml-data-bucket/raw/iot/metropt.csv"
iot_df = spark.read.csv(raw_iot_csv_path, header=True, inferSchema=True)

# Eyeball the first few rows and the inferred column types.
iot_df.show(5)
iot_df.printSchema()


## Data Cleaning – Timestamps and GPS Coordinates

In [None]:
from pyspark.sql.functions import col, to_timestamp, round as spark_round

# Data cleaning, in three passes over `iot_df`:
#   1. Re-type `timestamp` as a Spark timestamp so it can be sorted,
#      grouped and windowed as time rather than as text.
#   2. Discard rows where either GPS coordinate is missing.
#   3. Add 2-decimal rounded copies of the coordinates
#      (`lat_rounded`, `lon_rounded`) to use as coarse location keys.
gps_rounding = {"gpsLat": "lat_rounded", "gpsLong": "lon_rounded"}

iot_clean = iot_df.withColumn("timestamp", to_timestamp("timestamp"))
iot_clean = iot_clean.dropna(subset=list(gps_rounding))
for raw_name, rounded_name in gps_rounding.items():
    iot_clean = iot_clean.withColumn(rounded_name, spark_round(col(raw_name), 2))

# Confirm the new column types and the added rounded coordinates.
iot_clean.show(5)
iot_clean.printSchema()


## Feature Engineering – Create Hourly Time Buckets



In [None]:
from pyspark.sql.functions import window, hour, date_trunc

# Feature engineering: attach an hourly bucket to every cleaned reading.
# `date_trunc("hour", ...)` floors each timestamp to the start of its hour,
# giving a key for hour-level aggregation. The rounded coordinates from the
# cleaning step are carried along unchanged so later steps can group by
# both time and location.
hour_start = date_trunc("hour", col("timestamp"))
iot_features = iot_clean.withColumn("timestamp_hour", hour_start)

# Spot-check that each timestamp maps to the expected hour bucket.
validation_columns = ["timestamp", "timestamp_hour", "lat_rounded", "lon_rounded"]
iot_features.select(*validation_columns).show(5)


## Feature Engineering – Hourly Aggregations by Location

In [None]:
from pyspark.sql.functions import avg, stddev, min, max

# Feature engineering: collapse the readings to one row per
# (hour bucket, rounded latitude, rounded longitude).
#
# Output columns besides the grouping keys:
#   - TP2: mean and standard deviation
#   - TP3, H1, DV_pressure, Oil_temperature: mean
#   - Motor_current: mean, max and min
#
# NOTE: `min` and `max` below are the Spark column aggregates brought in by
# this cell's import, not the Python builtins of the same name.
hourly_location_keys = ["timestamp_hour", "lat_rounded", "lon_rounded"]

hourly_aggregates = [
    avg("TP2").alias("TP2_avg"),
    stddev("TP2").alias("TP2_std"),
    avg("TP3").alias("TP3_avg"),
    avg("H1").alias("H1_avg"),
    avg("DV_pressure").alias("DV_pressure_avg"),
    avg("Oil_temperature").alias("Oil_temp_avg"),
    avg("Motor_current").alias("Motor_current_avg"),
    max("Motor_current").alias("Motor_current_max"),
    min("Motor_current").alias("Motor_current_min"),
]

iot_hourly = iot_features.groupBy(*hourly_location_keys).agg(*hourly_aggregates)

# Sanity check on the aggregated rows.
iot_hourly.show(5)


## Persist Processed Features to Storage


In [None]:
# S3 destination for the processed hourly feature table.
output_path = "s3://hqpsusu-ml-data-bucket/processed/iot_hourly_features"

# Write the hourly features as Parquet, replacing whatever already exists at
# the destination. Parquet is columnar and compressed, which suits the
# analytics / ML reads that follow.
iot_hourly.write.parquet(output_path, mode="overwrite")

print(f"✅ Features saved to {output_path}")


## Load and Validate Processed Features


In [None]:
# Read the persisted hourly features back from S3 to verify the write.
processed_features_path = "s3://hqpsusu-ml-data-bucket/processed/iot_hourly_features"
features = spark.read.parquet(processed_features_path)

# Column names/types, then a handful of rows.
features.printSchema()
features.show(5)

# Summary statistics for a few key averages, to spot implausible ranges.
summary_columns = ["TP2_avg", "TP3_avg", "Oil_temp_avg", "Motor_current_avg"]
features.describe(summary_columns).show()

# Hand the motor-current series to the notebook's `display` helper
# (a Databricks built-in) to chart it over time.
motor_current_over_time = features.select("timestamp_hour", "Motor_current_avg")
display(motor_current_over_time)


## Inspect Distribution of Categorical Signals


In [None]:
# Show the value distribution of each categorical / binary sensor signal
# (switches, flags, on/off states such as COMP, LPS, MPG).
# The per-value counts reveal class imbalance, rare categories, and signals
# that never change (e.g. always 0) and therefore carry no information.
#
# Two deliberate choices here:
#   - The counts run on `iot_df`, the raw frame loaded at the top of the
#     notebook; `iot_raw` is only created in a later cell, so using it here
#     fails when the notebook is run top to bottom.
#   - The loop variable is not called `col`, because that would overwrite the
#     imported `pyspark.sql.functions.col` and break any cell that calls
#     `col(...)` afterwards.
binary_signal_columns = [
    "COMP", "LPS", "MPG", "Pressure_switch", "DV_eletric", "Towers", "Oil_level"
]

for signal_name in binary_signal_columns:
    iot_df.groupBy(signal_name).count().orderBy(signal_name).show()


## Create Failure Labels

In [None]:
from pyspark.sql import functions as F

# Add a binary `failure` column to the raw readings.
# Rule: a row is a failure (1) when COMP, MPG and LPS are all 0 at the same
# time; every other row is 0.
#
# The label is built on `iot_df` (the raw frame loaded at the top of the
# notebook). `iot_raw` is not defined until a later cell, so referencing it
# here raises NameError on a top-to-bottom run.
all_signals_off = (
    (F.col("COMP") == 0) &
    (F.col("MPG") == 0) &
    (F.col("LPS") == 0)
)
iot_labeled = iot_df.withColumn("failure", F.when(all_signals_off, 1).otherwise(0))

# Preview some rows with the new failure label
iot_labeled.select("timestamp", "COMP", "MPG", "LPS", "failure").show(10)

# Count distribution of failure vs. non-failure records
iot_labeled.groupBy("failure").count().show()


## Join Features with Failure Labels

In [None]:
from pyspark.sql import functions as F

# 1. Flag failures on the raw readings and attach the join keys.
#    Rule: failure = 1 when COMP, MPG and LPS are all 0 simultaneously.
#    Uses `iot_df` (raw frame loaded at the top of the notebook); `iot_raw`
#    is only defined in a later cell and would raise NameError here.
#    The keys are derived exactly as for the hourly features: timestamp
#    truncated to the hour, coordinates rounded to 2 decimals.
iot_failures = (
    iot_df
    .withColumn(
        "failure",
        F.when(
            (F.col("COMP") == 0) &
            (F.col("MPG") == 0) &
            (F.col("LPS") == 0),
            1
        ).otherwise(0)
    )
    .withColumn("timestamp_hour", F.date_trunc("hour", F.col("timestamp")))
    .withColumn("lat_rounded", F.round(F.col("gpsLat"), 2))
    .withColumn("lon_rounded", F.round(F.col("gpsLong"), 2))
)

# 2. Collapse to one label per (hour, location): 1 if any row in the group
#    was flagged as a failure.
join_keys = ["timestamp_hour", "lat_rounded", "lon_rounded"]
failures_hourly = (
    iot_failures
    .groupBy(*join_keys)
    .agg(F.max("failure").alias("failure"))
)

# 3. Load previously engineered hourly features
features = spark.read.parquet("s3://hqpsusu-ml-data-bucket/processed/iot_hourly_features")

# 4. Left-join the labels onto the features so every feature row is kept;
#    feature rows without a matching label are treated as "no failure".
features_with_failures = (
    features
    .join(failures_hourly, on=join_keys, how="left")
    .fillna({"failure": 0})
)

# 5. Quick validation: class balance + sample rows
features_with_failures.groupBy("failure").count().show()
features_with_failures.show(5)


## Alternative Failure Labeling (Row-Level → Hourly)


In [None]:
from pyspark.sql import functions as F

# 1. Reload the engineered hourly features.
features = spark.read.parquet("s3://hqpsusu-ml-data-bucket/processed/iot_hourly_features")

# 2. Reload the raw IoT readings, which are needed to rebuild the labels.
iot_raw = spark.read.csv(
    "s3://hqpsusu-ml-data-bucket/raw/iot/metropt.csv",
    header=True,
    inferSchema=True,
)

# 3. Label each raw row, then roll the labels up to the hour.
#    Row rule: failure = 1 when COMP, MPG and LPS are all 0 simultaneously.
#    Hour rule: failure = 1 if ANY row in that hour is a failure.
#    Unlike the location-aware join, this variant keys on the hour only.
all_three_off = (F.col("COMP") == 0) & (F.col("MPG") == 0) & (F.col("LPS") == 0)
row_labels = iot_raw.withColumn("failure", F.when(all_three_off, 1).otherwise(0))
row_labels = row_labels.withColumn(
    "timestamp_hour", F.date_trunc("hour", F.col("timestamp"))
)
iot_fail = row_labels.groupBy("timestamp_hour").agg(F.max("failure").alias("failure"))

# 4. Attach the hourly label to every feature row for that hour.
features_with_failure = features.join(iot_fail, on="timestamp_hour", how="left")

# 5. Preview a few key features next to the label.
preview_columns = [
    "timestamp_hour",
    "Motor_current_avg",
    "Oil_temp_avg",
    "DV_pressure_avg",
    "failure",
]
features_with_failure.select(*preview_columns).show(5)


## Optional Failure Labeling – Compute Remaining Useful Life (RUL) 

In [None]:
from pyspark.sql import functions as F, Window

# Remaining Useful Life (RUL) in hours, per feature row.
#
# The computation starts from `features_with_failure` (features joined with
# the hourly failure label). The plain `features` frame read back from
# Parquet has no `failure` column, so building on it fails at analysis time.

# Window: from the current row to the end of the data, ordered by time.
# NOTE: there is no partitionBy, so Spark evaluates this window on a single
# partition; acceptable for hourly data, costly for much larger inputs.
w = Window.orderBy("timestamp_hour") \
           .rowsBetween(Window.currentRow, Window.unboundedFollowing)

# Step 1. Next failure time = earliest failure hour at or after this row.
# Non-failure rows contribute NULL, which `min` ignores.
failure_hour = F.when(F.col("failure") == 1, F.col("timestamp_hour"))
features = features_with_failure.withColumn(
    "next_failure_time",
    F.min(failure_hour).over(w)
)

# Step 2. RUL in hours:
#   - 0.0 on failure rows,
#   - hours until the next failure otherwise,
#   - NULL when no later failure exists (no `otherwise` branch).
hours_to_next_failure = (
    F.unix_timestamp("next_failure_time") - F.unix_timestamp("timestamp_hour")
) / 3600
features = features.withColumn(
    "RUL_hours",
    F.when(F.col("failure") == 1, F.lit(0.0))
     .when(F.col("next_failure_time").isNotNull(), hours_to_next_failure)
)

# Inspect
features.select("timestamp_hour", "failure", "next_failure_time", "RUL_hours").show(30, truncate=False)


##  Optional Final Feature Engineering – Binning, Rolling Stats, and RUL Labels

from pyspark.sql import functions as F, Window

# -----------------------------------
# 1) Load raw IoT data and define failure
# -----------------------------------
raw_df = (
    spark.read.csv(
        "s3://hqpsusu-ml-data-bucket/raw/iot/metropt.csv",
        header=True,
        inferSchema=True
    )
)

# Raw failure condition: COMP, MPG and LPS are all 0 on the same row.
raw_df = raw_df.withColumn(
    "failure_raw",
    F.when((F.col("COMP") == 0) & (F.col("MPG") == 0) & (F.col("LPS") == 0), 1).otherwise(0)
)

# -----------------------------------
# 2) Bin into 2-minute intervals
#    - This reduces granularity and smooths sensor signals
# -----------------------------------
bin_seconds = 120

# Parse the timestamp explicitly before converting to epoch seconds. If schema
# inference left `timestamp` as a string, casting it straight to long would
# give NULL for every row and collapse the whole dataset into a single bin.
epoch_seconds = F.to_timestamp(F.col("timestamp")).cast("long")

# Floor to the start of the 2-minute bin with integer arithmetic, then cast
# the epoch seconds directly back to a timestamp. Going through a formatted
# local-time string instead is ambiguous during a DST fall-back hour and can
# merge two distinct bins.
bin_start_epoch = epoch_seconds - (epoch_seconds % bin_seconds)
raw_df = raw_df.withColumn("timestamp_bin", bin_start_epoch.cast("timestamp"))

# -----------------------------------
# 3) Aggregate engineered features per 2-minute bin
#    - Numeric signals      -> <name>_avg, <name>_std, <name>_min, <name>_max
#    - Binary/state signals -> <name>_flag (max of the int-cast value)
#    - failure_raw          -> failure (1 if any row in the bin failed)
# -----------------------------------
numeric_cols = [
    "Motor_current", "Oil_temperature", "DV_pressure",
    "TP2", "TP3", "H1", "Reservoirs", "Flowmeter",
    "Caudal_impulses", "gpsSpeed"
]

binary_cols = [
    "COMP", "MPG", "LPS", "Pressure_switch",
    "DV_eletric", "Towers", "Oil_level", "gpsQuality"
]

# Four summary statistics for every numeric signal, in column order.
numeric_stats = [
    stat
    for name in numeric_cols
    for stat in (
        F.avg(name).alias(f"{name}_avg"),
        F.stddev(name).alias(f"{name}_std"),
        F.min(name).alias(f"{name}_min"),
        F.max(name).alias(f"{name}_max"),
    )
]

# One presence flag per binary/state signal.
state_flags = [F.max(F.col(name).cast("int")).alias(f"{name}_flag") for name in binary_cols]

# The bin-level failure label goes last.
agg_exprs = numeric_stats + state_flags + [F.max("failure_raw").alias("failure")]

per_bin = raw_df.groupBy("timestamp_bin").agg(*agg_exprs)
features = per_bin.orderBy("timestamp_bin")

# -----------------------------------
# 4) Rolling statistics, lags, deltas
#    - Rolling window = the 15 rows before the current one (current row
#      excluded), i.e. 30 minutes when every 2-minute bin is present
#    - lag1  = value in the previous row
#    - delta = current value minus lag1
# -----------------------------------
w_order = Window.orderBy("timestamp_bin")
w_roll_30m = w_order.rowsBetween(-15, -1)

key_bases = [
    "Motor_current_avg",
    "Oil_temperature_avg",
    "DV_pressure_avg"
]

# Collect every derived column first, then append them all in one projection.
# The order per base signal is: rolling mean, rolling std, lag, delta.
derived_columns = []
for base in key_bases:
    current_value = F.col(base)
    previous_value = F.lag(current_value, 1).over(w_order)
    derived_columns.extend([
        F.avg(current_value).over(w_roll_30m).alias(f"{base}_roll_mean_30m"),
        F.stddev(current_value).over(w_roll_30m).alias(f"{base}_roll_std_30m"),
        previous_value.alias(f"{base}_lag1"),
        (current_value - previous_value).alias(f"{base}_delta"),
    ])

features = features.select("*", *derived_columns)

# -----------------------------------
# 5) Remaining Useful Life (RUL) calculation
#    - next_failure_time: first failure bin at or after the current row
#    - last_failure_time: latest failure bin at or before the current row
#    - RUL_minutes: 0 on failure bins, else minutes until next_failure_time
#    - minutes_since_last_failure: minutes elapsed since last_failure_time
#    Both time differences are NULL when the matching failure does not exist.
# -----------------------------------
w_fwd = Window.orderBy("timestamp_bin").rowsBetween(0, Window.unboundedFollowing)
w_back = Window.orderBy("timestamp_bin").rowsBetween(Window.unboundedPreceding, 0)

# Bin timestamp on failure bins, NULL elsewhere; first/last skip the NULLs.
is_failure_bin = F.col("failure") == 1
failure_bin_time = F.when(is_failure_bin, F.col("timestamp_bin"))

features = (
    features
    .withColumn("next_failure_time", F.first(failure_bin_time, ignorenulls=True).over(w_fwd))
    .withColumn("last_failure_time", F.last(failure_bin_time, ignorenulls=True).over(w_back))
)

bin_epoch = F.unix_timestamp("timestamp_bin")
minutes_to_next = (F.unix_timestamp("next_failure_time") - bin_epoch) / 60.0
minutes_from_last = (bin_epoch - F.unix_timestamp("last_failure_time")) / 60.0

features = (
    features
    .withColumn("RUL_minutes", F.when(is_failure_bin, F.lit(0.0)).otherwise(minutes_to_next))
    .withColumn("minutes_since_last_failure", minutes_from_last)
)

# The rolling, lag and delta columns are NULL for the earliest rows, which
# have no history to draw on; replace those NULLs with 0.
fill_zero_cols = [
    f"{b}_{suffix}"
    for suffix in ("roll_mean_30m", "roll_std_30m", "lag1", "delta")
    for b in key_bases
]
features = features.fillna(0, subset=fill_zero_cols)

# -----------------------------------
# 6) Final dataset: the fully engineered feature table
# -----------------------------------
df_final = features

# -----------------------------------
# 7) Preview schema, sample rows and dimensions
# -----------------------------------
print("Schema:")
df_final.printSchema()

print("\nSample rows:")
df_final.show(10, truncate=False)

# `count()` runs a Spark job; the column count comes from the schema alone.
total_rows = df_final.count()
print("\nRow count:", total_rows)

total_columns = len(df_final.columns)
print("Column count:", total_columns)
