# Data Preprocessing

Joeri R. Hermans                    
*Department of Data Science & Knowledge Engineering*          
*Maastricht University, The Netherlands*  

After the data exploration, we now have a rough idea of which features to use and how to normalize them. Before we actually train the models, we need to apply that normalization to the data. That is the purpose of this notebook.

In [1]:
import numpy as np

import os

from pyspark import SparkContext
from pyspark import SparkConf
import pyspark

from distkeras.transformers import *
from distkeras.utils import *

from pyspark.ml.feature import StringIndexer

from pyspark.sql.functions import mean
from pyspark.sql.functions import stddev_pop
from pyspark.sql.functions import min
from pyspark.sql.functions import max

# Ask PySpark to pull in the DataBricks CSV reader package; it has some nice
# functionality regarding invalid values.
os.environ.update({'PYSPARK_SUBMIT_ARGS': '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'})

Using TensorFlow backend.


In [2]:
# Modify these variables according to your needs.
application_name = "CMS Event Preprocessing for ML jobs"
using_spark_2 = False
local = False
path_data = "data/events.csv"
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_processes = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 20
    num_processes = 1

# This variable is derived from the number of cores and executors,
# and will be used to assign the number of model trainers.
num_workers = num_executors * num_processes

# str() is used for the conversion so this cell is valid on both Python 2 and
# Python 3 (the backtick repr syntax only exists in Python 2).
print("Number of desired executors: " + str(num_executors))
print("Number of desired processes / executor: " + str(num_processes))
print("Total number of workers: " + str(num_workers))

Number of desired executors: 20
Number of desired processes / executor: 1
Total number of workers: 20


In [3]:
# Do not change anything here.
# Build the Spark configuration from the variables set in the configuration cell.
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
# The numeric settings are passed as strings; str() keeps this cell valid on
# both Python 2 and Python 3.
conf.set("spark.executor.cores", str(num_processes))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.executor.memory", "4g")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Check if the user is running Spark 2.0 +
if using_spark_2:
    # SparkSession is only available from Spark 2.0 onwards, so it is imported
    # in this branch only; without this import the branch raises a NameError.
    from pyspark.sql import SparkSession
    # Note: in this mode `sc` holds a SparkSession, not a SparkContext.
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)

In [4]:
# The DataFrame reader hangs off the SparkSession (stored in `sc`) on Spark 2.0+,
# and off the SQLContext on older versions.
reader = sc if using_spark_2 else sqlContext

# Load the CSV file through the DataBricks reader, using the header row for
# the column names and letting the reader infer the column types.
dataset = (
    reader.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load(path_data)
)

# Spread the data over as many partitions as there are workers.
dataset = dataset.repartition(num_workers)
dataset.cache()

# Report how many events were read.
event_count = dataset.count()
print("Total number of events: " + str(event_count))

Total number of events: 1747413


In [5]:
# Start from every column of the dataset and prune the ones we do not need.
features = dataset.columns
# Drop the first column (NOTE(review): presumably a row index — confirm against the CSV).
features.pop(0)
features.remove('TrackId')

# Drop the x/y/z hit coordinates of the 5 pixel detector hits (pix_*)
# and the 50 silicon detector hits (sis_*).
for prefix, num_hits in (("pix_", 5), ("sis_", 50)):
    for i in range(0, num_hits):
        for axis in ("_x", "_y", "_z"):
            features.remove(prefix + str(i) + axis)

# Keep only the remaining columns in the dataset.
dataset = dataset.select(features)
dataset.cache()

# These columns stay in the dataset, but are not used as model features.
for column in ('run', 'evt', 'lumi', 'label', 'ndof', 'chi2', 'normalizedChi2'):
    features.remove(column)

# Show the new schema.
dataset.printSchema()

root
 |-- run: integer (nullable = true)
 |-- evt: integer (nullable = true)
 |-- lumi: integer (nullable = true)
 |-- charge: integer (nullable = true)
 |-- chi2: double (nullable = true)
 |-- ndof: double (nullable = true)
 |-- normalizedChi2: double (nullable = true)
 |-- qoverp: double (nullable = true)
 |-- theta: double (nullable = true)
 |-- lambda: double (nullable = true)
 |-- dxy: double (nullable = true)
 |-- d0: double (nullable = true)
 |-- dsz: double (nullable = true)
 |-- dz: double (nullable = true)
 |-- p: double (nullable = true)
 |-- pt: double (nullable = true)
 |-- px: double (nullable = true)
 |-- py: double (nullable = true)
 |-- pz: double (nullable = true)
 |-- eta: double (nullable = true)
 |-- phi: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
 |-- label: string (nullable = true)



In [6]:
# Collect the distinct values of the label column on the driver
# and unwrap them from their Row objects into plain strings.
# NOTE(review): the order of the collected labels is presumably not guaranteed
# between runs — confirm before relying on a label's position in this list.
label_rows = dataset.select("label").distinct().collect()
labels = [row.asDict()['label'] for row in label_rows]
num_labels = len(labels)

labels

[u'Wjet',
 u'SMS-T1tttt_mGl',
 u'DisplacedSUSY_stopToBottom',
 u'RSGravitonToGaGa',
 u'PhiToMuMu',
 u'H125GGgluonfusion']

In [7]:
# List, one per line, the features that the models will use.
for feature_name in features:
    print("- " + feature_name)

- charge
- qoverp
- theta
- lambda
- dxy
- d0
- dsz
- dz
- p
- pt
- px
- py
- pz
- eta
- phi
- vx
- vy
- vz


In [8]:
# Collect the min-max value for every feature.
# All aggregates are computed in a single Spark job instead of one job per
# feature, and every aggregate gets an explicit alias so the lookup below does
# not depend on Spark's generated column names.
aggregates = []
for f in features:
    aggregates.append(pyspark.sql.functions.min(f).alias(f + "_min"))
    aggregates.append(pyspark.sql.functions.max(f).alias(f + "_max"))
feature_ranges = dataset.agg(*aggregates).collect()[0].asDict()

# The results are stored under their own names: assigning them to `min` / `max`
# would overwrite the functions imported from pyspark.sql.functions.
for f in features:
    min_value = feature_ranges[f + "_min"]
    max_value = feature_ranges[f + "_max"]
    print(f + ": " + str(min_value) + " - " + str(max_value))

charge: -1 - 1
qoverp: -6.372203513 - 6.29370927046
theta: 0.063404051408 - 3.09221749952
lambda: -1.52142117272 - 1.50739227539
dxy: -122.304409123 - 138.917929453
d0: -138.917929453 - 122.304409123
dsz: -189.87788268 - 191.659099905
dz: -543.282959864 - 1289.63191789
p: 0.156931585433 - 418632.894996
pt: 0.0601777821704 - 412066.929499
px: -258788.734375 - 304492.25
py: -54888.5 - 320667.34375
pz: -214375.046875 - 70553.8359375
eta: -3.70125192813 - 3.45103961308
phi: -3.14158964157 - 3.14159178734
vx: -120.217697144 - 121.903030396
vy: -113.07585907 - 116.766670227
vz: -543.150695801 - 1292.33703613


In [9]:
# Now, let's normalize all the features.
# All normalized features will have a '_norm' suffix.
# However, it is possible that certain events contain outliers, but we will ignore these,
# and check how it goes.
# For the normalization we use the metrics obtained in the Data Exploration notebook.

# Keep the feature columns plus the bookkeeping columns and the label.
columns = list(features)
columns.append('run')
columns.append('evt')
columns.append('lumi')
columns.append('label')
dataset = dataset.select(columns)

# One entry per normalized feature:
#   (feature, o_min, o_max, n_min, n_max)
# where [o_min, o_max] is the range assumed for the original values and
# [n_min, n_max] is the range they are rescaled to.
# 'charge' is deliberately absent: it is used without normalization.
normalization_ranges = [
    ("qoverp", -7.0, 7.0, -1.0, 1.0),
    ("theta", -3.5, 3.5, -1.0, 1.0),
    ("lambda", -3.5, 3.5, -1.0, 1.0),
    ("dxy", -35, 35, -1.0, 1.0),
    ("d0", -35, 35, -1.0, 1.0),
    ("dsz", -60, 60, -1.0, 1.0),
    ("dz", -120, 120, -1.0, 1.0),
    ("p", 0, 200, 0, 1.0),
    ("pt", 0, 200, 0, 1.0),
    ("px", -5000, 5000, -5.0, 5.0),
    ("py", -3000.0, 3000.0, -3.0, 3.0),
    ("pz", -2500.0, 2500.0, -2.5, 2.5),
    ("eta", -4.0, 4.0, -1, 1),
    ("phi", -4.0, 4.0, -1, 1),
    ("vx", -35.0, 35.0, -1, 1),
    ("vy", -14.0, 14.0, -1, 1),
    ("vz", -120.0, 120.0, -1, 1),
]

# Apply the transformers in the order listed above, so the '<feature>_norm'
# columns are appended to the dataset in that same order.
for name, o_min, o_max, n_min, n_max in normalization_ranges:
    t = MinMaxTransformer(o_min=o_min, o_max=o_max, n_min=n_min, n_max=n_max,
                          input_col=name, output_col=name + "_norm", is_vector=False)
    dataset = t.transform(dataset)

dataset.printSchema()

root
 |-- charge: long (nullable = true)
 |-- qoverp: double (nullable = true)
 |-- theta: double (nullable = true)
 |-- lambda: double (nullable = true)
 |-- dxy: double (nullable = true)
 |-- d0: double (nullable = true)
 |-- dsz: double (nullable = true)
 |-- dz: double (nullable = true)
 |-- p: double (nullable = true)
 |-- pt: double (nullable = true)
 |-- px: double (nullable = true)
 |-- py: double (nullable = true)
 |-- pz: double (nullable = true)
 |-- eta: double (nullable = true)
 |-- phi: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
 |-- run: long (nullable = true)
 |-- evt: long (nullable = true)
 |-- lumi: long (nullable = true)
 |-- label: string (nullable = true)
 |-- qoverp_norm: double (nullable = true)
 |-- theta_norm: double (nullable = true)
 |-- lambda_norm: double (nullable = true)
 |-- dxy_norm: double (nullable = true)
 |-- d0_norm: double (nullable = true)
 |-- dsz_norm: double (

In [10]:
from pyspark.ml.feature import VectorAssembler

# To have a baseline to compare against, pack the original (non-normalized)
# feature columns into a single vector column named "features_raw".
vector_assembler = VectorAssembler(outputCol="features_raw", inputCols=features)
dataset = vector_assembler.transform(dataset)

In [11]:
# Build the list of normalized column names: every feature gets the "_norm"
# suffix, except charge, which has no normalized column and is used as-is
# (it ends up as the last entry of the list).
features_normalized = [name + "_norm" for name in features]
features_normalized.remove('charge_norm')
features_normalized.append('charge')
# Pack those columns into a single vector column named "features_normalized_raw".
vector_assembler = VectorAssembler(outputCol="features_normalized_raw", inputCols=features_normalized)
dataset = vector_assembler.transform(dataset)

In [12]:
# Check if all columns are present.
dataset.printSchema()
# Write to a Parquet file so we don't have to execute the above when we run this notebook again.
# mode="overwrite" replaces a Parquet file left behind by a previous run, so the
# cell does not depend on the `hdfs` command-line tool being available and also
# works when the path is not on HDFS.
dataset.write.parquet("events.parquet", mode="overwrite")

root
 |-- charge: long (nullable = true)
 |-- qoverp: double (nullable = true)
 |-- theta: double (nullable = true)
 |-- lambda: double (nullable = true)
 |-- dxy: double (nullable = true)
 |-- d0: double (nullable = true)
 |-- dsz: double (nullable = true)
 |-- dz: double (nullable = true)
 |-- p: double (nullable = true)
 |-- pt: double (nullable = true)
 |-- px: double (nullable = true)
 |-- py: double (nullable = true)
 |-- pz: double (nullable = true)
 |-- eta: double (nullable = true)
 |-- phi: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
 |-- run: long (nullable = true)
 |-- evt: long (nullable = true)
 |-- lumi: long (nullable = true)
 |-- label: string (nullable = true)
 |-- qoverp_norm: double (nullable = true)
 |-- theta_norm: double (nullable = true)
 |-- lambda_norm: double (nullable = true)
 |-- dxy_norm: double (nullable = true)
 |-- d0_norm: double (nullable = true)
 |-- dsz_norm: double (

In [13]:
# Read the parquet file.
# `reader` (set in the cell that loads the CSV file) is the SparkSession on
# Spark 2.0+ and the SQLContext otherwise; `sqlContext` itself is only defined
# when using_spark_2 is False.
dataset = reader.read.parquet("events.parquet")
# Repartition the dataset.
dataset = dataset.repartition(num_workers)
dataset.cache()
# Print the schema.
dataset.printSchema()

root
 |-- charge: long (nullable = true)
 |-- qoverp: double (nullable = true)
 |-- theta: double (nullable = true)
 |-- lambda: double (nullable = true)
 |-- dxy: double (nullable = true)
 |-- d0: double (nullable = true)
 |-- dsz: double (nullable = true)
 |-- dz: double (nullable = true)
 |-- p: double (nullable = true)
 |-- pt: double (nullable = true)
 |-- px: double (nullable = true)
 |-- py: double (nullable = true)
 |-- pz: double (nullable = true)
 |-- eta: double (nullable = true)
 |-- phi: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
 |-- run: long (nullable = true)
 |-- evt: long (nullable = true)
 |-- lumi: long (nullable = true)
 |-- label: string (nullable = true)
 |-- qoverp_norm: double (nullable = true)
 |-- theta_norm: double (nullable = true)
 |-- lambda_norm: double (nullable = true)
 |-- dxy_norm: double (nullable = true)
 |-- d0_norm: double (nullable = true)
 |-- dsz_norm: double (

In [14]:
# Binary classification experiments: for every known label, add a column
# "label_binary_<i>", where <i> is the position of that label in `labels`.
# NOTE(review): BinaryLabelTransformer presumably flags whether a row's label
# equals the given label — confirm against dist-keras.
input_column = "label"
for i, label in enumerate(labels):
    output_column = "label_binary_" + str(i)
    t = BinaryLabelTransformer(input_column, output_column, label)
    dataset = t.transform(dataset)

In [15]:
# Multiclass classification needs the labels in one-hot form. This takes two
# steps: map every label string to an index, then one-hot encode that index.

# Step 1: label string -> "label_index".
string_indexer = StringIndexer(inputCol="label", outputCol="label_index")
indexer_model = string_indexer.fit(dataset)
dataset = indexer_model.transform(dataset)

# Step 2: "label_index" -> "label_multiclass", with one dimension per label.
t = OneHotTransformer(input_col="label_index", output_col="label_multiclass", output_dim=num_labels)
dataset = t.transform(dataset)

# Check if all required columns have been added.
dataset.printSchema()

root
 |-- charge: long (nullable = true)
 |-- qoverp: double (nullable = true)
 |-- theta: double (nullable = true)
 |-- lambda: double (nullable = true)
 |-- dxy: double (nullable = true)
 |-- d0: double (nullable = true)
 |-- dsz: double (nullable = true)
 |-- dz: double (nullable = true)
 |-- p: double (nullable = true)
 |-- pt: double (nullable = true)
 |-- px: double (nullable = true)
 |-- py: double (nullable = true)
 |-- pz: double (nullable = true)
 |-- eta: double (nullable = true)
 |-- phi: double (nullable = true)
 |-- vx: double (nullable = true)
 |-- vy: double (nullable = true)
 |-- vz: double (nullable = true)
 |-- run: long (nullable = true)
 |-- evt: long (nullable = true)
 |-- lumi: long (nullable = true)
 |-- label: string (nullable = true)
 |-- qoverp_norm: double (nullable = true)
 |-- theta_norm: double (nullable = true)
 |-- lambda_norm: double (nullable = true)
 |-- dxy_norm: double (nullable = true)
 |-- d0_norm: double (nullable = true)
 |-- dsz_norm: double (

In [16]:
# Write the preprocessed dataframe for later use.
# mode="overwrite" replaces the output of a previous run, so the cell does not
# depend on the `hdfs` command-line tool being available and also works when
# the path is not on HDFS.
dataset.write.parquet("events_processed.parquet", mode="overwrite")

Deleted events_processed.parquet
