In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, expr, desc, regexp_extract, isnan, when, count
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import GBTClassifier, GBTClassificationModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
import numpy as np

In [2]:
spark.stop()

In [3]:
spark = SparkSession \
    .builder \
    .config("spark.driver.maxResultSize",  "0") \
    .appName("09_red-vine-quality-prediction") \
    .master("yarn") \
    .getOrCreate()

In [4]:
spark

In [5]:
!hdfs dfs -ls /data/lsml/6-spark-ml/

Found 3 items
-rw-r--r--   3 hdfs lsml      28629 2021-05-10 01:24 /data/lsml/6-spark-ml/test.csv
-rw-r--r--   3 hdfs lsml      61194 2021-05-10 01:24 /data/lsml/6-spark-ml/train.csv
-rw-r--r--   3 hdfs lsml      84199 2022-02-13 22:33 /data/lsml/6-spark-ml/winequality-red.csv


In [6]:
dataset = spark.read.csv("hdfs:///data/lsml/6-spark-ml/winequality-red.csv", inferSchema = True, header=True, sep=";")

In [7]:
dataset.show(2)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
only showing top 2 rows



In [8]:
dataset.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [9]:
df_count = dataset.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in dataset.columns])
df_count.toPandas()

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,0,0,0,0,0,0,0,0,0,0,0,0


In [10]:
dataset.groupBy('quality').count().show()

+-------+-----+
|quality|count|
+-------+-----+
|      6|  638|
|      3|   10|
|      5|  681|
|      4|   53|
|      8|   18|
|      7|  199|
+-------+-----+



In [11]:
dataset.describe().toPandas()

Unnamed: 0,summary,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,count,1599.0,1599.0,1599.0,1599.0,1599.0,1599.0,1599.0,1599.0,1599.0,1599.0,1599.0,1599.0
1,mean,8.319637273295838,0.5278205128205131,0.2709756097560964,2.538805503439652,0.0874665415884925,15.87492182614134,46.46779237023139,0.9967466791744832,3.311113195747343,0.6581488430268921,10.422983114446502,5.636022514071295
2,stddev,1.7410963181276948,0.1790597041535352,0.1948011374053182,1.40992805950728,0.04706530201009,10.46015696980971,32.89532447829907,0.0018873339538427,0.1543864649035427,0.1695069795901101,1.0656675818473935,0.8075694397347051
3,min,4.6,0.12,0.0,0.9,0.012,1.0,6.0,0.99007,2.74,0.33,8.4,3.0
4,max,15.9,1.58,1.0,15.5,0.611,72.0,289.0,1.00369,4.01,2.0,14.9,8.0


In [12]:
train, test = dataset.randomSplit(weights=[0.7, 0.3], seed=42)

In [13]:
numerical = train.columns[:-1]

In [14]:
GBTR = GBTRegressor(maxIter=5, maxDepth=2, featuresCol="features", labelCol="label", seed=42)
stages = []

stages.append(StringIndexer(inputCol="quality", outputCol="label"))
assembler = VectorAssembler(inputCols=numerical, outputCol="features")
stages += [assembler] + [GBTR]

In [15]:
stages

[StringIndexer_465d830e9a69fd65aef0,
 VectorAssembler_49beb1ab877c0b8e30c5,
 GBTRegressor_4792a192cfab9e323c66]

In [16]:
pipeline = Pipeline(stages=stages)
model = pipeline.fit(train)
predictions = model.transform(test)

In [17]:
predictions.select('label','features', 'prediction').show()

+-----+--------------------+-------------------+
|label|            features|         prediction|
+-----+--------------------+-------------------+
|  1.0|[4.7,0.6,0.17,2.3...|  1.389217635997751|
|  2.0|[4.9,0.42,0.0,2.1...| 1.6544191557947636|
|  1.0|[5.0,0.38,0.01,1....| 1.6544191557947636|
|  0.0|[5.2,0.32,0.25,1....| 0.5446163070940966|
|  2.0|[5.2,0.48,0.04,1....|  1.389217635997751|
|  1.0|[5.4,0.74,0.09,1....| 0.9847167959588441|
|  4.0|[5.5,0.49,0.03,1....| 1.6544191557947636|
|  0.0|[5.6,0.31,0.37,1....|0.19371631410287427|
|  0.0|[5.6,0.54,0.04,1....| 1.0545973934713844|
|  0.0|[5.6,0.605,0.05,2...|  1.389217635997751|
|  1.0|[5.9,0.29,0.25,13...|  0.115863280459714|
|  1.0|[5.9,0.395,0.13,2...|  1.433879958020411|
|  0.0|[5.9,0.46,0.0,1.9...| 0.9847167959588441|
|  1.0|[5.9,0.61,0.08,2....| 1.1207625929421017|
|  3.0|[6.0,0.33,0.32,12...| 0.9847167959588441|
|  0.0|[6.0,0.5,0.0,1.4,...| 0.6224693407372568|
|  0.0|[6.0,0.5,0.04,2.2...| 0.5446163070940966|
|  1.0|[6.0,0.54,0.0

In [18]:
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = " , rmse)

Root Mean Squared Error (RMSE) on test data =  0.8326181541339553


In [19]:
with open("answers/week6/task2/output.txt", "w") as f:
    f.write(str(rmse))

In [20]:
!cat answers/week6/task2/output.txt

0.8326181541339553

In [21]:
paramGrid = ParamGridBuilder()\
    .addGrid(GBTR.maxDepth, [2, 5, 10, 20]) \
    .addGrid(GBTR.maxIter, [2, 5, 7, 10, 20])\
    .build()
    
tvs = TrainValidationSplit(estimator=GBTR,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

stages = []

stages.append(StringIndexer(inputCol="quality", outputCol="label"))
assembler = VectorAssembler(inputCols=numerical, outputCol="features")
stages += [assembler]

pipeline = Pipeline(stages=stages)
model = pipeline.fit(train)
data = model.transform(train)
train_data = data.select('label', 'features')

data = model.transform(test)
test_data = data.select('label', 'features')

model = tvs.fit(train_data)

predictions = model.transform(test_data)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = " , rmse)

Root Mean Squared Error (RMSE) on test data =  0.8273329922182245
