### Data preparation

In [1]:
# Start SparkSession
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("group3_project").getOrCreate()

In [2]:
# Start Spark Context
from pyspark import SparkContext, SparkConf
sc = spark.sparkContext

In [3]:
spark

In [4]:
sc

In [5]:
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql.functions import isnull, when, count, col
from pyspark.sql.functions import UserDefinedFunction
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer,IndexToString, VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions

In [6]:
#load data in parquet format
mydf = spark.read.parquet('s3://bigdata-group3/mergedata/hardrive.parquet/*.parquet')

In [7]:
#count the number of rows
mydf.count()

76714860

In [8]:
#print the schema
mydf.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- model: string (nullable = true)
 |-- capacity_bytes: long (nullable = true)
 |-- failure: integer (nullable = true)
 |-- smart_1_normalized: double (nullable = true)
 |-- smart_1_raw: double (nullable = true)
 |-- smart_2_normalized: double (nullable = true)
 |-- smart_2_raw: double (nullable = true)
 |-- smart_3_normalized: double (nullable = true)
 |-- smart_3_raw: double (nullable = true)
 |-- smart_4_normalized: double (nullable = true)
 |-- smart_4_raw: double (nullable = true)
 |-- smart_5_normalized: double (nullable = true)
 |-- smart_5_raw: double (nullable = true)
 |-- smart_7_normalized: double (nullable = true)
 |-- smart_7_raw: double (nullable = true)
 |-- smart_8_normalized: double (nullable = true)
 |-- smart_8_raw: double (nullable = true)
 |-- smart_9_normalized: double (nullable = true)
 |-- smart_9_raw: double (nullable = true)
 |-- smart_10_normalized: double (nullable = t

In [9]:
# Count the NA for each columns
mydf.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in mydf.columns)).show()

+----+-------------+-----+--------------+-------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+-------------------+------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+---------------

In [10]:
#drop the columns with large number of NA
drop_columns=[
    'smart_2_normalized',
    'smart_2_raw',
    'smart_8_normalized',
    'smart_8_raw',
    'smart_11_normalized',
    'smart_11_raw',
    'smart_13_normalized',
    'smart_13_raw',
    'smart_15_normalized',
    'smart_15_raw',
    'smart_22_normalized',
    'smart_22_raw',
    'smart_177_normalized',
    'smart_177_raw',
    'smart_179_normalized',
    'smart_179_raw',
    'smart_181_normalized',
    'smart_181_raw',
    'smart_182_normalized',
    'smart_182_raw',
    'smart_183_normalized',
    'smart_183_raw',
    'smart_184_normalized',
    'smart_184_raw',
    'smart_189_normalized',
    'smart_189_raw',
    'smart_191_normalized',
    'smart_191_raw',
    'smart_195_normalized',
    'smart_195_raw',
    'smart_196_normalized',
    'smart_196_raw',
    'smart_200_normalized',
    'smart_200_raw',
    'smart_201_normalized',
    'smart_201_raw',
    'smart_220_normalized',
    'smart_220_raw',
    'smart_222_normalized',
    'smart_222_raw',
    'smart_223_normalized',
    'smart_223_raw',
    'smart_224_normalized',
    'smart_224_raw',
    'smart_225_normalized',
    'smart_225_raw',
    'smart_226_normalized',
    'smart_226_raw',
    'smart_235_normalized',
    'smart_235_raw',
    'smart_250_normalized',
    'smart_250_raw',
    'smart_251_normalized',
    'smart_251_raw',
    'smart_252_normalized',
    'smart_252_raw',
    'smart_254_normalized',
    'smart_254_raw',   
    'smart_255_normalized',
    'smart_255_raw'
]
mydf_new=mydf.drop(*drop_columns)

In [11]:
mydf_new.columns

['date',
 'serial_number',
 'model',
 'capacity_bytes',
 'failure',
 'smart_1_normalized',
 'smart_1_raw',
 'smart_3_normalized',
 'smart_3_raw',
 'smart_4_normalized',
 'smart_4_raw',
 'smart_5_normalized',
 'smart_5_raw',
 'smart_7_normalized',
 'smart_7_raw',
 'smart_9_normalized',
 'smart_9_raw',
 'smart_10_normalized',
 'smart_10_raw',
 'smart_12_normalized',
 'smart_12_raw',
 'smart_187_normalized',
 'smart_187_raw',
 'smart_188_normalized',
 'smart_188_raw',
 'smart_190_normalized',
 'smart_190_raw',
 'smart_192_normalized',
 'smart_192_raw',
 'smart_193_normalized',
 'smart_193_raw',
 'smart_194_normalized',
 'smart_194_raw',
 'smart_197_normalized',
 'smart_197_raw',
 'smart_198_normalized',
 'smart_198_raw',
 'smart_199_normalized',
 'smart_199_raw',
 'smart_240_normalized',
 'smart_240_raw',
 'smart_241_normalized',
 'smart_241_raw',
 'smart_242_normalized',
 'smart_242_raw']

In [12]:
mydf_new.count()

76714860

In [13]:
mydf.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- model: string (nullable = true)
 |-- capacity_bytes: long (nullable = true)
 |-- failure: integer (nullable = true)
 |-- smart_1_normalized: double (nullable = true)
 |-- smart_1_raw: double (nullable = true)
 |-- smart_2_normalized: double (nullable = true)
 |-- smart_2_raw: double (nullable = true)
 |-- smart_3_normalized: double (nullable = true)
 |-- smart_3_raw: double (nullable = true)
 |-- smart_4_normalized: double (nullable = true)
 |-- smart_4_raw: double (nullable = true)
 |-- smart_5_normalized: double (nullable = true)
 |-- smart_5_raw: double (nullable = true)
 |-- smart_7_normalized: double (nullable = true)
 |-- smart_7_raw: double (nullable = true)
 |-- smart_8_normalized: double (nullable = true)
 |-- smart_8_raw: double (nullable = true)
 |-- smart_9_normalized: double (nullable = true)
 |-- smart_9_raw: double (nullable = true)
 |-- smart_10_normalized: double (nullable = t

## Model preparation

##### After the exploratory analysis of our columns, we decide to build models to evaluate their classification accuracy.

In [14]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, IndexToString, VectorAssembler, Binarizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model
import time

In [15]:
#drop the rows with null value
mydf_new=mydf_new.na.drop()

##### There are 57364274 rows after dropping null values.

In [16]:
mydf_new.count()

57364274

##### We will not include "date","model","serial_number" columns in our data since they are string value and have no meaning explaining the failure.
##### We remove the failure from the "features" since it is the dependent variables.

In [17]:
colname=mydf_new.columns
colname.remove('date')
colname.remove('failure')
colname.remove('model')
colname.remove('serial_number')

In [18]:
#Aseember the independent variables as "features"
vectorAssembler_features = VectorAssembler(
    inputCols=colname, 
    outputCol="features")

##### Then we split our data into traininig and testing set. 

In [19]:
splitted_data = mydf_new.randomSplit([0.8, 0.18, 0.02], 24)
train_data = splitted_data[0]
test_data = splitted_data[1]
predict_data = splitted_data[2]

print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
print("Number of prediction records : " + str(predict_data.count()))

Number of training records: 45892843
Number of testing records : 10324465
Number of prediction records : 1146966


### Logistic Model

##### We set the failure as dependent variable and others as independent variabels.

In [20]:
log = LogisticRegression(labelCol="failure", featuresCol="features",maxIter=10, regParam=0.01)

In [21]:
pipeline_log = Pipeline(stages=[vectorAssembler_features, log])

In [22]:
model_log = pipeline_log.fit(train_data)

In [23]:
predictions_log = model_log.transform(test_data)
evaluatorLog = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction").setLabelCol("failure")
accuracy_log = evaluatorLog.evaluate(predictions_log)
print("Logistic Regression Accuracy = %g" % accuracy_log)

Logistic Regression Accuracy = 0.853669


##### The logistic model has 85.37% accuracy predicting the failure.

### Random Forest Classification

##### Then we try ramdom forest to classify and predict the failure column.

In [24]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="failure", featuresCol="features", numTrees=10)

In [25]:
#chain features and rf in a pipeline
pipeline_rf= Pipeline(stages=[vectorAssembler_features, rf])

In [26]:
%%time
model_rf = pipeline_rf.fit(train_data)

CPU times: user 58.6 ms, sys: 23.5 ms, total: 82 ms
Wall time: 9min 8s


In [27]:
predictions_rf = model_rf.transform(test_data)
evaluatorRf = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction").setLabelCol("failure")
accuracy_rf = evaluatorRf.evaluate(predictions_rf)

print("Random Forest Accuracy = %g" % accuracy_rf)

Random Forest Accuracy = 0.790199


##### Random Forest model has 79.02% accuracy predicting the failure.

### GBTClassifier

##### Still we use the failure as label column and other numeric columns as feature columns

In [28]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=5, maxDepth=2, featuresCol="features",labelCol="failure", seed=1)

In [29]:
#build the pipeline
pipeline_gbt= Pipeline(stages=[vectorAssembler_features, gbt])

In [30]:
model_gbt = pipeline_gbt.fit(train_data)

In [31]:
predictions_gbt = model_gbt.transform(test_data)
evaluatorGbt = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction").setLabelCol("failure")
accuracy_gbt = evaluatorGbt.evaluate(predictions_gbt)

print("Gradient Boosting Tree Accuracy = %g" % accuracy_gbt)

Gradient Boosting Tree Accuracy = 0.74321


##### Gradient Boosting Tree has 74.321% accuracy predicting the failure.
##### Among the three classification models we built, logistic regression performs best predicting the failure.

In [33]:
spark.stop()