In [None]:
from mmlspark import ImageReader, ImageFeaturizer, UnrollImage, ImageTransform, TrainClassifier, \
    SelectColumns, Repartition, ImageFeaturizer, ModelDownloader

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType, DoubleType
from pyspark.ml import Transformer, Estimator, Pipeline
from pyspark.ml.classification import LogisticRegression

import numpy as np, pandas as pd, os, sys, time
from os.path import join, abspath, exists
from urllib.request import urlretrieve

# Create (or reuse, via getOrCreate) the Spark session used by the rest of the notebook.
# NOTE(review): the application name mentions CIFAR10, but the data loaded below is the
# flowers parquet file — presumably a leftover from another notebook; confirm before renaming.
spark = pyspark.sql.SparkSession.builder.appName("CIFAR10 - ImageFeaturizer").getOrCreate()

### Set some paths.

In [None]:
# Remote side: the dataset is fetched from <cdnURL>/Flowers/<dataFile>.
cdnURL = "https://amldockerdatasets.azureedge.net"
dataFile = "flowers_and_labels.parquet"
dataURL = "".join((cdnURL, "/Flowers/", dataFile))

# Local side: the file is stored under ./Flowers, resolved against the current working directory.
localDataDir = abspath("Flowers/")
localDataFile = join(localDataDir, dataFile)

In [None]:
# Model settings with the model directory given as a wasb:// URI.
# NOTE(review): the next cell assigns modelName/modelDir again with a local "file:" path, so
# when both cells are run in order the later values are the ones in effect — presumably
# only one of the two cells is meant to be run for a given environment; confirm.
modelName = "ResNet50"
modelDir = "wasb:///models/"

In [None]:
# Model settings with the model directory on the local filesystem:
# "file:" followed by the absolute path of ./models.
# NOTE(review): this reassigns the modelDir set in the previous cell when both are run in order.
modelName = "ResNet50"
modelDir = "file:" + abspath("models")

### Download the dataset and the CNTK model

In [None]:
def mkdir(path):
    """Ensure that ``path`` exists, creating intermediate directories when needed.

    The path is made absolute first. A message is printed in both cases:
    whether the directory had to be created or something already exists there.
    """
    target = abspath(path)
    if exists(target):
        print("already have directory {}".format(target))
        return
    print("making directory {}".format(target))
    os.makedirs(target)


def download(url, path):
    """Fetch ``url`` into the file at ``path`` unless that file already exists.

    The data is first written to ``<path>.part`` and only moved to ``path``
    once the transfer has finished. An interrupted or failed download
    therefore never leaves a truncated file at ``path`` that a later run
    would mistake for a complete one and skip.

    Args:
        url: location to download from (anything ``urlretrieve`` accepts).
        path: destination file; made absolute before use.

    Raises:
        Whatever ``urlretrieve`` raises on failure; the partial file is
        removed before the exception propagates.
    """
    path = abspath(path)
    if os.path.isfile(path):
        print("found {} skipping download".format(path))
        return

    print("downloading to {}".format(path))
    partial = path + ".part"
    try:
        urlretrieve(url, partial)
        # Atomic rename on the same filesystem: path only ever holds a complete file.
        os.replace(partial, path)
    finally:
        # Still present only when the download or the rename failed.
        if os.path.exists(partial):
            os.remove(partial)

# Make sure the local data directory exists, then fetch the dataset into it
# (download() skips the transfer when the file is already present).
mkdir(localDataDir)
download(dataURL, localDataFile)

# Look up the model called modelName through a ModelDownloader bound to modelDir
# (presumably this downloads it into modelDir — confirm against the mmlspark docs).
# The cell that builds cntkFeaturizer reads `model.uri` and `model.layerNames`.
# NOTE(review): a later cell rebinds `model` to a TrainClassifier, so the
# cntkFeaturizer cell has to run before that one, as it does in top-to-bottom order.
d = ModelDownloader(spark, modelDir)
model = d.downloadByName(modelName)

### Load the images

In [None]:
# Read the downloaded parquet file through an explicit "file:" URI and print its schema.
imagesWithLabels = spark.read.parquet("file:"+localDataFile)
imagesWithLabels.printSchema()

<img src="http://www.pyimagesearch.com/wp-content/uploads/2014/01/flowers17sample.jpg" alt="Sample images from the flowers dataset" width="50%" height="50%">

### Make some featurizers

In [None]:
# Baseline featurizer, stage 1: resize to 60x60 and write the result to "scaled".
it = (
    ImageTransform()
    .setOutputCol("scaled")
    .resize(height=60, width=60)
)

# Baseline featurizer, stage 2: UnrollImage reads "scaled" and writes "features"
# (presumably a flat vector of the pixel values — confirm against the mmlspark docs).
ur = (
    UnrollImage()
    .setInputCol("scaled")
    .setOutputCol("features")
)

basicFeaturizer = Pipeline(stages=[it, ur])

In [None]:
# Featurizer backed by the downloaded model: reads "image", writes "features".
# The model location and layer names come from the `model` object returned by downloadByName.
# NOTE(review): setCutOutputLayers(1) presumably drops the final layer so an
# intermediate representation is emitted — confirm against the ImageFeaturizer docs.
cntkFeaturizer = (
    ImageFeaturizer()
    .setInputCol("image")
    .setOutputCol("features")
    .setModelLocation(model.uri)
    .setLayerNames(model.layerNames)
    .setCutOutputLayers(1)
)

<img src="https://susudarseu.blob.core.windows.net/images/resnet-18-horiz2.png" alt="ResNet architecture diagram" width="80%" height="80%">

### How does it work?

<img src="http://i.stack.imgur.com/Hl2H6.png" alt="Diagram illustrating how the featurizer works" width="80%" height="80%">

### Define some methods to help us experiment

In [None]:
def featurize(featurizer, train, test, name):
    """Run ``featurizer`` over both splits and report how long it took.

    A pipeline of ``featurizer``, a SelectColumns stage keeping "features" and
    "labels", and a Repartition stage with n=4 is fitted on ``train`` and then
    applied to ``train`` and ``test``. Both results are cached.

    The printed time covers fitting plus the two ``count()`` calls.

    Args:
        featurizer: pipeline stage (or pipeline) that produces a "features" column.
        train: training DataFrame.
        test: test DataFrame.
        name: label used only in the printed message.

    Returns:
        Tuple ``(trainFeats, testFeats)`` of cached, featurized DataFrames.
    """
    t0 = time.time()
    stages = [
        featurizer,
        SelectColumns(cols=["features", "labels"]),
        Repartition(n=4),
    ]
    fitted = Pipeline(stages=stages).fit(train)
    trainFeats = fitted.transform(train).cache()
    testFeats = fitted.transform(test).cache()

    # Count first, then stop the clock — same order as the message arguments.
    total = trainFeats.count() + testFeats.count()
    elapsed = time.time() - t0
    print("Featurized {} images with {} featurizer in {} seconds".format(total, name, elapsed))
    sys.stdout.flush()
    return trainFeats, testFeats

def predict(model, train, test, name):
    """Train ``model`` on ``train``, score ``test``, and save the predictions as CSV.

    ``model`` is fitted inside a pipeline followed by a SelectColumns stage that
    keeps "scored_labels" and "labels". The scored test set is cached, its row
    count and the elapsed time are printed, and the rows are written as a
    single partition (``coalesce(1)``), overwriting any previous output, to
    ``/tmp/flower_predictions_<name>`` with a header row.

    Args:
        model: estimator stage to fit (expected to emit a "scored_labels" column).
        train: featurized training DataFrame.
        test: featurized test DataFrame.
        name: label used in the printed message and in the output directory name.

    Returns:
        None.
    """
    began = time.time()
    keep = SelectColumns(cols=(["scored_labels","labels"]))
    fitted = Pipeline(stages=[model, keep]).fit(train)
    predictions = fitted.transform(test).cache()

    n_rows = predictions.count()
    took = time.time() - began
    print("Classified {} images from {} features in {} seconds".format(n_rows, name, took))
    sys.stdout.flush()

    out_dir = "/tmp/flower_predictions_{}".format(name)
    writer = predictions.coalesce(1).write.mode("overwrite")
    writer.csv(out_dir, header=True)

### Run the experiment

In [None]:
# Split the rows with weights 0.8 / 0.2 into training and test sets.
# A fixed seed keeps the split — and every accuracy figure derived from it —
# repeatable across runs (given the same input partitioning).
train, test = imagesWithLabels.randomSplit([.8,.2], seed=42)
# NOTE(review): the partition counts (102 and 51) are hard-coded; presumably tuned
# for a particular cluster — confirm or move to a configuration cell.
train = train.coalesce(51*2).cache()
test = test.coalesce(51).cache()
# Materialize both caches and show the split sizes as the cell output.
train.count(), test.count()

In [None]:
# Classifier used for both experiments: a LogisticRegression wrapped in
# TrainClassifier, with "labels" as the label column.
# NOTE(review): this rebinds `model`, which earlier held the result of
# ModelDownloader.downloadByName. Re-running the cntkFeaturizer cell after this
# one would read `uri` / `layerNames` from this object instead of the downloaded model.
model = TrainClassifier().setModel(LogisticRegression()).setLabelCol("labels")

In [None]:
trainFeatsBasic, testFeatsBasic = featurize(basicFeaturizer,train,test,'basic')

In [None]:
predict(model, trainFeatsBasic, testFeatsBasic, 'basic')

In [None]:
trainFeatsCNTK, testFeatsCNTK = featurize(cntkFeaturizer,train,test,'cntk')

In [None]:
predict(model, trainFeatsCNTK, testFeatsCNTK, 'cntk')

### Plot confusion matrix.

In [None]:
%%local
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import pandas as pd
from glob import glob
import numpy as np


# For each experiment: drop any stale local copy, copy the prediction directory
# written by predict() out of HDFS, and load the first CSV file found in it.
# predict() writes with coalesce(1), so presumably there is exactly one CSV part file;
# glob(...)[0] raises IndexError if the copy produced none.
!rm -rf /tmp/flower_predictions_basic
!hdfs dfs -copyToLocal /tmp/flower_predictions_basic /tmp/flower_predictions_basic
results_basic = pd.read_csv(glob('/tmp/flower_predictions_basic/*.csv')[0])

!rm -rf /tmp/flower_predictions_cntk
!hdfs dfs -copyToLocal /tmp/flower_predictions_cntk /tmp/flower_predictions_cntk
results_cntk = pd.read_csv(glob('/tmp/flower_predictions_cntk/*.csv')[0])


def evaluate(results, name):
    """Draw a row-normalized confusion matrix for one set of predictions.

    Everything is drawn through the pyplot state machine, i.e. onto whichever
    axes are current when the function is called. The overall accuracy is
    added as a text annotation.

    Args:
        results: table indexable by column name (e.g. a pandas DataFrame) with
            a "labels" column (true class) and a "scored_labels" column
            (predicted class).
        name: text inserted into the plot title.

    Returns:
        None.
    """
    y, y_hat = results["labels"], results["scored_labels"]
    y = [int(l) for l in y]

    accuracy = np.mean([1. if pred == true else 0. for (pred, true) in zip(y_hat, y)])
    cm = confusion_matrix(y, y_hat)
    # Normalize each row by its number of true samples. A class that only shows
    # up among the predictions has a zero row total; leave that row at 0 instead
    # of dividing by zero and filling it with NaN.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm = np.divide(cm.astype('float'), row_totals,
                   out=np.zeros(cm.shape, dtype=float), where=row_totals != 0)

    # "\\%" yields the same text as the former "\%" without relying on an
    # invalid escape sequence in a non-raw string.
    plt.text(40, 10, "$Accuracy$ $=$ ${}\\%$".format(round(accuracy*100,1)), fontsize=14)
    plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
    plt.colorbar()
    plt.xlabel("$Predicted$ $label$", fontsize=18)
    plt.ylabel("$True$ $Label$", fontsize=18)
    plt.title("$Normalized$ $CM$ $for$ ${}$".format(name))

# Side-by-side comparison: CNTK-featurized results on the left, basic-featurizer results on the right.
# evaluate() draws through pyplot, so each call lands on the subplot selected just before it.
plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
evaluate(results_cntk,"CNTKModel + LR")
plt.subplot(1,2,2)
evaluate(results_basic,"LR")
plt.show()