# Configuration
Spark configuration keys are documented at:
* http://spark.apache.org/docs/latest/configuration.html
* http://spark.apache.org/docs/latest/running-on-yarn.html

We edited the following configuration values to illustrate a tuning methodology that differs from the one used by RxSpark in Microsoft R Server (MRS).

In [1]:
%%configure -f 
{
    "conf": {
        "spark.jars.packages": "com.databricks:spark-csv_2.10:1.4.0",
        "spark.executor.memory": "2g",
        "spark.driver.memory": "4g",
        "spark.driver.cores": "2",
        "spark.executor.cores": "2",
        "spark.executor.instances": "32"
    }
} 

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1466578436859_0003,pyspark,dead,,,
36,application_1467161110841_0045,pyspark,dead,,,
46,,pyspark,dead,,,


# Information

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1466578436859_0003,pyspark,dead,,,
36,application_1467161110841_0045,pyspark,dead,,,
46,,pyspark,dead,,,


# Code

In [3]:
import time
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import Row
from pyspark.mllib.tree import DecisionTree

# prefix = "wasb://nyctaxi@maxkazstorage.blob.core.windows.net/"
# single blob files (not a good idea)
#trainFile = "wasb://nyctaxi@maxkazsouthcentralus.blob.core.windows.net/trainDump.xdf/trainDump.xdf"
#testFile = "wasb://nyctaxi@maxkazsouthcentralus.blob.core.windows.net/testDump.xdf/testDump.xdf"
# split files
trainFile = "wasb://nyctaxi@maxkazstorage.blob.core.windows.net/trainDumpSplitcsv"
testFile = "wasb://nyctaxi@maxkazstorage.blob.core.windows.net/testDumpSplitcsv"


def loadTaxiCsv(path):
    """Read a header-ful CSV location with spark-csv, inferring column types.

    The `payment_type` and `tip_amount` columns are dropped from the result.
    Train and test share this helper so they are always read identically.
    """
    return (sqlContext.read.format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(path)
            .drop("payment_type")
            .drop("tip_amount"))


test = loadTaxiCsv(testFile)
train = loadTaxiCsv(trainFile)

# Recode vendor_id to a dense 0-based double index (0.0, 1.0, ...).
# distinct().collect() has no guaranteed ordering, so sort first; otherwise the
# vendor -> code assignment could differ between runs.
categories = sorted(train.select("vendor_id").distinct().collect())
vendorMap = {row[0]: float(code) for code, row in enumerate(categories)}


def toLabeledPoint(row):
    """Turn one row into a LabeledPoint.

    The label is the last column. The features are the recoded first column
    (vendor_id) followed by columns 1 .. len-3; the second-to-last column is
    excluded.
    """
    # NOTE(review): row[1:-2] skips the second-to-last column; confirm this is
    # intentional (e.g. a leaky column) rather than an off-by-one for row[1:-1].
    # A vendor_id that never appears in `train` raises KeyError here.
    return LabeledPoint(float(row[-1]), Row(vendorMap[row[0]]) + row[1:-2])


trainLabeled = train.map(toLabeledPoint)
testLabeled = test.map(toLabeledPoint)

Creating SparkContext as 'sc'
Creating HiveContext as 'sqlContext'


In [4]:
start = time.time()
# Feature 0 is the recoded vendor_id, whose codes are 0 .. len(vendorMap)-1.
# Its arity therefore follows vendorMap instead of a hard-coded 2; max(2, ...)
# keeps the original value whenever there are at most two vendors.
model = DecisionTree.trainClassifier(trainLabeled, numClasses=2, categoricalFeaturesInfo={0: max(2, len(vendorMap))}, impurity='gini', maxDepth=10, maxBins=32)
end = time.time()
print("Time to train on uncached data: %.2f mins" % ((end - start) / 60))

Time to train on uncached data: 6.09 mins

# Compute Benchmarks

In [5]:
# Score the held-out set on the cluster, then bring predictions and labels back
# to the driver as plain lists for sklearn.
# NOTE(review): pairing predictions[i] with truth[i] assumes both collects return
# testLabeled's rows in the same order; confirm.
predictions = (model
               .predict(testLabeled.map(lambda point: point.features))
               .collect())
truth = (testLabeled
         .map(lambda point: point.label)
         .collect())


In [6]:
from sklearn.metrics import f1_score
# F1 of the Spark decision tree's test-set predictions against `truth`.
print(f1_score(y_true=truth, y_pred=predictions))

0.632199582112

In [7]:
# load MRS predictions
testFile = "wasb://nyctaxi@maxkazstorage.blob.core.windows.net/predictSplitcsv"
predict = [x[0] for x in sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load( testFile).select("tipped_Pred").collect()]

In [8]:
from sklearn.metrics import f1_score
# F1 of the MRS predictions (`predict`) against the Spark test labels (`truth`).
print(f1_score(y_true=truth, y_pred=predict))

0.63625822304

# Sanity Check
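Check that the labels (`tipped`) in the MRS prediction file match the Spark test labels (`truth`) row-for-row. Only then are the two F1 scores above computed on the same test set.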

In [9]:
# Pull the ground-truth `tipped` column from the same MRS prediction file so it
# can be compared with the Spark-side `truth` list.
testFile = "wasb://nyctaxi@maxkazstorage.blob.core.windows.net/predictSplitcsv"
tt = [row[0] for row in (sqlContext.read.format("com.databricks.spark.csv")
                         .option("header", "true")
                         .option("inferSchema", "true")
                         .load(testFile)
                         .select("tipped")
                         .collect())]

In [10]:
# Printing the comparison alone lets a mismatch slip by under Run All. Assert so
# the notebook stops if the MRS labels and the Spark labels disagree, since the
# F1 comparison is only meaningful when they match row-for-row.
sameTestSet = tt == truth
print(sameTestSet)
assert sameTestSet, "MRS test labels do not match the Spark test labels row-for-row"

True

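# Other Benchmarks
Time how long it takes to cache and materialize the training RDD. Then retrain the same model on the cached data and compare with the uncached training time above.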

In [11]:
start = time.time()
# cache() only marks the RDD for persistence and returns it; the count() action
# forces the computation that fills the cache. The timing covers both.
trainLabeled.cache().count()
end = time.time()
print("Time to cache and materialize training data: %.2f mins" % ((end - start) / 60))

Time to cache and materialize training data: 4.45 mins

In [13]:
start = time.time()
try:
    # Same settings as the uncached run. Feature 0's arity follows the number of
    # vendor codes in vendorMap (never below the original 2) instead of a fixed 2.
    model = DecisionTree.trainClassifier(trainLabeled, numClasses=2, categoricalFeaturesInfo={0: max(2, len(vendorMap))}, impurity='gini', maxDepth=10, maxBins=32)
    end = time.time()
finally:
    # Release the cached training RDD even if training raises, so a failed run
    # does not leave it persisted.
    trainLabeled.unpersist()
print("Time to train on cached data: %.2f mins" % ((end - start) / 60))

Time to train on cached data: 6.69 mins