In [2]:
# Import requiered libriaries

from pyspark.sql import SparkSession
from pyspark.sql.types import *
%matplotlib inline
import pandas as pd
import numpy as np
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml import Pipeline


In [3]:
# Set spark session
spark_session = SparkSession \
    .builder \
    .getOrCreate()

In [4]:
# Load dataset
dataset = spark_session\
        .read\
        .format("csv")\
        .options(header='false', inferschema='true', delimiter=',')\
        .load("/home/master/wine.data")

In [5]:
dataset

DataFrame[_c0: int, _c1: double, _c2: double, _c3: double, _c4: double, _c5: int, _c6: double, _c7: double, _c8: double, _c9: double, _c10: double, _c11: double, _c12: double, _c13: int]

## Preprocessing

In [6]:
dataset.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: integer (nullable = true)



In [7]:
dataset.show(5)

+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7| _c8| _c9|_c10|_c11|_c12|_c13|
+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|0.28|2.29|5.64|1.04|3.92|1065|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|0.26|1.28|4.38|1.05| 3.4|1050|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| 0.3|2.81|5.68|1.03|3.17|1185|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|0.24|2.18| 7.8|0.86|3.45|1480|
|  1|13.24|2.59|2.87|21.0|118| 2.8|2.69|0.39|1.82|4.32|1.04|2.93| 735|
+---+-----+----+----+----+---+----+----+----+----+----+----+----+----+
only showing top 5 rows



In [8]:
#Think to optimize?

In [9]:
c0 = dataset.select("_c0").rdd.flatMap(lambda x: x).collect()
c1 = dataset.select("_c1").rdd.flatMap(lambda x: x).collect()
c2 = dataset.select("_c2").rdd.flatMap(lambda x: x).collect()
c3 = dataset.select("_c3").rdd.flatMap(lambda x: x).collect()
c4 = dataset.select("_c4").rdd.flatMap(lambda x: x).collect()
c5 = dataset.select("_c5").rdd.flatMap(lambda x: x).collect()
c6 = dataset.select("_c6").rdd.flatMap(lambda x: x).collect()
c7 = dataset.select("_c7").rdd.flatMap(lambda x: x).collect()
c8 = dataset.select("_c8").rdd.flatMap(lambda x: x).collect()
c9 = dataset.select("_c9").rdd.flatMap(lambda x: x).collect()
c10 = dataset.select("_c10").rdd.flatMap(lambda x: x).collect()
c11 = dataset.select("_c11").rdd.flatMap(lambda x: x).collect()
c12 = dataset.select("_c12").rdd.flatMap(lambda x: x).collect()
c13 = dataset.select("_c13").rdd.flatMap(lambda x: x).collect()

In [10]:
vector=[]

for i in np.arange(0,len(c0)):
    vector.append(Vectors.dense([c1[i],c2[i],c3[i],c4[i],c5[i],c6[i],c7[i],c8[i],c9[i],c10[i],c11[i],c12[i],c13[i]]))
vector[1]

DenseVector([13.2, 1.78, 2.14, 11.2, 100.0, 2.65, 2.76, 0.26, 1.28, 4.38, 1.05, 3.4, 1050.0])

In [11]:
data = zip(c0,vector)

In [12]:
df = spark_session.createDataFrame(data, ["label", "features"])

In [13]:
df.printSchema()

root
 |-- label: long (nullable = true)
 |-- features: vector (nullable = true)



In [14]:
df.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|[14.23,1.71,2.43,...|
|    1|[13.2,1.78,2.14,1...|
|    1|[13.16,2.36,2.67,...|
|    1|[14.37,1.95,2.5,1...|
|    1|[13.24,2.59,2.87,...|
+-----+--------------------+
only showing top 5 rows



## Model Application

In [15]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)


# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 3 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=3).fit(df)

In [16]:
# Split the data into training and test sets (30% held out for testing)
train, test = df.randomSplit([0.70, 0.30])

In [17]:
#Create a decision tree MODEL
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=2)

In [18]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [19]:
# Select (prediction, true label) and compute accuracy
dtevaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

In [20]:
# Create ParamGrid for Cross Validation
dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
             .addGrid(dt.maxBins, [10, 20, 40, 80, 100])
             .build())


# Create 5-fold CrossValidator
# Estimator will be pipeline which we created.
dtcv = CrossValidator(estimator = pipeline,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)

In [21]:
# Run cross validations
dtcvModel = dtcv.fit(train)
print(dtcvModel)

CrossValidatorModel_68455aa78009


In [22]:
#Get the best params
print(dtcvModel.getEstimatorParamMaps()[np.argmax(dtcvModel.avgMetrics)])

{Param(parent='DecisionTreeClassifier_567dc2be85dd', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5, Param(parent='DecisionTreeClassifier_567dc2be85dd', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 10}


As we can see above, best params for this cross validation are:
    maxDepth:5
    maxBins:10

In [23]:
#Get all params of best model
best_mod = dtcvModel.bestModel
param_dict = best_mod.stages[2].extractParamMap()
param_dict

{Param(parent='DecisionTreeClassifier_567dc2be85dd', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='DecisionTreeClassifier_567dc2be85dd', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='DecisionTreeClassifier_567dc2be85dd', name='featuresCol', doc='features column name'): 'indexedFeatures',
 Param(parent='DecisionTreeClassifier_567dc2be85dd', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini',
 Param(parent='DecisionTreeClassifier_567dc2be85dd', name='labelCol', doc='label column name'

In above output, we can see all variable that has best our model

In [53]:
# cvModel uses the best model found from the Cross Validation
dtpredictions = dtcvModel.transform(test)

In [54]:
dtpredictions.show(5)

+-----+--------------------+------------+--------------------+--------------+-------------+----------+
|label|            features|indexedLabel|     indexedFeatures| rawPrediction|  probability|prediction|
+-----+--------------------+------------+--------------------+--------------+-------------+----------+
|    1|[13.05,1.65,2.55,...|         1.0|[13.05,1.65,2.55,...|[0.0,40.0,0.0]|[0.0,1.0,0.0]|       1.0|
|    1|[13.07,1.5,2.1,15...|         1.0|[13.07,1.5,2.1,15...|[0.0,40.0,0.0]|[0.0,1.0,0.0]|       1.0|
|    1|[13.2,1.78,2.14,1...|         1.0|[13.2,1.78,2.14,1...|[0.0,40.0,0.0]|[0.0,1.0,0.0]|       1.0|
|    1|[13.24,3.98,2.29,...|         1.0|[13.24,3.98,2.29,...|[39.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|    1|[13.51,1.8,2.65,1...|         1.0|[13.51,1.8,2.65,1...|[0.0,40.0,0.0]|[0.0,1.0,0.0]|       1.0|
+-----+--------------------+------------+--------------------+--------------+-------------+----------+
only showing top 5 rows



In [56]:
# Evaluate best model
print('Accuracy:', dtevaluator.evaluate(dtpredictions))
print("---------------------------------------------")
lrmetrics = MulticlassMetrics(dtpredictions['indexedLabel','prediction'].rdd)
print('Confusion Matrix:\n', lrmetrics.confusionMatrix())
print("---------------------------------------------")
print('F1 Score:', lrmetrics.fMeasure(1.0,1.0))

Accuracy: 0.9038461538461539
---------------------------------------------
Confusion Matrix:
 DenseMatrix([[19.,  1.,  0.],
             [ 2., 17.,  0.],
             [ 2.,  0., 11.]])
---------------------------------------------
F1 Score: 0.918918918918919
