In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark =  SparkSession.builder.appName("My app").getOrCreate()

In [4]:
df = spark.read.csv('winequality-red.csv',inferSchema=True,header=True)

In [5]:
df

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int]

In [6]:
df.show()

+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|              0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|              0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|              0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0

In [10]:
df.select('quality').distinct().show()

+-------+
|quality|
+-------+
|      6|
|      3|
|      5|
|      4|
|      8|
|      7|
+-------+



In [13]:
df.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [14]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["fixed acidity","volatile acidity","citric acid","residual sugar","chlorides","free sulfur dioxide","total sulfur dioxide","density","pH","sulphates","alcohol"],outputCol="Independent Features")

In [15]:
output=featureassembler.transform(df)

In [16]:
output.show()

+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|          chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|Independent Features|
+-------------+----------------+-----------+--------------+-------------------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          7.4|             0.7|        0.0|           1.9|              0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|
|          7.8|            0.88|        0.0|           2.6|              0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[7.8,0.88,0.0,2.6...|
|          7.8|            0.76|       0.04|           2.3|              0.092|               1

In [17]:
output.columns

['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality',
 'Independent Features']

In [18]:
finalized_data=output.select("Independent Features","quality")

In [19]:
finalized_data.show()

+--------------------+-------+
|Independent Features|quality|
+--------------------+-------+
|[7.4,0.7,0.0,1.9,...|      5|
|[7.8,0.88,0.0,2.6...|      5|
|[7.8,0.76,0.04,2....|      5|
|[11.2,0.28,0.56,1...|      6|
|[7.4,0.7,0.0,1.9,...|      5|
|[7.4,0.66,0.0,1.8...|      5|
|[7.9,0.6,0.06,1.6...|      5|
|[7.3,0.65,0.0,1.2...|      7|
|[7.8,0.58,0.02,2....|      7|
|[7.5,0.5,0.36,6.1...|      5|
|[6.7,0.58,0.08,1....|      5|
|[7.5,0.5,0.36,6.1...|      5|
|[5.6,0.615,0.0,1....|      5|
|[7.8,0.61,0.29,1....|      5|
|[8.9,0.62,0.18,3....|      5|
|[8.9,0.62,0.19,3....|      5|
|[8.5,0.28,0.56,1....|      7|
|[8.1,0.56,0.28,1....|      5|
|[7.4,0.59,0.08,4....|      4|
|[7.9,0.32,0.51,1....|      6|
+--------------------+-------+
only showing top 20 rows



In [21]:
from pyspark.ml.classification import DecisionTreeClassifier
##train test split
train_data,test_data=finalized_data.randomSplit([0.75,0.25])
classifier=DecisionTreeClassifier(featuresCol='Independent Features', labelCol='quality')
classifier=classifier.fit(train_data)

In [24]:
pred_results=classifier.transform(test_data)

In [25]:
pred_results.predictions.show()

AttributeError: 'DataFrame' object has no attribute 'predictions'