In [1]:
import pyspark 
from pyspark.sql import SparkSession
# Build (or reuse) the notebook's Spark session, with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Wine Quality Prediction")
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# Load the training CSV: the first row is the header, fields are separated
# by ';', and Spark infers the column types.
df = spark.read.csv(
    'TrainingDataset.csv',
    sep=';',
    header='true',
    inferSchema='true',
)

In [3]:
# Strip every pair of consecutive double quotes from the column names
# (presumably residue of the quoting in the CSV header -- confirm against
# the raw file).
new_column_name_list = [column.replace("\"\"", "") for column in df.columns]

# toDF assigns the cleaned names to the columns by position.
df = df.toDF(*new_column_name_list)

# Rename the column called `quality"` (with a trailing quote) to `quality`.
df = df.withColumnRenamed("quality\"", "quality")

In [4]:
def isTasty(quality):
    """Return 1 when ``quality`` is 7 or higher, otherwise 0."""
    return 1 if quality >= 7 else 0

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Wrap isTasty as a Spark UDF whose declared return type is integer.
tasty_udf_int = udf(f=isTasty, returnType=IntegerType())

In [6]:
# Add the binary "tasty" column, computed from "quality" by the UDF.
df_tasty = df.withColumn(
    colName="tasty",
    col=tasty_udf_int('quality'),
)

In [7]:
# Columns that are assembled into the feature vector for the training data.
featureColumns = [
    "alcohol",
    "volatile acidity",
    "sulphates",
    "citric acid",
    "total sulfur dioxide",
    "density",
]

In [8]:
from pyspark.ml.feature import VectorAssembler
# Packs the columns listed in featureColumns into one column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=featureColumns)

In [9]:
# Assemble the feature vector, then keep only the vector and the label.
trainingData = (
    assembler
    .transform(df_tasty)
    .select('features', 'tasty')
)

In [10]:
# Load the validation CSV with the same reader settings as the training file:
# header row, ';' separator, inferred column types.
df_test = spark.read.csv(
    'ValidationDataset.csv',
    sep=';',
    header='true',
    inferSchema='true',
)

In [11]:
# Clean the validation frame's OWN header. The names were previously built
# from df.columns (the training frame); because toDF assigns names purely by
# position, that silently imposed the training names on the validation data
# and would hide any difference in column order or count between the files.
new_column_name_list = [column.replace("\"\"", "") for column in df_test.columns]

# toDF assigns the cleaned names to the columns by position.
df_test = df_test.toDF(*new_column_name_list)

# Rename the column called `quality"` (with a trailing quote) to `quality`.
df_test = df_test.withColumnRenamed("quality\"", "quality")

In [12]:
# Add the binary "tasty" column to the validation frame with the same UDF.
df_test_tasty = df_test.withColumn(
    colName="tasty",
    col=tasty_udf_int('quality'),
)

In [13]:
# Columns that are assembled into the feature vector for the validation data.
featureColumns_test = [
    "alcohol",
    "volatile acidity",
    "sulphates",
    "citric acid",
    "total sulfur dioxide",
    "density",
]

In [14]:
# Packs the columns listed in featureColumns_test into one "features" column.
assembler_test = VectorAssembler(outputCol="features", inputCols=featureColumns_test)

In [15]:
# Assemble the validation feature vector, then keep only the vector and label.
testData = (
    assembler_test
    .transform(df_test_tasty)
    .select('features', 'tasty')
)

In [20]:
from pyspark.ml.classification import GBTClassifier

In [21]:
# Gradient-boosted trees, 15 iterations, trained against the "tasty" label.
gbt = GBTClassifier(labelCol="tasty", maxIter=15)

In [22]:
# Fit the GBT classifier on the assembled training data.
gbtModel = gbt.fit(dataset=trainingData)

In [23]:
# Apply the fitted GBT model to the validation data.
gbt_preds = gbtModel.transform(dataset=testData)

In [25]:
# Imported here because this is the first cell that uses the evaluator; the
# notebook otherwise imports it only in a later cell, so a fresh
# "Restart & Run All" would stop at this cell with a NameError.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the GBT predictions against the "tasty" label with the "f1" metric.
gbt_evaluator = MulticlassClassificationEvaluator(
    labelCol='tasty', predictionCol="prediction", metricName="f1")
gbt_f1 = gbt_evaluator.evaluate(gbt_preds)
print("f-score on GBT = %g" % gbt_f1)

f-score on GBT = 0.772111


In [29]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest with 28 trees on the "features" / "tasty" columns.
# The seed is pinned so the forest's randomised training yields the same
# model on every run; without it the score can change between kernel
# sessions. 42 is an arbitrary choice.
rfc = RandomForestClassifier(
    featuresCol='features',
    labelCol='tasty',
    numTrees=28,
    seed=42,
)

In [30]:
# Fit the random forest on the assembled training data.
rfc_model = rfc.fit(dataset=trainingData)

In [31]:
# Apply the fitted random forest to the validation data.
rfc_preds = rfc_model.transform(dataset=testData)

In [33]:
# Score the random-forest predictions against "tasty" with the "f1" metric.
rfc_evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol='tasty',
    predictionCol="prediction",
)
rfc_f1 = rfc_evaluator.evaluate(dataset=rfc_preds)
print("f-score on RFC = %g" % rfc_f1)

f-score on RFC = 0.840584


In [34]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree limited to depth 2 on the "features" / "tasty" columns.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='tasty',
    maxDepth=2,
)

In [35]:
# Fit the decision tree on the assembled training data.
dtModel = dt.fit(dataset=trainingData)

In [36]:
# Apply the fitted decision tree to the validation data.
dt_preds = dtModel.transform(dataset=testData)

In [37]:
# Score the decision-tree predictions against "tasty" with the "f1" metric.
dt_evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol='tasty',
    predictionCol="prediction",
)
dt_f1 = dt_evaluator.evaluate(dataset=dt_preds)
print("f-score on DT = %g" % dt_f1)

f-score on DT = 0.843542


In [38]:
from pyspark.ml.classification import LogisticRegression
# Logistic regression capped at 25 iterations on "features" / "tasty".
lr = LogisticRegression(
    featuresCol='features',
    labelCol='tasty',
    maxIter=25,
)

In [39]:
# Fit the logistic regression on the assembled training data.
lrModel = lr.fit(dataset=trainingData)

In [40]:
# Apply the fitted logistic regression to the validation data.
predictions = lrModel.transform(dataset=testData)

In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Score the logistic-regression predictions against "tasty" with "f1".
lr_evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol='tasty',
    predictionCol="prediction",
)
f1 = lr_evaluator.evaluate(dataset=predictions)
print("f-score on test data = %g" % f1)

f-score on test data = 0.870752
