In [1]:
%fs rm /FileStore/tables/card-logs-data -r

In [2]:
%fs rm /FileStore/tables/card-logs-check/ -r

In [3]:
%fs rm /FileStore/tables/card-logs-data-archieve/ -r

In [4]:
%fs rm /FileStore/tables/card-logs-data-table/ -r

In [5]:
%fs rm /FileStore/tables/card-logs-data-withOutPuts/ -r

In [6]:
%fs ls /FileStore/tables/

path,name,size
dbfs:/FileStore/tables/1_Spark_Core/,1_Spark_Core/,0
dbfs:/FileStore/tables/2_Spark_Streaming/,2_Spark_Streaming/,0
dbfs:/FileStore/tables/3_Spark_SQL/,3_Spark_SQL/,0
dbfs:/FileStore/tables/4_Spark_GraphsLib/,4_Spark_GraphsLib/,0
dbfs:/FileStore/tables/5_Spark_MachineLearning/,5_Spark_MachineLearning/,0
dbfs:/FileStore/tables/7_Assignments/,7_Assignments/,0
dbfs:/FileStore/tables/creditcard.csv,creditcard.csv,150828752
dbfs:/FileStore/tables/creditcard_databricks.csv,creditcard_databricks.csv,111104688


In [7]:
"""importing the usual pythonic libraries"""
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import collections
import time
from datetime import datetime 

"""spark MLlib libraries"""
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import DenseVector



"""Spark Streaming and sql libaries"""
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# from pyspark.sql.functions import col

"""SciKit Libraries"""
from sklearn.preprocessing import RobustScaler
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split




In [8]:
%scala

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,DoubleType};
import org.apache.spark.sql.streaming.Trigger

In [9]:
columns = ["MessageID","Time","V1","V2","V3","V4","V5","V6","V7","V8","V9","V10","V11","V12","V13","V14","V15","V16","V17","V18","V19","V20","V21","V22","V23","V24","V25","V26","V27","V28","Amount","Class"]

In [10]:
# Set isCheckFileSystem flag False only if you are sure that the file exists and there is no filesize as 0
def transform_for_ccfd(path, isCheckFileSystem = True):
  list_offiles=dbutils.fs.ls(path)
  list_offiles_paths = []

  if isCheckFileSystem:
    for fileDetails  in list_offiles[:]:
      if (fileDetails[1].startswith("part-00000-") and  (fileDetails[2] > 0 )):
          print("This is the one Name {0} size {1}".format(fileDetails[1],fileDetails[2]))
          list_offiles_paths.append(fileDetails[0])
      path= list_offiles_paths
  data_O = spark.read.load(path, format='csv', header='true', inferSchema='true')
                          
  data= data_O.toPandas()
  data.columns = ["MessageID","Time","V1","V2","V3","V4","V5","V6","V7","V8","V9","V10","V11","V12","V13",
                  "V14","V15","V16","V17","V18","V19","V20","V21","V22","V23","V24","V25","V26","V27","V28","Amount","Class"]

  selectedFeatures = ['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V9', 'V10', 'V11', 'V12', 'V14', 'V16', 'V17', 'V18', 'V19']
  selectedFeatures.append('Class')

  equal_data=data.loc[:,selectedFeatures]
  equal_data.shape

  dfff = spark.createDataFrame(equal_data)
  training_df = dfff.rdd.map(lambda x: (DenseVector(x[:-1]),x['Class']))
  training_df = spark.createDataFrame(training_df,["features","label"])
 
  return [training_df,data]

In [11]:
# Data on which the estimator will be trained on
# This data is differnt than the data coming from KAFKA server
path_train = '/FileStore/tables/creditcard_databricks.csv'

# Transform the data file into a spark data frame with all 30 features compressed into
# a single dense vector
train_data = transform_for_ccfd(path=path_train, isCheckFileSystem = False)[0]

# Create an instance of Random Forest Classifier with 200 Trees
estimatr = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=200)

# Create a pipeline which can then be used for predictions and knowledge upgrade (active learning)
pipeline = Pipeline(stages = [estimatr])


# Fit the classifier with training data
model = pipeline.fit(train_data)

In [12]:
%sh telnet ec2-3-249-245-26.eu-west-1.compute.amazonaws.com 9092

In [13]:

%scala
val schema = StructType(
  Seq(
    StructField("Unnamed: 0", DoubleType, true),
    StructField("Time", DoubleType, true),
    StructField("V1", DoubleType, true),
    StructField("V2", DoubleType, true),
    StructField("V3", DoubleType, true),
    StructField("V4", DoubleType, true),
    StructField("V5", DoubleType, true),
    StructField("V6", DoubleType, true),
    StructField("V7", DoubleType, true),
    StructField("V8", DoubleType, true),
    StructField("V9", DoubleType, true),
    StructField("V10", DoubleType, true),
    StructField("V11", DoubleType, true),
    StructField("V12", DoubleType, true),
    StructField("V13", DoubleType, true),
    StructField("V14", DoubleType, true),
    StructField("V15", DoubleType, true),
    StructField("V16", DoubleType, true),
    StructField("V17", DoubleType, true),
    StructField("V18", DoubleType, true),
    StructField("V19", DoubleType, true),
    StructField("V20", DoubleType, true),
    StructField("V21", DoubleType, true),
    StructField("V22", DoubleType, true),
    StructField("V23", DoubleType, true),
    StructField("V24", DoubleType, true),
    StructField("V25", DoubleType, true),
    StructField("V26", DoubleType, true),
    StructField("V27", DoubleType, true),
    StructField("V28", DoubleType, true),
    StructField("Amount", DoubleType, true),
    StructField("Class", DoubleType, true)
  )
)

In [14]:
%scala
val DNS_host = "ec2-3-249-245-26.eu-west-1.compute.amazonaws.com:9092"

In [15]:
%scala
var streamingInputDF = 
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", DNS_host)
    .option("subscribe", "card")     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "1")  
    .option("failOnDataLoss", "true")
    .load()

In [16]:
%scala
var streamingSelectDF = 
    streamingInputDF
   .selectExpr("cast (value as string) as json")
   .select(from_json($"json", schema).as("data"))
   .select("data.*")

In [17]:
%scala


val query =
    streamingSelectDF
    .writeStream
    .format("console")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()

query.awaitTermination

In [18]:
%scala

val query =
    streamingSelectDF
    .writeStream
    .format("csv")
    .outputMode("append")
    .option("path", "/FileStore/tables/card-logs-data")
    .option("checkpointLocation", "/FileStore/tables/card-logs-check")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()

In [19]:
data_O = spark.read.load("/FileStore/tables/card-logs-data", 
                          format='csv', 
                          header='true', 
                          inferSchema='true')
display(data_O)

15515.0,26904.0,1.2049381848,-0.1223084958,0.5558263777,-0.0987149449,-0.6104550098,-0.5449173344,-0.253801833,-0.0382008416,0.37184417,-0.2687736619,0.0151005234,0.370770316,0.228823525,0.1392470514,1.4932959828,0.1105542153,-0.0914952695,-0.8783661051,-0.336630052,-0.0524476197,-0.093961322,-0.2588618581,0.1093105753,0.1408489313,0.0419692437,0.93172843,-0.0569546528,0.0094759678,20.47,0.0
15516.0,26905.0,-1.059690124,1.4442398159,1.4021601297,-0.0039575924,0.4313087845,-0.2663826561,1.0118127759,-0.8165845873,-0.7181505012,-0.5022710758,-0.1476371245,0.5345015985,0.5620484673,0.2037363817,0.3994687835,0.0159510764,-0.3823352274,-1.1333129331,-1.10877904,-0.1788836032,0.3560744273,-1.3700931848,0.1424650394,-0.0051327916,-0.0826111597,-0.9387589102,0.4060907504,0.1668482393,52.0,0.0
15517.0,26906.0,1.1609567794,1.2656213441,-1.5764729803,1.4729879638,1.1621726912,-1.0135323569,0.6581328268,-0.1526238089,-0.8473259374,-1.4524985804,2.8939231012,0.7741492136,0.6635058078,-3.3337206434,0.5232978065,0.8978112902,2.6826126385,1.2693141465,-0.9122617532,0.0024655761,-0.1203645055,-0.2049968528,-0.203189095,-0.2821715296,0.7852781366,-0.2849579915,0.0583991252,0.0854358811,1.79,0.0
15518.0,26906.0,1.1945546066,-0.1467282003,0.4597760204,-0.2333314876,-0.5436549225,-0.4818937894,-0.2221221155,-0.0470851329,0.0461698221,-0.081033111,1.3007436261,1.1831935259,0.8197624487,0.1733737919,0.5018370619,0.6695586948,-0.7734134266,0.1036205026,0.4252988957,0.0751826789,-0.0739031165,-0.2818232173,0.0040417289,0.087200732,0.1421305978,0.9010407832,-0.077138038,0.0037395343,38.16,0.0
15519.0,26909.0,-0.4372562536,-3.4599029903,-0.8510696436,0.1737636945,-1.7108014185,-0.0771364774,0.748273876,-0.2165807824,-1.0740082459,0.3488182003,1.2536747619,0.4642410898,-0.5421979735,0.6032509306,-0.3149753313,-1.8002366066,0.4831637421,0.8473124677,-0.8918122825,1.2630127695,0.1815352891,-0.9567472968,-0.7486900442,0.3104933337,-0.0515741823,0.9973787173,-0.2309269844,0.1420098916,908.6,0.0
15520.0,26911.0,0.7750096633,-0.9586281014,1.0055755342,0.597924481,-1.1679624651,0.576904844,-0.8664704836,0.4349765924,0.7539800101,-0.0062420383,0.901491218,-0.1311530719,-1.5579372871,0.3897912198,1.3952161912,0.9543525749,-0.6768584986,0.8369574757,-0.6102196693,0.1744140539,0.3712255091,0.6228807701,-0.1972479979,-0.3013604219,0.0219622709,0.4905795651,-0.0125631129,0.0435216447,190.0,0.0
15521.0,26912.0,1.0935532844,0.0138924535,1.1491267785,1.3743511399,-0.7353302398,-0.0127907553,-0.4106614462,0.0873984866,0.6282664827,-0.242893922,-0.1672665655,1.0776346664,0.7697052099,-0.3979425201,0.1885448555,-0.5253359311,0.1622708763,-0.6623085006,-0.723796919,-0.115906188,0.1524005881,0.7413398691,-0.0859163073,0.4540305779,0.5615818187,-0.2093641896,0.0815190169,0.0327433607,15.13,0.0
15522.0,26913.0,-0.5429222088,0.6817533002,1.6166490124,-0.4498867407,0.2777079565,-1.1569001686,0.6713976286,-0.0988143118,-0.3953690724,-0.7327828012,-0.4811612102,0.0566578808,0.0652545596,0.0983507493,0.3387078731,0.5460849569,-0.7225444862,-0.4616007772,-0.8385104799,-0.0965455292,-0.1770919703,-0.6754190764,-0.008519805,0.353517433,-0.0592324406,-0.0740869974,-0.0419142832,0.0181621336,1.98,0.0
15523.0,26914.0,-1.1408428333,0.4961850912,2.2659645064,-2.0187499872,-0.7714001059,-0.6017134866,0.0924590047,-0.044847159,-0.8247261246,1.0325617175,1.5050281634,-0.330327818,-0.1380918455,-0.6434771677,-0.2172504241,1.5186724785,-0.3921966294,-0.5664354918,-0.044446401,0.536691544,0.245405728,0.9576891285,-0.3874707091,0.5594096332,0.5432786584,-0.2199400899,0.3296998865,0.0328464517,14.2,0.0
15524.0,26915.0,1.2176597757,-0.4987122047,0.1473877244,-0.3186396103,-0.8838321191,-0.7747624371,-0.4448082007,-0.1057747992,-0.6890894278,0.0483141491,0.403465033,-0.3003488918,0.1751505771,-1.3383513964,0.273333967,0.7195206921,1.6954709623,-1.8220070373,0.1671676528,0.1700617561,0.0035965365,-0.0457202484,0.0084800445,0.3470659179,0.3475651167,-0.2984976151,0.0300083193,0.0479370565,66.44,0.0
15525.0,26916.0,-0.5618932475,1.9514347829,-0.3057757985,0.5634692309,1.2138858485,-2.057169198,1.5634520614,-0.2857910443,-0.8646585893,-2.2408546224,0.6811632711,-1.1444222427,-1.2529720665,-3.3198601044,0.397398774,1.1862390849,2.570602955,1.4216355674,-1.6372004818,-0.1425226813,-0.0975438915,-0.3160905749,-0.3309493851,0.3483059098,0.2586325638,-0.515197401,0.1190286479,0.2021055974,4.8,0.0


In [21]:
path_test = "/FileStore/tables/card-logs-data"

# Archival Directory
to_locations = "dbfs:/FileStore/tables/card-logs-data-archieve/"

# List of file to archive post prediction
list_offiles=dbutils.fs.ls("dbfs:/FileStore/tables/card-logs-data")

# Read the CSV and transform in a dense vector for ML Lib
transformed_result = transform_for_ccfd(path=path_test)

# Dense Vector
test_data  = transformed_result[0]

# Retain the original dataframe for further use 
original_data  = transformed_result[1]

# Archive all files post prediction except meta-data and current file
for fileDetails  in list_offiles[:]:
  if (fileDetails[1].startswith("part-00000-") and  (fileDetails[2] > 0 )):
      print("This is the one Name {0} size {1}".format(fileDetails[1],fileDetails[2]))
      dbutils.fs.mv(fileDetails[0], to_locations + fileDetails[1] , False)



In [22]:
predictions = model.transform(test_data)
display(predictions)

features,label,rawPrediction,probability,prediction
"List(1, 16, List(), List(-1.059690124, 1.4442398159, 1.4021601297, -0.0039575924, 0.4313087845, -0.2663826561, 1.0118127759, -0.7181505012, -0.5022710758, -0.1476371245, 0.5345015985, 0.2037363817, 0.0159510764, -0.3823352274, -1.1333129331, -1.10877904))",0.0,"List(1, 2, List(), List(199.94624926699498, 0.053750733005020195))","List(1, 2, List(), List(0.9997312463349749, 2.6875366502510095E-4))",0.0
"List(1, 16, List(), List(1.1609567794, 1.2656213441, -1.5764729803, 1.4729879638, 1.1621726912, -1.0135323569, 0.6581328268, -0.8473259374, -1.4524985804, 2.8939231012, 0.7741492136, -3.3337206434, 0.8978112902, 2.6826126385, 1.2693141465, -0.9122617532))",0.0,"List(1, 2, List(), List(199.3489729026484, 0.6510270973515901))","List(1, 2, List(), List(0.996744864513242, 0.003255135486757951))",0.0
"List(1, 16, List(), List(1.1945546066, -0.1467282003, 0.4597760204, -0.2333314876, -0.5436549225, -0.4818937894, -0.2221221155, 0.0461698221, -0.081033111, 1.3007436261, 1.1831935259, 0.1733737919, 0.6695586948, -0.7734134266, 0.1036205026, 0.4252988957))",0.0,"List(1, 2, List(), List(199.9484006334307, 0.051599366569324574))","List(1, 2, List(), List(0.9997420031671534, 2.5799683284662286E-4))",0.0
"List(1, 16, List(), List(-0.4372562536, -3.4599029903, -0.8510696436, 0.1737636945, -1.7108014185, -0.0771364774, 0.748273876, -1.0740082459, 0.3488182003, 1.2536747619, 0.4642410898, 0.6032509306, -1.8002366066, 0.4831637421, 0.8473124677, -0.8918122825))",0.0,"List(1, 2, List(), List(199.9468790532071, 0.05312094679292595))","List(1, 2, List(), List(0.9997343952660355, 2.656047339646297E-4))",0.0
"List(1, 16, List(), List(0.7750096633, -0.9586281014, 1.0055755342, 0.597924481, -1.1679624651, 0.576904844, -0.8664704836, 0.7539800101, -0.0062420383, 0.901491218, -0.1311530719, 0.3897912198, 0.9543525749, -0.6768584986, 0.8369574757, -0.6102196693))",0.0,"List(1, 2, List(), List(199.94808702148154, 0.05191297851848245))","List(1, 2, List(), List(0.9997404351074075, 2.595648925924122E-4))",0.0
"List(1, 16, List(), List(1.0935532844, 0.0138924535, 1.1491267785, 1.3743511399, -0.7353302398, -0.0127907553, -0.4106614462, 0.6282664827, -0.242893922, -0.1672665655, 1.0776346664, -0.3979425201, -0.5253359311, 0.1622708763, -0.6623085006, -0.723796919))",0.0,"List(1, 2, List(), List(199.9484006334307, 0.051599366569324574))","List(1, 2, List(), List(0.9997420031671534, 2.5799683284662286E-4))",0.0
"List(1, 16, List(), List(-0.5429222088, 0.6817533002, 1.6166490124, -0.4498867407, 0.2777079565, -1.1569001686, 0.6713976286, -0.3953690724, -0.7327828012, -0.4811612102, 0.0566578808, 0.0983507493, 0.5460849569, -0.7225444862, -0.4616007772, -0.8385104799))",0.0,"List(1, 2, List(), List(199.9484006334307, 0.051599366569324574))","List(1, 2, List(), List(0.9997420031671534, 2.5799683284662286E-4))",0.0
"List(1, 16, List(), List(-1.1408428333, 0.4961850912, 2.2659645064, -2.0187499872, -0.7714001059, -0.6017134866, 0.0924590047, -0.8247261246, 1.0325617175, 1.5050281634, -0.330327818, -0.6434771677, 1.5186724785, -0.3921966294, -0.5664354918, -0.044446401))",0.0,"List(1, 2, List(), List(199.9484006334307, 0.051599366569324574))","List(1, 2, List(), List(0.9997420031671534, 2.5799683284662286E-4))",0.0
"List(1, 16, List(), List(1.2176597757, -0.4987122047, 0.1473877244, -0.3186396103, -0.8838321191, -0.7747624371, -0.4448082007, -0.6890894278, 0.0483141491, 0.403465033, -0.3003488918, -1.3383513964, 0.7195206921, 1.6954709623, -1.8220070373, 0.1671676528))",0.0,"List(1, 2, List(), List(199.94504567076604, 0.05495432923398791))","List(1, 2, List(), List(0.99972522835383, 2.747716461699395E-4))",0.0
"List(1, 16, List(), List(-0.5618932475, 1.9514347829, -0.3057757985, 0.5634692309, 1.2138858485, -2.057169198, 1.5634520614, -0.8646585893, -2.2408546224, 0.6811632711, -1.1444222427, -3.3198601044, 1.1862390849, 2.570602955, 1.4216355674, -1.6372004818))",0.0,"List(1, 2, List(), List(199.37948648192057, 0.6205135180794312))","List(1, 2, List(), List(0.9968974324096028, 0.003102567590397156))",0.0


In [23]:
predictions.groupBy("prediction").count().orderBy('prediction').show()
predictions.groupBy("label").count().orderBy('label').show()

In [24]:
columns_with_prediction = ["MessageID","Time","V1","V2","V3","V4","V5","V6","V7","V8","V9","V10","V11","V12","V13","V14","V15","V16","V17","V18","V19","V20","V21","V22","V23","V24","V25","V26","V27","V28","Amount","Class","Prediction"]

In [25]:
few_columns = ["MessageID","Time","V1","V2","V27","V28","Amount","Class","Prediction"]
original_data["Prediction"] = predictions.select("prediction").toPandas()
display(original_data[few_columns])

MessageID,Time,V1,V2,V27,V28,Amount,Class,Prediction
15516.0,26905.0,-1.059690124,1.4442398159,0.4060907504,0.1668482393,52.0,0.0,0.0
15517.0,26906.0,1.1609567794,1.2656213441,0.0583991252,0.0854358811,1.79,0.0,0.0
15518.0,26906.0,1.1945546066,-0.1467282003,-0.077138038,0.0037395343,38.16,0.0,0.0
15519.0,26909.0,-0.4372562536,-3.4599029903,-0.2309269844,0.1420098916,908.6,0.0,0.0
15520.0,26911.0,0.7750096633,-0.9586281014,-0.0125631129,0.0435216447,190.0,0.0,0.0
15521.0,26912.0,1.0935532844,0.0138924535,0.0815190169,0.0327433607,15.13,0.0,0.0
15522.0,26913.0,-0.5429222088,0.6817533002,-0.0419142832,0.0181621336,1.98,0.0,0.0
15523.0,26914.0,-1.1408428333,0.4961850912,0.3296998865,0.0328464517,14.2,0.0,0.0
15524.0,26915.0,1.2176597757,-0.4987122047,0.0300083193,0.0479370565,66.44,0.0,0.0
15525.0,26916.0,-0.5618932475,1.9514347829,0.1190286479,0.2021055974,4.8,0.0,0.0


In [26]:
# Save the predictions along with the transacrtion on the DBFS
output_directory = "/FileStore/tables/card-logs-data-withOutPuts/"

# Use current timestamp as a file name of CSV
current_time = datetime.now().strftime("%c")
file_name=current_time.replace(" ", "_")+".csv"
file_name=file_name.replace(":", "_")

print("File Name",output_directory+file_name)

filepath_with_name = output_directory + file_name
original_data_sparkDF = spark.createDataFrame(original_data,columns_with_prediction)



original_data_sparkDF.coalesce(1).write.format("com.databricks.spark.csv").save(output_directory+file_name)

In [27]:
%fs ls /FileStore/tables/card-logs-data-withOutPuts/Sun_May_24_16_43_42_2020.csv

path,name,size
dbfs:/FileStore/tables/card-logs-data-withOutPuts/Sun_May_24_16_43_42_2020.csv/_SUCCESS,_SUCCESS,0
dbfs:/FileStore/tables/card-logs-data-withOutPuts/Sun_May_24_16_43_42_2020.csv/_committed_5136772917708194473,_committed_5136772917708194473,114
dbfs:/FileStore/tables/card-logs-data-withOutPuts/Sun_May_24_16_43_42_2020.csv/_started_5136772917708194473,_started_5136772917708194473,0
dbfs:/FileStore/tables/card-logs-data-withOutPuts/Sun_May_24_16_43_42_2020.csv/part-00000-tid-5136772917708194473-df142781-9de4-448d-bda2-a93f58e82d2a-4737-1-c000.csv,part-00000-tid-5136772917708194473-df142781-9de4-448d-bda2-a93f58e82d2a-4737-1-c000.csv,72750


In [28]:
# time.sleep(10)
""" Major issue is coalesce splits a dataframe into multiple csv while collecting from slave nodes"""
"""Move the files to the final table space to reflect on the DATABASE"""

to_locations ="/FileStore/tables/card-logs-data-table/"
list_offiles=dbutils.fs.ls(filepath_with_name)
print(list_offiles)

"""Copy All the CSV files from the timestamped directory to the table space"""
for fileDetails  in list_offiles[:]:
  if (fileDetails[1].startswith("part-00") and  (fileDetails[2] > 10 )):
      print("File Name {0} size {1} moved".format(fileDetails[1],fileDetails[2]))

      dbutils.fs.mv(fileDetails[0], to_locations + file_name , False)

In [29]:
%fs ls   /FileStore/tables/card-logs-data-table/ 

path,name,size
dbfs:/FileStore/tables/card-logs-data-table/Sun_May_24_16_43_42_2020.csv,Sun_May_24_16_43_42_2020.csv,72750


In [30]:
data_for_table = spark.read.load("dbfs:/FileStore/tables/card-logs-data-table/Sun_May_24_16_43_42_2020.csv", format='csv', header='true', inferSchema='true')

display(data_for_table)

15516.0,26905.0,-1.059690124,1.4442398159,1.4021601297,-0.0039575924,0.4313087845,-0.2663826561,1.0118127759,-0.8165845873,-0.7181505012,-0.5022710758,-0.1476371245,0.5345015985,0.5620484673,0.2037363817,0.3994687835,0.0159510764,-0.3823352274,-1.1333129331,-1.10877904,-0.1788836032,0.3560744273,-1.3700931848,0.1424650394,-0.0051327916,-0.0826111597,-0.9387589102,0.4060907504,0.1668482393,52.0,0.031,0.032
15517.0,26906.0,1.1609567794,1.2656213441,-1.5764729803,1.4729879638,1.1621726912,-1.0135323569,0.6581328268,-0.1526238089,-0.8473259374,-1.4524985804,2.8939231012,0.7741492136,0.6635058078,-3.3337206434,0.5232978065,0.8978112902,2.6826126385,1.2693141465,-0.9122617532,0.0024655761,-0.1203645055,-0.2049968528,-0.203189095,-0.2821715296,0.7852781366,-0.2849579915,0.0583991252,0.0854358811,1.79,0.0,0.0
15518.0,26906.0,1.1945546066,-0.1467282003,0.4597760204,-0.2333314876,-0.5436549225,-0.4818937894,-0.2221221155,-0.0470851329,0.0461698221,-0.081033111,1.3007436261,1.1831935259,0.8197624487,0.1733737919,0.5018370619,0.6695586948,-0.7734134266,0.1036205026,0.4252988957,0.0751826789,-0.0739031165,-0.2818232173,0.0040417289,0.087200732,0.1421305978,0.9010407832,-0.077138038,0.0037395343,38.16,0.0,0.0
15519.0,26909.0,-0.4372562536,-3.4599029903,-0.8510696436,0.1737636945,-1.7108014185,-0.0771364774,0.748273876,-0.2165807824,-1.0740082459,0.3488182003,1.2536747619,0.4642410898,-0.5421979735,0.6032509306,-0.3149753313,-1.8002366066,0.4831637421,0.8473124677,-0.8918122825,1.2630127695,0.1815352891,-0.9567472968,-0.7486900442,0.3104933337,-0.0515741823,0.9973787173,-0.2309269844,0.1420098916,908.6,0.0,0.0
15520.0,26911.0,0.7750096633,-0.9586281014,1.0055755342,0.597924481,-1.1679624651,0.576904844,-0.8664704836,0.4349765924,0.7539800101,-0.0062420383,0.901491218,-0.1311530719,-1.5579372871,0.3897912198,1.3952161912,0.9543525749,-0.6768584986,0.8369574757,-0.6102196693,0.1744140539,0.3712255091,0.6228807701,-0.1972479979,-0.3013604219,0.0219622709,0.4905795651,-0.0125631129,0.0435216447,190.0,0.0,0.0
15521.0,26912.0,1.0935532844,0.0138924535,1.1491267785,1.3743511399,-0.7353302398,-0.0127907553,-0.4106614462,0.0873984866,0.6282664827,-0.242893922,-0.1672665655,1.0776346664,0.7697052099,-0.3979425201,0.1885448555,-0.5253359311,0.1622708763,-0.6623085006,-0.723796919,-0.115906188,0.1524005881,0.7413398691,-0.0859163073,0.4540305779,0.5615818187,-0.2093641896,0.0815190169,0.0327433607,15.13,0.0,0.0
15522.0,26913.0,-0.5429222088,0.6817533002,1.6166490124,-0.4498867407,0.2777079565,-1.1569001686,0.6713976286,-0.0988143118,-0.3953690724,-0.7327828012,-0.4811612102,0.0566578808,0.0652545596,0.0983507493,0.3387078731,0.5460849569,-0.7225444862,-0.4616007772,-0.8385104799,-0.0965455292,-0.1770919703,-0.6754190764,-0.008519805,0.353517433,-0.0592324406,-0.0740869974,-0.0419142832,0.0181621336,1.98,0.0,0.0
15523.0,26914.0,-1.1408428333,0.4961850912,2.2659645064,-2.0187499872,-0.7714001059,-0.6017134866,0.0924590047,-0.044847159,-0.8247261246,1.0325617175,1.5050281634,-0.330327818,-0.1380918455,-0.6434771677,-0.2172504241,1.5186724785,-0.3921966294,-0.5664354918,-0.044446401,0.536691544,0.245405728,0.9576891285,-0.3874707091,0.5594096332,0.5432786584,-0.2199400899,0.3296998865,0.0328464517,14.2,0.0,0.0
15524.0,26915.0,1.2176597757,-0.4987122047,0.1473877244,-0.3186396103,-0.8838321191,-0.7747624371,-0.4448082007,-0.1057747992,-0.6890894278,0.0483141491,0.403465033,-0.3003488918,0.1751505771,-1.3383513964,0.273333967,0.7195206921,1.6954709623,-1.8220070373,0.1671676528,0.1700617561,0.0035965365,-0.0457202484,0.0084800445,0.3470659179,0.3475651167,-0.2984976151,0.0300083193,0.0479370565,66.44,0.0,0.0
15525.0,26916.0,-0.5618932475,1.9514347829,-0.3057757985,0.5634692309,1.2138858485,-2.057169198,1.5634520614,-0.2857910443,-0.8646585893,-2.2408546224,0.6811632711,-1.1444222427,-1.2529720665,-3.3198601044,0.397398774,1.1862390849,2.570602955,1.4216355674,-1.6372004818,-0.1425226813,-0.0975438915,-0.3160905749,-0.3309493851,0.3483059098,0.2586325638,-0.515197401,0.1190286479,0.2021055974,4.8,0.0,0.0
15526.0,26916.0,1.3393225315,-0.4237128067,0.3332655092,-0.7136928152,-0.9019485472,-0.9448449696,-0.3738208066,-0.2179119217,-1.3253645796,0.788885807,1.8011395836,0.6726171295,1.0540904092,-0.0580083289,-0.1697540276,1.0809452461,0.0583383015,-1.0815644049,0.7194506515,0.1689706925,0.2985129938,0.7759045743,-0.1272814277,0.5990662918,0.6021874767,-0.1514966899,-0.005772103,0.0072728573,26.85,0.0,0.0


In [31]:
%fs ls /FileStore/tables/card-logs-data-table/

path,name,size
dbfs:/FileStore/tables/card-logs-data-table/Sun_May_24_16_43_42_2020.csv,Sun_May_24_16_43_42_2020.csv,72750


In [32]:
%sql

DROP TABLE  IF  EXISTS  card_logs_data

In [33]:
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS  card_logs_data(
  MessageID DOUBLE,
  Time DOUBLE,
  
  V1 DOUBLE,   V2 DOUBLE,   V3 DOUBLE,   V4 DOUBLE,   V5 DOUBLE,   V6 DOUBLE,   V7 DOUBLE,
  V8 DOUBLE,   V9 DOUBLE,   V10 DOUBLE,  V11 DOUBLE,  V12 DOUBLE,  V13 DOUBLE,  V14 DOUBLE,  
  V15 DOUBLE,  V16 DOUBLE,  V17 DOUBLE,  V18 DOUBLE,  V19 DOUBLE,  V20 DOUBLE,  V21 DOUBLE,
  V22 DOUBLE,  V23 DOUBLE,  V24 DOUBLE,  V25 DOUBLE,  V26 DOUBLE,  V27 DOUBLE,  V28 DOUBLE, 
  Amount DOUBLE,  Class DOUBLE,  Prediction DOUBLE

  
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/FileStore/tables/card-logs-data-table/';
     


In [34]:
%sql 
SELECT    * FROM card_logs_data 
-- MessageID,  Time, V1,  V28, Amount, Class, Prediction

-- SELECT    * FROM card_logs_data GROUP BY Prediction  

-- TRUE POSITIVES 24
-- SELECT    COUNT(*) FROM card_logs_data WHERE Class ==1 AND  Prediction ==1

-- TRUE NEGATIVES 3695
-- SELECT    COUNT(*) FROM card_logs_data WHERE Class ==0 AND  Prediction ==0

-- FALSE POSTIVES 0
-- SELECT    COUNT(*) FROM card_logs_data WHERE Class ==0 AND  Prediction ==1

-- FALSE NEGATIVES 2
-- SELECT    COUNT(*) FROM card_logs_data WHERE Class ==1 AND  Prediction ==0



MessageID,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,V10,V11,V12,V13,V14,V15,V16,V17,V18,V19,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class,Prediction
15516.0,26905.0,-1.059690124,1.4442398159,1.4021601297,-0.0039575924,0.4313087845,-0.2663826561,1.0118127759,-0.8165845873,-0.7181505012,-0.5022710758,-0.1476371245,0.5345015985,0.5620484673,0.2037363817,0.3994687835,0.0159510764,-0.3823352274,-1.1333129331,-1.10877904,-0.1788836032,0.3560744273,-1.3700931848,0.1424650394,-0.0051327916,-0.0826111597,-0.9387589102,0.4060907504,0.1668482393,52.0,0.0,0.0
15517.0,26906.0,1.1609567794,1.2656213441,-1.5764729803,1.4729879638,1.1621726912,-1.0135323569,0.6581328268,-0.1526238089,-0.8473259374,-1.4524985804,2.8939231012,0.7741492136,0.6635058078,-3.3337206434,0.5232978065,0.8978112902,2.6826126385,1.2693141465,-0.9122617532,0.0024655761,-0.1203645055,-0.2049968528,-0.203189095,-0.2821715296,0.7852781366,-0.2849579915,0.0583991252,0.0854358811,1.79,0.0,0.0
15518.0,26906.0,1.1945546066,-0.1467282003,0.4597760204,-0.2333314876,-0.5436549225,-0.4818937894,-0.2221221155,-0.0470851329,0.0461698221,-0.081033111,1.3007436261,1.1831935259,0.8197624487,0.1733737919,0.5018370619,0.6695586948,-0.7734134266,0.1036205026,0.4252988957,0.0751826789,-0.0739031165,-0.2818232173,0.0040417289,0.087200732,0.1421305978,0.9010407832,-0.077138038,0.0037395343,38.16,0.0,0.0
15519.0,26909.0,-0.4372562536,-3.4599029903,-0.8510696436,0.1737636945,-1.7108014185,-0.0771364774,0.748273876,-0.2165807824,-1.0740082459,0.3488182003,1.2536747619,0.4642410898,-0.5421979735,0.6032509306,-0.3149753313,-1.8002366066,0.4831637421,0.8473124677,-0.8918122825,1.2630127695,0.1815352891,-0.9567472968,-0.7486900442,0.3104933337,-0.0515741823,0.9973787173,-0.2309269844,0.1420098916,908.6,0.0,0.0
15520.0,26911.0,0.7750096633,-0.9586281014,1.0055755342,0.597924481,-1.1679624651,0.576904844,-0.8664704836,0.4349765924,0.7539800101,-0.0062420383,0.901491218,-0.1311530719,-1.5579372871,0.3897912198,1.3952161912,0.9543525749,-0.6768584986,0.8369574757,-0.6102196693,0.1744140539,0.3712255091,0.6228807701,-0.1972479979,-0.3013604219,0.0219622709,0.4905795651,-0.0125631129,0.0435216447,190.0,0.0,0.0
15521.0,26912.0,1.0935532844,0.0138924535,1.1491267785,1.3743511399,-0.7353302398,-0.0127907553,-0.4106614462,0.0873984866,0.6282664827,-0.242893922,-0.1672665655,1.0776346664,0.7697052099,-0.3979425201,0.1885448555,-0.5253359311,0.1622708763,-0.6623085006,-0.723796919,-0.115906188,0.1524005881,0.7413398691,-0.0859163073,0.4540305779,0.5615818187,-0.2093641896,0.0815190169,0.0327433607,15.13,0.0,0.0
15522.0,26913.0,-0.5429222088,0.6817533002,1.6166490124,-0.4498867407,0.2777079565,-1.1569001686,0.6713976286,-0.0988143118,-0.3953690724,-0.7327828012,-0.4811612102,0.0566578808,0.0652545596,0.0983507493,0.3387078731,0.5460849569,-0.7225444862,-0.4616007772,-0.8385104799,-0.0965455292,-0.1770919703,-0.6754190764,-0.008519805,0.353517433,-0.0592324406,-0.0740869974,-0.0419142832,0.0181621336,1.98,0.0,0.0
15523.0,26914.0,-1.1408428333,0.4961850912,2.2659645064,-2.0187499872,-0.7714001059,-0.6017134866,0.0924590047,-0.044847159,-0.8247261246,1.0325617175,1.5050281634,-0.330327818,-0.1380918455,-0.6434771677,-0.2172504241,1.5186724785,-0.3921966294,-0.5664354918,-0.044446401,0.536691544,0.245405728,0.9576891285,-0.3874707091,0.5594096332,0.5432786584,-0.2199400899,0.3296998865,0.0328464517,14.2,0.0,0.0
15524.0,26915.0,1.2176597757,-0.4987122047,0.1473877244,-0.3186396103,-0.8838321191,-0.7747624371,-0.4448082007,-0.1057747992,-0.6890894278,0.0483141491,0.403465033,-0.3003488918,0.1751505771,-1.3383513964,0.273333967,0.7195206921,1.6954709623,-1.8220070373,0.1671676528,0.1700617561,0.0035965365,-0.0457202484,0.0084800445,0.3470659179,0.3475651167,-0.2984976151,0.0300083193,0.0479370565,66.44,0.0,0.0
15525.0,26916.0,-0.5618932475,1.9514347829,-0.3057757985,0.5634692309,1.2138858485,-2.057169198,1.5634520614,-0.2857910443,-0.8646585893,-2.2408546224,0.6811632711,-1.1444222427,-1.2529720665,-3.3198601044,0.397398774,1.1862390849,2.570602955,1.4216355674,-1.6372004818,-0.1425226813,-0.0975438915,-0.3160905749,-0.3309493851,0.3483059098,0.2586325638,-0.515197401,0.1190286479,0.2021055974,4.8,0.0,0.0


In [35]:
%sql
-- TRUE POSITIVES 
SELECT    COUNT(*) FROM card_logs_data WHERE Class ==1 AND  Prediction ==1

count(1)
3


In [36]:
%sql
-- TRUE NEGATIVES
SELECT    COUNT(*) FROM card_logs_data WHERE Class ==0 AND  Prediction ==0

count(1)
177


In [37]:
%sql
-- FALSE POSTIVES 
SELECT    COUNT(*) FROM card_logs_data WHERE Class ==0 AND  Prediction ==1

count(1)
0


In [38]:
%sql
-- FALSE NEGATIVES 2
SELECT    COUNT(*) FROM card_logs_data WHERE Class ==1 AND  Prediction ==0

count(1)
0


In [39]:
%sql
-- TOTAL CLASS DISTRIBUTION
SELECT    COUNT(*) FROM card_logs_data GROUP BY CLASS 


count(1)
177
3


In [40]:
%sql

-- Generate a list of transactions for the investigators ordered by importance

SELECT     MESSAGEID, AMOUNT, PREDICTION FROM card_logs_data ORDER BY PREDICTION DESC, AMOUNT DESC

MESSAGEID,AMOUNT,PREDICTION
15506.0,99.99,1.0
15566.0,99.99,1.0
15539.0,99.99,1.0
15555.0,2217.18,0.0
15490.0,1746.8,0.0
15501.0,1494.8,0.0
15502.0,1164.0,0.0
15577.0,976.0,0.0
15519.0,908.6,0.0
15581.0,871.62,0.0


In [41]:
 # path_test = '/FileStore/tables/card-logs-data-archievepart-00000-e63cacf7-b5e7-4a37-8654-e76f4ec8fc29-c000.csv' 
# test_data = transform_for_ccfd(path=path_test)
# predictions = model.transform(test_data)


# predictions.groupBy("prediction").count().orderBy('prediction').show()
# predictions.groupBy("label").count().orderBy('label').show()

In [42]:
# data.columns = ["Time","V1","V2","V3","V4","V5","V6","V7","V8","V9","V10","V11","V12","V13","V14","V15","V16","V17","V18","V19","V20","V21","V22","V23","V24","V25","V26","V27","V28","Amount","Class"]

In [43]:
# selectedFeatures = ['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V9', 'V10', 'V11', 'V12', 'V14', 'V16', 'V17', 'V18', 'V19']
# selectedFeatures.append('Class')
# equal_data=equal_data.loc[:,selectedFeatures]
# equal_data.head()

In [44]:
# dfff = spark.createDataFrame(equal_data)

In [45]:
# training_df = dfff.rdd.map(lambda x: (DenseVector(x[:-1]),x['Class']))

# training_df = spark.createDataFrame(training_df,["features","label"])

In [47]:
# training_df.groupBy("label").count().show()

In [48]:
# path_test = '/FileStore/tables/card-logs-data-archievepart-00000-e63cacf7-b5e7-4a37-8654-e76f4ec8fc29-c000.csv' 
# test_data = transform_for_ccfd(path=path_test)
# predictions = model.transform(test_data)


# predictions.groupBy("prediction").count().orderBy('prediction').show()
# predictions.groupBy("label").count().orderBy('label').show()