## Launching Spark

In [1]:
# Notebook-wide configuration; every later cell reads these names.
# Export SPARK_HOME into the kernel's environment.
%env SPARK_HOME=/home/ggomarr/.spark
# Spark master URL: handed to start-slave.sh and used as the master of the
# SparkSession / SparkConf built below.
master_node='spark://Otter:7077'
# Application name given to the SparkSession / SparkConf.
app_name='SparkTree'
# True: build a SparkSession and take its sparkContext as `sc`.
# False: build a bare SparkContext from a SparkConf.
dataframeMode=True
# True: the final cell also runs stop-slave.sh and stop-master.sh.
stopSpark=False

env: SPARK_HOME=/home/ggomarr/.spark


In [2]:
# Start the standalone master daemon, then a worker ("slave") pointed at it;
# {master_node} is interpolated from the Python variable of that name.
# When the daemons are already up, the scripts only print a
# "running as process ... Stop it first." notice (see the output below).
!/home/ggomarr/.spark/sbin/start-master.sh
!/home/ggomarr/.spark/sbin/start-slave.sh {master_node}

org.apache.spark.deploy.master.Master running as process 3733.  Stop it first.
org.apache.spark.deploy.worker.Worker running as process 3793.  Stop it first.


In [3]:
# findspark is initialised before any pyspark import so that the imports in
# the following cells resolve; presumably it locates the installation through
# SPARK_HOME (set in the configuration cell) -- confirm against the findspark docs.
import findspark
findspark.init()

In [4]:
# Create the Spark entry point(s) used by the rest of the notebook.
# Both branches reuse an already-running context, so the cell can be
# re-executed without restarting the kernel.
if dataframeMode:
    # DataFrame API: build (or fetch) the session and expose its context as `sc`.
    from pyspark.sql import SparkSession
    spark=SparkSession.builder.master(master_node).appName(app_name).getOrCreate()
    sc=spark.sparkContext
else:
    # RDD-only API: no SparkSession is created here, so only `sc` is defined.
    from pyspark import SparkConf, SparkContext
    conf=SparkConf().setMaster(master_node).setAppName(app_name)
    # getOrCreate rather than SparkContext(conf=conf): constructing a second
    # context while one is still active raises, which made this branch fail
    # on a re-run; this also matches the getOrCreate() used by the other branch.
    sc=SparkContext.getOrCreate(conf=conf)

## Test using a text file

In [5]:
def process_row(row):
    """Turn one line of text into zero or one ``(length, text)`` pairs.

    Surrounding whitespace is stripped first. A line that is empty after
    stripping yields ``[]``; any other line yields a one-element list holding
    ``(len(stripped), stripped)``. The list return value is what lets the
    caller use this with ``flatMap`` to drop blank lines.
    """
    stripped = row.strip()
    if not stripped:
        return []
    return [(len(stripped), stripped)]

# Smoke test: read Spark's NOTICE file and keep one (length, text) pair per
# non-blank line.
notice_lines = sc.textFile("file:///home/ggomarr/.spark/NOTICE")
file_rows = notice_lines.flatMap(process_row)

# Ordering by the negated length puts the longest lines first.
longest_rows = file_rows.takeOrdered(5, key=lambda pair: -pair[0])
for length_and_text in longest_rows:
    print(length_and_text)

(183, u'(Common Development and Distribution License (CDDL) v1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)')
(166, u'(COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0) (GNU General Public Library) Streaming API for XML (javax.xml.stream:stax-api:1.0-2 - no url defined)')
(142, u'(CDDL 1.0) Servlet Specification 2.5 API (org.mortbay.jetty:servlet-api-2.5:6.1.14 - http://jetty.mortbay.org/project/modules/servlet-api-2.5)')
(126, u'The following components are provided under the Common Development and Distribution License 1.0. See project link for details.')
(126, u'The following components are provided under the Common Development and Distribution License 1.1. See project link for details.')


## Enjoy: train a decision tree with MLlib

In [5]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from numpy import array

In [6]:
def translateFeature(feature,featureMap={'N':0,'Y':1,'NA':0,'BS':1,'MS':2,'PhD':3}):
    """Convert one raw feature value to an integer code.

    Args:
        feature: the raw value. Anything ``int()`` accepts is converted
            directly; otherwise it is looked up in ``featureMap``.
        featureMap: mapping from non-numeric tokens to integer codes. The
            default dict is shared between calls and is only read here.

    Returns:
        The integer value or code for ``feature``.

    Raises:
        KeyError: if ``feature`` is neither int-convertible nor a key of
            ``featureMap``.
    """
    try:
        return int(feature)
    # Only a failed conversion falls back to the map; a bare ``except:`` would
    # also swallow KeyboardInterrupt / SystemExit raised during the call.
    except (TypeError, ValueError):
        return featureMap[feature]
def rephraseIntoLabeledPoint(features):
    """Build a ``LabeledPoint`` from one row of raw feature values.

    Every value is passed through ``translateFeature``. The last translated
    value becomes the label; all preceding ones become the feature vector.
    """
    encoded = list(map(translateFeature, features))
    label = encoded[-1]
    predictors = encoded[:-1]
    return LabeledPoint(label, array(predictors))

In [7]:
# Load the CSV; its first line is the header and must not reach the trainer.
rawData = sc.textFile("./misc/PastHires.csv")
header = rawData.first()

# Drop the header line, split each remaining line on commas, and convert the
# resulting fields into a LabeledPoint.
dataRows = rawData.filter(lambda line: line != header)
splitRows = dataRows.map(lambda line: line.split(","))
trainingData = splitRows.map(rephraseIntoLabeledPoint)

# Feature index -> number of categories. The 4 for feature 3 matches the four
# codes (0..3) that translateFeature's default map assigns to NA/BS/MS/PhD;
# the other listed features are two-valued.
categoricalArity = {1: 2, 3: 4, 4: 2, 5: 2}
model = DecisionTree.trainClassifier(
    trainingData,
    numClasses=2,
    categoricalFeaturesInfo=categoricalArity,
    impurity='gini',
    maxDepth=5,
    maxBins=32,
)

In [8]:
# Score one hand-built candidate: six already-encoded feature values in the
# same order as the training features.
testCandidates = [array([10, 1, 3, 1, 0, 0])]
testData = sc.parallelize(testCandidates)

# Predict on the distributed data, then bring the labels back to the driver.
predictedLabels = model.predict(testData)
predictions = predictedLabels.collect()
for predictedLabel in predictions:
    print(predictedLabel)

1.0


In [9]:
# Print the trained tree's textual description (the if/else rules shown below).
print(model.toDebugString())

DecisionTreeModel classifier of depth 4 with 9 nodes
  If (feature 1 in {0.0})
   If (feature 5 in {0.0})
    If (feature 0 <= 0.0)
     If (feature 3 in {1.0})
      Predict: 0.0
     Else (feature 3 not in {1.0})
      Predict: 1.0
    Else (feature 0 > 0.0)
     Predict: 0.0
   Else (feature 5 not in {0.0})
    Predict: 1.0
  Else (feature 1 not in {0.0})
   Predict: 1.0



## Clean up

In [6]:
# Stop the SparkContext created at the top of the notebook. This runs
# unconditionally; only the daemon shutdown in the next cell is guarded by stopSpark.
sc.stop()

In [None]:
# Optionally shut down the standalone worker and master daemons; skipped
# unless stopSpark was set to True in the configuration cell.
if stopSpark:
    !/home/ggomarr/.spark/sbin/stop-slave.sh
    !/home/ggomarr/.spark/sbin/stop-master.sh