# PySpark implementation of an ANN (MultilayerPerceptronClassifier)

## Finding Spark Installation
We need to tell the code where the Spark installation lies on the current system.

In [1]:
import findspark
# Lets Python find the local Spark installation; run this before the
# `pyspark` imports in the cells below.
findspark.init()

## Creating Spark Context
The bare minimum requirement of any Spark program.

In [2]:
from pyspark import SparkContext, SparkConf

# Empty configuration object; no settings are overridden here.
conf = SparkConf()
# getOrCreate() reuses an already-running context instead of constructing a
# second one, so re-running this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

## Create Spark Session

In [3]:
from pyspark.sql import SparkSession

# Build the session entry point (or pick up one that already exists).
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

## Creating Spark SQL Context
This will enable us to use SQL functionality in the code.

In [4]:
from pyspark.sql import SQLContext
# Wraps the SparkContext `sc` created above; `sqlContext.read` is what the
# next cell uses to load the CSV file.
# NOTE(review): SQLContext is the legacy entry point; the `spark` session
# created earlier offers the same `read` API - consider using it directly.
sqlContext = SQLContext(sc)

### Read the csv file into a Spark Dataframe


In [5]:
# Load the churn dataset into a Spark DataFrame:
#   header      -> take column names from the first line
#   inferschema -> let Spark infer each column's type
#   mode        -> DROPMALFORMED drops rows that cannot be parsed
data = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("data/Churn_Modelling.csv")
)

In [6]:
import pyspark.sql.functions as F

# The label StringIndexer is trained on strings, so turn the integer Exited
# flag into text first: 1 -> true -> 'true'.
exited_as_text = F.col("Exited").cast('boolean').cast('string')
data = data.withColumn("Exited", exited_as_text)


## Data Preprocessing


### Splitting the data into training and test sets

In [7]:
# 80/20 train/test split. The seed is fixed so that the same rows land in each
# set on every run; without it the split (and the accuracy reported at the end
# of the notebook) changes from run to run.
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=42)

### Setting up data preprocessing stages for the model pipeline

In [72]:
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, OneHotEncoderEstimator, MinMaxScaler
from pyspark.ml.classification import MultilayerPerceptronClassifier, RandomForestClassifier


In [115]:

# --- Label -------------------------------------------------------------
# Fitted up front (not left as an estimator) because the IndexToString stage
# added later reads `stages[0].labels` to map predictions back to strings.
label_indexer_model = StringIndexer(
    inputCol='Exited', outputCol='s_exited', handleInvalid='skip'
).fit(trainingData)

# --- Categorical features: string index, then one-hot encode ------------
geography_indexer = StringIndexer(
    inputCol='Geography', outputCol='s_geography', handleInvalid='keep'
)
geography_encoder = OneHotEncoderEstimator(
    inputCols=['s_geography'], outputCols=['oh_s_geography']
)

gender_indexer = StringIndexer(
    inputCol='Gender', outputCol='s_gender', handleInvalid='keep'
)
gender_encoder = OneHotEncoderEstimator(
    inputCols=['s_gender'], outputCols=['oh_s_gender']
)

# --- Assemble every model input into a single vector column -------------
feature_columns = [
    'oh_s_gender',
    'oh_s_geography',
    'CreditScore',
    'Age',
    'Tenure',
    'Balance',
    'NumOfProducts',
    'HasCrCard',
    'IsActiveMember',
    'EstimatedSalary',
]
feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# --- MinMax-scaled copy of the feature vector ---------------------------
feature_scaler = MinMaxScaler(inputCol='features', outputCol='scaledfeatures')

# All stages in execution order; later cells append the classifier stages.
stages = [
    label_indexer_model,
    geography_indexer,
    geography_encoder,
    gender_indexer,
    gender_encoder,
    feature_assembler,
    feature_scaler,
]

#### Random Forest Classifier Stage

In [52]:
# stage for random forest
# The random forest writes to its own output columns. With the default names
# ('prediction', 'rawPrediction', 'probability') it would collide with the
# MultilayerPerceptronClassifier stage appended to the same pipeline below,
# and fitting the pipeline would fail because the column already exists.
# To evaluate the random forest instead of the MLP, point the IndexToString
# stage and the evaluator at 'rf_prediction'.
stages.append(RandomForestClassifier(labelCol='s_exited', featuresCol='features',
                                     predictionCol='rf_prediction',
                                     probabilityCol='rf_probability',
                                     rawPredictionCol='rf_rawPrediction',
                                     maxBins=100, numTrees=40, maxDepth=10))

#### MultilayerPerceptronClassifier stage 
**Note:** this stage __requires the scaled feature set__ (the `scaledfeatures` column produced by the MinMaxScaler stage).

In [116]:
# stage for classifier
# layers = [input, hidden, hidden, output]. The first entry must equal the
# length of the 'scaledfeatures' vector and the last the number of label
# classes. NOTE(review): 13 presumably is the 8 numeric columns plus the
# one-hot widths of gender and geography - confirm against the data.
# A fixed seed makes the weight initialisation, and therefore the trained
# model, repeatable between runs.
stages.append(MultilayerPerceptronClassifier(labelCol="s_exited", featuresCol="scaledfeatures",
                                         maxIter=200, layers=[13,5,5,2], seed=42))

In [117]:
# Stage that maps the numeric 'prediction' index back to the original label
# text, using the labels learned by the fitted label indexer (stages[0]).
exited_labels = stages[0].labels
label_decoder = IndexToString(inputCol='prediction', outputCol='lab_prediction', labels=exited_labels)
stages.append(label_decoder)

### Making the Pipeline Model

In [118]:
from pyspark.ml import Pipeline

# Chain every stage collected in `stages` into a single pipeline.
pipeline = Pipeline(stages=stages) # Making Pipeline

# Fit the pipeline's stages on the training split; the result is the fitted
# model that the next cell applies to the test split.
model = pipeline.fit(trainingData)# Making the model

### Making the predictions

In [119]:
# Run the held-out test split through the fitted pipeline; the result is
# scored by the evaluator in the next cell.
predictions = model.transform(testData)

### Evaluating the Model

In [120]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the predicted class index against the indexed true label.
evaluator = MulticlassClassificationEvaluator(
    labelCol='s_exited',
    predictionCol='prediction',
    metricName='accuracy',
)
accuracy = evaluator.evaluate(predictions)
test_error = 1 - accuracy
print("Accuracy  = %g and Test Error = %g" % (accuracy, test_error))

Accuracy  = 0.854115 and Test Error = 0.145885
