In [82]:
import quinn
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml import Pipeline


from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [49]:
spark = SparkSession \
    .builder \
    .appName("CS643_Wine_Quality") \
    .getOrCreate()

def remove_quotations(s):
    return s.replace('"', '')

train_file = './data/TrainingDataSet.csv'
validation_file = './data/ValidationDataset.csv'

train_df = spark.read.format('csv').options(header='true', inferSchema='true', sep=';').load(train_file)
validation_df = spark.read.format('csv').options(header='true', inferSchema='true', sep=';').load(validation_file)


train_df = quinn.with_columns_renamed(remove_quotations)(train_df)
train_df = train_df.withColumnRenamed('quality', 'label')

validation_df = quinn.with_columns_renamed(remove_quotations)(validation_df)
validation_df = validation_df.withColumnRenamed('quality', 'label')


assembler = VectorAssembler(
    inputCols=["fixed acidity",
               "volatile acidity",
               "citric acid",
               "residual sugar",
               "chlorides",
               "free sulfur dioxide",
               "total sulfur dioxide",
               "density",
               "pH",
               "sulphates",
               "alcohol"],
                outputCol="inputFeatures")

scaler = Normalizer(inputCol="inputFeatures", outputCol="features")
paramgrid = ParamGridBuilder().build()
evaluator = MulticlassClassificationEvaluator(metricName="f1")


In [50]:
lr = LogisticRegression()
pipeline1 = Pipeline(stages=[assembler, scaler, lr])
crossval = CrossValidator(estimator=pipeline1,  
                         estimatorParamMaps=paramgrid,
                         evaluator=evaluator, 
                         numFolds=3
                        )
cvModel1 = crossval.fit(train_df) 
print("F1 Score for LogisticRegression Model: ", evaluator.evaluate(cvModel1.transform(validation_df)))


F1 Score for LogisticRegression Model:  0.5729445029855991


In [51]:
rf = RandomForestClassifier()
pipeline2 = Pipeline(stages=[assembler, scaler, rf])
crossval = CrossValidator(estimator=pipeline2,  
                         estimatorParamMaps=paramgrid,
                         evaluator=evaluator, 
                         numFolds=3
                        )

cvModel2 = crossval.fit(train_df) 
print("F1 Score for RandomForestClassifier Model: ", evaluator.evaluate(cvModel2.transform(validation_df)))


F1 Score for RandomForestClassifier Model:  0.5160475076818385


In [57]:


linearRegression = LinearRegression()
pipeline3 = Pipeline(stages=[assembler, scaler, linearRegression])

crossval = CrossValidator(estimator=pipeline3,  
                         estimatorParamMaps=paramgrid,
                         evaluator=evaluator, 
                         numFolds=3
                        )

cvModel3 = crossval.fit(train_df) 
print("F1 Score for RandomForestClassifier Model: ", evaluator.evaluate(cvModel3.transform(validation_df)))


F1 Score for RandomForestClassifier Model:  0.0


In [62]:

def doIt(model, paramgrid, validation_df):
    pipeline = Pipeline(stages=[assembler, scaler, model])
    crossval = CrossValidator(estimator=pipeline,  
                         estimatorParamMaps=paramgrid,
                         evaluator=evaluator, 
                         numFolds=3
                        )
    cvModel = crossval.fit(train_df)
    print(f"F1 Score Model: ", evaluator.evaluate(cvModel.transform(validation_df)))

In [63]:
doIt(RandomForestClassifier(), paramgrid, validation_df)

F1 Score Model:  0.5160475076818385


In [64]:
doIt(LogisticRegression(), paramgrid, validation_df)

F1 Score Model:  0.5729445029855991


In [65]:
doIt(LinearRegression(), paramgrid, validation_df)

F1 Score Model:  0.0


In [75]:
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier,MultilayerPerceptronClassifier
from pyspark.ml.regression import GBTRegressor

In [76]:
doIt(GBTRegressor(), paramgrid, validation_df)

F1 Score Model:  0.0


In [89]:
lr = LinearRegression(maxIter=30, regParam=0.3, elasticNetParam=0.3, featuresCol="features", labelCol="label")
lrModel = lr.fit(validation_df)
predictionsDF = lrModel.transform(train_df)
predictionsDF.show()

IllegalArgumentException: features does not exist. Available: fixed acidity, volatile acidity, citric acid, residual sugar, chlorides, free sulfur dioxide, total sulfur dioxide, density, pH, sulphates, alcohol, label