In [1]:
import pandas as pd
from os import listdir
from os.path import join, basename
import struct
import pickle
import json
import os
from scipy import misc
import datetime as dt
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline as ml_Pipeline
from pyspark.ml.feature import StringIndexer
from math import ceil
# import matplotlib.pyplot as plt
# %matplotlib inline

In [2]:
# %pylab inline
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.nn.initialization_method import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from bigdl.dataset.transformer import *
from bigdl.dataset import mnist
from bigdl.transform.vision.image import *
from zoo.pipeline.nnframes.nn_image_reader import *
from zoo.pipeline.nnframes.nn_image_transformer import *
from zoo.pipeline.nnframes.nn_classifier import *
from zoo.common.nncontext import *
import urllib


In [3]:

def t(input_T):
    """
    Convert a list into the 1-based lookup table used when building Inception layers.

    A list ``[a, b, ...]`` becomes ``{0: 0, 1: a, 2: b, ...}``: a placeholder 0 is
    stored under key 0 so that the real entries are addressed from key 1 onwards.
    Anything that is not exactly a ``list`` (for example a dict that was already
    converted) is handed back untouched.

    :param input_T: either list or dict
    :return: dict keyed 0..len(input_T) for list input, otherwise ``input_T`` itself
    """
    if type(input_T) is not list:
        # not a plain list: pass it straight through
        return input_T
    # key 0 holds a dummy 0; the caller's values follow from key 1
    return {index: value for index, value in enumerate([0] + input_T)}

In [4]:
def inception_layer_v1(input_size, config, name_prefix=""):
    """
    Assemble one Inception-v1 block as a ``Concat(2)`` of four parallel branches.

    Branches, in the order they are added to the concat container:
      1. ``1x1`` convolution
      2. ``3x3_reduce`` convolution followed by the ``3x3`` convolution
      3. ``5x5_reduce`` convolution followed by the ``5x5`` convolution
      4. ``pool`` (3x3 max pooling with ``to_ceil=True``) followed by ``pool_proj``

    Every convolution is initialised with ``Xavier()`` weights and ``Zeros()`` bias
    and is followed by a ``ReLU(True)`` named ``relu_<conv name>``.

    :param input_size: first argument of the leading convolution of every branch
    :param config: nested 1-based lookup (the shape produced by ``t``):
        ``config[1][1]`` sizes the 1x1 branch, ``config[2][1]``/``config[2][2]`` the
        3x3 reduce/output, ``config[3][1]``/``config[3][2]`` the 5x5 reduce/output
        and ``config[4][1]`` the pool projection
    :param name_prefix: string prepended to every layer name in the block
    :return: the ``Concat`` container, named ``name_prefix + "output"``
    """
    def conv_relu(branch, suffix, *conv_args):
        # append one Xavier/zero-initialised convolution plus its ReLU to `branch`
        branch.add(SpatialConvolution(*conv_args)
                   .set_init_method(weight_init_method=Xavier(), bias_init_method=Zeros())
                   .set_name(name_prefix + suffix))
        branch.add(ReLU(True).set_name(name_prefix + "relu_" + suffix))

    concat = Concat(2)

    branch_1x1 = Sequential()
    conv_relu(branch_1x1, "1x1", input_size, config[1][1], 1, 1, 1, 1)
    concat.add(branch_1x1)

    branch_3x3 = Sequential()
    conv_relu(branch_3x3, "3x3_reduce", input_size, config[2][1], 1, 1, 1, 1)
    conv_relu(branch_3x3, "3x3", config[2][1], config[2][2], 3, 3, 1, 1, 1, 1)
    concat.add(branch_3x3)

    branch_5x5 = Sequential()
    conv_relu(branch_5x5, "5x5_reduce", input_size, config[3][1], 1, 1, 1, 1)
    conv_relu(branch_5x5, "5x5", config[3][1], config[3][2], 5, 5, 1, 1, 2, 2)
    concat.add(branch_5x5)

    branch_pool = Sequential()
    branch_pool.add(SpatialMaxPooling(3, 3, 1, 1, 1, 1, to_ceil=True).set_name(name_prefix + "pool"))
    conv_relu(branch_pool, "pool_proj", input_size, config[4][1], 1, 1, 1, 1)
    concat.add(branch_pool)

    concat.set_name(name_prefix + "output")
    return concat

In [5]:
def inception_v1_no_aux_classifier(class_num, has_dropout=True):
    """
    Build the Inception-v1 network as a single ``Sequential`` without auxiliary classifiers.

    Layout: the conv1/pool1/conv2/pool2 stem, Inception blocks 3a-3b, a max pool,
    blocks 4a-4e, a max pool, blocks 5a-5b, 7x7 average pooling, optional dropout,
    a ``View([1024])``, the ``loss3/classifier`` linear layer and ``LogSoftMax``.
    Convolutions and the final linear layer use ``Xavier()`` weight and ``Zeros()``
    bias initialisation. ``model.reset()`` is called before the model is returned.

    :param class_num: output size of the final ``Linear(1024, class_num)`` layer
    :param has_dropout: when true, a ``Dropout(0.4)`` is inserted after the average pooling
    :return: the assembled ``Sequential`` model
    """
    def xavier(layer, name):
        # apply the shared init scheme and name the layer
        return (layer
                .set_init_method(weight_init_method=Xavier(), bias_init_method=Zeros())
                .set_name(name))

    # Each entry: (input_size, [1x1, [3x3 reduce, 3x3], [5x5 reduce, 5x5], pool proj], name prefix).
    # The outer list groups blocks into stages; stages are separated by a max pool.
    stages = [
        [
            (192, [[64], [96, 128], [16, 32], [32]], "inception_3a/"),
            (256, [[128], [128, 192], [32, 96], [64]], "inception_3b/"),
        ],
        [
            (480, [[192], [96, 208], [16, 48], [64]], "inception_4a/"),
            (512, [[160], [112, 224], [24, 64], [64]], "inception_4b/"),
            (512, [[128], [128, 256], [24, 64], [64]], "inception_4c/"),
            (512, [[112], [144, 288], [32, 64], [64]], "inception_4d/"),
            (528, [[256], [160, 320], [32, 128], [128]], "inception_4e/"),
        ],
        [
            (832, [[256], [160, 320], [32, 128], [128]], "inception_5a/"),
            (832, [[384], [192, 384], [48, 128], [128]], "inception_5b/"),
        ],
    ]

    model = Sequential()

    # Stem
    model.add(xavier(SpatialConvolution(3, 64, 7, 7, 2, 2, 3, 3, 1, False), "conv1/7x7_s2"))
    model.add(ReLU(True).set_name("conv1/relu_7x7"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool1/3x3_s2"))
    model.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("pool1/norm1"))
    model.add(xavier(SpatialConvolution(64, 64, 1, 1, 1, 1), "conv2/3x3_reduce"))
    model.add(ReLU(True).set_name("conv2/relu_3x3_reduce"))
    model.add(xavier(SpatialConvolution(64, 192, 3, 3, 1, 1, 1, 1), "conv2/3x3"))
    model.add(ReLU(True).set_name("conv2/relu_3x3"))
    model.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("conv2/norm2"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool2/3x3_s2"))

    # Inception stages, with an (unnamed) max pool between consecutive stages
    for stage_index, blocks in enumerate(stages):
        if stage_index > 0:
            model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True))
        for in_planes, widths, prefix in blocks:
            block_config = t([t(branch) for branch in widths])
            model.add(inception_layer_v1(in_planes, block_config, prefix))

    # Classifier head
    model.add(SpatialAveragePooling(7, 7, 1, 1).set_name("pool5/7x7_s1"))
    if has_dropout:
        model.add(Dropout(0.4).set_name("pool5/drop_7x7_s1"))
    model.add(View([1024], num_input_dims=3))
    model.add(xavier(Linear(1024, class_num), "loss3/classifier"))
    model.add(LogSoftMax().set_name("loss3/loss3"))
    model.reset()
    return model

In [6]:
def Inception_v1(class_num):
    """
    Build the Inception-v1 network as a single ``Sequential`` using default layer initialisation.

    Same layer sequence and layer names as ``inception_v1_no_aux_classifier``, except
    that no explicit init method is set on the convolution/linear layers and the
    ``Dropout(0.4)`` layer is always present. ``model.reset()`` is called before the
    model is returned.

    The per-block configuration is built with the notebook's ``t`` helper. The
    previous body called ``scala_T``, which is not defined anywhere in this
    notebook, so calling the function failed with a NameError.

    :param class_num: output size of the final ``Linear(1024, class_num)`` layer
    :return: the assembled ``Sequential`` model
    """
    model = Sequential()
    model.add(SpatialConvolution(3, 64, 7, 7, 2, 2, 3, 3, 1, False).set_name("conv1/7x7_s2"))
    model.add(ReLU(True).set_name("conv1/relu_7x7"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool1/3x3_s2"))
    model.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("pool1/norm1"))
    model.add(SpatialConvolution(64, 64, 1, 1, 1, 1).set_name("conv2/3x3_reduce"))
    model.add(ReLU(True).set_name("conv2/relu_3x3_reduce"))
    model.add(SpatialConvolution(64, 192, 3, 3, 1, 1, 1, 1).set_name("conv2/3x3"))
    model.add(ReLU(True).set_name("conv2/relu_3x3"))
    model.add(SpatialCrossMapLRN(5, 0.0001, 0.75).set_name("conv2/norm2"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True).set_name("pool2/3x3_s2"))
    model.add(inception_layer_v1(192, t([t([64]), t(
         [96, 128]), t([16, 32]), t([32])]), "inception_3a/"))
    model.add(inception_layer_v1(256, t([t([128]), t(
         [128, 192]), t([32, 96]), t([64])]), "inception_3b/"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True))
    model.add(inception_layer_v1(480, t([t([192]), t(
         [96, 208]), t([16, 48]), t([64])]), "inception_4a/"))
    model.add(inception_layer_v1(512, t([t([160]), t(
         [112, 224]), t([24, 64]), t([64])]), "inception_4b/"))
    model.add(inception_layer_v1(512, t([t([128]), t(
         [128, 256]), t([24, 64]), t([64])]), "inception_4c/"))
    model.add(inception_layer_v1(512, t([t([112]), t(
         [144, 288]), t([32, 64]), t([64])]), "inception_4d/"))
    model.add(inception_layer_v1(528, t([t([256]), t(
         [160, 320]), t([32, 128]), t([128])]), "inception_4e/"))
    model.add(SpatialMaxPooling(3, 3, 2, 2, to_ceil=True))
    model.add(inception_layer_v1(832, t([t([256]), t(
         [160, 320]), t([32, 128]), t([128])]), "inception_5a/"))
    model.add(inception_layer_v1(832, t([t([384]), t(
         [192, 384]), t([48, 128]), t([128])]), "inception_5b/"))
    model.add(SpatialAveragePooling(7, 7, 1, 1).set_name("pool5/7x7_s1"))
    model.add(Dropout(0.4).set_name("pool5/drop_7x7_s1"))
    model.add(View([1024], num_input_dims=3))
    model.add(Linear(1024, class_num).set_name("loss3/classifier"))
    model.add(LogSoftMax().set_name("loss3/loss3"))
    model.reset()
    return model

## Download the images from Amazon S3

Make sure the AWS command line interface (CLI) is installed so that all images in the S3 folder can be downloaded recursively. Setup instructions for the AWS CLI: http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-welcome.html

In [8]:
import urllib
from os import path
# Root directory of the model files; the Caffe model/prototxt paths are built from it later.
MODEL_ROOT = "/mnt/nobigdl/vegnonveg/python/inception_v1/models/"
# "checkpoints" sub-directory of MODEL_ROOT (not referenced again in the visible cells).
checkpoint_path = path.join(MODEL_ROOT, "checkpoints")

# Disabled S3 download.
# NOTE(review): `local_folder` is not defined in the visible notebook, so this
# cannot be re-enabled as written.
# if not path.isdir(local_folder):
#   os.system('aws s3 cp --recursive s3://vegnonveg/vegnonveg-fewsamples %s' % local_folder)

## Read images to parquet file and load to Spark as Image DataFrame

Save the data to parquet files and load it into Spark. Add a label to each image.

In [10]:
# Root directory of the sample data; every path below is derived from it.
DATA_ROOT = "/mnt/nobigdl/vegnonveg/python/inception_v1/sample_images/"
# Directory of sample images read by the image reader
sample_path = DATA_ROOT + 'vegnonveg-fewsamples/'
# CSV file with the image labels
label_path = DATA_ROOT + 'vegnonveg-samples_labels.csv'
# Target directory for the parquet copy of the image DataFrame
parquet_path = DATA_ROOT + 'sample_parquet/'
# Remove any existing parquet output.
# NOTE(review): `dbutils` is not imported in the visible cells — presumably the
# Databricks notebook global, with True meaning "recursive"; confirm.
dbutils.fs.rm(parquet_path, True)


In [11]:
# Initialize BigDL
init_engine()
# NOTE(review): `redire_spark_logs` is presumably spelled this way by the BigDL API itself — confirm before "fixing" the name
redire_spark_logs()


In [12]:
# Read the sample images into a DataFrame. The parquet write below is disabled,
# so the images are re-read from `sample_path` on every run.
# NOTE(review): `sc` is not defined in the visible cells (presumably the
# notebook-provided SparkContext), and the keyword is spelled `minParitions` —
# confirm that this matches the installed NNImageReader signature.
image_frame = NNImageReader.readImages(sample_path, sc, minParitions=32)
# save dataframe to parquet files
# image_frame.write.parquet(parquet_path)

In [13]:
# load parquet file into spark cluster
# NOTE(review): the parquet read between `start` and `end` is commented out, so
# nothing is timed and the printed load time is effectively zero.
# `time` imported here is also used by the training cell further down.
import time
start = time.time()
# image_raw_DF = sqlContext.read.parquet(parquet_path)
end = time.time()
print("Load data time is: " + str(end-start) + " seconds")

In [14]:
# Label DataFrame: read the label CSV with a header row, dropping malformed records
label_raw_DF = (
    sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load(label_path)
)

In [15]:
# Image DataFrame with an extra "image_name" column
def _image_file_name(row):
    """Return the text after the last "/" in the first field of the image struct."""
    path_parts = row[0].split("/")
    return path_parts[-1]

get_name = udf(_image_file_name, StringType())
imageDF = image_frame.withColumn("image_name", get_name("image"))
# imageDF.show(truncate=False)

In [16]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, rand
# Join images with their labels: image_name must equal the label file's obs_uid
dataDF = imageDF.join(label_raw_DF, imageDF.image_name==label_raw_DF.obs_uid, "inner").select("image", "image_name", "item_name")
# only use samples whose label count > 100
items = dataDF.groupBy("item_name").count().filter("count > 100").select("item_name")
indexer = StringIndexer(inputCol="item_name", outputCol="label")
# Shift the indexer output by one and cast to float, giving labels that start at 1
# (presumably what the BigDL criterion used for training expects — confirm)
labels = indexer.fit(items).transform(items).withColumn("label", (col("label") + 1).cast("float"))
# Shuffle with a fixed seed. An unseeded rand() is drawn afresh every time this lazy
# DataFrame is evaluated, so the seeded randomSplit applied to it later could not be
# reproduced and its two halves could be computed from different shuffles.
# The seed value mirrors the one used for the train/test split.
filteredDF = dataDF.join(labels, "item_name", "inner").select("image", "image_name", "label").orderBy(rand(seed=10))
# Number of distinct classes kept after the count filter
n_classes = labels.count()

## Do Train/Test Split and preprocessing
Split the data into training and test sets by a fixed ratio, then preprocess the images.

In [18]:
# Seeded 0.9 / 0.1 split; the first part is used for training, the second for validation
data = filteredDF.randomSplit([0.9, 0.1], seed=10)
train_image, val_image = data[0], data[1]

In [19]:
# Side length of the square crop fed to the network
IMAGE_SIZE = 224

# Training preprocessing: resize to 256x256, center-crop to IMAGE_SIZE x IMAGE_SIZE,
# channel-normalize, then convert to a tensor. Reads column "image", writes "features".
# NOTE(review): the ChannelNormalize arguments are presumably three per-channel means
# followed by three per-channel std values (1.0 = no scaling) — confirm the order.
train_transformer = NNImageTransformer(
    Pipeline([Resize(256, 256), CenterCrop(IMAGE_SIZE, IMAGE_SIZE),
              ChannelNormalize(123.0, 117.0, 104.0, 1.0, 1.0, 1.0),
              MatToTensor()])
).setInputCol("image").setOutputCol("features")

# Apply the preprocessing to the training split
train_data = train_transformer.transform(train_image)


In [20]:
# Number of training rows; used later for the iteration count and the throughput figure
train_size = train_image.count()
train_size

In [21]:
# Validation preprocessing. It must be identical to train_transformer: the classifier
# is fitted on features produced with MatToTensor() (no channel reordering), so
# passing to_rgb=True here would feed the trained model inputs with a different
# channel order at evaluation time.
val_transformer = NNImageTransformer(
    Pipeline([Resize(256,256),
              CenterCrop(IMAGE_SIZE, IMAGE_SIZE),
              ChannelNormalize(123.0, 117.0, 104.0, 1.0, 1.0, 1.0),
              MatToTensor()]
            )
).setInputCol("image").setOutputCol("features")

In [22]:
# Apply the validation preprocessing to the held-out split
test_data = val_transformer.transform(val_image)

## Define Model

In [24]:
# Caffe weights and network definition, both under MODEL_ROOT with a "dbfs:" scheme prefix
model_path = "dbfs:" + MODEL_ROOT + "bvlc_googlenet.caffemodel"
def_path = "dbfs:" + MODEL_ROOT + "deploy_transfer.prototxt"
# Load the pre-trained network (definition first, weights second)
model = Model.load_caffe_model(def_path, model_path)

In [25]:
# Training hyper-parameters
learning_rate = 0.2
# Batch size and epoch budget
batch_size = 64 # depends on dataset
no_epochs = 40 # stop when validation accuracy doesn't improve anymore

In [26]:
# Network Parameters
preTrainedNNModel = NNModel(model, [3,224,224]).setPredictionCol("embedding").setBatchSize(batch_size)
lrModel = Sequential().add(View([1024])).add(Linear(1024, n_classes)).add(LogSoftMax())

In [27]:
# Loss for the classifier head
criterion = ClassNLLCriterion()
# Number of batches needed to cover the training set once (rounded up)
iterations = int(ceil(float(train_size) / batch_size))
# NOTE(review): `optim` is constructed here but never handed to `classifier` below,
# so the momentum and Poly schedule configured here appear to have no effect on
# training — confirm whether the estimator should be given this optim method.
# `leaningrate_schedule` is presumably the keyword as spelled by the BigDL API — confirm.
optim = SGD(learningrate=learning_rate, learningrate_decay=0.0,
                    momentum=0.9, dampening=0.0, nesterov=False,
                    leaningrate_schedule=Poly(0.5, iterations))
# Classifier trained on the "embedding" column produced by preTrainedNNModel
classifier = NNClassifier(lrModel, criterion, [1024])\
    .setBatchSize(batch_size)\
    .setMaxEpoch(no_epochs)\
    .setLearningRate(learning_rate)\
    .setFeaturesCol("embedding")
# Two-stage Spark ML pipeline: embedding extraction, then the classifier
pipeline = ml_Pipeline(stages=[preTrainedNNModel, classifier])
# Wall-clock time of the fit; `start`/`end` are reused by the throughput cell
start = time.time()
trained_model = pipeline.fit(train_data)
end = time.time()
print("Optimization Done.")
print("Training time is: %s seconds" % str(end-start))
# + dt.datetime.now().strftime("%Y%m%d-%H%M%S")

In [28]:
# Training rows times configured epochs, divided by the fit's wall-clock seconds.
# Relies on `start`/`end` still holding the timestamps taken around pipeline.fit.
throughput = train_size * no_epochs / (end - start)
print("Average throughput is: %s" % str(throughput))

In [29]:
# Predict on the preprocessed held-out data with the fitted pipeline
# predict_model = trained_model.setBatchSize(batch_size)
predictionDF = trained_model.transform(test_data)
# Print a preview of the prediction DataFrame
predictionDF.show()

In [30]:
# Print ground truth vs. prediction for the first `num_preds` rows
num_preds = 1
preds = predictionDF.select("label", "prediction").take(num_preds)
# take() can return fewer than num_preds rows (e.g. a small or empty test split),
# so iterate over the rows that actually came back instead of indexing by range
for idx, (true_label, pred_label) in enumerate(preds):
    print(str(idx + 1) +')'+ 'Ground Truth label: '+ str(true_label))
    print(str(idx + 1) + ')'+ 'Predicted label: '+ str(pred_label))
    print("correct" if true_label == pred_label else "wrong")

In [31]:
# Measure Test Accuracy w/Test Set
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictionDF)
print("Accuracy = %g " % accuracy)