In [15]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import col, when, isnan, trim, count
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import QuantileDiscretizer

In [3]:
spark = SparkSession \
    .builder \
    .appName("SparkTest") \
    .getOrCreate()

# import data
df = spark.read.csv("loan_1000.csv", inferSchema = True, header = True)

# fill na
df = df.na.fill(0)

In [4]:
# create feature vector
cols = df.columns
cols.remove('funded_ratio')
assembler = VectorAssembler(inputCols = cols, outputCol = 'features')

# transform df with the newly created vectorassembler
df = assembler.transform(df)
df.select('features').show(5)

+--------------------+
|            features|
+--------------------+
|[2500.0,2500.0,25...|
|[30000.0,30000.0,...|
|(96,[0,1,2,3,4,5,...|
|(96,[0,1,2,3,4,5,...|
|(96,[0,1,2,3,4,5,...|
+--------------------+
only showing top 5 rows



In [5]:
# bucket dependent variable
discretizer = QuantileDiscretizer(numBuckets = 3, inputCol = "funded_ratio", outputCol = "funded_ratio_cat")
df = discretizer.fit(df).transform(df)
df.groupby("funded_ratio_cat").count().toPandas()

Unnamed: 0,funded_ratio_cat,count
0,0.0,20
1,1.0,979


In [6]:
# split data into train and test
train, test = df.randomSplit([.8,.2],seed = 1234)

In [7]:
# set parameters for Logistic Regression
lgr = LogisticRegression(maxIter = 10, featuresCol = 'features', labelCol = 'funded_ratio_cat')

In [8]:
# fit the model to the data
lgrm = lgr.fit(train)

In [10]:
# given a dataset, predict each point's label, and show the results.
predict_train = lgrm.transform(train)
predict_test = lgrm.transform(test)

In [29]:
train_results = predict_train.select(['prediction', 'funded_ratio_cat'])
train_predictionAndLabels = train_results.rdd
train_metrics = MulticlassMetrics(train_predictionAndLabels)

train_cm = train_metrics.confusionMatrix().toArray()
trian_accuracy = (train_cm[0][0] + train_cm[1][1])/train_cm.sum()
train_precision = (train_cm[0][0])/(train_cm[0][0] + train_cm[1][0])
train_recall = (train_cm[0][0])/(train_cm[0][0]+train_cm[0][1])

print("train_confusion_matrix\n",train_cm)
print("train_accuracy",trian_accuracy)
print("train_precision",train_precision)
print("trian_recall",train_recall)

train_confusion_matrix
 [[ 13.   3.]
 [  2. 808.]]
train_accuracy 0.9939467312348669
train_precision 0.8666666666666667
trian_recall 0.8125


In [30]:
test_results = predict_test.select(['prediction', 'funded_ratio_cat'])
test_predictionAndLabels = test_results.rdd
test_metrics = MulticlassMetrics(test_predictionAndLabels)

test_cm = test_metrics.confusionMatrix().toArray()
test_accuracy = (test_cm[0][0] + test_cm[1][1])/test_cm.sum()
test_precision = (test_cm[0][0])/(test_cm[0][0] + test_cm[1][0])
test_recall = (test_cm[0][0])/(test_cm[0][0] + test_cm[0][1])

print("test_confusion_matrix\n",test_cm)
print("test_accuracy",test_accuracy)
print("test_precision",test_precision)
print("test_recall",test_recall)

test_confusion_matrix
 [[  0.   4.]
 [  0. 169.]]
test_accuracy 0.976878612716763
test_precision nan
test_recall 0.0


  import sys


In [None]:
# evaluation metrics
evaluator_train = MulticlassClassificationEvaluator(labelCol = 'funded_ratio_cat', predictionCol = 'predict_train')
evaluator_test = MulticlassClassificationEvaluator(labelCol = 'funded_ratio_cat', predictionCol = 'predict_test')

In [18]:
# print("Accuracy score for predicted train is {}"\
#       .format(evaluator_train.evaluate(predict_train, {evaluator_train.metricName: "accuracy"})))
# print("f1 score for predicted train is {}"\
#       .format(evaluator_train.evaluate(predict_train, {evaluator_train.metricName: "f1"})))

# print("\nAccuracy score for predicted test is {}"\
#       .format(evaluator.evaluate(predict_test, {evaluator.metricName: "accuracy"})))
# print("f1 score for predicted test is {}"\
#       .format(evaluator.evaluate(predict_test, {evaluator.metricName: "f1"})))

# print("\nThe area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
# print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))