In [1]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

# Load the raw training CSV from DBFS as an RDD of text lines.
initial_train_data = sc.textFile(
  "dbfs:/FileStore/tables/segpevp71489465231873/train_data.csv"
)
# Keep the first line of the file so it can be filtered out as the header row
# before the remaining lines are parsed.
header = initial_train_data.first()

def parse_data(data):
  """Convert one comma-separated text line into a LabeledPoint.

  Args:
    data: a single CSV row as a string. Column 0 is ignored (presumably a
      row id -- confirm against the dataset), column 1 is the label and
      every column from index 2 onward is a feature.

  Returns:
    LabeledPoint whose label is float(column 1) and whose features are the
    remaining columns converted to floats.

  Raises:
    IndexError: if the row has fewer than two columns.
    ValueError: if the label or a feature is not numeric.
  """
  # The original body split an undefined name (`line`) instead of the
  # `data` argument, which raised NameError on every call.
  data_split = data.split(",")
  label = float(data_split[1])
  features = [float(value) for value in data_split[2:]]
  return LabeledPoint(label, features)

# Drop the header row, split each remaining line on commas once, and build a
# LabeledPoint from it (column 1 -> label, columns 2.. -> features).
# The result is cached because every model cell below trains on it repeatedly.
train_data = (
  initial_train_data
  .filter(lambda row: row != header)
  .map(lambda row: row.split(','))
  .map(lambda cols: LabeledPoint(cols[1], cols[2:]))
  .cache()
)

In [2]:
# Load the raw test CSV from DBFS as an RDD of text lines.
initial_test_data = sc.textFile(
  "dbfs:/FileStore/tables/2e7b7q841489465266479/test_data.csv"
)
# Same parsing as the training set: drop the header row, then build a
# LabeledPoint per line (column 1 -> label, columns 2.. -> features).
# NOTE(review): `header` is the first line of the *training* file; this
# assumes the test CSV starts with an identical header line -- confirm.
test_data = (
  initial_test_data
  .filter(lambda row: row != header)
  .map(lambda row: row.split(','))
  .map(lambda cols: LabeledPoint(cols[1], cols[2:]))
  .cache()
)

In [3]:
#Perform logistic regression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# One (accuracy, recall, precision, f1) tuple per train/evaluate run.
lrm_output = []

# NOTE(review): L-BFGS on a fixed training set is presumably deterministic,
# so the 10 runs may all yield the same row -- confirm whether the repetition
# is intentional.
for _ in range(10):

  lrm = LogisticRegressionWithLBFGS.train(train_data)

  # Despite the name, each element is a (label, prediction) pair.
  predictionsAndLabels = test_data.map(
    lambda p: (p.label, lrm.predict(p.features))
  )

  # Build the confusion matrix in a single Spark job instead of running four
  # separate filter/count jobs over the same RDD.
  confusion = predictionsAndLabels.countByValue()
  true_pos = float(confusion.get((1, 1), 0))
  true_neg = float(confusion.get((0, 0), 0))
  false_pos = float(confusion.get((0, 1), 0))
  false_neg = float(confusion.get((1, 0), 0))

  total = true_pos + false_neg + false_pos + true_neg
  actual_pos = true_pos + false_neg
  predicted_pos = true_pos + false_pos

  # Each metric falls back to 0.0 when its denominator is zero rather than
  # raising ZeroDivisionError (e.g. the model never predicts the positive class).
  accuracy = (true_pos + true_neg) / total if total else 0.0
  # Recall is TP / (TP + FN). The previous formula, FP / (FP + TN), was the
  # false-positive rate and also corrupted the F1 score.
  recall = true_pos / actual_pos if actual_pos else 0.0
  precision = true_pos / predicted_pos if predicted_pos else 0.0
  f1_score = (
    2 * ((recall * precision) / (recall + precision))
    if (recall + precision) else 0.0
  )

  lrm_output.append((round(accuracy,5), round(recall,5), round(precision,5), round(f1_score,5)))

# Render the collected rows; display() is called directly because printing its
# return value only emitted a stray "None".
display(lrm_output)
  

In [4]:
#Perform random forest classifier
from pyspark.mllib.tree import RandomForest

# One (accuracy, recall, precision, f1) tuple per train/evaluate run.
rfc_output = []

for _ in range(10):
  # Binary classifier (numClasses=2). Feature indices 40-49 are declared
  # categorical with 2 categories each; all other features are continuous.
  rfc = RandomForest.trainClassifier(
    train_data, 2, categoricalFeaturesInfo={40:2, 41:2, 42:2, 43:2, 44:2, 45:2, 46:2, 47:2, 48:2, 49:2}
    , numTrees=100, featureSubsetStrategy="auto", impurity='gini', maxDepth=3, maxBins=200
  )

  # The model is applied to the whole feature RDD and the result zipped back
  # onto the labels, giving (label, prediction) pairs.
  predictions = rfc.predict(test_data.map(
      lambda d: d.features
    ))
  labelsAndPredictions = test_data.map(
    lambda lp: lp.label
  ).zip(predictions)

  # Count from labelsAndPredictions. The previous code counted from
  # `predictionsAndLabels`, a leftover from the logistic-regression cell, so
  # the reported metrics did not belong to the random forest at all.
  # A single countByValue job replaces four separate filter/count jobs.
  confusion = labelsAndPredictions.countByValue()
  true_pos = float(confusion.get((1, 1), 0))
  true_neg = float(confusion.get((0, 0), 0))
  false_pos = float(confusion.get((0, 1), 0))
  false_neg = float(confusion.get((1, 0), 0))

  total = true_pos + false_neg + false_pos + true_neg
  actual_pos = true_pos + false_neg
  predicted_pos = true_pos + false_pos

  # Each metric falls back to 0.0 when its denominator is zero rather than
  # raising ZeroDivisionError (e.g. the model never predicts the positive class).
  accuracy = (true_pos + true_neg) / total if total else 0.0
  # Recall is TP / (TP + FN). The previous formula, FP / (FP + TN), was the
  # false-positive rate and also corrupted the F1 score.
  recall = true_pos / actual_pos if actual_pos else 0.0
  precision = true_pos / predicted_pos if predicted_pos else 0.0
  f1_score = (
    2 * ((recall * precision) / (recall + precision))
    if (recall + precision) else 0.0
  )

  rfc_output.append((round(accuracy,5), round(recall,5), round(precision,5), round(f1_score,5)))

# Render the collected rows; display() is called directly because printing its
# return value only emitted a stray "None".
display(rfc_output)

In [5]:
# Gradient boosted classifier
from pyspark.mllib.tree import GradientBoostedTrees

# One (accuracy, recall, precision, f1) tuple per train/evaluate run.
gbt_output = []

for _ in range(10):
  # Feature indices 40-49 are declared categorical with 2 categories each;
  # all other features are continuous. Boosting runs for 5 iterations.
  gbt = GradientBoostedTrees.trainClassifier(
    train_data, categoricalFeaturesInfo={40:2, 41:2, 42:2, 43:2, 44:2, 45:2, 46:2, 47:2, 48:2, 49:2}
    , numIterations=5
  )

  # The model is applied to the whole feature RDD and the result zipped back
  # onto the labels, giving (label, prediction) pairs.
  predictions = gbt.predict(test_data.map(lambda d: d.features))
  labelsAndPredictions = test_data.map(
    lambda lp: lp.label
  ).zip(predictions)

  # Count from labelsAndPredictions. The previous code counted from
  # `predictionsAndLabels`, a leftover from the logistic-regression cell, so
  # the reported metrics did not belong to the boosted trees at all.
  # A single countByValue job replaces four separate filter/count jobs.
  confusion = labelsAndPredictions.countByValue()
  true_pos = float(confusion.get((1, 1), 0))
  true_neg = float(confusion.get((0, 0), 0))
  false_pos = float(confusion.get((0, 1), 0))
  false_neg = float(confusion.get((1, 0), 0))

  total = true_pos + false_neg + false_pos + true_neg
  actual_pos = true_pos + false_neg
  predicted_pos = true_pos + false_pos

  # Each metric falls back to 0.0 when its denominator is zero rather than
  # raising ZeroDivisionError (e.g. the model never predicts the positive class).
  accuracy = (true_pos + true_neg) / total if total else 0.0
  # Recall is TP / (TP + FN). The previous formula, FP / (FP + TN), was the
  # false-positive rate and also corrupted the F1 score.
  recall = true_pos / actual_pos if actual_pos else 0.0
  precision = true_pos / predicted_pos if predicted_pos else 0.0
  f1_score = (
    2 * ((recall * precision) / (recall + precision))
    if (recall + precision) else 0.0
  )

  gbt_output.append((round(accuracy,5), round(recall,5), round(precision,5), round(f1_score,5)))

# Render the collected rows; display() is called directly because printing its
# return value only emitted a stray "None".
display(gbt_output)