In [1]:
# Configure a reader for the "container-fraud-detection" Cosmos DB container,
# reached through the "LinkServiceCosmosDB" Synapse linked service.
cosmos_reader = (
    spark.read
    .format("cosmos.olap")
    .option("spark.synapse.linkedService", "LinkServiceCosmosDB")
    .option("spark.cosmos.container", "container-fraud-detection")
)
df = cosmos_reader.load()

# Preview only the first 10 rows.
display(df.limit(10))

StatementMeta(poolSpark, 5, 1, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0464d362-0d41-4875-a988-f76489a3711d)

In [11]:
# Keep only the modelling columns. `amount`, `old_balance` and `income` are
# cast to long; every other column is selected unchanged.
long_cast_columns = ["amount", "old_balance", "income"]
passthrough_columns = [
    'credit', 'marital_status', 'children', 'month', 'day_of_week',
    'category_recipient', 'transaction_type', 'transaction_method',
    'foreign_transaction', 'isFraud',
]
cast_expressions = [f"cast({name} as long) {name}" for name in long_cast_columns]
df = df.selectExpr(*cast_expressions, *passthrough_columns)

display(df.limit(10))

StatementMeta(poolSpark, 5, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, c9864e33-a090-4958-9bd1-497423f770fa)

In [12]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import GBTClassifier

StatementMeta(poolSpark, 5, 12, Finished, Available)

Exploratory data analysis (EDA)

In [13]:
# Print the column names and types of the frame produced by the select/cast step.
df.printSchema()
# Optional summary statistics for two of the numeric columns (left disabled):
#df.select('amount','old_balance').describe().show()

StatementMeta(poolSpark, 5, 13, Finished, Available)

root
 |-- amount: long (nullable = true)
 |-- old_balance: long (nullable = true)
 |-- income: long (nullable = true)
 |-- credit: long (nullable = true)
 |-- marital_status: long (nullable = true)
 |-- children: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- category_recipient: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- transaction_method: string (nullable = true)
 |-- foreign_transaction: long (nullable = true)
 |-- isFraud: long (nullable = true)

In [14]:
# Seeded 70/30 random split into training and test frames.
split_weights = [0.7, 0.3]
train, test = df.randomSplit(split_weights, seed=7)

# Each count() call runs a Spark job over the corresponding split.
for split_label, split_frame in (("Training", train), ("Test", test)):
    print(split_label + " Dataset Count: " + str(split_frame.count()))

StatementMeta(poolSpark, 5, 14, Finished, Available)

Training Dataset Count: 6316
Test Dataset Count: 2684

Preprocessing and model definition

In [19]:
# Column groups, by how each one is prepared for the model.
continuous_column = ['amount', 'old_balance', 'income']   # assembled, then standard-scaled
ohe_column = ['month', 'day_of_week', 'category_recipient', 'transaction_method']   # indexed, then one-hot encoded
le_column = 'transaction_type'   # indexed only (label-encoded)
other_column = ['credit', 'marital_status', 'children', 'foreign_transaction']   # fed to the model unchanged

# StringIndexer defaults to handleInvalid="error", which makes the fitted
# pipeline raise at transform() time on any category value (or null) that was
# not present when it was fitted. "keep" maps such values to an extra index
# instead, so the saved model can still score rows with new categories.
INVALID_POLICY = "keep"

stages = []

# Label-encode the single index-only categorical column.
indexer = StringIndexer(
    inputCol=le_column,
    outputCol=le_column + "_LE",
    handleInvalid=INVALID_POLICY,
)
stages += [indexer]

# Index and one-hot encode each remaining categorical column.
for catCol in ohe_column:
    stringIndexer = StringIndexer(
        inputCol=catCol,
        outputCol=catCol + '_LE',
        handleInvalid=INVALID_POLICY,
    )
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[catCol + "_OHE"],
    )
    stages += [stringIndexer, encoder]

# StandardScaler works on a vector column, so the continuous columns are
# assembled into one vector first and then scaled.
assemblerScaler = VectorAssembler(inputCols=continuous_column, outputCol="feature_SS")
stages += [assemblerScaler]

standardizer = StandardScaler().setInputCol("feature_SS").setOutputCol("feature_SS2")
stages += [standardizer]

# Final feature vector: one-hot blocks, the label-encoded column, the scaled
# continuous vector, and the pass-through columns.
assemblerInputs = [oc + "_OHE" for oc in ohe_column] + [le_column + "_LE"] + ["feature_SS2"] + other_column
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler]

# NOTE: the variable is still named `rf`, but the estimator is a
# gradient-boosted tree classifier. The explicit seed makes any randomized
# steps in training repeatable across runs.
rf = GBTClassifier(featuresCol='features', labelCol='isFraud', maxIter=100, seed=7)

stages += [rf]

print(stages)

StatementMeta(poolSpark, 5, 19, Finished, Available)

[StringIndexer_d81772d0c206, StringIndexer_dafd776f0544, OneHotEncoder_bcb309a249f3, StringIndexer_7688df463963, OneHotEncoder_6db804be2579, StringIndexer_2ec94498d737, OneHotEncoder_f73b7a3fde5a, StringIndexer_502f14e76a3a, OneHotEncoder_d7f324af08bc, VectorAssembler_f9efa9c2e2cf, StandardScaler_bdac443fb0cc, VectorAssembler_c906cd06f71e, GBTClassifier_75cc2c014540]

In [16]:
# Fit all pipeline stages on the training split only, then apply the fitted
# pipeline to the held-out test split.
pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)
predictions = model.transform(test)

StatementMeta(poolSpark, 5, 16, Finished, Available)

In [42]:
# Keep just the predicted class (cast to long and renamed `pred`) alongside
# the ground-truth `isFraud` label.
predicted_label = predictions["prediction"].cast("long").alias("pred")
pred = predictions.select(predicted_label, "isFraud")
pred.printSchema()

StatementMeta(poolSpark, 3, 42, Finished, Available)

root
 |-- pred: long (nullable = true)
 |-- isFraud: long (nullable = true)

In [43]:
from sklearn.metrics import accuracy_score, confusion_matrix

# Collect the two-column prediction frame to the driver as a pandas DataFrame.
pandas_pred = pred.toPandas()
y_true = pandas_pred['isFraud']
y_pred = pandas_pred['pred']

# Overall accuracy, then the confusion matrix (rows = actual, columns = predicted).
print(accuracy_score(y_true, y_pred))

print(confusion_matrix(y_true, y_pred))

StatementMeta(poolSpark, 3, 43, Finished, Available)

0.9552906110283159
[[2373   28]
 [  92  191]]

In [44]:
# Spark persists ML models through Hadoop-compatible file systems, so the
# target has to be a storage URI such as abfss://, not the https:// blob URL.
# NOTE(review): assumes the storage account is ADLS Gen2 (hierarchical
# namespace enabled) — for a plain Blob Storage account use
# wasbs://<container>@<account>.blob.core.windows.net/<path> instead.
MODEL_PATH = "abfss://containerdatalakesparkproject@datalakesparkproject.dfs.core.windows.net/SparkModelFraudDetection"

# A plain save() raises if the path already exists; overwrite() lets this
# cell be re-run after retraining.
model.write().overwrite().save(MODEL_PATH)

StatementMeta(poolSpark, 3, 44, Finished, Available)

In [46]:
from pyspark.ml import PipelineModel

# NOTE(review): this path is not the one passed to the model save call in this
# notebook — confirm which saved model is meant to be loaded here.
saved_model_path = "/mysparkmodel"
pipelineModel = PipelineModel.load(saved_model_path)

# Score the test split with the reloaded pipeline. This rebinds `pred`,
# replacing the two-column frame created earlier.
pred = pipelineModel.transform(test)
display(pred)

StatementMeta(poolSpark, 3, 46, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1345e1cc-3188-478a-8994-9ebbabc9e927)

In [19]:
import pyspark

# Report the PySpark version available in this session.
pyspark_version = pyspark.__version__
print(pyspark_version)

StatementMeta(poolSpark, 4, 19, Finished, Available)

3.1.2.dev0

In [4]:
import json

# A single sample transaction record with the same fields as the training data
# plus an `id`.
x_new = [{
    'id': 10000,
    'amount': 95.55648880442871,
    'old_balance': 142590.44432050743,
    'income': 2979.7136940079527,
    'credit': 0,
    'marital_status': 1,
    'children': 2,
    'month': 10,
    'day_of_week': 'monday',
    'category_recipient': 'food_store',
    'transaction_type': 'onsite',
    'transaction_method': 'other',
    'foreign_transaction': 1,
    'isFraud': 0,
}]

# Wrap the records in a {"data": [...]} JSON document, then parse it back to
# recover the list of records.
input_json = json.dumps({"data": x_new})
input_list = json.loads(input_json)["data"]

# One JSON string per record -> RDD -> DataFrame built by spark.read.json.
# NOTE: this rebinds `df`, replacing the frame loaded earlier in the notebook.
sc = spark.sparkContext
json_rdd = sc.parallelize([json.dumps(record) for record in input_list])
df = spark.read.json(json_rdd)

display(df)

StatementMeta(poolSpark, 6, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5e4c0141-9390-4a96-8fce-649e4349654c)