In [1]:
import findspark
# Point findspark at the local Spark installation; this has to happen before
# anything from pyspark is imported.
findspark.init("/home/wesmail/Panda/spark-2.2.1-bin-hadoop2.7")

from pyspark.sql import SparkSession

# Build the session (or reuse one that already exists) against a local master
# with two threads.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Linear Regrssion Model")
    .config("spark.executor.memory", "8gb")
    .getOrCreate()
)

# Handle on the SparkContext that backs the session.
sc = spark.sparkContext

# LOAD HERE
from root_numpy import root2array, tree2array, rec2array
import numpy as np
import pandas as pd

# Names of the tree branches to read. Their order here is the column order of
# the feature matrix built below. Entries are grouped by name prefix, one
# group per line; every segment except the last ends with the separating comma.
_branch_spec = (
    "momentumx, momentumy, momentumz, momentum, energy, position,"
    "MvdDEDX, MvdHits,"
    "SttMeanDEDX, SttHits,"
    "GemHits,"
    "TofStopTime, TofM2, TofTrackLength, TofQuality, TofBeta,"
    "DrcThetaC, DrcQuality,"
    "DiscThetaC, DiscQuality,"
    "EmcRawEnergy, EmcCalEnergy, EmcQuality, EmcNumberOfCrystals,"
    "EmcNumberOfBumps, EmcModule, EmcZ20, EmcZ53, EmcLat, EmcE1, EmcE9, EmcE25,"
    "MuoQuality, MuoIron, MuoMomentumIn, MuoNumberOfLayers, MuoModule, MuoHits,"
    "DegreesOfFreedom, ChiSquared"
)
# Split on commas and drop the surrounding whitespace of every entry.
branch_names = [field.strip() for field in _branch_spec.split(",")]

# Read tree "t1" from one ROOT file per particle sample, keeping only the
# requested branches, and pass each result through rec2array.
# The order of the files defines the class label of each sample (0..4).
electrons, pions, muons, kaons, anti_p = [
    rec2array(root2array(root_file, "t1", branch_names))
    for root_file in (
        "/home/wesmail/Downloads/treeElectrons.root",
        "/home/wesmail/Downloads/treePions.root",
        "/home/wesmail/Downloads/treeMuons.root",
        "/home/wesmail/Downloads/treeKaons.root",
        "/home/wesmail/Downloads/treeProtons.root",
    )
]

samples = (electrons, pions, muons, kaons, anti_p)

# Feature matrix: all samples stacked on top of each other, in label order.
X = np.concatenate(samples)

# Label vector: one float label per row, equal to the position of the row's
# sample in `samples` (electrons -> 0., pions -> 1., ..., anti_p -> 4.).
y = np.concatenate([
    label * np.ones(sample.shape[0])
    for label, sample in enumerate(samples)
])

# scale the features
from sklearn.preprocessing import StandardScaler

# NOTE(review): the scaler is fitted on the complete sample, i.e. before the
# train/test split performed later in the notebook, so the held-out rows
# contribute to the scaling statistics -- confirm this is acceptable.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Create DataFrame from X and y: the scaled features come first and the class
# label is appended as the last column, named 'label'.
labelled_columns = branch_names + ['label']
labelled_matrix = np.hstack((X_scaled, y[:, np.newaxis]))
df = pd.DataFrame(labelled_matrix, columns=labelled_columns)

# Disabled alternative, kept for reference: the same standardisation done by
# hand with pandas (subtract the per-feature mean, divide by the per-feature
# standard deviation; afterwards mean=0 and std=1).
#mean_on_data = (df.iloc[:,0:-1]).mean(axis=0)
#std_on_data = (df.iloc[:,0:-1]).std(axis=0)
#df_scaled = ((df.iloc[:,0:-1]) - mean_on_data) / std_on_data
#df_scaled = pd.DataFrame(array_scaled, columns=branch_names)
#df_scaled['label'] = df.loc[:,'label']

print(df.columns)
# Create Spark DataFrame (despite its name, `sparkRDD` holds the result of
# createDataFrame, not an RDD)
sparkRDD = spark.createDataFrame(df)

Welcome to ROOTaaS 6.06/02
Index([u'momentumx', u'momentumy', u'momentumz', u'momentum', u'energy',
       u'position', u'MvdDEDX', u'MvdHits', u'SttMeanDEDX', u'SttHits',
       u'GemHits', u'TofStopTime', u'TofM2', u'TofTrackLength', u'TofQuality',
       u'TofBeta', u'DrcThetaC', u'DrcQuality', u'DiscThetaC', u'DiscQuality',
       u'EmcRawEnergy', u'EmcCalEnergy', u'EmcQuality', u'EmcNumberOfCrystals',
       u'EmcNumberOfBumps', u'EmcModule', u'EmcZ20', u'EmcZ53', u'EmcLat',
       u'EmcE1', u'EmcE9', u'EmcE25', u'MuoQuality', u'MuoIron',
       u'MuoMomentumIn', u'MuoNumberOfLayers', u'MuoModule', u'MuoHits',
       u'DegreesOfFreedom', u'ChiSquared', u'label'],
      dtype='object')


In [2]:
# Assemble the branch columns into a single vector column called "features"
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=branch_names,
    outputCol="features",
)
# The transformed frame keeps the original columns and gains "features".
data = assembler.transform(sparkRDD)

In [8]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=0)

# Specify the layers of the neural network:
#   - input layer: one node per assembled feature column; derived from
#     branch_names so it cannot drift from the feature vector width
#   - two hidden layers of 100 nodes each
#   - output layer: one node per class (labels 0..4)
n_features = len(branch_names)
n_classes = 5
layers = [n_features, 100, 100, n_classes]

# Configure the multilayer perceptron classifier.
mlp = MultilayerPerceptronClassifier(
    labelCol='label',
    featuresCol='features',
    maxIter=500,
    layers=layers,
    blockSize=128,
    seed=1234,
)

# Fit model.
model = mlp.fit(trainingData)

# Make predictions on the held-out set and keep only what the evaluation needs.
predictions = model.transform(testData)
predictionAndLabels = predictions.select("prediction", "label")

# Accuracy on the test set; the column names are spelled out so the evaluator
# visibly matches the select() above.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.839176738064


In [9]:
# Take the RDD behind the (prediction, label) DataFrame; MulticlassMetrics
# below is constructed from an RDD rather than from a DataFrame.
mypred = predictionAndLabels.rdd
print(mypred)

# Evaluation metrics
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(mypred)

# Build the confusion matrix and convert it to a numpy array
confusion = metrics.confusionMatrix()
cm = confusion.toArray()

# Show the raw (un-normalised) counts
print(cm)

# Normalize and plot the confusion matrix (Rows = True values, Coloumns = Predicted Values)
# normalized over coloumns (axis=1)
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib tk
figcm, ax = plt.subplots()
cm = cm.astype('float') / cm.sum(axis=1)
sns.set(font_scale=2)
sns.heatmap(cm, square=True, annot=True, cbar=False)
classes=['e-','pi-', 'mu-', 'k-', 'p-']    # 0 1 2 3 4 
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=90)
ax.set_xticks(np.arange(len(classes))+0.5, minor=False)
plt.yticks(tick_marks, classes)
ax.set_yticks(np.arange(len(classes))+0.5, minor=False)
plt.xlabel('predication', horizontalalignment = 'center')
plt.ylabel('true value')

MapPartitionsRDD[742] at javaToPython at NativeMethodAccessorImpl.java:0
[[10054.   145.    87.    54.    57.]
 [  216.  7168.  1192.  1018.   335.]
 [  124.   712.  9439.   167.    76.]
 [  136.  1063.   343.  7275.   863.]
 [   82.   366.    83.   945.  8142.]]


<matplotlib.text.Text at 0x7f2f93b62f50>