In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

# Create Spark Session and read CSV file.

In [2]:
# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('Bank Data-Logistic Regression')
    .getOrCreate()
)

# Load the CSV, taking column names from the first row and letting Spark infer column types.
csv_path = './bankdata1.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)

In [3]:
# Remove the trailing '_c16' column (the null column at the end of the data).
trailing_col = '_c16'
df = df.drop(trailing_col)

# Use StringIndexer on categorical variable columns.

In [4]:
# String-valued columns (the target 'y' included) that get a numeric "<name>_index" twin.
categorical_features = [
    'job', 'marital', 'education', 'default', 'housing',
    'loan', 'contact', 'month', 'poutcome', 'y',
]

# Fit one StringIndexer per column against the raw dataframe.
indexers = []
for column in categorical_features:
    indexer = StringIndexer(inputCol=column, outputCol=column + "_index")
    indexers.append(indexer.fit(df))

# Chain the fitted indexers in a pipeline and apply them all to df.
pipeline = Pipeline(stages=indexers)
index_model = pipeline.fit(df)
df_r = index_model.transform(df)

In [5]:
# Drop original columns.
# Reuse the categorical_features list that drove the StringIndexers instead of
# repeating the column names here, so the two cannot drift apart.
df_r = df_r.drop(*categorical_features)

# Use VectorAssembler to combine all independent variable columns into a single feature vector column.

In [6]:
# Independent variables: columns used as-is from the CSV, followed by the
# StringIndexer outputs. The order here is the order inside the feature vector.
raw_cols = ['age', 'balance', 'day', 'campaign', 'pdays', 'previous']
indexed_cols = [
    'job_index', 'marital_index', 'education_index', 'default_index',
    'housing_index', 'loan_index', 'contact_index', 'month_index',
    'poutcome_index',
]
feat_cols = raw_cols + indexed_cols

# NOTE(review): the *_index columns enter the model as plain numbers.
# OneHotEncoderEstimator is imported at the top but never applied - confirm
# whether one-hot encoding was intended here.

# Assembler that merges the columns above into one vector column named "features".
vec_assembler = VectorAssembler(outputCol='features', inputCols=feat_cols)

# Dataframe with the "features" vector column appended.
dense_df = vec_assembler.transform(df_r)

In [7]:
# Create a final pre-processed dataframe consisting only of the feature vectors and the indexed label.
final_df = dense_df.select('features', 'y_index')

# Split the data 60/20/20 into training, testing, and validation sets.
# A fixed seed keeps the partition the same across re-runs, so the reported
# accuracy and ROC figures can be reproduced.
train, test, valid = final_df.randomSplit([.6, .2, .2], seed=42)

# Logistic Regression model.

In [8]:
# Configure a logistic regression with 'y_index' as the label and fit it on the
# training split; `lr` holds the fitted model from here on.
lr = (
    LogisticRegression(labelCol='y_index')
    .fit(train)
)

In [9]:
# Score the testing split with the fitted model.
test_preds = lr.transform(test)

# Accuracy: rows whose prediction equals the label, divided by all scored rows.
test_hits = test_preds.filter(test_preds.y_index == test_preds.prediction).count()
test_total = test_preds.count()
print("Test data prediction accuracy: {0:.3f}".format(test_hits / test_total))

Test data prediction accuracy: 0.890


In [10]:
# Score the validation split with the fitted model.
valid_preds = lr.transform(valid)

# Accuracy: rows whose prediction equals the label, divided by all scored rows.
valid_hits = valid_preds.filter(valid_preds.y_index == valid_preds.prediction).count()
valid_total = valid_preds.count()
print("Validation data prediction accuracy: {0:.3f}".format(valid_hits / valid_total))

Validation data prediction accuracy: 0.894


# Get metrics via BinaryClassificationMetrics

In [11]:
# Collect the probabilities and churn indicator into a list.
results = test_preds.select(['probability', 'y_index'])
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]

In [12]:
# Convert results_list into an RDD.
scoreAndLabels = spark.sparkContext.parallelize(results_list)

# Use BinaryClassificationMetrics on the results
metrics = metric(scoreAndLabels)

print("The test ROC score is: ", metrics.areaUnderROC)

The test ROC score is:  0.7375987905514824


In [13]:
# Get confusion matrix values.
# Count every (label, prediction) combination in one aggregation instead of
# running a separate filter/count job for each cell of the matrix.
pair_counts = {
    (row['y_index'], row['prediction']): row['count']
    for row in test_preds.groupBy('y_index', 'prediction').count().collect()
}

# A combination that never occurs is missing from the aggregation, so default to 0.
tp = pair_counts.get((1.0, 1.0), 0)
tn = pair_counts.get((0.0, 0.0), 0)
fp = pair_counts.get((0.0, 1.0), 0)
fn = pair_counts.get((1.0, 0.0), 0)
print(tp, tn, fp, fn)

159 7872 97 891
