In [None]:

from pyspark.sql import SparkSession

# @hidden_cell
# Configures Spark's Hadoop Swift connector so it can read from Bluemix Object Storage.
# Credentials are passed in explicitly or read from environment variables instead of being
# embedded in the notebook, so the notebook can be shared without leaking them.
import os


def set_hadoop_config_with_credentials_b3df59f4cfa4437ba5a0d8341462d910(name, spark_context=None, credentials=None):
    """Set the Hadoop configuration so Spark can access Bluemix Object Storage.

    Args:
        name: Service name; all settings are written under the Hadoop key prefix
            ``fs.swift.service.<name>``.
        spark_context: SparkContext whose Hadoop configuration is updated. Defaults
            to the notebook-provided global ``sc``.
        credentials: Optional mapping with keys ``tenant``, ``username``,
            ``password`` and ``region``. Any key not given here is read from the
            environment variables ``OS_TENANT_ID``, ``OS_USERNAME``,
            ``OS_PASSWORD`` and ``OS_REGION_NAME``; region falls back to
            ``'dallas'``.

    Raises:
        KeyError: if tenant, username or password is neither passed nor set in the
            environment. Nothing is written to the configuration in that case.
    """
    if spark_context is None:
        # `sc` is the SparkContext global injected by the notebook environment.
        spark_context = sc
    supplied = dict(credentials or {})

    def _credential(key, env_var, default=None):
        # Explicit argument wins, then the environment, then the default.
        value = supplied.get(key) or os.environ.get(env_var) or default
        if value is None:
            raise KeyError('Missing Object Storage credential %r: pass it in `credentials` '
                           'or set the %s environment variable' % (key, env_var))
        return value

    # Resolve everything first so a missing credential leaves the config untouched.
    tenant = _credential('tenant', 'OS_TENANT_ID')
    username = _credential('username', 'OS_USERNAME')
    password = _credential('password', 'OS_PASSWORD')
    region = _credential('region', 'OS_REGION_NAME', 'dallas')

    prefix = 'fs.swift.service.' + name
    hconf = spark_context._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', tenant)
    hconf.set(prefix + '.username', username)
    hconf.set(prefix + '.password', password)
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', region)
    hconf.setBoolean(prefix + '.public', False)

# Any name works: it only namespaces the Hadoop `fs.swift.service.<name>.*` settings,
# and the swift:// URL below reuses the same name to pick those settings up.
name = 'keystone'
set_hadoop_config_with_credentials_b3df59f4cfa4437ba5a0d8341462d910(name)

spark = SparkSession.builder.getOrCreate()

# 'csv' is the public short name of Spark's built-in CSV source (the same reader as the
# internal CSVFileFormat class); `inferSchema` is the documented option spelling.
df_data_1 = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('swift://DefaultProjectsushidharjayaramanmavsutaedu.' + name + '/white.csv')
)
# Rich (HTML) preview of the first rows, consistent with the other cells' toPandas() output.
df_data_1.limit(5).toPandas()


In [None]:
# Rows per quality label in the raw data, i.e. the class balance before any resampling.
df_data_1.groupBy('quality').count().toPandas()

In [None]:
# Down-sample every quality class to the size of the smallest class so the models see a
# balanced label distribution. The fractions are derived from the observed counts rather
# than hard-coded, so they stay correct if the input data changes; the seed makes the
# sample reproducible across Restart & Run All.
SAMPLING_SEED = 42
# Use row['count'] rather than row.count: Row is a tuple, so .count is tuple.count.
quality_counts = {row['quality']: row['count']
                  for row in df_data_1.groupby('quality').count().collect()}
minority_count = min(quality_counts.values())
sampling_fractions = {label: float(minority_count) / count
                      for label, count in quality_counts.items()}
stratified_CV_data = df_data_1.sampleBy('quality', fractions=sampling_fractions,
                                        seed=SAMPLING_SEED).cache()

stratified_CV_data.groupby('quality').count().toPandas()

In [None]:
%matplotlib inline
import pandas as pd
pd.options.display.mpl_style = 'default'

In [None]:
import seaborn as sns

# Correlations (pandas' default Pearson method) between the numeric features, computed on
# a 10% sample collected to the driver. The fixed seed keeps the plot reproducible.
CORR_SAMPLE_SEED = 42
numeric_features = [col for col, dtype in stratified_CV_data.dtypes if dtype in ('int', 'double')]

sampled_data = (stratified_CV_data
                .select(numeric_features)
                .sample(False, 0.10, seed=CORR_SAMPLE_SEED)
                .toPandas())
corr = sampled_data.corr()
# Pin the colour scale to the full correlation range so shades are comparable across runs;
# the trailing ';' suppresses the Axes repr in the notebook output.
sns.heatmap(corr,
            xticklabels=corr.columns,
            yticklabels=corr.columns,
            vmin=-1, vmax=1);

In [None]:
# Drop one feature from each of two feature pairs (presumably chosen because each pair is
# strongly correlated in the heatmap). Alternatives that were also tried: dropping
# 'total sulfur dioxide' instead of 'free sulfur dioxide', and/or 'residual sugar'
# instead of 'density'.
final_CV_data = stratified_CV_data.drop('free sulfur dioxide', 'density').cache()

In [None]:
# StringIndexer was never imported, so this cell raised NameError on a fresh kernel.
from pyspark.ml.feature import StringIndexer

# Encode the string 'quality' label as a numeric index, since MLlib's LabeledPoint needs a
# float label. By default StringIndexer orders indices by label frequency (most frequent
# -> 0.0), not by Low < Medium < High. transform() appends qualityIndex after the existing
# columns, so once 'quality' is dropped it is the last column.
indexer = StringIndexer(inputCol="quality", outputCol="qualityIndex")
indexer_model = indexer.fit(final_CV_data)
indexed = indexer_model.transform(final_CV_data).drop('quality').cache()
indexed.show()

In [None]:

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

def labelData(data):
    """Convert a DataFrame into an RDD of MLlib ``LabeledPoint`` records.

    The LAST column of each row becomes the label and every preceding column
    becomes the feature vector, so the label column must be ordered last.
    """
    def to_labeled_point(row):
        label = row[-1]
        features = row[:-1]
        return LabeledPoint(label, features)

    return data.rdd.map(to_labeled_point)

In [None]:
# Train on `indexed`: the stratified, feature-reduced data whose last column is the numeric
# qualityIndex label. The raw df_data_1 still carries the string 'quality' label and
# bypasses the sampling, feature selection and indexing done above.
SPLIT_SEED = 42
training_data, testing_data = labelData(indexed).randomSplit([0.8, 0.2], seed=SPLIT_SEED)
# Both splits are reused (training, prediction, counts); cache to avoid recomputation.
training_data.cache()
testing_data.cache()

# StringIndexer produces indices 0..k-1, so k = max index + 1.
num_classes = int(indexed.agg({'qualityIndex': 'max'}).first()[0]) + 1

model = DecisionTree.trainClassifier(training_data, numClasses=num_classes, maxDepth=3,
                                     categoricalFeaturesInfo=dict(),
                                     impurity='gini', maxBins=32)

print(model.toDebugString())

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

def getPredictionsLabels(model, test_data):
    """Pair each test point's predicted label with its true label.

    Args:
        model: trained MLlib model whose ``predict`` accepts an RDD of feature vectors.
        test_data: RDD of records exposing ``features`` and ``label`` (e.g. LabeledPoint).

    Returns:
        RDD of ``(prediction, label)`` tuples.
    """
    feature_rdd = test_data.map(lambda point: point.features)
    label_rdd = test_data.map(lambda point: point.label)
    return model.predict(feature_rdd).zip(label_rdd)

def printMetrics(predictions_and_labels):
    """Print multiclass evaluation metrics.

    Args:
        predictions_and_labels: RDD of ``(prediction, label)`` float pairs, e.g. the
            output of ``getPredictionsLabels``.

    Prints per-class precision and recall, the label-weighted F1 score and the
    confusion matrix.
    """
    metrics = MulticlassMetrics(predictions_and_labels)
    # Per-class metrics for every label present in the evaluation data (replaces the old
    # binary True/False precision/recall lines, which don't apply to multiclass labels).
    labels = sorted(predictions_and_labels.map(lambda pl: pl[1]).distinct().collect())
    for label in labels:
        print('Class %s: precision = %s, recall = %s'
              % (label, metrics.precision(label), metrics.recall(label)))
    # fMeasure() without a label is the micro-averaged F1 (equal to accuracy); it was
    # deprecated in Spark 2.0 and later removed. Weighted F1 is the usual multiclass summary.
    print('F-1 Score          ' + str(metrics.weightedFMeasure()))
    print('Confusion Matrix\n' + str(metrics.confusionMatrix().toArray()))

predictions_and_labels = getPredictionsLabels(model, testing_data)
# Each element is a (prediction, label) pair; index access instead of a tuple-parameter
# lambda, which is Python-2-only syntax.
correct = predictions_and_labels.filter(lambda pl: pl[0] == pl[1]).count()
test_accuracy = correct / float(testing_data.count())
print('Accuracy = ' + str(test_accuracy))
printMetrics(predictions_and_labels)

In [None]:
import math

# NOTE(review): assuming training_data is labelled with StringIndexer's qualityIndex, that
# index follows label frequency, not an ordinal Low < Medium < High scale, so regressing on
# it (and the RMSE below) is only meaningful if the index order happens to match quality
# order — TODO consider an explicit ordinal mapping before treating quality as a target.
model = DecisionTree.trainRegressor(training_data, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=5, maxBins=32)

# Evaluate the model on the held-out instances and report the root-mean-squared error.
predictions = model.predict(testing_data.map(lambda x: x.features))
labelsAndPredictions = testing_data.map(lambda lp: lp.label).zip(predictions)
# Elements are (label, prediction) pairs; index access replaces the Python-2-only
# tuple-parameter lambda.
squared_errors = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1]))
testMSE = squared_errors.sum() / float(testing_data.count())
print('Root Mean Squared Error = ' + str(math.sqrt(testMSE)))
print('Learned regression tree model:')
print(model.toDebugString())
