In [None]:
# Run findspark before the `import pyspark` in the next cell so that the local
# Spark installation's Python bindings can be found.
# NOTE(review): assumes Spark is discoverable (e.g. via SPARK_HOME) — confirm on new machines.
import findspark
findspark.init()

In [1]:
import pyspark
import os

## Data

The data set consists of 11 million Monte Carlo simulations of nuclear collisions. Signal collisions correspond to collisions where a Higgs boson was created. Background collisions correspond to collisions that have the same end-product particles but where a Higgs boson was not created. Each collision has 28 attributes.
Data set location: <https://archive.ics.uci.edu/ml/datasets/HIGGS>

Relevant paper: *Baldi, P., P. Sadowski, and D. Whiteson. “Searching for Exotic Particles in High-energy Physics with Deep Learning.” Nature Communications 5 (July 2, 2014).*

In [2]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('higgs-analysis')
    .getOrCreate()
)

In [3]:
# The CSV path is relative to the notebook's working directory.
data_location = os.path.join('resources', 'HIGGS_subsampled_20k.csv')

# Comma-separated file with a header row; column types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema="true", header="true")
    .load(data_location)
)

In [None]:
# Peek at the first two rows to sanity-check that the CSV was parsed as expected.
df.head(2)

## Train/Test split

In [4]:
# 70/30 train/test split. The seed is fixed so that the partition, and therefore
# every error rate reported further down, is the same on each Restart & Run All.
(training, test) = df.randomSplit([0.7, 0.3], seed=42)

# Row counts of the two partitions (displayed as the cell output).
training.count(), test.count()

(1399830, 600170)

In [None]:
# List the column names of the training partition.
training.columns

In [None]:
# Summary statistics for the column at index 1 only. Column 0 is used as the
# label when the dense feature vectors are built below, so this is the first
# feature column.
training.describe(training.columns[1]).show()

## Feature Scaling

Not required for GBT or Random Forest, but done to make it easy to add more classifiers later.

In [5]:
from pyspark.sql.functions import *
from pyspark.ml.linalg import DenseVector

In [7]:
# Reshape each training row into (label, features): column 0 is the label and
# every remaining column is packed into one DenseVector.
training_dense = spark.createDataFrame(
    training.rdd.map(lambda row: (row[0], DenseVector(row[1:]))),
    ["label", "features"],
)

In [8]:
# Same (label, features) reshaping for the held-out rows: column 0 is the label
# and the remaining columns become one DenseVector.
test_dense = spark.createDataFrame(
    test.rdd.map(lambda row: (row[0], DenseVector(row[1:]))),
    ["label", "features"],
)

In [None]:
from pyspark.ml.feature import StandardScaler
# Scaler that reads the "features" column and writes "features_scaled";
# withMean=True additionally centres each feature.
standardScaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
    withMean=True,
)

In [None]:
# Fit the scaler on the training partition only, so that no statistics from the
# test partition leak into the transformation.
scaler = standardScaler.fit(training_dense)
# Adds the "features_scaled" column to the training data.
scaled_training = scaler.transform(training_dense)
# Show two rows to check the scaled column.
scaled_training.head(2)

In [None]:
# Apply the scaler that was fitted on the training data to the test partition.
scaled_test = scaler.transform(test_dense)
# Show two rows to check the scaled column.
scaled_test.head(2)

## Create GBT Model

In [9]:
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg

In [10]:
def as_old(v):
    """Convert a ``pyspark.ml`` vector into its ``pyspark.mllib`` counterpart.

    The RDD-based ``mllib`` API (``LabeledPoint`` and the tree trainers used in
    this notebook) takes ``mllib`` vectors, while the DataFrame-side code in
    this notebook produces ``ml`` vectors.

    Args:
        v: An ``ml_linalg.SparseVector`` or ``ml_linalg.DenseVector``.

    Returns:
        The matching ``mllib_linalg`` vector built from the same data.

    Raises:
        ValueError: If ``v`` is neither of the two supported vector types.
    """
    # (source type, converter) pairs, tried in order.
    converters = (
        (ml_linalg.SparseVector,
         lambda sparse: mllib_linalg.SparseVector(sparse.size, sparse.indices, sparse.values)),
        (ml_linalg.DenseVector,
         lambda dense: mllib_linalg.DenseVector(dense.values)),
    )
    for source_type, convert in converters:
        if isinstance(v, source_type):
            return convert(v)
    raise ValueError("Unsupported type {0}".format(type(v)))

In [None]:
# Build LabeledPoints from the scaled training features, converting each ml
# vector to an mllib vector on the way.
scaled_labelPoint_train = scaled_training.rdd.map(
    lambda rec: LabeledPoint(rec.label, as_old(rec.features_scaled))
)
scaled_labelPoint_train.take(2)

In [11]:
# Build LabeledPoints from the unscaled training features; this is the RDD that
# both tree models below are trained on.
labelPoint_train = training_dense.rdd.map(
    lambda rec: LabeledPoint(rec.label, as_old(rec.features))
)
labelPoint_train.take(2)

[LabeledPoint(0.0, [0.2746966481208801,-1.9840284585952759,0.7694543600082397,1.4012621641159058,1.495957612991333,0.9564709663391113,0.33269181847572327,-0.03209567442536354,0.0,0.7049337029457092,-0.8722332715988159,-1.0061124563217163,2.214872121810913,0.6468133926391602,-0.3530036211013794,-0.6303791999816895,0.0,1.0685573816299438,-0.23031991720199585,1.3876736164093018,3.101961374282837,0.8952987790107727,0.9776041507720947,0.9931463003158569,0.7896732687950134,0.6978268623352051,0.7527646422386169,0.7791340947151184]),
 LabeledPoint(0.0, [0.27487966418266296,-2.014221668243408,-0.428039014339447,1.4417753219604492,-1.4561175107955933,0.9727770686149597,-0.5723783373832703,-0.5166195034980774,2.1730761528015137,1.4753371477127075,-1.29866623878479,0.7378231287002563,2.214872121810913,0.896195650100708,-0.8327046632766724,1.7023881673812866,0.0,0.48657089471817017,-0.54929119348526,-0.026345083490014076,0.0,0.7342715859413147,0.8667398691177368,0.9870530962944031,1.163178205490112

In [None]:
import time

# Train a gradient-boosted-trees classifier with 30 boosting iterations on the
# unscaled LabeledPoints and report the wall-clock training time.
# The empty categoricalFeaturesInfo passes no categorical-feature information.
train_start = time.time()
GBTmodel = GradientBoostedTrees.trainClassifier(
    labelPoint_train,
    categoricalFeaturesInfo={},
    numIterations=30,
)
train_end = time.time()
print(f'Time elapsed training model: {train_end - train_start} seconds')

## Evaluate model

In [None]:
# Evaluate model on test instances and compute test error
predictions = GBTmodel.predict(test_dense.rdd.map(lambda x: x.features.values))
labelsAndPredictions = test_dense.rdd.map(lambda lp: lp.label).zip(predictions)


testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(test_dense.rdd.count())
print('Test Error = ' + str(testErr))
print('Learned classification GBT model:')
#print(model.toDebugString())

## Create Random Forest Model

In [None]:
# `time` is imported here as well so this cell does not rely on the GBT
# training cell having been run first.
import time

from pyspark.mllib.tree import RandomForest, RandomForestModel

# Train a 30-tree random forest (binary classification, Gini impurity, depth 4)
# on the unscaled LabeledPoints and report the wall-clock training time.
# The seed is fixed so the trained forest, and the test error computed from it,
# is reproducible from run to run.
train_start = time.time()
RFmodel = RandomForest.trainClassifier(labelPoint_train,
                                     numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=30, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32,
                                     seed=42)
train_end = time.time()
print(f'Time elapsed training model: {train_end - train_start} seconds')

In [None]:
# Score the held-out rows with the random forest and pair each true label with
# its prediction. Both RDDs derive from test_dense.rdd.
predictions = RFmodel.predict(test_dense.rdd.map(lambda rec: rec.features.values))
labelsAndPredictions = test_dense.rdd.map(lambda rec: rec.label).zip(predictions)

# Test error = misclassified rows / total test rows.
testErr = (
    labelsAndPredictions.filter(lambda pair: pair[0] != pair[1]).count()
    / float(test_dense.rdd.count())
)
print('Test Error = ' + str(testErr))
print('Learned classification RF model:')
# Uncomment to dump the learned trees:
#print(RFmodel.toDebugString())

In [None]:
# Shut down the Spark session; cells above cannot be re-run after this without
# creating the session again.
spark.stop()

## Results for further analysis

20k simulations: (train/test split 0.7/0.3)

    GBT error = 0.3072; GBT time = 40.44 secs

    RF  error = 0.3271; RF  time = 14.75 secs
  
200k simulations: (train/test split 0.7/0.3)

    GBT error = 0.3066; GBT time = 57.71 secs

    RF  error = 0.3366; RF  time = 13.00 secs
    
