In [3]:
import numpy as np
import pandas as pd

In [4]:
import subprocess
import json
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql import SparkSession

import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf

In [65]:
# Make Spark's Python workers use the same interpreter as this kernel, and do it
# *before* the session/JVM is created. When PYSPARK_PYTHON is unset or points to a
# different executable, PySpark may start workers with another Python. On Windows
# this commonly surfaces as "Python worker failed to connect back".
# NOTE: getOrCreate() returns an already-running session unchanged. If a session
# was started before this variable was set, restart the kernel for it to apply.
import os
import sys

os.environ["PYSPARK_PYTHON"] = sys.executable

# Local Spark session with 2 worker threads.
spark = (
    SparkSession.builder
    .appName("PySpark ML")
    .master("local[2]")
    .getOrCreate()
)

In [66]:
# Load the bank-marketing CSV (first line is the header).
# `inferSchema` makes Spark scan the data and assign real column types. Without
# it, every column is read as a string, so any filter on integer dtypes
# (e.g. selecting numeric features) finds nothing.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("bank.csv")
)

In [67]:
# Spark simple-type names (as reported by `df.dtypes`) treated as numeric.
NUMERIC_DTYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}
numeric_features = [name for name, dtype in df.dtypes if dtype in NUMERIC_DTYPES]

# Summary statistics of the numeric columns. Bind them to a name: this is not the
# cell's last expression, so an unnamed result would be computed and discarded.
numeric_summary = df.select(numeric_features).describe().toPandas().transpose()
numeric_data = df.select(numeric_features).toPandas()

# Drop the numeric and date-like columns. Only the remaining categorical columns
# plus the `deposit` target are carried forward.
unused_col = ['day', 'month', 'age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
df = df.select([c for c in df.columns if c not in unused_col])
# Raw column names before any pipeline output columns are added.
cols = df.columns

In [68]:
# Inspect the columns kept after dropping `unused_col`, with their Spark types.
df.printSchema()

root
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [69]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
 
# filter categorical columns
# Categorical inputs: each one is string-indexed, then one-hot encoded.
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']


def _index_and_encode(column):
    """Return the [StringIndexer, OneHotEncoder] stage pair for one categorical column."""
    indexer = StringIndexer(inputCol=column, outputCol=f"{column}Index")
    one_hot = OneHotEncoder(inputCols=[indexer.getOutputCol()], outputCols=[f"{column}classVec"])
    return [indexer, one_hot]


# Stage order: (indexer, encoder) for every categorical column, then the label
# indexer, then the feature assembler.
stages = [stage for column in categoricalColumns for stage in _index_and_encode(column)]

# Target: index the string `deposit` column into a numeric `label` column.
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages.append(label_stringIdx)

# Numeric columns are currently left out of the feature vector. To include them,
# list them here (e.g. ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous'])
# and stop dropping them via `unused_col`.
numericCols = []
assemblerInputs = [f"{column}classVec" for column in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [70]:
# Inspect the ordered list of pipeline stages.
stages

[StringIndexer_d3e6c8401155,
 OneHotEncoder_26610ca6c046,
 StringIndexer_ce70cb97c45e,
 OneHotEncoder_d10ef3b4c6be,
 StringIndexer_50c557a815b1,
 OneHotEncoder_01aa02c0aaaa,
 StringIndexer_0b3d89408c28,
 OneHotEncoder_8dc660fd55f7,
 StringIndexer_1022e5656c98,
 OneHotEncoder_16a96fd99598,
 StringIndexer_3777fc93659c,
 OneHotEncoder_386a4ad1b50c,
 StringIndexer_020cacbd0b4f,
 OneHotEncoder_cd1eee9293a9,
 StringIndexer_0b14e69778a0,
 OneHotEncoder_4c97bd4a1eb0,
 StringIndexer_9410b17b0aed,
 VectorAssembler_7edf6da721c4]

In [71]:
# Columns that the VectorAssembler concatenates into `features`.
assemblerInputs

['jobclassVec',
 'maritalclassVec',
 'educationclassVec',
 'defaultclassVec',
 'housingclassVec',
 'loanclassVec',
 'contactclassVec',
 'poutcomeclassVec']

In [72]:
from pyspark.ml import Pipeline
# Fit every preprocessing stage, then replace `df` with the transformed frame.
# The stages are fit on `df.select(cols)`, i.e. only the raw columns captured
# before any pipeline output existed. This keeps the cell idempotent: on a re-run,
# the previously added *Index / *classVec / label / features columns are dropped
# first rather than fed back into stages that would try to create them again.
# NOTE(review): the pipeline is fit on the full dataset before the train/test split,
# so the indexers also see test-set categories; consider fitting on train only.
pipeline = Pipeline(stages=stages)
df_input = df.select(cols)
pipelineModel = pipeline.fit(df_input)
df = pipelineModel.transform(df_input)

In [73]:
df.limit(5).toPandas()

Unnamed: 0,job,marital,education,default,housing,loan,contact,poutcome,deposit,jobIndex,...,housingIndex,housingclassVec,loanIndex,loanclassVec,contactIndex,contactclassVec,poutcomeIndex,poutcomeclassVec,label,features
0,admin.,married,secondary,no,yes,no,unknown,unknown,yes,3.0,...,1.0,(0.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,admin.,married,secondary,no,no,no,unknown,unknown,yes,3.0,...,0.0,(1.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,technician,married,secondary,no,yes,no,unknown,unknown,yes,2.0,...,1.0,(0.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)",1.0,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,services,married,secondary,no,yes,no,unknown,unknown,yes,4.0,...,1.0,(0.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)",1.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."
4,admin.,married,tertiary,no,no,no,unknown,unknown,yes,3.0,...,0.0,(1.0),0.0,(1.0),1.0,"(0.0, 1.0)",0.0,"(1.0, 0.0, 0.0)",1.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [74]:
# # select columns
# SelCol =  ['label', 'features'] + cols
# df = df.select(SelCol)
# df.printSchema()

In [75]:
# Model-ready frame: just the assembled feature vector and the numeric label.
df_fin = df[['features', 'label']]
df_fin.limit(5).toPandas()

Unnamed: 0,features,label
0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
1,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
2,"(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
3,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...",1.0
4,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",1.0


In [76]:
# Attribute access only yields a Column expression (no data is computed);
# use `df_fin.show()` to see the values.
df.features

Column<'features'>

In [77]:
# 80/20 train/test split; the fixed seed makes the split reproducible.
SPLIT_SEED = 666
train, test = df.randomSplit(weights=[0.8, 0.2], seed=SPLIT_SEED)

In [78]:
# Print rows without truncation so the sparse feature vectors are fully visible.
df_fin.show(truncate=False)

+---------------------------------------------------------------+-----+
|features                                                       |label|
+---------------------------------------------------------------+-----+
|(24,[3,11,13,16,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |1.0  |
|(24,[3,11,13,16,17,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1.0  |
|(24,[2,11,13,16,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |1.0  |
|(24,[4,11,13,16,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |1.0  |
|(24,[3,11,14,16,17,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1.0  |
|(24,[0,12,14,16,20,21],[1.0,1.0,1.0,1.0,1.0,1.0])              |1.0  |
|(24,[0,11,14,16,20,21],[1.0,1.0,1.0,1.0,1.0,1.0])              |1.0  |
|(24,[5,13,16,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0])              |1.0  |
|(24,[2,11,13,16,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |1.0  |
|(24,[4,12,13,16,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |1.0  |
|(24,[3,12,13,16,18,20,21],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       

In [79]:
# Output width of the network: number of distinct label values in the training split.
nb_classes = train.select('label').distinct().count()

# Input width of the network: length of the assembled feature vector (taken from the first row).
first_features = train.select('features').first()['features']
input_dim = len(first_features)

In [80]:
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Dense,Dropout,Activation
from tensorflow.keras import losses,optimizers,regularizers
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split


In [81]:
# Feed-forward classifier: two hidden ReLU layers with dropout, then one sigmoid
# output unit per class. `input_dim` and `nb_classes` are derived from `train`.
HIDDEN_UNITS = 256
DROPOUT_RATE = 0.3
L2_PENALTY = 0.01

model = Sequential()
# NOTE(review): activity_regularizer penalises layer *outputs*; if weight decay was
# intended, kernel_regularizer is the usual choice — confirm.
model.add(Dense(HIDDEN_UNITS, input_shape=(input_dim,),
                activity_regularizer=regularizers.l2(L2_PENALTY)))
model.add(Activation('relu'))
model.add(Dropout(rate=DROPOUT_RATE))
model.add(Dense(HIDDEN_UNITS, activity_regularizer=regularizers.l2(L2_PENALTY)))
model.add(Activation('relu'))
model.add(Dropout(rate=DROPOUT_RATE))
model.add(Dense(nb_classes))
model.add(Activation('sigmoid'))
# Must be a registered Keras loss name; matches the loss set on the Elephas estimator.
model.compile(loss='binary_crossentropy', optimizer='adam')

In [82]:
# Layer-by-layer output shapes and parameter counts of the Keras model.
model.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_6 (Dense)             (None, 256)               6400      
                                                                 
 activation_6 (Activation)   (None, 256)               0         
                                                                 
 dropout_4 (Dropout)         (None, 256)               0         
                                                                 
 dense_7 (Dense)             (None, 256)               65792     
                                                                 
 activation_7 (Activation)   (None, 256)               0         
                                                                 
 dropout_5 (Dropout)         (None, 256)               0         
                                                                 
 dense_8 (Dense)             (None, 2)                

In [83]:
from elephas.ml_model import ElephasEstimator

In [84]:
# Optimizer for distributed training, serialized into a config for the estimator.
# `learning_rate` replaces the deprecated `lr` keyword.
optimizer_conf = optimizers.Adam(learning_rate=0.01)
opt_conf = optimizers.serialize(optimizer_conf)

# Spark ML estimator wrapping the Keras model: reads `features`, trains against `label`.
estimator = ElephasEstimator(featuresCol='features', labelCol='label')
estimator.set_keras_model_config(model.to_json())
# Presumably one-hot encodes `label` into `nb_classes` targets — confirm against Elephas docs.
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1)
estimator.set_epochs(25)
estimator.set_batch_size(64)
estimator.set_verbosity(1)
estimator.set_validation_split(0.10)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("binary_crossentropy")
estimator.set_metrics(['acc'])

  super(Adam, self).__init__(name, **kwargs)


ElephasEstimator_5694c71e15ea

In [85]:
# Spark ML pipeline whose single stage is the Elephas (Keras) estimator.
dl_pipeline=Pipeline(stages=[estimator])

In [86]:
def _report_split(split_name, predictions, label):
    """Print accuracy and display the label-vs-prediction confusion matrix for one scored split.

    Args:
        split_name: Human-readable split name used in the printed headers (e.g. "Training").
        predictions: Spark DataFrame containing ``label`` and a ``prediction`` column.
        label: Name of the label column.
    """
    from pyspark.mllib.evaluation import MulticlassMetrics

    pnl = predictions.select(label, "prediction")
    # MulticlassMetrics expects (prediction, label) pairs of floats. The type of the
    # prediction column depends on the Elephas version, so coerce both values.
    pred_and_label = pnl.rdd.map(lambda row: (float(row['prediction']), float(row[label])))
    metrics = MulticlassMetrics(pred_and_label)

    print("{} Data Accuracy: {}".format(split_name, round(metrics.accuracy, 4)))
    print("{} Data Confusion Matrix".format(split_name))
    display(pnl.crosstab(label, 'prediction').toPandas())


def dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
                                  train_data=train,
                                  test_data=test,
                                  label='label'):
    """Fit ``dl_pipeline`` on ``train_data`` and report accuracy on the train and test splits.

    For each split, prints the accuracy and displays a confusion matrix
    (label vs. prediction crosstab).

    The default arguments are bound when this cell is executed. Re-running the
    cells that define ``dl_pipeline``/``train``/``test`` does not update them, so
    pass the arguments explicitly.

    Args:
        dl_pipeline: Unfitted ``pyspark.ml.Pipeline`` whose fitted model adds a
            ``prediction`` column.
        train_data: Spark DataFrame used for fitting and for training metrics.
        test_data: Spark DataFrame used for test metrics.
        label: Name of the label column in both DataFrames.

    Returns:
        The fitted ``PipelineModel``, so it can be reused without retraining.
    """
    fit_dl_pipeline = dl_pipeline.fit(train_data)

    _report_split("Training", fit_dl_pipeline.transform(train_data), label)
    print()  # blank line between the two reports
    _report_split("Test", fit_dl_pipeline.transform(test_data), label)

    return fit_dl_pipeline


In [87]:
import sys
# Show the driver's Python version (useful when diagnosing Spark Python-worker errors).
print(sys.version)

3.9.7 (tags/v3.9.7:1016ef3, Aug 30 2021, 20:19:38) [MSC v.1929 64 bit (AMD64)]


In [88]:
# Train the Elephas pipeline on `train`, then report accuracy and confusion matrices for both splits.
dl_pipeline_fit_score_results(dl_pipeline, train, test, 'label')

>>> Fit model


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 146.0 failed 1 times, most recent failure: Lost task 0.0 in stage 146.0 (TID 96) (172.28.52.80 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 19 more
