In [1]:
# Read in data
df = spark.read.format('csv') \
  .option("inferSchema", 'true') \
  .option("header", 'true') \
  .option("sep", ",") \
  .load("/FileStore/tables/Use_Case_6__India_s_Audt-77a21.csv")


# Exploratory Data Analysis

Let's run some basic tests to see what kind of data we're dealing with.

In [3]:
# What kind of data are we working with?
df.printSchema()

In [4]:
df = df.drop("TOTAL", "LOCATION_ID", "Audit_Risk", "Detection_Risk")

In [5]:
# Shape of Dataframe
print("There are {} rows and {} columns.".format(df.count(), len(df.columns)))

In [6]:
df = df.dropna()

In [7]:
# Let's take a look at each feature's correlational value in relation to class
new_df = df.toPandas()
corr_matrix= new_df.corr()
abs(corr_matrix["Risk"].sort_values(ascending=True))

In [8]:
df.createOrReplaceTempView("fraud")

In [9]:
%sql

select Risk, count(0) from fraud group by Risk

Risk,count(0)
1,305
0,470


In [10]:
df.columns

In [11]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
new_df = df.withColumnRenamed("Risk","label")

features = ['Sector_score',
 'PARA_A',
 'Score_A',
 'Risk_A',
 'PARA_B',
 'Score_B6',
 'Risk_B',
 'numbers',
 'Score_B10',
 'Risk_C',
 'Money_Value',
 'Score_MV',
 'Risk_D',
 'District_Loss',
 'PROB16',
 'RiSk_E',
 'History',
 'Prob19',
 'Risk_F',
 'Score',
 'Inherent_Risk',
 'CONTROL_RISK']

assembler = VectorAssembler(
    inputCols= features,
    outputCol= "features")
output = assembler.transform(new_df)
display(output.select("features", "label").head(10))

features,label
"List(1, 22, List(), List(3.89, 4.18, 0.6, 2.508, 2.5, 0.2, 0.5, 5.0, 0.2, 1.0, 3.38, 0.2, 0.676, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 2.4, 8.574, 0.4))",1
"List(1, 22, List(), List(3.89, 0.0, 0.2, 0.0, 4.83, 0.2, 0.966, 5.0, 0.2, 1.0, 0.94, 0.2, 0.188, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 2.0, 2.554, 0.4))",0
"List(1, 22, List(), List(3.89, 0.51, 0.2, 0.102, 0.23, 0.2, 0.046, 5.0, 0.2, 1.0, 0.0, 0.2, 0.0, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 2.0, 1.548, 0.4))",0
"List(1, 22, List(), List(3.89, 0.0, 0.2, 0.0, 10.8, 0.6, 6.48, 6.0, 0.6, 3.6, 11.75, 0.6, 7.05, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 4.4, 17.53, 0.4))",1
"List(1, 22, List(), List(3.89, 0.0, 0.2, 0.0, 0.08, 0.2, 0.016, 5.0, 0.2, 1.0, 0.0, 0.2, 0.0, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 2.0, 1.416, 0.4))",0
"List(1, 22, List(), List(3.89, 0.0, 0.2, 0.0, 0.83, 0.2, 0.166, 5.0, 0.2, 1.0, 2.95, 0.2, 0.59, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 2.0, 2.156, 0.4))",0
"List(1, 22, List(), List(3.89, 1.1, 0.4, 0.44, 7.41, 0.4, 2.964, 5.0, 0.2, 1.0, 44.95, 0.6, 26.97, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 3.2, 31.774, 0.4))",1
"List(1, 22, List(), List(3.89, 8.5, 0.6, 5.1, 12.03, 0.6, 7.218, 5.5, 0.4, 2.2, 7.79, 0.4, 3.116, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 4.2, 18.034, 0.4))",1
"List(1, 22, List(), List(3.89, 8.4, 0.6, 5.04, 11.05, 0.6, 6.63, 5.5, 0.4, 2.2, 7.34, 0.4, 2.936, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 4.2, 17.206, 0.4))",1
"List(1, 22, List(), List(3.89, 3.98, 0.6, 2.388, 0.99, 0.2, 0.198, 5.0, 0.2, 1.0, 1.93, 0.2, 0.386, 2.0, 0.2, 0.4, 0.0, 0.2, 0.0, 2.4, 4.372, 0.4))",0


In [12]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(output)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(output)
scaledData.show()


In [13]:
(train, test) = scaledData.randomSplit([0.8, 0.2], seed=0)

# Cache the training and test datasets
train.cache()
test.cache()

In [14]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier

dtc = DecisionTreeClassifier(labelCol='label', featuresCol = 'scaledFeatures')
evaluator = BinaryClassificationEvaluator()

# Create the pipeline
pipeline = Pipeline(stages=[dtc])

# Cross Validate the data and get the best model
paramGrid = ParamGridBuilder().addGrid(dtc.maxBins, [65, 65, 65]).addGrid(dtc.maxDepth, [4, 6, 8]).build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Use the best model for predictions
cv_model = crossval.fit(train)
bestModel = cv_model.bestModel
predictions = bestModel.transform(test)
error= evaluator.evaluate(predictions)
print(str(error))


In [15]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorIndexer

# Build the Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="indexedFeatures")

# Create the Pipeline
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(train)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(train)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model
model = pipeline.fit(train)

# Display Decision Tree
display(model.stages[-1])

treeNode
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":5.015000000000001,""categories"":null,""feature"":20,""overflow"":false}"
"{""index"":7,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0,1.0],""feature"":13,""overflow"":false}"
"{""index"":1,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0],""feature"":13,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":2.436,""categories"":null,""feature"":20,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":4.5E-4,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":8,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"


In [16]:
import numpy as np
import pandas as pd
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer

# Create a random forest pipeline

rf = RandomForestClassifier(labelCol="label", featuresCol="indexedFeatures")
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
 
# Train model
model = pipeline.fit(train)

# Function to create feature importance dataframe
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

# Store feature importance dataframe
scores = ExtractFeatureImp(model.stages[-2].featureImportances, train, "features")

# Create a table for SQL usage
spark_df = sqlContext.createDataFrame(scores)
spark_df.createOrReplaceTempView("scores")
display(scores)

idx,name,score
20,Inherent_Risk,0.256762540063198
19,Score,0.1164530405654262
21,CONTROL_RISK,0.0815660844112927
12,Risk_D,0.0657891822915596
3,Risk_A,0.0655317150605751
10,Money_Value,0.0608068864639937
6,Risk_B,0.0605628429112013
2,Score_A,0.0594081219536186
1,PARA_A,0.0587423049668335
15,RiSk_E,0.0543394356163139
