# Step 2: Feature Engineering

Feature engineering combines the different data sources to create a single data set of features (variables) that can be used to infer a machine's health condition over time.

In this notebook, we will load the data stored in Azure Blob containers in the previous **Data Ingestion** notebook (`Code/data_ingestion.ipynb`). The notebook uses several feature engineering methods to create a data set for use in our predictive maintenance machine learning solution.

In [1]:
# import the libraries
import os

import pyspark.sql.functions as F
from pyspark.sql.functions import col, unix_timestamp, round
from pyspark.sql.functions import datediff
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession

from azure.storage.blob import BlockBlobService
from azure.storage.blob import PublicAccess

spark = SparkSession.builder.getOrCreate()

## Load data from Azure Blob storage container

We have previously downloaded and stored the following data in an Azure blob storage container:

  * Machines: Features that differentiate each machine, for example age and model.
  * Error: The log of non-critical errors. These errors may still indicate an impending component failure.
  * Maint: Machine maintenance history detailing component replacement or regular maintenance activities, with the date of replacement.
  * Telemetry: The operating conditions of a machine, e.g. data collected from sensors.
  * Failure history: The failure history of a machine or component within the machine.

We'll load these files from blob, and create our analysis data set here. We'll write this data set back into a new blob container to use in our model building and evaluation notebook later. 

Since the Azure Blob storage account name and account key are not passed between notebooks, you'll need to provide those here again.

In [2]:
# Enter your Azure blob storage details here 
ACCOUNT_NAME = "<your blob storage account name>"

# You can find the account key under the _Access Keys_ link in the
# [Azure Portal](portal.azure.com) page for your Azure storage account.
ACCOUNT_KEY = "<account key>"

#-------------------------------------------------------------------------------------------
# The data from the Data Ingestion notebook is stored in the dataingestion container.
CONTAINER_NAME = "dataingestion"

# The data constructed in this notebook will be stored in the featureengineering container
STORAGE_CONTAINER_NAME = "featureengineering"

# Connect to your blob service     
my_service = BlockBlobService(account_name=ACCOUNT_NAME, account_key=ACCOUNT_KEY)

# We will store each of these data sets in blob storage in an 
# Azure Storage Container on your Azure subscription.
# See https://github.com/Azure/ViennaDocs/blob/master/Documentation/UsingBlobForStorage.md
# for details.

# These file names detail which blob each file is stored under. 
MACH_DATA = 'machines_files.parquet'
MAINT_DATA = 'maint_files.parquet'
ERROR_DATA = 'errors_files.parquet'
TELEMETRY_DATA = 'telemetry_files.parquet'
FAILURE_DATA = 'failure_files.parquet'

# These file names detail the local paths where we store the data results.
MACH_LOCAL_DIRECT = 'dataingestion_mach_result.parquet'
ERROR_LOCAL_DIRECT = 'dataingestion_err_result.parquet'
MAINT_LOCAL_DIRECT = 'dataingestion_maint_result.parquet'
TELEMETRY_LOCAL_DIRECT = 'dataingestion_tel_result.parquet'
FAILURES_LOCAL_DIRECT = 'dataingestion_fail_result.parquet'

# This is the final data file.
FEATURES_LOCAL_DIRECT = 'featureengineering_files.parquet'


### Machines data set

Load the machines data set from your Azure blob.

In [3]:
# download the entire parquet result folder to local path for a new run 

# create a local path  to store the data.
if not os.path.exists(MACH_LOCAL_DIRECT):
    os.makedirs(MACH_LOCAL_DIRECT)
    print('DONE creating a local directory!')

# Connect to blob storage container
for blob in my_service.list_blobs(CONTAINER_NAME):
    if MACH_DATA in blob.name:
        local_file = os.path.join(MACH_LOCAL_DIRECT, os.path.basename(blob.name))
        my_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)

# Read in the data
machines = spark.read.parquet(MACH_LOCAL_DIRECT)

print(machines.count())
machines.show(5)

DONE creating a local directory!
1000
+---------+------+---+
|machineID| model|age|
+---------+------+---+
|        1|model2| 18|
|        2|model4|  7|
|        3|model3|  8|
|        4|model3|  7|
|        5|model2|  2|
+---------+------+---+
only showing top 5 rows



### Errors data set

Load the errors data set from your Azure blob.

In [4]:
# load the errors data set stored by the Data Ingestion notebook

# create a local path  to store the data.
if not os.path.exists(ERROR_LOCAL_DIRECT):
    os.makedirs(ERROR_LOCAL_DIRECT)
    print('DONE creating a local directory!')

# Connect to blob storage container
for blob in my_service.list_blobs(CONTAINER_NAME):
    if ERROR_DATA in blob.name:
        local_file = os.path.join(ERROR_LOCAL_DIRECT, os.path.basename(blob.name))
        my_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)

# Read in the data
errors = spark.read.parquet(ERROR_LOCAL_DIRECT)

print(errors.count())
errors.show(5)

DONE creating a local directory!
11967
+-------------------+---------+-------+
|           datetime|machineID|errorID|
+-------------------+---------+-------+
|2015-04-08 19:00:00|      251| error3|
|2015-06-09 06:00:00|      251| error1|
|2015-08-08 06:00:00|      251| error4|
|2015-09-07 06:00:00|      251| error2|
|2015-09-07 06:00:00|      251| error3|
+-------------------+---------+-------+
only showing top 5 rows



### Maintenance data set

Load the maintenance data set from your Azure blob.

In [5]:
# load the maintenance data set stored by the Data Ingestion notebook

# create a local path  to store the data.
if not os.path.exists(MAINT_LOCAL_DIRECT):
    os.makedirs(MAINT_LOCAL_DIRECT)
    print('DONE creating a local directory!')

# Connect to blob storage container
for blob in my_service.list_blobs(CONTAINER_NAME):
    if MAINT_DATA in blob.name:
        local_file = os.path.join(MAINT_LOCAL_DIRECT, os.path.basename(blob.name))
        my_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)

# Read in the data
maint = spark.read.parquet(MAINT_LOCAL_DIRECT)

print(maint.count())
maint.show(5)

DONE creating a local directory!
32592
+-------------------+---------+-----+
|           datetime|machineID| comp|
+-------------------+---------+-----+
|2015-01-04 06:00:00|      252|comp1|
|2015-01-19 06:00:00|      252|comp4|
|2015-02-18 06:00:00|      252|comp3|
|2015-03-05 06:00:00|      252|comp2|
|2015-03-20 06:00:00|      252|comp1|
+-------------------+---------+-----+
only showing top 5 rows



### Telemetry

Load the telemetry data set from your Azure blob.

In [6]:
# load the telemetry data set stored by the Data Ingestion notebook

# create a local path  to store the data.
if not os.path.exists(TELEMETRY_LOCAL_DIRECT):
    os.makedirs(TELEMETRY_LOCAL_DIRECT)
    print('DONE creating a local directory!')

# Connect to blob storage container
for blob in my_service.list_blobs(CONTAINER_NAME):
    if TELEMETRY_DATA in blob.name:
        local_file = os.path.join(TELEMETRY_LOCAL_DIRECT, os.path.basename(blob.name))
        my_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)

# Read in the data
telemetry = spark.read.parquet(TELEMETRY_LOCAL_DIRECT)

print(telemetry.count())
telemetry.show(5)

DONE creating a local directory!
8761000
+-------------------+---------+------------------+------------------+------------------+------------------+
|           datetime|machineID|              volt|            rotate|          pressure|         vibration|
+-------------------+---------+------------------+------------------+------------------+------------------+
|2015-01-08 10:00:00|      501|  165.775141945845|456.01448415760694| 96.77970729292059|  40.2003147414585|
|2015-01-08 11:00:00|      501|   167.69449418333|  415.396525444102|  106.346837851055|  39.4543199241959|
|2015-01-08 12:00:00|      501|  149.286910613195|  549.794168134521|  110.590462370234|46.649346416714295|
|2015-01-08 13:00:00|      501|  164.315444277899|  485.343432044691|102.64442608969802|   38.615502466494|
|2015-01-08 14:00:00|      501|178.78989140143298|447.83020430003097|100.23827927490801|36.380290860492394|
+-------------------+---------+------------------+------------------+------------------+------------------+

### Failures data set

Load the failures data set from your Azure blob.

In [7]:
# load the failures data set stored by the Data Ingestion notebook

# create a local path  to store the data.

if not os.path.exists(FAILURES_LOCAL_DIRECT):
    os.makedirs(FAILURES_LOCAL_DIRECT)
    print('DONE creating a local directory!')


# download the entire parquet result folder to local path for a new run 
for blob in my_service.list_blobs(CONTAINER_NAME):
    if FAILURE_DATA in blob.name:
        local_file = os.path.join(FAILURES_LOCAL_DIRECT, os.path.basename(blob.name))
        my_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)

failures = spark.read.parquet(FAILURES_LOCAL_DIRECT)

print(failures.count())
failures.show(5)

DONE creating a local directory!
6726
+-------------------+---------+-------+
|           datetime|machineID|failure|
+-------------------+---------+-------+
|2015-09-18 06:00:00|      453|  comp2|
|2015-12-17 06:00:00|      453|  comp2|
|2015-03-27 06:00:00|      454|  comp2|
|2015-08-24 06:00:00|      454|  comp2|
|2015-09-23 06:00:00|      454|  comp1|
+-------------------+---------+-------+
only showing top 5 rows



## Feature engineering 

Feature engineering combines the different data sources to create a single data set of features (variables) that can be used to infer a machine's health condition over time. The ultimate goal is to generate a single record for each time unit for each asset, combining its features and labels, to feed into the machine learning algorithm. Preparing that clean final data set requires some pre-processing steps. The first step is to divide the duration of data collection into time units, so that each record belongs to one time unit for one asset.

The measurement unit for time can be seconds, minutes, hours, days, months, cycles, miles or transactions, depending on the efficiency of data preparation, the changes observed in the condition of the asset from one time unit to the next, and other factors specific to the domain. In other words, the time unit does not have to match the frequency of data collection, since in many cases the data may not show any difference from one unit to the next. For example, if temperature values were collected every 10 seconds, picking a time unit of 10 seconds for the whole analysis inflates the number of examples without providing any additional information. A better strategy would be, for example, to use the average over an hour.
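
The hourly-average idea can be sketched in plain Python, independent of Spark. This is only an illustration: the sensor, its values, and the helper name `hourly_means` are made up and are not part of the notebook's data or code.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def hourly_means(readings):
    """Average (timestamp, value) readings into one value per hour."""
    buckets = defaultdict(list)
    for ts, value in readings:
        # Truncate each timestamp to the start of its hour.
        hour = ts.replace(minute=0, second=0, microsecond=0)
        buckets[hour].append(value)
    return {hour: sum(vals) / len(vals) for hour, vals in sorted(buckets.items())}


# Hypothetical sensor: one reading every 10 seconds for two hours.
start = datetime(2015, 1, 1, 6, 0, 0)
readings = [(start + timedelta(seconds=10 * i), 20.0 + (i % 6)) for i in range(720)]

hourly = hourly_means(readings)
print(len(readings), "raw readings ->", len(hourly), "hourly records")
```

The 720 raw readings collapse into two hourly records, which is the kind of reduction the paragraph above argues for.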

### Rolling aggregates

For each record of an asset, we pick a rolling window of size "W", the number of time units over which we would like to compute historical aggregates. We then compute rolling aggregate features using the W periods before the date of that record. Example rolling aggregates include rolling counts, means, standard deviations, outliers based on standard deviations, CUSUM measures, and minimum and maximum values for the window. Another interesting technique is to capture trend changes, spikes and level changes using anomaly detection algorithms.
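
As a minimal sketch of a rolling aggregate, the plain-Python function below computes a trailing mean and sample standard deviation. It assumes the window includes the current record, as the `rowsBetween(1-lag_n, 0)` window specification used later in this notebook does. The function name and the voltage values are hypothetical.

```python
from statistics import mean, stdev


def rolling_aggregates(values, window):
    """Trailing-window mean and sample standard deviation for each position.

    The window covers the current value and up to `window - 1` values
    before it, so the earliest positions use a shorter window.
    """
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1): i + 1]
        # The sample standard deviation is undefined for a single point.
        sd = stdev(chunk) if len(chunk) > 1 else None
        out.append((mean(chunk), sd))
    return out


volt = [170.0, 160.0, 165.0, 180.0, 150.0]  # made-up hourly readings
for value, (m, sd) in zip(volt, rolling_aggregates(volt, window=3)):
    print(value, m, sd)
```

The first record has no history, so its standard deviation is left undefined rather than reported as zero.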

### Lag features
In predictive maintenance, historical data usually comes with timestamps indicating when each piece of data was collected. There are many ways of creating features from timestamped data. In this section, we discuss some of the methods used for predictive maintenance, but we are not limited to these methods. Since feature engineering is considered one of the most creative areas of predictive modeling, there could be many other ways to create features. Here, we provide some general techniques.

## Telemetry features

Because the telemetry data set is the largest time series data we have, we start feature engineering here. 

A common method is to pick a window size for the lag features and compute rolling aggregate measures such as mean, standard deviation, minimum and maximum to represent the short-term history of the telemetry over the window. In the following, the rolling mean and standard deviation of the telemetry data over the last 3-hour and 24-hour lag windows are calculated, and the results are then resampled to one record every 3 hours.



In [8]:
# rolling mean
# Temporary storage for rolling means
tel_mean = telemetry

# Telemetry features we want rolling aggregates for
rolling_features = ['volt','rotate', 'pressure', 'vibration']
               
# Two rolling window sizes: 3 hours and 24 hours
lags = [3,24]

for lag_n in lags:
    wSpec = Window.partitionBy('machineID').orderBy('datetime').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        tel_mean = tel_mean.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

Lag = 3, Column = volt
Lag = 3, Column = rotate
Lag = 3, Column = pressure
Lag = 3, Column = vibration
Lag = 24, Column = volt
Lag = 24, Column = rotate
Lag = 24, Column = pressure
Lag = 24, Column = vibration


We repeat this rolling window process for the standard deviation. 

In [9]:
# rolling standard deviation
# Temporary storage for rolling standard deviations
tel_sd = telemetry

for lag_n in lags:
    wSpec = Window.partitionBy('machineID').orderBy('datetime').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        tel_sd = tel_sd.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

Lag = 3, Column = volt
Lag = 3, Column = rotate
Lag = 3, Column = pressure
Lag = 3, Column = vibration
Lag = 24, Column = volt
Lag = 24, Column = rotate
Lag = 24, Column = pressure
Lag = 24, Column = vibration


### Resample every 3 hours
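The resampling cells below assign each hourly record to a 3-hour bucket by dividing the Unix timestamp by 10800, rounding, and multiplying back. Despite the column name `dt_truncated`, this rounds to the nearest boundary rather than truncating. The plain-Python sketch below mirrors that arithmetic under two assumptions: timestamps are treated as UTC (Spark uses the session time zone), and ties are rounded half up. The helper name `nearest_bucket` is hypothetical.

```python
import math
from datetime import datetime, timezone

BUCKET_SECONDS = 3 * 60 * 60  # 3 hours = 10800 seconds


def nearest_bucket(ts):
    """Round a naive datetime (assumed UTC) to the nearest 3-hour boundary."""
    epoch = ts.replace(tzinfo=timezone.utc).timestamp()
    # floor(x + 0.5) rounds half up, avoiding Python's round-half-to-even.
    rounded = math.floor(epoch / BUCKET_SECONDS + 0.5) * BUCKET_SECONDS
    return datetime.fromtimestamp(rounded, tz=timezone.utc).replace(tzinfo=None)


for hour in (5, 6, 7, 8, 9, 10):
    ts = datetime(2015, 1, 1, hour)
    print(ts, "->", nearest_bucket(ts))
```

Under these assumptions the 06:00 bucket collects the 05:00, 06:00 and 07:00 records, and the 09:00 bucket collects 08:00, 09:00 and 10:00.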

In [10]:
# tel_mean rolling mean
# 3 hours = 10800 seconds  
time_val = 3 * 60 * 60

# Round each timestamp to the nearest 3-hour boundary. The expression refers to the
# column by name, so it resolves against whichever DataFrame it is applied to.
dt_truncated = ((round(unix_timestamp(col("datetime")) / time_val) * time_val).cast("timestamp"))

tel_mean_resampled = tel_mean.withColumn("dt_truncated", dt_truncated).drop('volt', 'rotate', 'pressure', 'vibration')

tel_mean_resampled1 = (tel_mean_resampled.groupBy("machineID","dt_truncated")
                               .agg(F.mean('volt_rollingmean_3').alias('volt_rollingmean_3'),
                                    F.mean('rotate_rollingmean_3').alias('rotate_rollingmean_3'), 
                                    F.mean('pressure_rollingmean_3').alias('pressure_rollingmean_3'), 
                                    F.mean('vibration_rollingmean_3').alias('vibration_rollingmean_3'), 
                                    F.mean('volt_rollingmean_24').alias('volt_rollingmean_24'),
                                    F.mean('rotate_rollingmean_24').alias('rotate_rollingmean_24'), 
                                    F.mean('pressure_rollingmean_24').alias('pressure_rollingmean_24'), 
                                    F.mean('vibration_rollingmean_24').alias('vibration_rollingmean_24')))

tel_mean_resampled1.count()
tel_mean_resampled1.where((col("machineID") == 1)).show(5)

+---------+--------------------+------------------+--------------------+----------------------+-----------------------+-------------------+---------------------+-----------------------+------------------------+
|machineID|        dt_truncated|volt_rollingmean_3|rotate_rollingmean_3|pressure_rollingmean_3|vibration_rollingmean_3|volt_rollingmean_24|rotate_rollingmean_24|pressure_rollingmean_24|vibration_rollingmean_24|
+---------+--------------------+------------------+--------------------+----------------------+-----------------------+-------------------+---------------------+-----------------------+------------------------+
|        1|2015-01-01 06:00:...|  157.570499303353|   531.9910662461275|     104.6551338198705|      47.58198648621824|   157.570499303353|    531.9910662461275|      104.6551338198705|       47.58198648621824|
|        1|2015-01-01 09:00:...| 164.5897626847691|  483.18260805306585|    108.42022548903766|     41.788693381230225| 162.56407261408597|   497.8140905834

In [11]:
# tel_sd rolling sd
tel_sd_resampled = (tel_sd.withColumn("dt_truncated", dt_truncated).drop('volt', 'rotate', 'pressure', 'vibration')
                        .fillna(0))

tel_sd_resampled1 = (tel_sd_resampled.groupBy("machineID","dt_truncated")
                               .agg(F.stddev('volt_rollingstd_3').alias('volt_rollingstd_3'),
                                    F.stddev('rotate_rollingstd_3').alias('rotate_rollingstd_3'), 
                                    F.stddev('pressure_rollingstd_3').alias('pressure_rollingstd_3'), 
                                    F.stddev('vibration_rollingstd_3').alias('vibration_rollingstd_3'), 
                                    F.stddev('volt_rollingstd_24').alias('volt_rollingstd_24'),
                                    F.stddev('rotate_rollingstd_24').alias('rotate_rollingstd_24'), 
                                    F.stddev('pressure_rollingstd_24').alias('pressure_rollingstd_24'), 
                                    F.stddev('vibration_rollingstd_24').alias('vibration_rollingstd_24')))
tel_sd_resampled1.count()
tel_sd_resampled1.show(5)


+---------+--------------------+------------------+-------------------+---------------------+----------------------+-------------------+--------------------+----------------------+-----------------------+
|machineID|        dt_truncated| volt_rollingstd_3|rotate_rollingstd_3|pressure_rollingstd_3|vibration_rollingstd_3| volt_rollingstd_24|rotate_rollingstd_24|pressure_rollingstd_24|vibration_rollingstd_24|
+---------+--------------------+------------------+-------------------+---------------------+----------------------+-------------------+--------------------+----------------------+-----------------------+
|       29|2015-01-26 03:00:...|1.0354484238049477| 14.260962360469602|   0.5964698254143006|    1.5456387610819684|0.07351154796794042|  1.4018520364853078|   0.14692943132851138|     0.1075178454921169|
|       29|2015-01-29 21:00:...| 3.820994609258985|  3.097816538368608|    6.328610359092415|    1.9043920683183384|0.17961103508232468|   5.045343957838469|   0.09655794386488613|

## Errors features

Like telemetry data, errors come with timestamps. An important difference is that the error IDs are categorical values and should not be averaged over time intervals like the telemetry measurements. Instead, we count the number of errors of each type in a lagging window. We begin by reformatting the error data to have one entry per machine per time at which at least one error occurred.
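
The reformatting step can be illustrated without Spark. The sketch below counts errors of each type per machine and timestamp, using rows taken from the errors sample printed earlier in this notebook. The helper name `error_counts` and the fixed list of five error IDs are assumptions made for the illustration.

```python
from collections import Counter, defaultdict

ERROR_IDS = ["error1", "error2", "error3", "error4", "error5"]


def error_counts(log):
    """Turn (datetime, machineID, errorID) rows into one count row per machine and time."""
    counts = defaultdict(Counter)
    for dt, machine_id, error_id in log:
        counts[(machine_id, dt)][error_id] += 1
    # A Counter returns 0 for error types that never occurred at that time.
    return {key: [c[e] for e in ERROR_IDS] for key, c in counts.items()}


log = [
    ("2015-08-08 06:00:00", 251, "error4"),
    ("2015-09-07 06:00:00", 251, "error2"),
    ("2015-09-07 06:00:00", 251, "error3"),
]
for key, row in error_counts(log).items():
    print(key, row)
```

The two errors logged for machine 251 at the same hour end up in a single row with a count in each of the `error2` and `error3` positions.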

In [12]:
# create a column for each errorID 
error1 = errors.groupBy("machineID","datetime","errorID").pivot('errorID').agg(F.count('machineID').alias('dummy'))

# remove the column called errorID and fill in missing values
error2 = error1.drop('errorID').fillna(0)

# combine errors for a given machine in a given hour
error3 = (error2.groupBy("machineID","datetime")
                .agg(F.sum('error1').alias('error1sum'), 
                     F.sum('error2').alias('error2sum'), 
                     F.sum('error3').alias('error3sum'), 
                     F.sum('error4').alias('error4sum'), 
                     F.sum('error5').alias('error5sum')))

error3.count(), len(error3.columns)
error3.show(5)

+---------+-------------------+---------+---------+---------+---------+---------+
|machineID|           datetime|error1sum|error2sum|error3sum|error4sum|error5sum|
+---------+-------------------+---------+---------+---------+---------+---------+
|       65|2015-08-16 06:00:00|        0|        1|        1|        0|        0|
|      843|2015-06-14 06:00:00|        0|        1|        1|        0|        0|
|      170|2015-07-03 06:00:00|        0|        1|        1|        0|        0|
|      557|2015-11-29 06:00:00|        0|        1|        1|        0|        0|
|      544|2015-10-08 06:00:00|        1|        1|        1|        0|        0|
+---------+-------------------+---------+---------+---------+---------+---------+
only showing top 5 rows



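The errors data has rows only for the hours in which an error occurred. To get one row per machine per hour, matching the telemetry data, we left-join the error counts onto the telemetry records and fill the missing counts with zero.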
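
A rough plain-Python picture of that alignment: keep every telemetry key and look up its error counts, using zeros when there is no match. The keys and counts here are made up, and `left_join_with_zeros` is a hypothetical helper, not part of the notebook.

```python
def left_join_with_zeros(telemetry_keys, error_rows, n_error_types=5):
    """Return one row per telemetry key, using zeros where no error row exists."""
    zeros = (0,) * n_error_types
    return [(key, tuple(error_rows.get(key, zeros))) for key in telemetry_keys]


# Keys are (machineID, datetime); the values are made up for illustration.
telemetry_keys = [
    (251, "2015-09-07 05:00:00"),
    (251, "2015-09-07 06:00:00"),
    (251, "2015-09-07 07:00:00"),
]
error_rows = {(251, "2015-09-07 06:00:00"): (0, 1, 1, 0, 0)}

aligned = left_join_with_zeros(telemetry_keys, error_rows)
for key, counts in aligned:
    print(key, counts)
```

The result has exactly as many rows as there are telemetry keys, which is the property the join is meant to guarantee.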



In [13]:
# join the telemetry data with errors
error_count = (telemetry.join(error3, ((telemetry['machineID'] == error3['machineID']) 
                                  & (telemetry['datetime'] == error3['datetime'])), "left")
               .drop('volt', 'rotate', 'pressure', 'vibration').drop(error3.machineID).drop(error3.datetime))

# fill in missing value
error_count1 = error_count.fillna(0)

error_count1.count(), len(error_count1.columns)
error_count1.show(5)


+-------------------+---------+---------+---------+---------+---------+---------+
|           datetime|machineID|error1sum|error2sum|error3sum|error4sum|error5sum|
+-------------------+---------+---------+---------+---------+---------+---------+
|2015-01-08 10:00:00|      501|        0|        0|        0|        0|        0|
|2015-01-08 11:00:00|      501|        0|        0|        0|        0|        0|
|2015-01-08 12:00:00|      501|        0|        0|        0|        0|        0|
|2015-01-08 13:00:00|      501|        0|        0|        0|        0|        0|
|2015-01-08 14:00:00|      501|        0|        0|        0|        0|        0|
+-------------------+---------+---------+---------+---------+---------+---------+
only showing top 5 rows



In [14]:
rolling_features1 = ['error1sum','error2sum', 'error3sum', 'error4sum', 'error5sum']
               
# lag window 24 hrs
lags = [24]

# rolling mean
err_mean = error_count1

for lag_n in lags:
    wSpec = Window.partitionBy('machineID').orderBy('datetime').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features1:
        err_mean = err_mean.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

Lag = 24, Column = error1sum
Lag = 24, Column = error2sum
Lag = 24, Column = error3sum
Lag = 24, Column = error4sum
Lag = 24, Column = error5sum


### Resample to every 3 hours

In [15]:
dt_truncated = ((round(unix_timestamp(col("datetime").cast("timestamp")) / time_val) * time_val)
    .cast("timestamp"))

err_mean_resampled = (err_mean.withColumn("dt_truncated", dt_truncated)
                    .drop('error1sum', 'error2sum', 'error3sum', 'error4sum', 'error5sum').fillna(0))

err_mean_resampled1 = (err_mean_resampled.groupBy("machineID","dt_truncated")
                               .agg(F.mean('error1sum_rollingmean_24').alias('error1sum_rollingmean_24'), 
                                    F.mean('error2sum_rollingmean_24').alias('error2sum_rollingmean_24'), 
                                    F.mean('error3sum_rollingmean_24').alias('error3sum_rollingmean_24'), 
                                    F.mean('error4sum_rollingmean_24').alias('error4sum_rollingmean_24'), 
                                    F.mean('error5sum_rollingmean_24').alias('error5sum_rollingmean_24')))
err_mean_resampled1.count()
err_mean_resampled1.show(5)


+---------+--------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|machineID|        dt_truncated|error1sum_rollingmean_24|error2sum_rollingmean_24|error3sum_rollingmean_24|error4sum_rollingmean_24|error5sum_rollingmean_24|
+---------+--------------------+------------------------+------------------------+------------------------+------------------------+------------------------+
|       29|2015-01-26 03:00:...|                     0.0|                     0.0|                     0.0|                     0.0|                     0.0|
|       29|2015-01-29 21:00:...|                     0.0|                     0.0|                     0.0|                     0.0|                     0.0|
|      474|2015-01-05 03:00:...|                     0.0|                     0.0|                     0.0|                     0.0|                     0.0|
|      474|2015-01-17 09:00:...|                    

## Days since last replacement from maintenance 

A crucial data set in this example is the maintenance records, which contain the component replacement history. Possible features from this data set include, for example, the number of replacements of each component in the last 3 months, to capture the frequency of replacements. However, a more relevant feature is how long it has been since a component was last replaced, since that should correlate better with component failures: the longer a component is used, the more degradation should be expected.

As a side note, creating lagging features from maintenance data is not as straightforward as for telemetry and errors, so the features from this data are generated in a more custom way. This type of ad-hoc feature engineering is very common in predictive maintenance since domain knowledge plays a big role in understanding the predictors of a problem. In the following, the days since last component replacement are calculated for each component type as features from the maintenance data.
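
One way to think about this feature, sketched in plain Python rather than Spark: for each record, find the most recent replacement strictly before it and take the difference in calendar days, similar in spirit to `datediff`. Note that the join in the cells below yields one row per earlier replacement, whereas this sketch keeps only the latest one. The replacement dates and the function name are hypothetical.

```python
from bisect import bisect_left
from datetime import datetime


def days_since_last_replacement(record_time, replacement_times):
    """Calendar days between a record and the latest replacement strictly before it.

    `replacement_times` must be sorted in ascending order. Returns None when
    the component has no earlier replacement on record.
    """
    # bisect_left finds the first replacement at or after the record time,
    # so the entry just before it is the latest strictly earlier replacement.
    idx = bisect_left(replacement_times, record_time)
    if idx == 0:
        return None
    return (record_time.date() - replacement_times[idx - 1].date()).days


# Made-up replacement history for one component of one machine.
replacements = [datetime(2014, 11, 13, 6), datetime(2015, 1, 2, 6)]

print(days_since_last_replacement(datetime(2015, 1, 8, 10), replacements))
print(days_since_last_replacement(datetime(2014, 12, 1, 0), replacements))
print(days_since_last_replacement(datetime(2014, 1, 1, 0), replacements))
```

Keeping the replacement times sorted lets each lookup run as a binary search instead of a scan over the whole maintenance history.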

In [16]:
# create a column for each comp 
maint1 = maint.groupBy("machineID","datetime","comp").pivot('comp').agg(F.count('machineID').alias('dummy'))

# remove the column called comp and fill in missing values
maint2 = maint1.drop('comp').fillna(0)

# combine maintenance for a given machine in a given hour
maint3 = (maint2.groupBy("machineID","datetime").agg(F.sum('comp1').alias('comp1sum'), 
                                                    F.sum('comp2').alias('comp2sum'), 
                                                    F.sum('comp3').alias('comp3sum'),
                                                    F.sum('comp4').alias('comp4sum')))
maint3.show(5)

+---------+-------------------+--------+--------+--------+--------+
|machineID|           datetime|comp1sum|comp2sum|comp3sum|comp4sum|
+---------+-------------------+--------+--------+--------+--------+
|      191|2015-12-28 06:00:00|       1|       0|       0|       0|
|      567|2015-09-17 06:00:00|       0|       0|       0|       1|
|      301|2015-04-04 06:00:00|       1|       1|       0|       0|
|      852|2015-06-14 06:00:00|       0|       0|       1|       0|
|      942|2015-09-23 06:00:00|       0|       0|       1|       1|
+---------+-------------------+--------+--------+--------+--------+
only showing top 5 rows



## Days since last replacement for component-1

In [17]:
test_maint_comp1 = (maint3.where((col("comp1sum") == '1')).withColumnRenamed('datetime','datetime_maint')
                           .drop('comp2sum', 'comp3sum', 'comp4sum'))

test_tel_comp1 = (telemetry.withColumnRenamed('datetime','datetime_tel')
                  .drop(telemetry.volt).drop(telemetry.rotate).drop(telemetry.pressure).drop(telemetry.vibration))

test_maint_tel_comp1 = test_tel_comp1.join(test_maint_comp1, ((test_tel_comp1['machineID']==
                                                               test_maint_comp1['machineID']) 
                                            & (test_tel_comp1['datetime_tel'] > test_maint_comp1['datetime_maint']) 
                                            & (test_maint_comp1['comp1sum'] == '1'))).drop(test_maint_comp1.machineID)

comp1 = (test_maint_tel_comp1.withColumn("sincelastcomp1", 
              datediff(test_maint_tel_comp1.datetime_tel, test_maint_tel_comp1.datetime_maint))
              .drop(test_maint_tel_comp1.datetime_maint).drop(test_maint_tel_comp1.comp1sum))

comp1.show(5)

+-------------------+---------+--------------+
|       datetime_tel|machineID|sincelastcomp1|
+-------------------+---------+--------------+
|2015-01-08 10:00:00|      501|             6|
|2015-01-08 10:00:00|      501|            56|
|2015-01-08 11:00:00|      501|             6|
|2015-01-08 11:00:00|      501|            56|
|2015-01-08 12:00:00|      501|             6|
+-------------------+---------+--------------+
only showing top 5 rows



## Days since last replacement for component-2

In [18]:
test_maint_comp2 = (maint3.where(col("comp2sum") == '1').withColumnRenamed('datetime','datetime_maint')
                         .drop('comp1sum', 'comp3sum', 'comp4sum'))

test_tel_comp2 = (telemetry.withColumnRenamed('datetime','datetime_tel')
                          .drop(telemetry.volt).drop(telemetry.rotate).drop(telemetry.pressure)
                          .drop(telemetry.vibration))

test_maint_tel_comp2 = (test_tel_comp2.join(test_maint_comp2, ((test_tel_comp2['machineID']==
                                                                test_maint_comp2['machineID']) 
                                        & (test_tel_comp2['datetime_tel'] > test_maint_comp2['datetime_maint']) 
                                        & (test_maint_comp2['comp2sum'] == '1') 
                                           )).drop(test_maint_comp2.machineID))

comp2 = (test_maint_tel_comp2.withColumn("sincelastcomp2", 
              datediff(test_maint_tel_comp2.datetime_tel, test_maint_tel_comp2.datetime_maint))
              .drop(test_maint_tel_comp2.datetime_maint).drop(test_maint_tel_comp2.comp2sum))
comp2.show(5)

+-------------------+---------+--------------+
|       datetime_tel|machineID|sincelastcomp2|
+-------------------+---------+--------------+
|2015-01-08 10:00:00|      501|            41|
|2015-01-08 11:00:00|      501|            41|
|2015-01-08 12:00:00|      501|            41|
|2015-01-08 13:00:00|      501|            41|
|2015-01-08 14:00:00|      501|            41|
+-------------------+---------+--------------+
only showing top 5 rows



## Days since last replacement for component-3

In [19]:
test_maint_comp3 = (maint3.where(col("comp3sum") == '1').withColumnRenamed('datetime','datetime_maint')
                          .drop('comp1sum', 'comp2sum', 'comp4sum'))

test_tel_comp3 = (telemetry.withColumnRenamed('datetime','datetime_tel')
                    .drop(telemetry.volt).drop(telemetry.rotate).drop(telemetry.pressure).drop(telemetry.vibration))

test_maint_tel_comp3 = test_tel_comp3.join(test_maint_comp3, ((test_tel_comp3['machineID']==
                                                               test_maint_comp3['machineID']) 
                                        & (test_tel_comp3['datetime_tel'] > test_maint_comp3['datetime_maint']) 
                                        & (test_maint_comp3['comp3sum'] == '1') 
                                           )).drop(test_maint_comp3.machineID)

comp3 = (test_maint_tel_comp3.withColumn("sincelastcomp3", 
              datediff(test_maint_tel_comp3.datetime_tel, test_maint_tel_comp3.datetime_maint))
              .drop(test_maint_tel_comp3.datetime_maint).drop(test_maint_tel_comp3.comp3sum))
comp3.show(5)

+-------------------+---------+--------------+
|       datetime_tel|machineID|sincelastcomp3|
+-------------------+---------+--------------+
|2015-01-08 10:00:00|      501|           206|
|2015-01-08 11:00:00|      501|           206|
|2015-01-08 12:00:00|      501|           206|
|2015-01-08 13:00:00|      501|           206|
|2015-01-08 14:00:00|      501|           206|
+-------------------+---------+--------------+
only showing top 5 rows



## Days since last replacement for component-4

In [20]:
test_maint_comp4 = (maint3.where(col("comp4sum") == '1').withColumnRenamed('datetime','datetime_maint')
                         .drop('comp1sum', 'comp2sum', 'comp3sum'))

test_tel_comp4 = (telemetry.withColumnRenamed('datetime','datetime_tel')
                  .drop(telemetry.volt).drop(telemetry.rotate).drop(telemetry.pressure).drop(telemetry.vibration))

test_maint_tel_comp4 = test_tel_comp4.join(test_maint_comp4, ((test_tel_comp4['machineID']==
                                                               test_maint_comp4['machineID']) 
                                        & (test_tel_comp4['datetime_tel'] > test_maint_comp4['datetime_maint']) 
                                        & (test_maint_comp4['comp4sum'] == '1'))).drop(test_maint_comp4.machineID)

comp4 = (test_maint_tel_comp4.withColumn("sincelastcomp4", 
              datediff(test_maint_tel_comp4.datetime_tel, test_maint_tel_comp4.datetime_maint))
              .drop(test_maint_tel_comp4.datetime_maint).drop(test_maint_tel_comp4.comp4sum))
comp4.show(5)

+-------------------+---------+--------------+
|       datetime_tel|machineID|sincelastcomp4|
+-------------------+---------+--------------+
|2015-01-08 10:00:00|      501|           131|
|2015-01-08 11:00:00|      501|           131|
|2015-01-08 12:00:00|      501|           131|
|2015-01-08 13:00:00|      501|           131|
|2015-01-08 14:00:00|      501|           131|
+-------------------+---------+--------------+
only showing top 5 rows
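
The four "days since last replacement" cells follow the same pattern: join each telemetry row to the earlier replacement records of the same machine, then take the date difference. The following is a minimal plain-Python sketch of that logic for a single telemetry row. The replacement dates and the telemetry timestamp are made-up values, not taken from the data set.

```python
from datetime import datetime

# Hypothetical replacement dates for one component on one machine.
replacements = [datetime(2014, 6, 1, 6), datetime(2014, 11, 28, 6)]
# Hypothetical telemetry timestamp for the same machine.
telemetry_time = datetime(2015, 1, 8, 10)

# Mirror the join condition: keep only replacements strictly before the reading.
earlier = [r for r in replacements if r < telemetry_time]

# Mirror datediff: whole calendar days between the two dates (time of day ignored).
days_since = [(telemetry_time.date() - r.date()).days for r in earlier]

print(days_since)       # one value per matching replacement
print(min(days_since))  # difference to the most recent replacement
print(max(days_since))  # difference to the earliest replacement
```

Because the join matches every earlier replacement, one telemetry row can produce several output rows. The smallest difference corresponds to the most recent replacement. The combining step later in this notebook aggregates with `F.max`, which in this sketch would select the earliest matching replacement, so it may be worth checking whether `F.min` is closer to the intent of the heading.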



## Combine comp1, comp2, comp3, and comp4 to generate the maintenance feature set

In [22]:
# left join comp1 with (comp2, comp3, comp4) 
# left join comp2 with (comp3, comp4) 
# left join comp3, comp4 
comp3_4 = (comp3.join(comp4, ((comp3['machineID'] == comp4['machineID']) 
                                  & (comp3['datetime_tel'] == comp4['datetime_tel'])), "left")
                                  .drop(comp4.machineID).drop(comp4.datetime_tel))
comp2_3_4 = (comp2.join(comp3_4, ((comp2['machineID'] == comp3_4['machineID']) 
                                  & (comp2['datetime_tel'] == comp3_4['datetime_tel'])), "left")
                                  .drop(comp3_4.machineID).drop(comp3_4.datetime_tel))
comp1_2_3_4 = (comp1.join(comp2_3_4, ((comp1['machineID'] == comp2_3_4['machineID']) 
                                  & (comp1['datetime_tel'] == comp2_3_4['datetime_tel'])), "left")
                                 .drop(comp2_3_4.machineID).drop(comp2_3_4.datetime_tel))
comp1_2_3_4_final = (comp1_2_3_4.groupBy("machineID", "datetime_tel")
                                .agg(F.max('sincelastcomp1').alias('sincelastcomp1'), 
                                     F.max('sincelastcomp2').alias('sincelastcomp2'), 
                                     F.max('sincelastcomp3').alias('sincelastcomp3'), 
                                     F.max('sincelastcomp4').alias('sincelastcomp4')))

# fill in missing values with 0
maint_count1 = comp1_2_3_4_final.fillna(0)

maint_count1.show(5)

+---------+-------------------+--------------+--------------+--------------+--------------+
|machineID|       datetime_tel|sincelastcomp1|sincelastcomp2|sincelastcomp3|sincelastcomp4|
+---------+-------------------+--------------+--------------+--------------+--------------+
|        1|2015-01-26 15:00:00|           134|           134|            74|           209|
|        1|2015-02-01 04:00:00|           140|           140|            80|           215|
|        1|2015-02-05 01:00:00|           144|           144|            84|           219|
|        1|2015-02-13 13:00:00|           152|           152|            92|           227|
|        1|2015-03-17 21:00:00|           184|           184|           124|           259|
+---------+-------------------+--------------+--------------+--------------+--------------+
only showing top 5 rows
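
As a rough illustration of what the chained left joins and `fillna(0)` do, here is a plain-Python sketch using dictionaries keyed by `(machineID, datetime_tel)`. The helper `left_join` and the sample values are hypothetical and only stand in for the Spark operations.

```python
# Hypothetical per-component results keyed by (machineID, datetime_tel).
comp1 = {(1, "2015-01-26 15:00:00"): 134, (2, "2015-01-26 15:00:00"): 12}
comp2 = {(1, "2015-01-26 15:00:00"): 134}

def left_join(left, right):
    """Keep every key of `left`; use None where `right` has no match."""
    return {key: (value, right.get(key)) for key, value in left.items()}

joined = left_join(comp1, comp2)

# Equivalent of fillna(0): replace missing values with 0.
filled = {key: tuple(0 if v is None else v for v in values)
          for key, values in joined.items()}

for key, values in filled.items():
    print(key, values)
```

Two consequences of this design are worth keeping in mind. A left join keeps only the keys present on the left side, so rows that exist only in the right-hand table are dropped. After filling, a value of 0 can mean "no earlier replacement was found" as well as "replaced earlier the same day".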



### Resample to every 3 hours

In [23]:
# round each maintenance record's timestamp to the nearest multiple of time_val seconds
dt_truncated = ((round(unix_timestamp(col("datetime_tel").cast("timestamp")) / time_val) * time_val)
    .cast("timestamp"))

maint_resampled = maint_count1.withColumn("dt_truncated", dt_truncated)
maint_resampled1 = (maint_resampled.groupBy("machineID","dt_truncated")
                                  .agg(F.mean('sincelastcomp1').alias('comp1sum'), 
                                       F.mean('sincelastcomp2').alias('comp2sum'), 
                                       F.mean('sincelastcomp3').alias('comp3sum'), 
                                       F.mean('sincelastcomp4').alias('comp4sum')))
maint_resampled1.show(5)

+---------+--------------------+--------+--------+--------+--------+
|machineID|        dt_truncated|comp1sum|comp2sum|comp3sum|comp4sum|
+---------+--------------------+--------+--------+--------+--------+
|        6|2015-11-29 18:00:...|   366.0|   381.0|   471.0|   501.0|
|        7|2015-06-10 18:00:...|   269.0|   224.0|   344.0|   269.0|
|        7|2015-12-13 18:00:...|   455.0|   410.0|   530.0|   455.0|
|       12|2015-01-01 06:00:...|    64.0|    34.0|    64.0|    79.0|
|       20|2015-10-15 03:00:...|   351.0|   426.0|   456.0|   441.0|
+---------+--------------------+--------+--------+--------+--------+
only showing top 5 rows
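
The `dt_truncated` expression rounds each timestamp to the nearest multiple of `time_val` seconds. Despite the column name, it rounds rather than truncates. The sketch below reproduces the arithmetic in plain Python, assuming `time_val` is 3 hours expressed in seconds and that timestamps are interpreted as UTC. Both are assumptions, since `time_val` and the session time zone are set elsewhere.

```python
import math
from datetime import datetime, timezone

# Assumption: time_val is the bucket width in seconds (3 hours here).
time_val = 3 * 60 * 60

def truncate_to_bucket(ts):
    """Round a timezone-aware datetime to the nearest multiple of time_val."""
    seconds = ts.timestamp()
    # floor(x + 0.5) rounds halves up, like Spark's round() for positive values.
    bucket = math.floor(seconds / time_val + 0.5) * time_val
    return datetime.fromtimestamp(bucket, tz=timezone.utc)

readings = [datetime(2015, 1, 1, h, tzinfo=timezone.utc) for h in (5, 6, 7, 8)]
for r in readings:
    print(r.isoformat(), "->", truncate_to_bucket(r).isoformat())
```

Under these assumptions, the hourly readings at 05:00, 06:00, and 07:00 all fall into the 06:00 bucket, and the reading at 08:00 falls into the 09:00 bucket. The subsequent `groupBy` then averages the records in each bucket.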



## Machine features

The machine features can be used without further modification. These include descriptive information about the type of each machine and its age (number of years in service). If the age information had been recorded as a "first use date" for each machine, a transformation would have been necessary to turn those dates into numeric values indicating the years in service.
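
For illustration only, the following sketch shows what such a transformation could look like. The function name `years_in_service`, the idea of a first-use date field, and the reference date are all hypothetical; the data set used here already stores age in years.

```python
from datetime import date

def years_in_service(first_use, as_of):
    """Whole years elapsed between first_use and as_of."""
    years = as_of.year - first_use.year
    # Subtract one if the anniversary has not yet occurred in the as_of year.
    if (as_of.month, as_of.day) < (first_use.month, first_use.day):
        years -= 1
    return years

# Hypothetical first-use date, evaluated at a hypothetical reference date.
print(years_in_service(date(1997, 6, 15), date(2015, 1, 1)))
```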

We do need to create a set of dummy features: boolean variables that indicate the model name of each machine. This is a _one-hot encoding_ step. 

In [24]:
# one hot encoding of the variable model
catVarNames = ['model']  
    
sIndexers = [StringIndexer(inputCol=x, outputCol=x + '_indexed') for x in catVarNames]

machines_cat = Pipeline(stages=sIndexers).fit(machines).transform(machines)

# one-hot encode
ohEncoders = [OneHotEncoder(inputCol=x + '_indexed', outputCol=x + '_encoded')
              for x in catVarNames]
ohPipelineModel = Pipeline(stages=ohEncoders).fit(machines_cat)
machines_cat = ohPipelineModel.transform(machines_cat)

drop_list = [col_n for col_n in machines_cat.columns if 'indexed' in col_n]

machines_edit = machines_cat.select([column for column in machines_cat.columns if column not in drop_list])

machines_edit.show(5)

+---------+------+---+-------------+
|machineID| model|age|model_encoded|
+---------+------+---+-------------+
|        1|model2| 18|(3,[2],[1.0])|
|        2|model4|  7|(3,[1],[1.0])|
|        3|model3|  8|(3,[0],[1.0])|
|        4|model3|  7|(3,[0],[1.0])|
|        5|model2|  2|(3,[2],[1.0])|
+---------+------+---+-------------+
only showing top 5 rows
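
The `model_encoded` column is printed in Spark's sparse vector notation, `(size, [indices], [values])`. By default, `StringIndexer` assigns index 0 to the most frequent label, and `OneHotEncoder` drops the last category, so four models yield a vector of size 3. The plain-Python sketch below imitates that behaviour. The label counts are made up so that the resulting indices match the output above; the real frequencies are not shown in this notebook.

```python
from collections import Counter

# Hypothetical model column; counts chosen so model3 is the most frequent.
models = ["model3"] * 4 + ["model4"] * 3 + ["model2"] * 2 + ["model1"]

# StringIndexer-style indexing: the most frequent label gets index 0.
by_frequency = [label for label, _ in Counter(models).most_common()]
index_of = {label: i for i, label in enumerate(by_frequency)}

def one_hot_drop_last(label):
    """Dense one-hot vector with the last category dropped."""
    size = len(index_of) - 1
    vec = [0.0] * size
    i = index_of[label]
    if i < size:  # the dropped category stays all zeros
        vec[i] = 1.0
    return vec

for label in by_frequency:
    print(label, index_of[label], one_hot_drop_last(label))
```

In this sketch the dropped category (`model1`) is represented by the all-zero vector, which avoids a redundant column while still distinguishing all four models.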



## Join data into feature engineering set


In [25]:
# join error with components
#err_mean_resampled1.show(3)
#maint_resampled1.show(3)

error_maint = (err_mean_resampled1.join(maint_resampled1, 
                                ((err_mean_resampled1['machineID'] == maint_resampled1['machineID']) 
                                  & (err_mean_resampled1['dt_truncated'] == maint_resampled1['dt_truncated'])), "left")
                                  .drop(maint_resampled1.machineID).drop(maint_resampled1.dt_truncated))
#error_maint.show(10, False)
#error_maint.count(), len(error_maint.columns)

# now join with machines
#machines_edit.show(1)

err_maint_mach = (error_maint.join(machines_edit, ((error_maint['machineID'] == machines_edit['machineID'])), "left")
                             .drop(machines_edit.machineID))
err_maint_mach_select = (err_maint_mach.select([c for c in err_maint_mach.columns if c not in 
                                               {'error1sum', 'error2sum', 'error3sum', 'error4sum', 'error5sum'}]))
#err_maint_mach_select.show(10, False)
#err_maint_mach_select.count(), len(err_maint_mach_select.columns)

telemetry_all = (tel_mean_resampled1.join(tel_sd_resampled1, 
                             ((tel_mean_resampled1['machineID'] == tel_sd_resampled1['machineID']) 
                              & (tel_mean_resampled1['dt_truncated'] == tel_sd_resampled1['dt_truncated'])), "left")
                              .drop(tel_sd_resampled1.machineID).drop(tel_sd_resampled1.dt_truncated))
#telemetry_all.show(10, False)
#telemetry_all.count(), len(telemetry_all.columns)

# join telemetry_all with err_maint_mach_select to create final feature matrix
final_feat = (telemetry_all.join(err_maint_mach_select, 
                                ((telemetry_all['machineID'] == err_maint_mach_select['machineID']) 
                                  & (telemetry_all['dt_truncated'] == err_maint_mach_select['dt_truncated'])), "left")
                                 .drop(err_maint_mach_select.machineID).drop(err_maint_mach_select.dt_truncated))
final_feat.show(5, False)
#final_feat.count(), len(final_feat.columns)

+---------+---------------------+------------------+--------------------+----------------------+-----------------------+-------------------+---------------------+-----------------------+------------------------+------------------+-------------------+---------------------+----------------------+--------------------+--------------------+----------------------+-----------------------+------------------------+------------------------+------------------------+------------------------+------------------------+--------+--------+--------+--------+------+---+-------------+
|machineID|dt_truncated         |volt_rollingmean_3|rotate_rollingmean_3|pressure_rollingmean_3|vibration_rollingmean_3|volt_rollingmean_24|rotate_rollingmean_24|pressure_rollingmean_24|vibration_rollingmean_24|volt_rollingstd_3 |rotate_rollingstd_3|pressure_rollingstd_3|vibration_rollingstd_3|volt_rollingstd_24  |rotate_rollingstd_24|pressure_rollingstd_24|vibration_rollingstd_24|error1sum_rollingmean_24|error2sum_rollingmea

# Label construction

When using multi-class classification for predicting failure due to a problem, labelling is done by taking a time window prior to the failure of an asset and labelling the feature records that fall into that window as "about to fail due to a problem" while labelling all other records as "normal." This time window should be picked according to the business case: in some situations it may be enough to predict failures hours in advance, while in others days or weeks may be needed to allow e.g. for arrival of replacement parts.

The prediction problem for this example scenario is to estimate the probability that a machine will fail in the near future due to a failure of a certain component. More specifically, the goal is to compute the probability that a machine will fail in the next 24 hours due to a certain component failure (component 1, 2, 3, or 4). Below, a categorical failure feature is created to serve as the label. All records within a 24-hour window before a failure of component 1 have failure=comp1, and so on for components 2, 3, and 4; all records not within 24 hours of a component failure have failure=none.
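
The labelling rule described above can be sketched in plain Python as follows. The failure event, the `label_for` helper, and the choice to treat the window as "the failure time itself plus anything less than 24 hours earlier" are assumptions made for illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(hours=24)

# Hypothetical failure events for one machine: (time, component).
failures = [(datetime(2015, 9, 18, 6), "comp2")]

def label_for(record_time):
    """Return the failing component if record_time is inside a failure window."""
    for failure_time, component in failures:
        # Inside the window: at the failure, or less than 24 hours before it.
        if timedelta(0) <= failure_time - record_time < WINDOW:
            return component
    return "none"

for hour_offset in (27, 24, 21, 3, 0, -3):
    t = datetime(2015, 9, 18, 6) - timedelta(hours=hour_offset)
    print(t, label_for(t))
```

With records spaced 3 hours apart, this window covers the failure record and the 7 records before it, which is the same span the lag-based backfill later in this notebook uses.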

In [26]:
# check failure sample data
failures.show(5)

# check the dimensions of the data
failures.count(), len(failures.columns)

+-------------------+---------+-------+
|           datetime|machineID|failure|
+-------------------+---------+-------+
|2015-09-18 06:00:00|      453|  comp2|
|2015-12-17 06:00:00|      453|  comp2|
|2015-03-27 06:00:00|      454|  comp2|
|2015-08-24 06:00:00|      454|  comp2|
|2015-09-23 06:00:00|      454|  comp1|
+-------------------+---------+-------+
only showing top 5 rows



(6726, 3)

In [27]:
# drop duplicate rows that share the same machineID and datetime
failures1 = failures.dropDuplicates(['machineID', 'datetime'])

# check the dimensions of the data
failures1.count(), len(failures1.columns)

(6368, 3)
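
The row count drops from 6726 to 6368, so some failure records share the same `machineID` and `datetime`. The sketch below shows, in plain Python with made-up records, what de-duplicating on that key does when two components fail at the same time.

```python
# Hypothetical failure records: (datetime, machineID, failure).
records = [
    ("2015-09-18 06:00:00", 453, "comp2"),
    ("2015-09-18 06:00:00", 453, "comp4"),  # same machine and time, other component
    ("2015-12-17 06:00:00", 453, "comp2"),
]

deduped = {}
for dt, machine_id, failure in records:
    # Keep the first record seen for each (machineID, datetime) key.
    deduped.setdefault((machine_id, dt), failure)

print(len(records), "->", len(deduped))
print(deduped)
```

Only one component label survives per key. This sketch keeps the first record it sees; Spark's `dropDuplicates` does not promise which of the duplicate rows is retained, so the surviving label should be treated as arbitrary.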

In [28]:
# map the failure data to final feature matrix

labeled_features = (final_feat.join(failures1, ((final_feat['machineID'] == failures1['machineID']) 
                                  & (final_feat['dt_truncated'] == failures1['datetime'])), "left")
                                  .drop(failures1.machineID).drop(failures1.datetime))
labeled_features.show(5, False)
#labeled_features.count(), len(labeled_features.columns)

+---------+---------------------+------------------+--------------------+----------------------+-----------------------+-------------------+---------------------+-----------------------+------------------------+------------------+-------------------+---------------------+----------------------+--------------------+--------------------+----------------------+-----------------------+------------------------+------------------------+------------------------+------------------------+------------------------+--------+--------+--------+--------+------+---+-------------+-------+
|machineID|dt_truncated         |volt_rollingmean_3|rotate_rollingmean_3|pressure_rollingmean_3|vibration_rollingmean_3|volt_rollingmean_24|rotate_rollingmean_24|pressure_rollingmean_24|vibration_rollingmean_24|volt_rollingstd_3 |rotate_rollingstd_3|pressure_rollingstd_3|vibration_rollingstd_3|volt_rollingstd_24  |rotate_rollingstd_24|pressure_rollingstd_24|vibration_rollingstd_24|error1sum_rollingmean_24|error2sum_ro

In [29]:
# recoding the column 'failure' to be numeric double for the pyspark classification models
labeled_features1 = (labeled_features.withColumn('failure', F.when(col('failure') == "comp1", 1.0)
                                     .otherwise(col('failure')))
                                     .withColumn('failure', F.when(col('failure') == "comp2", 2.0)
                                     .otherwise(col('failure')))
                                     .withColumn('failure', F.when(col('failure') == "comp3", 3.0)
                                     .otherwise(col('failure')))
                                     .withColumn('failure', F.when(col('failure') == "comp4", 4.0)
                                     .otherwise(col('failure'))))

labeled_features2 = labeled_features1.withColumn("failure1", labeled_features1["failure"].cast(DoubleType()))

#labeled_features2.groupBy('failure').count().show()
#labeled_features2.groupBy('failure1').count().show()
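
The net effect of this recoding, together with the `fillna(0)` applied in a later cell, can be summarised as a simple lookup. The names `FAILURE_CODES` and `encode_failure` are hypothetical and used only for this plain-Python sketch.

```python
# Numeric codes used for the failure label; 0.0 stands for "no failure".
FAILURE_CODES = {"comp1": 1.0, "comp2": 2.0, "comp3": 3.0, "comp4": 4.0}

def encode_failure(value):
    """Map a failure string to its numeric code; missing values become 0.0."""
    return FAILURE_CODES.get(value, 0.0)

print([encode_failure(v) for v in ["comp1", None, "comp4"]])
```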

In [30]:
# check data schema
labeled_features2.dtypes

[('machineID', 'bigint'),
 ('dt_truncated', 'timestamp'),
 ('volt_rollingmean_3', 'double'),
 ('rotate_rollingmean_3', 'double'),
 ('pressure_rollingmean_3', 'double'),
 ('vibration_rollingmean_3', 'double'),
 ('volt_rollingmean_24', 'double'),
 ('rotate_rollingmean_24', 'double'),
 ('pressure_rollingmean_24', 'double'),
 ('vibration_rollingmean_24', 'double'),
 ('volt_rollingstd_3', 'double'),
 ('rotate_rollingstd_3', 'double'),
 ('pressure_rollingstd_3', 'double'),
 ('vibration_rollingstd_3', 'double'),
 ('volt_rollingstd_24', 'double'),
 ('rotate_rollingstd_24', 'double'),
 ('pressure_rollingstd_24', 'double'),
 ('vibration_rollingstd_24', 'double'),
 ('error1sum_rollingmean_24', 'double'),
 ('error2sum_rollingmean_24', 'double'),
 ('error3sum_rollingmean_24', 'double'),
 ('error4sum_rollingmean_24', 'double'),
 ('error5sum_rollingmean_24', 'double'),
 ('comp1sum', 'double'),
 ('comp2sum', 'double'),
 ('comp3sum', 'double'),
 ('comp4sum', 'double'),
 ('model', 'string'),
 ('age', 'b

In [31]:
labeled_features3 = labeled_features2.drop('failure').fillna(0)
labeled_features3.dtypes
#labeled_features3.groupBy('failure1').count().show()

[('machineID', 'bigint'),
 ('dt_truncated', 'timestamp'),
 ('volt_rollingmean_3', 'double'),
 ('rotate_rollingmean_3', 'double'),
 ('pressure_rollingmean_3', 'double'),
 ('vibration_rollingmean_3', 'double'),
 ('volt_rollingmean_24', 'double'),
 ('rotate_rollingmean_24', 'double'),
 ('pressure_rollingmean_24', 'double'),
 ('vibration_rollingmean_24', 'double'),
 ('volt_rollingstd_3', 'double'),
 ('rotate_rollingstd_3', 'double'),
 ('pressure_rollingstd_3', 'double'),
 ('vibration_rollingstd_3', 'double'),
 ('volt_rollingstd_24', 'double'),
 ('rotate_rollingstd_24', 'double'),
 ('pressure_rollingstd_24', 'double'),
 ('vibration_rollingstd_24', 'double'),
 ('error1sum_rollingmean_24', 'double'),
 ('error2sum_rollingmean_24', 'double'),
 ('error3sum_rollingmean_24', 'double'),
 ('error4sum_rollingmean_24', 'double'),
 ('error5sum_rollingmean_24', 'double'),
 ('comp1sum', 'double'),
 ('comp2sum', 'double'),
 ('comp3sum', 'double'),
 ('comp4sum', 'double'),
 ('model', 'string'),
 ('age', 'b

In [32]:
# build the code for backfill with all machine data
label_bfill1 = labeled_features3
label_bfill1.show(1)

+---------+--------------------+------------------+--------------------+----------------------+-----------------------+-------------------+---------------------+-----------------------+------------------------+-----------------+-------------------+---------------------+----------------------+-------------------+--------------------+----------------------+-----------------------+------------------------+------------------------+------------------------+------------------------+------------------------+--------+--------+--------+--------+------+---+-------------+--------+
|machineID|        dt_truncated|volt_rollingmean_3|rotate_rollingmean_3|pressure_rollingmean_3|vibration_rollingmean_3|volt_rollingmean_24|rotate_rollingmean_24|pressure_rollingmean_24|vibration_rollingmean_24|volt_rollingstd_3|rotate_rollingstd_3|pressure_rollingstd_3|vibration_rollingstd_3| volt_rollingstd_24|rotate_rollingstd_24|pressure_rollingstd_24|vibration_rollingstd_24|error1sum_rollingmean_24|error2sum_rolling

In [33]:
# lag values to manually backfill the label onto the 7 preceding records (bfill = 7)
my_window = Window.partitionBy('machineID').orderBy(label_bfill1.dt_truncated.desc())

label_bfill1 = label_bfill1.withColumn("prev_value1", F.lag(label_bfill1.failure1).over(my_window)).fillna(0)
label_bfill1 = label_bfill1.withColumn("prev_value2", F.lag(label_bfill1.prev_value1).over(my_window)).fillna(0) 
label_bfill1 = label_bfill1.withColumn("prev_value3", F.lag(label_bfill1.prev_value2).over(my_window)).fillna(0) 
label_bfill1 = label_bfill1.withColumn("prev_value4", F.lag(label_bfill1.prev_value3).over(my_window)).fillna(0) 
label_bfill1 = label_bfill1.withColumn("prev_value5", F.lag(label_bfill1.prev_value4).over(my_window)).fillna(0) 
label_bfill1 = label_bfill1.withColumn("prev_value6", F.lag(label_bfill1.prev_value5).over(my_window)).fillna(0) 
label_bfill1 = label_bfill1.withColumn("prev_value7", F.lag(label_bfill1.prev_value6).over(my_window)).fillna(0)

In [34]:
# create the label column 
label_bfill2 = (label_bfill1.withColumn('label', label_bfill1.failure1 + label_bfill1.prev_value1 
                         + label_bfill1.prev_value2 + label_bfill1.prev_value3 + label_bfill1.prev_value4 
                         + label_bfill1.prev_value5 + label_bfill1.prev_value6 + label_bfill1.prev_value7))
label_bfill2 = label_bfill2.withColumn('label_e', F.when(col('label') > 4, 4.0).otherwise(col('label')))
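
Because the window is ordered by `dt_truncated` descending, each `F.lag` pulls a value from the next record in time. Summing `failure1` with the seven lagged columns therefore copies a failure code onto the failure record and the seven records before it. The plain-Python sketch below reproduces that arithmetic on a made-up sequence for one machine; the helper name `backfilled_label` is hypothetical.

```python
# Hypothetical failure1 codes for one machine, oldest record first, 3 hours apart.
failure1 = [0.0] * 10 + [2.0] + [0.0] * 2  # component-2 failure at position 10

N_LAGS = 7

def backfilled_label(codes, n_lags=N_LAGS, cap=4.0):
    """Sum each record with the next n_lags records in time, capped at `cap`."""
    labels = []
    for i in range(len(codes)):
        window = codes[i:i + n_lags + 1]  # this record plus the following n_lags
        labels.append(min(sum(window), cap))
    return labels

labels = backfilled_label(failure1)
print(labels)

# Two different failures inside one window add up to another component's code.
overlapping = backfilled_label([1.0, 0.0, 2.0])
print(overlapping)
```

In this sketch eight consecutive records (24 hours at 3-hour spacing) receive the label 2.0. The second call shows a limitation of the summing approach: when two failures fall within the same window, the sum (here 3.0) no longer identifies either component, and the cap at 4.0 only bounds the value rather than resolving the ambiguity.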

In [None]:
label_bfill3 = (label_bfill2.drop(label_bfill2.prev_value1).drop(label_bfill2.prev_value2)
              .drop(label_bfill2.prev_value3).drop(label_bfill2.prev_value4)
              .drop(label_bfill2.prev_value5).drop(label_bfill2.prev_value6)
              .drop(label_bfill2.prev_value7).drop(label_bfill2.label))

In [None]:
label_bfill3.show(1)

In [None]:
import glob  # needed for glob.iglob below; not among the imports at the top of this notebook

# write the final result as parquet file in blob location 
# https://github.com/Azure/ViennaDocs/blob/master/Documentation/UsingBlobForStorage.md

# Create a new container if necessary, otherwise you can use an existing container.
# This command creates the container if it does not already exist. Else it does nothing.
my_service.create_container(STORAGE_CONTAINER_NAME, 
                            fail_on_exist=False, 
                            public_access=PublicAccess.Container)

# Optionally, coalesce the dataframe into three partitions before writing it to the
# current folder (see the commented-out line below). To view the files in the run history
# Output Files, write to './outputs/multiple_files.parquet' instead.
#label_bfill3.coalesce(3).write.mode('overwrite').parquet('multiple_files.parquet')
label_bfill3.write.mode('overwrite').parquet(FEATURES_LOCAL_DIRECT)

# unlike the single file case, for multiple files we need to first delete results from the 
# previous run before uploading.
for blob in my_service.list_blobs(STORAGE_CONTAINER_NAME):
    if FEATURES_LOCAL_DIRECT in blob.name:
        my_service.delete_blob(STORAGE_CONTAINER_NAME, blob.name)

# upload the entire folder into blob storage
for name in glob.iglob(FEATURES_LOCAL_DIRECT + '/*'):
    print(os.path.abspath(name))
    my_service.create_blob_from_path(STORAGE_CONTAINER_NAME, name, name)

print("Feature engineering final dataset files saved!")

# Conclusion
