In [None]:
!pip install pyspark



In [None]:
# findspark has to run *before* pyspark is imported: its job is to locate the Spark
# installation and make the pyspark package importable.
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext

# getOrCreate reuses a context that is already running, so this cell can be re-run;
# calling SparkContext(...) a second time raises because only one context may be active.
sc = SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[2]'))
sc

In [None]:
from pyspark.sql import SparkSession

# Obtain the session used for all DataFrame work below (an existing one is reused).
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [None]:
# The file is semicolon-delimited with a header row; column types are inferred on read.
filepath = "winequality-red.csv"
spark_df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('delimiter', ';')
    .load(filepath)
)
spark_df.show(n=5, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4          |0.7             |0.0        |1.9           |0.076    |11.0               |34.0                |0.9978 |3.51|0.56     |9.4    |5      |
|7.8          |0.88            |0.0        |2.6           |0.098    |25.0               |67.0                |0.9968 |3.2 |0.68     |9.8    |5      |
|7.8          |0.76            |0.04       |2.3           |0.092    |15.0               |54.0                |0.997  |3.26|0.65     |9.8    |5      |
|11.2         |0.28            |0.56       |1.9           |0.075    |17.0               |60.0       

In [None]:
# Show which quality scores occur in the data; "quality" is the label column used for modeling.
(
    spark_df
    .select("quality")
    .distinct()
    .show()
)

+-------+
|quality|
+-------+
|      6|
|      3|
|      5|
|      4|
|      8|
|      7|
+-------+



In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
# Turn the numeric "alcohol" column into a two-level category ("High" above 10.5, else "Low").
# The column is replaced in place, so the dtype guard keeps this cell idempotent: on a second
# run the column already holds strings, and comparing those with 10.5 would no longer bin
# the original measurements (it would either fail or label every row "Low").
if dict(spark_df.dtypes)["alcohol"] != "string":
    spark_df = spark_df.withColumn("alcohol", F.when(F.col("alcohol") > 10.5, "High").otherwise("Low"))
spark_df.show(3, truncate=False)

# Two separate statements rather than one tuple expression, so the cell does not
# echo a meaningless (None, None) as its result.
spark_df.groupby("alcohol").count().show()
spark_df.select("quality").distinct().show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4          |0.7             |0.0        |1.9           |0.076    |11.0               |34.0                |0.9978 |3.51|0.56     |Low    |5      |
|7.8          |0.88            |0.0        |2.6           |0.098    |25.0               |67.0                |0.9968 |3.2 |0.68     |Low    |5      |
|7.8          |0.76            |0.04       |2.3           |0.092    |15.0               |54.0                |0.997  |3.26|0.65     |Low    |5      |
+-------------+----------------+-----------+--------------+---------+-------------------+-----------

(None, None)

In [None]:
# 80/20 train/test split with a fixed seed (11).
train_df, test_df = spark_df.randomSplit(weights=[0.8, 0.2], seed=11)
print(f"Number of train samples: {train_df.count()}")
print(f"Number of test samples: {test_df.count()}")

Number of train samples: 1279
Number of test samples: 320


In [None]:
# Label-encode the categorical "alcohol" column. The indexer is fitted on the training
# split only and then reused for the test split, so both share the same mapping.
alcohol_indexer = StringIndexer(inputCol="alcohol", outputCol="alcoholIndex").fit(train_df)

# Convert features into a vector
inputCols = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides',
             'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcoholIndex']
outputCol = "features"
vector_assembler = VectorAssembler(inputCols=inputCols, outputCol=outputCol)

# Derive new frames instead of overwriting train_df / test_df. Overwriting made the cell
# non-re-runnable: the second run would feed frames that already contain "alcoholIndex"
# and "features" back into the transformers, which reject an existing output column.
train_features_df = vector_assembler.transform(alcohol_indexer.transform(train_df))

# Select feature vector and label
modeling_df = train_features_df.select(['features', 'quality'])

# Create the DecisionTreeClassifier and train it on the training data
dt_model = DecisionTreeClassifier(labelCol="quality", featuresCol="features").fit(modeling_df)

# Do predictions on train data
predictions = dt_model.transform(modeling_df)
predictions.show(5, truncate=False)

# Predictions on test data (same fitted indexer and assembler as for training)
test_features_df = vector_assembler.transform(alcohol_indexer.transform(test_df))
test_predictions = dt_model.transform(test_features_df)

test_predictions.show(3, truncate=False)

+--------------------------------------------------------+-------+----------------------------------------+----------------------------------------------------------------------------------------------------------------------+----------+
|features                                                |quality|rawPrediction                           |probability                                                                                                           |prediction|
+--------------------------------------------------------+-------+----------------------------------------+----------------------------------------------------------------------------------------------------------------------+----------+
|[4.6,0.52,0.15,2.1,0.054,8.0,65.0,0.9934,3.9,0.56,1.0]  |4      |[0.0,0.0,0.0,3.0,4.0,31.0,11.0,1.0,0.0] |[0.0,0.0,0.0,0.06,0.08,0.62,0.22,0.02,0.0]                                                                            |5.0       |
|[4.7,0.6,0.17,2.3,0.058,17.0,106.0,0.9932,3.85,

In [None]:
# Split again from spark_df (same weights and seed as before) so the pipeline below
# starts from frames that do not carry the alcoholIndex / features columns.
train_df, test_df = spark_df.randomSplit(weights=[0.8, 0.2], seed=11)
print(f"Number of train samples: {train_df.count()}")
print(f"Number of test samples: {test_df.count()}")

Number of train samples: 1279
Number of test samples: 320


In [None]:
# Import pipeline from PySpark ML
from pyspark.ml import Pipeline

In [None]:
# Stage 1 - index the string-valued "alcohol" column into "alcoholIndex"
alcohol_indexer = StringIndexer(
    inputCol="alcohol",
    outputCol="alcoholIndex",
)

# Stage 2 - pack the ten numeric columns plus the indexed alcohol flag into "features"
inputCols = [
    'fixed acidity',
    'volatile acidity',
    'citric acid',
    'residual sugar',
    'chlorides',
    'free sulfur dioxide',
    'total sulfur dioxide',
    'density',
    'pH',
    'sulphates',
    'alcoholIndex',
]
outputCol = "features"
vector_assembler = VectorAssembler(
    inputCols=inputCols,
    outputCol=outputCol,
)

# Stage 3 - unfitted decision tree that predicts "quality" from "features"
dt_model = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="quality",
)

In [None]:
# Chain indexer -> assembler -> classifier into a single estimator
pipeline = Pipeline(
    stages=[
        alcohol_indexer,
        vector_assembler,
        dt_model,
    ]
)

# Fit all stages on the training split
final_pipeline = pipeline.fit(train_df)

# Apply the fitted pipeline to the held-out split and preview the result
test_predictions_from_pipeline = final_pipeline.transform(test_df)
test_predictions_from_pipeline.show(n=5, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+---------------------------------------------------------+----------------------------------------+-----------------------------------------------------------------------------------------------------+----------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|alcoholIndex|features                                                 |rawPrediction                           |probability                                                                                          |prediction|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+------------+---------------------------------------------------------+------------------------------

In [None]:
def accuracy_m(model, label="Model with pipeline"):
    """Print the percentage of rows whose ``prediction`` equals ``quality``.

    Parameters
    ----------
    model : DataFrame
        Despite the name this is a table of *predictions*, not a fitted model. It must
        contain ``quality`` and ``prediction`` columns and support ``select``,
        ``filter`` and ``count``.
    label : str, optional
        Text printed in front of ``accuracy:``. The default reproduces the message
        this function has always printed.

    Returns
    -------
    None
        The accuracy is only printed, with three decimals, as a percentage.
    """
    cm = model.select("quality", "prediction")
    total = cm.count()
    if total == 0:
        # Dividing by the row count used to raise ZeroDivisionError on an empty table.
        print("%s accuracy: n/a (no rows to evaluate)" % label)
        return
    correct = cm.filter(cm.quality == cm.prediction).count()
    print("%s accuracy: %.3f%%" % (label, correct / total * 100))
# Score the predictions produced by the fitted pipeline on the test split
accuracy_m(model=test_predictions_from_pipeline)

Model with pipeline accuracy: 61.250%


In [None]:
def accuracy_m(model, label="Model without pipeline"):
    """Print the percentage of rows whose ``prediction`` equals ``quality``.

    Parameters
    ----------
    model : DataFrame
        Despite the name this is a table of *predictions*, not a fitted model. It must
        contain ``quality`` and ``prediction`` columns and support ``select``,
        ``filter`` and ``count``.
    label : str, optional
        Text printed in front of ``accuracy:``. The default reproduces the message
        printed by this cell's original definition.

    Returns
    -------
    None
        The accuracy is only printed, with three decimals, as a percentage.
    """
    cm = model.select("quality", "prediction")
    total = cm.count()
    if total == 0:
        # Dividing by the row count used to raise ZeroDivisionError on an empty table.
        print("%s accuracy: n/a (no rows to evaluate)" % label)
        return
    correct = cm.filter(cm.quality == cm.prediction).count()
    print("%s accuracy: %.3f%%" % (label, correct / total * 100))
# Score the test-split predictions from the step-by-step (non-pipeline) model
accuracy_m(model=test_predictions)

Model without pipeline accuracy: 61.250%
