In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
sc=SparkContext(master="local[4]")

In [3]:
#sc.stop()

In [4]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

from string import split,strip

from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.tree import RandomForest, RandomForestModel

from pyspark.mllib.util import MLUtils

### Higgs data set
* **URL:** http://archive.ics.uci.edu/ml/datasets/HIGGS#  
* **Abstract:** This is a classification problem to distinguish between a signal process which produces Higgs bosons and a background process which does not.

**Data Set Information:**  
The data has been produced using Monte Carlo simulations. The first 21 features (columns 2-22) are kinematic properties measured by the particle detectors in the accelerator. The last seven features are functions of the first 21 features; these are high-level features derived by physicists to help discriminate between the two classes. There is an interest in using deep learning methods to obviate the need for physicists to manually develop such features. Benchmark results using Bayesian Decision Trees from a standard physics package and 5-layer neural networks are presented in the original paper. The last 500,000 examples are used as a test set.



In [5]:
#define feature names
feature_text='lepton pT, lepton eta, lepton phi, missing energy magnitude, missing energy phi, jet 1 pt, jet 1 eta, jet 1 phi, jet 1 b-tag, jet 2 pt, jet 2 eta, jet 2 phi, jet 2 b-tag, jet 3 pt, jet 3 eta, jet 3 phi, jet 3 b-tag, jet 4 pt, jet 4 eta, jet 4 phi, jet 4 b-tag, m_jj, m_jjj, m_lv, m_jlv, m_bb, m_wbb, m_wwbb'
features=[strip(a) for a in split(feature_text,',')]
print len(features),features

28 ['lepton pT', 'lepton eta', 'lepton phi', 'missing energy magnitude', 'missing energy phi', 'jet 1 pt', 'jet 1 eta', 'jet 1 phi', 'jet 1 b-tag', 'jet 2 pt', 'jet 2 eta', 'jet 2 phi', 'jet 2 b-tag', 'jet 3 pt', 'jet 3 eta', 'jet 3 phi', 'jet 3 b-tag', 'jet 4 pt', 'jet 4 eta', 'jet 4 phi', 'jet 4 b-tag', 'm_jj', 'm_jjj', 'm_lv', 'm_jlv', 'm_bb', 'm_wbb', 'm_wwbb']


In [6]:
# create a directory called higgs, download and decompress HIGGS.csv.gz into it

from os.path import exists
if not exists('higgs'):
    print "creating directory higgs"
    !mkdir higgs
%cd higgs
if not exists('HIGGS.csv'):
    if not exists('HIGGS.csv.gz'):
        print 'downloading HIGGS.csv.gz'
        !curl -O http://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz
    print 'decompressing HIGGS.csv.gz --- May take 5-10 minutes'
    !gunzip -f HIGGS.csv.gz
!ls -l
%cd ..

/Users/joshwilson/Documents/DSE/CSE255-DSE230/Classes/HW-7/higgs
total 15694336
-rw-r--r--  1 joshwilson  staff  8035497980 May 31 18:37 HIGGS.csv
/Users/joshwilson/Documents/DSE/CSE255-DSE230/Classes/HW-7


### As done in previous notebook, create RDDs from raw data and build Gradient boosting and Random forests models. Consider doing 1% sampling since the dataset is too big for your local machine

In [7]:
# Read the file into an RDD
# If doing this on a real cluster, you need the file to be available on all nodes, ideally in HDFS.
path='higgs/HIGGS.csv'
inputRDD=sc.textFile(path)
inputRDD.first()

u'1.000000000000000000e+00,8.692932128906250000e-01,-6.350818276405334473e-01,2.256902605295181274e-01,3.274700641632080078e-01,-6.899932026863098145e-01,7.542022466659545898e-01,-2.485731393098831177e-01,-1.092063903808593750e+00,0.000000000000000000e+00,1.374992132186889648e+00,-6.536741852760314941e-01,9.303491115570068359e-01,1.107436060905456543e+00,1.138904333114624023e+00,-1.578198313713073730e+00,-1.046985387802124023e+00,0.000000000000000000e+00,6.579295396804809570e-01,-1.045456994324922562e-02,-4.576716944575309753e-02,3.101961374282836914e+00,1.353760004043579102e+00,9.795631170272827148e-01,9.780761599540710449e-01,9.200048446655273438e-01,7.216574549674987793e-01,9.887509346008300781e-01,8.766783475875854492e-01'

In [8]:
# Transform the text RDD into an RDD of LabeledPoints
Data=inputRDD.map(lambda line: [float(strip(x)) for x in line.split(',')])\
     .map(lambda line: LabeledPoint(line[:1][0],line[1:]))
Data.first()

LabeledPoint(1.0, [0.869293212891,-0.635081827641,0.22569026053,0.327470064163,-0.689993202686,0.754202246666,-0.24857313931,-1.09206390381,0.0,1.37499213219,-0.653674185276,0.930349111557,1.10743606091,1.13890433311,-1.57819831371,-1.0469853878,0.0,0.65792953968,-0.0104545699432,-0.0457671694458,3.10196137428,1.35376000404,0.979563117027,0.978076159954,0.920004844666,0.721657454967,0.988750934601,0.876678347588])

In [9]:
# full dataset for cluster
#Data1=Data.sample(False,0.1).cache()
#(trainingData,testData)=Data1.randomSplit([0.7,0.3],seed=255)

# subset of dataset for local testing
Data1=Data.sample(False,0.01).cache()
(trainingData,testData)=Data1.randomSplit([0.7,0.3],seed=255)

print 'Sizes: Data1=%d, trainingData=%d, testData=%d'%(Data1.count(),trainingData.cache().count(),testData.cache().count())

Sizes: Data1=110487, trainingData=77477, testData=33010


In [10]:
# gradient boosted tree
from time import time
errors={}
for depth in [10]:
    for lr in [0.2]:
        for numiter in [20]:
            start=time()
            model=GradientBoostedTrees.trainClassifier(trainingData, categoricalFeaturesInfo={}, 
                                                       numIterations=numiter, maxDepth=depth, 
                                                       learningRate=lr)
            #print model.toDebugString()
            errors[depth]={}
            dataSets={'train':trainingData,'test':testData}
            for name in dataSets.keys():  # Calculate errors on train and test sets
                data=dataSets[name]
                Predicted=model.predict(data.map(lambda x: x.features))
                LabelsAndPredictions=data.map(lambda LP: LP.label).zip(Predicted)
                Err = LabelsAndPredictions.filter(lambda (v,p):v != p).count()/float(data.count())
                errors[depth][name]=Err
            print depth,lr,numiter,errors[depth],int(time()-start),'seconds'

10 0.2 20 {'test': 0.2979400181763102, 'train': 0.12244924300114873} 620 seconds


In [11]:
# random forest
from time import time
for depth in [10]:
    for ntrees in [15]:
        for imp in ['gini']:
            start=time()
            model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                                 numTrees=ntrees, featureSubsetStrategy='auto',
                                                 impurity=imp, maxDepth=depth, 
                                                 maxBins=32, seed=11)
            #print model.toDebugString()
            errors[depth]={}
            dataSets={'train':trainingData,'test':testData}
            for name in dataSets.keys():  # Calculate errors on train and test sets
                data=dataSets[name]
                Predicted=model.predict(data.map(lambda x: x.features))
                LabelsAndPredictions=data.map(lambda LP: LP.label).zip(Predicted)
                Err = LabelsAndPredictions.filter(lambda (v,p):v != p).count()/float(data.count())
                errors[depth][name]=Err
            print depth,errors[depth],int(time()-start),'seconds'

10 {'test': 0.30187821872159953, 'train': 0.25869612917381934} 64 seconds
