   # Load the data



In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

import sys
import os
import pyspark

# Create a local SparkContext unless the name `sc` is already bound in this
# namespace. PYSPARK_PYTHON is set first, pointing at /usr/bin/python2.
if not ('sc' in locals() and 'sc' in globals()):
    os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2'
    sc = pyspark.SparkContext('local[*]')

sqlContext = SQLContext(sc)

# Column layout applied to both CSV files: eleven nullable double columns
# followed by 'quality', which is read as a nullable string.
# A generator expression (not a list comprehension) is used so the loop
# variable does not end up in the notebook namespace on Python 2.
schema = StructType(
    list(
        StructField(column_name, DoubleType(), True)
        for column_name in (
            "fixed acidity",
            "volatile acidity",
            "citric acid",
            "residual sugar",
            "chlorides",
            "free sulfur dioxide",
            "total sulfur dioxide",
            "density",
            "pH",
            "sulphates",
            "alcohol",
        )
    )
    + [StructField("quality", StringType(), True)]
)

# Training split.
train_data = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .load('/data/wine_train_spark.csv', schema=schema)
)

# Test split, read with the same schema.
test_data = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .load('/data/wine_test_spark.csv', schema=schema)
)

In [2]:
# Names of the numeric columns that are assembled into the feature vector.
reduced_numeric_cols = [
    'fixed acidity',
    'volatile acidity',
    'citric acid',
    'residual sugar',
    'chlorides',
    'free sulfur dioxide',
    'total sulfur dioxide',
    'density',
    'pH',
    'sulphates',
    'alcohol',
]


In [3]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

# Stage that gathers the columns named in reduced_numeric_cols into a single
# 'features' column.
assembler = VectorAssembler(inputCols=reduced_numeric_cols, outputCol='features')

# Stage that indexes the string-typed 'quality' column into a 'label' column.
label_indexer = StringIndexer(inputCol='quality', outputCol='label')

## Naive Bayes

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes

# Naive Bayes with the 'gaussian' model type, reading 'features' and 'label'.
classifier = NaiveBayes(
    modelType='gaussian',
    featuresCol='features',
    labelCol='label',
)

# Stages run in order: index the label, assemble the features, classify.
pipeline = Pipeline(stages=[label_indexer, assembler, classifier])

# Short aliases; the evaluation cell below reads `test`.
train, test = train_data, test_data
model = pipeline.fit(train)

In [5]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the test set with the fitted Naive Bayes pipeline. The transformed
# DataFrame is built once and shared by both evaluators and the CSV export
# (the original cell rebuilt the identical transform a second time).
predictions = model.transform(test)

# Area under the ROC curve and under the precision-recall curve.
evaluator = BinaryClassificationEvaluator()
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
aupr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print("The AUROC is %s and the AUPR is %s." % (auroc, aupr))

# Accuracy and F1 on the same predictions.
evaluator = MulticlassClassificationEvaluator()
acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("The accuracy is %s and the f1 is %s." % (acc, f1))

# pandas' to_csv does not create missing parent directories, so make sure
# 'log/' exists before writing; otherwise a fresh checkout fails here.
# `os` is imported in the first cell. The isdir/makedirs pair is used instead
# of makedirs(exist_ok=True) so the cell also runs on Python 2.
if not os.path.isdir('log'):
    os.makedirs('log')

# Collect the predictions to the driver and persist them for later inspection.
pred_df = predictions.toPandas()
pred_df.to_csv('log/spark_pred_gnb.csv', header=True, index=False)

The AUROC is 0.7029711046310062 and the AUPR is 0.32589070689298616.
The accuracy is 0.8328125 and the f1 is 0.8452321757495944.


## Logistic Regression

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Default parameters of LogisticRegression, as recorded by the author:
#   self, featuresCol="features", labelCol="label", predictionCol="prediction",
#   maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
#   threshold=0.5, thresholds=None, probabilityCol="probability",
#   rawPredictionCol="rawPrediction", standardization=True, weightCol=None
# NOTE(review): the regParam default may differ between PySpark releases --
# confirm against the installed version before relying on 0.1.
#
# Result noted for this configuration:
#accuracy: 0.870313	 f1: 0.848442
classifier = LogisticRegression(labelCol="label", featuresCol="features")

# Same label-indexing and feature-assembly stages, with logistic regression last.
pipeline = Pipeline(stages=[label_indexer, assembler, classifier])

# Short aliases; the evaluation cell below reads `test`.
train, test = train_data, test_data
model = pipeline.fit(train)

In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the test set with the fitted logistic-regression pipeline.
predictions = model.transform(test)

# Accuracy and F1 on the test predictions.
evaluator = MulticlassClassificationEvaluator()
acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("accuracy: %f\t f1: %f" % (acc, f1))

# pandas' to_csv does not create missing parent directories, so make sure
# 'log/' exists before writing; otherwise a fresh checkout fails here.
# `os` is imported in the first cell. The isdir/makedirs pair is used instead
# of makedirs(exist_ok=True) so the cell also runs on Python 2.
if not os.path.isdir('log'):
    os.makedirs('log')

# Collect the predictions to the driver and persist them for later inspection.
pred_df = predictions.toPandas()
pred_df.to_csv('log/spark_pred_logreg.csv', header=True, index=False)

accuracy: 0.870313	 f1: 0.848442


## Linear Support Vector Machine

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC

# Default parameters of LinearSVC, as recorded by the author:
#   featuresCol="features", labelCol="label", predictionCol="prediction",
#   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
#   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
#   aggregationDepth=2
#
# Result noted for this configuration:
# accuracy: 0.860938	 f1: 0.796602
classifier = LinearSVC(
    featuresCol="features",
    labelCol="label",
    regParam=1.0,
)

# Same label-indexing and feature-assembly stages, with the linear SVM last.
pipeline = Pipeline(stages=[label_indexer, assembler, classifier])

# Short aliases; the evaluation cell below reads `test`.
train, test = train_data, test_data
model = pipeline.fit(train)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the test set with the fitted linear-SVM pipeline.
predictions = model.transform(test)

# Accuracy and F1 on the test predictions.
evaluator = MulticlassClassificationEvaluator()
acc = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("accuracy: %f\t f1: %f" % (acc, f1))

# pandas' to_csv does not create missing parent directories, so make sure
# 'log/' exists before writing; otherwise a fresh checkout fails here.
# `os` is imported in the first cell. The isdir/makedirs pair is used instead
# of makedirs(exist_ok=True) so the cell also runs on Python 2.
if not os.path.isdir('log'):
    os.makedirs('log')

# Collect the predictions to the driver and persist them for later inspection.
pred_df = predictions.toPandas()
pred_df.to_csv('log/spark_pred_linsvm.csv', header=True, index=False)