Continued from **Part 1 Exploratory Data Analysis**

In this part, we'll undersample the majority class to balance the two classes. Then we'll evaluate a simple logistic regression model in Spark ML on the balanced dataset, and use **CrossValidator** to select the best value of the regularization coefficient of the logistic regression.

In [1]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import Normalizer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv("creditcard.csv", header=True)

Cast the feature columns to float and the "Class" column to int:

In [2]:
colnames = [col.name for col in df.schema.fields]
for col in colnames[:-1]:
    df = df.withColumn(col, df[col].cast("float"))
    
df = df.withColumn("Class", df["Class"].cast("int"))    

Number of rows:

In [3]:
df.count()

284807

Number of columns:

In [4]:
len(df.schema)

31

Undersample the majority (negative) class to balance the classes:

In [5]:
pos = df.filter(df["Class"] == 1)
neg = df.filter(df["Class"] == 0)
ratio = pos.count() / float(neg.count())
neg = neg.sample(False, ratio, seed=201)

In [6]:
print("%d %d" % (pos.count(), neg.count()))

492 492
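
Note that **sample** takes a fraction rather than a row count: each negative row is kept independently with probability ratio, so the sampled size is only expected to be close to the number of positives and depends on the seed. The pure-Python sketch below illustrates this without Spark. It is an illustration only: bernoulli_sample_size is a hypothetical helper, and the negative count is derived from the 284807 rows and 492 positives reported above.

```python
import random

def bernoulli_sample_size(n_rows, fraction, seed):
    """Count rows kept when each row is retained independently with probability `fraction`."""
    rng = random.Random(seed)
    return sum(1 for _ in range(n_rows) if rng.random() < fraction)

n_pos = 492                 # positives, from the count above
n_neg = 284807 - n_pos      # negatives, derived from the total row count
ratio = n_pos / float(n_neg)

# Repeat the sampling with a few different seeds (arbitrary choices).
sizes = [bernoulli_sample_size(n_neg, ratio, seed) for seed in range(5)]
print(sizes)  # values scatter around 492 and vary with the seed
```

If an exact count matters, one option is to oversample slightly and then truncate, at the cost of an extra pass over the data.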


Combine the positive and negative classes into a single DataFrame:

In [7]:
data = pos.union(neg)

Rename the "Class" column to "label" and cache the result:

In [8]:
data = data.withColumnRenamed("Class", "label")
data.persist()

DataFrame[Time: float, V1: float, V2: float, V3: float, V4: float, V5: float, V6: float, V7: float, V8: float, V9: float, V10: float, V11: float, V12: float, V13: float, V14: float, V15: float, V16: float, V17: float, V18: float, V19: float, V20: float, V21: float, V22: float, V23: float, V24: float, V25: float, V26: float, V27: float, V28: float, Amount: float, label: int]

Split the data into training (80%) and test (20%) sets:

In [9]:
train, test = data.randomSplit([0.8, 0.2], seed=123)

1. Create a pipeline that first assembles the individual feature columns into a single vector column.
2. Standardize the features to zero mean and unit standard deviation.
3. Set the hyperparameter grid using **ParamGridBuilder**, and pick the optimal value using **CrossValidator**.
4. Finally, train the logistic regression model with the best hyperparameter.
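
Conceptually, steps 3 and 4 amount to the loop sketched below. This is a simplified pure-Python illustration, not Spark's implementation: k_fold_indices is a hypothetical helper that deals a shuffled index list into folds round-robin, whereas Spark assigns rows to folds randomly, and the row count of 100 is arbitrary. With the three-value grid and 10 folds used in the next cell, the pipeline is fitted 3 × 10 = 30 times during the search, plus once more on the full training set with the winning value.

```python
import random

def k_fold_indices(n_rows, num_folds, seed=0):
    """Yield (train_idx, valid_idx) pairs; each row is used for validation exactly once."""
    idx = list(range(n_rows))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::num_folds] for i in range(num_folds)]
    for i in range(num_folds):
        valid = folds[i]
        train = [j for k, fold in enumerate(folds) if k != i for j in fold]
        yield train, valid

reg_params = [1.0, 0.1, 0.01]   # same grid as in the next cell
num_folds = 10

fits = 0
for reg_param in reg_params:
    for train_idx, valid_idx in k_fold_indices(n_rows=100, num_folds=num_folds):
        # A real search would fit the pipeline on train_idx and score it on valid_idx here.
        fits += 1

print(fits)  # 30 fits during the search
```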

In [10]:
assembler = VectorAssembler(inputCols=colnames[:-1],
                            outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
lr = LogisticRegression(maxIter=10, featuresCol="scaledFeatures",
                        regParam=0.3, elasticNetParam=0.8)

pipeline = Pipeline(stages=[assembler, scaler, lr])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [1.0, 0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=10)  # 10-fold cross-validation

cvModel = crossval.fit(train)
testPred = cvModel.transform(test)
testPred.select(["scaledFeatures", "rawPrediction", "probability", "prediction"]).show()

+--------------------+--------------------+--------------------+----------+
|      scaledFeatures|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[-1.8098657196564...|[0.42361902702795...|[0.60434892609849...|       0.0|
|[-1.7262448521327...|[-1.7793239514363...|[0.14438663246663...|       1.0|
|[-1.6733478422005...|[-6.2695318793994...|[0.00188953747081...|       1.0|
|[-1.6615067920374...|[-8.7233346596918...|[1.62717109406233...|       1.0|
|[-1.6589709211175...|[-6.2742962258405...|[0.00188057336040...|       1.0|
|[-1.6433994162127...|[-11.273167198346...|[1.27092573138424...|       1.0|
|[-1.6297979267333...|[-4.5938554014997...|[0.01001252580054...|       1.0|
|[-1.5864785449860...|[-14.554455919204...|[4.77617135662524...|       1.0|
|[-1.5759159090883...|[-8.6646395354148...|[1.72551968142148...|       1.0|
|[-1.5600300400199...|[-4.4867948667480...|[0.01113136313631...|       1.0|
|[-1.4525803
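
For binary logistic regression, the **rawPrediction** and **probability** columns are linked by the logistic function: each probability is the sigmoid of the corresponding raw score, and with the default threshold of 0.5 the model predicts 1.0 when the probability of class 1 exceeds 0.5. The short check below is a sketch using the first entries of the first two rows shown above; the displayed values are truncated, so agreement is only approximate. The sigmoid helper is defined here and is not a Spark function.

```python
import math

def sigmoid(z):
    """Logistic function, mapping a raw score to a probability."""
    return 1.0 / (1.0 + math.exp(-z))

# First entries of rawPrediction and probability for the first two rows above.
rows = [
    (0.42361902702795, 0.60434892609849),
    (-1.7793239514363, 0.14438663246663),
]

for raw0, prob0 in rows:
    p0 = sigmoid(raw0)                  # probability of class 0
    p1 = 1.0 - p0                       # probability of class 1
    predicted = 1.0 if p1 > 0.5 else 0.0
    print(round(p0, 6), prob0, predicted)
```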

Spark ML's **BinaryClassificationEvaluator** supports only two metrics: **area under ROC** and **area under Precision-Recall curve**.

In [11]:
evaluator = BinaryClassificationEvaluator()
evaluator.explainParams()

'labelCol: label column name. (default: label)\nmetricName: metric name in evaluation (areaUnderROC|areaUnderPR) (default: areaUnderROC)\nrawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)'

In [12]:
print("Area under PR: %s" % evaluator.evaluate(
    testPred, {evaluator.labelCol: "label", evaluator.metricName: "areaUnderPR"}))
print("Area under ROC: %s" % evaluator.evaluate(
    testPred, {evaluator.labelCol: "label", evaluator.metricName: "areaUnderROC"}))

Area under PR: 0.990618570644
Area under ROC: 0.986575572336
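
As a reminder of what the second number measures: the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. The sketch below computes it by brute force over all positive/negative pairs. It is a pure-Python illustration with made-up scores, and roc_auc is a hypothetical helper rather than part of Spark.

```python
def roc_auc(scores, labels):
    """Area under the ROC curve via pairwise comparison of positive and negative scores."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Made-up scores: one positive (0.4) is ranked below one negative (0.6).
scores = [0.9, 0.4, 0.6, 0.1]
labels = [1, 1, 0, 0]
auc = roc_auc(scores, labels)
print(auc)  # 0.75: three of the four positive/negative pairs are ordered correctly
```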


Compute accuracy, precision, and recall manually:

In [13]:
def binary_classification_eval(df):
    acc = df.select(["prediction", "label"]).rdd.map(lambda x: 1. if x[0] == x[1] else 0.).mean()
    
    precision = df.select(["prediction", "label"]).rdd.map(lambda x: 1. if (x[0] == 1) and (x[1] == 1) else 0.).sum()/\
    df.select(["prediction", "label"]).rdd.map(lambda x: 1. if x[0] == 1 else 0.).sum()
    
    recall = df.select(["prediction", "label"]).rdd.map(lambda x: 1. if (x[0] == 1) and (x[1] == 1) else 0.).sum()/\
    df.select(["prediction", "label"]).rdd.map(lambda x: 1. if x[1] == 1 else 0.).sum()
    
    return {"accuracy":acc, "precision": precision, "recall": recall}

In [14]:
print(binary_classification_eval(testPred))

{'recall': 0.8640776699029126, 'precision': 1.0, 'accuracy': 0.9239130434782609}


Confusion matrix:

In [15]:
def confusion_matrix(df):
    cm = np.zeros((2, 2))
    cm[0, 0] = df.select(["prediction", "label"]).rdd.map(lambda x: 1 if x[0] == 1 and x[1] == 1 else 0).sum()
    cm[0, 1] = df.select(["prediction", "label"]).rdd.map(lambda x: 1 if x[0] == 0 and x[1] == 1 else 0).sum()
    cm[1, 1] = df.select(["prediction", "label"]).rdd.map(lambda x: 1 if x[0] == 0 and x[1] == 0 else 0).sum()
    cm[1, 0] = df.select(["prediction", "label"]).rdd.map(lambda x: 1 if x[0] == 1 and x[1] == 0 else 0).sum()
    cm = pd.DataFrame(cm)
    cm.index = ["T", "F"]
    cm.index.name = "Target"
    cm.columns = ["T", "F"]
    cm.columns.name = "Predicted"
    cm = cm.applymap(int)
    return cm

In [16]:
confusion_matrix(testPred)

Predicted   T   F
Target
T          89  14
F           0  81
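
The three metrics reported earlier can be recomputed directly from the four cells of this matrix, which serves as a quick consistency check. A minimal sketch in plain Python, using the counts shown above:

```python
tp, fn = 89, 14   # target T row: predicted T, predicted F
fp, tn = 0, 81    # target F row: predicted T, predicted F

accuracy = (tp + tn) / float(tp + tn + fp + fn)
precision = tp / float(tp + fp)
recall = tp / float(tp + fn)

print(accuracy, precision, recall)
```

These agree with the values printed by binary_classification_eval: on the balanced test set the model raises no false alarms, but it misses 14 of the 103 positive cases.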
