## Training a LeNet model on the MNIST dataset using BigDL in IBM Data Science Experience (DSX)

#### Companion notebook for [this blog post](https://medium.com/@hhbyyh/training-lenet-model-with-mnist-dataset-using-bigdl-in-ibm-dsx-7e8310b23057)

In [1]:
#!(export sv=2.1 bv=0.3.0 ; cd ~/data/libs/ && wget  https://repo1.maven.org/maven2/com/intel/analytics/bigdl/bigdl-SPARK_${sv}/${bv}/bigdl-SPARK_${sv}-${bv}-jar-with-dependencies.jar)

In [2]:
#!pip install bigdl==0.3.0 | cat

In [1]:
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.util.common import *
from pyspark import SparkContext
import numpy as np

Prepending /gpfs/fs01/user/s292-2479f4c3f28697-e04c173fe4bd/.local/lib/python2.7/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path




In [2]:
# `sc` is not created in any cell shown here, so it must already exist in the
# kernel. Stop it and replace it with a context built from BigDL's Spark
# configuration, with both core settings fixed to 1.
sc.stop()
confCore = (
    create_spark_conf()
    .set("spark.executor.cores", 1)
    .set("spark.cores.max", 1)
)
sc = SparkContext(appName="Mnist", conf=confCore)
# Initialize the BigDL engine once the new context is in place.
init_engine()

In [3]:
# Smoke test: build a tiny Linear layer and show its parameter dictionary
# to confirm that the BigDL Python bindings respond.
linear = Linear(2, 1)
print(linear.parameters())

creating: createLinear
{u'Linear24e703c8': {u'gradWeight': array([[ 0.,  0.]], dtype=float32), u'bias': array([-0.58791041], dtype=float32), u'weight': array([[-0.14095348,  0.64820635]], dtype=float32), u'gradBias': array([ 0.], dtype=float32)}}


In [4]:
from optparse import OptionParser
from bigdl.dataset import mnist
from bigdl.dataset.transformer import *
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *

In [5]:
def build_model(class_num):
    """
    Assemble the LeNet-style network used for MNIST classification.

    The layers are declared as (layer class, constructor arguments) pairs and
    appended to a Sequential container in exactly the listed order.
    :param class_num: size of the final Linear layer's output
    :return: the populated Sequential model
    """
    # Size of the flattened feature vector fed into the first Linear layer.
    flat_size = 12 * 4 * 4
    layer_specs = (
        (Reshape, ([1, 28, 28],)),
        (SpatialConvolution, (1, 6, 5, 5)),
        (Tanh, ()),
        (SpatialMaxPooling, (2, 2, 2, 2)),
        (Tanh, ()),
        (SpatialConvolution, (6, 12, 5, 5)),
        (SpatialMaxPooling, (2, 2, 2, 2)),
        (Reshape, ([flat_size],)),
        (Linear, (flat_size, 100)),
        (Tanh, ()),
        (Linear, (100, class_num)),
        (LogSoftMax, ()),
    )
    model = Sequential()
    # Each layer is constructed immediately before it is added to the model.
    for layer_cls, ctor_args in layer_specs:
        model.add(layer_cls(*ctor_args))
    return model


def get_mnist(sc, data_type="train", location="/tmp/mnist"):
    """
    Load one MNIST split and expose it as an RDD of (image, label) pairs.

    The arrays are read through mnist.read_data_sets; according to the
    original notes, the data is downloaded automatically when it is not
    present at `location`. No normalization happens here; callers apply it
    afterwards.
    :param sc: SparkContext used to parallelize the arrays
    :param data_type: which split to read, e.g. "train" or "test"
    :param location: directory storing the MNIST files
    :return: An RDD of (features, label) tuples, with labels shifted by one
    """
    features, targets = mnist.read_data_sets(location, data_type)
    # BigDL targets start from 1, so shift the raw labels up by one.
    shifted_targets = targets + 1
    feature_rdd = sc.parallelize(features)
    target_rdd = sc.parallelize(shifted_targets)
    return feature_rdd.zip(target_rdd)

def get_end_trigger(max_epoch=10):
    """
    Build the trigger passed to the optimizer as its end condition.

    :param max_epoch: epoch count handed to MaxEpoch; defaults to 10, the
        value that used to be hard-coded, so existing calls are unaffected
    :return: the MaxEpoch trigger object
    """
    return MaxEpoch(max_epoch)

# Training split: pass each image through `normalizer` with the training
# mean/std constants, then wrap every (image, label) pair in a BigDL Sample.
train_data = (
    get_mnist(sc, "train", "")
    .map(lambda pair: (normalizer(pair[0], mnist.TRAIN_MEAN, mnist.TRAIN_STD),
                       pair[1]))
    .map(lambda pair: Sample.from_ndarray(pair[0], pair[1]))
)

# Test split: same two steps, but with the test mean/std constants.
test_data = (
    get_mnist(sc, "test", "")
    .map(lambda pair: (normalizer(pair[0], mnist.TEST_MEAN, mnist.TEST_STD),
                       pair[1]))
    .map(lambda pair: Sample.from_ndarray(pair[0], pair[1]))
)

# Optimizer for a 10-class model trained with SGD until the end trigger fires.
optimizer = Optimizer(
    model=build_model(10),
    training_rdd=train_data,
    criterion=ClassNLLCriterion(),
    optim_method=SGD(learningrate=0.01, learningrate_decay=0.0002),
    end_trigger=get_end_trigger(),
    batch_size=128,
)

# Validate on the test split with Top-1 accuracy, triggered every epoch.
optimizer.set_validation(
    val_rdd=test_data,
    batch_size=128,
    trigger=EveryEpoch(),
    val_method=[Top1Accuracy()],
)

# Run the training job and keep the resulting model and its parameters.
trained_model = optimizer.optimize()
parameters = trained_model.parameters()
print("training finished")

('Extracting', 'train-images-idx3-ubyte.gz')
('Extracting', 'train-labels-idx1-ubyte.gz')
('Extracting', 't10k-images-idx3-ubyte.gz')
('Extracting', 't10k-labels-idx1-ubyte.gz')
creating: createSequential
creating: createReshape
creating: createSpatialConvolution
creating: createTanh
creating: createSpatialMaxPooling
creating: createTanh
creating: createSpatialConvolution
creating: createSpatialMaxPooling
creating: createReshape
creating: createLinear
creating: createTanh
creating: createLinear
creating: createLogSoftMax
creating: createClassNLLCriterion
creating: createDefault
creating: createSGD
creating: createMaxEpoch
creating: createOptimizer
creating: createEveryEpoch
creating: createTop1Accuracy


Py4JJavaError: An error occurred while calling o339.optimize.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 10 times, most recent failure: Lost task 0.9 in stage 0.0 (TID 18, yp-spark-dal09-env5-0022, executor 90bd8af9-3d9a-4c08-8805-2bb92b62c50f): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 160, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 783, in _make_skel_func
    closure = _reconstruct_closure(closures) if closures else None
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 775, in _reconstruct_closure
    return tuple([_make_cell(v) for v in values])
TypeError: ("'int' object is not iterable", <function _make_skel_func at 0x7f08dcb92758>, (<code object pipeline_func at 0x7f08619f3030, file "/gpfs/fs01/user/s292-2479f4c3f28697-e04c173fe4bd/.local/lib/python2.7/site-packages/pyspark/rdd.py", line 2437>, 2, {}))

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:326)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(Thread.java:785)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1430)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1429)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1429)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1657)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1612)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at java.lang.Thread.getStackTrace(Thread.java:1117)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1967)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1178)
	at com.intel.analytics.bigdl.dataset.DistributedDataSet$$anon$5.cache(DataSet.scala:188)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.prepareInput(DistriOptimizer.scala:757)
	at com.intel.analytics.bigdl.optim.DistriOptimizer.optimize(DistriOptimizer.scala:777)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
	at java.lang.reflect.Method.invoke(Method.java:507)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:785)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 160, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj)
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 783, in _make_skel_func
    closure = _reconstruct_closure(closures) if closures else None
  File "/usr/local/src/spark21master/spark-2.1.2-bin-2.7.3/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 775, in _reconstruct_closure
    return tuple([_make_cell(v) for v in values])
TypeError: ("'int' object is not iterable", <function _make_skel_func at 0x7f08dcb92758>, (<code object pipeline_func at 0x7f08619f3030, file "/gpfs/fs01/user/s292-2479f4c3f28697-e04c173fe4bd/.local/lib/python2.7/site-packages/pyspark/rdd.py", line 2437>, 2, {}))

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:326)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:290)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:326)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [8]:
# Evaluate the trained model on the test split (batch size 128) with Top-1
# accuracy, then print each returned result on its own line.
results = trained_model.evaluate(
    test_data,
    128,
    [Top1Accuracy()],
)
for result in results:
    print(result)

creating: createTop1Accuracy
Evaluated result: 0.946799993515, total_num: 10000, method: Top1Accuracy
