In [1]:
sc

In [2]:
%matplotlib inline

import numpy as np
import pandas as pd

from os import listdir
from os.path import join, basename
import struct
import json
# from scipy import misc
import datetime as dt

from bigdl.nn.layer import *
from optparse import OptionParser
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.dataset.transformer import *
from bigdl.nn.initialization_method import *
# from transformer import *
# from imagenet import *
# from transformer import Resize

In [3]:
# helper func to read the files from disk
def read_local_path(folder, has_label=True):
    dirs = listdir(folder)
    image_paths = []
    if has_label:
        dirs.sort()
        for d in dirs:
            for f in listdir(join(folder, d)):
                image_paths.append((join(join(folder, d), f), dirs.index(d) + 1))
    else:
        for f in dirs:
            image_paths.append((join(folder, f), -1))
    return image_paths

In [4]:
# helper func to read the files from disk
def read_local(sc, folder, normalize=255.0, has_label=True):
    """
    Read images from local directory
    :param sc: spark context
    :param folder: local directory
    :param normalize: normalization value
    :param has_label: whether the image folder contains label
    :return: RDD of sample
    """
    # read directory, create image paths list
    image_paths = read_local_path(folder, has_label)
    # print "BEFORE PARALLELIZATION: ", image_paths
    # create rdd
    image_paths_rdd = sc.parallelize(image_paths)
    # print image_paths_rdd
    feature_label_rdd = image_paths_rdd.map(lambda path_label: (misc.imread(path_label[0]), np.array(path_label[1]))) \
        .map(lambda img_label:
             (Resize(256, 256)(img_label[0]), img_label[1])) \
        .map(lambda feature_label:
             (((feature_label[0] & 0xff) / normalize).astype("float32"), feature_label[1]))
    # print "feature_label_rdd", feature_label_rdd
    return feature_label_rdd

The following function takes an input, if the input is a list, it insert into index 0 spot, such that the real data starts from index 1. Returns back a dictionary with key being the index and value the list of numbers.

In [5]:
def scala_T(input_T):
    if type(input_T) is list:
        # insert 0 into first index spot, such that the real data starts from index 1
        temp = [0]
        temp.extend(input_T)
        return dict(enumerate(temp))
    # if dictionary, return it back
    return input_T

In [6]:
def Inception_Layer_v1(input_size, config, name_prefix=""):
    """
    Builds the inception-v1 submodule, a local network, that is stacked in the entire architecture when building
    the full model.  
    
    :param input_size: dimensions of input coming into the local network
    :param config: ?
    :param name_prefix: string naming the layers of the particular local network
    :return: concat container object with all of the Sequential layers' ouput concatenated depthwise
    """        
    
    '''
    Concat is a container who concatenates the output of it's submodules along the provided dimension: all submodules 
    take the same inputs, and their output is concatenated.
    '''
    concat = Concat(2)
    
    """
    In the above code, we first create a container Sequential. Then add the layers into the container one by one. The 
    order of the layers in the model is same with the insertion order. 
    
    """
    conv1 = Sequential()
    
    #Adding layes to the conv1 model we jus created
    
    #SpatialConvolution is a module that applies a 2D convolution over an input image.
    conv1.add(SpatialConvolution(input_size, config[1][1], 1, 1, 1, 1).set_name(name_prefix + "1x1"))
    conv1.add(ReLU(True).set_name(name_prefix + "relu_1x1"))
    concat.add(conv1)
    
    conv3 = Sequential()
    conv3.add(SpatialConvolution(input_size, config[2][1], 1, 1, 1, 1).set_name(name_prefix + "3x3_reduce"))
    conv3.add(ReLU(True).set_name(name_prefix + "relu_3x3_reduce"))
    conv3.add(SpatialConvolution(config[2][1], config[2][2], 3, 3, 1, 1, 1, 1).set_name(name_prefix + "3x3"))
    conv3.add(ReLU(True).set_name(name_prefix + "relu_3x3"))
    concat.add(conv3)
    
    
    conv5 = Sequential()
    conv5.add(SpatialConvolution(input_size,config[3][1], 1, 1, 1, 1).set_name(name_prefix + "5x5_reduce"))
    conv5.add(ReLU(True).set_name(name_prefix + "relu_5x5_reduce"))
    conv5.add(SpatialConvolution(config[3][1], config[3][2], 5, 5, 1, 1, 2, 2).set_name(name_prefix + "5x5"))
    conv5.add(ReLU(True).set_name(name_prefix + "relu_5x5"))
    concat.add(conv5)
    
    
    pool = Sequential()
    pool.add(SpatialMaxPooling(3, 3, 1, 1, 1, 1, to_ceil=True).set_name(name_prefix + "pool"))
    pool.add(SpatialConvolution(input_size, config[4][1], 1, 1, 1, 1).set_name(name_prefix + "pool_proj"))
    pool.add(ReLU(True).set_name(name_prefix + "relu_pool_proj"))
    concat.add(pool).set_name(name_prefix + "output")
    return concat

In [7]:
def Inception_v1_NoAuxClassifier(class_num):
    model = Sequential()
    model.add(SpatialConvolution(3, 64, 7, 7, 2, 2, 3, 3, 1, False).set_name("conv1/7x7_s2"))
    model.add(ReLU(True).set_name("conv1/relu_7x7"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool1/3x3_s2"))
    model.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("pool1/norm1"))
    model.add(SpatialConvolution(64, 64, 1, 1, 1, 1).set_name("conv2/3x3_reduce"))
    model.add(ReLU(True).set_name("conv2/relu_3x3_reduce"))
    model.add(SpatialConvolution(64, 192, 3, 3, 1, 1, 1, 1).set_name("conv2/3x3"))
    model.add(ReLU(True).set_name("conv2/relu_3x3"))
    model.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("conv2/norm2"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool2/3x3_s2"))
    model.add(Inception_Layer_v1(192, scala_T([scala_T([64]), scala_T(
         [96, 128]), scala_T([16, 32]), scala_T([32])]), "inception_3a/"))
    model.add(Inception_Layer_v1(256, scala_T([scala_T([128]), scala_T(
         [128, 192]), scala_T([32, 96]), scala_T([64])]), "inception_3b/"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True))
    model.add(Inception_Layer_v1(480, scala_T([scala_T([192]), scala_T(
         [96, 208]), scala_T([16, 48]), scala_T([64])]), "inception_4a/"))
    model.add(Inception_Layer_v1(512, scala_T([scala_T([160]), scala_T(
         [112, 224]), scala_T([24, 64]), scala_T([64])]), "inception_4b/"))
    model.add(Inception_Layer_v1(512, scala_T([scala_T([128]), scala_T(
         [128, 256]), scala_T([24, 64]), scala_T([64])]), "inception_4c/"))
    model.add(Inception_Layer_v1(512, scala_T([scala_T([112]), scala_T(
         [144, 288]), scala_T([32, 64]), scala_T([64])]), "inception_4d/"))
    model.add(Inception_Layer_v1(528, scala_T([scala_T([256]), scala_T(
         [160, 320]), scala_T([32, 128]), scala_T([128])]), "inception_4e/"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True))
    model.add(Inception_Layer_v1(832, scala_T([scala_T([256]), scala_T(
         [160, 320]), scala_T([32, 128]), scala_T([128])]), "inception_5a/"))
    model.add(Inception_Layer_v1(832, scala_T([scala_T([384]), scala_T(
         [192, 384]), scala_T([48, 128]), scala_T([128])]), "inception_5b/"))
    model.add(SpatialAveragePooling(7, 7, 1, 1).set_name("pool5/7x7_s1"))
    model.add(Dropout(0.4).set_name("pool5/drop_7x7_s1"))
    model.add(View([1024], num_input_dims=3))
    model.add(Linear(1024, class_num).set_name("loss3/classifier_flowers"))
    model.add(LogSoftMax().set_name("loss3/loss3"))
    model.reset()
    return model

In [8]:
def Inception_v1(class_num):
    """
    Builds the entire network using Inception architecture  
    
    :param class_num: number of categories of classification
    :return: entire model architecture 
    """
    #contains first 3 inception modules
    feature1 = Sequential()
    
    feature1.add(SpatialConvolution(3, 64, 7, 7, 2, 2, 3, 3, 1, False).set_name("conv1/7x7_s2"))
    feature1.add(ReLU(True).set_name("conv1/relu_7x7"))
    feature1.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool1/3x3_s2"))
    feature1.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("pool1/norm1"))
    feature1.add(SpatialConvolution(64, 64, 1, 1, 1, 1).set_name("conv2/3x3_reduce"))
    feature1.add(ReLU(True).set_name("conv2/relu_3x3_reduce"))
    feature1.add(SpatialConvolution(64, 192, 3, 3, 1, 1, 1, 1).set_name("conv2/3x3"))
    feature1.add(ReLU(True).set_name("conv2/relu_3x3"))
    feature1.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("conv2/norm2"))
    feature1.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool2/3x3_s2"))
    feature1.add(Inception_Layer_v1(192,scala_T([scala_T([64]), scala_T([96, 128]),scala_T([16, 32]), scala_T([32])]),
                                    "inception_3a/"))
    feature1.add(Inception_Layer_v1(256, scala_T([scala_T([128]), scala_T([128, 192]), scala_T([32, 96]), scala_T([64])]),
                                    "inception_3b/"))
    feature1.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool3/3x3_s2"))
    feature1.add(Inception_Layer_v1(480, scala_T([scala_T([192]), scala_T([96, 208]), scala_T([16, 48]), scala_T([64])]),
                                    "inception_4a/"))
    # 1st classification ouput after 3 inception subnetworks
    output1 = Sequential()
    output1.add(SpatialAveragePooling(5, 5, 3, 3, ceil_mode=True).set_name("loss1/ave_pool"))
    output1.add(SpatialConvolution(512, 128, 1, 1, 1, 1).set_name("loss1/conv"))
    output1.add(ReLU(True).set_name("loss1/relu_conv"))
    output1.add(View([128 * 4 * 4], num_input_dims = 3))
    output1.add(Linear(128 * 4 * 4, 1024).set_name("loss1/fc"))
    output1.add(ReLU(True).set_name("loss1/relu_fc"))
    output1.add(Dropout(0.7).set_name("loss1/drop_fc"))
    output1.add(Linear(1024, class_num).set_name("loss1/classifier_5classes"))
    output1.add(LogSoftMax().set_name("loss1/loss"))

    # contains next 3 inception submodules
    feature2 = Sequential()
    feature2.add(Inception_Layer_v1(512, scala_T([scala_T([160]), scala_T([112, 224]),scala_T([24, 64]), scala_T([64])]),
                                    "inception_4b/"))
    feature2.add(Inception_Layer_v1(512, scala_T([scala_T([128]), scala_T([128, 256]),scala_T([24, 64]), scala_T([64])]),
                                    "inception_4c/"))
    feature2.add(Inception_Layer_v1(512, scala_T([scala_T([112]), scala_T([144, 288]), scala_T([32, 64]), scala_T([64])]),
                                    "inception_4d/"))
    # 2nd classification output after 3 more inception subnetworks
    output2 = Sequential()
    output2.add(SpatialAveragePooling(5, 5, 3, 3).set_name("loss2/ave_pool"))
    output2.add(SpatialConvolution(528, 128, 1, 1, 1, 1).set_name("loss2/conv"))
    output2.add(ReLU(True).set_name("loss2/relu_conv"))
    output2.add(View([128 * 4 * 4], num_input_dims=3))
    output2.add(Linear(128 * 4 * 4, 1024).set_name("loss2/fc"))
    output2.add(ReLU(True).set_name("loss2/relu_fc"))
    output2.add(Dropout(0.7).set_name("loss2/drop_fc"))
    output2.add(Linear(1024, class_num).set_name("loss2/classifier_5classes"))
    output2.add(LogSoftMax().set_name("loss2/loss"))

    # final 3 inception submodules followed by linear/softmax
    output3 = Sequential()
    output3.add(Inception_Layer_v1(528, scala_T([scala_T([256]), scala_T([160, 320]), scala_T([32, 128]), scala_T([128])]),
                                   "inception_4e/"))
    output3.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool4/3x3_s2"))
    output3.add(Inception_Layer_v1(832, scala_T([scala_T([256]), scala_T([160, 320]), scala_T([32, 128]), scala_T([128])]),
                                   "inception_5a/"))
    output3.add(Inception_Layer_v1(832,scala_T([scala_T([384]), scala_T([192, 384]),scala_T([48, 128]), scala_T([128])]),
                                   "inception_5b/"))
    output3.add(SpatialAveragePooling(7, 7, 1, 1).set_name("pool5/7x7_s1"))
    output3.add(Dropout(0.4).set_name("pool5/drop_7x7_s1"))
    output3.add(View([1024], num_input_dims=3))
    output3.add(Linear(1024, class_num).set_name("loss3/classifier_5classes"))
    output3.add(LogSoftMax().set_name("loss3/loss3"))

    # Attach the separate Sequential layers to create the whole model
    split2 = Concat(2).set_name("split2")
    split2.add(output3)
    split2.add(output2)

    #create a branch starting from feature2 upwards
    mainBranch = Sequential()
    mainBranch.add(feature2)
    mainBranch.add(split2)

    #concatenate the mainBranch with output1
    split1 = Concat(2).set_name("split1")
    split1.add(mainBranch)
    split1.add(output1)

    #Attach feature1 to the rest of the model
    model = Sequential()

    model.add(feature1)
    model.add(split1)

    model.reset()
    return model

In [9]:
def get_inception_data(folder, file_type="image", data_type="train", normalize=255.0):
    """
    Builds the entire network using Inception architecture  
    
    :param class_num: number of categories of classification
    :return: entire model architecture 
    """
    #Getting the path of our data
    path = os.path.join(folder, data_type)
    if "seq" == file_type:
        #return imagenet.read_seq_file(sc, path, normalize) #-- incase if we are trying to read the orig imagenet data
        return read_seq_file(sc, path, normalize)
    elif "image" == file_type:
        #return imagenet.read_local(sc, path, normalize)
        return read_local(sc, path, normalize)

## Constants

In [10]:
IMAGE_PATH = '../a0409a00-8-dataset_dp/train_img'

## Load Labels

In [11]:
def load_image_classes(csv_file_name):
    image_to_class = {}
    with open(csv_file_name, 'rt') as f:
        line = f.readline() # Skip header
        for line in f:
            line = line.strip('\n')
            name, cls = tuple(line.split(','))
            image_to_class.setdefault(name, cls)
    return image_to_class

train_image_to_class = load_image_classes(IMAGE_PATH + '/../train.csv')
            
vals = set(train_image_to_class.values())
classes = {}
for v in vals:
    classes.setdefault(v, float(len(classes) + 1))
    
classes

{'beans': 22.0,
 'cake': 24.0,
 'candy': 21.0,
 'cereal': 16.0,
 'chips': 18.0,
 'chocolate': 14.0,
 'coffee': 10.0,
 'corn': 13.0,
 'fish': 2.0,
 'flour': 12.0,
 'honey': 3.0,
 'jam': 8.0,
 'juice': 4.0,
 'milk': 5.0,
 'nuts': 6.0,
 'oil': 11.0,
 'pasta': 17.0,
 'rice': 9.0,
 'soda': 23.0,
 'spices': 25.0,
 'sugar': 7.0,
 'tea': 1.0,
 'tomatosauce': 19.0,
 'vinegar': 20.0,
 'water': 15.0}

# Create Model

In [12]:
init_engine()

In [13]:
classNum = len(classes)

In [14]:
inception_model = Inception_v1(classNum)

creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialMaxPooling
creating: createSpatialCrossMapLRN
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialCrossMapLRN
creating: createSpatialMaxPooling
creating: createConcat
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialMaxPooling
creating: createSpatialConvolution
creating: createReLU
creating: createConcat
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialConvolutio

## Load Images

In [43]:
from zoo.pipeline.nnframes.nn_image_reader import NNImageReader
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType, StringType
from sklearn.model_selection import train_test_split
from bigdl.transform.vision.image import *

In [17]:
data_path = '../a0409a00-8-dataset_dp'
image_dir = '../a0409a00-8-dataset_dp/train_img/'
image_path = '../a0409a00-8-dataset_dp/train_img/*'
# imageDF = NNImageReader.readImages(image_path, sc).repartition(12).cache()
# print ("partiion number: ", imageDF.rdd.getNumPartitions())
# print ("image number: ", imageDF.count())

In [18]:
# getName = udf(lambda row: row[0].split('/')[-1])
# getLabel = udf(lambda name: classes[train_image_to_class[name.split('.')[0]]], DoubleType())
# labelDF = imageDF \
#     .withColumn("label", getLabel(getName(col('image'))))

In [19]:
# labelDF.show()

In [51]:
transform_input = Pipeline([CenterCrop(224, 224),
                               ChannelNormalize(0.485, 0.456, 0.406, 0.229, 0.224, 0.225)])

creating: createCenterCrop
creating: createChannelNormalize
creating: createPipeline


In [21]:
images = []
labels = []
for file_name in os.listdir(image_dir):
    images.append(resize(mpimg.imread(image_dir + '/' + file_name), (224, 224), anti_aliasing=True))
    lbl = classes[train_image_to_class[file_name.split('.')[0]]]
    labels.append(lbl)

  warn("The default mode, 'constant', will be changed to 'reflect' in "


In [22]:
images = np.array(images)
labels = np.array(labels)

In [23]:
x_train, x_test, label_train, label_test = train_test_split(images, labels)

In [24]:
x_train.shape, x_test.shape, label_train.shape, label_test.shape

((2411, 224, 224, 3), (804, 224, 224, 3), (2411,), (804,))

In [31]:
def normalize(image_data, labels):
    rdd_images = sc.parallelize(image_data)
    rdd_labels = sc.parallelize(labels)
    rdd_sample = rdd_images \
        .zip(rdd_labels) \
        .map(lambda features_labels: \
             Sample.from_ndarray(features_labels[0], features_labels[1]))
    return rdd_sample

In [32]:
train_rdd = normalize(x_train, label_train)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:473)
	at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [None]:
test_rdd = normalize(x_test, label_test)

## Train Empty Model

In [25]:
EPOCHS = 10

In [26]:
optimizer = Optimizer(model=inception_model, 
                      training_rdd=train_rdd,
                      criterion=ClassNLLCriterion(),
                      optim_method=SGD(nesterov=True, momentum=0.9, dampening=0.0),
                      end_trigger=MaxEpoch(EPOCHS),
                      batch_size=16)

NameError: name 'train_rdd' is not defined

In [67]:
optimizer.set_validation(batch_size=64, 
                         val_rdd=valid_rdd,
                         trigger=EveryEpoch(),
                         val_method=[Top1Accuracy()])

creating: createEveryEpoch
creating: createTop1Accuracy


In [68]:
import shutil
from os import path

LOG_DIR = '/tmp/bigdl_summaries'
APP_NAME='lenet5-' # + datetime.now().strftime("%Y%m%d-%H%M%S")
try:
    shutil.rmtree('/private' + LOG_DIR + '/' + APP_NAME)
except:
    pass

In [69]:
train_summary = TrainSummary(log_dir=LOG_DIR, app_name=APP_NAME)
train_summary.set_summary_trigger("Parameters", SeveralIteration(10)) 
val_summary = ValidationSummary(log_dir=LOG_DIR, app_name=APP_NAME)
optimizer.set_train_summary(train_summary)
optimizer.set_val_summary(val_summary)
print("saving logs to ", APP_NAME)

creating: createTrainSummary
creating: createSeveralIteration
creating: createValidationSummary
('saving logs to ', 'lenet5-')


### Start Training

In [70]:
%%time
trained_model = optimizer.optimize()
print("Done")

Py4JJavaError: An error occurred while calling o2096.optimize.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 1 times, most recent failure: Lost task 1.0 in stage 9.0 (TID 22, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/src/app/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/src/app/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/src/app/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-65-a4ab52492d85>", line 1, in <lambda>
TypeError: from_ndarray() takes at least 3 arguments (2 given)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
	at com.intel.analytics.bigdl.dataset.DistributedDataSet$$anon$5.cache(DataSet.scala:191)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.prepareInput(DistriOptimizer.scala:872)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.optimize(DistriOptimizer.scala:895)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/src/app/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/src/app/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/src/app/spark-2.2.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-65-a4ab52492d85>", line 1, in <lambda>
TypeError: from_ndarray() takes at least 3 arguments (2 given)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
