In [75]:
import os
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [5]:
sqlContext = SQLContext(SparkContext())

In [6]:
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferschema", "true").option("mode", "DROPMALFORMED").load("winequality-red.csv")

In [10]:
featureColumns = ['fixed acidity','volatile acidity','citric acid',
                  'residual sugar','chlorides','free sulfur dioxide',
                  'total sulfur dioxide','density','pH','sulphates','alcohol']

In [7]:
df.count(), len(df.columns)

(1599, 12)

In [9]:
df.select('*').show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [11]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol='features')
assembled = assembler.transform(df)

In [44]:
(training, testing) = assembled.randomSplit([0.8,0.2])

In [45]:
(training.count(), testing.count())

(1277, 322)

In [76]:
dt = DecisionTreeClassifier(labelCol='quality', featuresCol='features', maxDepth=5, minInstancesPerNode=20, impurity="gini")

In [77]:
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(training)

In [66]:
predictions = model.transform(testing)

In [67]:
predictions.select('prediction', 'quality').show(10)

+----------+-------+
|prediction|quality|
+----------+-------+
|       7.0|      7|
|       5.0|      5|
|       6.0|      7|
|       6.0|      6|
|       7.0|      7|
|       5.0|      5|
|       6.0|      5|
|       6.0|      6|
|       6.0|      6|
|       5.0|      5|
+----------+-------+
only showing top 10 rows



In [68]:
predictions.summary()

DataFrame[summary: string, fixed acidity: string, volatile acidity: string, citric acid: string, residual sugar: string, chlorides: string, free sulfur dioxide: string, total sulfur dioxide: string, density: string, pH: string, sulphates: string, alcohol: string, quality: string, prediction: string]

In [69]:
predictions.select('prediction', 'quality').write.save(path='predictions.csv', format='com.databricks.spark.csv', header='true')

In [70]:
evaluator = MulticlassClassificationEvaluator(labelCol='quality', predictionCol='prediction', metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("test accuracy : %g "%(1.0 - accuracy))

test accuracy : 0.39441 
