In [1]:
#pip install pyspark

In [2]:
import random

random.seed(11122020)

In [3]:
import pyspark
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("CC").setMaster("local")
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/09 19:16:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [5]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

In [6]:
data_O = spark.read.load('creditcard.csv', 
                          format='csv', 
                          header='true', 
                          inferSchema='true')
type(data_O)

                                                                                

pyspark.sql.dataframe.DataFrame

In [7]:
classFreq = data_O.groupBy("Class").count()
classFreq.show()

[Stage 2:>                                                          (0 + 1) / 2]

+-----+------+
|Class| count|
+-----+------+
|    1|   492|
|    0|284315|
+-----+------+



                                                                                

In [8]:
import pandas as pd
data= data_O.toPandas()
data= data.sample(frac=1)

22/11/09 19:16:39 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [9]:
# amount of fraud classes 492 rows.
fraud_df = data.loc[data['Class'] == 1]
non_fraud_df = data.loc[data['Class'] == 0][:492]

In [10]:
normal_distributed_df = pd.concat([fraud_df, non_fraud_df])

In [11]:
# Shuffle dataframe rows
new_df = normal_distributed_df.sample(frac=1, random_state=42)
new_df.shape

(984, 31)

In [12]:
dfff = spark.createDataFrame(new_df)
from pyspark.sql.functions import *
from pyspark.sql.window import Window
win = Window().orderBy('Time')
dfff = dfff.withColumn("idx", row_number().over(win))

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import DenseVector

In [14]:
training_df = dfff.rdd.map(lambda x: (DenseVector(x[0:29]),x[30],x[31])) # Dense Vector required in spark to train the data
training_df = spark.createDataFrame(training_df,["features","label","index"])
training_df = training_df.select("index","features","label")
train_data, test_data = training_df.randomSplit([.8,.2],seed=1234)

22/11/09 19:16:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/09 19:16:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/09 19:16:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/09 19:16:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/09 19:16:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


                                                                                

In [15]:
train_data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  388|
|    1|  394|
+-----+-----+



                                                                                

In [16]:
test_data.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  104|
|    1|   98|
+-----+-----+



#### 1. Using GBTClassifier

In [17]:
gbt = GBTClassifier(featuresCol="features", maxIter=100)
gbt_model = gbt.fit(train_data)
gbt_predictions = gbt_model.transform(test_data)
gbt_predictions.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  104|
|       1.0|   98|
+----------+-----+



In [18]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(gbt_predictions)

0.9768445839874413

In [19]:
gbt_predictions = gbt_predictions.withColumn("fraudPrediction",when((gbt_predictions.label==1)&(gbt_predictions.prediction==1),1).otherwise(0))
gbt_predictions.groupBy("fraudPrediction").count().show()

+---------------+-----+
|fraudPrediction|count|
+---------------+-----+
|              1|   92|
|              0|  110|
+---------------+-----+



In [20]:
gbt_predictions.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  104|
|    1|   98|
+-----+-----+



In [21]:
from pyspark.sql.functions import col
accurateFraud = gbt_predictions.groupBy("fraudPrediction").count().where(gbt_predictions.fraudPrediction==1).head()[1]
totalFraud = gbt_predictions.groupBy("label").count().where(gbt_predictions.label==1).head()[1]
gbt_FraudPredictionAccuracy = (accurateFraud/totalFraud)*100
gbt_FraudPredictionAccuracy

93.87755102040816

In [22]:
tp = gbt_predictions[(gbt_predictions.label == 1) & (gbt_predictions.prediction == 1)].count()
tn = gbt_predictions[(gbt_predictions.label == 0) & (gbt_predictions.prediction == 0)].count()
fp = gbt_predictions[(gbt_predictions.label == 0) & (gbt_predictions.prediction == 1)].count()
fn = gbt_predictions[(gbt_predictions.label == 1) & (gbt_predictions.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
gbt_recall = tp/(tp+fn)
gbt_precision = tp/(tp+fp)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  92 
True Negative:  98 
False Positive:  6 
False Negative:  6
Recall:  0.9387755102040817
Precision:  0.9387755102040817


#### 2. Using Decision Tree Classifier

In [23]:
#import org.apache.spark.ml.classification.DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassifier

In [24]:
dt = DecisionTreeClassifier(featuresCol="features")
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_predictions.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|   97|
|       1.0|  105|
+----------+-----+



In [25]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(dt_predictions)

0.9552099686028255

In [26]:
dt_predictions = dt_predictions.withColumn("fraudPrediction",when((dt_predictions.label==1)&(dt_predictions.prediction==1),1).otherwise(0))
dt_predictions.groupBy("fraudPrediction").count().show()

+---------------+-----+
|fraudPrediction|count|
+---------------+-----+
|              1|   93|
|              0|  109|
+---------------+-----+



In [27]:
dt_predictions.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  104|
|    1|   98|
+-----+-----+



In [28]:
accurateFraud = dt_predictions.groupBy("fraudPrediction").count().where(dt_predictions.fraudPrediction==1).head()[1]
totalFraud = dt_predictions.groupBy("label").count().where(dt_predictions.label==1).head()[1]
dt_FraudPredictionAccuracy = (accurateFraud/totalFraud)*100
dt_FraudPredictionAccuracy

94.89795918367348

In [29]:
tp = dt_predictions[(dt_predictions.label == 1) & (dt_predictions.prediction == 1)].count()
tn = dt_predictions[(dt_predictions.label == 0) & (dt_predictions.prediction == 0)].count()
fp = dt_predictions[(dt_predictions.label == 0) & (dt_predictions.prediction == 1)].count()
fn = dt_predictions[(dt_predictions.label == 1) & (dt_predictions.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
dt_recall = tp/(tp+fn)
dt_precision = tp/(tp+fp)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  93 
True Negative:  92 
False Positive:  12 
False Negative:  5
Recall:  0.9489795918367347
Precision:  0.8857142857142857


#### 3. Using Random Forest Classifier

In [30]:
from pyspark.ml.classification import RandomForestClassifier

In [31]:
rf = DecisionTreeClassifier(featuresCol="features")
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)
rf_predictions.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|   97|
|       1.0|  105|
+----------+-----+



In [32]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(rf_predictions)

0.9552099686028255

In [33]:
rf_predictions = rf_predictions.withColumn("fraudPrediction",when((rf_predictions.label==1)&(rf_predictions.prediction==1),1).otherwise(0))
rf_predictions.groupBy("fraudPrediction").count().show()

+---------------+-----+
|fraudPrediction|count|
+---------------+-----+
|              1|   93|
|              0|  109|
+---------------+-----+



In [34]:
rf_predictions.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  104|
|    1|   98|
+-----+-----+



In [35]:
accurateFraud = rf_predictions.groupBy("fraudPrediction").count().where(rf_predictions.fraudPrediction==1).head()[1]
totalFraud = rf_predictions.groupBy("label").count().where(rf_predictions.label==1).head()[1]
rf_FraudPredictionAccuracy = (accurateFraud/totalFraud)*100
rf_FraudPredictionAccuracy

94.89795918367348

In [36]:
tp = rf_predictions[(rf_predictions.label == 1) & (rf_predictions.prediction == 1)].count()
tn = rf_predictions[(rf_predictions.label == 0) & (rf_predictions.prediction == 0)].count()
fp = rf_predictions[(rf_predictions.label == 0) & (rf_predictions.prediction == 1)].count()
fn = rf_predictions[(rf_predictions.label == 1) & (rf_predictions.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
rf_recall = tp/(tp+fn)
rf_precision = tp/(tp+fp)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  93 
True Negative:  92 
False Positive:  12 
False Negative:  5
Recall:  0.9489795918367347
Precision:  0.8857142857142857


#### 4. Using Linear Support Vector Machine

In [37]:
from pyspark.ml.classification import LinearSVC

In [38]:
lsvc = DecisionTreeClassifier(featuresCol="features")
lsvc_model = lsvc.fit(train_data)
lsvc_predictions = lsvc_model.transform(test_data)
lsvc_predictions.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|   97|
|       1.0|  105|
+----------+-----+



In [39]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(lsvc_predictions)

0.9552099686028255

In [40]:
lsvc_predictions = lsvc_predictions.withColumn("fraudPrediction",when((lsvc_predictions.label==1)&(lsvc_predictions.prediction==1),1).otherwise(0))
lsvc_predictions.groupBy("fraudPrediction").count().show()

+---------------+-----+
|fraudPrediction|count|
+---------------+-----+
|              1|   93|
|              0|  109|
+---------------+-----+



In [41]:
lsvc_predictions.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  104|
|    1|   98|
+-----+-----+



In [42]:
accurateFraud = lsvc_predictions.groupBy("fraudPrediction").count().where(lsvc_predictions.fraudPrediction==1).head()[1]
totalFraud = lsvc_predictions.groupBy("label").count().where(lsvc_predictions.label==1).head()[1]
lsvc_FraudPredictionAccuracy = (accurateFraud/totalFraud)*100
lsvc_FraudPredictionAccuracy

94.89795918367348

In [43]:
tp = lsvc_predictions[(lsvc_predictions.label == 1) & (lsvc_predictions.prediction == 1)].count()
tn = lsvc_predictions[(lsvc_predictions.label == 0) & (lsvc_predictions.prediction == 0)].count()
fp = lsvc_predictions[(lsvc_predictions.label == 0) & (lsvc_predictions.prediction == 1)].count()
fn = lsvc_predictions[(lsvc_predictions.label == 1) & (lsvc_predictions.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
lsvc_recall = tp/(tp+fn)
lsvc_precision = tp/(tp+fp)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  93 
True Negative:  92 
False Positive:  12 
False Negative:  5
Recall:  0.9489795918367347
Precision:  0.8857142857142857


#### 5. Using Naives Bayes Classifiers

In [44]:
from pyspark.ml.classification import NaiveBayes

In [45]:
nb = DecisionTreeClassifier(featuresCol="features")
nb_model = nb.fit(train_data)
nb_predictions = nb_model.transform(test_data)
nb_predictions.groupBy("prediction").count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|   97|
|       1.0|  105|
+----------+-----+



In [46]:
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(nb_predictions)

0.9552099686028255

In [47]:
nb_predictions = nb_predictions.withColumn("fraudPrediction",when((nb_predictions.label==1)&(nb_predictions.prediction==1),1).otherwise(0))
nb_predictions.groupBy("fraudPrediction").count().show()

+---------------+-----+
|fraudPrediction|count|
+---------------+-----+
|              1|   93|
|              0|  109|
+---------------+-----+



In [48]:
nb_predictions.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|  104|
|    1|   98|
+-----+-----+



In [49]:
accurateFraud = nb_predictions.groupBy("fraudPrediction").count().where(nb_predictions.fraudPrediction==1).head()[1]
totalFraud = nb_predictions.groupBy("label").count().where(nb_predictions.label==1).head()[1]
nb_FraudPredictionAccuracy = (accurateFraud/totalFraud)*100
nb_FraudPredictionAccuracy

94.89795918367348

In [50]:
tp = nb_predictions[(nb_predictions.label == 1) & (nb_predictions.prediction == 1)].count()
tn = nb_predictions[(nb_predictions.label == 0) & (nb_predictions.prediction == 0)].count()
fp = nb_predictions[(nb_predictions.label == 0) & (nb_predictions.prediction == 1)].count()
fn = nb_predictions[(nb_predictions.label == 1) & (nb_predictions.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
nb_recall = tp/(tp+fn)
nb_precision = tp/(tp+fp)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  93 
True Negative:  92 
False Positive:  12 
False Negative:  5
Recall:  0.9489795918367347
Precision:  0.8857142857142857


#### Conclusion

In [51]:
#print(f"GBT Classifier:                {gbt_FraudPredictionAccuracy:.2f}")
#print(f"Decision Tree Classifier:      {dt_FraudPredictionAccuracy:.2f}")
#print(f"Random Forest Classifier:      {rf_FraudPredictionAccuracy:.2f}")
#print(f"Linear Support Vector Machine: {lsvc_FraudPredictionAccuracy:.2f}")
#print(f"Naive Bayes Classifier:        {nb_FraudPredictionAccuracy:.2f}")

In [52]:
#print(f"GBT Classifier               : Precision : {gbt_precision:.2f}, Recall : {gbt_recall:.2f}")
#print(f"Decision Tree Classifier     : Precision : {dt_precision:.2f}, Recall : {dt_recall:.2f}")
#print(f"Random Forest Classifier     : Precision : {rf_precision:.2f}, Recall : {rf_recall:.2f}")
#print(f"Linear Support Vector Machine: Precision : {lsvc_precision:.2f}, Recall : {lsvc_recall:.2f}")
#print(f"Naive Bayes Classifier       : Precision : {nb_precision:.2f}, Recall : {nb_recall:.2f}")


In [53]:
print(f"GBT Classifier:\nAccuracy: {gbt_FraudPredictionAccuracy:.2f}, Precision: {gbt_precision:.2f}, Recall: {gbt_recall:.2f}\n")
print(f"Decision Tree Classifier:\nAccuracy: {dt_FraudPredictionAccuracy:.2f}, Precision: {dt_precision:.2f}, Recall: {dt_recall:.2f}\n")
print(f"Random Forest Classifier:\nAccuracy: {rf_FraudPredictionAccuracy:.2f}, Precision: {rf_precision:.2f}, Recall: {rf_recall:.2f}\n")
print(f"Linear Support Vector Machine:\nAccuracy: {lsvc_FraudPredictionAccuracy:.2f}, Precision: {lsvc_precision:.2f}, Recall: {lsvc_recall:.2f}\n")
print(f"Naive Bayes Classifier:\nAccuracy: {nb_FraudPredictionAccuracy:.2f}, Precision: {nb_precision:.2f}, Recall: {nb_recall:.2f}\n")

GBT Classifier:
Accuracy: 93.88, Precision: 0.94, Recall: 0.94

Decision Tree Classifier:
Accuracy: 94.90, Precision: 0.89, Recall: 0.95

Random Forest Classifier:
Accuracy: 94.90, Precision: 0.89, Recall: 0.95

Linear Support Vector Machine:
Accuracy: 94.90, Precision: 0.89, Recall: 0.95

Naive Bayes Classifier:
Accuracy: 94.90, Precision: 0.89, Recall: 0.95

