In [1]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

from string import split,strip

from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.tree import RandomForest, RandomForestModel

from pyspark.mllib.util import MLUtils

### Higgs data set
* **URL:** http://archive.ics.uci.edu/ml/datasets/HIGGS#  
* **Abstract:** This is a classification problem to distinguish between a signal process which produces Higgs bosons and a background process which does not.

**Data Set Information:**  
The data has been produced using Monte Carlo simulations. The first 21 features (columns 2-22) are kinematic properties measured by the particle detectors in the accelerator. The last seven features are functions of the first 21 features; these are high-level features derived by physicists to help discriminate between the two classes. There is an interest in using deep learning methods to obviate the need for physicists to manually develop such features. Benchmark results using Bayesian Decision Trees from a standard physics package and 5-layer neural networks are presented in the original paper. The last 500,000 examples are used as a test set.



In [2]:
#define feature names
feature_text='lepton pT, lepton eta, lepton phi, missing energy magnitude, missing energy phi, jet 1 pt, jet 1 eta, jet 1 phi, jet 1 b-tag, jet 2 pt, jet 2 eta, jet 2 phi, jet 2 b-tag, jet 3 pt, jet 3 eta, jet 3 phi, jet 3 b-tag, jet 4 pt, jet 4 eta, jet 4 phi, jet 4 b-tag, m_jj, m_jjj, m_lv, m_jlv, m_bb, m_wbb, m_wwbb'
features=[strip(a) for a in split(feature_text,',')]
print len(features),features

28 ['lepton pT', 'lepton eta', 'lepton phi', 'missing energy magnitude', 'missing energy phi', 'jet 1 pt', 'jet 1 eta', 'jet 1 phi', 'jet 1 b-tag', 'jet 2 pt', 'jet 2 eta', 'jet 2 phi', 'jet 2 b-tag', 'jet 3 pt', 'jet 3 eta', 'jet 3 phi', 'jet 3 b-tag', 'jet 4 pt', 'jet 4 eta', 'jet 4 phi', 'jet 4 b-tag', 'm_jj', 'm_jjj', 'm_lv', 'm_jlv', 'm_bb', 'm_wbb', 'm_wwbb']


In [3]:
# create a directory called higgs, download and decompress HIGGS.csv.gz into it

from os.path import exists
if not exists('higgs'):
    print "creating directory higgs"
    !mkdir higgs
%cd higgs
if not exists('HIGGS.csv'):
    if not exists('HIGGS.csv.gz'):
        print 'downloading HIGGS.csv.gz'
        !curl -O http://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz
    print 'decompressing HIGGS.csv.gz --- May take 5-10 minutes'
    !gunzip -f HIGGS.csv.gz
!ls -l
%cd ..

creating directory higgs
/Users/Dorain/Desktop/Python/UCSD_BigData_2016/Homeworks/HW-5/higgs
downloading HIGGS.csv.gz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2685M  100 2685M    0     0  10.6M      0  0:04:11  0:04:11 --:--:-- 12.9M
decompressing HIGGS.csv.gz --- May take 5-10 minutes
total 15694336
-rw-r--r--  1 Dorain  staff  8035497980 May 21 13:07 HIGGS.csv
/Users/Dorain/Desktop/Python/UCSD_BigData_2016/Homeworks/HW-5


### As done in previous notebook, create RDDs from raw data and build Gradient boosting and Random forests models. Consider doing 1% sampling since the dataset is too big for your local machine

In [4]:
Break up features that are made out of several binary features.
featureDict={a:[a] for a in features}
print featureDict
featureDict['Soil_Type (40 binary columns)'] = ['ST_'+str(i) for i in range(40)]
colDict['Wilderness_Area (4 binarycolumns)'] = ['WA_'+str(i) for i in range(4)]
Columns=[]
for item in cols:
    Columns=Columns+colDict[item]
print Columns

{'jet 2 phi': ['jet 2 phi'], 'missing energy phi': ['missing energy phi'], 'jet 3 b-tag': ['jet 3 b-tag'], 'm_jlv': ['m_jlv'], 'jet 1 eta': ['jet 1 eta'], 'jet 3 phi': ['jet 3 phi'], 'jet 1 pt': ['jet 1 pt'], 'm_wbb': ['m_wbb'], 'lepton pT': ['lepton pT'], 'm_jjj': ['m_jjj'], 'm_jj': ['m_jj'], 'm_wwbb': ['m_wwbb'], 'jet 4 phi': ['jet 4 phi'], 'jet 3 eta': ['jet 3 eta'], 'm_bb': ['m_bb'], 'jet 2 b-tag': ['jet 2 b-tag'], 'm_lv': ['m_lv'], 'jet 4 eta': ['jet 4 eta'], 'jet 1 phi': ['jet 1 phi'], 'missing energy magnitude': ['missing energy magnitude'], 'jet 3 pt': ['jet 3 pt'], 'jet 2 pt': ['jet 2 pt'], 'jet 2 eta': ['jet 2 eta'], 'jet 4 pt': ['jet 4 pt'], 'jet 4 b-tag': ['jet 4 b-tag'], 'jet 1 b-tag': ['jet 1 b-tag'], 'lepton eta': ['lepton eta'], 'lepton phi': ['lepton phi']}


In [5]:
path='higgs/HIGGS.csv'
inputRDD=sc.textFile(path)
inputRDD.first()

u'1.000000000000000000e+00,8.692932128906250000e-01,-6.350818276405334473e-01,2.256902605295181274e-01,3.274700641632080078e-01,-6.899932026863098145e-01,7.542022466659545898e-01,-2.485731393098831177e-01,-1.092063903808593750e+00,0.000000000000000000e+00,1.374992132186889648e+00,-6.536741852760314941e-01,9.303491115570068359e-01,1.107436060905456543e+00,1.138904333114624023e+00,-1.578198313713073730e+00,-1.046985387802124023e+00,0.000000000000000000e+00,6.579295396804809570e-01,-1.045456994324922562e-02,-4.576716944575309753e-02,3.101961374282836914e+00,1.353760004043579102e+00,9.795631170272827148e-01,9.780761599540710449e-01,9.200048446655273438e-01,7.216574549674987793e-01,9.887509346008300781e-01,8.766783475875854492e-01'

In [6]:
Data=inputRDD.map(lambda line: [float(x) for x in line.split(',')])\
    .map(lambda V:LabeledPoint(V[0], V[1:]))

In [10]:
Data1=Data.sample(False,0.01,seed=255).cache()
(trainingData,testData)=Data1.randomSplit([0.7,0.3],seed=255)
print 'Sizes: Data1=%d, trainingData=%d, testData=%d'%(Data1.count(),trainingData.cache().count(),testData.cache().count())

Sizes: Data1=109807, trainingData=76948, testData=32859


In [14]:
from time import time
for depth in [13,15]:
    start=time()
    model=GradientBoostedTrees.trainClassifier(trainingData,categoricalFeaturesInfo={}, numIterations=35)
#     print model.toDebugString()
    errors[depth]={}
    dataSets={'train':trainingData,'test':testData}
    for name in dataSets.keys():  # Calculate errors on train and test sets
        data=dataSets[name]
        Predicted=model.predict(data.map(lambda x: x.features))
        LabelsAndPredictions=data.map(lambda lp: lp.label).zip(Predicted)
        Err = LabelsAndPredictions.filter(lambda (v,p):v != p).count()/float(data.count())
        errors[depth][name]=Err
    print depth,errors[depth],int(time()-start),'seconds'


KeyboardInterrupt: 

In [13]:
# from time import time
# depth = 13
# start=time()
# model=GradientBoostedTrees.trainClassifier(trainingData,categoricalFeaturesInfo={}, numIterations=35)
# #     print model.toDebugString()
# errors[depth]={}
# dataSets={'train':trainingData,'test':testData}
# for name in dataSets.keys():  # Calculate errors on train and test sets
#     data=dataSets[name]
#     Predicted=model.predict(data.map(lambda x: x.features))
#     LabelsAndPredictions=data.map(lambda lp: lp.label).zip(Predicted)
#     Err = LabelsAndPredictions.filter(lambda (v,p):v != p).count()/float(data.count())
#     errors[depth][name]=Err
# print depth,errors[depth]

15 {'test': 0.31592562159530113, 'train': 0.3118209700057181} 149 seconds


In [None]:
from time import time
errors={}
for depth in [1,3,6,10,15,20]:
    start=time()
    model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=10, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=depth, maxBins=32)
#     print model.toDebugString()
    errors[depth]={}
    dataSets={'train':trainingData,'test':testData}
    for name in dataSets.keys():  # Calculate errors on train and test sets
        data=dataSets[name]
        Predicted=model.predict(data.map(lambda x: x.features))
        LabelsAndPredictions=data.map(lambda lp: lp.label).zip(Predicted)
        Err = LabelsAndPredictions.filter(lambda (v,p):v != p).count()/float(data.count())
        errors[depth][name]=Err
    print depth,errors[depth],int(time()-start),'seconds'
print errors