In [None]:
import csv
import os
import re
# NOTE(review): the order of the imports below is kept as-is on purpose -- later
# star imports can shadow names from earlier ones (e.g. `Pipeline`), so do not
# re-sort without checking.
from bigdl.util.common import *
from bigdl.transform.vision.image import *
from bigdl.transform.vision import image
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType, StringType
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from pyspark import SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from zoo.common.nncontext import *
from zoo.pipeline.nnframes.nn_classifier import *
from zoo.pipeline.nnframes.nn_image_reader import *
from zoo.pipeline.nnframes.nn_image_transformer import *

In [None]:
# Name the Spark application and obtain the context through get_nncontext
# (presumably provided by the zoo.common.nncontext star import).
sparkConf = SparkConf().setAppName("ImageTransferLearningExample")
sc = get_nncontext(sparkConf)
# Bare last expression: shows the context object as the cell output.
sc

In [None]:
# Location of the serialized model file, relative to the working directory.
model_path = '../../../model/bigdl_inception-v1_imagenet_0.4.0.model' 
# Wrap the loaded model in an NNModel; [3,224,224] is presumably the expected
# input shape (channels, height, width) -- TODO confirm. Its output is written
# to the "embedding" column, which the classifier cell reads as its features.
preTrainedNNModel = NNModel(Model.loadModel(model_path), [3,224,224]).setPredictionCol("embedding")

## Get Flowers

In [None]:
!curl -LO http://download.tensorflow.org/example_images/flower_photos.tgz
!tar xzf flower_photos.tgz
!rm flower_photos/LICENSE.txt

In [None]:
def flowers(image_path='flower_photos'):
    """Index a directory that holds one sub-directory of images per class.

    Every immediate sub-directory of ``image_path`` is treated as one class.
    Classes are numbered from 1 in sorted directory-name order, so the
    numbering is identical on every run (``os.listdir`` on its own returns
    entries in arbitrary order).

    Args:
        image_path: Root directory containing the class sub-directories.
            Defaults to ``'flower_photos'``.

    Returns:
        A ``(classes, file_name_class)`` tuple. ``classes`` maps class
        (directory) name -> 1-based integer id; ``file_name_class`` maps each
        file name -> the id of the class directory it was found in.
    """
    classes = {}
    file_name_class = {}

    for dir_name in sorted(os.listdir(image_path)):
        class_dir = os.path.join(image_path, dir_name)
        if not os.path.isdir(class_dir):
            continue  # loose files directly under the root are not classes
        print(dir_name)
        class_id = classes.setdefault(dir_name, len(classes) + 1)
        for file_name in sorted(os.listdir(class_dir)):
            if file_name in file_name_class:
                # Same file name in two class folders: report it; the folder
                # visited later overwrites the earlier entry.
                print('Duplicate file name', file_name)
            file_name_class[file_name] = class_id

    return classes, file_name_class

# Scan the unpacked archive and keep both lookup tables at notebook scope.
classes, file_name_class = flowers()

## Read files

In [None]:
# Glob pattern: every entry inside every sub-directory of flower_photos.
data_path = 'flower_photos/*/*'
image_path = data_path
# Read the images into a DataFrame, spread it over 12 partitions and cache it,
# because it is used again by the count below and by later cells.
imageDF = NNImageReader.readImages(image_path, sc).repartition(12).cache()
print ("partition number: ", imageDF.rdd.getNumPartitions())
print ("image number: ", imageDF.count())

In [None]:
# os.listdir(data_path + '/train_img')

In [None]:
# Preview the first rows of the image DataFrame.
imageDF.show()

#### Create array of labels

In [None]:
def load_image_classes(csv_file_name):
    """Read a two-column ``name,class`` CSV file into a dictionary.

    The first line is treated as a header and skipped. Blank lines (for
    example an empty line at the end of the file) are ignored instead of
    raising ``ValueError``. Rows are parsed with the ``csv`` module, so quoted
    fields are unquoted. When a name occurs more than once, the first
    occurrence is kept.

    Args:
        csv_file_name: Path of the CSV file to read.

    Returns:
        dict mapping the first column (image name) to the second column
        (class), both as strings.

    Raises:
        ValueError: If a non-blank row does not have exactly two columns.
    """
    image_to_class = {}
    # newline='' lets the csv module handle line endings itself.
    with open(csv_file_name, 'rt', newline='') as f:
        reader = csv.reader(f)
        next(reader, None)  # Skip header; also tolerates an empty file
        for row in reader:
            if not row:
                continue  # blank line
            name, cls = row
            image_to_class.setdefault(name, cls)
    return image_to_class

# data_path is the glob 'flower_photos/*/*', so data_path + '/train.csv' is not
# a path that can be opened; in this dataset an image's class is the directory
# it sits in. Build the two lookup tables from the flowers() scan instead.
class_name_by_id = {class_id: class_name for class_name, class_id in classes.items()}

# File name up to the first '.' -> class name (the key format getLabel looks up).
train_image_to_class = {
    file_name.split('.')[0]: class_name_by_id[class_id]
    for file_name, class_id in file_name_class.items()
}

# Class name -> label as a float, because the label column is DoubleType.
# The ids assigned by flowers() are kept unchanged, so re-running this cell
# gives the same tables.
classes = {class_name: float(class_id) for class_name, class_id in classes.items()}

classes

In [None]:
def _file_name_from_image(row):
    """Return the last '/'-separated component of the image row's first field."""
    return row[0].split('/')[-1]


def _label_for_file(name):
    """Return the numeric label for a file name, keyed by its part before the first '.'."""
    image_key = name.split('.')[0]
    return classes[train_image_to_class[image_key]]


getName = udf(_file_name_from_image)
getLabel = udf(_label_for_file, DoubleType())

# Add the file name column first, then the label column derived from it.
labelDF = (
    imageDF
    .withColumn("name", getName(col("image")))
    .withColumn("label", getLabel(col('name')))
)

In [None]:
# Preview the rows with the added "name" and "label" columns.
labelDF.show()

In [None]:
# Fixed seed: without it the 90/10 split -- and therefore every metric computed
# afterwards -- changes on each Restart & Run All.
(trainingDF, validationDF) = labelDF.randomSplit([0.9, 0.1], seed=42)

In [None]:
# Number of rows that landed in the training split.
trainingDF.count()

Compose a pipeline

In [None]:
# Preprocessing applied to the "image" column, result stored in "features":
# resize to 256x256, centre-crop to 224x224 (the size given to NNModel above),
# then ChannelNormalize with the three per-channel values (presumably the
# channel means to subtract -- TODO confirm against the BigDL docs).
transformer = (
    NNImageTransformer(
        image.Pipeline([
            Resize(256, 256),
            CenterCrop(224, 224),
            ChannelNormalize(123.0, 117.0, 104.0),
        ])
    )
    .setInputCol("image")
    .setOutputCol("features")
)

In [None]:
# Apply the image preprocessing to the training split (adds the "features" column).
features = transformer.transform(trainingDF)

In [None]:
# Run the pre-trained model over the preprocessed rows; its output goes to the
# "embedding" column configured via setPredictionCol.
trainingEmbedDF = preTrainedNNModel.transform(features)

In [None]:
# Preview the training rows with the added "embedding" column.
trainingEmbedDF.show()

Set the training hyperparameters:

In [None]:
# Training settings consumed by the classifier cell.
EPOCHS = 100  # passed to setMaxEpoch
LEARNING_RATE = 0.001  # passed to setLearningRate
BATCH_SIZE = 64  # passed to setBatchSize

Create Linear Classifier

In [None]:
from bigdl.optim.optimizer import Optimizer, Adam, MaxEpoch, EveryEpoch, Top1Accuracy, \
    TrainSummary, ValidationSummary, SeveralIteration, SGD

In [None]:
# Classification head: one linear layer from the 1000-wide embedding to one
# output per entry in `classes`, followed by LogSoftMax.
lrModel = (
    Sequential()
    .add(Linear(1000, len(classes)))
    .add(LogSoftMax())
)

# Train the head on the "embedding" column with SGD (Nesterov momentum 0.9),
# using the hyperparameters defined in the cell above.
classifier = (
    NNClassifier(lrModel, ClassNLLCriterion(), [1000])
    .setOptimMethod(SGD(nesterov=True, momentum=0.9, dampening=0.0))
    .setLearningRate(LEARNING_RATE)
    .setBatchSize(BATCH_SIZE)
    .setMaxEpoch(EPOCHS)
    .setFeaturesCol("embedding")
)

In [None]:
# Accuracy evaluator comparing the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy")

In [None]:
# Alternative (not used below): chain all three stages in one Spark ML pipeline.
# pipeline = Pipeline(stages=[transformer, preTrainedNNModel, classifier])

In [None]:
# Fit the classifier on the embedded training rows.
# NOTE(review): the name "grocery_model" looks like a leftover from another
# dataset; it is kept because a later cell refers to it.
grocery_model = classifier.fit(trainingEmbedDF)

In [None]:
# trainPredictDF = grocery_model.transform(trainingDF)
# evaluator.evaluate(trainPredictDF)

In [None]:
# Push the validation split through the same preprocessing and pre-trained
# model as the training split, then score it with the fitted classifier.
validationEmbedDF = preTrainedNNModel.transform(transformer.transform(validationDF))
# Cached because the result is shown here and evaluated again in the next cell.
predictionDF = grocery_model.transform(validationEmbedDF).cache()
predictionDF.show()

In [None]:
# Accuracy (the evaluator's metricName) of the predictions on the validation split.
evaluator.evaluate(predictionDF)