In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from datetime import datetime

# App named 'Cruise'
spark = SparkSession.builder.appName('cruise').getOrCreate()

In [3]:
df = spark.read.csv('transactions.csv',inferSchema=True,header=True)

In [4]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- accountNumber: integer (nullable = true)
 |-- customerId: integer (nullable = true)
 |-- creditLimit: integer (nullable = true)
 |-- availableMoney: double (nullable = true)
 |-- transactionDateTime: timestamp (nullable = true)
 |-- transactionAmount: double (nullable = true)
 |-- merchantName: string (nullable = true)
 |-- acqCountry: string (nullable = true)
 |-- merchantCountryCode: string (nullable = true)
 |-- posEntryMode: integer (nullable = true)
 |-- posConditionCode: integer (nullable = true)
 |-- merchantCategoryCode: string (nullable = true)
 |-- currentExpDate: string (nullable = true)
 |-- accountOpenDate: string (nullable = true)
 |-- dateOfLastAddressChange: string (nullable = true)
 |-- cardCVV: integer (nullable = true)
 |-- enteredCVV: integer (nullable = true)
 |-- cardLast4Digits: integer (nullable = true)
 |-- transactionType: string (nullable = true)
 |-- echoBuffer: string (nullable = true)
 |-- currentBalance: doubl

In [5]:
df.show()

+---+-------------+----------+-----------+--------------+-------------------+-----------------+--------------------+----------+-------------------+------------+----------------+--------------------+--------------+---------------+-----------------------+-------+----------+---------------+--------------------+----------+--------------+------------+-------------+-----------+-----------+-------------+----------------+------------------------+-------+
|_c0|accountNumber|customerId|creditLimit|availableMoney|transactionDateTime|transactionAmount|        merchantName|acqCountry|merchantCountryCode|posEntryMode|posConditionCode|merchantCategoryCode|currentExpDate|accountOpenDate|dateOfLastAddressChange|cardCVV|enteredCVV|cardLast4Digits|     transactionType|echoBuffer|currentBalance|merchantCity|merchantState|merchantZip|cardPresent|posOnPremises|recurringAuthInd|expirationDateKeyInMatch|isFraud|
+---+-------------+----------+-----------+--------------+-------------------+-----------------+---

In [6]:
df.describe().show()

+-------+------------------+-------------------+-------------------+------------------+-----------------+------------------+------------------+----------+-------------------+------------------+-----------------+--------------------+--------------+---------------+-----------------------+-----------------+------------------+-----------------+--------------------+----------+-----------------+------------+-------------+-----------+-------------+----------------+
|summary|               _c0|      accountNumber|         customerId|       creditLimit|   availableMoney| transactionAmount|      merchantName|acqCountry|merchantCountryCode|      posEntryMode| posConditionCode|merchantCategoryCode|currentExpDate|accountOpenDate|dateOfLastAddressChange|          cardCVV|        enteredCVV|  cardLast4Digits|     transactionType|echoBuffer|   currentBalance|merchantCity|merchantState|merchantZip|posOnPremises|recurringAuthInd|
+-------+------------------+-------------------+-------------------+------

In [7]:
from pyspark.ml.feature import StringIndexer

df = df.withColumn("cardPresent", df.cardPresent.cast('int'))
df = df.withColumn("isFraud", df.isFraud.cast('int'))
df = df.withColumn("expirationDateKeyInMatch", df.expirationDateKeyInMatch.cast('int'))

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["accountNumber", "customerId", "creditLimit", "availableMoney", "transactionAmount", "cardCVV", "enteredCVV", "cardLast4Digits", "currentBalance", "cardPresent", "expirationDateKeyInMatch"],
outputCol="features")
output = assembler.transform(df)


In [9]:
final_data = output.select("features", "isFraud")
valid_data = final_data.filter(df.isFraud == 0)
valid_data = valid_data.orderBy(F.rand()).limit(12417)
invalid_data = final_data.filter(df.isFraud == 1)
final_data = valid_data.union(invalid_data)

In [10]:
train_data,test_data = final_data.randomSplit([0.70,0.30])

In [11]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.clustering import KMeans

In [12]:
kmeans = KMeans(k=2, seed=0)

In [13]:
start_time = datetime.now()
# lrModel = lr.fit(train_data) # for logistic regression
# rfModel = rf.fit(train_data)
kmeans = kmeans.fit(train_data)
duration = (datetime.now() - start_time).total_seconds()

In [15]:
from pyspark.ml.evaluation import ClusteringEvaluator
# Evaluate clustering by computing Silhouette score
predictions = kmeans.transform(test_data)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("\n\nSilhouette with squared euclidean distance = " + str(silhouette) + "\n\n")



Silhouette with squared euclidean distance = 0.8225820335076228




In [16]:
predictions.show()

+--------------------+-------+----------+
|            features|isFraud|prediction|
+--------------------+-------+----------+
|[1.00737756E8,1.0...|      0|         0|
|[1.00737756E8,1.0...|      0|         0|
|[1.01339369E8,1.0...|      0|         0|
|[1.01394465E8,1.0...|      0|         0|
|[1.01738384E8,1.0...|      0|         0|
|[1.01876201E8,1.0...|      0|         0|
|[1.02204865E8,1.0...|      0|         0|
|[1.02755773E8,1.0...|      0|         0|
|[1.02755773E8,1.0...|      0|         0|
|[1.0485242E8,1.04...|      0|         0|
|[1.05418907E8,1.0...|      0|         0|
|[1.05499749E8,1.0...|      0|         0|
|[1.05730883E8,1.0...|      0|         0|
|[1.06159813E8,1.0...|      0|         0|
|[1.06159813E8,1.0...|      0|         0|
|[1.06159813E8,1.0...|      0|         0|
|[1.06159813E8,1.0...|      0|         0|
|[1.06159813E8,1.0...|      0|         0|
|[1.06159813E8,1.0...|      0|         0|
|[1.06159813E8,1.0...|      0|         0|
+--------------------+-------+----

In [18]:
def precision_recall_f1(data):
    tp = data[(data.prediction == 1) & (data.isFraud == 1)].count()
    tn = data[(data.prediction == 0) & (data.isFraud == 0)].count()
    fn = data[(data.prediction == 0) & (data.isFraud == 1)].count()
    fp = data[(data.prediction == 1) & (data.isFraud == 0)].count()
    try:
        precision = float((tp)/(tp + fp))
    except:
        precision = 0
    try:
        recall = float((tp)/(tp + fn))
    except:
        recall = 0
    try:
        F1 = float(2 * precision * recall / (precision + recall))
    except:
        F1 = 0
    return precision,recall,F1

In [20]:
precision,recall,F1 = precision_recall_f1(predictions)
print("The precision is :",precision)
print("The recall is :",recall)
print('The F1 is :',F1)

The precision is : 0.5001460706982179
The recall is : 0.4542318917484744
The F1 is : 0.47608453837597325


In [21]:
print('total run time is: ', duration)

total run time is:  16.102881
