In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session, then read the semicolon-delimited
# white-wine CSV, taking column names from the header row and letting Spark
# infer the column types.
spark = (
    SparkSession.builder
    .appName("RThp")
    .enableHiveSupport()
    .getOrCreate()
)

df = spark.read.csv(
    '/container-data/winequality-white.csv',
    sep=";",
    inferSchema=True,
    header=True,
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/05 19:35:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/05 19:35:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/12/05 19:35:41 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:

# Transform into [label, [features]] format: pack every predictor column into a
# single vector column named "features".
from pyspark.ml.feature import VectorAssembler

LABEL_COLUMN = "quality"
# Pick predictors by excluding the label *by name* instead of assuming it is the
# last column. "features" is also excluded so that, on a re-run, the previously
# assembled vector is not fed back in as an input.
feature_columns = [c for c in df.columns if c not in (LABEL_COLUMN, "features")]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# VectorAssembler refuses to overwrite an existing output column; dropping a
# stale "features" column (a no-op on a fresh kernel) keeps this cell re-runnable.
df = assembler.transform(df.drop("features"))

In [3]:
from pyspark.sql.functions import col,row_number

# Keep just the assembled feature vector and the target, exposing the target
# under the conventional Spark ML column name "label".
df_format = df.select("features", df["quality"].alias("label"))
df_format.show()

# 80/20 train/test split with a fixed seed so the split is repeatable.
training, test = df_format.randomSplit([0.8, 0.2], seed=2018)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.0,0.27,0.36,20...|    6|
|[6.3,0.3,0.34,1.6...|    6|
|[8.1,0.28,0.4,6.9...|    6|
|[7.2,0.23,0.32,8....|    6|
|[7.2,0.23,0.32,8....|    6|
|[8.1,0.28,0.4,6.9...|    6|
|[6.2,0.32,0.16,7....|    6|
|[7.0,0.27,0.36,20...|    6|
|[6.3,0.3,0.34,1.6...|    6|
|[8.1,0.22,0.43,1....|    6|
|[8.1,0.27,0.41,1....|    5|
|[8.6,0.23,0.4,4.2...|    5|
|[7.9,0.18,0.37,1....|    5|
|[6.6,0.16,0.4,1.5...|    7|
|[8.3,0.42,0.62,19...|    5|
|[6.6,0.17,0.38,1....|    7|
|[6.3,0.48,0.04,1....|    6|
|[6.2,0.66,0.48,1....|    8|
|[7.4,0.34,0.42,1....|    6|
|[6.5,0.31,0.14,7....|    5|
+--------------------+-----+
only showing top 20 rows



In [50]:
# Search for the best hyperparameter (number of trees) with 4-fold cross-validation.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# Both the forest's bootstrap/feature sampling and the CV fold assignment are
# random; seed them (same value as the train/test split) so the search is
# reproducible.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, seed=2018)
paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [10, 15, 20, 25, 30, 35, 40]).build()

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          # "f1" is the evaluator's default metric; named explicitly so
                          # the model-selection criterion is visible here.
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=4,
                          seed=2018)

cvModel = crossval.fit(training)


In [54]:

# Report which numTrees value won the search, then persist the whole
# CrossValidatorModel (overwriting any previous save at "RfModel").
bestModel = cvModel.bestModel
best_num_trees = bestModel.getNumTrees
print('Best Param (numTrees): ', best_num_trees)

cv_writer = cvModel.write().overwrite()
cv_writer.save("RfModel")

Best Param (numTrees):  10


In [55]:
# Test the tuned model on the held-out split.
from pyspark.ml.tuning import CrossValidatorModel

# Reload the persisted cross-validation result and apply only its best model.
loadedCvModel = CrossValidatorModel.load("RfModel")
loadedModel = loadedCvModel.bestModel
predictions = loadedModel.transform(test)
predictions.show(1)

# NOTE: the former `trainingSummary = predictions.summary` line was removed.
# On a DataFrame, `summary` is a method, so that line only stored a bound method
# (never used); call `predictions.summary()` if statistics are wanted.
test.show(1)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[3.8,0.31,0.02,11...|    6|[0.0,0.0,0.0,0.03...|[0.0,0.0,0.0,0.00...|       6.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 1 row

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[3.8,0.31,0.02,11...|    6|
+--------------------+-----+
only showing top 1 row



In [59]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import DoubleType

# Keep (prediction, label) in that order, with label cast to double so both
# columns have the same numeric type.
predictions_format = predictions.select(
    "prediction",
    predictions["label"].cast("double").alias("label"),
)
predictions_format.show(1)

+----------+-----+
|prediction|label|
+----------+-----+
|       6.0|  6.0|
+----------+-----+
only showing top 1 row



In [60]:
# Turn each Row into a plain (prediction, label) tuple for the RDD-based
# MulticlassMetrics API, then peek at the first few pairs.
rdd = predictions_format.rdd.map(lambda row: (row.prediction, row.label))

first_pairs = rdd.take(5)
for pair in first_pairs:
    print(pair)

(6.0, 6.0)
(5.0, 5.0)
(7.0, 7.0)
(6.0, 5.0)
(6.0, 7.0)


In [63]:
# Per-class F-measure (beta = 1.0, i.e. F1) on the test predictions.
metrics = MulticlassMetrics(rdd)

# Score only the labels that actually occur in the true-label column (second
# element of each pair). Probing a fixed range and swallowing errors with a bare
# `except:` hid real failures (even KeyboardInterrupt) and reported a made-up
# 0.00 for classes that have no test examples at all.
test_labels = sorted(rdd.map(lambda pair: pair[1]).distinct().collect())

for label in test_labels:
    print('Label: ', int(label))
    print('fMeasure: ', metrics.fMeasure(label, 1.0))

fMeasure:  0.0
fMeasure:  0.583011583011583
fMeasure:  0.6475485661424606
fMeasure:  0.3020408163265307
fMeasure:  0.0
