Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Data preperation and feature engineering for PdM

All of our labs use predictive maintenance as their use-case. In this lab, we read and pre-process the data. The relevant data sources for predictive maintenance include, but are not limited to:

- **Machine operating conditions:** data of the equipment health over time (usually sensor-based and streaming). We will refer to this data as machine telemetry data.
- **Error histor:** this data contains logs of non-breaking errors that happen thoughout a machine's operation and which parts of the machine they came from
- **Failure history:** this data contains logs of severe errors that broke the machine down (requiring maintenance to operate again) and parts of the machine that caused it
- **Maintenance/repair history:** what parts were fixed/replaced (as part of scheduled maintenance or due to failure) and when
    Equipment metadata: anything we know about equipment and parts (such as make, model, etc.)

In [3]:
%run "../includes/mnt_blob"

In [4]:
%sh ls /dbfs/mnt/data/telemetry

From now on we will adopt to following consistent terminology to avoid confusion:

- A system as a whole will be called a **machine** and its parts are called **components**.
- A machine can experience **errors** when anomalies happen. Errors do **NOT** result in shutdown, and they are **NOT** tied to any particular components, but they can cause one or several component to eventually fail.
- A machine can experience **failure** when one of its components shuts down. This requires the component to be replaced before the machine can be operational again.
- For our purposes, **maintenance** means a component was replaced. This can be either as part of a routine schedule or unscheduled maintenance (due to component failure).

In [6]:
%sh head /dbfs/mnt/data/telemetry/telemetry.csv

We can now start loading the data.

In [8]:
df_telemetry = spark.read.csv("/mnt/data/telemetry/telemetry.csv", header = "true", inferSchema = "true").cache()
df_failures = spark.read.csv("/mnt/data/telemetry/failures.csv", header = "true", inferSchema = "true").cache()
df_errors = spark.read.csv("/mnt/data/telemetry/errors.csv", header = "true", inferSchema = "true").cache()
df_maint = spark.read.csv("/mnt/data/telemetry/maintenance.csv", header = "true", inferSchema = "true").cache()
df_machines = spark.read.csv("/mnt/data/telemetry/machines.csv", header = "true", inferSchema = "true").cache()

Let's look at the schema to check our column types. Notice that the timestamp column has a `string` type. This willl need to later be changed to `timestamp` so we can perform date-time operations on it.

In [10]:
df_telemetry.printSchema()

We have four separate data sources, and as our first step toward data preparation, we combine all of them into one. We do so in this case by appending everything to the telemetry data using a left join. We then convert `datetime` into a `timestamp` column.

In [12]:
keys = ["machineID", "datetime"]

type = "left"

df = df_telemetry.join(df_errors.withColumnRenamed("errorID", "error"), keys, type) \
                 .join(df_failures.withColumnRenamed("failure", "fail"), keys, type) \
                 .join(df_maint.withColumnRenamed("comp", "maint"), keys, type) \
                 .join(df_machines, "machineID", type) \
                 .cache()

from pyspark.sql.functions import to_timestamp

df = df.withColumn("datetime", to_timestamp("datetime", "MM/dd/yyyy hh:mm:ss aa")) \
       .orderBy(keys) \
       .dropDuplicates(keys)

In [13]:
display(df)

Let's check our column types again.

In [15]:
df.printSchema()

We now create a temporary view from the DataFrame.

In [17]:
df.createOrReplaceTempView("df")

Our first feature-engineering task will consist of getting rolling averages and standard deviations from the telemetries. Rolling statistics are calculated over a given time interval (or window), which we set at 3 hours. However, based on the use-case the window size can change.

### Hands-on lab

Use SQL to create a new table called `df_roll` that in addition to the original features also contains moving averages and standard deviations over a 3-hour window for the telemetry data (voltage, rotation, pressure and vibration). To do this you will need to use the `over` clause along with aggregate functions `mean` and `stddev`.

In [20]:
%sql
-- put your solution here

In [21]:
%sql
-- maximize this cell (click the + button on the right) to see the solution:

drop table if exists df_roll;

create table df_roll as
select *,
  mean(volt) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as volt_ma_3,
  mean(rotate) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as rotate_ma_3,
  mean(pressure) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as pressure_ma_3,
  mean(vibration) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as vibration_ma_3,
  stddev(volt) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as volt_sd_3,
  stddev(rotate) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as rotate_sd_3,
  stddev(pressure) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as pressure_sd_3,
  stddev(vibration) over (partition by machineid order by datetime range between interval 3 hours preceding and current row) as vibration_sd_3
from df 
order by machineID, datetime

Create a time series plot comparing the original voltage telemetry to its 3-hour-window moving average, for machine with ID = 1. Does the moving average appear to be smoother compared to the original telemetry? How can we make it even smoother?

In [23]:
%sql
-- put your solution here

In [24]:
%sql
-- maximize this cell (click the + button on the right) to see the solution:
select * from df_roll where machineID = 1 order by datetime;

### End of lab

In the above example, we saw how Spark SQL can be used to compute the rolling averages. For illustration, let's now do the same computation using the DataFrame APIs. This allows us to compare and contrast the two.

In [27]:
# this is the alternative (probably better) way of calculating the rolling averages

from pyspark.sql.functions import avg, stddev, col
from pyspark.sql.window import Window

w = (Window.partitionBy(col("machineID")).orderBy(col("datetime").cast('long')).rangeBetween(-3 * 60 * 60, 0))

df_roll = df.withColumn('volt_ma_3', avg("volt").over(w)) \
            .withColumn('rotate_ma_3', avg("rotate").over(w)) \
            .withColumn('pressure_ma_3', avg("pressure").over(w)) \
            .withColumn('vibration_ma_3', avg("vibration").over(w)) \
            .withColumn('volt_sd_3', stddev("volt").over(w)) \
            .withColumn('rotate_sd_3', stddev("rotate").over(w)) \
            .withColumn('pressure_sd_3', stddev("pressure").over(w)) \
            .withColumn('vibration_sd_3', stddev("vibration").over(w)) \
            .cache()

df_roll.createOrReplaceTempView("df_roll")

To be safe, let's re-create the last time series plot to make sure that the two look similar.

In [29]:
display(df_roll.filter(col("machineID") == 1).orderBy(col("datetime")))

Let's now look at the distribution of errors for each machine. This helps us understand the errors a little better. Recall that errors don't break the machine immediately, but can eventually lead to downtime. However, we must ensure that errors are relatively evenly distributed across machines otherwise our model may fail to generalize well.

In [31]:
df1 = df_roll.select("machineID", "error").filter(col("error").isNull() == False).groupBy("machineID", "error").count().orderBy("machineID", "error")

In [32]:
display(df1)

It's time for us to do some feature engineering with the error, failure and maintenance history. At a high level, our feature engineering will consist of three important steps:

- Create one-hot-encoded features (i.e. "dummy variables") from `error`, `failure` and `maint`. This is an intermediate step because we will not use these features for modeling. Instead we use them to compute the following set of features.
- Create features that measure for any timestamp how many hours have elapsed since the last time an error or a failure happened, and how many hours have elapsed since the last time maintenance was performed.
- Use `failure` to create a target variable.

Let's first take a look at how `error`, `failure` and `maint` break down:

In [34]:
df_roll.groupBy("error").count().sort("count").show()
df_roll.groupBy("fail").count().sort("count").show()
df_roll.groupBy("maint").count().sort("count").show()

As we can see, failure and maintenance are recorded component by component. So for any machine, a given component can fail at a given time and eventually be replaced at some other time. On the other hand, error is NOT component by component but concerns the machine as a whole. So we know that a machine at a given time can experience one of 5 kinds of errors.

To get to our features, we first need to run one-hot encoding on the error, failure and maintenance columns. Prior to that we need to replace any missing values, which we encode here using the string "zzzz".

In [36]:
# we fill NAs for errors prior to one-hot-encoding, and use 'zzzzz' so we can sort alphabetically
df_roll = df_roll.fillna("zzzzz", subset = "error") \
                 .fillna("zzzzz", subset = ["fail", "maint"]) \
                 .drop("volt").drop("rotate").drop("pressure").drop("vibration")

We can now create a pipeline that will do one-hot encoding on the above features. Since they are of `string` type, we begin by using `StringIndexer` to first index the features and we then pass the results to `OneHotEncoderEstimator` which runs one-hot encoding on the features and stores the results as a **sparse matrix**, encoded using `vectortype`.

In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
stages = []
cat_cols = ["error", "fail", "maint"]

for cat_col in cat_cols:
  indexer = StringIndexer(inputCol = cat_col, outputCol = cat_col + "_index", stringOrderType = "alphabetAsc")
  encoder = OneHotEncoderEstimator(inputCols = [indexer.getOutputCol()], outputCols = [cat_col + "_vec"], dropLast = True)
  stages += [indexer, encoder]
  
pipeline = Pipeline(stages=stages)
df_encoded = pipeline.fit(df_roll).transform(df_roll)
df_encoded = df_encoded.drop("error").drop("fail").drop("maint")
display(df_encoded)

In [39]:
df_encoded.printSchema()

In order to calculate the time elapsed since the last error, failure and maintenance event, we first need to take our one-hot-encoded sparse vactor feature and break it up into indiviual binary features for each event.

In [41]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
cat_cols = ["error", "fail", "maint"]

for cc in cat_cols:
  indexes = df_encoded.select(cc + '_index').distinct().rdd.flatMap(lambda x : x).collect()
  indexes.sort()
  print(indexes)
  for index in indexes[:-1]: # we remove the last index
      function = udf(lambda item: 1 if item == index else 0, IntegerType())
      new_column_name = cc + '_' + str(int(index))
      df_encoded = df_encoded.withColumn(new_column_name, function(col(cc + '_index')))

We can now drop the original sparse vector features.

In [43]:
cols_drop = cat_cols + [c + "_vec" for c in cat_cols]
df_encoded = df_encoded.select([col for col in df_encoded.columns if col not in cols_drop])
display(df_encoded)

Let's take a look at an example of one of the one-hot-encoded features we created: `error_4` is a binary feature that is set to 1 for any timestamp that corresponds to an error of type 4 in the machine (and 0 otherwise). We can see in the below result how many errors of type 4 occured in the overall data (across all machines).

In [45]:
df_encoded.groupBy("error_4").count().show()

We now use the binary features we created in the previous step to create a new set of features. If `error_4` is a binary feature that represents the an occurence of error of type 4 for a given machine and timestamp, we can use it to create a corresponding feature called `diff_error_4` which measures the number of hours elapsed since the last time an error of type 4 occured.

In [47]:
# this is the alternative (probably better) way of calculating the rolling averages

from pyspark.sql.functions import avg, lag, when, lit, sum, row_number
from pyspark.sql.window import Window

w = Window.partitionBy(col("machineID")).orderBy(col("datetime"))

# first compute a lagged feature for datetime and convert both datetime and the lagged datetime to numbers
df_diff = df_encoded.withColumn('datetime_lag', lag(df_encoded['datetime']).over(w)) \
                    .withColumn('dt_long', col('datetime').cast('long')) \
                    .withColumn('dt_long_lag', col('datetime_lag').cast('long')) \
                    .drop('datetime_lag')

cat_vars = ["error", "fail", "maint"]

for cat_var in cat_vars:
  # find how many categories there are in each categorical feature
  indexes = df_encoded.select(cat_var + '_index').distinct().rdd.flatMap(lambda x : x).collect()
  indexes.sort()
  for index in indexes[:-1]: # we remove the last category (these are the non-events)
    cat_num = str(int(index))
    cat_col = cat_var + '_' + cat_num
    dif_col = 'd_' + cat_var + '_' + cat_num
    cum_col = 'diff_' + cat_var + '_' + cat_num
    print(cum_col)

    # compute an intermediate feature used for grouping results
    find_cum_grp = sum(col(cat_col)).over(w)
    df_diff = df_diff.withColumn('cum_grp', find_cum_grp).cache()

    # compute the number of hours elapsed since the last event (error, failure, or maintenance)
    ww = Window.partitionBy(col("machineID"), col("cum_grp")).orderBy(col("datetime"))
    find_lag_diff = when(col(cat_col) == 1, lit(0)).otherwise((col('dt_long') - col('dt_long_lag')) / (60*60))
    # for first occurence, we don't know when the last event was, so for now we just assume 100 hours
    df_diff = df_diff.withColumn(dif_col, find_lag_diff) \
                     .fillna(100, subset = [dif_col]).cache()
    
    # reset the time elapsed back to 0 every time an event occurs (otherwise compute cumulative sum)
    find_cum_diff = when(col(cat_col) == 1, lit(0)).otherwise(sum(col(dif_col)).over(ww))
    df_diff = df_diff.withColumn(cum_col, find_cum_diff) \
                     .drop('cum_grp').drop(dif_col).drop(cat_col).cache() # if I don't use cache here I get a out-of-memory error
  
  df_diff = df_diff.drop(cat_var)


df_diff = df_diff.drop('dt_long').drop('dt_long_lag').orderBy('machineID', 'datetime')
display(df_diff)

We can visualize our new features to get a better sense of what they look like. In this example, we look at the maintenance history. Recall that there are 4 components to each machine and maintenance happens component by component. In the plot below, components are represented by colors. As we move along the x-axis, we generally see a linear increase that represents time elapsing until there's a sharp drop back to 0 whenever maintenance is performed, at which point we reset the timer.

In [49]:
display(df_diff.filter(col('machineID') == 1))

The last step in data prep is for us to create a targets for the PdM model. You might wonder why we don't just use `fail_0` through `fail_3` as our targets, since they indicate when a component failed (and hence the whole machine). In fact we could, but PdM is not about predicting when a machine fails, but predicting when it's **about to** fail. So it's better to create labels that indicate the state of the machine shortly **prior to failure** (how far back we want to go is something we need to determine use-case by use-case).

Another justification for picking the window size prior to failure is that it based on how long it takes to fix failures after they happen. If for example we expect a 3-day downtime, then we flag everything up to 3 days prior to failure.

In [51]:
import pandas as pd
from pyspark.sql.functions import avg, lag, when, lit, sum

df_all = df_diff.cache()

for i in range(4): # iterate over the four components
    # find all the times a component failed for a given machine
    df_temp = df_diff.filter(col('fail_index') == i).select('machineID', 'datetime').toPandas()
    label = 'y_' + str(i) # name of target column (one per component)
    # create a new target column for each component and set to 0 by default
    df_all = df_all.withColumn(label, lit(0))
    for n in range(df_temp.shape[0]): # iterate over all the failure times
        machineID, datetime = df_temp.iloc[n, :]
        dt_end = datetime - pd.Timedelta('10 minutes') # from 10 minutes prior to failure
        dt_start = datetime - pd.Timedelta('3 days') # up to 3 days prior to failure
        if n % 500 == 0:
            print("failure on machine {0} at {1}, so {2} is between {4} and {3}".format(machineID, datetime, label, dt_end, dt_start))
        # every time a failure happens, set the target feature to 1 within the provided window
        find_labels = when((col('machineID') == int(machineID)) & (col('datetime').between(dt_start, dt_end)), lit(1)).otherwise(col(label))
        df_all = df_all.withColumn(label, find_labels)
        
display(df_all)

As one example, we can visualize `y_0` by machine. Notice how rare our labels are.

In [53]:
display(df_all.crosstab("machineID", "y_0"))

The difficult parts of our data prep is over. Now we have just some house-cleaning to do before we hand the data over to the machine learning algorithms. Let's begin by checking if any missing data are present.

In [55]:
from pyspark.sql.functions import col, sum, isnan
display(df_all.select(*(sum((col(c).isNull()).cast("int")).alias(c) for c in df_all.columns)))

In [57]:
from pyspark.ml.feature import Imputer
impute_cols = [c for c in df_all.columns if c.endswith('_sd_3')]
print(impute_cols)
imputer = Imputer(inputCols = impute_cols, outputCols = impute_cols)
model = imputer.fit(df_all)
# model.surrogateDF.show()
df_all = model.transform(df_all)

In [58]:
X_drop = ['error_index', 'fail_index', 'maint_index', 'f_1', 'f_2', 'f_3', 'f_4', 'y_0, ''y_1', 'y_2', 'y_3', 'model']
Y_keep = ['y_0', 'y_1', 'y_2', 'y_3']
keys = ['machineID', 'datetime']

X_keep = list(set(df_all.columns) - set(X_drop + Y_keep + keys))
display(df_all.select(keys + sorted(X_keep + Y_keep)))

In [59]:
from pyspark.ml.feature import Normalizer, VectorAssembler
from pyspark.ml import Pipeline

vassembler = VectorAssembler(inputCols = X_keep, outputCol = "features")
normalizer = Normalizer(inputCol = "features", outputCol = "norm_features", p = 1.0)
# df_norm = normalizer.transform(df_all)
# display(df_norm)

pipeline = Pipeline(stages = [vassembler, normalizer])
df_norm = pipeline.fit(df_all).transform(df_all).select(keys + ["norm_features"] + Y_keep)
display(df_norm)

If you wanted to, you could run the following code to split the ```norm_features``` vector up into separate columns.  This could be useful if you wanted to export the data into e.g. a CSV file.

In [61]:
# from pyspark.sql.functions import udf, col
# from pyspark.sql.types import ArrayType, DoubleType

# n_features = 21

# def to_array(col):
#     def to_array_(v):
#         return v.toArray().tolist()
#     return udf(to_array_, ArrayType(DoubleType()))(col)

# df = (df_norm
#     .withColumn("xs", to_array(col("norm_features")))
#     .select(["machineID", "datetime"] + [col("xs")[i] for i in range(n_features)] + ['y_0', 'y_1', 'y_2', 'y_3']))


In [62]:
display(df)

In [63]:
dbutils.fs.rm("dbfs:/FileStore/tables/preprocessed", recurse = True)

df_norm.write.parquet("dbfs:/FileStore/tables/preprocessed")

Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.