### Create Dataset

When using a Domino on-demand Spark cluster, any data that will be used, created, or modified as part of the interaction must go into an external data store.

In this notebook, we copy the data file from the imported Git repository into a Domino Dataset and work with it there.

When you create a Spark cluster attached to a Domino workspace or job, any Domino Dataset accessible from the workspace or job is also accessible from all components of the cluster under the same dataset mount path. Access the data using the following path prefix:
`file:///`

For example, to read a file you would use the following:
`rdd = sc.textFile("file:///path/to/file")`
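As a minimal sketch of how such a path could be built from a dataset mount path: the helper name `to_file_uri` and the project name `my-project` below are illustrative assumptions, not part of the Domino or Spark APIs.

```python
import posixpath


def to_file_uri(dataset_dir: str, file_name: str) -> str:
    """Build a file:/// URI for a file under a dataset mount path."""
    path = posixpath.normpath(posixpath.join(dataset_dir, file_name))
    if not posixpath.isabs(path):
        raise ValueError("dataset paths must be absolute: %r" % path)
    # An absolute POSIX path already starts with "/", so prefixing it with
    # "file://" yields the three-slash form "file:///...".
    return "file://" + path


# "my-project" is a placeholder project name used only for illustration.
uri = to_file_uri("/mnt/data/my-project/", "wine_quality.csv")
print(uri)  # file:///mnt/data/my-project/wine_quality.csv
```

Joining the directory and file name before adding the prefix avoids doubled slashes when the mount path already ends in `/`.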


To read from other data sources, see the [Domino documentation on working with data](https://docs.dominodatalab.com/en/latest/user_guide/a3b42e/work-with-data/).

In [2]:
import os

dataset_path = '/mnt/data/' + os.environ['DOMINO_PROJECT_NAME'] + '/'

In [3]:
# Set up the data location

# Change the following line if you are not using the spark-quickstart-winequality repo as a Domino imported repo
file_path = '/mnt/code/wine_quality.csv'

!cp $file_path $dataset_path
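The `!cp` shell escape above fails quietly inside a notebook if either path is wrong. A possible pure-Python alternative is sketched below; the helper name `copy_into_dataset` is hypothetical, and the checks are an assumption about what is worth validating rather than a Domino requirement.

```python
import os
import shutil


def copy_into_dataset(src_file: str, dataset_dir: str) -> str:
    """Copy src_file into dataset_dir and return the destination path."""
    if not os.path.isfile(src_file):
        raise FileNotFoundError("source file not found: %s" % src_file)
    if not os.path.isdir(dataset_dir):
        raise NotADirectoryError("dataset directory not found: %s" % dataset_dir)
    # When the destination is a directory, shutil.copy keeps the source
    # file name and returns the full path of the new copy.
    return shutil.copy(src_file, dataset_dir)
```

In this notebook it could be called as `copy_into_dataset(file_path, dataset_path)`, and the returned value is the location of the copied file.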

In [4]:
# dataset_path already ends with '/', so join the parts instead of concatenating
new_file_path = os.path.join(dataset_path, 'wine_quality.csv')

In [5]:
# Set up PySpark
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("WineQualityApp") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/18 13:11:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
#Setup and Test Spark Context
sc = spark.sparkContext
sc

In [8]:
#Read Data
red_wine = spark.read.format('csv').options(header='true', inferSchema='true',sep=';').load(new_file_path)


In [9]:
#Profile Data
red_wine.printSchema()
print("Rows: %s" % red_wine.count())

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



Rows: 1599



In [10]:
# Feature extraction into vectors
# (pyspark.ml.linalg is the vector package used by the DataFrame-based pyspark.ml API)
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# select the columns to be used as the features (all except `quality`)
featureColumns = [c for c in red_wine.columns if c != 'quality']

# create and configure the assembler
assembler = VectorAssembler(inputCols=featureColumns, 
                            outputCol="features")

# transform the original data
dataDF = assembler.transform(red_wine)
dataDF.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)
 |-- features: vector (nullable = true)
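Conceptually, the assembler gathers the selected column values of each row into a single vector. The plain-Python sketch below mimics that for the first row of the wine data; the `assemble` helper is an illustrative stand-in, not how `VectorAssembler` is implemented.

```python
# First row of the wine data, keyed by column name.
row = {
    "fixed acidity": 7.4, "volatile acidity": 0.7, "citric acid": 0.0,
    "residual sugar": 1.9, "chlorides": 0.076, "free sulfur dioxide": 11.0,
    "total sulfur dioxide": 34.0, "density": 0.9978, "pH": 3.51,
    "sulphates": 0.56, "alcohol": 9.4, "quality": 5,
}


def assemble(record, input_cols):
    """Collect the values of input_cols, in order, into one list of floats."""
    return [float(record[c]) for c in input_cols]


# Same selection rule as in the cell above: every column except the label.
feature_columns = [c for c in row if c != "quality"]
features = assemble(row, feature_columns)
print(features)
# [7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4]
```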



In [11]:
dataDF.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            features|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[7.8,0.88,0.0,2.6...|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|    

In [12]:
from pyspark.ml.regression import LinearRegression

# fit a `LinearRegression` model using features in column `features` and label in column `quality`
lr = LinearRegression(maxIter=30, regParam=0.3, elasticNetParam=0.3, featuresCol="features", labelCol="quality")
lrModel = lr.fit(dataDF)


In [13]:
for t in zip(featureColumns, lrModel.coefficients):
    print(t)

('fixed acidity', 0.0)
('volatile acidity', -0.7569245708155441)
('citric acid', 0.0)
('residual sugar', 0.0)
('chlorides', 0.0)
('free sulfur dioxide', 0.0)
('total sulfur dioxide', -0.00020326904604382344)
('density', 0.0)
('pH', 0.0)
('sulphates', 0.27384704513106206)
('alcohol', 0.19463722248781323)
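Most coefficients above are exactly `0.0`, which is consistent with the L1 part of the elastic net penalty configured through `regParam` and `elasticNetParam`. The sketch below lists the surviving features and evaluates the penalty in the form given in the Spark documentation, `regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)`. The helper name `elastic_net_penalty` is an assumption made for illustration.

```python
reg_param = 0.3          # regParam passed to LinearRegression above
elastic_net_param = 0.3  # elasticNetParam (alpha) passed to LinearRegression above

# Coefficients copied from the output above.
coefficients = {
    "fixed acidity": 0.0,
    "volatile acidity": -0.7569245708155441,
    "citric acid": 0.0,
    "residual sugar": 0.0,
    "chlorides": 0.0,
    "free sulfur dioxide": 0.0,
    "total sulfur dioxide": -0.00020326904604382344,
    "density": 0.0,
    "pH": 0.0,
    "sulphates": 0.27384704513106206,
    "alcohol": 0.19463722248781323,
}


def elastic_net_penalty(weights, reg, alpha):
    """reg * (alpha * L1 norm + (1 - alpha) / 2 * squared L2 norm)."""
    weights = list(weights)
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg * (alpha * l1 + (1 - alpha) / 2 * l2_squared)


# Features whose coefficients were not shrunk to zero.
selected = [name for name, w in coefficients.items() if w != 0.0]
print(selected)
# ['volatile acidity', 'total sulfur dioxide', 'sulphates', 'alcohol']
print(elastic_net_penalty(coefficients.values(), reg_param, elastic_net_param))
```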


In [14]:
# predict the quality; the predicted quality is saved in the `prediction` column
predictionsDF = lrModel.transform(dataDF)
# show() prints the table itself and returns None, so it is not wrapped in display()
predictionsDF.show()

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+-----------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            features|       prediction|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+-----------------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|[7.4,0.7,0.0,1.9,...|5.282241583202343|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|[7.8,0.88,0.0,2.6...|5.250003816346952|
|          7.8|            0.76|       0.04|           

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

# create a regression evaluator with RMSE metrics

evaluator = RegressionEvaluator(
    labelCol='quality', predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictionsDF)
print("Root Mean Squared Error (RMSE) = %g" % rmse)

Root Mean Squared Error (RMSE) = 0.683189
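For reference, RMSE is the square root of the mean squared difference between labels and predictions. The sketch below computes it by hand on the first two rows shown earlier; the `rmse` helper is illustrative and independent of `RegressionEvaluator`.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if len(labels) != len(predictions) or len(labels) == 0:
        raise ValueError("labels and predictions must be non-empty and equal in length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Labels and predictions of the first two rows of predictionsDF.
labels = [5, 5]
predictions = [5.282241583202343, 5.250003816346952]
print("RMSE on the two sample rows = %g" % rmse(labels, predictions))
```

Because it covers only two rows, this value is not comparable to the full-dataset figure reported above.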


In [16]:
# split the input data into training and test DataFrames with 70% to 30% weights
(trainingDF, testDF) = red_wine.randomSplit([0.7, 0.3])
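A weighted random split assigns each row to a bucket at random, so the resulting sizes are only approximately 70% and 30%, and they change between runs unless a seed is fixed (`randomSplit` accepts an optional `seed` argument). The following plain-Python sketch shows the idea. It is a simplified illustration, not Spark's implementation, and the `random_split` helper and the seed value 42 are assumptions.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one bucket with probability proportional to weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative upper bounds of each bucket on the interval [0, 1).
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point rounding
    buckets = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            if u < upper:
                buckets[i].append(row)
                break
    return buckets


# 1599 matches the row count reported for the dataset above.
train, test = random_split(range(1599), [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which helps when comparing models across runs.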

In [17]:
from pyspark.ml import Pipeline

# construct a `Pipeline` with two stages: the vector assembler and the regression model estimator
pipeline = Pipeline(stages=[assembler, lr])

# train the pipeline on the training data
lrPipelineModel = pipeline.fit(trainingDF)

# make predictions
trainingPredictionsDF = lrPipelineModel.transform(trainingDF)
testPredictionsDF = lrPipelineModel.transform(testDF)

# evaluate the model on the training and test data
print("RMSE on training data = %g" % evaluator.evaluate(trainingPredictionsDF))
print("RMSE on test data = %g" % evaluator.evaluate(testPredictionsDF))

RMSE on training data = 0.680599
RMSE on test data = 0.695666


In [18]:
#Run Random Forest
from pyspark.ml.regression import RandomForestRegressor

# define the random forest estimator
rf = RandomForestRegressor(featuresCol="features", labelCol="quality", numTrees=100, maxBins=128, maxDepth=20,
                           minInstancesPerNode=5, seed=33)
rfPipeline = Pipeline(stages=[assembler, rf])

# train the random forest model
rfPipelineModel = rfPipeline.fit(trainingDF)

24/11/18 13:21:48 WARN DAGScheduler: Broadcasting large task binary with size 1406.2 KiB
24/11/18 13:21:49 WARN DAGScheduler: Broadcasting large task binary with size 2015.7 KiB
24/11/18 13:21:51 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
24/11/18 13:21:52 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
24/11/18 13:21:53 WARN DAGScheduler: Broadcasting large task binary with size 3.4 MiB
24/11/18 13:21:53 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
24/11/18 13:21:54 WARN DAGScheduler: Broadcasting large task binary with size 3.8 MiB
24/11/18 13:21:55 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
24/11/18 13:21:56 WARN DAGScheduler: Broadcasting large task binary with size 3.4 MiB
24/11/18 13:21:56 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/11/18 13:21:56 WARN DAGScheduler: Broadcasting large task binary with size 1047.3 KiB


In [19]:
# Evaluate the random forest (RMSE on training and test data)
rfTrainingPredictions = rfPipelineModel.transform(trainingDF)
rfTestPredictions = rfPipelineModel.transform(testDF)
print("Random Forest RMSE on training data = %g" % evaluator.evaluate(rfTrainingPredictions))
print("Random Forest RMSE on test data = %g" % evaluator.evaluate(rfTestPredictions))



Random Forest RMSE on training data = 0.38196
Random Forest RMSE on test data = 0.602075
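One way to read these numbers is to compare each model's gap between test and training RMSE, as sketched below with the figures printed in this notebook. The `results` dictionary layout is an illustrative choice.

```python
# RMSE values copied from the outputs above.
results = {
    "linear regression": {"train": 0.680599, "test": 0.695666},
    "random forest": {"train": 0.38196, "test": 0.602075},
}

gaps = {}
for name, scores in results.items():
    # A large positive gap suggests the model fits the training data
    # much more closely than unseen data.
    gaps[name] = scores["test"] - scores["train"]
    print("%s: test - train RMSE gap = %.4f" % (name, gaps[name]))

# The model with the lowest test RMSE generalizes best on this split.
best = min(results, key=lambda model: results[model]["test"])
print("lowest test RMSE:", best)
```

On this split the random forest has the lower test RMSE but also a much wider gap, which hints at some overfitting to the training data.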


In [None]:
#stop Spark context
sc.stop()