In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName('Predict Grape Variety') \
        .getOrCreate()

In [3]:
rawData = spark.read \
            .format('csv') \
            .option('header','false') \
            .load('C:\\Pulkit\\Learning\\Spark\\Spark ML\\datasets\\wine.data')

In [4]:
rawData

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string]

In [5]:
rawData.show(5)

+---+-----+----+----+----+---+----+----+---+----+----+----+----+----+
|_c0|  _c1| _c2| _c3| _c4|_c5| _c6| _c7|_c8| _c9|_c10|_c11|_c12|_c13|
+---+-----+----+----+----+---+----+----+---+----+----+----+----+----+
|  1|14.23|1.71|2.43|15.6|127| 2.8|3.06|.28|2.29|5.64|1.04|3.92|1065|
|  1| 13.2|1.78|2.14|11.2|100|2.65|2.76|.26|1.28|4.38|1.05| 3.4|1050|
|  1|13.16|2.36|2.67|18.6|101| 2.8|3.24| .3|2.81|5.68|1.03|3.17|1185|
|  1|14.37|1.95| 2.5|16.8|113|3.85|3.49|.24|2.18| 7.8| .86|3.45|1480|
|  1|13.24|2.59|2.87|  21|118| 2.8|2.69|.39|1.82|4.32|1.04|2.93| 735|
+---+-----+----+----+----+---+----+----+---+----+----+----+----+----+
only showing top 5 rows



In [6]:
dataset = rawData.toDF('Label',
            'Alchol',
            'MalicAcid',
            'Ash',
            'AshAlkalinity',
            'Magnesium',
            'TotalPhenols',
            'Flavanoids',
            'NonflavenoidsPhenols',
            'Proanthocyanins',
            'ColorIntensity',
            'Hue',
            'OD',
            'Proline'
            )

In [7]:
dataset.printSchema()

root
 |-- Label: string (nullable = true)
 |-- Alchol: string (nullable = true)
 |-- MalicAcid: string (nullable = true)
 |-- Ash: string (nullable = true)
 |-- AshAlkalinity: string (nullable = true)
 |-- Magnesium: string (nullable = true)
 |-- TotalPhenols: string (nullable = true)
 |-- Flavanoids: string (nullable = true)
 |-- NonflavenoidsPhenols: string (nullable = true)
 |-- Proanthocyanins: string (nullable = true)
 |-- ColorIntensity: string (nullable = true)
 |-- Hue: string (nullable = true)
 |-- OD: string (nullable = true)
 |-- Proline: string (nullable = true)



In [8]:
dataset.show(5)

+-----+------+---------+----+-------------+---------+------------+----------+--------------------+---------------+--------------+----+----+-------+
|Label|Alchol|MalicAcid| Ash|AshAlkalinity|Magnesium|TotalPhenols|Flavanoids|NonflavenoidsPhenols|Proanthocyanins|ColorIntensity| Hue|  OD|Proline|
+-----+------+---------+----+-------------+---------+------------+----------+--------------------+---------------+--------------+----+----+-------+
|    1| 14.23|     1.71|2.43|         15.6|      127|         2.8|      3.06|                 .28|           2.29|          5.64|1.04|3.92|   1065|
|    1|  13.2|     1.78|2.14|         11.2|      100|        2.65|      2.76|                 .26|           1.28|          4.38|1.05| 3.4|   1050|
|    1| 13.16|     2.36|2.67|         18.6|      101|         2.8|      3.24|                  .3|           2.81|          5.68|1.03|3.17|   1185|
|    1| 14.37|     1.95| 2.5|         16.8|      113|        3.85|      3.49|                 .24|           2.1

In [9]:
dataset.rdd.take(5)

[Row(Label='1', Alchol='14.23', MalicAcid='1.71', Ash='2.43', AshAlkalinity='15.6', Magnesium='127', TotalPhenols='2.8', Flavanoids='3.06', NonflavenoidsPhenols='.28', Proanthocyanins='2.29', ColorIntensity='5.64', Hue='1.04', OD='3.92', Proline='1065'),
 Row(Label='1', Alchol='13.2', MalicAcid='1.78', Ash='2.14', AshAlkalinity='11.2', Magnesium='100', TotalPhenols='2.65', Flavanoids='2.76', NonflavenoidsPhenols='.26', Proanthocyanins='1.28', ColorIntensity='4.38', Hue='1.05', OD='3.4', Proline='1050'),
 Row(Label='1', Alchol='13.16', MalicAcid='2.36', Ash='2.67', AshAlkalinity='18.6', Magnesium='101', TotalPhenols='2.8', Flavanoids='3.24', NonflavenoidsPhenols='.3', Proanthocyanins='2.81', ColorIntensity='5.68', Hue='1.03', OD='3.17', Proline='1185'),
 Row(Label='1', Alchol='14.37', MalicAcid='1.95', Ash='2.5', AshAlkalinity='16.8', Magnesium='113', TotalPhenols='3.85', Flavanoids='3.49', NonflavenoidsPhenols='.24', Proanthocyanins='2.18', ColorIntensity='7.8', Hue='.86', OD='3.45', P

In [10]:
from pyspark.ml.linalg import Vectors

def vectorize(data):
    return data.rdd.map(lambda r: [r[0],Vectors.dense(r[1:])]).toDF(['label','features'])

In [11]:
vectorizedData = vectorize(dataset)

In [12]:
vectorizedData.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|[14.23,1.71,2.43,...|
|    1|[13.2,1.78,2.14,1...|
|    1|[13.16,2.36,2.67,...|
|    1|[14.37,1.95,2.5,1...|
|    1|[13.24,2.59,2.87,...|
+-----+--------------------+
only showing top 5 rows



In [24]:
vectorizedData.take(5)

[Row(label='1', features=DenseVector([14.23, 1.71, 2.43, 15.6, 127.0, 2.8, 3.06, 0.28, 2.29, 5.64, 1.04, 3.92, 1065.0])),
 Row(label='1', features=DenseVector([13.2, 1.78, 2.14, 11.2, 100.0, 2.65, 2.76, 0.26, 1.28, 4.38, 1.05, 3.4, 1050.0])),
 Row(label='1', features=DenseVector([13.16, 2.36, 2.67, 18.6, 101.0, 2.8, 3.24, 0.3, 2.81, 5.68, 1.03, 3.17, 1185.0])),
 Row(label='1', features=DenseVector([14.37, 1.95, 2.5, 16.8, 113.0, 3.85, 3.49, 0.24, 2.18, 7.8, 0.86, 3.45, 1480.0])),
 Row(label='1', features=DenseVector([13.24, 2.59, 2.87, 21.0, 118.0, 2.8, 2.69, 0.39, 1.82, 4.32, 1.04, 2.93, 735.0]))]

In [25]:
from pyspark.ml.feature import StringIndexer

labelIndexer = StringIndexer(inputCol='label',
                            outputCol='indexLabel')

In [26]:
indexedData= labelIndexer.fit(vectorizedData).transform(vectorizedData)
indexedData.take(2)

[Row(label='1', features=DenseVector([14.23, 1.71, 2.43, 15.6, 127.0, 2.8, 3.06, 0.28, 2.29, 5.64, 1.04, 3.92, 1065.0]), indexLabel=1.0),
 Row(label='1', features=DenseVector([13.2, 1.78, 2.14, 11.2, 100.0, 2.65, 2.76, 0.26, 1.28, 4.38, 1.05, 3.4, 1050.0]), indexLabel=1.0)]

In [27]:
from pyspark.sql import functions

indexedData.select('label').distinct().sort('label').show(5)

+-----+
|label|
+-----+
|    1|
|    2|
|    3|
+-----+



In [28]:
indexedData.select('indexLabel').distinct().sort('indexLabel').show(5)

+----------+
|indexLabel|
+----------+
|       0.0|
|       1.0|
|       2.0|
+----------+



In [29]:
(trainingData,testData) = indexedData.randomSplit([0.8,0.2])

In [30]:
from pyspark.ml.classification import DecisionTreeClassifier


dtree = DecisionTreeClassifier(
        labelCol ='indexLabel',
        featuresCol ='features',
        maxDepth=3,
        impurity='gini')

In [31]:
model = dtree.fit(trainingData)

In [43]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='indexLabel',
                                             predictionCol='prediction',
                                             metricName='f1')

In [39]:
transformed_data = model.transform(testData)
transformed_data.show(5)

+-----+--------------------+----------+--------------+-------------+----------+
|label|            features|indexLabel| rawPrediction|  probability|prediction|
+-----+--------------------+----------+--------------+-------------+----------+
|    1|[12.93,3.8,2.65,1...|       1.0|[0.0,39.0,0.0]|[0.0,1.0,0.0]|       1.0|
|    1|[13.05,1.65,2.55,...|       1.0|[0.0,39.0,0.0]|[0.0,1.0,0.0]|       1.0|
|    1|[13.3,1.72,2.14,1...|       1.0|[0.0,39.0,0.0]|[0.0,1.0,0.0]|       1.0|
|    1|[13.41,3.84,2.12,...|       1.0|[0.0,39.0,0.0]|[0.0,1.0,0.0]|       1.0|
|    1|[13.51,1.8,2.65,1...|       1.0|[0.0,39.0,0.0]|[0.0,1.0,0.0]|       1.0|
+-----+--------------------+----------+--------------+-------------+----------+
only showing top 5 rows



In [46]:
print(evaluator.getMetricName(), 
      'accuracy:', evaluator.evaluate(transformed_data))

f1 accuracy: 0.9181443985519053
