In [1]:
import glob
import os
import sys

# SPARK_HOME must point at the Spark installation; if the variable is not set,
# the lookup below raises KeyError.
spark_home = os.environ['SPARK_HOME']
spark_python = os.path.join(spark_home, 'python')

# Put Spark's bundled Python sources on the import path.
sys.path.insert(0, spark_python)

# Add whichever py4j source archive this Spark distribution ships instead of
# pinning a single py4j version in the path.
for py4j_zip in sorted(glob.glob(os.path.join(spark_python, 'lib', 'py4j-*-src.zip'))):
    sys.path.insert(0, py4j_zip)

# Initialize PySpark to predefine the SparkContext variable 'sc'
execfile(os.path.join(spark_python, 'pyspark', 'shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0-SNAPSHOT
      /_/

Using Python version 2.7.9 (default, Mar  9 2015 16:20:48)
SparkContext available as sc, SQLContext available as sqlContext.


In [2]:
import csv
import StringIO
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

def loadRecord(line):
    """Convert one CSV-formatted text line into a LabeledPoint.

    Every field on the line is parsed as a float. The last field becomes the
    label and all preceding fields become the feature values.
    """
    fields = next(csv.reader(StringIO.StringIO(line)))
    values = [float(field) for field in fields]
    return LabeledPoint(values[-1], values[:-1])


In [3]:
# Read the CSV once: the first line is the header, and the remaining raw lines
# are kept in `chf` as a list. The `with` block closes the file handle, and a
# list (unlike a file iterator) is not exhausted after one pass, so a cell
# that consumes `chf` can be re-run.
with open('data/CAhousing.csv', 'r') as csv_file:
    header = next(csv_file).rstrip("\n").split(",")
    chf = list(csv_file)

# Print each column name next to its positional index.
for idx, column in enumerate(header):
    print("%d: %s" % (idx, column))

0: longitude
1: latitude
2: housingMedianAge
3: totalRooms
4: totalBedrooms
5: population
6: households
7: medianIncome
8: medianHouseValue


In [4]:
# Distribute the raw CSV lines, parse each one with loadRecord, and mark the
# resulting RDD as persisted. The last line displays the first parsed record.
chrdd = sc.parallelize(chf).map(loadRecord).persist()
chrdd.first()

LabeledPoint(452600.0, [-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252])

In [5]:
# Randomly split the records with weights 0.7 (training) and 0.3 (test).
# A fixed seed is passed so the split does not change from run to run.
(trainingData, testData) = chrdd.randomSplit([0.7, 0.3], seed=42)

In [6]:
# Fit a single regression tree on the training split.
#  An empty categoricalFeaturesInfo means every feature is treated as continuous.
model = DecisionTree.trainRegressor(
    trainingData,
    categoricalFeaturesInfo={},
    impurity='variance',
    minInstancesPerNode=2500)

# Score the held-out records: predict from the features, pair each true label
# with its prediction, then average the squared differences.
testFeatures = testData.map(lambda point: point.features)
testLabels = testData.map(lambda point: point.label)
predictions = model.predict(testFeatures)
labelsAndPredictions = testLabels.zip(predictions)
squaredErrors = labelsAndPredictions.map(
    lambda pair: (pair[0] - pair[1]) * (pair[0] - pair[1]))
testMSE = squaredErrors.sum() / float(testData.count())

print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())

Test Mean Squared Error = 7871958588.08
Learned regression tree model:
DecisionTreeModel regressor of depth 3 with 9 nodes
  If (feature 7 <= 5.0203)
   If (feature 7 <= 3.0704)
    If (feature 1 <= 34.45)
     Predict: 157230.19809160306
    Else (feature 1 > 34.45)
     Predict: 116681.98932004272
   Else (feature 7 > 3.0704)
    If (feature 2 <= 30.0)
     Predict: 189679.62258064517
    Else (feature 2 > 30.0)
     Predict: 228940.83941344777
  Else (feature 7 > 5.0203)
   Predict: 328618.14514578664



Each tree will be slightly different because, for continuous inputs, Spark splits on 32 quantiles computed from a random subsample of the data.

In [7]:
# List the attribute and method names available on the trained model object.
dir(model)

['__class__',
 '__del__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_java_loader_class',
 '_java_model',
 '_load_java',
 '_sc',
 'call',
 'depth',
 'load',
 'numNodes',
 'predict',
 'save',
 'toDebugString']

In [8]:
# Note: there are no call parentheses here, so this displays the bound method
# object itself rather than the tree description that toDebugString() returns.
model.toDebugString

<bound method DecisionTreeModel.toDebugString of DecisionTreeModel regressor of depth 3 with 9 nodes>

In [9]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
# Train a random forest of 20 regression trees, each limited to depth 3.
# featureSubsetStrategy="all" lets every split consider all features.
# A fixed seed is passed so repeated runs do not draw a different forest.
forest = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                     numTrees=20, featureSubsetStrategy="all",
                                     impurity='variance', maxDepth=3, seed=42)


In [10]:
# Fetch the forest's individual trees by calling trees() on the object held in
# the private `_java_model` attribute.
# NOTE(review): this relies on a private attribute, presumably the wrapped
# JVM-side model; confirm it is stable across Spark versions.
trees = forest._java_model.trees()

In [11]:
# Print the textual description of the tree at index 1 of the forest.
print(trees[1].toDebugString())

DecisionTreeModel regressor of depth 3 with 15 nodes
  If (feature 7 <= 4.9984)
   If (feature 7 <= 3.0491)
    If (feature 1 <= 34.61)
     Predict: 157948.29657794678
    Else (feature 1 > 34.61)
     Predict: 114825.26924493554
   Else (feature 7 > 3.0491)
    If (feature 2 <= 42.0)
     Predict: 196515.6275020016
    Else (feature 2 > 42.0)
     Predict: 269141.65628476086
  Else (feature 7 > 4.9984)
   If (feature 7 <= 6.8643)
    If (feature 2 <= 28.0)
     Predict: 267667.529588015
    Else (feature 2 > 28.0)
     Predict: 328001.60185185185
   Else (feature 7 > 6.8643)
    If (feature 7 <= 8.1657)
     Predict: 384889.52391799545
    Else (feature 7 > 8.1657)
     Predict: 456953.3729216152



In [12]:
# Write each tree's textual description to its own file (data/tree0.txt,
# data/tree1.txt, ...). The tree count is read from the trained forest rather
# than repeated as a literal, so the loop cannot go out of step with the
# numTrees value used at training time.
for i in range(forest.numTrees()):
    with open("data/tree%d.txt" % i, "w") as f:
        f.write(trees[i].toDebugString())