# Rolling Feature Computation


### Work with result dataset from Notebook_1

In [1]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import lit
import pyspark.sql.functions as F
from pyspark.sql.functions import col,udf,lag,date_add,explode
from pandas import DataFrame
from pyspark.sql.dataframe import *
from pyspark.ml.classification import *
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
import time
from pyspark.sql.functions import col,udf, unix_timestamp
from pyspark.sql.types import DateType
from datetime import datetime, timedelta
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np
import pandas as pd
from pyspark.sql import Row
import subprocess
import sys
import os
import re
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.mllib.evaluation import BinaryClassificationMetrics, RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, RFormula
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import matplotlib.pyplot as plt
import numpy as np
import datetime
import atexit
from sklearn.metrics import roc_curve,auc
import seaborn as sns
from pyspark.sql.functions import month, weekofyear, dayofmonth
from pyspark.sql.functions import concat, col, lit
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA
start_time = time.time()


In [2]:
##################################### rolling computation ###########################

rolling_features = [
    'warn_type1_total', 'warn_type2_total', 
    'pca_1_warn','pca_2_warn', 'pca_3_warn', 'pca_4_warn', 'pca_5_warn',
    'pca_6_warn','pca_7_warn', 'pca_8_warn', 'pca_9_warn', 'pca_10_warn',
    'pca_11_warn','pca_12_warn', 'pca_13_warn', 'pca_14_warn', 'pca_15_warn',
    'pca_16_warn','pca_17_warn', 'pca_18_warn', 'pca_19_warn', 'pca_20_warn',
    'problem_type_1', 'problem_type_2', 'problem_type_3','problem_type_4',
    'problem_type_1_per_usage1','problem_type_2_per_usage1',
    'problem_type_3_per_usage1','problem_type_4_per_usage1',
    'problem_type_1_per_usage2','problem_type_2_per_usage2',
    'problem_type_3_per_usage2','problem_type_4_per_usage2',                
    'fault_code_type_1_count', 'fault_code_type_2_count', 'fault_code_type_3_count', 'fault_code_type_4_count',                          
    'fault_code_type_1_count_per_usage1','fault_code_type_2_count_per_usage1',
    'fault_code_type_3_count_per_usage1', 'fault_code_type_4_count_per_usage1',
    'fault_code_type_1_count_per_usage2','fault_code_type_2_count_per_usage2',
    'fault_code_type_3_count_per_usage2', 'fault_code_type_4_count_per_usage2']
               
# lag window 3, 7, 14, 30, 90 days
lags = [3, 7, 14, 30, 90]

print(len(rolling_features))


46


## What issues we encountered using Pyspark and how we solved them?

-  If the entire list of **46 features** and **5 time windows** were computed for **5 different types of rolling** (mean, difference, std, max, min) all in one go, we always ran into "StackOverFlow" error. 
-  It was because the lineage was too long and Spark could not handle it.
-  We could either create checkPoint and materialize it throughout the process.
-  OR break the workload into chunks and save the result from each chunk as parquet file.

## A few things we found helpful:
-  Before the rolling compute, save the upstream work as a parquet file in Notebook_1 ("Notebook_1_DataCleansing_FeatureEngineering"). It will speed up the whole process because we no need to repeat all the previous steps. It will also help reduce the lineage.
-  Print out the lag and feature name to track progress.
-  Use htop to keep track how many CPUs are running for a particular task. For rolling compute, we were considering two potential approaches: 1) Use Spark clusters on HDI to perform rolling compute in parallel; 2) Use single node Spark on a powerful VM. By looking at htop dashboard, we saw all the 32 cores were running at the same time for a single task (for example compute rolling mean). So if say we divide the workload onto multiple nodes and each node runs a type of rolling compute, the amount of time taken will be comparable with running everything in a sequential manner on a single node Spark on a powerful machine.
-  Use "%%time" for each cell to get an estimate of the total run time, we will then have a better idea where and what to optimze the process.
-  Materialize the intermediate results by either caching in memory or writing as parquet files. We chose to save as parquet files because we did not want to repeat the compute again in case cache() did not work or any part of the rolling compute did not work.
-  Why parquet? There are many reasons, just to name a few: parquet not only saves the data but also the schema, it is a preferred file format by Spark, you are allowed to read only the data you need, etc..

<br>


In [3]:
%%time

# load intermediate dataset from the 1st notebook 
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
        df = df.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingdiff_'+str(lag_n), col(col_name)-F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmax_'+str(lag_n), F.max(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmin_'+str(lag_n), F.min(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

## save the intermediate result for downstream work
df.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/data_rollingmean.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

In [4]:
%%time

# load intermediate dataset from the 1st notebook 
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
#         df = df.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
        df = df.withColumn(col_name+'_rollingdiff_'+str(lag_n), col(col_name)-F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmax_'+str(lag_n), F.max(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmin_'+str(lag_n), F.min(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingdiff = df.select(['key'] + list(s for s in df.columns if "rollingdiff" in s))

## save the intermediate result for downstream work
rollingdiff.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingdiff.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

In [5]:
%%time

# load intermediate dataset from the 1st notebook 
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
#         df = df.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingdiff_'+str(lag_n), col(col_name)-F.avg(col(col_name)).over(wSpec))
        df = df.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmax_'+str(lag_n), F.max(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmin_'+str(lag_n), F.min(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

# there are some missing values for rollingstd features
rollingstd_features = list(s for s in df.columns if "rollingstd" in s)
df = df.fillna(0, subset=rollingstd_features)
rollingstd = df.select(['key'] + list(s for s in df.columns if "rollingstd" in s))

## save the intermediate result for downstream work
rollingstd.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingstd.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

In [6]:
%%time

# load intermediate dataset from the 1st notebook 
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
#         df = df.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingdiff_'+str(lag_n), col(col_name)-F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
        df = df.withColumn(col_name+'_rollingmax_'+str(lag_n), F.max(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmin_'+str(lag_n), F.min(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingmax = df.select(['key'] + list(s for s in df.columns if "rollingmax" in s))

## save the intermediate result for downstream work
rollingmax.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingmax.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

In [7]:
%%time

# load intermediate dataset from the 1st notebook 
df = sqlContext.read.parquet('/mnt/resource/PysparkExample/notebook1_result.parquet')

for lag_n in lags:
    wSpec = Window.partitionBy('deviceid').orderBy('date').rowsBetween(1-lag_n, 0)
    for col_name in rolling_features:
#         df = df.withColumn(col_name+'_rollingmean_'+str(lag_n), F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingdiff_'+str(lag_n), col(col_name)-F.avg(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingstd_'+str(lag_n), F.stddev(col(col_name)).over(wSpec))
#         df = df.withColumn(col_name+'_rollingmax_'+str(lag_n), F.max(col(col_name)).over(wSpec))
        df = df.withColumn(col_name+'_rollingmin_'+str(lag_n), F.min(col(col_name)).over(wSpec))
        print("Lag = %d, Column = %s" % (lag_n, col_name))

rollingmin = df.select(['key'] + list(s for s in df.columns if "rollingmin" in s))

## save the intermediate result for downstream work
rollingmin.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/rollingmin.parquet')


Lag = 3, Column = warn_type1_total
Lag = 3, Column = warn_type2_total
Lag = 3, Column = pca_1_warn
Lag = 3, Column = pca_2_warn
Lag = 3, Column = pca_3_warn
Lag = 3, Column = pca_4_warn
Lag = 3, Column = pca_5_warn
Lag = 3, Column = pca_6_warn
Lag = 3, Column = pca_7_warn
Lag = 3, Column = pca_8_warn
Lag = 3, Column = pca_9_warn
Lag = 3, Column = pca_10_warn
Lag = 3, Column = pca_11_warn
Lag = 3, Column = pca_12_warn
Lag = 3, Column = pca_13_warn
Lag = 3, Column = pca_14_warn
Lag = 3, Column = pca_15_warn
Lag = 3, Column = pca_16_warn
Lag = 3, Column = pca_17_warn
Lag = 3, Column = pca_18_warn
Lag = 3, Column = pca_19_warn
Lag = 3, Column = pca_20_warn
Lag = 3, Column = problem_type_1
Lag = 3, Column = problem_type_2
Lag = 3, Column = problem_type_3
Lag = 3, Column = problem_type_4
Lag = 3, Column = problem_type_1_per_usage1
Lag = 3, Column = problem_type_2_per_usage1
Lag = 3, Column = problem_type_3_per_usage1
Lag = 3, Column = problem_type_4_per_usage1
Lag = 3, Column = problem_type_

## Joining data frame 
-  **Usually very slow, it is better to reduce the number of partitions before the join.**
-  **Check the number of partitions of the pyspark dataframe.**
-  **repartition vs coalesce. If we only want to reduce the number of partitions, it is better to use coalesce because repartition involves reshuffling which is computational more expensive and takes more time.**
<br>


In [8]:
#----------------------------------- Join all the rolling results ---------------------------#

rollingmean = sqlContext.read.parquet('/mnt/resource/PysparkExample/data_rollingmean.parquet')
rollingdiff = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingdiff.parquet')
rollingstd = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingstd.parquet')
rollingmax = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingmax.parquet')
rollingmin = sqlContext.read.parquet('/mnt/resource/PysparkExample/rollingmin.parquet')

# check the number of partitions for each dataset
print(rollingmean.rdd.getNumPartitions())
print(rollingdiff.rdd.getNumPartitions())
print(rollingstd.rdd.getNumPartitions())
print(rollingmax.rdd.getNumPartitions())
print(rollingmin.rdd.getNumPartitions())


33
33
33
31
31


In [9]:
%%time

# To make join faster, reduce the number of partitions.
rollingmean = rollingmean.coalesce(1)
rollingdiff = rollingdiff.coalesce(1)
rollingstd = rollingstd.coalesce(1)
rollingmax = rollingmax.coalesce(1)
rollingmin = rollingmin.coalesce(1)

rolling_result = rollingmean.join(rollingdiff, 'key', 'inner')\
                 .join(rollingstd, 'key', 'inner')\
                 .join(rollingmax, 'key', 'inner')\
                 .join(rollingmin, 'key', 'inner')
            

## write the final result as parquet file for downstream work in notebook_3
rolling_result.write.mode('overwrite').parquet('/mnt/resource/PysparkExample/notebook2_result.parquet')


CPU times: user 901 ms, sys: 303 ms, total: 1.2 s
Wall time: 1h 50min 38s
