In [None]:
# Import Libraries
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType, DoubleType, IntegerType, StringType
import pyspark
from pyspark.sql.functions import col, sum as spark_sum
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, LinearSVC, GBTClassifier, NaiveBayes, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.functions import lit
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Set up the Spark session and the handles the rest of the notebook relies on.
conf = pyspark.SparkConf()
# The data-loading cell reads from `spark`, so the session has to be bound to that
# name here; otherwise the notebook only runs on kernels that predefine it.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark_context = spark  # original name for the session, kept so existing references still work
sc = spark.sparkContext  # SQLContext is built from the SparkContext, which was never bound before
sqlcontext = SQLContext(sc)

In [None]:
# Read Base.csv (comma-delimited, first row is the header) with inferred column
# types, then preview the first rows.
csv_reader = spark.read.option("delimiter", ",").option("header", True)
df = csv_reader.csv("Base.csv", header=True, inferSchema=True)
df.show()



+----------+------------------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+------------------+------------------+------------------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+--------+-------------------------+---------+------------------+-------------------------+------------------+-----+
|fraud_bool|            income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|       velocity_6h|      velocity_24h|       velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_month

                                                                                

In [None]:
# Per-class keep probabilities for the first sampling pass: every fraud_bool == 1
# row is kept, and each fraud_bool == 0 row is kept with probability 0.493737.
fractions = {1: 1, 0: 0.493737}
sampled_df = df.sampleBy("fraud_bool", fractions=fractions, seed=42)
sampled_df.show()

In [None]:
# Total number of rows in the sampled DataFrame.
sampled_df.count()

In [None]:
# Row count per fraud_bool class in the sampled DataFrame.
(
    sampled_df
    .groupBy("fraud_bool")
    .count()
    .show()
)

In [None]:
# Print the column names and the types Spark inferred for them.
sampled_df.printSchema()

root
 |-- fraud_bool: integer (nullable = true)
 |-- income: double (nullable = true)
 |-- name_email_similarity: double (nullable = true)
 |-- prev_address_months_count: integer (nullable = true)
 |-- current_address_months_count: integer (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- days_since_request: double (nullable = true)
 |-- intended_balcon_amount: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- zip_count_4w: integer (nullable = true)
 |-- velocity_6h: double (nullable = true)
 |-- velocity_24h: double (nullable = true)
 |-- velocity_4w: double (nullable = true)
 |-- bank_branch_count_8w: integer (nullable = true)
 |-- date_of_birth_distinct_emails_4w: integer (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- credit_risk_score: integer (nullable = true)
 |-- email_is_free: integer (nullable = true)
 |-- housing_status: string (nullable = true)
 |-- phone_home_valid: integer (nullable = true)
 |-- phone_mobil

In [None]:
# Count NULLs per column: each column becomes a 0/1 "is null" flag that is summed,
# producing a single row with one count per original column.
null_flag_sums = [
    spark_sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in sampled_df.columns
]
null_counts = sampled_df.select(null_flag_sums)
null_counts.show()

[Stage 24:>                                                       (0 + 12) / 12]

+----------+------+---------------------+-------------------------+----------------------------+------------+------------------+----------------------+------------+------------+-----------+------------+-----------+--------------------+--------------------------------+-----------------+-----------------+-------------+--------------+----------------+------------------+-----------------+---------------+---------------------+---------------+------+-------------------------+---------+------------------+-------------------------+------------------+-----+
|fraud_bool|income|name_email_similarity|prev_address_months_count|current_address_months_count|customer_age|days_since_request|intended_balcon_amount|payment_type|zip_count_4w|velocity_6h|velocity_24h|velocity_4w|bank_branch_count_8w|date_of_birth_distinct_emails_4w|employment_status|credit_risk_score|email_is_free|housing_status|phone_home_valid|phone_mobile_valid|bank_months_count|has_other_cards|proposed_credit_limit|foreign_request|sour

                                                                                

In [None]:
# Draw the training set: both classes are sampled with the same 0.8 keep fraction.
fractions = {label: 0.8 for label in (1, 0)}
train = sampled_df.sampleBy("fraud_bool", fractions=fractions, seed=42)
# Show how many rows of each class ended up in the training sample.
train.groupBy("fraud_bool").count().show()

In [None]:
# Hold out every sampled row that did not go into `train`.
# exceptAll is a multiset difference; subtract (EXCEPT DISTINCT) would collapse
# duplicate rows and drop any hold-out row that happens to equal a training row.
test = sampled_df.exceptAll(train)
test.groupby("fraud_bool").count().show()

In [None]:
# fractions={
#     1:1,
#     0:0.1
# }
# test = test.stat.sampleBy("fraud_bool", fractions, 42)
# test.groupby("fraud_bool").count().show()

In [None]:
# Down-sample the negative class of the training data: keep every fraud_bool == 1
# row and each fraud_bool == 0 row with probability 0.1.
# NOTE: `train` is rebound from itself, so re-running this cell samples it again.
fractions = {1: 1, 0: 0.1}
train = train.sampleBy("fraud_bool", fractions=fractions, seed=42)
train.groupBy("fraud_bool").count().show()

In [None]:
# Oversample the fraud class (fraud_bool == 1) of the training data by appending
# extra copies of its rows.

# The size of the appended block is (majority // minority) // OVERSAMPLE_DIVISOR
# copies, with a floor of one copy.
OVERSAMPLE_DIVISOR = 4

class_counts = train.groupBy("fraud_bool").count().collect()
if not class_counts:
    raise ValueError("train is empty; cannot compute class counts for oversampling")

# Sizes of the largest and the smallest class.
majority_count = max(row["count"] for row in class_counts)
minority_count = min(row["count"] for row in class_counts)

minority_df = train.filter(col("fraud_bool") == 1)
# Whole-number imbalance ratio between the largest and the smallest class.
ratio = majority_count // minority_count

# Block of minority rows that will be appended to `train`; it starts as one copy.
oversampled_minority_df = minority_df

# Grow the block to ratio // OVERSAMPLE_DIVISOR copies. The loop body never runs
# when that value is 1 or less, which leaves the single starting copy.
for _ in range(ratio // OVERSAMPLE_DIVISOR - 1):
    oversampled_minority_df = oversampled_minority_df.union(minority_df)

# `train` already holds each minority row once, so after this union every such row
# appears 1 + max(1, ratio // OVERSAMPLE_DIVISOR) times.
balanced_train = train.union(oversampled_minority_df)

balanced_train.groupby("fraud_bool").count().show()

In [None]:
_SUPPORTED_ALGOS = ("lr", "svc", "gbt", "rf", "dt", "nb")


def _build_estimator(algo):
    """Return the ``(estimator, param_grid)`` pair for a supported algorithm key."""
    if algo == "lr":  # Logistic Regression
        model = LogisticRegression(maxIter=10)
        # (notes left beside this grid listed 0.1 and 0.8, 1.0 -- presumably other
        # values that were tried)
        paramGrid = (
            ParamGridBuilder()
            .addGrid(model.regParam, [0.01])
            .addGrid(model.elasticNetParam, [0.5])
            .build()
        )
    elif algo == "svc":  # Linear Support Vector Classifier (SVC)
        model = LinearSVC(maxIter=10)
        paramGrid = ParamGridBuilder().addGrid(model.regParam, [0.01]).build()
    elif algo == "gbt":  # Gradient-Boosted Trees
        model = GBTClassifier(maxIter=10)
        # Empty builder: a single parameter map, i.e. the estimator's own settings.
        paramGrid = ParamGridBuilder().build()
    elif algo == "rf":  # Random Forests
        model = RandomForestClassifier()
        paramGrid = (
            ParamGridBuilder()
            .addGrid(model.numTrees, [10, 20])
            .addGrid(model.maxDepth, [5, 10])
            .build()
        )
    elif algo == "dt":  # Decision Trees
        model = DecisionTreeClassifier()
        paramGrid = (
            ParamGridBuilder()
            .addGrid(model.maxDepth, [5, 10])
            .addGrid(model.maxBins, [32, 64])
            .build()
        )
    elif algo == "nb":  # Naive Bayes
        model = NaiveBayes(smoothing=1.0, modelType="gaussian")
        # NOTE(review): smoothing may have no effect with modelType="gaussian", in
        # which case this grid only repeats the same fit -- confirm in the Spark docs.
        paramGrid = (
            ParamGridBuilder()
            .addGrid(model.smoothing, [1.0, 0.1, 0.01])
            .build()
        )
    else:
        raise ValueError(
            "Unknown algo %r; expected one of: %s" % (algo, ", ".join(_SUPPORTED_ALGOS))
        )
    return model, paramGrid


def create_pipeline(algo, train_df=None, num_folds=5, seed=42):
    """Cross-validate a preprocessing + classifier pipeline and return the fitted result.

    The pipeline string-indexes and one-hot encodes the categorical columns, indexes
    ``fraud_bool`` into ``label``, standardises the numeric columns, assembles
    everything into ``features`` and ends with the classifier selected by ``algo``.

    Parameters
    ----------
    algo : str
        One of "lr", "svc", "gbt", "rf", "dt", "nb".
    train_df : DataFrame, optional
        Data to fit on. Defaults to the notebook-level ``balanced_train``.
    num_folds : int, optional
        Number of cross-validation folds (default 5).
    seed : int, optional
        Seed passed to the CrossValidator so the fold assignment is repeatable.

    Returns
    -------
    CrossValidatorModel
        Its ``bestModel`` is the fitted pipeline; the classifier is the last stage.

    Raises
    ------
    ValueError
        If ``algo`` is not one of the supported keys.

    NOTE(review): if ``train_df`` contains duplicated rows (as an oversampled frame
    does), copies of one row can fall into both the fitting and the validation part
    of a fold, which makes the cross-validated score optimistic.
    """
    # Validate up front so a typo fails immediately with a clear message instead of
    # surfacing later as an unbound `model`.
    if algo not in _SUPPORTED_ALGOS:
        raise ValueError(
            "Unknown algo %r; expected one of: %s" % (algo, ", ".join(_SUPPORTED_ALGOS))
        )
    if train_df is None:
        train_df = balanced_train

    categoricalColumns = ['payment_type', 'employment_status', 'housing_status', 'source', 'device_os']

    stages = []

    # Each categorical column: string -> category index -> one-hot vector.
    for categoricalCol in categoricalColumns:
        stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Indexer')
        encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "Vec"])
        stages += [stringIndexer, encoder]

    # alphabetAsc pins fraud_bool 0 -> label 0.0 and 1 -> label 1.0. The default
    # ordering is by frequency, which would swap the labels if fraud rows ever
    # outnumbered the rest of the training data.
    label_stringIdx = StringIndexer(inputCol='fraud_bool', outputCol='label',
                                    stringOrderType='alphabetAsc')
    stages += [label_stringIdx]

    numericColumns = [
        'income', 'name_email_similarity', 'prev_address_months_count', 'current_address_months_count',
        'customer_age', 'days_since_request', 'intended_balcon_amount', 'zip_count_4w', 'velocity_6h',
        'velocity_24h', 'velocity_4w', 'bank_branch_count_8w', 'date_of_birth_distinct_emails_4w',
        'credit_risk_score', 'email_is_free', 'phone_home_valid', 'phone_mobile_valid', 'bank_months_count',
        'has_other_cards', 'proposed_credit_limit', 'foreign_request', 'session_length_in_minutes',
        'keep_alive_session', 'device_distinct_emails_8w', 'device_fraud_count', 'month',
    ]

    # Standardise all numeric columns in one pass. StandardScaler works per vector
    # dimension, so one assembler + one scaler yields the same values as a separate
    # assembler/scaler pair per column while fitting far fewer stages.
    numeric_assembler = VectorAssembler(inputCols=numericColumns, outputCol="numeric_features")
    numeric_scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_numeric_features",
                                    withMean=True, withStd=True)
    # Final feature vector: scaled numerics first, then the one-hot vectors.
    assemblerInputs = ["scaled_numeric_features"] + [c + "Vec" for c in categoricalColumns]
    feature_assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

    stages += [numeric_assembler, numeric_scaler, feature_assembler]

    model, paramGrid = _build_estimator(algo)

    pipeline = Pipeline(stages=stages + [model])

    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=BinaryClassificationEvaluator(),
                              numFolds=num_folds,
                              seed=seed)

    bestModel = crossval.fit(train_df)
    # Report the parameters of the winning classifier (last stage of the best pipeline).
    print(bestModel.bestModel.stages[-1].extractParamMap())
    return bestModel

                                                                                

In [None]:
def evaluate(model, test_df=None, positive_label=1.0):
    """Print hold-out metrics for a fitted model.

    Parameters
    ----------
    model
        Fitted model whose ``transform`` adds the ``label`` and ``prediction``
        columns (plus whatever BinaryClassificationEvaluator reads by default).
    test_df : DataFrame, optional
        Data to score. Defaults to the notebook-level ``test``.
    positive_label : float, optional
        Label value the F1, precision and recall lines are reported for (default 1.0).

    Prints accuracy, F1 / precision / recall for ``positive_label``, the weighted F1
    across all labels, and the area under the ROC curve. Returns nothing.
    """
    if test_df is None:
        test_df = test
    evaluatorMulti = MulticlassClassificationEvaluator()
    evaluator = BinaryClassificationEvaluator()
    # Make predictions once and cache them: every metric below is evaluated
    # separately over the same rows.
    predictions = model.transform(test_df).cache()
    try:
        predictionAndTarget = predictions.select("label", "prediction")
        for_positive = {evaluatorMulti.metricLabel: float(positive_label)}
        # Get metrics
        print('Accuracy', evaluatorMulti.evaluate(
            predictionAndTarget, {evaluatorMulti.metricName: "accuracy"}))
        # "f1" is the weighted average over all labels and does not use metricLabel;
        # the per-class F1 that matches the precision/recall lines is fMeasureByLabel.
        print('F1 Score', evaluatorMulti.evaluate(
            predictionAndTarget, {evaluatorMulti.metricName: "fMeasureByLabel", **for_positive}))
        print('Precision', evaluatorMulti.evaluate(
            predictionAndTarget, {evaluatorMulti.metricName: "precisionByLabel", **for_positive}))
        print('Recall', evaluatorMulti.evaluate(
            predictionAndTarget, {evaluatorMulti.metricName: "recallByLabel", **for_positive}))
        # The value this function used to print as "F1 Score", under an accurate name.
        print('Weighted F1 Score', evaluatorMulti.evaluate(
            predictionAndTarget, {evaluatorMulti.metricName: "f1"}))
        print('Test Area Under ROC', evaluator.evaluate(predictions))
    finally:
        predictions.unpersist()

In [None]:
# Cross-validate the logistic-regression pipeline, then print its metrics on `test`.
lr = create_pipeline("lr")
evaluate(lr)
#Accuracy 0.9058692354826072
#F1 Score 0.9339703069357408
#Precision 0.13497057032677084
#Recall 0.6109324758842444
#Test Area Under ROC 0.869935273369259

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(41,[1,4,10,12,15...|
|  0.0|(41,[3,4,10,11,15...|
|  0.0|(41,[0,4,10,13,15...|
|  0.0|(41,[0,4,10,12,15...|
|  0.0|(41,[1,4,10,11,15...|
|  0.0|(41,[3,7,10,12,15...|
|  0.0|(41,[0,4,10,15,16...|
|  0.0|(41,[0,7,10,11,15...|
|  0.0|(41,[0,4,10,11,15...|
|  0.0|(41,[3,8,10,11,15...|
|  0.0|(41,[0,6,10,11,15...|
|  0.0|(41,[1,6,10,11,15...|
|  0.0|(41,[0,7,10,11,15...|
|  0.0|(41,[3,6,10,14,15...|
|  0.0|(41,[3,6,10,12,15...|
|  0.0|(41,[2,4,10,12,15...|
|  0.0|(41,[3,4,10,13,15...|
|  0.0|(41,[2,4,10,11,15...|
|  0.0|(41,[0,5,10,13,15...|
|  0.0|(41,[1,7,10,11,15...|
+-----+--------------------+
only showing top 20 rows



In [None]:
# Cross-validate the linear SVC pipeline, then print its metrics on `test`.
svc = create_pipeline("svc")
evaluate(svc)
# The fitted classifier is the last stage of the best pipeline; show its learned
# coefficients and intercept.
fitted_svc = svc.bestModel.stages[-1]
print("Coefficients: " + str(fitted_svc.coefficients))
print("Intercept: " + str(fitted_svc.intercept))
# Accuracy 0.8998724297610318
# F1 Score 0.930516083840461
# Precision 0.129317727662004
# Recall 0.6242535599448783
# Test Area Under ROC 0.8710241698531431
# Coefficients: [0.18162636962736106,-0.22788473585367736,-0.20943314909151564,0.046448616641765285,0.1909327473303072,0.030594283142206916,-0.058824375617851234,0.08128620155228171,-0.01399150840303473,0.014019050986985518,-0.0062829752201177605,-0.0245152683181686,-0.08514997680534937,0.12660353669710017,0.20170556569820722,-0.3025997810981302,-0.04842151057755718,0.09817816633879688,-0.3033004171696736,0.09949297467557096,0.06936473982720699,0.006742787911613915,-0.22717653774102303,0.15025344196468782,0.0,0.0326121240493045,-0.08929988396754016,0.34593318712312526,-0.16625979913164,-0.04187659569492681,0.15986562357757406,-0.16954852464391817,0.26189882125239,-0.3435954089663676,-0.2734752472102851,-0.28859470198478676,-0.2617824430890552,0.6824148503478419,-0.28058560341955074,-0.3661877889612033,-0.005755512726188675,-0.1405303872777505,-0.470436887581395,0.4836975973996795,-0.2269184372431432,-0.35203164656093544,0.12324651269747748]
# Intercept: -0.5396847365626736

In [None]:
# Cross-validate the gradient-boosted trees pipeline, then print its metrics on `test`.
gbt = create_pipeline("gbt")
evaluate(gbt)
# The fitted classifier is the last stage of the best pipeline; printing it gives
# only its summary.
gbtModel = gbt.bestModel.stages[-1]
print(gbtModel)
# Accuracy 0.8979538537261559
# F1 Score 0.929257517233363
# Precision 0.12263615733736763
# Recall 0.5957740009186955
# Test Area Under ROC 0.8630938740214331

In [None]:
# Cross-validate the random-forest pipeline, then print its metrics on `test`.
rf = create_pipeline("rf")
evaluate(rf)
# Accuracy 0.9207161125319693
# F1 Score 0.941783371494845
# Precision 0.1591158871657101
# Recall 0.5687472719336534
# Test Area Under ROC 0.874543078480858

In [None]:
# Cross-validate the decision-tree pipeline, then print its metrics on `test`.
dt = create_pipeline("dt")
evaluate(dt)
# Accuracy 0.8745896852407515
# F1 Score 0.9147282344077521
# Precision 0.10512820512820513
# Recall 0.5905718027062418
# Test Area Under ROC 0.5446640260565774

In [None]:
# Cross-validate the naive Bayes pipeline, then print its metrics on `test`.
nb = create_pipeline("nb")
evaluate(nb)
# Accuracy 0.7269569245020843
# F1 Score 0.8218578384985008
# Precision 0.06369068541300527
# Recall 0.7909209951986033
# Test Area Under ROC 0.6860004953308981