In [1]:
from pyspark.sql import SparkSession

### Create a SparkSession (local master, default Hadoop configuration) and set the application name

In [2]:
# Build (or reuse) a SparkSession bound to the 'local' master under the
# application name 'ann_banknote'.
spark = (
    SparkSession.builder
    .master('local')
    .appName('ann_banknote')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/05 02:45:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read the dataset & check the schemas

In [99]:
# Load the banknote CSV, taking column names from the first line of the file.
dataset = spark.read.option('header', True).csv('data_banknote_authentication.csv')

In [100]:
# Print the DataFrame's column names, types and nullability as a tree.
dataset.printSchema()

root
 |-- feature_1: string (nullable = true)
 |-- feature_2: string (nullable = true)
 |-- feature_3: string (nullable = true)
 |-- feature_4: string (nullable = true)
 |-- Class : string (nullable = true)



### Cast every column from string to double (8-byte double-precision floating-point numbers)

In [76]:
# Cast every column to double in a single projection, keeping names and order.
# A single select() replaces the withColumn() loop, which adds one projection
# per column to the query plan; the comprehension also avoids leaving a
# module-level loop variable named `col` behind.
dataset = dataset.select(
    [dataset[name].cast('double').alias(name) for name in dataset.columns]
)

### Check number of NULL values for all columns

In [78]:
from pyspark.sql.functions import when, count, isnull
# One aggregate per column: when() yields a non-null value only for rows where
# the column is null, and count() counts just those non-null values.
null_counts = [
    count(when(isnull(name), name)).alias(name)
    for name in dataset.columns
]
dataset.select(null_counts).show()

+---------+---------+---------+---------+------+
|feature_1|feature_2|feature_3|feature_4|Class |
+---------+---------+---------+---------+------+
|        1|        1|        1|        0|     0|
+---------+---------+---------+---------+------+



### Impute NULL values with each column's mean & check that the imputed columns contain no NULLs

In [79]:
from pyspark.ml.feature import Imputer
input_cols = ['feature_1', 'feature_2', 'feature_3', 'feature_4']
# Imputed values are written to new columns, so the raw feature columns are kept.
imputed_cols = ['f01', 'f02', 'f03', 'f04']
# `missingValue=None` was dropped: PySpark ignores keyword arguments set to None,
# so the default placeholder applied anyway, and Imputer always treats nulls as missing.
imputer = Imputer(strategy='mean', inputCols=input_cols, outputCols=imputed_cols)
# Fit the per-column means on the dataset, then append the imputed columns to it.
dataset = imputer.fit(dataset).transform(dataset)

In [80]:
# Re-count nulls per column, now including the columns added by the imputer.
# Relies on `when`, `count` and `isnull` imported in an earlier cell.
null_counts_after = [
    count(when(isnull(name), name)).alias(name)
    for name in dataset.columns
]
dataset.select(null_counts_after).show()

+---------+---------+---------+---------+------+---+---+---+---+
|feature_1|feature_2|feature_3|feature_4|Class |f01|f02|f03|f04|
+---------+---------+---------+---------+------+---+---+---+---+
|        1|        1|        1|        0|     0|  0|  0|  0|  0|
+---------+---------+---------+---------+------+---+---+---+---+



### Assemble the features into a single vector column and min-max scale them before feeding them into the model

In [81]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
# Pack the four imputed columns into one vector column named 'features'.
assembler = VectorAssembler(
    inputCols=['f01', 'f02', 'f03', 'f04'],
    outputCol='features',
    handleInvalid="keep",
)
dataset = assembler.transform(dataset)
# Rescale 'features' into the [0.0, 1.0] range and store it as 'features_sc'.
scaler = MinMaxScaler(min=0.0, max=1.0, inputCol='features', outputCol='features_sc')
dataset = scaler.fit(dataset).transform(dataset)

### Show resulting dataset (first 20 rows)

In [82]:
# Display the first 20 rows (the row count show() uses when none is given).
dataset.show(n=20)

+---------+---------+---------+---------+------+------------------+------------------+--------+--------+--------------------+--------------------+
|feature_1|feature_2|feature_3|feature_4|Class |               f01|               f02|     f03|     f04|            features|         features_sc|
+---------+---------+---------+---------+------+------------------+------------------+--------+--------+--------------------+--------------------+
|   3.6216|   8.6661|  -2.8073| -0.44699|   0.0|            3.6216|            8.6661| -2.8073|-0.44699|[3.6216,8.6661,-2...|[0.76900388695382...|
|   4.5459|   8.1674|  -2.4586|  -1.4621|   0.0|            4.5459|            8.1674| -2.4586| -1.4621|[4.5459,8.1674,-2...|[0.83565901535310...|
|    3.866|  -2.6383|   1.9242|  0.10645|   0.0|             3.866|           -2.6383|  1.9242| 0.10645|[3.866,-2.6383,1....|[0.78662859038429...|
|   3.4566|   9.5228|  -4.0112|  -3.5944|   0.0|            3.4566|            9.5228| -4.0112| -3.5944|[3.4566,9.5228

### Split train & test data

In [83]:
# Keep only the label and scaled-feature columns, then split them with
# weights 0.7 / 0.3 using a fixed seed.
model_input = dataset.select('Class ', 'features_sc')
train_df, test_df = model_input.randomSplit([0.7, 0.3], seed=24)

### Check shape of train & test datasets

In [94]:
# Print the (row count, column count) of each split on its own line.
# Two separate statements are used so the cell no longer ends in a tuple
# expression, which made the notebook echo a stray `(None, None)`.
print((train_df.count(), len(train_df.columns)))
print((test_df.count(), len(test_df.columns)))

(955, 2)
(417, 2)


(None, None)

### Create a dense neural network with 4 inputs (number of features), one hidden layer with 128 perceptrons, and 2 outputs (number of unique classes)

In [95]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
# Layer sizes: 4 inputs, one hidden layer of 128 units, 2 outputs.
# `stepSize=0.03` was removed: Spark documents it as applicable only to the 'gd'
# solver (and 0.03 is its default), so it had no effect with solver='l-bfgs'.
mlp = MultilayerPerceptronClassifier(
    featuresCol='features_sc',
    labelCol='Class ',
    maxIter=100,
    seed=42,
    layers=(4, 128, 2),
    blockSize=8,
    solver='l-bfgs',
)

### Train & test the model

In [118]:
# Fit the multilayer perceptron on the training split.
model = mlp.fit(train_df)

In [119]:
# Apply the fitted model to the held-out test split; the evaluator below reads
# the resulting 'prediction' column.
pred = model.transform(test_df)

### Evaluate the model on the test set (F1 score)

In [120]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Compare the 'prediction' column against the 'Class ' label using the F1 metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol='Class ',
    predictionCol='prediction',
    metricName='f1',
)
# Left as the last expression so the notebook displays the score.
evaluator.evaluate(pred)

0.9976009848807263

22/04/05 03:40:33 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
22/04/05 03:40:33 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:919)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:154)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:262)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:169)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$Mess