##Random Forest for Star-Galaxy Classification


The following code allows us to use PySpark in the iPython Notebook 

In [1]:
import os
import sys

# Set the path for spark installation
# this is the path where you have built spark using sbt/sbt assembly
os.environ['SPARK_HOME']="/Users/blorangest/Desktop/spark-1.3.1-bin-hadoop2.6"

# Append to PYTHONPATH so that pyspark could be found
sys.path.append("/Users/blorangest/Desktop/spark-1.3.1-bin-hadoop2.6/python")
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'python/lib/py4j-0.8.2.1-src.zip'))

# Now we are ready to import Spark Modules
try:
    from pyspark.mllib.tree import RandomForest
    from pyspark.mllib.util import MLUtils
    from pyspark.mllib.regression import LabeledPoint
    from pyspark import SparkContext

except ImportError as e:
    print ("Error importing Spark Modules", e)
    sys.exit(1)
import numpy as np
#import pyfits
import shutil

Now we set some variables that will determine the properties of the random forest. 
test_size is the percentage of the data that will be used to test the model.
num_trees is the number of trees in the forest.
max_depth is the maximum depth of each tree. It must be no more than 30.
k is the number of folds desired for kfolds cross validation

In [2]:
dataFile = "cfhtlens_matched.csv"
test_size = 0.2
num_trees = 6
max_depth = 10
k = 5

This function saves a given RDD as a text file

In [3]:
def save (rdd, filename):
    try:
        shutil.rmtree(filename)
    except Exception:
        pass
    rdd.saveAsTextFile(filename)

This function will be used to add colors to the feature data by taking the differences of adjacent magnitudes.

In [4]:
def addColors (features):
    for i in range (len(features)-1):
        features.append(features[i+1]-features[i])
    return features

This function does data preprocessing. It removes unwanted columns and puts the relevant data in LabeledPoint objects. Note that, in this particular dataset, one of the columns has comma seperated values enclosed by quotes that all belong under a single heading. This column will be quotes[1]. 

In [5]:
def parse (line): 
    quotes = np.array([x for x in line.split('"')])
    row = quotes[0].split(',')[:-1] + [quotes[1]] + quotes[2].split(',')[1:]
    label = float(row[heads['true_class']])
    want = ['MAG_u', 'MAG_g', 'MAG_r', 'MAG_i', 'MAG_z']
    want_index = []
    for w in want:
        want_index.append(heads[w])
    features = []
    for i in range (len(row)):
        for w in want_index:
            if i == w:
                features.append(float(row[i]))
    features = addColors(features)
    return LabeledPoint(label, features)

This function is a slow way to get classification probabilities and number of trees that classify it as a star

In [6]:
def get_probs (model, data):
    # Collect the individual decision trees as JavaArray objects
    trees = model._java_model.trees()
    ntrees = model.numTrees()
    scores = DecisionTreeModel(trees[0]).predict(data.map(lambda x: x.features))

    # For each tree, apply its prediction to the entire dataset and zip together the results
    for i in range(1,ntrees):
        dtm = DecisionTreeModel(trees[i])
        scores = scores.zip(dtm.predict(data.map(lambda x: x.features)))
        scores = scores.map(lambda x: x[0] + x[1])
    
    # Divide the accumulated scores over the number of trees
    return scores.map(lambda x: x/ntrees), scores

Compute test error by thresholding probabilistic predictions

In [7]:
def probTest(testData, model):
    threshold = 0.5
    probsAndScores = get_probs(model,testData)
    probs = probsAndScores[0]
    pred = probs.map(lambda x: 0 if x < threshold else 1)
    lab_pred = testData.map(lambda lp: lp.label).zip(pred)
    acc = lab_pred.filter(lambda (v, p): v != p).count() / float(testData.count())
    return (1 - acc), probsAndScores[1]

Tests the random forest classifier once

In [8]:
def testOnce ():
    # split the data into training and testing sets
    (trainingData, testData) = data.randomSplit([1-test_size, test_size])

    # train the random forest
    model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=num_trees, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth = max_depth, maxBins=32)

    # test the random forest
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(testData.count())
    Mg = float(labelsAndPredictions.filter(lambda (v, p): v == 0 and p == 1).count())
    Ng = float(labelsAndPredictions.filter(lambda (v, p): v == 0 and p == 0).count())
    Ms = float(labelsAndPredictions.filter(lambda (v, p): v == 1 and p == 0).count())
    Ns = float(labelsAndPredictions.filter(lambda (v, p): v == 1 and p == 1).count())
    probsAndScores = probTest(testData, model)
    threshold_accuracy = probsAndScores[0]
    probs = probsAndScores[1].map(lambda x: x/num_trees)
    labelsAndPredictions = labelsAndPredictions.zip(probs)
    save(labelsAndPredictions, 'answers')
    print ('Galaxy Purity = ' + str(Ng / (Ng+Ms)))
    print ('Galaxy Completeness = ' + str(Ng / (Ng+Mg)))
    print ('Star Purity = ' + str(Ns / (Ns+Mg)))
    print ('Star Completeness = ' + str(Ns/(Ns+Ms)))
    print ('Accuracy = ' + str(1 - testErr))
    print ('Threshold method accuracy = ' + str(threshold_accuracy))
    #print(model.toDebugString())

Performs k folds cross-validation

In [9]:
def kfolds ():
    #folds = kFold(data, k) this would work in java
    acc = 0
    spurity = 0
    scomp = 0
    gpurity = 0
    gcomp = 0
    foldsize = data.count()/k
    tested = sc.parallelize([])
    for i in range(k):
        test = sc.parallelize(data.subtract(tested).takeSample(False, foldsize))
        tested = tested.union(test)
        train = data.subtract(test)
        # train the random forest
        model = RandomForest.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=num_trees, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth = max_depth, maxBins=32)

        predictions = model.predict(test.map(lambda x: x.features))
        labelsAndPredictions = test.map(lambda lp: lp.label).zip(predictions)
        testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(test.count())
        Mg = float(labelsAndPredictions.filter(lambda (v, p): v == 0 and p == 1).count())
        Ng = float(labelsAndPredictions.filter(lambda (v, p): v == 0 and p == 0).count())
        Ms = float(labelsAndPredictions.filter(lambda (v, p): v == 1 and p == 0).count())
        Ns = float(labelsAndPredictions.filter(lambda (v, p): v == 1 and p == 1).count())
        
        gpurity += (Ng / (Ng+Ms))
        gcomp += (Ng / (Ng+Mg))
        spurity += (Ns / (Ns+Mg))
        scomp += (Ns/(Ns+Ms))
        acc += (1 - testErr)
    
    print 'with '+ str(k) + ' folds:'
    print ('Average Galaxy Purity = ' + str(gpurity / k))
    print ('Average Galaxy Completeness = ' + str(gcomp / k))
    print ('Average Star Purity = ' + str(spurity / k))
    print ('Average Star Completeness = ' + str(scomp / k))
    print ('Average Accuracy = ' + str(acc / k))
            

In [10]:
sc = SparkContext(appName="stargalaxy")
rawData = sc.textFile(dataFile) # is an RDD with 66389 things
header = rawData.first()
lines = rawData.filter(lambda x: x != header) #now the header is gone
header_split = str(header).split(',')
heads = {}
for i in range( len(header_split)):
    heads[header_split[i]] = i
data = lines.map(parse).cache() # RDD of LabeledPoints

#testOnce()
kfolds()


with 5 folds:
Average Galaxy Purity = 0.972509685337
Average Galaxy Completeness = 0.989664204269
Average Star Purity = 0.921575722348
Average Star Completeness = 0.813163382414
Average Accuracy = 0.966664156059
