In [1]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LinearSVC, RandomForestClassifier
from pyspark.ml.feature import IndexToString, Normalizer, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import expr
from pyspark.sql.session import SparkSession
from pyspark.sql.types import BooleanType
from pyspark.mllib.evaluation import MulticlassMetrics
from helpers.helper_functions import translate_to_file_string
from pyspark.ml.classification import LinearSVC
import pandas as pd
import numpy as np

# for pretty printing
def printDf(sprkDF): 
    newdf = sprkDF.toPandas()
    from IPython.display import display, HTML
    return HTML(newdf.to_html())

In [2]:
inputFile = translate_to_file_string("../data/mushrooms.csv")

In [3]:
spark = (SparkSession
       .builder
       .appName("Mushroom/Classification")
       .getOrCreate())

In [4]:
df = spark.read.option("header", "true") \
       .option("inferSchema", "true") \
       .option("delimiter", ",") \
       .csv(inputFile)

In [5]:
labelIndexer        = StringIndexer().setInputCol("class").setOutputCol("label").fit(df)
#cap_shapeIndexer    = StringIndexer().setInputCol("cap-shape").setOutputCol("cap_shapeNUM").fit(df)
#cap_surfaceIndexer  = StringIndexer().setInputCol("cap-surface").setOutputCol("cap_surfaceNUM").fit(df)
#cap_colorIndexer    = StringIndexer().setInputCol("cap-color").setOutputCol("cap_colorNUM").fit(df)
#bruisesIndexer      = StringIndexer().setInputCol("bruises").setOutputCol("bruisesNUM").fit(df)
odorIndexer         = StringIndexer().setInputCol("odor").setOutputCol("odorNUM").fit(df)
#gill_attachmentIndexer  = StringIndexer().setInputCol("gill-attachment").setOutputCol("gill_attachmentNUM").fit(df)
#gill_spacingIndexer     = StringIndexer().setInputCol("gill-spacing").setOutputCol("gill_spacingNUM").fit(df)
gill_sizeIndexer        = StringIndexer().setInputCol("gill-size").setOutputCol("gill_sizeNUM").fit(df)
gill_colorIndexer       = StringIndexer().setInputCol("gill-color").setOutputCol("gill_colorNUM").fit(df)
#stalk_shapeIndexer      = StringIndexer().setInputCol("stalk-shape").setOutputCol("stalk_shapeNUM").fit(df)
stalk_rootIndexer       = StringIndexer().setInputCol("stalk-root").setOutputCol("stalk_rootNUM").fit(df)
#stalk_surface_above_ringIndexer  = StringIndexer().setInputCol("stalk-surface-above-ring").setOutputCol("stalk_surface_above_ringNUM").fit(df)
#stalk_surface_below_ringIndexer  = StringIndexer().setInputCol("stalk-surface-below-ring").setOutputCol("stalk_surface_below_ringNUM").fit(df)
#stalk_color_above_ringIndexer    = StringIndexer().setInputCol("stalk-color-above-ring").setOutputCol("stalk_color_above_ringNUM").fit(df)
#stalk_color_below_ringIndexer    = StringIndexer().setInputCol("stalk-color-below-ring").setOutputCol("stalk_color_below_ringNUM").fit(df)
#veil_typeIndexer            = StringIndexer().setInputCol("veil-type").setOutputCol("veil_typeNUM").fit(df)
#veil_colorIndexer           = StringIndexer().setInputCol("veil-color").setOutputCol("veil_colorNUM").fit(df)
#ring_numberIndexer          = StringIndexer().setInputCol("ring-number").setOutputCol("ring_numberNUM").fit(df)
ring_typeIndexer            = StringIndexer().setInputCol("ring-type").setOutputCol("ring_typeNUM").fit(df)
spore_print_colorIndexer    = StringIndexer().setInputCol("spore-print-color").setOutputCol("spore_print_colorNUM").fit(df)
populationIndexer           = StringIndexer().setInputCol("population").setOutputCol("populationNUM").fit(df)
#habitatIndexer              = StringIndexer().setInputCol("habitat").setOutputCol("habitatNUM").fit(df)

In [6]:
featureCols = df.columns.copy()
featureCols.remove("class")
featureCols.remove("cap-shape")
featureCols.remove("cap-surface")
featureCols.remove("cap-color")
featureCols.remove("odor")
featureCols.remove("bruises")
featureCols.remove("gill-attachment")
featureCols.remove("gill-spacing")
featureCols.remove("gill-size")
featureCols.remove("gill-color")
featureCols.remove("stalk-shape")
featureCols.remove("stalk-root")
featureCols.remove("stalk-surface-above-ring")
featureCols.remove("stalk-surface-below-ring")
featureCols.remove("stalk-color-above-ring")
featureCols.remove("stalk-color-below-ring")
featureCols.remove("veil-type")
featureCols.remove("veil-color")
featureCols.remove("ring-number")
featureCols.remove("ring-type")
featureCols.remove("spore-print-color")
featureCols.remove("population")
featureCols.remove("habitat")

#featureCols = featureCols + ["cap_shapeNUM","cap_surfaceNUM", "cap_colorNUM", "bruisesNUM", "odorNUM", "gill_attachmentNUM", "gill_spacingNUM", "gill_sizeNUM", "gill_colorNUM", "stalk_shapeNUM", "stalk_rootNUM", "stalk_surface_above_ringNUM", "stalk_surface_below_ringNUM", "stalk_color_above_ringNUM", "stalk_color_below_ringNUM", "veil_colorNUM", "ring_numberNUM", "ring_typeNUM", "spore_print_colorNUM", "populationNUM", "habitatNUM"]
featureCols = featureCols + ["odorNUM","gill_sizeNUM", "gill_colorNUM", "stalk_rootNUM", "ring_typeNUM", "spore_print_colorNUM", "populationNUM"]

print(featureCols)

['odorNUM', 'gill_sizeNUM', 'gill_colorNUM', 'stalk_rootNUM', 'ring_typeNUM', 'spore_print_colorNUM', 'populationNUM']


In [7]:
labeledData = labelIndexer.transform(df)

#labeledDataNUM = cap_shapeIndexer.transform(cap_surfaceIndexer.transform(cap_colorIndexer.transform(bruisesIndexer.transform(odorIndexer.transform(gill_attachmentIndexer.transform(gill_spacingIndexer.transform(gill_sizeIndexer.transform(gill_colorIndexer.transform(stalk_shapeIndexer.transform(stalk_rootIndexer.transform(stalk_surface_above_ringIndexer.transform(stalk_surface_below_ringIndexer.transform(stalk_color_above_ringIndexer.transform(stalk_color_below_ringIndexer.transform(veil_colorIndexer.transform(veil_typeIndexer.transform(ring_numberIndexer.transform(ring_typeIndexer.transform(spore_print_colorIndexer.transform(populationIndexer.transform(habitatIndexer.transform(labeledData))))))))))))))))))))))
labeledDataNUM = odorIndexer.transform(gill_sizeIndexer.transform(gill_colorIndexer.transform(stalk_rootIndexer.transform(ring_typeIndexer.transform(spore_print_colorIndexer.transform(populationIndexer.transform(labeledData)))))))
assembler =  VectorAssembler(outputCol="features", inputCols=featureCols)

In [8]:
splits = labeledDataNUM.randomSplit([0.9, 0.1 ], 12345)
training = splits[0]
test = splits[1]

In [9]:
evaluator = BinaryClassificationEvaluator(labelCol="label",rawPredictionCol="prediction", metricName="areaUnderROC")

In [10]:
lsvc = LinearSVC(labelCol="label",aggregationDepth=2, featuresCol="features") 

In [11]:
pipeline = Pipeline(stages= [assembler, lsvc])

In [12]:
paramGrid = ParamGridBuilder().addGrid(lsvc.maxIter, [100])\
            .addGrid(lsvc.regParam, [0.1, 0.001, 0.0001])\
            .addGrid(lsvc.standardization, [True, False])\
            .build()

In [13]:
cvSVM = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=5, parallelism=2)

In [14]:
cvSVMModel = cvSVM.fit(training)

In [15]:
linearSVMModel = cvSVMModel.bestModel.stages[1]
print("Best Params: \n", linearSVMModel.explainParams())
print("Param Map: \n", linearSVMModel.extractParamMap())

Best Params: 
 aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2, current: 2)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
maxIter: max number of iterations (>= 0). (default: 100, current: 100)
predictionCol: prediction column name. (default: prediction)
rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)
regParam: regularization parameter (>= 0). (default: 0.0, current: 0.001)
standardization: whether to standardize the training features before fitting the model. (default: True, current: True)
threshold: The threshold in binary classification applied to the linear model prediction.  This threshold can be any real number, where Inf will make all predictions 0.0 and -Inf will make all predictions 1.0. (default: 0.0)
tol: the convergence tolerance for iterative algorithms (>=

In [16]:
predictions = cvSVMModel.transform(test)

In [17]:
accuracy = evaluator.evaluate(predictions)
print("Test Error", (1.0 - accuracy))

Test Error 0.0741130393763344


In [18]:
predictionAndLabels = predictions.select("prediction", "label").rdd.map(lambda p: [p[0], float(p[1])])
metrics = MulticlassMetrics(predictionAndLabels)

In [19]:
confusion = metrics.confusionMatrix()
print("Confusion matrix: \n" , confusion)

Confusion matrix: 
 DenseMatrix([[373.,  19.],
             [ 41., 370.]])


In [None]:
spark.stop()