In [1]:
from pyspark.ml.classification import LogisticRegression as LR
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


from pyspark.sql import SparkSession

spark = SparkSession.builder \
      .appName("Wine Quality Classifier") \
      .config("spark.executor.memory", "70g") \
      .config("spark.driver.memory", "50g") \
      .config("spark.memory.offHeap.enabled",True) \
      .config("spark.memory.offHeap.size","16g") \
      .getOrCreate()

In [2]:
wine_df = spark.read.format("csv"). \
            options(header="true", inferschema="true",sep=';'). \
            load("winequality-red.csv")

In [3]:
wine_df.show(4)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [4]:
from pyspark.sql.functions import when
wine_df = wine_df.withColumn('quality_new', when(wine_df['quality']< 5, 0 ).\
                               otherwise(when(wine_df['quality']<8,1).otherwise(2)))
wine_df.show(4)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|quality_new|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-----------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|          1|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|          1|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|          1|
|         11.2|            0

In [5]:
wine_df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- quality_new: integer (nullable = false)



In [6]:
wine_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
fixed acidity,1599,8.319637273295838,1.7410963181276948,4.6,15.9
volatile acidity,1599,0.5278205128205131,0.17905970415353525,0.12,1.58
citric acid,1599,0.2709756097560964,0.19480113740531824,0.0,1.0
residual sugar,1599,2.5388055034396517,1.40992805950728,0.9,15.5
chlorides,1599,0.08746654158849257,0.047065302010090085,0.012,0.611
free sulfur dioxide,1599,15.874921826141339,10.46015696980971,1.0,72.0
total sulfur dioxide,1599,46.46779237023139,32.89532447829907,6.0,289.0
density,1599,0.9967466791744831,0.0018873339538427265,0.99007,1.00369
pH,1599,3.311113195747343,0.15438646490354271,2.74,4.01


In [7]:
string_index = StringIndexer(inputCol='quality_new', outputCol='quality'+'Index')

vectors = VectorAssembler(inputCols = ['fixed acidity','volatile acidity',
                                       'citric acid','residual sugar','chlorides','free sulfur dioxide', 'total sulfur dioxide', 'density',
                                      'pH','sulphates', 'alcohol'],
                         outputCol = 'features')

stages = [vectors, string_index]

In [8]:
pipeline = Pipeline().setStages(stages)
pipelineModel = pipeline.fit(wine_df)
pipeline_data_df = pipelineModel.transform(wine_df)

pipeline_data_df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-----------+--------------------+------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|quality_new|            features|qualityIndex|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+-----------+--------------------+------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|          1|[7.4,0.7,0.0,1.9,...|         0.0|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|          1|[7.8,0.88,0.0,2.6...|         0.0|
|          7.8|     

In [9]:
pipeline_data_df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
fixed acidity,1599,8.319637273295838,1.7410963181276948,4.6,15.9
volatile acidity,1599,0.5278205128205131,0.17905970415353525,0.12,1.58
citric acid,1599,0.2709756097560964,0.19480113740531824,0.0,1.0
residual sugar,1599,2.5388055034396517,1.40992805950728,0.9,15.5
chlorides,1599,0.08746654158849257,0.047065302010090085,0.012,0.611
free sulfur dioxide,1599,15.874921826141339,10.46015696980971,1.0,72.0
total sulfur dioxide,1599,46.46779237023139,32.89532447829907,6.0,289.0
density,1599,0.9967466791744831,0.0018873339538427265,0.99007,1.00369
pH,1599,3.311113195747343,0.15438646490354271,2.74,4.01


In [10]:
train_df, test_df = pipeline_data_df.randomSplit([0.7,0.3])

In [11]:
classifier= LR(featuresCol = 'features', labelCol='qualityIndex',\
               maxIter=1000)

model = classifier.fit(train_df)

In [12]:
print("Beta Coefficients:", model.coefficientMatrix)
print("Interceptors: ", model.interceptVector)

Beta Coefficients: DenseMatrix([[-2.12908685e-02, -2.15649410e+00, -1.14989690e+00,
               6.03485505e-03,  4.34852745e+00, -1.24570841e-02,
               1.85425162e-02,  6.62398057e+00, -7.80430677e-01,
               1.17897956e-01, -2.37305887e-01],
             [ 3.81099169e-01,  2.65183608e+00,  3.14959443e-02,
               9.05755732e-02,  8.92807636e+00,  1.78080907e-02,
              -1.58814149e-02, -1.31773133e+01,  6.01291582e+00,
              -4.15805194e+00, -8.81077553e-01],
             [-3.59808301e-01, -4.95341980e-01,  1.11840095e+00,
              -9.66104283e-02, -1.32766038e+01, -5.35100655e-03,
              -2.66110129e-03,  6.55333268e+00, -5.23248515e+00,
               4.04015399e+00,  1.11838344e+00]])
Interceptors:  [2.4452990696315684,-0.717067157664007,-1.7282319119675615]


In [13]:
modelSummary = model.summary
print("Objective History", modelSummary.objectiveHistory)
print("Number of Iterations is ",modelSummary.totalIterations)

Objective History [0.23581786131403523, 0.23580931675532288, 0.2357844684998421, 0.23578153826655315, 0.2357774832561159, 0.23577553882853106, 0.23575592269598752, 0.23571847279018668, 0.2356081652133476, 0.23533935601775696, 0.23465909571090346, 0.233074638221403, 0.2298469418187082, 0.22521317735730695, 0.22277104098335837, 0.21714901864321998, 0.20543939322577454, 0.20541769138199, 0.20540848490538513, 0.20540608996512336, 0.20540526125177352, 0.2054023472615037, 0.2053962830942178, 0.2053790366611294, 0.2053356750249505, 0.20522264991633213, 0.20494261450790907, 0.20427965443798451, 0.20393628442217043, 0.20227740821547938, 0.1981875184173352, 0.19691004381535576, 0.19217931420713402, 0.18984666163417804, 0.1896464650042805, 0.18962716750337047, 0.18962431706375787, 0.18962371892669455, 0.1896232586786512, 0.18962191812859064, 0.18961842034510618, 0.18961075625682117, 0.1895794208848531, 0.18950742839706422, 0.18930640430833076, 0.1888122191161155, 0.1876346922411781, 0.18551697366

In [14]:
model_predictions = model.transform(test_df)
model_predictions.select("features", "qualityIndex", "prediction").show(5)

+--------------------+------------+----------+
|            features|qualityIndex|prediction|
+--------------------+------------+----------+
|[4.6,0.52,0.15,2....|         1.0|       0.0|
|[5.0,0.74,0.0,1.2...|         0.0|       0.0|
|[5.1,0.47,0.02,1....|         0.0|       0.0|
|[5.1,0.585,0.0,1....|         0.0|       0.0|
|[5.2,0.32,0.25,1....|         0.0|       0.0|
+--------------------+------------+----------+
only showing top 5 rows



In [15]:
accuracy = modelSummary.accuracy
fPR = modelSummary.weightedFalsePositiveRate
tPR = modelSummary.weightedTruePositiveRate
fMeasure = modelSummary.weightedFMeasure()
precision = modelSummary.weightedPrecision
recall = modelSummary.weightedRecall
print("Accuracy: {} False Positive Rate {} True Positive Rate {} F {} Precision {} Recall {}"\
      .format(accuracy, fPR, tPR, fMeasure, precision, recall))

Accuracy: 0.9475113122171945 False Positive Rate 0.8985878446501954 True Positive Rate 0.9475113122171945 F 0.9267671517671516 Precision 0.9234410550109313 Recall 0.9475113122171945


In [16]:
spark.stop()