# **View the Dataset**

In [1]:
import pandas as pd

In [2]:
# Load the red-wine quality CSV into a pandas DataFrame for a quick look.
df = pd.read_csv(
    "winequality-red.csv",
)

In [9]:
# Preview the first five rows (rich display via the cell's last expression).
df.head(n=5)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5


# **Spark Session**

In [6]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 1.8 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=f797932bded31545102a2a8ae11ec1a42bee9c6fc9277bb453e92231c4d805f7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [7]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString, VectorAssembler
from pyspark.sql.functions import col

In [8]:
# Create the Spark session for this notebook, or reuse one that is already
# running in the kernel (getOrCreate).
spark = (
    SparkSession.builder
    .appName("RandomForestClassification")
    .getOrCreate()
)

In [10]:
# Read the wine-quality CSV into a Spark DataFrame (first row is the header,
# column types are inferred) and rename the target column "quality" to "label",
# the name the downstream stages refer to.
data_path = "winequality-red.csv"
data = (
    spark.read
    .csv(data_path, header=True, inferSchema=True)
    .withColumnRenamed("quality", "label")
)

In [11]:
# Build the label indexer and fit it on the complete dataset (before any
# train/test split) so that every label value present in `data` gets an index.
label_indexer_stage = StringIndexer(inputCol="label", outputCol="indexedLabel")
labelIndexer = label_indexer_stage.fit(data)

In [12]:
# Combine all predictor columns into a single vector column named "features".
#
# This cell rebinds `data`, so it must be safe to run more than once: on a
# re-run `data` already carries a "features" column, which would otherwise be
# picked up as an input AND collide with the assembler's output column.
# Excluding it from the inputs and dropping it before transforming keeps the
# cell idempotent (DataFrame.drop ignores a column that is not present).
featureColumns = [c for c in data.columns if c not in ("label", "features")]
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
data = assembler.transform(data.drop("features"))

In [22]:
# Fit a VectorIndexer on the assembled "features" vectors of the full dataset.
# maxCategories=4 is the threshold passed to VectorIndexer for deciding which
# vector slots are treated as categorical; output goes to "indexedFeatures".
featureIndexer = (
    VectorIndexer(
        inputCol="features",
        outputCol="indexedFeatures",
        maxCategories=4,
    )
    .fit(data)
)

In [23]:
# Split the dataset 70% / 30% into training and test sets.
# A fixed seed is passed so the same rows land in each split on every run;
# without it the split (and therefore the reported test error) changes each
# time the notebook is executed.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

In [24]:
# Configure the random forest classifier (15 trees) that learns the indexed
# label from the indexed feature vector. Training itself happens when the
# pipeline is fitted.
# The seed fixes the forest's random sampling so repeated runs on the same
# training data build the same model.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=15,
    seed=42,
)

In [25]:
# Map the numeric "prediction" column back to the original label values,
# using the label ordering learned by the fitted label indexer.
original_labels = labelIndexer.labels
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=original_labels,
)

In [26]:
# Assemble the stages in execution order: index the label, index the features,
# train/apply the forest, then convert predictions back to label values.
pipeline_stages = [labelIndexer, featureIndexer, rf, labelConverter]
pipeline = Pipeline(stages=pipeline_stages)

In [27]:
# Fit the whole pipeline on the training split. The training data passes
# through the indexer stages on its way to the random forest.
model = pipeline.fit(dataset=trainingData)

In [28]:
# Apply the fitted pipeline to the held-out test split to obtain predictions.
predictions = model.transform(dataset=testData)


In [30]:
# Display the first 15 test rows: predicted label next to the true label and
# the feature vector the prediction was made from.
preview_columns = ["predictedLabel", "label", "features"]
predictions.select(*preview_columns).show(n=15)

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|             6|    6|[4.7,0.6,0.17,2.3...|
|             6|    6|[5.0,0.74,0.0,1.2...|
|             5|    5|[5.2,0.32,0.25,1....|
|             6|    6|[5.2,0.34,0.0,1.8...|
|             6|    7|[5.2,0.48,0.04,1....|
|             7|    7|[5.3,0.47,0.11,2....|
|             6|    7|[5.4,0.42,0.27,2....|
|             6|    6|[5.4,0.74,0.09,1....|
|             7|    7|[5.4,0.835,0.08,1...|
|             6|    5|[5.6,0.5,0.09,2.3...|
|             6|    5|[5.6,0.5,0.09,2.3...|
|             5|    5|[5.6,0.54,0.04,1....|
|             5|    5|[5.6,0.615,0.0,1....|
|             6|    5|[5.6,0.66,0.0,2.5...|
|             7|    8|[5.6,0.85,0.05,1....|
+--------------+-----+--------------------+
only showing top 15 rows



In [32]:
# Score the predictions: accuracy compares the indexed true label with the
# numeric prediction; the test error is its complement.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)

accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test error = %g" % test_error)

Test error = 0.390144
