# Predictive Maintenance Analysis - Time-Series Feature Engineering

This notebook demonstrates my implementation of rolling time-series feature computations on a large-scale dataset. It computes five rolling statistics (mean, difference, standard deviation, maximum, minimum) for 46 features over 5 time windows, creating 1,150 new features (46 × 5 × 5) for predictive modeling.

**Technical Achievement:** Overcame Spark `StackOverflowError` failures caused by very deep query plans, and tuned performance for this compute-intensive workload.

## Workflow Overview

- [Feature & Window Configuration](#Rolling-Feature-Configuration)
- [Performance Optimization Strategies](#Technical-Challenges-&-Solutions)
- [Rolling Statistics Computation](#Rolling-Statistics-Computation)
  - [Rolling Mean](#Rolling-Mean)
  - [Rolling Difference](#Rolling-Difference)
  - [Rolling Standard Deviation](#Rolling-Standard-Deviation)
  - [Rolling Maximum](#Rolling-Maximum)
  - [Rolling Minimum](#Rolling-Minimum)
- [Data Integration](#Data-Integration-&-Optimization)

In [1]:
import time

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# `sqlContext` is assumed to be predefined by the notebook's PySpark kernel.
# If it is missing, fall back to a SparkSession, which exposes the same
# `.read.parquet(...)` API used in the cells below.
try:
    sqlContext
except NameError:
    sqlContext = SparkSession.builder.getOrCreate()

start_time = time.time()


## Rolling Feature Configuration

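Selected 46 key features for time-series analysis, including warning aggregates, PCA components, problem types, and fault codes, and computed rolling statistics over 5 windows (3, 7, 14, 30, and 90 days). Each window covers the current row plus the preceding `n - 1` rows of the same `deviceid`, ordered by `date`, so an `n`-row window spans `n` days only when every device has exactly one row per day.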

In [2]:
rolling_features = [
    'warn_type1_total', 'warn_type2_total', 
    'pca_1_warn','pca_2_warn', 'pca_3_warn', 'pca_4_warn', 'pca_5_warn',
    'pca_6_warn','pca_7_warn', 'pca_8_warn', 'pca_9_warn', 'pca_10_warn',
    'pca_11_warn','pca_12_warn', 'pca_13_warn', 'pca_14_warn', 'pca_15_warn',
    'pca_16_warn','pca_17_warn', 'pca_18_warn', 'pca_19_warn', 'pca_20_warn',
    'problem_type_1', 'problem_type_2', 'problem_type_3','problem_type_4',
    'problem_type_1_per_usage1','problem_type_2_per_usage1',
    'problem_type_3_per_usage1','problem_type_4_per_usage1',
    'problem_type_1_per_usage2','problem_type_2_per_usage2',
    'problem_type_3_per_usage2','problem_type_4_per_usage2',                
    'fault_code_type_1_count', 'fault_code_type_2_count', 'fault_code_type_3_count', 'fault_code_type_4_count',                          
    'fault_code_type_1_count_per_usage1','fault_code_type_2_count_per_usage1',
    'fault_code_type_3_count_per_usage1', 'fault_code_type_4_count_per_usage1',
    'fault_code_type_1_count_per_usage2','fault_code_type_2_count_per_usage2',
    'fault_code_type_3_count_per_usage2', 'fault_code_type_4_count_per_usage2']
               
# lag window 3, 7, 14, 30, 90 days
lags = [3, 7, 14, 30, 90]

print(len(rolling_features))


46
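As a quick check on the 46 × 5 × 5 arithmetic, the pure-Python sketch below rebuilds the feature list from its naming pattern and enumerates the output column names using the `<feature>_rolling<stat>_<n>` convention of the cells that follow. Generating the list from a pattern is an illustrative shortcut (the explicit `rolling_features` list above remains the source of truth), and `rolling_column_names` is a hypothetical helper.

```python
from typing import List

# Rebuild the 46 feature names from their regular pattern (illustrative shortcut).
features: List[str] = ["warn_type1_total", "warn_type2_total"]
features += [f"pca_{i}_warn" for i in range(1, 21)]
for base in ("problem_type_{}", "fault_code_type_{}_count"):
    for suffix in ("", "_per_usage1", "_per_usage2"):
        features += [base.format(i) + suffix for i in range(1, 5)]

LAGS = [3, 7, 14, 30, 90]
STATS = ["mean", "diff", "std", "max", "min"]


def rolling_column_names(features: List[str], lags: List[int], stats: List[str]) -> List[str]:
    """Hypothetical helper: one output column per (statistic, window, feature),
    in the same order as the notebook loops (window outer, feature inner)."""
    return [f"{feat}_rolling{stat}_{n}" for stat in stats for n in lags for feat in features]


names = rolling_column_names(features, LAGS, STATS)
print(len(features), len(names))  # 46 1150
print(names[0])                   # warn_type1_total_rollingmean_3
```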


## Technical Challenges & Solutions

### Challenge: StackOverflow Error
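**Problem:** Generating 46 features × 5 windows × 5 statistics (1,150 columns) through long chains of `withColumn` calls made the Spark query plan too deep, and the computation failed with `java.lang.StackOverflowError`.

**Root Cause:** Every `withColumn` call adds another projection on top of the logical plan. Hundreds of chained calls create a plan so deeply nested that Spark's recursive plan analysis and optimization exhaust the JVM thread stack. The PySpark `withColumn` documentation warns about this pattern and recommends a single `select()` with multiple columns instead.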

**Solutions Implemented:**
1. **Workload Segmentation:** Split the computation into one cell per statistic (46 features × 5 windows = 230 columns each), saving each intermediate result as a Parquet file
2. **Lineage Management:** Started from the materialized Parquet output of Notebook #1 to keep lineage depth short
3. **Progress Monitoring:** Printed the current window and feature inside each loop to track progress
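The segmentation idea can be expressed as a small planning step. The pure-Python sketch below groups the (statistic, window, feature) jobs into chunks of at most `max_cols` new columns without mixing statistics, so each chunk maps to one cell and one Parquet write. `plan_chunks` and `max_cols` are hypothetical names, and the feature names are placeholders.

```python
from itertools import product
from typing import List, Tuple

Job = Tuple[str, int, str]  # (statistic, window size, feature name)


def plan_chunks(stats: List[str], lags: List[int], features: List[str],
                max_cols: int) -> List[List[Job]]:
    """Hypothetical helper: split the work into chunks of at most max_cols
    new columns, never mixing statistics within a chunk."""
    chunks: List[List[Job]] = []
    for stat in stats:
        # Window outer, feature inner, matching the notebook's loop order.
        jobs = [(stat, n, feat) for n, feat in product(lags, features)]
        for start in range(0, len(jobs), max_cols):
            chunks.append(jobs[start:start + max_cols])
    return chunks


features = [f"feature_{i}" for i in range(46)]  # placeholder names
stats = ["mean", "diff", "std", "max", "min"]
lags = [3, 7, 14, 30, 90]

chunks = plan_chunks(stats, lags, features, max_cols=230)
print(len(chunks), [len(c) for c in chunks])  # 5 [230, 230, 230, 230, 230]
print(len(plan_chunks(stats, lags, features, max_cols=100)))  # 15
```

With `max_cols=230` the plan reproduces the one-cell-per-statistic split; a smaller value would trade more Parquet writes for shallower plans.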

### Performance Optimization Insights

**Resource Utilization Analysis:**
- Monitored CPU usage with `htop` command
- Discovered all 32 cores were fully utilized during rolling computations
- Concluded that, for this workload, single-node processing on a large VM was as efficient as distributed cluster processing

**Best Practices Applied:**
- **Parquet Format:** Preserved schema and enabled selective column reading
- **Checkpoint Strategy:** Wrote intermediate results to Parquet so downstream steps read materialized data instead of recomputing the full lineage
- **Execution Timing:** Used `%%time` magic to profile and optimize each computation stage

**Architecture Decision:** Selected single-node Spark over multi-node HDInsight cluster based on performance benchmarking.

## Rolling Statistics Computation
### Rolling Mean

Calculated moving averages across temporal windows to capture trending patterns in machine behavior.
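To make the window semantics concrete, the pure-Python sketch below mirrors what `F.avg(...).over(wSpec)` with `rowsBetween(1 - n, 0)` computes for one device's series, assuming the values are already sorted by `date`. `rolling_mean` is a hypothetical helper intended for spot-checking the Spark output on a small sample; like Spark's `avg`, it skips nulls (`None`) and returns `None` for an all-null window.

```python
from typing import List, Optional


def rolling_mean(values: List[Optional[float]], n: int) -> List[Optional[float]]:
    """Mean over the current row and the n-1 preceding rows of one device.
    Nulls are skipped; an all-null window yields None."""
    out: List[Optional[float]] = []
    for i in range(len(values)):
        # The slice clips at the start of the series, like rowsBetween(1 - n, 0).
        window = [v for v in values[max(0, i - n + 1): i + 1] if v is not None]
        out.append(sum(window) / len(window) if window else None)
    return out


print(rolling_mean([2.0, 4.0, 6.0, 8.0], 3))  # [2.0, 3.0, 4.0, 6.0]
```

The first `n - 1` rows of each device therefore average over fewer than `n` values, which is worth keeping in mind when reading the 30- and 90-row features for recently installed devices.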

In [3]:
%%time

# Load result dataset from Notebook #1
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

# Save the intermediate result for downstream work
df.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/data_rollingmean.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

### Rolling Difference

Computed each value's deviation from its moving average (value minus rolling mean) to detect anomalous behavior and sudden changes in machine performance.
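The sketch below assumes one device's values already sorted by `date`; `rolling_diff` is a hypothetical pure-Python helper that mirrors `col(c) - F.avg(col(c)).over(wSpec)`. As in Spark SQL, subtracting from a null current value yields null.

```python
from typing import List, Optional


def rolling_diff(values: List[Optional[float]], n: int) -> List[Optional[float]]:
    """Current value minus the mean of the current row and the n-1 preceding
    rows (nulls skipped in the mean); a null current value gives None."""
    out: List[Optional[float]] = []
    for i, x in enumerate(values):
        window = [v for v in values[max(0, i - n + 1): i + 1] if v is not None]
        if x is None:
            out.append(None)
        else:
            # x itself is in the window, so the window is never empty here.
            out.append(x - sum(window) / len(window))
    return out


print(rolling_diff([2.0, 4.0, 6.0, 8.0], 3))  # [0.0, 1.0, 2.0, 2.0]
```

Since the difference is just `value - rollingmean`, it could also be derived from the saved rolling-mean columns; the notebook recomputes the window average instead, which keeps this cell independent of the rolling-mean output.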

In [4]:
%%time

# Load result dataset from Notebook #1
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingdiff_'+str(lag_n), col(col_name)-F.avg(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingdiff = df.select(['key'] + list(s for s in df.columns if "rollingdiff" in s))

# Save the intermediate result for downstream work
rollingdiff.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingdiff.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

### Rolling Standard Deviation

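Measured variability over each window to identify instability patterns. `F.stddev` is the sample standard deviation, which is undefined for a window holding a single value (such as each device's first row), so these missing values are filled with 0.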
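The missing-value behavior is easiest to see on a tiny example. This pure-Python sketch (a hypothetical `rolling_std` helper for one device's series sorted by `date`) uses the sample standard deviation, which is what `F.stddev` computes, and substitutes 0 wherever fewer than two non-null values fall in the window, matching the `fillna(0)` step.

```python
import statistics
from typing import List, Optional


def rolling_std(values: List[Optional[float]], n: int, fill: float = 0.0) -> List[float]:
    """Sample standard deviation over the current row and the n-1 preceding rows.
    Nulls are skipped; windows with fewer than two values (where Spark returns
    null, or NaN in older releases) get `fill`, like the notebook's fillna(0)."""
    out: List[float] = []
    for i in range(len(values)):
        window = [v for v in values[max(0, i - n + 1): i + 1] if v is not None]
        out.append(statistics.stdev(window) if len(window) >= 2 else fill)
    return out


# First row is filled with 0.0; the rest are sqrt(2), 2.0 and 2.0.
print(rolling_std([2.0, 4.0, 6.0, 8.0], 3))
```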

In [5]:
%%time

# Load result dataset from Notebook #1
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

# There are some missing values for rollingstd features
rollingstd_features = list(s for s in df.columns if "rollingstd" in s)
df = df.fillna(0, subset=rollingstd_features)
rollingstd = df.select(['key'] + list(s for s in df.columns if "rollingstd" in s))

# Save the intermediate result for downstream work
rollingstd.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingstd.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

### Rolling Maximum

Captured peak values within time windows to identify extreme operational conditions.
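For spot-checking a single device's series locally, a sliding-window maximum can be computed in linear time with the standard monotonic-deque technique. This is a swapped-in technique for illustration, not how Spark evaluates `F.max(...).over(wSpec)`; `rolling_max` is a hypothetical helper that assumes values sorted by `date` with no nulls.

```python
from collections import deque
from typing import Deque, List


def rolling_max(values: List[float], n: int) -> List[float]:
    """Max over the current row and the n-1 preceding rows in O(len(values)),
    keeping a deque of indices whose values are in decreasing order.
    Assumes no missing values."""
    dq: Deque[int] = deque()
    out: List[float] = []
    for i, x in enumerate(values):
        while dq and values[dq[-1]] <= x:
            dq.pop()        # earlier values <= x can never be the max again
        dq.append(i)
        if dq[0] <= i - n:
            dq.popleft()    # the front index has slid out of the window
        out.append(values[dq[0]])
    return out


print(rolling_max([3, 1, 4, 1, 5, 9, 2, 6], 3))  # [3, 3, 4, 4, 5, 9, 9, 9]
```

Each index enters and leaves the deque at most once, so the cost stays linear in the series length regardless of window size, which matters mainly for the 90-row window.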

In [6]:
%%time

# Load result dataset from Notebook #1
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingmax_'+str(lag_n), F.max(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingmax = df.select(['key'] + list(s for s in df.columns if "rollingmax" in s))

# Save the intermediate result for downstream work
rollingmax.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingmax.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

### Rolling Minimum

Tracked minimum values to establish baseline performance metrics across temporal windows.
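One detail worth verifying for minimum (and maximum) features is null handling: Spark's `min` aggregate ignores nulls, so a window yields null only when all of its values are null. A pure-Python reference sketch (hypothetical `rolling_min`, one device's series sorted by `date`) that follows those rules:

```python
from typing import List, Optional


def rolling_min(values: List[Optional[float]], n: int) -> List[Optional[float]]:
    """Min over the current row and the n-1 preceding rows, skipping nulls;
    an all-null window yields None."""
    out: List[Optional[float]] = []
    for i in range(len(values)):
        window = [v for v in values[max(0, i - n + 1): i + 1] if v is not None]
        out.append(min(window) if window else None)
    return out


print(rolling_min([5.0, None, 3.0, 7.0, 6.0], 2))  # [5.0, 5.0, 3.0, 3.0, 6.0]
```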

In [7]:
%%time

# Load result dataset from Notebook #1
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingmin_'+str(lag_n), F.min(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingmin = df.select(['key'] + list(s for s in df.columns if "rollingmin" in s))

# Save the intermediate result for downstream work
rollingmin.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingmin.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

## Data Integration & Optimization

### Join Optimization Strategy

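**Challenge:** Spark joins are computationally expensive: a sort-merge join typically shuffles both inputs by the join key, and a high partition count adds per-task overhead.

**Solution Implemented:**
1. **Partition Reduction:** Used `coalesce()` instead of `repartition()` to reduce the partition count
2. **Performance Benefit:** `coalesce()` merges existing partitions without a full shuffle, whereas `repartition()` always shuffles; a drastic coalesce (e.g. to 1 partition) also concentrates the upstream work in fewer tasks, so the target count is worth tuning
3. **Sequential Joins:** Merged the five rolling-statistic datasets with chained inner joins on `key`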

Successfully integrated 1,150 rolling features into a unified dataset for downstream modeling.
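Because every join is `inner`, a `key` that is missing from any one of the five inputs silently disappears from the joined result. The pure-Python model below (hypothetical `inner_join_on_key`, with tables represented as dictionaries keyed by `key`) illustrates that behavior; in Spark, comparing `rolling_result.count()` with `rollingmean.count()` would be a cheap way to confirm no rows were dropped.

```python
from typing import Dict, List

Columns = Dict[str, float]   # column name -> value for one row
Table = Dict[str, Columns]   # key -> that row's columns


def inner_join_on_key(tables: List[Table]) -> Table:
    """Model of chained inner joins on `key`: only keys present in every
    table survive, and each surviving row collects all tables' columns."""
    common = set(tables[0])
    for table in tables[1:]:
        common &= set(table)
    merged: Table = {}
    for key in sorted(common):
        row: Columns = {}
        for table in tables:
            row.update(table[key])
        merged[key] = row
    return merged


mean = {"a": {"x_rollingmean_3": 1.0}, "b": {"x_rollingmean_3": 2.0}}
diff = {"a": {"x_rollingdiff_3": 0.5}}  # key "b" is missing here
print(inner_join_on_key([mean, diff]))
# {'a': {'x_rollingmean_3': 1.0, 'x_rollingdiff_3': 0.5}}
```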

In [8]:
# Load the five intermediate results
rollingmean = sqlContext.read.parquet('/mnt/resource/PysparkExample/data_rollingmean.parquet')
rollingdiff = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingdiff.parquet')
rollingstd = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingstd.parquet')
rollingmax = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingmax.parquet')
rollingmin = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingmin.parquet')

# Check the number of partitions for each dataset
print(rollingmean.rdd.getNumPartitions())
print(rollingdiff.rdd.getNumPartitions())
print(rollingstd.rdd.getNumPartitions())
print(rollingmax.rdd.getNumPartitions())
print(rollingmin.rdd.getNumPartitions())


33
33
33
31
31


In [9]:
%%time

# To make join faster, reduce the number of partitions (not necessarily to "1")
rollingmean = rollingmean.coalesce(1)
rollingdiff = rollingdiff.coalesce(1)
rollingstd = rollingstd.coalesce(1)
rollingmax = rollingmax.coalesce(1)
rollingmin = rollingmin.coalesce(1)

rolling_result = rollingmean.join(rollingdiff, 'key', 'inner')\
                 .join(rollingstd, 'key', 'inner')\
                 .join(rollingmax, 'key', 'inner')\
                 .join(rollingmin, 'key', 'inner')
            

## Write the final result as parquet file for downstream work in Notebook_3
rolling_result.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/notebook2_result.parquet')


CPU times: user 901 ms, sys: 303 ms, total: 1.2 s
Wall time: 1h 50min 38s
