In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[*]')
sc = SparkContext(conf=conf)

In [2]:
from __future__ import print_function
print(sc)

<SparkContext master=local[*] appName=Elephas_App>


In [88]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]')\
.appName('deep-learning').getOrCreate()

In [5]:
from pyspark.sql import SQLContext
from pyspark.ml.linalg import Vectors
import numpy as np
import random

sql_context = SQLContext(sc)

def shuffle_csv(csv_file):
    lines = open(csv_file).readlines()
    random.shuffle(lines)
    open(csv_file, 'w').writelines(lines)

def load_data_frame(csv_file, shuffle=True, train=True):
    if shuffle:
        shuffle_csv(csv_file)
    data = sc.textFile(csv_file)
    # This is an RDD, which will later be transformed to a data frame
    data = data.filter(lambda x:x.split(',')[0] != 'id').map(lambda line: line.split(','))
    if train:
        data = data.map(
            lambda line: (Vectors.dense(np.asarray(line[1:-1]).astype(np.float32)),
                          str(line[-1])) )
    else:
        # Test data gets dummy labels. We need the same structure as in Train data
        data = data.map( lambda line: (Vectors.dense(np.asarray(line[1:]).astype(np.float32)),"Class_1") ) 
    return sql_context.createDataFrame(data, ['features', 'category'])

In [6]:
train_df = load_data_frame("data/S8_train.csv")
test_df = load_data_frame("data/S8_test.csv", shuffle=False, train=False) # No need to shuffle test data

print("Train data frame:")
train_df.show(10)

print("Test data frame (note the dummy category):")
test_df.show(10)

Train data frame:
+--------------------+--------+
|            features|category|
+--------------------+--------+
|[1.0,0.0,0.0,0.0,...| Class_2|
|[0.0,1.0,0.0,1.0,...| Class_8|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_5|
|[1.0,1.0,0.0,0.0,...| Class_6|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_6|
|[0.0,0.0,0.0,1.0,...| Class_2|
|[0.0,0.0,0.0,0.0,...| Class_2|
|[0.0,0.0,0.0,1.0,...| Class_6|
+--------------------+--------+
only showing top 10 rows

Test data frame (note the dummy category):
+--------------------+--------+
|            features|category|
+--------------------+--------+
|[0.0,0.0,0.0,0.0,...| Class_1|
|[2.0,2.0,14.0,16....| Class_1|
|[0.0,1.0,12.0,1.0...| Class_1|
|[0.0,0.0,0.0,1.0,...| Class_1|
|[1.0,0.0,0.0,1.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[2.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
|[0.0,0.0,0.0,0.0,...| Class_1|
+--------------------+--------+
only showing top 

In [7]:
from pyspark.ml.feature import StringIndexer

string_indexer = StringIndexer(inputCol="category", outputCol="label")
fitted_indexer = string_indexer.fit(train_df)
indexed_df = fitted_indexer.transform(train_df)

In [8]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
fitted_scaler = scaler.fit(indexed_df)
scaled_df = fitted_scaler.transform(indexed_df)
print("The result of indexing and scaling. Each transformation adds new columns to the data frame:")
scaled_df.show(10)

The result of indexing and scaling. Each transformation adds new columns to the data frame:
+--------------------+--------+-----+--------------------+
|            features|category|label|     scaled_features|
+--------------------+--------+-----+--------------------+
|[1.0,0.0,0.0,0.0,...| Class_2|  0.0|[0.40208999583479...|
|[0.0,1.0,0.0,1.0,...| Class_8|  2.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2|  0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_5|  6.0|[-0.2535060296260...|
|[1.0,1.0,0.0,0.0,...| Class_6|  1.0|[0.40208999583479...|
|[0.0,0.0,0.0,0.0,...| Class_2|  0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_6|  1.0|[-0.2535060296260...|
|[0.0,0.0,0.0,1.0,...| Class_2|  0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,0.0,...| Class_2|  0.0|[-0.2535060296260...|
|[0.0,0.0,0.0,1.0,...| Class_6|  1.0|[-0.2535060296260...|
+--------------------+--------+-----+--------------------+
only showing top 10 rows



In [9]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.utils import to_categorical

nb_classes = train_df.select("category").distinct().count()
input_dim = len(train_df.select("features").first()[0])

model = Sequential()
model.add(Dense(512, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam')

In [10]:
from elephas.ml_model import ElephasEstimator
from tensorflow.keras import optimizers


adam = optimizers.Adam(lr=0.01)
opt_conf = optimizers.serialize(adam)

# Initialize SparkML Estimator and set all relevant properties
estimator = ElephasEstimator()
# The next two paramters come directly from pyspark
estimator.setFeaturesCol("scaled_features")             
estimator.setLabelCol("label")                
# Provide serialized Keras model to the estimator object
estimator.set_keras_model_config(model.to_yaml())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(1)
estimator.set_epochs(5) 
estimator.set_batch_size(128)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])

ElephasEstimator_48bd89065c64

In [11]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[string_indexer, scaler, estimator])
fitted_pipeline = pipeline.fit(train_df) 
prediction = fitted_pipeline.transform(train_df)
# prediction = fitted_pipeline.transform(test_df) # <-- The same code evaluates test data.
pnl = prediction.select("label", "prediction")


>>> Fit model
>>> Synchronous training complete.


In [14]:
pnl.show(5)

Py4JJavaError: An error occurred while calling o368.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 227) (192.168.13.6 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 418, in func
    return f(iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/elephas/ml_model.py", line 226, in extract_features_and_predict
    return model.predict(np.array([from_vector(x[features_col]) for x in data]))
  File "/home/philipharman/.local/lib/python3.8/site-packages/elephas/ml_model.py", line 226, in <listcomp>
    return model.predict(np.array([from_vector(x[features_col]) for x in data]))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in load_stream
    yield self._read_with_length(stream)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1416, in <lambda>
    return lambda *a: dataType.fromInternal(a)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 651, in fromInternal
    values = [f.fromInternal(v) if c else v
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 651, in <listcomp>
    values = [f.fromInternal(v) if c else v
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 452, in fromInternal
    return self.dataType.fromInternal(obj)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 709, in fromInternal
    return self.deserialize(v)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 163, in deserialize
    raise ValueError("do not recognize type %r" % tpe)
ValueError: do not recognize type None

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:951)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.rdd.RDD$$anon$3.foreach(RDD.scala:950)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 418, in func
    return f(iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/elephas/ml_model.py", line 226, in extract_features_and_predict
    return model.predict(np.array([from_vector(x[features_col]) for x in data]))
  File "/home/philipharman/.local/lib/python3.8/site-packages/elephas/ml_model.py", line 226, in <listcomp>
    return model.predict(np.array([from_vector(x[features_col]) for x in data]))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in load_stream
    yield self._read_with_length(stream)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1416, in <lambda>
    return lambda *a: dataType.fromInternal(a)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 651, in fromInternal
    values = [f.fromInternal(v) if c else v
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 651, in <listcomp>
    values = [f.fromInternal(v) if c else v
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 452, in fromInternal
    return self.dataType.fromInternal(obj)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/types.py", line 709, in fromInternal
    return self.deserialize(v)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 163, in deserialize
    raise ValueError("do not recognize type %r" % tpe)
ValueError: do not recognize type None

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:951)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.rdd.RDD$$anon$3.foreach(RDD.scala:950)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)


In [15]:
from sklearn.metrics import accuracy_score
import numpy as np

metrics_df= prediction.toPandas()
labels= metrics_df["label"].values
predictions= [np.argmax(val) for val in np.array(metrics_df["prediction"].values)]
accuracy_score(labels, predictions)

Py4JJavaError: An error occurred while calling o363.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 228) (192.168.13.6 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 418, in func
    return f(iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/elephas/ml_model.py", line 226, in extract_features_and_predict
    return model.predict(np.array([from_vector(x[features_col]) for x in data]))
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py", line 1598, in predict
    data_handler = data_adapter.DataHandler(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 1100, in __init__
    self._adapter = adapter_cls(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 263, in __init__
    x, y, sample_weights = _process_tensorlike((x, y, sample_weights))
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 1016, in _process_tensorlike
    inputs = nest.map_structure(_convert_numpy_and_scipy, inputs)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/util/nest.py", line 659, in map_structure
    structure[0], [func(*x) for x in entries],
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/util/nest.py", line 659, in <listcomp>
    structure[0], [func(*x) for x in entries],
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 1011, in _convert_numpy_and_scipy
    return ops.convert_to_tensor_v2_with_dispatch(x, dtype=dtype)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1404, in convert_to_tensor_v2_with_dispatch
    return convert_to_tensor_v2(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1410, in convert_to_tensor_v2
    return convert_to_tensor(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/profiler/trace.py", line 163, in wrapped
    return func(*args, **kwargs)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1540, in convert_to_tensor
    ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/tensor_conversion_registry.py", line 52, in _default_conversion_function
    return constant_op.constant(value, dtype, name=name)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 264, in constant
    return _constant_impl(value, dtype, shape, name, verify_shape=False,
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 276, in _constant_impl
    return _constant_eager_impl(ctx, value, dtype, shape, verify_shape)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 301, in _constant_eager_impl
    t = convert_to_eager_tensor(value, ctx, dtype)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 98, in convert_to_eager_tensor
    return ops.EagerTensor(value, ctx.device_name, dtype)
ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type numpy.ndarray).

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:951)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.rdd.RDD$$anon$3.foreach(RDD.scala:950)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 594, in process
    out_iter = func(split_index, iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2916, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/philipharman/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 418, in func
    return f(iterator)
  File "/home/philipharman/.local/lib/python3.8/site-packages/elephas/ml_model.py", line 226, in extract_features_and_predict
    return model.predict(np.array([from_vector(x[features_col]) for x in data]))
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/training.py", line 1598, in predict
    data_handler = data_adapter.DataHandler(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 1100, in __init__
    self._adapter = adapter_cls(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 263, in __init__
    x, y, sample_weights = _process_tensorlike((x, y, sample_weights))
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 1016, in _process_tensorlike
    inputs = nest.map_structure(_convert_numpy_and_scipy, inputs)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/util/nest.py", line 659, in map_structure
    structure[0], [func(*x) for x in entries],
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/util/nest.py", line 659, in <listcomp>
    structure[0], [func(*x) for x in entries],
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/data_adapter.py", line 1011, in _convert_numpy_and_scipy
    return ops.convert_to_tensor_v2_with_dispatch(x, dtype=dtype)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
    return target(*args, **kwargs)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1404, in convert_to_tensor_v2_with_dispatch
    return convert_to_tensor_v2(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1410, in convert_to_tensor_v2
    return convert_to_tensor(
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/profiler/trace.py", line 163, in wrapped
    return func(*args, **kwargs)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/ops.py", line 1540, in convert_to_tensor
    ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/tensor_conversion_registry.py", line 52, in _default_conversion_function
    return constant_op.constant(value, dtype, name=name)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 264, in constant
    return _constant_impl(value, dtype, shape, name, verify_shape=False,
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 276, in _constant_impl
    return _constant_eager_impl(ctx, value, dtype, shape, verify_shape)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 301, in _constant_eager_impl
    t = convert_to_eager_tensor(value, ctx, dtype)
  File "/home/philipharman/.local/lib/python3.8/site-packages/tensorflow/python/framework/constant_op.py", line 98, in convert_to_eager_tensor
    return ops.EagerTensor(value, ctx.device_name, dtype)
ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type numpy.ndarray).

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:951)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.rdd.RDD$$anon$3.foreach(RDD.scala:950)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)
