In [1]:
import os
import sys
import copy
import time
import random
import pyspark
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

# for creating pipelines and model
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# sklearn
from sklearn.metrics import precision_recall_fscore_support as score
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j_3.0-1.0.0-0.1.0.jar,xgboost4j-spark_3.0-1.0.0-0.1.0.jar pyspark-shell'

def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Predictive Maintenance") \
        .config("spark.some.config.option", "some-value") \
        .config("spark.executor.memory", "70g")\
        .config("spark.driver.memory", "50g")\
        .config("spark.memory.offHeap.enabled", "true")\
        .config("spark.memory.offHeap.size","30g")\
        .master("local[*]")\
        .getOrCreate()
    return spark

spark = init_spark()

# Data Split

In [3]:
filename = "..\data\machines.csv"
machines = spark.read.csv(filename, sep=',', header=True)

print(machines.count())
machines.show()

100
+---------+------+---+
|machineID| model|age|
+---------+------+---+
|        1|model3| 18|
|        2|model4|  7|
|        3|model3|  8|
|        4|model3|  7|
|        5|model3|  2|
|        6|model3|  7|
|        7|model3| 20|
|        8|model3| 16|
|        9|model4|  7|
|       10|model3| 10|
|       11|model2|  6|
|       12|model3|  9|
|       13|model1| 15|
|       14|model3|  1|
|       15|model3| 14|
|       16|model1|  3|
|       17|model1| 14|
|       18|model3| 15|
|       19|model3| 17|
|       20|model2| 16|
+---------+------+---+
only showing top 20 rows



In [3]:
feat_data = spark.read.parquet('../data/labeled_features.parquet')
feat_data = feat_data.withColumn("age", feat_data.age.cast(DoubleType()))

print(feat_data.count())
# highly imbalanced data
print(feat_data.groupby('label_e').count().show())
feat_data.limit(10).toPandas().head(10)

73142
+-------+-----+
|label_e|count|
+-------+-----+
|    0.0|67470|
|    1.0| 1337|
|    4.0| 1473|
|    3.0| 1000|
|    2.0| 1862|
+-------+-----+

None


Unnamed: 0,dt_truncated,volt_rollingmean_12,rotate_rollingmean_12,pressure_rollingmean_12,vibration_rollingmean_12,volt_rollingmean_24,rotate_rollingmean_24,pressure_rollingmean_24,vibration_rollingmean_24,volt_rollingmean_36,...,error5sum_rollingmean_24,comp1sum,comp2sum,comp3sum,comp4sum,model,age,model_encoded,failure,label_e
0,2016-01-01 07:00:00,172.083928,453.576897,101.30311,40.62741,169.230878,451.007306,100.487259,40.839262,167.339602,...,0.0,579.0,534.0,474.0,459.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
1,2015-12-31 19:00:00,168.173348,453.181951,99.527531,40.981132,165.787189,449.842118,100.598808,41.791947,166.190766,...,0.0,578.0,533.0,473.0,458.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
2,2015-12-31 07:00:00,163.40103,446.502286,101.670084,42.602762,165.199475,445.038344,101.074817,41.722713,168.995817,...,0.0,578.0,533.0,473.0,458.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
3,2015-12-30 19:00:00,166.997919,443.574402,100.47955,40.842664,171.793211,450.456864,100.955598,40.418503,172.419415,...,0.0,577.0,532.0,472.0,457.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
4,2015-12-30 07:00:00,176.588502,457.339327,101.431647,39.994342,175.130162,466.483595,99.468624,40.920312,174.914358,...,0.0,577.0,532.0,472.0,457.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
5,2015-12-29 19:00:00,173.671822,475.627863,97.505602,41.846282,174.077285,464.681249,99.852795,41.520096,173.561514,...,0.0,576.0,531.0,471.0,456.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
6,2015-12-29 07:00:00,174.482749,453.734636,102.199989,41.193909,173.50636,450.268077,101.890206,39.760938,173.318924,...,0.0,576.0,531.0,471.0,456.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
7,2015-12-28 19:00:00,172.529971,446.801518,101.580424,38.327966,172.737011,443.114257,99.159233,38.946824,171.416655,...,0.0,575.0,530.0,470.0,455.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
8,2015-12-28 07:00:00,172.944052,439.426996,96.738042,39.565681,170.859997,444.430674,99.034004,39.874349,172.38422,...,0.0,575.0,530.0,470.0,455.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0
9,2015-12-27 19:00:00,168.775941,449.434352,101.329965,40.183016,172.104304,444.571586,98.413556,39.550401,170.807337,...,0.0,574.0,529.0,469.0,454.0,model4,3.0,"(0.0, 1.0, 0.0)",0.0,0.0


In [4]:
label_var = ['label_e']
key_cols =['machineID','dt_truncated']
input_features = feat_data.columns
remove_cols = label_var + key_cols + ['failure','model_encoded','model' ]

# Remove the extra names if that are in the input_features list
input_features = [x for x in input_features if x not in set(remove_cols)]
# Use cols
input_features

['volt_rollingmean_12',
 'rotate_rollingmean_12',
 'pressure_rollingmean_12',
 'vibration_rollingmean_12',
 'volt_rollingmean_24',
 'rotate_rollingmean_24',
 'pressure_rollingmean_24',
 'vibration_rollingmean_24',
 'volt_rollingmean_36',
 'vibration_rollingmean_36',
 'rotate_rollingmean_36',
 'pressure_rollingmean_36',
 'volt_rollingstd_12',
 'rotate_rollingstd_12',
 'pressure_rollingstd_12',
 'vibration_rollingstd_12',
 'volt_rollingstd_24',
 'rotate_rollingstd_24',
 'pressure_rollingstd_24',
 'vibration_rollingstd_24',
 'volt_rollingstd_36',
 'rotate_rollingstd_36',
 'pressure_rollingstd_36',
 'vibration_rollingstd_36',
 'error1sum_rollingmean_24',
 'error2sum_rollingmean_24',
 'error3sum_rollingmean_24',
 'error4sum_rollingmean_24',
 'error5sum_rollingmean_24',
 'comp1sum',
 'comp2sum',
 'comp3sum',
 'comp4sum',
 'age']

In [5]:
# assemble features
va = VectorAssembler(inputCols=(input_features), outputCol='features')
feat_data = va.transform(feat_data).select('machineID','dt_truncated','label_e','features')

# set maxCategories so features with > 10 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", 
                               outputCol="indexedFeatures", 
                               maxCategories=10).fit(feat_data)

# fit on whole dataset to include all labels in index
labelIndexer = StringIndexer(inputCol="label_e", outputCol="indexedLabel").fit(feat_data)

# split the data into train/test based on date
split_date = "2015-10-30"
training = feat_data.filter(feat_data.dt_truncated < split_date)
testing = feat_data.filter(feat_data.dt_truncated >= split_date)

print(training.count())
print(testing.count())

60434
12708


In [None]:
# Down sample majority class, do we really need this?
# SampleBy returns a stratified sample without replacement based on the fraction given on each stratum
train_downsampled = training.sampleBy('label', fractions={0.0: 0.135, 1.0: 1.0}, seed=123).cache()
train_downsampled.groupby('label').count().show()

testing.groupby('label').count().show()

In [None]:
# Cache results
# cache datasets in memory
train_downsampled.cache()
testing.cache()

# check the number of devices in training and testing data
print(train_downsampled.select('deviceid').distinct().count())
print(testing.select('deviceid').distinct().count())

# GBT Gradient-Boosted Tree

In [26]:
# GBTClassifier currently only supports binary classification.
training.dtypes

[('machineID', 'string'),
 ('dt_truncated', 'timestamp'),
 ('label_e', 'double'),
 ('features', 'vector')]

In [None]:
%%time

# Train a GBT model.
gbt = GBTClassifier(labelCol='label_e', featuresCol='features', maxDepth=10, minInstancesPerNode=5, maxIter=50)

# Chain indexers and GBT in a Pipeline
pipeline_gbt = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model. This also runs the indexers.
model_gbt = pipeline_gbt.fit(training)

# save model
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
gbt_fileName = '../checkpoints/GradientBoostedTree_' + datestamp;
gbtDirfilename = modelDir + gbt_fileName;
model_gbt.save(gbtDirfilename)

# Make predictions.
predictions_gbt = model_gbt.transform(testing)

# GBT eXtreme Gradient Boosting

In [10]:
!wget https://repo1.maven.org/maven2/com/nvidia/xgboost4j_3.0/1.0.0-0.1.0/xgboost4j_3.0-1.0.0-0.1.0.jar
!wget https://repo1.maven.org/maven2/com/nvidia/xgboost4j-spark_3.0/1.0.0-0.1.0/xgboost4j-spark_3.0-1.0.0-0.1.0.jar

!wget https://raw.githubusercontent.com/srivatsan88/YoutubeLI/master/dataset/covtype_train.parquet --no-check-certificate
!wget https://raw.githubusercontent.com/srivatsan88/YoutubeLI/master/dataset/covtype_test.parquet --no-check-certificate


--2021-03-26 23:04:29--  https://repo1.maven.org/maven2/com/nvidia/xgboost4j_3.0/1.0.0-0.1.0/xgboost4j_3.0-1.0.0-0.1.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.124.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.124.209|:443... connected.
ERROR: cannot verify repo1.maven.org's certificate, issued by 'CN=DigiCert SHA2 Secure Server CA,O=DigiCert Inc,C=US':
  Unable to locally verify the issuer's authority.
To connect to repo1.maven.org insecurely, use `--no-check-certificate'.
--2021-03-26 23:04:29--  https://repo1.maven.org/maven2/com/nvidia/xgboost4j-spark_3.0/1.0.0-0.1.0/xgboost4j-spark_3.0-1.0.0-0.1.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.124.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.124.209|:443... connected.
ERROR: cannot verify repo1.maven.org's certificate, issued by 'CN=DigiCert SHA2 Secure Server CA,O=DigiCert Inc,C=US':
  Unable to locally verify the issuer's authority.
To connect to repo1.maven.org insecure

--2021-03-26 23:04:32--  https://raw.githubusercontent.com/srivatsan88/YoutubeLI/master/dataset/covtype_test.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
  Unable to locally verify the issuer's authority.
HTTP request sent, awaiting response... 200 OK
Length: 1591040 (1.5M) [application/octet-stream]
Saving to: 'covtype_test.parquet'

     0K .......... .......... .......... .......... ..........  3% 2.91M 1s
    50K .......... .......... .......... .......... ..........  6% 2.82M 0s
   100K .......... .......... .......... .......... ..........  9% 2.94M 0s
   150K .......... .......... .......... .......... .......... 12% 2.73M 0s
   200K .......... .......... .......... .......... .......... 16% 2.55M 0s
   250K .......... .......... .......... .......... .......... 19% 3.27M 0s
   300K .......... 

In [3]:
spark.sparkContext.addPyFile("xgboost4j-spark_3.0-1.0.0-0.1.0.jar")
from ml.dmlc.xgboost4j.scala.spark import XGBoostClassificationModel, XGBoostClassifier
#training.dtypes

In [7]:
# setting parameters. you can find these parameters in the link above.
params = {'eta': 0.1, 'gamma': 0.1, 'missing': 0.0,
          'treeMethod': 'gpu_hist', 'maxDepth': 3, 
          'growPolicy': 'depthwise', 'lambda_': 1.0,
          'subsample': 1.0, 'numRound': 1000,
          'numWorkers': 1, 'verbosity': 1}

features = [feat for feat in feat_data.columns if feat != 'label_e']

# create XGBoost model object
# xgboost = XGBoostClassifier()\
#             .setLabelCol('label_e')\
#             .setFeaturesCols(features)

xgboost = XGBoostClassifier(
    featuresCol="features", 
    labelCol="label_e", 
)
xgboost.setMissing(0.0)

# # getting the feature names
# target = 'label_e'
# features = [feat for feat in feat_data.columns if feat != target]

# # setting parameters. you can find these parameters in the link above.
# params = {'eta': 0.1, 'gamma': 0.1, 'missing': 0.0,
#           'treeMethod': 'gpu_hist', 'maxDepth': 3, 
#           'growPolicy': 'depthwise', 'lambda_': 1.0,
#           'subsample': 1.0, 'numRound': 1000,
#           'numWorkers': 1, 'verbosity': 1}

# # create XGBoost model object
# xgboost = XGBoostClassifier(**params)\
#             .setLabelCol(target)\
#             .setFeaturesCols(features)

# xgboost.setMaxDepth(2)

XGBoostClassifier_6d72fffbfc9d

In [8]:
start_time = time.time()

model = xgboost.fit(training)

print("Training Time: %s seconds" % (str(time.time() - start_time)))


Py4JJavaError: An error occurred while calling o168.fit.
: ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.postTrackerReturnProcessing(XGBoost.scala:848)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.$anonfun$trainDistributed$2(XGBoost.scala:720)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.$anonfun$trainDistributed$2$adapted(XGBoost.scala:695)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:694)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:246)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:48)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.fit(XGBoostClassifier.scala:191)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.fit(XGBoostClassifier.scala:48)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [6]:
# loading data
reader = spark.read
train_data = reader.parquet("covtype_train.parquet")
test_data = reader.parquet("covtype_test.parquet")

# getting the feature names
target = 'target'
features = [feat for feat in train_data.schema.names if feat != target]

# assemble feature columns
va = VectorAssembler(inputCols=(features), outputCol='features')
feat_data = va.transform(train_data).select('target','features')

# setting parameters. you can find these parameters in the link above.
params = {'eta': 0.1, 'gamma': 0.1, 'missing': 0.0,
          'treeMethod': 'gpu_hist', 'maxDepth': 3, 
          'growPolicy': 'depthwise', 'lambda_': 1.0,
          'subsample': 1.0, 'numRound': 1000,
          'numWorkers': 1, 'verbosity': 1}

# create XGBoost model object
xgboost = XGBoostClassifier(**params)\
            .setFeaturesCol("features")\
            .setLabelCol("target")
print(features)
feat_data.limit(10).toPandas().head(10)

['Elevation', 'Aspect', 'Slope', 'Horizontal_Distance_To_Hydrology', 'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways', 'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm', 'Horizontal_Distance_To_Fire_Points', 'Wilderness_Area1', 'Wilderness_Area2', 'Wilderness_Area3', 'Wilderness_Area4', 'Soil_Type1', 'Soil_Type2', 'Soil_Type3', 'Soil_Type4', 'Soil_Type5', 'Soil_Type6', 'Soil_Type7', 'Soil_Type8', 'Soil_Type9', 'Soil_Type10', 'Soil_Type11', 'Soil_Type12', 'Soil_Type13', 'Soil_Type14', 'Soil_Type15', 'Soil_Type16', 'Soil_Type17', 'Soil_Type18', 'Soil_Type19', 'Soil_Type20', 'Soil_Type21', 'Soil_Type22', 'Soil_Type23', 'Soil_Type24', 'Soil_Type25', 'Soil_Type26', 'Soil_Type27', 'Soil_Type28', 'Soil_Type29', 'Soil_Type30', 'Soil_Type31', 'Soil_Type32', 'Soil_Type33', 'Soil_Type34', 'Soil_Type35', 'Soil_Type36', 'Soil_Type37', 'Soil_Type38', 'Soil_Type39', 'Soil_Type40']


Unnamed: 0,target,features
0,6,"(3381.0, 0.0, 18.0, 497.0, 187.0, 4201.0, 190...."
1,1,"(3039.0, 291.0, 12.0, 361.0, 87.0, 618.0, 187...."
2,0,"(3318.0, 353.0, 21.0, 309.0, 84.0, 4548.0, 179..."
3,1,"(2823.0, 67.0, 13.0, 170.0, 54.0, 2514.0, 233...."
4,1,"(3211.0, 16.0, 19.0, 350.0, 36.0, 3254.0, 200...."
5,1,"(2797.0, 129.0, 9.0, 0.0, 0.0, 2400.0, 235.0, ..."
6,0,"(3385.0, 282.0, 8.0, 899.0, 179.0, 1092.0, 198..."
7,0,"(3065.0, 0.0, 4.0, 242.0, 10.0, 3298.0, 214.0,..."
8,0,"(3209.0, 157.0, 7.0, 309.0, 46.0, 5057.0, 228...."
9,1,"(3004.0, 151.0, 18.0, 90.0, 3.0, 3458.0, 240.0..."


In [7]:
# track training time
start_time = time.time()

# train the model
model = xgboost.fit(feat_data)

print("GPU Training Time: %s seconds" % (str(time.time() - start_time)))

Py4JJavaError: An error occurred while calling o266.fit.
: ml.dmlc.xgboost4j.java.XGBoostError: XGBoostModel training failed
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.postTrackerReturnProcessing(XGBoost.scala:848)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.$anonfun$trainDistributed$2(XGBoost.scala:720)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.$anonfun$trainDistributed$2$adapted(XGBoost.scala:695)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at ml.dmlc.xgboost4j.scala.spark.XGBoost$.trainDistributed(XGBoost.scala:694)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:246)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.train(XGBoostClassifier.scala:48)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.fit(XGBoostClassifier.scala:191)
	at ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier.fit(XGBoostClassifier.scala:48)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
