In [2]:
from __future__ import print_function
import findspark
import os
os.environ['SPARK_HOME'] = r'/Users/subham/Downloads/spark-3.0.0-bin-hadoop2.7'
findspark.init()
findspark.find()
import pyspark
findspark.find()
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer

In [3]:
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("MultiClass")\
        .getOrCreate()

In [4]:
dataset = spark.read.csv("/Users/subham/Desktop/PySpark/winequality_red.csv",header=True)

In [5]:
dataset.show()

+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|              0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|              0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|              0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0

In [6]:
dataset.printSchema()

root
 |-- fixed acidity: string (nullable = true)
 |-- volatile acidity: string (nullable = true)
 |-- citric acid: string (nullable = true)
 |-- residual sugar: string (nullable = true)
 |-- chlorides: string (nullable = true)
 |-- free sulfur dioxide: string (nullable = true)
 |-- total sulfur dioxide: string (nullable = true)
 |-- density: string (nullable = true)
 |-- pH: string (nullable = true)
 |-- sulphates: string (nullable = true)
 |-- alcohol: string (nullable = true)
 |-- quality: string (nullable = true)



## Convert to numerical

In [7]:
from pyspark.sql.functions import col
new_data = dataset.select(*(col(c).cast("float").alias(c) for c in dataset.columns))

In [8]:
new_data.printSchema()

root
 |-- fixed acidity: float (nullable = true)
 |-- volatile acidity: float (nullable = true)
 |-- citric acid: float (nullable = true)
 |-- residual sugar: float (nullable = true)
 |-- chlorides: float (nullable = true)
 |-- free sulfur dioxide: float (nullable = true)
 |-- total sulfur dioxide: float (nullable = true)
 |-- density: float (nullable = true)
 |-- pH: float (nullable = true)
 |-- sulphates: float (nullable = true)
 |-- alcohol: float (nullable = true)
 |-- quality: float (nullable = true)



## Correlation among features

In [31]:
from pyspark.mllib.stat import Statistics
from pyspark import SparkContext
from pyspark import SparkConf
import pandas as pd

###  We need to convert dataframe intp a RDD to check for correlation

In [29]:
col_names = new_data.columns
features_set = new_data.rdd.map(lambda row: row[0:])

In [32]:
corr_mat=Statistics.corr(features_set, method="pearson")
corr_df = pd.DataFrame(corr_mat)
corr_df.index, corr_df.columns = col_names, col_names

## Dataframe to Heatmap

In [33]:
corr_df.style.background_gradient(cmap='Blues')

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
fixed acidity,1.0,-0.256131,0.671703,0.114777,0.0937052,-0.153794,-0.113181,0.668047,-0.682978,0.183006,-0.0616683,0.124052
volatile acidity,-0.256131,1.0,-0.552496,0.00191788,0.0612978,-0.0105038,0.07647,0.0220263,0.234937,-0.260987,-0.202288,-0.390558
citric acid,0.671703,-0.552496,1.0,0.143577,0.203823,-0.0609781,0.035533,0.364947,-0.541904,0.31277,0.109903,0.226373
residual sugar,0.114777,0.00191788,0.143577,1.0,0.0556095,0.187049,0.203028,0.355283,-0.0856524,0.00552712,0.0420754,0.0137316
chlorides,0.0937052,0.0612978,0.203823,0.0556095,1.0,0.00556215,0.0474005,0.200632,-0.265026,0.37126,-0.221141,-0.128907
free sulfur dioxide,-0.153794,-0.0105038,-0.0609781,0.187049,0.00556215,1.0,0.667666,-0.0219458,0.0703775,0.0516576,-0.0694084,-0.0506561
total sulfur dioxide,-0.113181,0.07647,0.035533,0.203028,0.0474005,0.667666,1.0,0.0712699,-0.0664946,0.0429468,-0.205654,-0.1851
density,0.668047,0.0220263,0.364947,0.355283,0.200632,-0.0219458,0.0712699,1.0,-0.341699,0.148507,-0.49618,-0.174919
pH,-0.682978,0.234937,-0.541904,-0.0856524,-0.265026,0.0703775,-0.0664946,-0.341699,1.0,-0.196648,0.205633,-0.0577314
sulphates,0.183006,-0.260987,0.31277,0.00552712,0.37126,0.0516576,0.0429468,0.148507,-0.196648,1.0,0.0935948,0.251397


## Checking for Null values

In [9]:
from pyspark.sql.functions import col, count, isnan, when
#checking for null ir nan type values in our columns
new_data.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density| pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|            0|               0|          0|             0|        0|                  0|                   0|      0|  0|        0|      0|      0|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+



## Vector Assembler

In [10]:
cols=new_data.columns
cols.remove("quality")
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
data=assembler.transform(new_data)
data = data.select("features",'quality')

In [11]:
data.show(5)

+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[7.40000009536743...|    5.0|
|[7.80000019073486...|    5.0|
|[7.80000019073486...|    5.0|
|[11.1999998092651...|    6.0|
|[7.40000009536743...|    5.0|
+--------------------+-------+
only showing top 5 rows



## Label Encode the output column

In [12]:
stringIndexer = StringIndexer(inputCol="quality", outputCol="quality_index")
data_indexed = stringIndexer.fit(data).transform(data)

In [13]:
data_indexed.show()

+--------------------+-------+-------------+
|            features|quality|quality_index|
+--------------------+-------+-------------+
|[7.40000009536743...|    5.0|          0.0|
|[7.80000019073486...|    5.0|          0.0|
|[7.80000019073486...|    5.0|          0.0|
|[11.1999998092651...|    6.0|          1.0|
|[7.40000009536743...|    5.0|          0.0|
|[7.40000009536743...|    5.0|          0.0|
|[7.90000009536743...|    5.0|          0.0|
|[7.30000019073486...|    7.0|          2.0|
|[7.80000019073486...|    7.0|          2.0|
|[7.5,0.5,0.360000...|    5.0|          0.0|
|[6.69999980926513...|    5.0|          0.0|
|[7.5,0.5,0.360000...|    5.0|          0.0|
|[5.59999990463256...|    5.0|          0.0|
|[7.80000019073486...|    5.0|          0.0|
|[8.89999961853027...|    5.0|          0.0|
|[8.89999961853027...|    5.0|          0.0|
|[8.5,0.2800000011...|    7.0|          2.0|
|[8.10000038146972...|    5.0|          0.0|
|[7.40000009536743...|    4.0|          3.0|
|[7.900000

## Train - Test split

In [14]:
(train, test) = data_indexed.randomSplit([0.7, 0.3])

## Standard Scaling features

In [34]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(train)
train_df_sc = scalerModel.transform(train)
test_df_sc = scalerModel.transform(test)

## Naive Bayes Classifier

In [35]:
naive_bayes = NaiveBayes(featuresCol='scaledFeatures',labelCol='quality_index',smoothing=1.0)

In [37]:
model = naive_bayes.fit(train_df_sc) 

In [38]:
# select example rows to display.
predictions = model.transform(test_df_sc)

In [42]:
predictions.show(5)

+--------------------+-------+-------------+--------------------+--------------------+--------------------+----------+
|            features|quality|quality_index|      scaledFeatures|       rawPrediction|         probability|prediction|
+--------------------+-------+-------------+--------------------+--------------------+--------------------+----------+
|[4.90000009536743...|    7.0|          2.0|[2.79767623385634...|[-262.36729889590...|[0.36268105737131...|       1.0|
|[5.0,0.3799999952...|    6.0|          1.0|[2.85477161163867...|[-268.76797160764...|[0.43299340010681...|       1.0|
|[5.0,0.4000000059...|    6.0|          1.0|[2.85477161163867...|[-291.05979755220...|[0.39923083061211...|       1.0|
|[5.09999990463256...|    6.0|          1.0|[2.91186698942099...|[-257.37235092684...|[0.40882905204407...|       1.0|
|[5.09999990463256...|    7.0|          2.0|[2.91186698942099...|[-274.95379627286...|[0.50771837288832...|       0.0|
+--------------------+-------+-------------+----

In [40]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="quality_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)


In [41]:
print ("Accuracy",accuracy)

Accuracy 0.4734607218683652


## Random Forest Classifier

In [49]:
from pyspark.ml.classification import RandomForestClassifier
random_forest_classifier = RandomForestClassifier(labelCol="quality_index", featuresCol="scaledFeatures", numTrees=100)
model = random_forest_classifier.fit(train_df_sc)
predictions = model.transform(test_df_sc)

In [50]:
predictions.show(5)

+--------------------+-------+-------------+--------------------+--------------------+--------------------+----------+
|            features|quality|quality_index|      scaledFeatures|       rawPrediction|         probability|prediction|
+--------------------+-------+-------------+--------------------+--------------------+--------------------+----------+
|[4.90000009536743...|    7.0|          2.0|[2.79767623385634...|[12.1716008069178...|[0.12171600806917...|       1.0|
|[5.0,0.3799999952...|    6.0|          1.0|[2.85477161163867...|[10.6486147548880...|[0.10648614754888...|       1.0|
|[5.0,0.4000000059...|    6.0|          1.0|[2.85477161163867...|[13.0231324675434...|[0.13023132467543...|       1.0|
|[5.09999990463256...|    6.0|          1.0|[2.91186698942099...|[17.0734262154011...|[0.17073426215401...|       1.0|
|[5.09999990463256...|    7.0|          2.0|[2.91186698942099...|[14.6131621003241...|[0.14613162100324...|       1.0|
+--------------------+-------+-------------+----

In [51]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="quality_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)


In [52]:
print ("Accuracy",accuracy)

Accuracy 0.6050955414012739


In [53]:
spark.stop()