In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [15]:
spark = SparkSession.builder.appName("demo").getOrCreate()

In [16]:
spark

In [17]:
schema = StructType([
    StructField("fixed acidity", FloatType(),nullable=True),
    StructField("volatile acidity", FloatType(),nullable=True),
    StructField("citric acid", FloatType(),nullable=True),
    StructField("residual sugar", FloatType(),nullable=True),
    StructField("chlorides", FloatType(),nullable=True),
    StructField("free sulfur dioxide", FloatType(),nullable=True),
    StructField("total sulfur dioxide", FloatType(),nullable=True),
    StructField("density", FloatType(),nullable=True),
    StructField("pH", FloatType(),nullable=True),
    StructField("sulphates", FloatType(),nullable=True),
    StructField("alcohol", FloatType(),nullable=True),
    StructField("quality", StringType(),nullable=True),
])

In [19]:
path = "wine.csv"
df = spark.read.csv(path=path,schema=schema,header=True)

In [20]:
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|    bad|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|    bad|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|    bad|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [21]:
df.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [22]:
df.printSchema()

root
 |-- fixed acidity: float (nullable = true)
 |-- volatile acidity: float (nullable = true)
 |-- citric acid: float (nullable = true)
 |-- residual sugar: float (nullable = true)
 |-- chlorides: float (nullable = true)
 |-- free sulfur dioxide: float (nullable = true)
 |-- total sulfur dioxide: float (nullable = true)
 |-- density: float (nullable = true)
 |-- pH: float (nullable = true)
 |-- sulphates: float (nullable = true)
 |-- alcohol: float (nullable = true)
 |-- quality: string (nullable = true)



In [23]:
from pyspark.sql.functions import count,isnan,isnull,when,col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density| pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|            0|               0|          0|             0|        0|                  0|                   0|      0|  0|        0|      0|      0|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+



In [24]:
df.summary().show()

+-------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+------------------+-------+
|summary|     fixed acidity|   volatile acidity|        citric acid|    residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|             density|                 pH|          sulphates|           alcohol|quality|
+-------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------------+-------------------+------------------+-------+
|  count|              1599|               1599|               1599|              1599|               1599|               1599|                1599|                1599|               1599|               1599|              1599|   1599|
|   mean|  8.31963727204333| 0.5278205118742565|0.27

In [25]:
df.dropna(how="any")

DataFrame[fixed acidity: float, volatile acidity: float, citric acid: float, residual sugar: float, chlorides: float, free sulfur dioxide: float, total sulfur dioxide: float, density: float, pH: float, sulphates: float, alcohol: float, quality: string]

In [26]:
df.dropDuplicates()

DataFrame[fixed acidity: float, volatile acidity: float, citric acid: float, residual sugar: float, chlorides: float, free sulfur dioxide: float, total sulfur dioxide: float, density: float, pH: float, sulphates: float, alcohol: float, quality: string]

In [27]:
%pip install numpy

Note: you may need to restart the kernel to use updated packages.


In [28]:
import numpy as np
from pyspark.ml.feature import StringIndexer

In [29]:
indexer = StringIndexer(inputCol="quality", outputCol="quality_Index")
df2 = indexer.fit(df).transform(df)
df2.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|quality_Index|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|    bad|          1.0|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|    bad|          1.0|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|    bad|          1.0|
|         11.2| 

In [30]:
from pyspark.sql.functions import *

In [31]:
df.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [32]:
df2 = df2.drop("quality")

In [33]:
df2.printSchema()

root
 |-- fixed acidity: float (nullable = true)
 |-- volatile acidity: float (nullable = true)
 |-- citric acid: float (nullable = true)
 |-- residual sugar: float (nullable = true)
 |-- chlorides: float (nullable = true)
 |-- free sulfur dioxide: float (nullable = true)
 |-- total sulfur dioxide: float (nullable = true)
 |-- density: float (nullable = true)
 |-- pH: float (nullable = true)
 |-- sulphates: float (nullable = true)
 |-- alcohol: float (nullable = true)
 |-- quality_Index: double (nullable = false)



In [34]:
for i in df2.columns:
    print(i," ",":",df2.stat.corr(i,"quality_Index"))

fixed acidity   : -0.09509348889944476
volatile acidity   : 0.3214408560424918
citric acid   : -0.15912940741184622
residual sugar   : 0.002160450924422744
chlorides   : 0.10949399522667415
free sulfur dioxide   : 0.06175674399891035
total sulfur dioxide   : 0.23196297573870725
density   : 0.15910977328405826
pH   : 0.0032639935724002836
sulphates   : -0.2180716662476898
alcohol   : -0.4347512093468429
quality_Index   : 1.0


In [35]:
x=df2[[ 'volatile acidity',
       'total sulfur dioxide',"sulphates",   
        'alcohol','quality_Index']]

In [36]:
x.printSchema()

root
 |-- volatile acidity: float (nullable = true)
 |-- total sulfur dioxide: float (nullable = true)
 |-- sulphates: float (nullable = true)
 |-- alcohol: float (nullable = true)
 |-- quality_Index: double (nullable = false)



In [37]:
x.columns

['volatile acidity',
 'total sulfur dioxide',
 'sulphates',
 'alcohol',
 'quality_Index']

In [38]:
len(x.columns)

5

In [39]:
from pyspark.ml.feature import *

In [40]:
feature = VectorAssembler(inputCols = x.columns[:len(x.columns)-1],outputCol="features")
feature_vector= feature.transform(x)

In [41]:
feature_vector.show(5)

+----------------+--------------------+---------+-------+-------------+--------------------+
|volatile acidity|total sulfur dioxide|sulphates|alcohol|quality_Index|            features|
+----------------+--------------------+---------+-------+-------------+--------------------+
|             0.7|                34.0|     0.56|    9.4|          1.0|[0.69999998807907...|
|            0.88|                67.0|     0.68|    9.8|          1.0|[0.87999999523162...|
|            0.76|                54.0|     0.65|    9.8|          1.0|[0.75999999046325...|
|            0.28|                60.0|     0.58|    9.8|          0.0|[0.28000000119209...|
|             0.7|                34.0|     0.56|    9.4|          1.0|[0.69999998807907...|
+----------------+--------------------+---------+-------+-------------+--------------------+
only showing top 5 rows



In [42]:
feature_vector_select = feature_vector.select(['features','quality_Index'])

In [43]:
# for data split in pyspark, we can use df.randomSplit()
(x_train, x_test) = feature_vector_select.randomSplit([0.8, 0.2])

In [44]:
x_train.printSchema()

root
 |-- features: vector (nullable = true)
 |-- quality_Index: double (nullable = false)



In [None]:
y = x_train.select("features")
z = y.take(1)
z


[Row(features=DenseVector([0.16, 31.0, 0.54, 12.4]))]

In [45]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [46]:
rf = RandomForestClassifier(labelCol='quality_Index',featuresCol="features")

In [47]:
model = rf.fit(x_train)

In [48]:
prediction = model.transform(x_test)
prediction.show(5)

+--------------------+-------------+--------------------+--------------------+----------+
|            features|quality_Index|       rawPrediction|         probability|prediction|
+--------------------+-------------+--------------------+--------------------+----------+
|[0.11999999731779...|          0.0|[18.6968928176903...|[0.93484464088451...|       0.0|
|[0.18000000715255...|          1.0|[17.0584753784893...|[0.85292376892446...|       0.0|
|[0.18000000715255...|          0.0|[16.3050466629479...|[0.81525233314739...|       0.0|
|[0.20000000298023...|          0.0|[18.6448094843569...|[0.93224047421784...|       0.0|
|[0.20999999344348...|          1.0|[15.8812777789942...|[0.79406388894971...|       0.0|
+--------------------+-------------+--------------------+--------------------+----------+
only showing top 5 rows



In [49]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [50]:
print('Accuracy: ', MulticlassClassificationEvaluator(labelCol='quality_Index',predictionCol="prediction",metricName='accuracy').evaluate(prediction))

Accuracy:  0.7476038338658147


paramGrid = ParamGridBuilder()\
                                .addGrid(rf.maxDepth, [5, 10, 20]) \
                                .addGrid(rf.maxBins, [20, 32, 50]) \
                                .addGrid(rf.numTrees, [20, 40, 60 ]) \
                                .addGrid(rf.impurity, ["gini", "entropy"]) \
                                .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
                                .build()

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
tvs = TrainValidationSplit( estimator=rf
                           ,estimatorParamMaps=paramGrid
                           ,evaluator=MulticlassClassificationEvaluator(labelCol='quality_Index')
                           ,trainRatio=0.8)
model = tvs.fit(x_train)
model_predictions= model.transform(x_test)


print('Accuracy: ', MulticlassClassificationEvaluator(labelCol='quality_Index',predictionCol="prediction",metricName='accuracy').evaluate(model_predictions))
print('Precision: ',MulticlassClassificationEvaluator(labelCol='quality_Index',predictionCol="prediction",metricName='weightedPrecision').evaluate(model_predictions))

In [51]:
import importlib
module = importlib.import_module("pyspark.ml.classification")
model_name = "RandomForestClassifier"
class_ref = getattr(module, model_name)
class_ref

In [60]:
rf = class_ref(labelCol='quality_Index',featuresCol="features").fit(x_train)

In [61]:
module_eval = importlib.import_module("pyspark.ml.evaluation")
eval_name = "MulticlassClassificationEvaluator"
class_ref_eval = getattr(module_eval, eval_name)
class_ref_eval

In [64]:
print(class_ref_eval(labelCol='quality_Index',predictionCol="prediction",metricName='accuracy').evaluate(prediction))

0.7476038338658147
