In [1]:
# Spark session, Pipeline, Functions and Metrics
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext,SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer,StandardScaler,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import rand
from pyspark.mllib.evaluation import MulticlassMetrics

In [2]:
# Keras/Deep learning
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import optimizers,regularizers,Sequential
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.optimizers import Adam,Optimizer
from tensorflow.keras.layers import Input,Dense,Dropout, Activation,Lambda
from keras.layers.core import Dropout, Lambda
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate


In [3]:
# Elephas for Deep learning on spark
import elephas
from elephas.ml_model import ElephasEstimator
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd

In [4]:
# Configure a local Spark application (six worker threads) and start its context.
conf = SparkConf()
conf.setAppName("Spark DL Tabular Pipeline")
conf.setMaster('local[6]')
sc = SparkContext(conf=conf)

In [5]:
# Wrap the SparkContext so that DataFrames can be read through `sql_context.read`.
# NOTE(review): SQLContext looks like a legacy entry point in recent PySpark;
# SparkSession is already imported and may be preferable — confirm.
sql_context = SQLContext(sc)

In [6]:
# Load the semicolon-delimited CSV; the first row is the header and column
# types are inferred from the data.
df = sql_context.read.csv('/mnt/PySpark/bank-additional/bank-additional-full.csv',sep =';', header=True,inferSchema=True)

In [7]:
# Show the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp.var.rate: double (nullable = true)
 |-- cons.price.idx: double (nullable = true)
 |-- cons.conf.idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr.employed: double (nullable = true)
 |-- y: string (nullable = true)



In [8]:
# Preview the first five rows as a pandas DataFrame.
df.limit(5).toPandas()

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,...,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,y
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no


In [9]:
# Remove the day/month columns and the five double-typed indicator columns;
# `df` is rebound to the reduced DataFrame.
df = df.drop('day_of_week','month', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed')

In [10]:
def select_features_to_scale(df=df, lower_skew=-2, upper_skew=2, dtypes='int32', drop_cols=[]):
    """Return the columns of ``df`` whose kurtosis lies outside the given bounds.

    The Spark DataFrame is collected to pandas exactly once; every column of
    the requested dtype (minus ``drop_cols``) is then tested.

    Parameters
    ----------
    df : Spark DataFrame
        Data to inspect. The default is the notebook-level ``df`` as it was
        bound when this cell was executed.
    lower_skew, upper_skew : number
        A column is selected when its kurtosis is below ``lower_skew`` or
        above ``upper_skew``. (The names say "skew" but the statistic used is
        kurtosis.)
    dtypes : str
        pandas dtype of the candidate columns.
    drop_cols : list
        Column names to exclude from the candidates. The default list is
        never mutated.

    Returns
    -------
    list
        Names of the selected columns, in DataFrame column order.
    """
    # toPandas() pulls the whole dataset to the driver, so do it once instead
    # of once per column check.
    pandas_df = df.toPandas()
    # Columns with the requested dtype, minus those we do not want to scale.
    candidate_cols = pandas_df.select_dtypes(include=[dtypes]).columns.drop(drop_cols)

    selected_features = []
    for feature in candidate_cols:
        # Compute the statistic once and reuse it for both bounds.
        feature_kurtosis = pandas_df[feature].kurtosis()
        if feature_kurtosis < lower_skew or feature_kurtosis > upper_skew:
            selected_features.append(feature)
    return selected_features

In [11]:
# Collect the Spark DataFrame to pandas once and reuse it for both dtype
# look-ups; each toPandas() call transfers the full dataset to the driver.
df_pandas = df.toPandas()
# Numeric columns (integer and double typed).
num_cols = list(df_pandas.select_dtypes(include=['int32','double']))
# String-typed columns, excluding the target column.
cat_cols = list(df_pandas.select_dtypes(include=['object']).columns.drop(['y']))
# The pandas copy is only needed for the dtype inspection above.
del df_pandas
label='y'

In [12]:
# Ordered list of pipeline stages; the following cells append to it.
stages =[]

In [13]:
# Build a StringIndexer + OneHotEncoder pair for every categorical column.

for features in cat_cols:
    # Map the category strings of this column to numeric indices.
    string_indexer = StringIndexer(
        inputCol=features,
        outputCol=features + '_index',
    )
    # Turn the numeric index into a one-hot encoded vector.
    encoder = OneHotEncoder(
        inputCols=[features + '_index'],
        outputCols=[features + "_class_vec"],
    )
    # Register both stages, indexer first.
    stages.extend([string_indexer, encoder])

In [14]:
# Convert the string target column into a numeric label column.
label_str_index = StringIndexer(
    inputCol=label,
    outputCol='label_index',
)

# Pick the columns to standardise, pack them into one vector, then scale it.
unscaled_features = select_features_to_scale(df=df)
unscaled_assember = VectorAssembler(
    inputCols=unscaled_features,
    outputCol="unscaled_features",
)
scaler = StandardScaler(
    inputCol="unscaled_features",
    outputCol="scaled_features",
)

In [15]:
# Add the vector assembler and the scaler, in that order, to the pipeline.
stages.extend([unscaled_assember, scaler])

In [16]:
# Numeric features that are not being scaled, kept in their original column
# order. A set difference would return them in arbitrary order, which would
# make the layout of the assembled feature vector vary between runs.
num_unsclaed_diff_list = [col for col in num_cols if col not in unscaled_features]

In [17]:
# Concatenate the one-hot encoded categorical columns with the unscaled numeric columns.
assembler_inputs = [feature + '_class_vec' for feature in cat_cols]
assembler_inputs.extend(num_unsclaed_diff_list)

In [18]:
# Display the column names that will feed the assembler.
assembler_inputs

['job_class_vec',
 'marital_class_vec',
 'education_class_vec',
 'default_class_vec',
 'housing_class_vec',
 'loan_class_vec',
 'contact_class_vec',
 'poutcome_class_vec',
 'age']

In [19]:
# Pack the encoded categorical and unscaled numeric columns into one vector,
# then register the label indexer and this assembler as pipeline stages.
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="assembled_inputs",
)
stages.extend([label_str_index, assembler])

In [20]:
# Join the scaled vector and the assembled vector into the final `features` column.
assembler_final = VectorAssembler(
    inputCols=['scaled_features', 'assembled_inputs'],
    outputCol='features',
)

In [21]:
# The final assembler is the last preprocessing stage.
stages.append(assembler_final)

In [22]:
# Fit every preprocessing stage on `df`, then apply the fitted stages to it.
# NOTE(review): the stages are fitted on the full dataset before the
# train/test split, so the fitted statistics also see the test rows.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df_transform = pipeline_model.transform(df)

In [23]:
# Preview the first five transformed rows as a pandas DataFrame.
df_transform.limit(5).toPandas()

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,duration,campaign,...,loan_class_vec,contact_index,contact_class_vec,poutcome_index,poutcome_class_vec,unscaled_features,scaled_features,label_index,assembled_inputs,features
0,56,housemaid,married,basic.4y,no,no,no,telephone,261,1,...,"(1.0, 0.0)",1.0,(0.0),0.0,"(1.0, 0.0)","[261.0, 1.0, 999.0, 0.0]","[1.0066366713543702, 0.3610090653030643, 5.344...",0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...","(1.0066366713543702, 0.3610090653030643, 5.344..."
1,57,services,married,high.school,unknown,no,no,telephone,149,1,...,"(1.0, 0.0)",1.0,(0.0),0.0,"(1.0, 0.0)","[149.0, 1.0, 999.0, 0.0]","[0.5746699771333377, 0.3610090653030643, 5.344...",0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.5746699771333377, 0.3610090653030643, 5.344..."
2,37,services,married,high.school,no,yes,no,telephone,226,1,...,"(1.0, 0.0)",1.0,(0.0),0.0,"(1.0, 0.0)","[226.0, 1.0, 999.0, 0.0]","[0.8716470794102975, 0.3610090653030643, 5.344...",0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.8716470794102975, 0.3610090653030643, 5.344..."
3,40,admin.,married,basic.6y,no,no,no,telephone,151,1,...,"(1.0, 0.0)",1.0,(0.0),0.0,"(1.0, 0.0)","[151.0, 1.0, 999.0, 0.0]","[0.5823836681015704, 0.3610090653030643, 5.344...",0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.5823836681015704, 0.3610090653030643, 5.344..."
4,56,services,married,high.school,no,no,yes,telephone,307,1,...,"(0.0, 1.0)",1.0,(0.0),0.0,"(1.0, 0.0)","[307.0, 1.0, 999.0, 0.0]","[1.1840515636237228, 0.3610090653030643, 5.344...",0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.1840515636237228, 0.3610090653030643, 5.344..."


In [24]:
# Keep only the model inputs: the feature vector and the numeric label.
df_transform_fin = df_transform.select('features','label_index')
# Seed the shuffle as well as the split. With an unseeded rand() the row order
# changes on every evaluation, so the seeded randomSplit below would still not
# produce a reproducible train/test partition.
df_transform_fin = df_transform_fin.orderBy(rand(seed=1234))
train_data, test_data = df_transform_fin.randomSplit([.75,.25], seed=1234)

In [25]:
# Number of distinct label values in the training split.
nb_classes = (
    train_data
    .select("label_index")
    .distinct()
    .count()
)
# Length of the feature vector, read from the first training row.
input_dim = len(
    train_data.select('features').first()[0]
)

In [26]:
# Fully connected network: two L2-activity-regularised 256-unit layers, then
# 128 and 64 units with light dropout in between, and one output unit per class.
# NOTE(review): the output layer uses sigmoid with `nb_classes` units and
# binary cross-entropy; confirm this is intended rather than softmax with
# categorical cross-entropy.
model = Sequential([
    Dense(256,
          activity_regularizer=regularizers.l2(0.01),
          input_shape=(input_dim,),
          activation='relu'),
    Dense(256,
          activity_regularizer=regularizers.l2(0.01),
          activation='relu'),
    Dense(128, activation='relu'),
    Dropout(0.05),
    Dense(64, activation='relu'),
    Dense(nb_classes, activation='sigmoid'),
])
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [27]:
# Print the layer-by-layer output shapes and parameter counts.
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 256)               9216      
_________________________________________________________________
dense_1 (Dense)              (None, 256)               65792     
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896     
_________________________________________________________________
dropout (Dropout)            (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 64)                8256      
_________________________________________________________________
dense_4 (Dense)              (None, 2)                 130       
Total params: 116,290
Trainable params: 116,290
Non-trainable params: 0
__________________________________________________

In [28]:
# Set and serialize the optimizer

In [29]:
# Use `learning_rate`: `lr` is a deprecated alias in tf.keras optimizers and
# is rejected by newer Keras releases.
optimizer_conf = optimizers.Adam(learning_rate=0.01)
# Serialise the optimizer to a plain config so it can be handed to the estimator.
opt_conf = optimizers.serialize(optimizer_conf)

In [30]:
# Initialize the Spark ML estimator and configure its settings

In [31]:
estimator = ElephasEstimator()
# Input/output columns produced by the preprocessing pipeline.
estimator.setFeaturesCol('features')
estimator.setLabelCol('label_index')
# Attach the Keras network defined above; without this the estimator has no
# model architecture to train.
# NOTE(review): recent Elephas releases expect the JSON config; older releases
# documented `model.to_yaml()` — confirm against the installed version.
estimator.set_keras_model_config(model.to_json())
# Labels are one-hot encoded into `nb_classes` categories by the estimator.
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
# Training configuration.
estimator.set_num_workers(1)
estimator.set_epochs(32)
estimator.set_batch_size(64)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("binary_crossentropy")
# Kept last so the cell still displays the estimator returned by the setter.
estimator.set_metrics(['acc'])

ElephasEstimator_2f383bad39f3

In [32]:
# Wrap the Elephas estimator in a single-stage Spark ML pipeline.
dl_pipeline=Pipeline(stages=[estimator])

In [33]:
def dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
                                 train_data=train_data,
                                 test_data=test_data,
                                 label='label_index'):
    """Fit the pipeline on the training data and report accuracy on both splits.

    Parameters
    ----------
    dl_pipeline : Pipeline
        Unfitted Spark ML pipeline to train. Defaults to the notebook-level
        ``dl_pipeline`` as bound when this cell was executed.
    train_data, test_data : Spark DataFrame
        Splits containing the feature column and the ``label`` column.
        Default to the notebook-level splits as bound when this cell ran.
    label : str
        Name of the numeric label column.

    Returns
    -------
    None
        Accuracy is printed and a label-by-prediction crosstab is displayed
        for each split.
    """
    fit_dl_pipeline = dl_pipeline.fit(train_data)
    pred_train = fit_dl_pipeline.transform(train_data)
    pred_test = fit_dl_pipeline.transform(test_data)

    # Keep only the true label and the model's prediction.
    pnl_train = pred_train.select(label, 'prediction')
    pnl_test = pred_test.select(label, 'prediction')

    # MulticlassMetrics expects an RDD of (prediction, label) pairs.
    # NOTE(review): assumes `prediction` holds a scalar class index — confirm
    # for the installed Elephas version.
    pred_and_label_train = pnl_train.rdd.map(lambda row: (row['prediction'], row[label]))
    pred_and_label_test = pnl_test.rdd.map(lambda row: (row['prediction'], row[label]))

    metrics_train = MulticlassMetrics(pred_and_label_train)
    metrics_test = MulticlassMetrics(pred_and_label_test)

    # `accuracy` is the overall metric; `precision()` requires a class label.
    print("Training data accuracy:{}".format(round(metrics_train.accuracy, 4)))
    print("Training data confusion matrix")
    display(pnl_train.crosstab(label, 'prediction').toPandas())

    print("\nTest data accuracy:{}".format(round(metrics_test.accuracy, 4)))
    print("Test data confusion matrix")
    display(pnl_test.crosstab(label, 'prediction').toPandas())


In [None]:
# Train the deep-learning pipeline and print the train/test scores.
dl_pipeline_fit_score_results(
    dl_pipeline=dl_pipeline,
    train_data=train_data,
    test_data=test_data,
    label='label_index',
)

In [None]:
# Build a second Spark configuration (app name "UNet") and start a context from it.
# NOTE(review): `master_url` is not defined in any cell visible here.
# NOTE(review): a SparkContext was already created earlier in this notebook;
# PySpark normally refuses to start a second one in the same process — confirm
# whether this cell is a leftover from another notebook.
conf = SparkConf().setAppName("UNet")
conf.setMaster(master_url)
sc = SparkContext(conf = conf)


In [None]:
# Create Spark model
# Wraps the Keras `model` for training with Elephas in asynchronous mode,
# with an update frequency of 'epoch' and a single worker.
# NOTE(review): `rdd`, `EPOCHS`, `BS`, `testX` and `testY` are not defined in
# any cell visible here; presumably `rdd` is meant to be built with the
# imported `to_simple_rdd` — confirm before running.
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous', 
num_workers=1)
spark_model.fit(rdd, epochs=EPOCHS, batch_size=BS, verbose=1, validation_split=0.1)

# Evaluate Spark model by evaluating the underlying model
score = spark_model.master_network.evaluate(testX, testY, verbose=1)

# Presumably score[1] is accuracy, the only metric the model above was
# compiled with — confirm `model` is that network.
print('Test accuracy:', score[1])
print(spark_model.get_results())