In [1]:
!wget http://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz
!gunzip HIGGS.csv.gz
!ls

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200531223749-0000
KERNEL_ID = 42baa6e1-7c5d-42ce-a0d8-ec2c528f5718
--2020-05-31 22:37:52--  http://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2816407858 (2.6G) [application/x-httpd-php]
Saving to: 'HIGGS.csv.gz'


2020-05-31 22:39:15 (32.7 MB/s) - 'HIGGS.csv.gz' saved [2816407858/2816407858]

HIGGS.csv  conda  logs	spark-events  user-libs


In [14]:
import pyspark
import os

## Data

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('higgs-analysis').getOrCreate()

In [4]:
df = spark.read.load("HIGGS.csv",format="csv", sep=",", inferSchema="true", header="false")

In [5]:
df= df.sampleBy("_c0", fractions={1:0.3, 0: 0.3})

In [6]:
df.printSchema()

root
 |-- _c0: double (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)
 |-- _c17: double (nullable = true)
 |-- _c18: double (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: double (nullable = true)
 |-- _c23: double (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: double (nullable = true)
 |-- _c26: double (nullable = true)
 |-- _c27: double (nullable = tru

## Train/Test split

In [7]:
(training, test) = df.randomSplit([0.7, 0.3])
training.count(), test.count()

(2312078, 990375)

## Feature Scaling



In [8]:
from pyspark.sql.functions import *
from pyspark.ml.linalg import DenseVector

In [9]:
training_dense = training.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
training_dense = spark.createDataFrame(training_dense, ["label", "features"])

In [10]:
test_dense = test.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
test_dense = spark.createDataFrame(test_dense, ["label", "features"])

In [11]:
from pyspark.ml.feature import StandardScaler
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", withMean=True)

In [12]:
scaler = standardScaler.fit(training_dense)
scaled_training = scaler.transform(training_dense)
scaled_training.head(2)

[Row(label=0.0, features=DenseVector([0.2749, -2.0142, -0.428, 1.4418, -1.4561, 0.9728, -0.5724, -0.5166, 2.1731, 1.4753, -1.2987, 0.7378, 2.2149, 0.8962, -0.8327, 1.7024, 0.0, 0.4866, -0.5493, -0.0263, 0.0, 0.7343, 0.8667, 0.9871, 1.1632, 1.7356, 1.0476, 0.8319]), features_scaled=DenseVector([-1.2671, -1.9975, -0.4258, 0.7398, -1.4469, -0.0376, -0.5669, -0.5136, 1.1412, 0.9661, -1.2854, 0.7324, 1.1581, -0.1973, -0.8251, 1.6919, -0.8374, -0.9875, -0.5455, -0.0261, -0.7149, -0.4435, -0.4146, -0.3862, 0.3857, 1.453, 0.0395, -0.4083])),
 Row(label=0.0, features=DenseVector([0.2751, -2.1058, -0.4463, 1.9427, 1.0554, 1.0124, -0.203, 1.5962, 0.0, 0.7557, -0.1243, -0.448, 2.2149, 0.4728, 0.0775, -1.4902, 2.5482, 0.4075, 0.4909, 1.2967, 0.0, 0.4785, 0.9793, 0.989, 1.0262, 0.5172, 1.0048, 1.0122]), features_scaled=DenseVector([-1.2667, -2.0884, -0.444, 1.5754, 1.0485, 0.0458, -0.2011, 1.5869, -0.9727, -0.4745, -0.1224, -0.4456, 1.1581, -1.0659, 0.0771, -1.4809, 1.2972, -1.144, 0.4864, 1.2884, -

In [13]:
scaled_test = scaler.transform(test_dense)
scaled_test.head(2)

[Row(label=0.0, features=DenseVector([0.2749, -1.6139, -1.6815, 1.1242, -0.4629, 1.1079, -0.3288, 1.1643, 2.1731, 0.6779, 0.2973, -1.187, 0.0, 0.5585, -0.556, 0.6656, 0.0, 1.1097, 0.9015, -0.8915, 3.102, 1.0404, 1.0145, 0.9898, 0.7681, 1.4423, 0.9111, 0.7806]), features_scaled=DenseVector([-1.2671, -1.6004, -1.6715, 0.2101, -0.4601, 0.2472, -0.3256, 1.1575, 1.1412, -0.6303, 0.2951, -1.1798, -0.9527, -0.8901, -0.5509, 0.6615, -0.8374, 0.2451, 0.8937, -0.8857, 1.4998, 0.0088, -0.0276, -0.3694, -0.6084, 0.8943, -0.3339, -0.5719])),
 Row(label=0.0, features=DenseVector([0.2751, 0.4947, -0.0935, 1.2946, -1.0987, 0.8165, -0.6575, -0.1308, 0.0, 1.2099, 0.8112, 1.114, 2.2149, 0.582, 1.7105, -0.981, 0.0, 1.6277, 0.5534, 1.6046, 3.102, 0.8882, 1.2242, 0.984, 1.3738, 0.5526, 0.9844, 1.3208]), features_scaled=DenseVector([-1.2667, 0.4916, -0.0933, 0.4944, -1.0918, -0.367, -0.6513, -0.13, -0.9727, 0.4348, 0.804, 1.1061, 1.1581, -0.8418, 1.6958, -0.9748, -0.8374, 1.2697, 0.5484, 1.5944, 1.4998, -0.2

# Create MultilayerPerceptronClassifier

In [15]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [16]:
import time
train_start = time.time()

layers = [28, 15, 16, 2]
mlpc = MultilayerPerceptronClassifier(layers = layers, blockSize = 32, seed = 4, solver='gd')
model = mlpc.fit(scaled_training)

train_end = time.time()
print(f'Time elapsed training model: {train_end - train_start} seconds')

Time elapsed training model: 1518.0823214054108 seconds


In [17]:
result = model.transform(scaled_test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.521960873406538
