In [None]:
# from pyspark import SparkContext, SparkConf
# conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
# sc = SparkContext(conf=conf)
# sc

In [None]:
!pip uninstall tensorflow

In [29]:
import os
# Set before the tensorflow import below.
# NOTE(review): presumably this flag makes Keras-based helper packages use
# tf.keras instead of standalone Keras — confirm which package reads it.
os.environ['TF_KERAS'] = '1'
import tensorflow as tf
import pyspark


2.3.1


In [2]:
from pyspark.sql import SparkSession

# Local Spark session named 'Elephas_App' with a 15g driver memory setting.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "15g")
    .appName('Elephas_App')
    .getOrCreate()
)

# Enable the Arrow execution setting on the session that was just obtained.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


In [24]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Activation, MaxPooling2D

# `learning_rate` is the supported keyword; `lr` is only a deprecated alias
# that newer Keras releases no longer accept.
opt = Adam(learning_rate=0.01)

# Small CNN: one conv + max-pool stage, then three dense layers ending in a
# 10-way softmax.
model = Sequential()
model.add(Conv2D(64, (3, 3), input_shape=(28,28,1)))
model.add(Activation('relu'))
model.add(MaxPooling2D(2, 2))
model.add(Flatten())
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dense(64))
model.add(Activation('relu'))
model.add(Dense(10))
model.add(Activation('softmax'))

# categorical_crossentropy expects one-hot encoded labels.
model.compile(loss='categorical_crossentropy', optimizer=opt)


In [4]:
# Handle to the Fashion-MNIST dataset module (Fashion-MNIST, despite the variable name).
mnist = tf.keras.datasets.fashion_mnist


In [5]:
# Load the dataset and unpack it into train/test (images, labels) pairs.
(training_images, training_labels), (test_images, test_labels) = mnist.load_data()

In [6]:
# Sanity-check the array shapes of both splits, one shape per line.
print(
    training_images.shape,
    training_labels.shape,
    test_images.shape,
    test_labels.shape,
    sep="\n",
)

(60000, 28, 28)
(60000,)
(10000, 28, 28)
(10000,)


In [7]:
def preprocess_images(image_set):
    """Reshape images to (N, 28, 28) and divide pixel values by 255.

    Args:
        image_set: array whose total size is a multiple of 28 * 28.

    Returns:
        A new array of shape (N, 28, 28) holding ``image_set / 255.0``.
    """
    stacked = image_set.reshape(-1, 28, 28)
    return stacked / 255.0

# Normalise both image splits. The names are rebound in place, so running this
# cell a second time divides the pixel values by 255 again.
training_images, test_images = (
    preprocess_images(split) for split in (training_images, test_images)
)

# Show the shapes of the training images and labels, one per line.
print(training_images.shape, training_labels.shape, sep="\n")

(60000, 28, 28)
(60000,)


In [28]:
class CustomCallbacks(tf.keras.callbacks.Callback):
    """Keras callback that stops training once accuracy exceeds 99.8%."""

    def on_epoch_end(self, epoch, logs=None):
        """Check the epoch's 'accuracy' log entry and stop training if it is high enough.

        Args:
            epoch: index of the epoch that just finished (unused).
            logs: metrics dict passed by Keras; may be None or lack 'accuracy'
                (for example when the model was compiled without that metric).
        """
        # Avoid a shared mutable default and tolerate a missing metric:
        # comparing None with a float would raise TypeError.
        accuracy = (logs or {}).get('accuracy')
        if accuracy is not None and accuracy > 0.998:
            print("\n 99.8% accuracy reached")
            self.model.stop_training = True

# def preprocess_images(image_set):
#     image_set = image_set.reshape(-1, 28, 28, 1)
#     image_set = image_set / 255.0
#     return image_set

# training_images = preprocess_images(training_images)
# test_images = preprocess_images(test_images)

# CNN built layer by layer: conv + max-pool, flatten, two hidden dense layers,
# and a 10-way softmax output.
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
model.add(tf.keras.layers.MaxPooling2D(2, 2))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dense(64, activation='relu'))
model.add(tf.keras.layers.Dense(10, activation='softmax'))

# The sparse loss takes integer class ids as labels (no one-hot encoding).
model.compile(
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=tf.keras.optimizers.Adam(),
)

# model.fit(
#     training_images,
#     training_labels,
#     batch_size=64,
#     epochs=20,
#     callbacks=[CustomCallbacks()]
# )


In [8]:
import numpy as np

In [9]:
# One [nested-list image, int label] record per training example.
train_set = [
    [image.tolist(), int(target)]
    for image, target in zip(training_images, training_labels)
]

In [10]:
# One [nested-list image, int label] record per test example.
test_set = [
    [image.tolist(), int(target)]
    for image, target in zip(test_images, test_labels)
]

In [11]:
# Build a Spark DataFrame from the training records (no explicit schema is given).
train_data = spark.createDataFrame(data=train_set)

In [12]:
# Build a Spark DataFrame from the test records (no explicit schema is given).
test_data = spark.createDataFrame(data=test_set)

In [25]:
from elephas.ml_model import ElephasEstimator

# Set and Serialize Optimizer
optimizer_conf = opt
opt_conf = tf.keras.optimizers.serialize(opt)

# Initialize SparkML Estimator and Get Settings
estimator = ElephasEstimator(labelCol='label', featuresCol='feature')
estimator.set_keras_model_config(model.to_yaml())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(10)
estimator.set_num_workers(1)
estimator.set_epochs(25)
estimator.set_batch_size(64)
estimator.set_verbosity(1)
estimator.set_validation_split(0.10)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
# Ten mutually exclusive classes with categorical labels and a softmax output:
# the matching loss is categorical cross-entropy, not binary cross-entropy
# (which would also make the reported accuracy misleading).
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

ElephasEstimator_b33eac231292

In [26]:
from pyspark.ml import Pipeline

# Single-stage Spark ML pipeline wrapping the Elephas estimator.
dl_pipeline = Pipeline().setStages([estimator])

In [15]:
# Rename the columns "_1"/"_2" to the names the estimator is configured with.
train_data = (
    train_data
    .withColumnRenamed("_1", 'feature')
    .withColumnRenamed("_2", 'label')
)
test_data = (
    test_data
    .withColumnRenamed("_1", 'feature')
    .withColumnRenamed("_2", 'label')
)


In [16]:
# Preview the first two rows of the training DataFrame.
train_data.show(2)

+--------------------+-----+
|             feature|label|
+--------------------+-----+
|[[0.0, 0.0, 0.0, ...|    9|
|[[0.0, 0.0, 0.0, ...|    0|
+--------------------+-----+
only showing top 2 rows



In [17]:
# Preview the first two rows of the test DataFrame.
test_data.show(2)

+--------------------+-----+
|             feature|label|
+--------------------+-----+
|[[0.0, 0.0, 0.0, ...|    9|
|[[0.0, 0.0, 0.0, ...|    2|
+--------------------+-----+
only showing top 2 rows



In [27]:
def dl_pipeline_fit_score_results(train_data,
                                  test_data,
                                  dl_pipeline,
                                  label,
                                  ):
    """Fit the pipeline, then print accuracy and a confusion matrix per split.

    Args:
        train_data: Spark DataFrame the pipeline is fitted on and scored against.
        test_data: Spark DataFrame used for held-out scoring.
        dl_pipeline: unfitted pipeline exposing ``fit``; the fitted model's
            ``transform`` must add a ``prediction`` column.
        label: name of the ground-truth label column in both DataFrames.

    Returns:
        None. Results are printed and displayed.
    """
    # Imported locally because no other cell brings MulticlassMetrics into scope.
    from pyspark.mllib.evaluation import MulticlassMetrics

    fit_dl_pipeline = dl_pipeline.fit(train_data)

    def _score(data):
        """Return (label/prediction DataFrame, MulticlassMetrics) for one split."""
        pnl = fit_dl_pipeline.transform(data).select(label, "prediction")
        # MulticlassMetrics expects (prediction, label) pairs of doubles.
        pairs = pnl.rdd.map(
            lambda row: (float(row['prediction']), float(row[label]))
        )
        return pnl, MulticlassMetrics(pairs)

    pnl_train, metrics_train = _score(train_data)
    pnl_test, metrics_test = _score(test_data)

    # `accuracy` replaces the zero-argument `precision()`, which was deprecated
    # and later removed from the Spark API.
    print("Training Data Accuracy: {}".format(round(metrics_train.accuracy, 4)))
    print("Training Data Confusion Matrix")
    # Cross-tabulate on the caller-supplied label column; only that column and
    # "prediction" were selected above.
    display(pnl_train.crosstab(label, 'prediction').toPandas())

    print("\nTest Data Accuracy: {}".format(round(metrics_test.accuracy, 4)))
    print("Test Data Confusion Matrix")
    display(pnl_test.crosstab(label, 'prediction').toPandas())

In [28]:
# Fit the pipeline on the training split and report metrics for both splits.
dl_pipeline_fit_score_results(
    train_data=train_data,
    test_data=test_data,
    dl_pipeline=dl_pipeline,
    label='label',
);

>>> Fit model


ValueError: Could not interpret optimizer identifier: False

In [None]:
from elephas.utils.rdd_utils import to_simple_rdd

# `sc` is never created in this notebook (its setup cell is commented out);
# use the SparkContext owned by the active `spark` session instead.
rdd = to_simple_rdd(spark.sparkContext, training_images, training_labels)


In [None]:
from elephas.spark_model import SparkModel

# Wrap the compiled Keras model for Spark training in asynchronous mode
# with frequency='epoch'.
spark_model = SparkModel(
    model,
    mode='asynchronous',
    frequency='epoch',
)

# Train on the RDD of (features, label) pairs built from the training arrays.
spark_model.fit(
    rdd,
    batch_size=64,
    epochs=20,
    validation_split=0.1,
    verbose=1,
)
