In [1]:
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession

# Spark master URL. Override it with the SPARK_MASTER environment variable
# instead of editing the notebook; the default is the original standalone master.
SPARK_MASTER = os.environ.get("SPARK_MASTER", "spark://192.168.56.200:7077")
spark = SparkSession.builder.master(SPARK_MASTER).getOrCreate()

In [None]:
# Inspect the effective Spark configuration via the public getConf() API
# (the private `_conf` attribute is an implementation detail).
spark.sparkContext.getConf().getAll()

In [2]:
# Load the training CSV, using its header row for column names and letting
# Spark infer column types from the data.
raw_data = spark.read.csv('training.csv', header=True, inferSchema=True)
raw_data.columns

['EventId',
 'DER_mass_MMC',
 'DER_mass_transverse_met_lep',
 'DER_mass_vis',
 'DER_pt_h',
 'DER_deltaeta_jet_jet',
 'DER_mass_jet_jet',
 'DER_prodeta_jet_jet',
 'DER_deltar_tau_lep',
 'DER_pt_tot',
 'DER_sum_pt',
 'DER_pt_ratio_lep_tau',
 'DER_met_phi_centrality',
 'DER_lep_eta_centrality',
 'PRI_tau_pt',
 'PRI_tau_eta',
 'PRI_tau_phi',
 'PRI_lep_pt',
 'PRI_lep_eta',
 'PRI_lep_phi',
 'PRI_met',
 'PRI_met_phi',
 'PRI_met_sumet',
 'PRI_jet_num',
 'PRI_jet_leading_pt',
 'PRI_jet_leading_eta',
 'PRI_jet_leading_phi',
 'PRI_jet_subleading_pt',
 'PRI_jet_subleading_eta',
 'PRI_jet_subleading_phi',
 'PRI_jet_all_pt',
 'Weight',
 'Label']

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the string Label column as numeric Label_index. By default StringIndexer
# orders labels by descending frequency, so the most common label becomes 0.0.
indexer = StringIndexer(inputCol="Label", outputCol="Label_index").fit(raw_data)

# Use a distinct name: `train` is reserved for the training split created later.
indexed_data = indexer.transform(raw_data)

# Drop the original string label; Label_index is the target from here on.
train_clean = indexed_data.drop("Label")

In [None]:
import pandas as pd  # toPandas() needs pandas installed on the driver

# Preview the indexed, label-dropped data. `train_df` does not exist yet at this
# point on a fresh kernel (it is built by the VectorAssembler step), so preview
# train_clean instead.
train_clean.limit(5).toPandas()

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns that must NOT become model inputs:
#  - Label_index: the target itself
#  - EventId:     a row identifier, not a measurement
#  - Weight:      a per-event weight, not a physics feature; in the Higgs
#                 challenge data it is strongly tied to the label, so using it
#                 leaks the target (verify against the dataset documentation)
NON_FEATURE_COLS = {"Label_index", "EventId", "Weight"}
cols = [c for c in train_clean.columns if c not in NON_FEATURE_COLS]

# Pack the remaining numeric columns into a single vector column for Spark ML.
assembler = VectorAssembler(inputCols=cols, outputCol="features")
train_df = assembler.transform(train_clean)
train_df.select("features").show(5, truncate=False)

In [None]:
from pyspark.ml.feature import StandardScaler

# Split BEFORE fitting the scaler. Fitting on the full dataset lets test rows
# influence the scaling statistics, which is train/test leakage.
train_unscaled, test_unscaled = train_df.randomSplit([0.7, 0.3], seed=2018)

# Fit the scaling statistics on the training split only, then apply the same
# fitted scaler to both splits.
standardscaler = StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
scaler_model = standardscaler.fit(train_unscaled)
train = scaler_model.transform(train_unscaled)
test = scaler_model.transform(test_unscaled)
train.select("features", "Scaled_features").show(5)

print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

In [None]:
# Class balance of the training split. "Ones" are rows with Label_index == 1,
# the less frequent label under StringIndexer's default frequency ordering.
dataset_size = float(train.count())
numPositives = train.where('Label_index == 1').count()
numNegatives = dataset_size - numPositives
# Guard against an empty split so the percentage never divides by zero.
per_ones = (numPositives / dataset_size) * 100 if dataset_size else 0.0
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

In [None]:
from pyspark.ml.classification import LogisticRegression

# Unweighted logistic regression on the standardized feature vector.
lr = LogisticRegression(
    featuresCol="Scaled_features",
    labelCol="Label_index",
    maxIter=10,
)
model = lr.fit(train)

# Score both splits; the evaluator cell further down consumes these.
predict_train, predict_test = (model.transform(split) for split in (train, test))
predict_test.select("Label_index", "prediction").show(10)

In [None]:
# Fitted parameters of the logistic regression model.
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Coefficients sorted by value, to show how the weights are spread across features.
beta = np.sort(model.coefficients)
fig, ax = plt.subplots()
ax.plot(beta)
ax.set(title='Sorted Coefficients', xlabel='Rank', ylabel='Beta Coefficients')
plt.show()

# ROC curve on the training data, taken from the model's training summary.
# FPR goes on the x-axis and TPR on the y-axis.
trainingSummary = model.summary
roc = trainingSummary.roc.toPandas()
fig, ax = plt.subplots()
ax.plot(roc['FPR'], roc['TPR'])
ax.set(title='ROC Curve', xlabel='False Positive Rate', ylabel='True Positive Rate')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
# Precision-recall curve on the training data, from the model's training summary.
pr = trainingSummary.pr.toPandas()
fig, ax = plt.subplots()
ax.plot(pr['recall'], pr['precision'])
ax.set_ylabel('Precision')
ax.set_xlabel('Recall')
plt.show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under ROC (the evaluator's default metric) on both splits.
# A large train/test gap would suggest overfitting.
evaluator = BinaryClassificationEvaluator(
    labelCol="Label_index",
    rawPredictionCol="rawPrediction",
)

train_auc = evaluator.evaluate(predict_train)
test_auc = evaluator.evaluate(predict_test)
print("The area under ROC for train set is {}".format(train_auc))
print("The area under ROC for test set is {}".format(test_auc))

In [None]:
# test_data = spark.read.format('csv').option('header','true').option('inferSchema', 'true').load('test.csv')
# test_data.columns

In [None]:
# NOTE(review): fitting a new StringIndexer on the test data can assign different
# indices than the one fitted on raw_data if label frequencies differ; reuse the
# training `indexer` instead (and confirm test.csv has a Label column at all).
# tindexer = StringIndexer(inputCol="Label", outputCol="Label_index").fit(test_data)
# test = tindexer.transform(test_data)
# test.show()

In [None]:
# NOTE(review): tcols is every column of test_data, including EventId (and Label /
# Weight if present). The assembled vector must contain exactly the same columns,
# in the same order, as the training feature list, or the trained model cannot score it.
# tcols=test_data.columns
# # tcols.remove("Label")
# tassembler = VectorAssembler(inputCols=tcols,outputCol="features")
# # Now let us use the transform method to transform our dataset
# test_df=tassembler.transform(test_data)
# test_df.select("features").show(truncate=False)

In [None]:
# NOTE(review): fitting a new StandardScaler on the test set scales it with
# test-set statistics; transform test_df with the scaler fitted on the training data.
# standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
# test_df=standardscaler.fit(test_df).transform(test_df)
# test_df.select("features","Scaled_features").show(5)

In [None]:
# predictions = model.transform(test_df)

In [None]:
# from pyspark.ml.evaluation import BinaryClassificationEvaluator

# evaluator = BinaryClassificationEvaluator()
# print('Test Area Under ROC', evaluator.evaluate(predictions))

In [None]:
# cols=train.columns
# cols.remove("Label")
# train_clean = train.drop("Label")
# train_clean.limit(5).toPandas()