In [1]:
# to verify if the spark context was initialized 
sc

<pyspark.context.SparkContext at 0x7f1ae6fe2510>

In [2]:
#Import all the required packages

import numpy as np
import pandas as pd

from os import listdir
from os.path import join, basename
import struct
import json
from scipy import misc
import datetime as dt

from bigdl.nn.layer import *
from optparse import OptionParser
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.dataset.transformer import *
from bigdl.nn.initialization_method import *
from transformer import *
from imagenet import *
from transformer import Resize

# if you want to train on whole imagenet
#from bigdl.dataset import imagenet

In [3]:
#%pylab inline
%matplotlib inline 

## Model Definition and helper functions

In [4]:
# helper func to read the files from disk
def read_local_path(folder, has_label=True):
    """
    :param folder: local directory (str)
    :param has_label: does image have label (bool)
    :return: list of (image path , label) tuples
    """
    # read directory, create map
    dirs = listdir(folder)
    print "local path: ", folder
    print "listdir: ", dirs
    # create a list of (image path , label) tuples
    image_paths = []
    #append image path to the label (ex: )
    if has_label:
        dirs.sort()
        for d in dirs:
            for f in listdir(join(folder, d)):
                image_paths.append((join(join(folder, d), f), dirs.index(d) + 1))
    else:
        for f in dirs:
            image_paths.append((join(folder, f), -1))
    return image_paths

In [5]:
# helper func to read the files from disk
def read_local(sc, folder, normalize=255.0, has_label=True):
    """
    Read images from local directory
    :param sc: spark context
    :param folder: local directory
    :param normalize: normalization value
    :param has_label: whether the image folder contains label
    :return: RDD of sample
    """
    # read directory, create image paths list
    image_paths = read_local_path(folder, has_label)
    print image_paths
    # create rdd
    image_paths_rdd = sc.parallelize(image_paths)
    print image_paths_rdd
    feature_label_rdd = image_paths_rdd.map(lambda path_label: (misc.imread(path_label[0]), np.array(path_label[1]))) \
        .map(lambda img_label:
             (Resize(256, 256)(img_label[0]), img_label[1])) \
        .map(lambda feature_label:
             (((feature_label[0] & 0xff) / normalize).astype("float32"), feature_label[1]))
    print "feature_label_rdd", feature_label_rdd
    return feature_label_rdd

The following function takes an input, if the input is a list, it insert into index 0 spot, such that the real data starts from index 1. Returns back a dictionary with key being the index and value the list of numbers.

In [6]:
def scala_T(input_T):
    """
    Helper function for building Inception layers. Transforms a list of numbers to a dictionary with ascending keys 
    and 0 appended to the front. Ignores dictionary inputs. 
    
    :param input_T: either list or dict
    :return: dictionary with ascending keys and 0 appended to front {0: 0, 1: realdata_1, 2: realdata_2, ...}
    """    
    if type(input_T) is list:
        # insert 0 into first index spot, such that the real data starts from index 1
        temp = [0]
        temp.extend(input_T)
        return dict(enumerate(temp))
    # if dictionary, return it back
    return input_T

The following functions are used to create and initiate the inception-v1 model. 

In [7]:
# Question: What is config?
def Inception_Layer_v1(input_size, config, name_prefix=""):
    """
    Builds the inception-v1 submodule, a local network, that is stacked in the entire architecture when building
    the full model.  
    
    :param input_size: dimensions of input coming into the local network
    :param config: ?
    :param name_prefix: string naming the layers of the particular local network
    :return: concat container object with all of the Sequential layers' ouput concatenated depthwise
    """        
    
    '''
    Concat is a container who concatenates the output of it's submodules along the provided dimension: all submodules 
    take the same inputs, and their output is concatenated.
    '''
    concat = Concat(2)
    
    """
    In the above code, we first create a container Sequential. Then add the layers into the container one by one. The 
    order of the layers in the model is same with the insertion order. 
    
    """
    conv1 = Sequential()
    
    #Adding layes to the conv1 model we jus created
    
    #SpatialConvolution is a module that applies a 2D convolution over an input image.
    conv1.add(SpatialConvolution(input_size, config[1][1], 1, 1, 1, 1).set_name(name_prefix + "1x1"))
    conv1.add(ReLU(True).set_name(name_prefix + "relu_1x1"))
    concat.add(conv1)
    
    conv3 = Sequential()
    conv3.add(SpatialConvolution(input_size, config[2][1], 1, 1, 1, 1).set_name(name_prefix + "3x3_reduce"))
    conv3.add(ReLU(True).set_name(name_prefix + "relu_3x3_reduce"))
    conv3.add(SpatialConvolution(config[2][1], config[2][2], 3, 3, 1, 1, 1, 1).set_name(name_prefix + "3x3"))
    conv3.add(ReLU(True).set_name(name_prefix + "relu_3x3"))
    concat.add(conv3)
    
    
    conv5 = Sequential()
    conv5.add(SpatialConvolution(input_size,config[3][1], 1, 1, 1, 1).set_name(name_prefix + "5x5_reduce"))
    conv5.add(ReLU(True).set_name(name_prefix + "relu_5x5_reduce"))
    conv5.add(SpatialConvolution(config[3][1], config[3][2], 5, 5, 1, 1, 2, 2).set_name(name_prefix + "5x5"))
    conv5.add(ReLU(True).set_name(name_prefix + "relu_5x5"))
    concat.add(conv5)
    
    
    pool = Sequential()
    pool.add(SpatialMaxPooling(3, 3, 1, 1, 1, 1, to_ceil=True).set_name(name_prefix + "pool"))
    pool.add(SpatialConvolution(input_size, config[4][1], 1, 1, 1, 1).set_name(name_prefix + "pool_proj"))
    pool.add(ReLU(True).set_name(name_prefix + "relu_pool_proj"))
    concat.add(pool).set_name(name_prefix + "output")
    return concat

In [8]:
def Inception_v1(class_num):
    """
    Builds the entire network using Inception architecture  
    
    :param class_num: number of categories of classification
    :return: entire model architecture 
    """
    #feature1 contains the first 3 mini inception modules
    feature1 = Sequential()
    
    feature1.add(SpatialConvolution(3, 64, 7, 7, 2, 2, 3, 3, 1, False).set_name("conv1/7x7_s2"))
    feature1.add(ReLU(True).set_name("conv1/relu_7x7"))
    feature1.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool1/3x3_s2"))
    feature1.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("pool1/norm1"))
    feature1.add(SpatialConvolution(64, 64, 1, 1, 1, 1).set_name("conv2/3x3_reduce"))
    feature1.add(ReLU(True).set_name("conv2/relu_3x3_reduce"))
    feature1.add(SpatialConvolution(64, 192, 3, 3, 1, 1, 1, 1).set_name("conv2/3x3"))
    feature1.add(ReLU(True).set_name("conv2/relu_3x3"))
    feature1.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("conv2/norm2"))
    feature1.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool2/3x3_s2"))
    feature1.add(Inception_Layer_v1(192,scala_T([scala_T([64]), scala_T([96, 128]),scala_T([16, 32]), scala_T([32])]),
                                    "inception_3a/"))
    feature1.add(Inception_Layer_v1(256, scala_T([scala_T([128]), scala_T([128, 192]), scala_T([32, 96]), scala_T([64])]),
                                    "inception_3b/"))
    feature1.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool3/3x3_s2"))
    feature1.add(Inception_Layer_v1(480, scala_T([scala_T([192]), scala_T([96, 208]), scala_T([16, 48]), scala_T([64])]),
                                    "inception_4a/"))
    # 1st classification ouput after 3 inception subnetworks
    output1 = Sequential()
    output1.add(SpatialAveragePooling(5, 5, 3, 3, ceil_mode=True).set_name("loss1/ave_pool"))
    output1.add(SpatialConvolution(512, 128, 1, 1, 1, 1).set_name("loss1/conv"))
    output1.add(ReLU(True).set_name("loss1/relu_conv"))
    output1.add(View([128 * 4 * 4], num_input_dims = 3))
    output1.add(Linear(128 * 4 * 4, 1024).set_name("loss1/fc"))
    output1.add(ReLU(True).set_name("loss1/relu_fc"))
    output1.add(Dropout(0.7).set_name("loss1/drop_fc"))
    output1.add(Linear(1024, class_num).set_name("loss1/classifier_5classes"))
    output1.add(LogSoftMax().set_name("loss1/loss"))

    #feature2 contains next 3 inception submodules
    feature2 = Sequential()
    feature2.add(Inception_Layer_v1(512, scala_T([scala_T([160]), scala_T([112, 224]),scala_T([24, 64]), scala_T([64])]),
                                    "inception_4b/"))
    feature2.add(Inception_Layer_v1(512, scala_T([scala_T([128]), scala_T([128, 256]),scala_T([24, 64]), scala_T([64])]),
                                    "inception_4c/"))
    feature2.add(Inception_Layer_v1(512, scala_T([scala_T([112]), scala_T([144, 288]), scala_T([32, 64]), scala_T([64])]),
                                    "inception_4d/"))
    # 2nd classification output after 3 more inception subnetworks
    output2 = Sequential()
    output2.add(SpatialAveragePooling(5, 5, 3, 3).set_name("loss2/ave_pool"))
    output2.add(SpatialConvolution(528, 128, 1, 1, 1, 1).set_name("loss2/conv"))
    output2.add(ReLU(True).set_name("loss2/relu_conv"))
    output2.add(View([128 * 4 * 4], num_input_dims=3))
    output2.add(Linear(128 * 4 * 4, 1024).set_name("loss2/fc"))
    output2.add(ReLU(True).set_name("loss2/relu_fc"))
    output2.add(Dropout(0.7).set_name("loss2/drop_fc"))
    output2.add(Linear(1024, class_num).set_name("loss2/classifier_5classes"))
    output2.add(LogSoftMax().set_name("loss2/loss"))

    #output3 contails final 3 inception submodules followed by linear/softmax classifier
    output3 = Sequential()
    output3.add(Inception_Layer_v1(528, scala_T([scala_T([256]), scala_T([160, 320]), scala_T([32, 128]), scala_T([128])]),
                                   "inception_4e/"))
    output3.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool4/3x3_s2"))
    output3.add(Inception_Layer_v1(832, scala_T([scala_T([256]), scala_T([160, 320]), scala_T([32, 128]), scala_T([128])]),
                                   "inception_5a/"))
    output3.add(Inception_Layer_v1(832,scala_T([scala_T([384]), scala_T([192, 384]),scala_T([48, 128]), scala_T([128])]),
                                   "inception_5b/"))
    output3.add(SpatialAveragePooling(7, 7, 1, 1).set_name("pool5/7x7_s1"))
    output3.add(Dropout(0.4).set_name("pool5/drop_7x7_s1"))
    output3.add(View([1024], num_input_dims=3))
    output3.add(Linear(1024, class_num).set_name("loss3/classifier_5classes"))
    output3.add(LogSoftMax().set_name("loss3/loss3"))

    # Attach the separate Sequential layers to create the whole model
    split2 = Concat(2).set_name("split2")
    split2.add(output3)
    split2.add(output2)

    #create a branch starting from feature2 upwards
    mainBranch = Sequential()
    mainBranch.add(feature2)
    mainBranch.add(split2)

    #concatenate the mainBranch with output1
    split1 = Concat(2).set_name("split1")
    split1.add(mainBranch)
    split1.add(output1)

    #Attach feature1 to the rest of the model
    model = Sequential()

    model.add(feature1)
    model.add(split1)

    model.reset()
    return model

In [9]:
def get_inception_data(folder, file_type="image", data_type="train", normalize=255.0):
    """
    Builds the entire network using Inception architecture  
    
    :param class_num: number of categories of classification
    :return: entire model architecture 
    """
    #Getting the path of our data
    path = os.path.join(folder, data_type)
    if "seq" == file_type:
        #return imagenet.read_seq_file(sc, path, normalize) #-- incase if we are trying to read the orig imagenet data
        return read_seq_file(sc, path, normalize)
    elif "image" == file_type:
        #return imagenet.read_local(sc, path, normalize)
        return read_local(sc, path, normalize)

## Creating the model


In [10]:
# initializing BigDL engine
init_engine()

In [11]:
# paths for datasets, saving checkpoints 

DATA_PATH = "./sample_images/"
checkpoint_path = "./sample_images/checkpoints"

In [12]:
#providing the no of classes in the dataset to model (5 for flowers)
classNum = 5

# Instantiating the model the model
inception_model = Inception_v1(classNum)  #-- main inception-v1 model
#inception_model = Inception_v1_NoAuxClassifier(classNum)

creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialMaxPooling
creating: createSpatialCrossMapLRN
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialCrossMapLRN
creating: createSpatialMaxPooling
creating: createConcat
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialMaxPooling
creating: createSpatialConvolution
creating: createReLU
creating: createConcat
creating: createSequential
creating: createSpatialConvolution
creating: createReLU
creating: createSequential
creating: createSpatialConvolutio

Download the pre-trained 'Inception v1 caffe model' from the link : https://github.com/BVLC/caffe/tree/master/models/bvlc_googlenet

In [13]:
# path, names of the downlaoded pre-trained caffe models
caffe_prototxt = 'bvlc_googlenet.prototxt'
caffe_model = 'bvlc_googlenet.caffemodel'

# loading the weights to the BigDL's inception model
model = Model.load_caffe(inception_model, caffe_prototxt, caffe_model, match_all=False, bigdl_type="float")

# if we want to export the whole caffe model including definition, this can be used.
#model = Model.load_caffe_model(inception_model, caffe_prototxt, caffe_model, match_all=True)

## Testing the pre-trained model

In [16]:
# get the flower labels
from os import listdir
from os.path import isfile, join
labels = listdir("./sample_images/flower_photos") 
del labels[4]
print "labels: ", labels

labels:  ['roses', 'daisy', 'tulips', 'sunflowers', 'dandelion']


In [24]:
'''
Images in vegnonveg-sample are large and maybe need to be cropped/resized before being trained on.
'''
from PIL import Image # for seeing image
import cv2 # converting img to numpy array (RGB to BGR) 

sample_images_path = "./sample_images/flower_photos/dandelion/"
input_str = '7355522_b66e5d3078_m.jpg'
input_img = Image.open(sample_images_path + input_str)

print "original image dimensions: ", input_img.getbbox()
#input_img.show()


original image dimensions:  (0, 0, 240, 240)


In [25]:
'''
Convert Image object into a 3d numpy array representing BGR of each pixel
'''
img = cv2.imread(sample_images_path + input_str)

print "image shape: ",img.shape
# img = np.array(input_img)
# img = img[:,:,::-1].copy() #invert RGB representation to BGR for each pixel in img
# img.shape

image shape:  (240, 240, 3)


In [26]:
# defining the tranformer, which we will use to pre-process our test image

img_rows = 224
img_cols = 224


transform_input = Transformer([Crop(img_rows, img_cols, "center"),
                                        Flip(0.5),
                                        ChannelNormalizer(0.485, 0.456, 0.406, 0.229, 0.224, 0.225),
                                        TransposeToTensor(False)
                                        ])

In [27]:
# pre-processing the img, feature transformation decreases training time
img_tranx = transform_input(img)
# converting the image to 'Sample' format which BigDL expects. 
label = np.array([-1]) #label is '-1' as its unknown to the model
img_to_model = Sample.from_ndarray(img_tranx, label)

In [28]:
# converting image from 'Sample' format into RDD format
img_data_rdd = sc.parallelize([img_to_model])

In [30]:
# predicting the image using our model
predict_result = model.predict_class(img_data_rdd)
pred_index = predict_result.collect()[0]
print pred_index

4


In [31]:
# printing out the category 
if pred_index > 4 :
    print ("not in the class list")
else:
    class_predicted = str(labels[pred_index - 1])
    print (class_predicted)

sunflowers


In [35]:
'''
1st attempt: Use exact same working code from above to serialize ONE img at a time and do predictions for all of them using a for loop.
Goal: Predict first 20 images in dandelion folder using un-trained model
ERROR: java.lang.IllegalArgumentException: requirement failed: input smaller than kernel size
'''
from PIL import Image # for seeing image
import cv2 # converting img to numpy array (RGB to BGR) 

# defining the tranformer, which we will use to pre-process our test image

img_rows = 224
img_cols = 224


dand_transformer = Transformer([Crop(img_rows, img_cols, "center"),
                                        Flip(0.5),
                                        ChannelNormalizer(0.485, 0.456, 0.406, 0.229, 0.224, 0.225),
                                        TransposeToTensor(False)
                                        ])
dand_path = "./sample_images/flower_photos/dandelion/"
imgs = listdir(dand_path)
for img in imgs[0:20]:
    # input_img = Image.open(dand_path + img)
    #print "original image dimensions: ", input_img.getbbox()
    # convert img to rdd form for bigdl
    img_bgr = cv2.imread(dand_path + img) 
    
    img_tranx = dand_transformer(img_bgr)    # converting the image to 'Sample' format which BigDL expects. 
    
    # converting the image to 'Sample' format which BigDL expects. 
    label = np.array(-1) #label is '-1' as its unknown to the model
    img_to_model = Sample.from_ndarray(img_tranx, label)    
    
    # converting image from 'Sample' format into RDD format
    img_data_rdd = sc.parallelize([img_to_model])
    # predicting the image using our model
    predict_result = model.predict_class(img_data_rdd)
    pred_index = predict_result.collect()[0]   
    # printing out the category 
    if pred_index > 5 :
        print pred_index
    else:
        class_predicted = str(labels[pred_index - 1])
        print (class_predicted)
    

daisy
11


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 6.0 failed 1 times, most recent failure: Lost task 1.0 in stage 6.0 (TID 13, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: input smaller than kernel size
	at scala.Predef$.require(Predef.scala:224)
	at com.intel.analytics.bigdl.nn.SpatialMaxPooling.updateOutput(SpatialMaxPooling.scala:77)
	at com.intel.analytics.bigdl.nn.SpatialMaxPooling.updateOutput(SpatialMaxPooling.scala:43)
	at com.intel.analytics.bigdl.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:215)
	at com.intel.analytics.bigdl.nn.Sequential.updateOutput(Sequential.scala:37)
	at com.intel.analytics.bigdl.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:215)
	at com.intel.analytics.bigdl.nn.Sequential.updateOutput(Sequential.scala:37)
	at com.intel.analytics.bigdl.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:215)
	at com.intel.analytics.bigdl.optim.Predictor$$anonfun$predict$1$$anonfun$apply$3.apply(Predictor.scala:61)
	at com.intel.analytics.bigdl.optim.Predictor$$anonfun$predict$1$$anonfun$apply$3.apply(Predictor.scala:60)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: input smaller than kernel size
	at scala.Predef$.require(Predef.scala:224)
	at com.intel.analytics.bigdl.nn.SpatialMaxPooling.updateOutput(SpatialMaxPooling.scala:77)
	at com.intel.analytics.bigdl.nn.SpatialMaxPooling.updateOutput(SpatialMaxPooling.scala:43)
	at com.intel.analytics.bigdl.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:215)
	at com.intel.analytics.bigdl.nn.Sequential.updateOutput(Sequential.scala:37)
	at com.intel.analytics.bigdl.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:215)
	at com.intel.analytics.bigdl.nn.Sequential.updateOutput(Sequential.scala:37)
	at com.intel.analytics.bigdl.nn.abstractnn.AbstractModule.forward(AbstractModule.scala:215)
	at com.intel.analytics.bigdl.optim.Predictor$$anonfun$predict$1$$anonfun$apply$3.apply(Predictor.scala:61)
	at com.intel.analytics.bigdl.optim.Predictor$$anonfun$predict$1$$anonfun$apply$3.apply(Predictor.scala:60)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:112)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:112)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:112)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [34]:
'''
2nd attempt: Use a mapping function to create array of Samples containing first 20 images, then call model.predict_class()
Goal: Predict first 20 images in dandelion folder using un-trained model.
ERROR: py4j.Py4JException: Method modelPredictClass([class com.intel.analytics.bigdl.nn.Sequential, class java.util.ArrayList]) does not exist
'''
from PIL import Image # for seeing image
import cv2 # converting img to numpy array (RGB to BGR) 

# defining the tranformer, which we will use to pre-process our test image

img_rows = 224
img_cols = 224


transform_input = Transformer([Crop(img_rows, img_cols, "center"),
                                        Flip(0.5),
                                        ChannelNormalizer(0.485, 0.456, 0.406, 0.229, 0.224, 0.225),
                                        TransposeToTensor(False)
                                        ])
dand_path = "./sample_images/flower_photos/dandelion/"
imgs = listdir(dand_path)

#get paths
img_paths = []
for img in imgs[0:20]:
    img_paths.append((dand_path+img, 1))
    
#turn img_paths into labelled rdds
image_paths_rdd = sc.parallelize(img_paths)
feature_label_rdd = image_paths_rdd.map(lambda path_label: (misc.imread(path_label[0]), np.array(path_label[1]))) \
        .map(lambda img_label:
             (Resize(256, 256)(img_label[0]), img_label[1])) \
        .map(lambda feature_label:
             (((feature_label[0] & 0xff) / normalize).astype("float32"), feature_label[1]))

#turn to sample form for predictions 
img_data = feature_label_rdd.map(
                lambda features_label: (transform_input(features_label[0]), features_label[1])).map(
                lambda features_label: Sample.from_ndarray(features_label[0], features_label[1] + 1))
# predicting the image using our model
predict_result = model.predict_class([img_data])
pred_index = predict_result.collect()


Py4JError: An error occurred while calling o36.modelPredictClass. Trace:
py4j.Py4JException: Method modelPredictClass([class com.intel.analytics.bigdl.nn.Sequential, class java.util.ArrayList]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:272)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)



## Fine-tuning the model

### To-do:
- pre-req : Define inception_v1 model in BigDL and load it with Caffe model's weights
- remove the last layer of the model)
- freeze all the layers 
- add a new layer with 'num_class' we have in our data-set
- compile the model

This concludes the fine-tuning process
#### next steps :
- train the model on our dataset ( you can use the code in the below sections "Traning the model for transfer learning ") 
- test the trained model on our test dataset 

## Training the model for transfer learning 

In [None]:
# reading the data and performing pre-processing 


# the image size expected by the model
image_size = 224

print "---------TRAIN DATA---------------"
# image transformer, used for pre-processing the train images 
train_transformer = Transformer([Crop(image_size, image_size),
                                  Flip(0.5),
                                  ChannelNormalizer(0.485, 0.456, 0.406, 0.229, 0.224, 0.225),
                                  TransposeToTensor(False)])

# reading the traning data
train_data = get_inception_data(DATA_PATH, "image", "train").map(
                lambda features_label: (train_transformer(features_label[0]), features_label[1])).map(
                lambda features_label: Sample.from_ndarray(features_label[0], features_label[1] + 1))

print "---------VAL DATA---------------"
# validation data transformer 
val_transformer = Transformer([Crop(image_size, image_size, "center"),
                                Flip(0.5),
                                ChannelNormalizer(0.485, 0.456, 0.406, 0.229, 0.224, 0.225),
                                TransposeToTensor(False)])

#reading the validation data
val_data = get_inception_data(DATA_PATH, "image", "val").map(
                lambda features_label: (val_transformer(features_label[0]), features_label[1])).map(
                lambda features_label: Sample.from_ndarray(features_label[0], features_label[1] + 1))



In [24]:
# training the model


# parameters for 
batch_size = 16
no_epochs = 2

# Optimizer
optimizer = Optimizer(
                model=inception_model,
                training_rdd=train_data,
                #optim_method=Adam(learningrate=0.002),
                optim_method = SGD(learningrate=0.01, learningrate_decay=0.0002),
                criterion=ClassNLLCriterion(),
                end_trigger=MaxEpoch(no_epochs),
                batch_size=batch_size
            )

# setting checkpoints
optimizer.set_checkpoint(EveryEpoch(), checkpoint_path, isOverWrite=False)

# setting validation parameters 
optimizer.set_validation( batch_size=batch_size,
                          val_rdd=val_data,
                          trigger=EveryEpoch(),
                          val_method=[Top1Accuracy()])



creating: createDefault
creating: createSGD
creating: createClassNLLCriterion
creating: createMaxEpoch
creating: createOptimizer
creating: createEveryEpoch
creating: createEveryEpoch
creating: createTop1Accuracy


In [25]:
# Log the training process to measure loss/accuracy
app_name= 'inception-' + dt.datetime.now().strftime("%Y%m%d-%H%M%S")
train_summary = TrainSummary(log_dir='/tmp/inception_summaries',
                                     app_name=app_name)
train_summary.set_summary_trigger("Parameters", SeveralIteration(50))
val_summary = ValidationSummary(log_dir='/tmp/inception_summaries',
                                        app_name=app_name)
optimizer.set_train_summary(train_summary)
optimizer.set_val_summary(val_summary)
print "saving logs to ",app_name


creating: createTrainSummary
creating: createSeveralIteration
creating: createValidationSummary
saving logs to  inception-20171003-180850


In [26]:
# Boot training process
%pylab inline
trained_model = optimizer.optimize()
print "Optimization Done."

Populating the interactive namespace from numpy and matplotlib


`%matplotlib` prevents importing * from pylab and numpy
  "\n`%matplotlib` prevents importing * from pylab and numpy"
Traceback (most recent call last):
  File "/Users/ashleyzhao/anaconda/envs/inception/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/ashleyzhao/Desktop/BIGDL/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/Users/ashleyzhao/Desktop/BIGDL/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving
    self.process_request(request, client_address)
  File "/Users/ashleyzhao/anaconda/envs/inception/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 51827)
----------------------------------------


    num_updates = read_int(self.rfile)
  File "/Users/ashleyzhao/Desktop/BIGDL/spark-2.1.1-bin-hadoop2.7/python/pyspark/serializers.py", line 577, in read_int
    raise EOFError
EOFError


Py4JError: An error occurred while calling None.None

In [181]:
# testing our model on valid data
# todo : should tried with test data and modify the code accordingly

# image transformer, used for pre-processing the validation images 
test_transformer = Transformer([Crop(image_size, image_size, "center"),
                                Flip(0.5),
                                ChannelNormalizer(0.485, 0.456, 0.406, 0.229, 0.224, 0.225),
                                TransposeToTensor(False)])

# shouldn't the option to be passed is 'test' here rather than 'val' ? 
# reading val data 
# get_inception_data() returns a PythonRDD
test_data = get_inception_data(DATA_PATH, "image", "val").map(
                lambda features_label: (test_transformer(features_label[0]), features_label[1])).map(
                lambda features_label: Sample.from_ndarray(features_label[0], features_label[1] + 1))

def map_groundtruth_label(l):
    return l[0] - 1

print "Predictions: "
res = trained_model.predict_class(test_data)
print res.collect()

print "True Labels: "
print ', '.join(str.format(map_groundtruth_label(s.label)) for s in test_data.take(8))
# testing the trained model 
results = trained_model.test(test_data, batch_size, ["Top1Accuracy", "Top5Accuracy"])

NameError: name 'image_size' is not defined

In [None]:
# NOT WORKING
# results = trained_model.evaluate(test_data, batch_size, ["Top1Accuracy", "Top5Accuracy"])