In [1]:
# Spark Session, Pipeline, Functions, and Metrics
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

# Keras / Deep Learning
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras import optimizers, regularizers
from tensorflow.keras.optimizers import Adam

# Elephas for Deep Learning on Spark
from elephas.ml_model import ElephasEstimator

In [2]:
import pyspark

In [3]:
# Spark Session — getOrCreate() hands back an already-active session when there is one,
# otherwise it builds a new one with the app name 'practice'.
spark = (
    pyspark.sql.SparkSession.builder
    .appName('practice')
    .getOrCreate()
)

In [4]:
# Display the SparkSession object (the cell's last expression is rendered by Jupyter)
spark

In [5]:
# Load Data to Spark Dataframe: the first CSV row holds the column names and
# Spark infers each column's type by scanning the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('data/bank.csv')
)

In [6]:
# View Schema: column names and the types chosen by inferSchema=True when loading
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [7]:
# Preview Dataframe (Pandas Preview is Cleaner)
# limit(5) is applied first, so toPandas() only brings 5 rows to the driver
df.limit(5).toPandas()

Unnamed: 0,id,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,deposit
0,1,59,admin.,married,secondary,no,2343,yes,no,unknown,5,may,1042,1,-1,0,unknown,yes
1,2,56,admin.,married,secondary,no,45,no,no,unknown,5,may,1467,1,-1,0,unknown,yes
2,3,41,technician,married,secondary,no,1270,yes,no,unknown,5,may,1389,1,-1,0,unknown,yes
3,4,55,services,married,secondary,no,2476,yes,no,unknown,5,may,579,1,-1,0,unknown,yes
4,5,54,admin.,married,tertiary,no,184,no,no,unknown,5,may,673,2,-1,0,unknown,yes


In [8]:
# Drop Unnecessary Features (Day and Month). Spark's drop() is a no-op for names
# that are already gone, so re-running this cell leaves df unchanged.
df = df.drop(*('day', 'month'))

In [9]:
# Preview Dataframe after removing 'day' and 'month' (first 5 rows via pandas)
df.limit(5).toPandas()

Unnamed: 0,id,age,job,marital,education,default,balance,housing,loan,contact,duration,campaign,pdays,previous,poutcome,deposit
0,1,59,admin.,married,secondary,no,2343,yes,no,unknown,1042,1,-1,0,unknown,yes
1,2,56,admin.,married,secondary,no,45,no,no,unknown,1467,1,-1,0,unknown,yes
2,3,41,technician,married,secondary,no,1270,yes,no,unknown,1389,1,-1,0,unknown,yes
3,4,55,services,married,secondary,no,2476,yes,no,unknown,579,1,-1,0,unknown,yes
4,5,54,admin.,married,tertiary,no,184,no,no,unknown,673,2,-1,0,unknown,yes


In [10]:
# Helper function to select features to scale given the shape of their distribution
# (kurtosis by default, skewness on request)
def select_features_to_scale(df=df, lower_skew=-2, upper_skew=2, dtypes='int32', drop_cols=[''],
                             statistic='kurtosis'):
    """Return the columns whose distribution statistic lies outside [lower_skew, upper_skew].

    A candidate column is selected when ``stat < lower_skew`` or ``stat > upper_skew``.

    Args:
        df: Spark DataFrame to inspect. It is collected to the driver once with
            ``toPandas()``, so it must fit in driver memory.
        lower_skew: Lower bound; columns whose statistic is below it are selected.
        upper_skew: Upper bound; columns whose statistic is above it are selected.
        dtypes: pandas dtype (as produced by ``toPandas()``) of candidate columns,
            e.g. 'int32' for Spark IntegerType columns.
        drop_cols: Column names to exclude (e.g. an ID column). Empty strings are
            ignored, so the default excludes nothing.
        statistic: pandas Series reduction used as the shape measure: 'kurtosis'
            (default, what this helper has always computed despite its name) or 'skew'.

    Returns:
        List of selected column names in DataFrame column order.

    Raises:
        ValueError: if ``statistic`` is not 'kurtosis' or 'skew'.
        KeyError: if a non-empty name in ``drop_cols`` is not a candidate column.
    """
    if statistic not in ('kurtosis', 'skew'):
        raise ValueError("statistic must be 'kurtosis' or 'skew', got {!r}".format(statistic))

    # Collect once: every toPandas() call launches a full Spark job.
    pdf = df.toPandas()

    # '' means "drop nothing"; pandas Index.drop would raise KeyError on it.
    to_drop = [col for col in drop_cols if col]
    candidates = pdf.select_dtypes(include=[dtypes]).columns.drop(to_drop)

    selected_features = []
    for feature in candidates:
        value = getattr(pdf[feature], statistic)()
        # Bounds come from the caller rather than being hard-coded.
        if value < lower_skew or value > upper_skew:
            selected_features.append(feature)

    return selected_features

In [11]:
# Spark Pipeline
cat_features = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
num_features = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
label = 'deposit'

# Pipeline Stages List (Pipeline.fit runs them in this order)
stages = []

# Categorical features: StringIndexer (<col> -> <col>_index) followed by
# OneHotEncoder (<col>_index -> <col>_class_vec)
for feature in cat_features:
    string_indexer = StringIndexer(inputCol=feature, outputCol=feature + "_index")
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()],
                            outputCols=[feature + "_class_vec"])
    stages += [string_indexer, encoder]

# Index Label Feature ('deposit' -> numeric 'label_index')
label_str_index = StringIndexer(inputCol=label, outputCol="label_index")

# Scale Features: the helper picks the int32 columns (excluding 'id') whose kurtosis lies
# outside [-2, 2]. They are assembled into one vector and divided by their standard
# deviation (StandardScaler defaults: withStd=True, withMean=False, i.e. no centring).
unscaled_features = select_features_to_scale(df=df, lower_skew=-2, upper_skew=2, dtypes='int32', drop_cols=['id'])

unscaled_assembler = VectorAssembler(inputCols=unscaled_features, outputCol="unscaled_features")
scaler = StandardScaler(inputCol="unscaled_features", outputCol="scaled_features")

stages += [unscaled_assembler, scaler]

# Numeric Features that Are Not Being Scaled. Filtering num_features (instead of taking a
# set difference) keeps a fixed column order. Iteration order of a set of strings changes
# between Python processes (hash randomization), which would silently reorder the slots
# of the assembled feature vector from one run to the next.
num_unscaled_diff_list = [feature for feature in num_features if feature not in unscaled_features]

# Assemble the one-hot categorical vectors and the unscaled numeric columns
assembler_inputs = [feature + "_class_vec" for feature in cat_features] + num_unscaled_diff_list

assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="assembled_inputs")

stages += [label_str_index, assembler]

# Final training vector: scaled numeric features first, then the other assembled inputs
assembler_final = VectorAssembler(inputCols=["scaled_features", "assembled_inputs"], outputCol="features")

stages += [assembler_final]

In [12]:
# Inspect the ordered list of stages that Pipeline(stages=stages) will run
stages

[StringIndexer_8251ab32872f,
 OneHotEncoder_3e9b97c0666a,
 StringIndexer_bba0cfd78a5c,
 OneHotEncoder_ca6a5eb1b3b1,
 StringIndexer_cfe4b0d3e5a6,
 OneHotEncoder_3600493bb5af,
 StringIndexer_c8b1a10f7f30,
 OneHotEncoder_564e66037d0a,
 StringIndexer_7d96b11abdc9,
 OneHotEncoder_4e7bd09e39e0,
 StringIndexer_ca9bf34af678,
 OneHotEncoder_87c4bf033321,
 StringIndexer_d4b44717bcf8,
 OneHotEncoder_5de4b7c6fb72,
 StringIndexer_5f19d3547e32,
 OneHotEncoder_6f5ee94f6aa0,
 VectorAssembler_5bddd04c9dbe,
 StandardScaler_0fc2a079e9f0,
 StringIndexer_ab6df44c1af0,
 VectorAssembler_9529fa65a22f,
 VectorAssembler_da0e40b6776c]

In [13]:
# Build the pipeline from the staged list, fit each stage in order on df, and
# apply the fitted stages to produce the engineered columns.
# NOTE(review): the indexers, the StandardScaler and the kurtosis-based column
# selection all see the full dataset here, before the train/test split further
# down, so test rows influence the scaling statistics (mild leakage). Consider
# splitting first and fitting on the training portion only.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df_transform = pipeline_model.transform(df)

In [14]:
# Preview Newly Transformed Data: the original columns plus every column added by the
# pipeline stages (*_index, *_class_vec, unscaled/scaled features, label_index, features)
df_transform.limit(5).toPandas()

Unnamed: 0,id,age,job,marital,education,default,balance,housing,loan,contact,...,loan_class_vec,contact_index,contact_class_vec,poutcome_index,poutcome_class_vec,unscaled_features,scaled_features,label_index,assembled_inputs,features
0,1,59,admin.,married,secondary,no,2343,yes,no,unknown,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[2343.0, 1042.0, 1.0, -1.0, 0.0]","[0.7264185278681131, 3.0017712260834295, 0.367...",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.7264185278681131, 3.0017712260834295, 0.367..."
1,2,56,admin.,married,secondary,no,45,no,no,unknown,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[45.0, 1467.0, 1.0, -1.0, 0.0]","[0.013951700279157103, 4.226102100445672, 0.36...",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.013951700279157103, 4.226102100445672, 0.36..."
2,3,41,technician,married,secondary,no,1270,yes,no,unknown,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[1270.0, 1389.0, 1.0, -1.0, 0.0]","[0.39374798565621155, 4.001401375268602, 0.367...",1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.39374798565621155, 4.001401375268602, 0.367..."
3,4,55,services,married,secondary,no,2476,yes,no,unknown,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[2476.0, 579.0, 1.0, -1.0, 0.0]","[0.7676535531376218, 1.667970767660562, 0.3673...",1.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.7676535531376218, 1.667970767660562, 0.3673..."
4,5,54,admin.,married,tertiary,no,184,no,no,unknown,...,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)","[184.0, 673.0, 2.0, -1.0, 0.0]","[0.05704695225255348, 1.938763949284211, 0.734...",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.05704695225255348, 1.938763949284211, 0.734..."


In [15]:
# Data Structure Type is a PySpark DataFrame (not pandas): pipeline_model.transform returns a Spark DataFrame
type(df_transform)

pyspark.sql.dataframe.DataFrame

In [16]:
# Keep just the model input vector ('features') and the indexed target ('label_index'),
# then preview the first 5 rows through pandas
df_transform_fin = df_transform.select(['features', 'label_index'])
df_transform_fin.limit(5).toPandas()

Unnamed: 0,features,label_index
0,"(0.7264185278681131, 3.0017712260834295, 0.367...",1.0
1,"(0.013951700279157103, 4.226102100445672, 0.36...",1.0
2,"(0.39374798565621155, 4.001401375268602, 0.367...",1.0
3,"(0.7676535531376218, 1.667970767660562, 0.3673...",1.0
4,"(0.05704695225255348, 1.938763949284211, 0.734...",1.0


In [17]:
# Shuffle Data with a fixed seed. An unseeded rand() picks a fresh random seed every time
# this cell runs, so the row order (and, through the partition each row lands in, the rows
# that the seeded randomSplit below assigns to train vs. test) can differ on every fresh
# notebook run. randomSplit already samples rows at random, so this shuffle is optional;
# seeding it keeps Restart & Run All reproducible.
df_transform_fin = df_transform_fin.orderBy(rand(seed=1234))

In [18]:
# Split Data into Train / Test Sets. The weights are per-row sampling proportions,
# so the resulting sizes are approximately (not exactly) 80% / 20%.
train_data, test_data = df_transform_fin.randomSplit(weights=[0.8, 0.2], seed=1234)

In [19]:
# Number of Classes: count of distinct indexed labels present in the training split
nb_classes = train_data.select("label_index").dropDuplicates().count()

# Number of Inputs or Input Dimensions: length of the assembled 'features' vector of the first row
input_dim = len(train_data.select("features").first()["features"])

In [20]:
# Set up Deep Learning Model / Architecture: two 256-unit ReLU blocks, each with an
# L2(0.01) activity regularizer and 30% dropout, followed by an nb_classes-wide sigmoid
# output layer compiled with binary cross-entropy.
# NOTE(review): labels are later passed with set_categorical_labels(True); for that
# setup softmax + categorical_crossentropy is the more conventional head — consider.
model = Sequential([
    Dense(256, input_shape=(input_dim,), activity_regularizer=regularizers.l2(0.01)),
    Activation('relu'),
    Dropout(rate=0.3),
    Dense(256, activity_regularizer=regularizers.l2(0.01)),
    Activation('relu'),
    Dropout(rate=0.3),
    Dense(nb_classes),
    Activation('sigmoid'),
])
model.compile(loss='binary_crossentropy', optimizer='adam')

In [21]:
# Model Summary: each layer's output shape and parameter count
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 256)               7936      
                                                                 
 activation (Activation)     (None, 256)               0         
                                                                 
 dropout (Dropout)           (None, 256)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               65792     
                                                                 
 activation_1 (Activation)   (None, 256)               0         
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 2)                 5

In [22]:
# Set and Serialize Optimizer: Adam with learning rate 0.01, serialized into the
# config that set_optimizer_config receives below
optimizer_conf = Adam(learning_rate=0.01)
opt_conf = optimizers.serialize(optimizer_conf)

# Initialize SparkML Estimator and Get Settings
estimator = ElephasEstimator()

# Columns read from the input DataFrame
estimator.setFeaturesCol("features")
estimator.setLabelCol("label_index")

# Model architecture (Keras JSON) and label handling
estimator.set_keras_model_config(model.to_json())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)

# Training schedule: 1 worker, 25 epochs, batches of 64 rows, verbosity 1, validation_split 0.10
estimator.set_num_workers(1)
estimator.set_epochs(25)
estimator.set_batch_size(64)
estimator.set_verbosity(1)
estimator.set_validation_split(0.10)

# Optimization: serialized optimizer, 'synchronous' training mode, loss and reported metric
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("binary_crossentropy")
estimator.set_metrics(['acc'])

ElephasEstimator_3307ae17b414

In [23]:
# Create Deep Learning Pipeline: a single-stage Spark ML pipeline wrapping the Elephas estimator
dl_pipeline = Pipeline().setStages([estimator])

In [24]:
def _print_split_metrics(split_name, pnl, label, leading=''):
    """Print accuracy and a label-vs-prediction crosstab for one data split.

    Args:
        split_name: Text used in the headings, e.g. "Training" or "Test".
        pnl: DataFrame holding the ``label`` column and a 'prediction' column.
        label: Name of the label column in ``pnl``.
        leading: Prefix printed before the accuracy line (e.g. "\\n" for spacing).
    """
    # MulticlassMetrics expects (prediction, label) pairs of doubles.
    pred_and_label = pnl.rdd.map(lambda row: (float(row['prediction']), float(row[label])))
    metrics = MulticlassMetrics(pred_and_label)

    # `accuracy` replaces the parameterless precision(), which Spark 3 removed.
    print("{}{} Data Accuracy: {}".format(leading, split_name, round(metrics.accuracy, 4)))
    print("{} Data Confusion Matrix".format(split_name))
    display(pnl.crosstab(label, 'prediction').toPandas())


def dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
                                  train_data=train_data,
                                  test_data=test_data,
                                  label='label_index'):
    """Fit ``dl_pipeline`` on ``train_data`` and report train/test accuracy and confusion matrices.

    Args:
        dl_pipeline: Unfitted Spark ML Pipeline whose fitted model adds a 'prediction' column.
        train_data: DataFrame used for fitting and for the training-set report.
        test_data: DataFrame scored with the fitted pipeline for the test-set report.
        label: Name of the numeric label column in both DataFrames.

    Returns:
        The fitted PipelineModel, so it can be reused for further predictions.
    """
    fit_dl_pipeline = dl_pipeline.fit(train_data)

    # Cache the (label, prediction) frames: the metrics job and the crosstab are separate
    # Spark actions, and without caching each would re-run model inference.
    pnl_train = fit_dl_pipeline.transform(train_data).select(label, "prediction").cache()
    pnl_test = fit_dl_pipeline.transform(test_data).select(label, "prediction").cache()

    try:
        _print_split_metrics("Training", pnl_train, label)
        _print_split_metrics("Test", pnl_test, label, leading="\n")
    finally:
        pnl_train.unpersist()
        pnl_test.unpersist()

    return fit_dl_pipeline

In [25]:
# Train the Elephas pipeline and print train/test accuracy plus confusion matrices;
# the trailing ';' stops Jupyter from echoing the call's return value
dl_pipeline_fit_score_results(dl_pipeline, train_data, test_data, label='label_index');

>>> Fit model
>>> Synchronous training complete.




Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 60.0 failed 1 times, most recent failure: Lost task 0.0 in stage 60.0 (TID 47) (192.168.8.101 executor driver): java.lang.NullPointerException
	at org.apache.spark.ml.linalg.VectorUDT.deserialize(VectorUDT.scala:62)
	at org.apache.spark.ml.linalg.VectorUDT.deserialize(VectorUDT.scala:28)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toScala(CatalystTypeConverters.scala:156)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toScalaImpl(CatalystTypeConverters.scala:160)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toScala(CatalystTypeConverters.scala:121)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:284)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:248)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToScalaConverter$2(CatalystTypeConverters.scala:498)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:670)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:424)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:259)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.linalg.VectorUDT.deserialize(VectorUDT.scala:62)
	at org.apache.spark.ml.linalg.VectorUDT.deserialize(VectorUDT.scala:28)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toScala(CatalystTypeConverters.scala:156)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toScalaImpl(CatalystTypeConverters.scala:160)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toScala(CatalystTypeConverters.scala:121)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:284)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:248)
	at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToScalaConverter$2(CatalystTypeConverters.scala:498)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:86)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:670)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:424)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:259)
