# Introduction
This project is my attempt to learn how to use PySpark (Apache Spark) with Keras via Elephas interface. Since the loaded MNIST has already been preprocessed, this Notebook only covers 3 steps:
1. Defining a model with Keras (& train the Keras model using CPU - the traditional way)
2. Convert the dataset into RDD with `sc.parallel()`
3. Load the Keras model onto Spark with Elephas & train

From this project, I've learned how to install Apache Spark & load PySpark onto local machine; how to convert a Pandas DF into RDD & how to load a Keras model into PySpark.

**Dependency:** `keras`, `findspark`, `elephas`, and local Apache Spark

# Load MNIST dataset

In [1]:
import numpy as np
import keras
from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data() # Train (60000, 28, 28)

#Flatten each of our 28 X 28 images to a vector of 1, 784
X_train = X_train.reshape(-1, 784) # (60000, 784)
X_test = X_test.reshape(-1, 784) # (10000, 784)

from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

Using TensorFlow backend.


# Defining ShallowNet Model

In [2]:
from keras.models import Sequential
from keras.layers import Input, Dense, Conv2D
from keras.layers import MaxPooling2D, Dropout,Flatten
from keras import backend as K
from keras.models import Model

In [3]:
# Define global vars
INPUT_SHAPE = (1,28,28) if K.image_data_format() == 'channels_first' else (28,28, 1)
EPOCHS = 5
BS = 100
OPTIMIZER = 'adam'
LOSS = 'categorical_crossentropy'
METRICS = ['accuracy']

In [4]:
def ShallowNet():
    # CONV > CONV > FC
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(5, 5), 
                           activation='relu', input_shape=INPUT_SHAPE, padding='same'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Conv2D(64, (5, 5), activation='relu', padding='same'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(10, activation='softmax'))
    return model

model = ShallowNet() # instantiate model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 28, 28, 32)        832       
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 14, 14, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 14, 14, 64)        51264     
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 7, 7, 64)          0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 3136)              0         
_________________________________________________________________
dense_1 (Dense)              (None, 512)               1606144   
_________________________________________________________________
dropout_1 (Dropout)          (None, 512)              

# Train with Keras TF (CPU multithreading)

In [5]:
# Train with CPU multithreading
model.fit(X_train, y_train, batch_size=BS,
         epochs=EPOCHS, verbose=1, workers=-1)
pred = model.predict(X_test)
acc = model.score(X_test, y_test)
print("Test Data Accuracy: {}".format(round(acc,4)))

UsageError: Line magic function `%%script` not found.


# Train with Elephas (Apache Spark)

In [35]:
# Open a PySpark session
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
conf = pyspark.SparkConf().setAppName('appName').setMaster('local').setAll(
    [('spark.executor.memory', '10g'), 
     ('spark.executor.cores', '3'), ('spark.cores.max', '3'), 
     ('spark.driver.memory','8g'),
    ('spark.jars.packages', 'databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11')])
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
sc

In [None]:
# NumPy array -> RDD DataFrame
rdd_train = sc.parallelize(zip(X_train.tolist(), y_train.tolist()),
                    numSlices=5000).toDF(['features','label'])
rdd_test = sc.parallelize(zip(X_test.tolist(), y_test.tolist()),
                    numSlices=5000).toDF(['features','label'])

In [None]:
# Keras > Elephas
from elephas.ml_model import ElephasEstimator
estimator = ElephasEstimator() # Instantiate ELephas Model
estimator.set_keras_model_config(model.to_yaml())
estimator.setFeaturesCol('features')
estimator.setLabelCol('label')
estimator.set_categorical_labels(True)
estimator.set_nb_classes(10)
estimator.set_num_workers(-1)
estimator.set_epochs(EPOCHS)
estimator.set_batch_size(BS)
estimator.set_optimizer_config(OPTIMIZER)
estimator.set_metrics(METRICS)
estimator.set_loss(LOSS)
estimator.set_verbosity(1)
estimator.set_mode('synchronous')

In [None]:
# Train with Elephas
elephas_pl = Pipeline(stages=[estimator])
elephas_fit = elephas_pl.fit(rdd_train) # train model
elephas_test = elephas_fit.transform(rdd_test) # fit model to test
elephas_pred = elephas_test.select('label') # SQL prediction
# Return test accuracy
collate = elephas_test.select('label', 
                           'prediction').rdd.map(lambda row: (row['label'], row['prediction']))
metrics_test = MulticlassMetrics(collate).precision()
print("Test Data Accuracy: {}".format(round(metrics_test,4)))

In [34]:
# Stop the PySpark session
sc.stop()