In [11]:
from pyspark.sql import SparkSession

In [12]:
spark = SparkSession.builder.getOrCreate()

In [13]:
from IPython.display import display

In [14]:
inputDF = spark.read.csv("winequality-white.csv", header=True, inferSchema=True, sep=";")

In [15]:
inputDF.printSchema()
print("Rows: %s" % inputDF.count())

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)

Rows: 4898


In [16]:
display(inputDF.limit(5))

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int]

In [17]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [18]:
featureColumns = [c for c in inputDF.columns if c != 'quality']

In [19]:
assembler = VectorAssembler(inputCols=featureColumns, 
                            outputCol="features")

In [20]:
dataDF = assembler.transform(inputDF)
dataDF.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- features: vector (nullable = true)



In [21]:
display(dataDF.limit(3))

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int, features: vector]

In [23]:
from pyspark.ml.regression import LinearRegression

In [24]:
lr = LinearRegression(maxIter=30, regParam=0.3, elasticNetParam=0.3, featuresCol="features", labelCol="quality")
lrModel = lr.fit(dataDF)

In [27]:
for t in zip(featureColumns, lrModel.coefficients): print(t)

('fixed acidity', 0.0)
('volatile acidity', -0.7916891710244995)
('citric acid', 0.0)
('residual sugar', 0.0)
('chlorides', -0.10550323778501457)
('free sulfur dioxide', 0.0)
('total sulfur dioxide', 0.0)
('density', 0.0)
('pH', 0.0)
('sulphates', 0.0)
('alcohol', 0.1972647137835092)


In [28]:
predictionsDF = lrModel.transform(dataDF)
display(predictionsDF.limit(3))

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: int, features: vector, prediction: double]

In [29]:
from pyspark.ml.evaluation import RegressionEvaluator

In [30]:
evaluator = RegressionEvaluator(
    labelCol='quality', predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictionsDF)
print("Root Mean Squared Error (RMSE) = %g" % rmse)

Root Mean Squared Error (RMSE) = 0.794772


In [31]:
from pyspark.sql.functions import *

In [32]:
avgQuality = inputDF.groupBy().avg('quality').first()[0]
print(avgQuality)

5.87790935075541


In [33]:
zeroModelPredictionsDF = dataDF.select(col('quality'), lit(avgQuality).alias('prediction'))

In [34]:
zeroModelRmse = evaluator.evaluate(zeroModelPredictionsDF)
print("RMSE of 'zero model' = %g" % zeroModelRmse)

RMSE of 'zero model' = 0.885548


In [35]:
(trainingDF, testDF) = inputDF.randomSplit([0.7, 0.3])

In [36]:
from pyspark.ml import Pipeline

In [37]:
pipeline = Pipeline(stages=[assembler, lr])

In [38]:
lrPipelineModel = pipeline.fit(trainingDF)

In [39]:
traningPredictionsDF = lrPipelineModel.transform(trainingDF)
testPredictionsDF = lrPipelineModel.transform(testDF)

In [40]:
print("RMSE on traning data = %g" % evaluator.evaluate(traningPredictionsDF))
print("RMSE on test data = %g" % evaluator.evaluate(testPredictionsDF))

RMSE on traning data = 0.787533
RMSE on test data = 0.795952


In [41]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

In [53]:
search_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.3, 0.6]) \
    .addGrid(lr.elasticNetParam, [0.4, 0.6, 0.8]).build()

In [43]:
cv = CrossValidator(estimator = pipeline, estimatorParamMaps = search_grid, evaluator = evaluator, numFolds = 3)
cvModel = cv.fit(trainingDF)

In [44]:
cvTestPredictionsDF = cvModel.transform(testDF)
print("RMSE on test data with CV = %g" % evaluator.evaluate(cvTestPredictionsDF))

RMSE on test data with CV = 0.776433


In [45]:
print(cvModel.avgMetrics)

[0.7496649481492527, 0.7496649481492527, 0.7496649481492527, 0.7947704694542455, 0.8143814406229279, 0.8302341641276376, 0.842161087430805, 0.8735265613011516, 0.889300531170492]


In [46]:
from pyspark.ml.regression import RandomForestRegressor

In [47]:
rf = RandomForestRegressor(featuresCol="features", labelCol="quality", numTrees=100, maxBins=128, maxDepth=20, \
                           minInstancesPerNode=5, seed=33)
rfPipeline = Pipeline(stages=[assembler, rf])

In [48]:
rfPipelineModel = rfPipeline.fit(trainingDF)

In [49]:
rfTrainingPredictions = rfPipelineModel.transform(trainingDF)
rfTestPredictions = rfPipelineModel.transform(testDF)
print("Random Forest RMSE on traning data = %g" % evaluator.evaluate(rfTrainingPredictions))
print("Random Forest RMSE on test data = %g" % evaluator.evaluate(rfTestPredictions))

Random Forest RMSE on traning data = 0.406653
Random Forest RMSE on test data = 0.649047


In [50]:
rfModel = rfPipelineModel.stages[1]
rfModel.featureImportances

SparseVector(11, {0: 0.0554, 1: 0.121, 2: 0.0612, 3: 0.0705, 4: 0.0769, 5: 0.1062, 6: 0.0652, 7: 0.1147, 8: 0.0634, 9: 0.0514, 10: 0.2141})