In [1]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
print('Apache Spark Version :'+spark.version)
print('Apache Spark Version :'+spark.sparkContext.version)

Apache Spark Version :3.3.1
Apache Spark Version :3.3.1


In [71]:
spark = SparkSession.builder.appName(
        "WineClassifier").getOrCreate()
train_data = spark.read.option(
    "delimiter", ";").option("header", True).option("inferSchema", True).csv(f'./data/TrainingDataset.csv')
val_data = spark.read.option("delimiter", ";").option("header", True).option("inferSchema", True).csv(
    f'./data/ValidationDataset.csv')

data = train_data.union(val_data)
print(data.head(5))

[Row(fixed acidity=8.9, volatile acidity=0.22, citric acid=0.48, residual sugar=1.8, chlorides=0.077, free sulfur dioxide=29.0, total sulfur dioxide=60.0, density=0.9968, pH=3.39, sulphates=0.53, alcohol=9.4, quality=6), Row(fixed acidity=7.6, volatile acidity=0.39, citric acid=0.31, residual sugar=2.3, chlorides=0.082, free sulfur dioxide=23.0, total sulfur dioxide=71.0, density=0.9982, pH=3.52, sulphates=0.65, alcohol=9.7, quality=5), Row(fixed acidity=7.9, volatile acidity=0.43, citric acid=0.21, residual sugar=1.6, chlorides=0.106, free sulfur dioxide=10.0, total sulfur dioxide=37.0, density=0.9966, pH=3.17, sulphates=0.91, alcohol=9.5, quality=5), Row(fixed acidity=8.5, volatile acidity=0.49, citric acid=0.11, residual sugar=2.3, chlorides=0.084, free sulfur dioxide=9.0, total sulfur dioxide=67.0, density=0.9968, pH=3.17, sulphates=0.53, alcohol=9.4, quality=5), Row(fixed acidity=6.9, volatile acidity=0.4, citric acid=0.14, residual sugar=2.4, chlorides=0.085, free sulfur dioxide=

In [72]:
label_indexer = StringIndexer().setInputCol("quality").setOutputCol("label").fit(data)

In [73]:
vectorAssembler = VectorAssembler().setInputCols(data.columns[:-1]).setOutputCol("features")

In [74]:
# create pipeline
pipeline = Pipeline().setStages([label_indexer,vectorAssembler])

In [75]:
data.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          8.9|            0.22|       0.48|           1.8|    0.077|               29.0|                60.0| 0.9968|3.39|     0.53|    9.4|      6|
|          7.6|            0.39|       0.31|           2.3|    0.082|               23.0|                71.0| 0.9982|3.52|     0.65|    9.7|      5|
|          7.9|            0.43|       0.21|           1.6|    0.106|               10.0|                37.0| 0.9966|3.17|     0.91|    9.5|      5|
|          8.5|            0.49|       0.11|           2.3|    0.084|                9.0|           

In [81]:
transformedData = pipeline.fit(data).transform(data).select("features","label")

In [82]:
transformedData.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[8.9,0.22,0.48,1....|  1.0|
|[7.6,0.39,0.31,2....|  0.0|
|[7.9,0.43,0.21,1....|  0.0|
|[8.5,0.49,0.11,2....|  0.0|
|[6.9,0.4,0.14,2.4...|  1.0|
|[6.3,0.39,0.16,1....|  0.0|
|[7.6,0.41,0.24,1....|  0.0|
|[7.9,0.43,0.21,1....|  0.0|
|[7.1,0.71,0.0,1.9...|  0.0|
|[7.8,0.645,0.0,2....|  1.0|
|[6.7,0.675,0.07,2...|  0.0|
|[6.9,0.685,0.0,2....|  1.0|
|[8.3,0.655,0.12,2...|  0.0|
|[6.9,0.605,0.12,1...|  1.0|
|[5.2,0.32,0.25,1....|  0.0|
|[7.8,0.645,0.0,5....|  1.0|
|[7.8,0.6,0.14,2.4...|  1.0|
|[8.1,0.38,0.28,2....|  2.0|
|[5.7,1.13,0.09,1....|  3.0|
|[7.3,0.45,0.36,5....|  0.0|
+--------------------+-----+
only showing top 20 rows



In [83]:
(trainingData, testData) = transformedData.randomSplit([0.7, 0.3])

In [84]:
print("Training Data Count:" ,trainingData.count())
print("Test Data Count:" ,testData.count())

Training Data Count: 1038
Test Data Count: 401


Logistic Regression

In [85]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=50)
lrModel = lr.fit(trainingData)

In [88]:
predictions = lrModel.transform(testData)
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[5.0,0.4,0.5,4.3,...|  1.0|[1.96591096795310...|[0.01852847913855...|       2.0|
|[5.0,0.74,0.0,1.2...|  1.0|[2.23088895678708...|[0.21072471863693...|       1.0|
|[5.1,0.42,0.0,1.8...|  2.0|[2.66156733326718...|[0.03849518871239...|       2.0|
|[5.1,0.51,0.18,2....|  2.0|[3.08874911309709...|[0.08639490121648...|       1.0|
|[5.2,0.32,0.25,1....|  0.0|[5.16960665830137...|[0.74374409170772...|       0.0|
|[5.2,0.34,0.0,1.8...|  1.0|[1.39738131997673...|[0.00655955375204...|       2.0|
|[5.6,0.605,0.05,2...|  0.0|[1.40706079033213...|[0.07847586211984...|       1.0|
|[5.7,1.13,0.09,1....|  3.0|[4.12259018219433...|[0.28495013198423...|       5.0|
|[5.9,0.29,0.25,13...|  1.0|[4.76407002778121...|[0.32253910783428...|       1.0|
|[5.9,0.61,0.08,

In [89]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator()
print('MultiClassClassifaction:', evaluator.evaluate(predictions))

MultiClassClassifaction: 0.5888513351173561


Random Forest

In [90]:

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label',maxDepth=17, numTrees=706)
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[5.0,0.4,0.5,4.3,...|  1.0|[92.0,243.0,298.0...|[0.13031161473087...|       2.0|
|[5.0,0.74,0.0,1.2...|  1.0|[60.0,581.8,34.7,...|[0.08498583569405...|       1.0|
|[5.1,0.42,0.0,1.8...|  2.0|[76.0,179.0,323.2...|[0.10764872521246...|       2.0|
|[5.1,0.51,0.18,2....|  2.0|[78.0,133.0,381.6...|[0.11048158640226...|       2.0|
|[5.2,0.32,0.25,1....|  0.0|[381.008063687724...|[0.53967147831122...|       0.0|
|[5.2,0.34,0.0,1.8...|  1.0|[21.0,557.2,85.8,...|[0.02974504249291...|       1.0|
|[5.6,0.605,0.05,2...|  0.0|[217.089668615984...|[0.30749244846456...|       1.0|
|[5.7,1.13,0.09,1....|  3.0|[434.0,75.0,10.0,...|[0.61473087818696...|       0.0|
|[5.9,0.29,0.25,13...|  1.0|[404.0,204.0,61.0...|[0.57223796033994...|       0.0|
|[5.9,0.61,0.08,

In [91]:
evaluator = MulticlassClassificationEvaluator()
print('MultiClassClassifaction:', evaluator.evaluate(predictions))

MultiClassClassifaction: 0.650294041016861


In [106]:
evaluator.getMetricName()

'f1'

In [99]:
from pyspark.ml.feature import IndexToString

pred_to_predLabel = IndexToString(
    inputCol="prediction", outputCol="predictedQuality", labels=label_indexer.labels)
label_to_actualLabel = IndexToString(
    inputCol="label", outputCol="quality", labels=label_indexer.labels)
pipeline = Pipeline().setStages([pred_to_predLabel,label_to_actualLabel])
result = pipeline.fit(predictions).transform(predictions)

In [102]:
result.select('predictedQuality','quality').show()

+----------------+-------+
|predictedQuality|quality|
+----------------+-------+
|               7|      6|
|               6|      6|
|               7|      7|
|               7|      7|
|               5|      5|
|               6|      6|
|               6|      5|
|               5|      4|
|               5|      6|
|               6|      6|
|               6|      4|
|               6|      6|
|               5|      5|
|               5|      5|
|               5|      6|
|               6|      7|
|               6|      6|
|               6|      6|
|               5|      5|
|               5|      4|
+----------------+-------+
only showing top 20 rows



In [None]:
# saving string indexer
# stringIndexerPath = temp_path + "/string-indexer"
# label_indexer.save(stringIndexerPath)
# label_indexer_loaded = StringIndexer.load(stringIndexerPath)