In [1]:
from pyspark import *

In [2]:
# Step 1: load the census data from the Spark table "adult_csv".
# NOTE(review): `spark` is never created in this notebook; presumably the runtime
# injects the session — confirm before running elsewhere.
census = spark.read.table("adult_csv")
# Step 2: print the table schema to check column names and types.
census.printSchema()

In [3]:
import pyspark.sql.functions as F


def _null_if_missing(column_name):
    """Return a column expression that keeps a value unless it is a missing-value marker.

    The `when` has no `otherwise`, so rows holding one of the markers come out as null.
    The result keeps the original column name.
    """
    is_real_value = ~F.col(column_name).isin("?","NULL", "NA", "NaN")
    return F.when(is_real_value, F.col(column_name)).alias(column_name)


# One cleaned expression per census column.
step1 = [_null_if_missing(x) for x in census.columns]
# Drop every row that has at least one null after the marker replacement.
step2 = census.select(*step1).dropna(how='any')
# numf2 is dropped because it is the numerical label of the education column, so keeping
# both is pointless; education is kept.
# numf1 is the final weight; it is unclear how the data collectors arrived at it (Kaggle).
step3 = step2.drop('numf1','numf2')

In [4]:
# Normalise the two capital columns with a min-max scaler.

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType


def _first_component(vector):
    """Return the first element of `vector` as a float rounded to 3 decimals."""
    return round(float(list(vector)[0]), 3)


# UDF that turns a vector column back into a double column.
unlist = udf(_first_component, DoubleType())

# One VectorAssembler per column: each column is wrapped into its own vector column.
assembler = [
    VectorAssembler(inputCols=[col], outputCol="Vect_" + col)
    for col in ['capitalgain', 'capitalloss']
]

# One MinMaxScaler per assembled vector column.
scaler = [
    MinMaxScaler(inputCol="Vect_" + col, outputCol="Scaled_" + col)
    for col in ['capitalgain', 'capitalloss']
]

# Pipeline: all assemblers first, then all scalers.
all_stages = assembler + scaler
pipeline = Pipeline(stages=all_stages)

# Fit and apply the pipeline, drop the raw and intermediate vector columns,
# then convert the scaled vectors back to plain doubles.
step4 = (
    pipeline.fit(step3)
    .transform(step3)
    .drop('capitalgain', 'capitalloss', 'Vect_capitalgain', 'Vect_capitalloss')
    .withColumn('Scaled_capitalgain', unlist('Scaled_capitalgain'))
    .withColumn('Scaled_capitalloss', unlist('Scaled_capitalloss'))
)

In [5]:
from pyspark.ml.feature import Bucketizer
from pyspark.ml import Pipeline

# Age and weekly-hours buckets use hand-picked boundaries; the scaled capital columns
# are cut in steps of 0.1.
bucketizer_age = Bucketizer(
    splits=[0.0, 30.0, 45.0, 60.0, float("inf")],
    inputCol="age",
    outputCol="index_age_group",
)
bucketizer_hrwk = Bucketizer(
    splits=[0.0, 25.0, 45.0, float("inf")],
    inputCol="hr_per_wk",
    outputCol="index_hr/wk",
)
bucketizer_cg = Bucketizer(
    splits=[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, float("inf")],
    inputCol="Scaled_capitalgain",
    outputCol="index_capital_gain",
)
bucketizer_cl = Bucketizer(
    splits=[0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, float("inf")],
    inputCol="Scaled_capitalloss",
    outputCol="index_capital_loss",
)


def _bucketize_and_rename(frame):
    """Apply the four bucketizers in order, dropping each raw input, then rename the outputs."""
    for stage, raw_column in (
        (bucketizer_age, "age"),
        (bucketizer_hrwk, "hr_per_wk"),
        (bucketizer_cg, "Scaled_capitalgain"),
        (bucketizer_cl, "Scaled_capitalloss"),
    ):
        frame = stage.transform(frame).drop(raw_column)
    for old_name, new_name in (
        ("index_age_group", "age"),
        ("index_hr/wk", "hr/wk"),
        ("index_capital_gain", "capital_gain"),
        ("index_capital_loss", "capital_loss"),
    ):
        frame = frame.withColumnRenamed(old_name, new_name)
    return frame


# Replace the original numeric columns by their bucket indices.
step5 = _bucketize_and_rename(step4)

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Clearly identifying the job categories
def employementtype(job):
  """Collapse detailed employer categories into broader groups.

  'Never-worked' and 'Without-pay' become "Unemployed", 'State-gov' and 'Local-gov'
  become "Govt", and the two self-employed categories become "self_emp".
  Any other value is returned unchanged.
  """
  if job in ('Never-worked', 'Without-pay'):
    return "Unemployed"
  if job in ('State-gov', 'Local-gov'):
    return "Govt"
  if job in ('Self-emp-inc', 'Self-emp-not-inc'):
    return "self_emp"
  return job
  
# For Distinguishing mrital status into 3 categories    
def maritalstat(mar):
  """Reduce a marital status to one of three categories.

  Returns "Never-married" for 'Never-married', "Not-Married" for 'Separated',
  'Widowed' or 'Divorced', and "Married" for every other value.
  """
  if mar == 'Never-married':
    return "Never-married"
  formerly_married = ('Separated', 'Widowed', 'Divorced')
  return "Not-Married" if mar in formerly_married else "Married"

  
# Group countries into regions.
# Every entry is stored WITHOUT surrounding whitespace. Some entries used to carry a
# leading space (' Vietnam', ' Haiti', ...) while others did not, so no single input
# formatting could match all of them. regionlist() strips its input before the lookup.
Asia = ['China', 'Hong', 'India', 'Iran', 'Cambodia', 'Japan', 'Laos',
        'Philippines', 'Vietnam', 'Taiwan', 'Thailand']
North_America = ['Canada', 'United-States', 'Puerto-Rico']

Europe = ['England', 'France', 'Germany', 'Greece', 'Holand-Netherlands', 'Hungary',
          'Ireland', 'Italy', 'Poland', 'Portugal', 'Scotland', 'Yugoslavia']

Latin_and_SouthAmerica = ['Columbia', 'Cuba', 'Dominican-Republic', 'Ecuador',
                          'El-Salvador', 'Guatemala', 'Haiti', 'Honduras',
                          'Mexico', 'Nicaragua', 'Outlying-US(Guam-USVI-etc)', 'Peru',
                          'Jamaica', 'Trinadad&Tobago']
# Kept for reference only: regionlist() does not consult it, unmatched values fall
# through to "Others".
Other = ['South']

def regionlist(countries):
  """Return the region label for a country name.

  Args:
    countries: a single country name; leading and trailing whitespace is ignored.
      Non-string values (for example None) are looked up as they are.

  Returns:
    "Asia", "North America", "Europe" or "Latin & South America" when the name is in
    the matching list, otherwise "Others".
  """
  name = countries.strip() if isinstance(countries, str) else countries
  if name in Asia:
    return "Asia"
  if name in North_America:
    return "North America"
  if name in Europe:
    return "Europe"
  if name in Latin_and_SouthAmerica:
    return "Latin & South America"
  return "Others"


# Wrap each Python helper as a Spark UDF that returns a string.
etype_udf = udf(employementtype, StringType())
marital_udf = udf(maritalstat, StringType())
region_udf = udf(regionlist, StringType())

# Apply the UDFs with withColumn; each one overwrites the column it reads.
step6 = (
    step5
    .withColumn("employer_type", etype_udf("employer_type"))
    .withColumn("marital", marital_udf("marital"))
    .withColumn("region", region_udf("region"))
)

In [8]:
# Show the distinct values of every feature/column of step6, one table per column.
for col,dtype in step6.dtypes:
    distinct_values = step6.select(col).distinct()
    print(col)
    distinct_values.show()
    print("----------------------------------------")

In [9]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Columns that already hold bucket indices and need no StringIndexer.
_bucket_columns = ['age', 'hr/wk', 'capital_gain', 'capital_loss']

# String indexers: one per string column except income, which is indexed into 'label'.
String_index = [
    StringIndexer(inputCol=col, outputCol='index_' + col)
    for col, dtype in step6.dtypes
    if dtype == 'string' and col != 'income'
]
String_index.append(StringIndexer(inputCol='income', outputCol='label'))

# One-hot encoders: the indexed columns keep every category (dropLast=False); the
# bucket columns use the encoder's default settings.
Onehot_index = [
    OneHotEncoder(dropLast=False, inputCol='index_' + col, outputCol='encoded_' + col)
    for col in step6.columns
    if col not in _bucket_columns + ['income']
]
Onehot_index.extend(
    OneHotEncoder(inputCol=col, outputCol='encoded_' + col) for col in _bucket_columns
)

# Single feature vector that goes into the machine-learning models.
_encoded_inputs = ['encoded_' + col for col in step6.columns if col != 'income']
vectorassembler_stage = VectorAssembler(inputCols=_encoded_inputs, outputCol='features')

# Print all the stages.
print(String_index)
print(Onehot_index)
print(vectorassembler_stage)

In [10]:
# Full preprocessing pipeline: string indexing -> one-hot encoding -> vector assembly.
all_stages = String_index + Onehot_index + [vectorassembler_stage]
pipelined_fit = Pipeline(stages=all_stages).fit(step6)

# Keep only the encoded columns plus the assembled features and the label so the
# transformation can be inspected.
final_columns = ['encoded_' + name for name in step6.columns if name != 'income']
final_columns = final_columns + ['features', 'label']
step7 = pipelined_fit.transform(step6).select(final_columns)
display(step7.limit(10))

encoded_employer_type,encoded_education,encoded_marital,encoded_occupation,encoded_relationship,encoded_race,encoded_gender,encoded_region,encoded_age,encoded_hr/wk,encoded_capital_gain,encoded_capital_loss,features,label
"List(0, 5, List(0), List(1.0))","List(0, 16, List(5), List(1.0))","List(0, 3, List(1), List(1.0))","List(0, 14, List(6), List(1.0))","List(0, 6, List(2), List(1.0))","List(0, 5, List(1), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(0, 10, 22, 30, 40, 45, 49, 51, 56, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0
"List(0, 5, List(0), List(1.0))","List(0, 16, List(0), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 14, List(9), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(1), List(1.0))","List(0, 2, List(), List())","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(0, 5, 21, 33, 38, 44, 49, 51, 57, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0
"List(0, 5, List(2), List(1.0))","List(0, 16, List(6), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 14, List(11), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(2, 11, 21, 35, 38, 44, 49, 51, 56, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",1.0
"List(0, 5, List(0), List(1.0))","List(0, 16, List(1), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 14, List(6), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 5, List(1), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(1), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(0, 6, 21, 30, 38, 45, 49, 51, 57, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",1.0
"List(0, 5, List(0), List(1.0))","List(0, 16, List(7), List(1.0))","List(0, 3, List(1), List(1.0))","List(0, 14, List(5), List(1.0))","List(0, 6, List(1), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(1), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(0, 12, 22, 29, 39, 44, 49, 51, 57, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0
"List(0, 5, List(1), List(1.0))","List(0, 16, List(9), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 14, List(1), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(), List())","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(1, 14, 21, 25, 38, 44, 49, 51, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",1.0
"List(0, 5, List(0), List(1.0))","List(0, 16, List(1), List(1.0))","List(0, 3, List(1), List(1.0))","List(0, 14, List(5), List(1.0))","List(0, 6, List(3), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(0, 6, 22, 29, 41, 44, 50, 51, 56, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0
"List(0, 5, List(0), List(1.0))","List(0, 16, List(8), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 14, List(0), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(2), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(0, 13, 21, 24, 38, 44, 49, 51, 58, 59, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0
"List(0, 5, List(0), List(1.0))","List(0, 16, List(0), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 14, List(6), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(), List())","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(0, 5, 21, 30, 38, 44, 49, 51, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",1.0
"List(0, 5, List(3), List(1.0))","List(0, 16, List(2), List(1.0))","List(0, 3, List(0), List(1.0))","List(0, 14, List(3), List(1.0))","List(0, 6, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 2, List(0), List(1.0))","List(0, 5, List(0), List(1.0))","List(0, 3, List(1), List(1.0))","List(0, 2, List(1), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 9, List(0), List(1.0))","List(0, 79, List(3, 7, 21, 27, 38, 44, 49, 51, 57, 60, 61, 70), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",0.0


# MACHINE LEARNING Models

In [12]:
# Keep only the assembled feature vector and the indexed label for the models.
mldata = step7.select("features","label")

#1: Logistic Regression

In [14]:
# 80/20 train/test split with a fixed seed. Both parts are cached because every
# model cell below fits on trainDF and scores testDF.
(trainDF, testDF) = mldata.randomSplit([0.8, 0.2], seed=1231)
trainDF.cache()
testDF.cache()

In [15]:
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression with the estimator's default parameters.


lr = LogisticRegression()

# Fit the model on the training split
lrModel = lr.fit(trainDF)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# Last expression of the cell: accuracy taken from the model's training summary.
lrModel.summary.accuracy

In [16]:
# Score the test split with the baseline logistic regression and preview 10 rows.
result = lrModel.transform(testDF)
result.select("prediction","label","features").show(10)

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator left at its defaults; the value is printed as the AUC of the test predictions.
evaluator = BinaryClassificationEvaluator()
print("AUC: %(result)s" % {"result": evaluator.evaluate(result)})
# NOTE(review): display(model, data, "ROC") looks like the Databricks notebook helper,
# not the IPython display function — confirm the runtime. It is given trainDF, so the
# curve is drawn for the training data.
display(lrModel, trainDF, "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.9997586412226628
0.0,0.0476190476190476,0.9997586412226628
0.0,0.0952380952380952,0.9964595733598344
0.0,0.1428571428571428,0.8410533217230212
0.0,0.1904761904761904,0.8343621720435789
0.0123456790123456,0.1904761904761904,0.812373482632714
0.0123456790123456,0.238095238095238,0.7940022908330541
0.0123456790123456,0.2857142857142857,0.7868378812055389
0.0123456790123456,0.3333333333333333,0.7533533727829665
0.0123456790123456,0.4285714285714285,0.7439042883617669


In [18]:
# Same display helper without a plot type; the rendered output lists fitted values
# against residuals for trainDF.
display(lrModel, trainDF)

fitted values,residuals
-2.0298666724904644,-0.1161026038095926
-1.5722461622803294,-0.1718964192582363
-0.9940821593305624,-0.2701065291953623
5.639961330495028,0.0035404266401655
-0.4799461321551436,-0.3822648454185685
-0.717856821177567,0.6721349000111311
-0.3298662341769112,-0.4182731707994022
0.1277542760332227,-0.5318952002616767
0.1277542760332227,0.4681047997383233
-1.3726977500535278,-0.2021843339815066


In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Binary evaluator bound to the 'label' column (this rebinds `evaluator`).
evaluator = BinaryClassificationEvaluator(labelCol='label')

# This evaluator only offers two metrics: area under the ROC curve and under the PR curve.
# The metric is chosen per call by overriding metricName.
auroc = evaluator.evaluate(result, {evaluator.metricName: "areaUnderROC"})
auprc = evaluator.evaluate(result, {evaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Refit logistic regression with regParam=0.08. setRegParam changes the existing `lr`
# estimator in place, so re-running earlier cells afterwards uses this value too.
lr.setRegParam(0.08)

model = lr.fit(trainDF)
#training = model.transform(trainDF)
# `result` is overwritten here: from this point on it holds the regularised model's
# test predictions, not the baseline's.
result = model.transform(testDF)
print("evaluations %(result)s" % {"result": BinaryClassificationEvaluator().evaluate(result)})

In [21]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

# Build (prediction, label) rows for MulticlassMetrics: the label is cast to float and
# the rows are ordered by prediction (the original author reports it fails otherwise).
preds_and_labels = (
    result
    .select(['prediction', 'label'])
    .withColumn('label', F.col('label').cast(FloatType()))
    .orderBy('prediction')
    .select(['prediction', 'label'])
)

# Each row becomes a plain (prediction, label) tuple.
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Confusion matrix as an array.
print(metrics.confusionMatrix().toArray())


In [22]:
# Next, we try every evaluation available for this model.

In [23]:
from pyspark.ml.classification import LogisticRegression
# Fresh estimator for the cross-validated search, separate from `lr` above.
logr = LogisticRegression(featuresCol='features', labelCol='label')


In [24]:
from pyspark.ml.tuning import ParamGridBuilder

# Search grid for logistic regression: 5 x 5 x 5 = 125 parameter combinations.
param_grid = (
    ParamGridBuilder()
    .addGrid(logr.regParam, [0, 0.1, 0.2, 0.5, 1])
    .addGrid(logr.elasticNetParam, [0, 0.1, 0.2, 0.5, 1])
    .addGrid(logr.maxIter, [5,10,20,50,100])
    .build()
)

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluator handed to the cross-validator; it reads the 'rawPrediction' column.
# This rebinds the shared name `evaluator`.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

In [26]:
from pyspark.ml.tuning import CrossValidator
# 3-fold cross-validation of `logr` over every combination in param_grid.
# No seed is passed, so the fold assignment may differ between runs.
cv = CrossValidator(estimator=logr, evaluator=evaluator, estimatorParamMaps=param_grid, numFolds=3)
cv_model = cv.fit(trainDF)  # fit the cross-validator on the training split

In [27]:
# Columns to preview from the scored frames.
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
# Score the training split with the cross-validated model and preview 5 rows.
pred_training_cv = cv_model.transform(trainDF)
pred_training_cv.select(show_columns).show(5, truncate=False)

In [28]:
# Score the test split with the cross-validated model and preview 5 rows.
# Relies on `show_columns` from the previous cell.
pred_test_cv = cv_model.transform(testDF)
pred_test_cv.select(show_columns).show(5, truncate=False)

In [29]:
# Intercept and coefficients of the best model found by cross-validation.
# The adjacent literals "\n" 'coefficients: ' are joined by implicit string concatenation.
print('Intercept: ' + str(cv_model.bestModel.intercept) + "\n" 'coefficients: ' + str(cv_model.bestModel.coefficients))

In [30]:
# Report the hyper-parameters of the cross-validated winner together with its AUC on the
# held-out test predictions (pred_test_cv). This line used to print the AUC from the
# model's training summary, which is not comparable with the test-set AUC printed by the
# equivalent summaries of the other models.
# The parameter getters are read from the wrapped Java model, as in the other summaries.
print(
    'Logistic Regression', "\n",
    'The best RegParam is: ', cv_model.bestModel._java_obj.getRegParam(), "\n",
    'The best ElasticNetParam is:', cv_model.bestModel._java_obj.getElasticNetParam(), "\n",
    'The best Iteration is:', cv_model.bestModel._java_obj.getMaxIter(), "\n",
    'Area under ROC is:', evaluator.evaluate(pred_test_cv, {evaluator.metricName: "areaUnderROC"}),
)

In [31]:
# Cross-validation metrics of the logistic-regression search (cell output).
cv_model.avgMetrics

#2: Support Vector Machine

In [33]:
from pyspark.ml.classification import LinearSVC

# Baseline linear SVM with fixed maxIter and regParam.


lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model on the training split
lsvcModel = lsvc.fit(trainDF)

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))

# Score the test split and preview 10 rows
lsvcresult = lsvcModel.transform(testDF)
lsvcresult.select("prediction","label","features").show(10)

# Evaluate the test predictions with a default binary evaluator.
# NOTE(review): this rebinds the shared `evaluator`, which the final comparison cells
# reuse for the logistic-regression rows.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print("evaluation: %(result)s" % {"result": evaluator.evaluate(lsvcresult)})

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Dedicated evaluator for the SVM results, left at its defaults.
svmevaluator = BinaryClassificationEvaluator()

# Area under the ROC and PR curves for the baseline SVM's test predictions;
# the metric is chosen per call by overriding metricName.
svmauroc = svmevaluator.evaluate(lsvcresult, {svmevaluator.metricName: "areaUnderROC"})
svmauprc = svmevaluator.evaluate(lsvcresult, {svmevaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(svmauroc))
print("Area under PR Curve: {:.4f}".format(svmauprc))

In [35]:
# Estimator
from pyspark.ml.classification import LinearSVC
lsvm = LinearSVC(featuresCol='features', labelCol='label')

# Parameter grid: 5 x 5 = 25 combinations
from pyspark.ml.tuning import ParamGridBuilder
param_grid_svm = (
    ParamGridBuilder()
    .addGrid(lsvm.regParam, [0, 0.1, 0.2, 0.5, 1])
    .addGrid(lsvm.maxIter, [5,10,20,50,100])
    .build()
)

# Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
svmevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# 3-fold cross-validation fitted on the training split
from pyspark.ml.tuning import CrossValidator
cv_svm = CrossValidator(
    estimator=lsvm,
    evaluator=svmevaluator,
    estimatorParamMaps=param_grid_svm,
    numFolds=3,
)
cv_svm_model = cv_svm.fit(trainDF)

# Preview the scored training and test splits ('probability' is not selected for the SVM)
show_columns = ['features', 'label', 'prediction', 'rawPrediction']
pred_training_svm = cv_svm_model.transform(trainDF)
pred_training_svm.select(show_columns).show(5, truncate=False)

pred_test_svm = cv_svm_model.transform(testDF)
pred_test_svm.select(show_columns).show(5, truncate=False)

# Best hyper-parameters and the test-set area under ROC
print(
    'Support Vector Machine', "\n",
    'The best RegParam is: ', cv_svm_model.bestModel._java_obj.getRegParam(), "\n",
    'The best Iteration is:', cv_svm_model.bestModel._java_obj.getMaxIter(), "\n",
    'Area under ROC is:', svmevaluator.evaluate(pred_test_svm, {svmevaluator.metricName: "areaUnderROC"}),
)

In [36]:
# Cross-validation metrics of the SVM search (cell output).
cv_svm_model.avgMetrics

#3: Naive Bayes

In [38]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Train the model on the training split
nbmodel = nb.fit(trainDF)

# Score the test split with the Naive Bayes model and preview 10 rows.
# This used to call `model.transform`, i.e. the logistic regression fitted in an
# earlier cell, so every "Naive Bayes" figure below was really a logistic-regression one.
nbresult = nbmodel.transform(testDF)
nbresult.select("prediction","label","features").show(10)

# Evaluate the test predictions once and reuse the value for the printout.
# NOTE(review): the name `accuracy` is kept for compatibility, but a default
# BinaryClassificationEvaluator is documented to report area under ROC — confirm.
nbevaluator = BinaryClassificationEvaluator()
accuracy = nbevaluator.evaluate(nbresult)
print("evaluations: %(nbresult)s" % {"nbresult": accuracy})

In [39]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Dedicated evaluator for the Naive Bayes results, left at its defaults.
nbevaluator = BinaryClassificationEvaluator()

# Area under the ROC and PR curves for `nbresult`; the metric is chosen per call
# by overriding metricName.
nbauroc = nbevaluator.evaluate(nbresult, {nbevaluator.metricName: "areaUnderROC"})
nbauprc = nbevaluator.evaluate(nbresult, {nbevaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(nbauroc))
print("Area under PR Curve: {:.4f}".format(nbauprc))

In [40]:
# Estimator (rebinds `nb` from the baseline cell)
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(featuresCol='features', labelCol='label')

# Parameter grid: 6 smoothing values x 2 model types = 12 combinations
from pyspark.ml.tuning import ParamGridBuilder
param_grid_nb = (
    ParamGridBuilder()
    .addGrid(nb.smoothing, [0.0,1.0,2.0,4.0,6.0,8.0])
    .addGrid(nb.modelType, ["multinomial", "bernoulli"])
    .build()
)

# Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
nbevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# 3-fold cross-validation fitted on the training split
from pyspark.ml.tuning import CrossValidator
cv_nb = CrossValidator(
    estimator=nb,
    evaluator=nbevaluator,
    estimatorParamMaps=param_grid_nb,
    numFolds=3,
)
cv_nb_model = cv_nb.fit(trainDF)

# Preview the scored training and test splits
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
pred_training_nb = cv_nb_model.transform(trainDF)
pred_training_nb.select(show_columns).show(5, truncate=False)

pred_test_nb = cv_nb_model.transform(testDF)
pred_test_nb.select(show_columns).show(5, truncate=False)

# Best hyper-parameters and the test-set area under ROC
print(
    'Naive Bayes ', "\n",
    'The best Smoothening is: ', cv_nb_model.bestModel._java_obj.getSmoothing(), "\n",
    'The best model type is:', cv_nb_model.bestModel._java_obj.getModelType(), "\n",
    'Area under ROC is:', nbevaluator.evaluate(pred_test_nb, {nbevaluator.metricName: "areaUnderROC"}),
)
# Last expression of the cell: cross-validation metrics of the search.
cv_nb_model.avgMetrics


In [41]:
# Cross-validation metrics of the Naive Bayes search (cell output).
cv_nb_model.avgMetrics

#4: Random Forest

In [43]:
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Baseline random forest with 10 trees.
rf = RandomForestClassifier( numTrees=10)


# Fit on the training split (the features are already assembled, so no indexers run here).
rfmodel = rf.fit(trainDF)

# Make predictions on the test split.
rfresult = rfmodel.transform(testDF)

# Select example rows to display.
rfresult.select("prediction","label","features").show(10)

# Evaluate the test predictions with a default binary evaluator.
rfevaluator = BinaryClassificationEvaluator()
print("evaluations: %(rfresult)s" % {"rfresult": rfevaluator.evaluate(rfresult)})



In [44]:
# Area under the ROC and PR curves for the baseline random forest's test predictions;
# the metric is chosen per call by overriding metricName.
rfauroc = rfevaluator.evaluate(rfresult, {rfevaluator.metricName: "areaUnderROC"})
rfauprc = rfevaluator.evaluate(rfresult, {rfevaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(rfauroc))
print("Area under PR Curve: {:.4f}".format(rfauprc))

In [45]:
# Estimator (rebinds `rf` from the baseline cell)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf = RandomForestClassifier(featuresCol='features', labelCol='label')

# Parameter grid: 1 x 3 x 4 x 5 = 60 combinations
from pyspark.ml.tuning import ParamGridBuilder
param_grid_rf = (
    ParamGridBuilder()
    .addGrid(rf.impurity,['gini'])
    .addGrid(rf.maxDepth, [2, 3, 4])
    .addGrid(rf.minInfoGain, [0.0, 0.1, 0.2, 0.3])
    .addGrid(rf.numTrees,[20,40,60,80,100])
    .build()
)

# Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
rfevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# 3-fold cross-validation fitted on the training split
from pyspark.ml.tuning import CrossValidator
cv_rf = CrossValidator(
    estimator=rf,
    evaluator=rfevaluator,
    estimatorParamMaps=param_grid_rf,
    numFolds=3,
)
cv_rf_model = cv_rf.fit(trainDF)

# Preview the scored training and test splits
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
pred_training_rf = cv_rf_model.transform(trainDF)
pred_training_rf.select(show_columns).show(5, truncate=False)

pred_test_rf = cv_rf_model.transform(testDF)
pred_test_rf.select(show_columns).show(5, truncate=False)

# Best hyper-parameters and the test-set area under ROC
print(
    'Random forest ', "\n",
    'The best Max Depth is: ', cv_rf_model.bestModel._java_obj.getMaxDepth(), "\n",
    'The best min Info gain is:', cv_rf_model.bestModel._java_obj.getMinInfoGain(), "\n",
    'Area under ROC is:', rfevaluator.evaluate(pred_test_rf, {rfevaluator.metricName: "areaUnderROC"}),
)

In [46]:
# Cross-validation metrics of the random-forest search (cell output).
cv_rf_model.avgMetrics

#5: Gradient Boost

In [48]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Baseline gradient-boosted trees with the estimator's default parameters.
gb = GBTClassifier()
# (No pipeline or indexers are used here; the estimator is fitted directly.)

# Fit on the training split.
gbmodel = gb.fit(trainDF)

# Make predictions on the test split.
gbresult = gbmodel.transform(testDF)

# Select example rows to display.
gbresult.select("prediction","label","features").show(5)

# Evaluate the test predictions with a default binary evaluator.
gbevaluator = BinaryClassificationEvaluator()

print("evaluations: %(gbresult)s" % {"gbresult": gbevaluator.evaluate(gbresult)})


In [49]:
# Area under the ROC and PR curves for the baseline GBT's test predictions;
# the metric is chosen per call by overriding metricName.
gbauroc = gbevaluator.evaluate(gbresult, {gbevaluator.metricName: "areaUnderROC"})
gbauprc = gbevaluator.evaluate(gbresult, {gbevaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(gbauroc))
print("Area under PR Curve: {:.4f}".format(gbauprc))

In [50]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Estimator for the cross-validated search, separate from the baseline `gb`.
gbt = GBTClassifier(featuresCol='features', labelCol='label')


# Parameter grid: 3 x 3 x 3 x 5 = 135 combinations.
# Every param must belong to the estimator being tuned. The maxIter axis used to be
# built from `gb.maxIter` (the baseline estimator), a param that `gbt` does not own,
# so that axis did not tune `gbt`.
from pyspark.ml.tuning import ParamGridBuilder
param_grid_gbt = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 3, 4])
    .addGrid(gbt.minInfoGain, [0.0, 0.1, 0.2])
    .addGrid(gbt.stepSize, [0.02, 0.05, 0.1])
    .addGrid(gbt.maxIter, [20, 40, 60, 80, 100])
    .build()
)

# Evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
gbtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Cross-validation fitted on the training split. numFolds=3 is spelled out to match the
# other searches (it is also the documented default, so the fold count is unchanged).
from pyspark.ml.tuning import CrossValidator
cv_gbt = CrossValidator(
    estimator=gbt,
    evaluator=gbtevaluator,
    estimatorParamMaps=param_grid_gbt,
    numFolds=3,
)
cv_gbt_model = cv_gbt.fit(trainDF)

# Preview the scored training and test splits
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
pred_training_gbt = cv_gbt_model.transform(trainDF)
pred_training_gbt.select(show_columns).show(5, truncate=False)

pred_test_gbt = cv_gbt_model.transform(testDF)
pred_test_gbt.select(show_columns).show(5, truncate=False)


# Best hyper-parameters and the test-set area under ROC
print(
    'Gradient Boosting ', "\n",
    'The best Max Depth is: ', cv_gbt_model.bestModel._java_obj.getMaxDepth(), "\n",
    'The best min Info gain is:', cv_gbt_model.bestModel._java_obj.getMinInfoGain(), "\n",
    'step size: ', cv_gbt_model.bestModel._java_obj.getStepSize(), "\n",
    'Area under ROC is:', gbtevaluator.evaluate(pred_test_gbt, {gbtevaluator.metricName: "areaUnderROC"}),
)

In [51]:
# Cross-validation metrics of the gradient-boosting search (cell output).
cv_gbt_model.avgMetrics

In [52]:
from pyspark.mllib.evaluation import MulticlassMetrics
# MulticlassMetrics expects an RDD of (prediction, label) pairs, in that order. The
# tuples used to be built as (label, prediction), which transposes the confusion matrix
# and swaps precision with recall. Both values are passed as Python floats.
scoreandlabels = gbresult.rdd.map(lambda z: (float(z["prediction"]), float(z["label"])))
metric = MulticlassMetrics(scoreandlabels)


In [53]:
#model_stages = cv + cv_svm + cv_nb+cv_rf +cv_gbt
#pipelined_models = Pipeline(stages= model_stages).fit(trainDF)
#pipelined_result = pipelined_models.transform(testDF)

#MODEL ACCURACY 
(on training data)

In [55]:
# Area under ROC of each cross-validated model on the TRAINING predictions.
# NOTE(review): `evaluator` was last rebound in the baseline SVM cell; it is used here
# for the logistic-regression row with the metric overridden per call.
print('Models and their Performance',"\n")
print('Logistic Regression',evaluator.evaluate(pred_training_cv, {evaluator.metricName: "areaUnderROC"}))
print('Support Vector Machine',svmevaluator.evaluate(pred_training_svm, {svmevaluator.metricName: "areaUnderROC"}))
print('Naive Bayes', nbevaluator.evaluate(pred_training_nb, {nbevaluator.metricName: "areaUnderROC"}))
print('Random forest', rfevaluator.evaluate(pred_training_rf, {rfevaluator.metricName: "areaUnderROC"}))
print('Gradient Boost', gbtevaluator.evaluate(pred_training_gbt, {gbtevaluator.metricName: "areaUnderROC"}))


#MODEL PREDICTION ACCURACY
(on test data)

In [57]:
# Area under ROC of each cross-validated model on the TEST predictions.
print('Models and their Performance',"\n")
print('Logistic Regression',evaluator.evaluate(pred_test_cv, {evaluator.metricName: "areaUnderROC"}))
print('Support Vector Machine',svmevaluator.evaluate(pred_test_svm, {svmevaluator.metricName: "areaUnderROC"}))
print('Naive Bayes', nbevaluator.evaluate(pred_test_nb, {nbevaluator.metricName: "areaUnderROC"}))
print('Random forest', rfevaluator.evaluate(pred_test_rf, {rfevaluator.metricName: "areaUnderROC"}))
print('Gradient Boost', gbtevaluator.evaluate(pred_test_gbt, {gbtevaluator.metricName: "areaUnderROC"}))

#ROC V/S PR

In [59]:
# Area under ROC next to area under PR for each cross-validated model, computed on the
# TRAINING predictions (pred_training_*).
# NOTE(review): only the first row labels its first number with "ROC:"; in the other rows
# the unlabelled first number is also the area under ROC.
print('Models and their Performance',"\n")
print('Logistic Regression: ROC: ',evaluator.evaluate(pred_training_cv, {evaluator.metricName: "areaUnderROC"}), ', PR: ',evaluator.evaluate(pred_training_cv, {evaluator.metricName: "areaUnderPR"}))
print('Support Vector Machine',svmevaluator.evaluate(pred_training_svm, {svmevaluator.metricName: "areaUnderROC"}), ', PR: ',svmevaluator.evaluate(pred_training_svm, {svmevaluator.metricName: "areaUnderPR"}))
print('Naive Bayes', nbevaluator.evaluate(pred_training_nb, {nbevaluator.metricName: "areaUnderROC"}),', PR: ' , nbevaluator.evaluate(pred_training_nb, {nbevaluator.metricName: "areaUnderPR"}))
print('Random forest', rfevaluator.evaluate(pred_training_rf, {rfevaluator.metricName: "areaUnderROC"}), ', PR: ', rfevaluator.evaluate(pred_training_rf, {rfevaluator.metricName: "areaUnderPR"}))
print('Gradient Boost', gbtevaluator.evaluate(pred_training_gbt, {gbtevaluator.metricName: "areaUnderROC"}),', PR: ' , gbtevaluator.evaluate(pred_training_gbt ,{gbtevaluator.metricName: "areaUnderPR"}))