# Scoring images on Spark

This notebook illustrates how trained Cognitive Toolkit (CNTK) and TensorFlow models can be applied to large image collections using PySpark. For more detail on image set creation and model training, please see the rest of the [Embarrassingly Parallel Image Classification](https://github.com/Azure/Embarrassingly-Parallel-Image-Classification) repository.

## Outline
- [Setting up a Microsoft HDInsight Spark cluster and Azure Data Lake Store](#setup)
   - [Provisioning the resources](#provision)
   - [Transferring the image set](#transfer)
   - [Installing Cognitive Toolkit and TensorFlow](#install)
- [Image scoring with PySpark](#pyspark)
   - [Cognitive Toolkit](#cntk)
   - [TensorFlow](#tf)

<a name="setup"></a>
## Setting up a Microsoft HDInsight Spark cluster and associated Azure Data Lake Store

<a name="provision"></a>
### Provisioning the resources

#### Azure Data Lake Store
1. After logging into [Azure Portal](https://ms.portal.azure.com), click the "+ New" button near the upper left to create a new resource.
   <img src="./img/spark_adls_provisioning/new_resource_button.gif" width="100 px"/>
1. In the search field that appears, enter "Data Lake Store" and press Enter.
   <img src="./img/spark_adls_provisioning/resource_search_box_adls.gif" width="200 px"/>

1. In the search results, click on the "Data Lake Store" option published by Microsoft.
   <img src="./img/spark_adls_provisioning/resource_search_result_adls.gif" width="400 px"/>

1. Click the "Create" button at the bottom of the new pane that opens to describe the Data Lake Store resource type.
   <img src="./img/spark_adls_provisioning/create.gif" width="100 px"/>
1. Choose a unique name, subscription, resource group, and location for your Data Lake Store. Note the location: you will need to use the same location when deploying the Spark cluster.
1. Select the appropriate pricing plan for your needs. (We recommend "Pay-as-you-go"; the tutorial will use <1 TB of data.)
1. Click the "Create" button at the bottom of the pane.

#### Azure HDInsight Spark Cluster
1. After logging into [Azure Portal](https://ms.portal.azure.com), click the "+ New" button near the upper left to create a new resource.
   <img src="./img/spark_adls_provisioning/new_resource_button.gif" width="100 px"/>
1. In the search field that appears, enter "HDInsight" and press Enter.
   <img src="./img/spark_adls_provisioning/resource_search_box.gif" width="200 px"/>

1. In the search results, click on the "HDInsight" option published by Microsoft.
   <img src="./img/spark_adls_provisioning/resource_search_result.gif" width="400 px"/>

1. Click the "Create" button at the bottom of the new pane that opens to describe the HDInsight resource type.
   <img src="./img/spark_adls_provisioning/create.gif" width="100 px"/>
1. In the "Basics" section of the "New HDInsight cluster" pane:
    1. Choose a unique cluster name and the appropriate subscription.
    1. Click on "Cluster configuration" to load a pane of settings.
       1. Set the cluster type to "Spark".
       1. Set the version to "Spark 2.0.2 (HDI 3.5)".
       <img src="./img/spark_adls_provisioning/cluster_type_settings.gif" width="400 px"/>
       1. Click the "Select" button at the bottom of the pane.
    1. Choose a password for the `admin` account. You will use this account to log into Jupyter Notebook later in the walkthrough.
    1. Select the resource group and location where your Data Lake Store is located.
    1. Click the "Next" button at the bottom of the pane.
1. In the "Storage" section of the "New HDInsight cluster" pane:
   1. Ensure that "Data Lake Store" is selected for the "Primary storage type".
   1. Click on "Select Data Lake Storage Account" to load a pane of settings.
       1. Under "Select a storage account", select your ADLS.
       1. Click on "Configure Data Lake Access" to load a pane of settings.
           1. Create a new service principal.
           1. Click on "Access" to load a pane of settings.
               1. Under "Select File Permissions", click the box to the left of your ADLS name. (The box may be hidden until you hover over it.) Click "Select".
               1. Under "Assign Selected Permissions", click "Run".
               1. When the run completes, click "Done".
       1. Click the "Next" button at the bottom of the pane.
1. In the "Summary" section of the "New HDInsight cluster" pane:
   1. If desired, you can edit the cluster size settings to choose node counts/sizes based on your budget and time constraints. This tutorial can be completed using a cluster with **4** worker nodes and a node size of **D12 v2** (for both worker and head nodes). For more information, please see the [cluster](https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-provision-linux-clusters) and [VM](https://docs.microsoft.com/en-us/azure/virtual-machines/virtual-machines-linux-sizes#dv2-series) size guides.
1. Click the "Create" button at the bottom of the pane.

#### Checking cluster deployment status

Cluster deployment may take approximately twenty minutes. (We recommend transferring your image set to the ADLS while you wait; see the next section.) Cluster deployment status can be checked as follows:
1. Click on the "Search Resources" magnifying glass icon along the top bar of [Azure Portal](https://ms.portal.azure.com).
1. Type in the name of your HDInsight cluster and click on its entry in the resulting drop-down list. The overview pane for your HDInsight cluster will appear.
1. During deployment, a blue bar will appear across the top of the overview pane with the title "Applying changes". When this bar disappears, deployment is complete.

<a name="transfer"></a>
### Transferring the image set

Our evaluation image set was created on a Data Science Virtual Machine. To transfer these images to our Azure Data Lake Store, we first copied the images to Azure Blob Storage using [AzCopy](https://docs.microsoft.com/en-gb/azure/storage/storage-use-azcopy), then to the Azure Data Lake Store with [AdlCopy](https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-copy-data-azure-storage-blob). After following the instructions linked above to download and install AzCopy/AdlCopy, we generated the shell commands with the cell below and ran them:

In [None]:
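local_image_dir = 'E:\\combined\\test'  # local folder containing the evaluation images
blob_account_name = ''                  # fill in: Azure Blob Storage account name
blob_account_key = ''                   # fill in: access key for that storage account
blob_account_container = 'test'         # blob container that receives the images
adl_account_name = ''                   # fill in: Azure Data Lake Store name
adl_account_folder = 'test'             # ADLS folder that receives the images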

commands = '''
AzCopy /Source:{0} /Dest:https://{1}.blob.core.windows.net/{2} /DestKey:{3} /S
AdlCopy /source https://{1}.blob.core.windows.net/{2}/ /dest swebhdfs://{4}.azuredatalakestore.net/{5}/ /sourcekey {3}
'''.format(local_image_dir, blob_account_name, blob_account_container,
           blob_account_key, adl_account_name, adl_account_folder)

print(commands)
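The cell above prints the commands even while the account settings are still blank. As a hedged sketch, the hypothetical helper below builds the same two command lines and raises an error if any setting is left empty; it only formats strings and does not invoke AzCopy or AdlCopy.

```python
def build_transfer_commands(local_image_dir, blob_account_name, blob_account_key,
                            blob_account_container, adl_account_name, adl_account_folder):
    """Return [AzCopy command, AdlCopy command]; hypothetical helper, runs nothing."""
    settings = {
        'local_image_dir': local_image_dir,
        'blob_account_name': blob_account_name,
        'blob_account_key': blob_account_key,
        'blob_account_container': blob_account_container,
        'adl_account_name': adl_account_name,
        'adl_account_folder': adl_account_folder,
    }
    # Refuse to build commands that would point at empty account names or keys
    blank = [name for name, value in settings.items() if not value]
    if blank:
        raise ValueError('Fill in these settings first: ' + ', '.join(blank))

    blob_url = 'https://{}.blob.core.windows.net/{}'.format(blob_account_name, blob_account_container)
    adl_url = 'swebhdfs://{}.azuredatalakestore.net/{}/'.format(adl_account_name, adl_account_folder)
    # Same flags as the commands printed above
    azcopy = 'AzCopy /Source:{} /Dest:{} /DestKey:{} /S'.format(local_image_dir, blob_url, blob_account_key)
    adlcopy = 'AdlCopy /source {}/ /dest {} /sourcekey {}'.format(blob_url, adl_url, blob_account_key)
    return [azcopy, adlcopy]
```

For example, `print('\n'.join(build_transfer_commands(local_image_dir, blob_account_name, blob_account_key, blob_account_container, adl_account_name, adl_account_folder)))` fails fast with the names of any settings you have not filled in yet.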

<a name="install"></a>
### Installing Cognitive Toolkit and TensorFlow

#### Obtaining and (optionally) modifying the script action

We will install Cognitive Toolkit and TensorFlow on all head and worker nodes via a script action. A sample script action is included in the `scoring` subdirectory of [the Embarrassingly Parallel Image Classification git repository](https://github.com/Azure/Embarrassingly-Parallel-Image-Classification).

The sample script action installs CNTK 2.0 beta release 10. As of this writing, other CNTK releases can be substituted as follows:
1. Navigate to the [CNTK Releases](https://github.com/Microsoft/CNTK/releases) page.
1. Click the link for the Linux, CPU-only build of the appropriate release.
1. After reading and agreeing to the mentioned licenses, copy the URL behind the "I accept" button (e.g., from the page source) and paste it over the URL in the script action's `curl` command.
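If you keep a local copy of the script action, the URL swap in the last step can also be scripted. The sketch below assumes (this is not confirmed by the repository) that the script contains exactly one line beginning with `curl` and that the first `http(s)` URL on that line is the CNTK download; `swap_curl_url` is a hypothetical helper.

```python
import re

# First http(s) URL on any line that starts with `curl` (after optional spaces or tabs)
_CURL_URL = re.compile(r'^([ \t]*curl\b[^\n]*?)(https?://\S+)', re.MULTILINE)

def swap_curl_url(script_text, new_url):
    """Replace the URL in the single curl command of a script; hypothetical helper."""
    # A function replacement avoids backslash escapes in new_url being interpreted
    new_text, count = _CURL_URL.subn(lambda m: m.group(1) + new_url, script_text)
    if count != 1:
        raise ValueError('Expected exactly one curl command with a URL, found {}'.format(count))
    return new_text
```

Requiring exactly one match is a deliberate choice: if the script gains a second `curl` line, the helper stops rather than silently rewriting the wrong download.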

#### Running the script action

After HDInsight cluster deployment finishes, run the script action to install CNTK and TensorFlow as follows:
1. Obtain the URI for the script action.
   - If using the unmodified version in this git repo, ensure that your URI points to the "raw" file (not a webpage-embedded file).
   - If you have modified the script action, upload it to the website or Azure Blob Storage account of your choice and note its URI.
1. Click on the "Search Resources" magnifying glass icon along the top bar of [Azure Portal](https://ms.portal.azure.com).
1. Type in the name of your HDInsight cluster and click on its entry in the resulting drop-down list. The overview pane for your HDInsight cluster will appear.
1. In the search field at upper left, type in "Script actions". Click the "Script actions" option in the results list.
1. Click the "+ Submit new" button along the top of the Script Actions pane. A new pane of options will appear.
   1. Under "Name", type "install" (without the quotes).
   1. Under "Bash script URI", type in the URI.
   1. Ensure that "Head" and "Worker" boxes are checked.
   1. Click the "Create" button along the bottom of the pane.
   
Expect the script action to take roughly fifteen minutes to run.
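Step 1 requires a URI that serves the raw script file rather than a web page. For a file hosted on GitHub, a minimal sketch of converting a `github.com/<owner>/<repo>/blob/<branch>/<path>` page URL into its `raw.githubusercontent.com` form follows; `to_raw_github_url` is a hypothetical helper, and it assumes the branch name contains no `/`.

```python
from urllib.parse import urlparse

def to_raw_github_url(url):
    """Convert a github.com 'blob' page URL to its raw-file URL; hypothetical helper."""
    parsed = urlparse(url)
    if parsed.netloc == 'raw.githubusercontent.com':
        return url  # already points at the raw file
    parts = parsed.path.strip('/').split('/')
    # Expected layout: <owner>/<repo>/blob/<branch>/<path/to/file>
    if parsed.netloc != 'github.com' or len(parts) < 5 or parts[2] != 'blob':
        raise ValueError('Not a GitHub file page URL: {}'.format(url))
    owner, repo, _, branch = parts[:4]
    file_path = '/'.join(parts[4:])
    return 'https://raw.githubusercontent.com/{}/{}/{}/{}'.format(owner, repo, branch, file_path)
```

Paste the result into "Bash script URI". If the script lives in Azure Blob Storage instead, the blob URL (with a SAS token if the container is private) is used as-is.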
   
#### Updating the Python 3 path

The script action above installed Cognitive Toolkit and TensorFlow in a new Python environment, `cntk-py35`. Follow the steps below to direct PySpark to use this new environment:

1. Navigate back to the HDInsight cluster's overview pane by clicking "Overview" near the upper left of the pane.
1. Under "Quick links" in the main window, click the "Cluster dashboards" button. A new pane of dashboard options will appear.
1. Click "HDInsight cluster dashboard". A new window will load.
1. In the menu at left, click "Spark2".
1. In the main window, click on the "Configs" tab.
1. Scroll down to the "Custom spark2-defaults" option and expand its dropdown by clicking on the label (or the triangle beside it).
1. Find the `spark.yarn.appMasterEnv.PYSPARK3_PYTHON` entry in the dropdown list. Change its path to the following:

    `/usr/bin/anaconda/envs/cntk-py35/bin/python`<br/><br/>
    
1. Click on the green "Save" button that appears at upper right.
1. When prompted, click the orange "Restart" button and select "Restart all affected".
1. When the restart concludes, close the window. This will return you to a pane of dashboard options.
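To confirm the change took effect, you could ask Spark which interpreter its tasks actually run under; which processes pick up the `spark.yarn.appMasterEnv.PYSPARK3_PYTHON` setting depends on the cluster configuration, so a direct check is cheap insurance. A hedged sketch; `report_executable` and `unexpected_interpreters` are hypothetical helpers, and the expected prefix is taken from the path set above.

```python
import sys

# Environment directory from the PYSPARK3_PYTHON setting above
EXPECTED_ENV_PREFIX = '/usr/bin/anaconda/envs/cntk-py35/'

def report_executable(_):
    """Runs inside a Spark task: return the interpreter executing it."""
    return sys.executable

def unexpected_interpreters(paths, prefix=EXPECTED_ENV_PREFIX):
    """Sorted distinct interpreter paths that are not inside the cntk-py35 environment."""
    return sorted({p for p in paths if not p.startswith(prefix)})
```

In a notebook cell, `paths = sc.parallelize(range(n_workers), n_workers).map(report_executable).collect()` followed by `unexpected_interpreters(paths)` should return an empty list once PySpark is using `cntk-py35`.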

<a name="pyspark"></a>
## Image scoring with PySpark

### Define functions/variables/RDDs used by both scoring pipelines 

In [1]:
import os
import numpy as np
import pandas as pd
from io import BytesIO
from PIL import Image
from pyspark import SparkFiles

def get_nlcd_id(my_filename):
    ''' Extract the NLCD class id (the true label) from the name of the image's parent folder '''
    folder, _ = os.path.split(my_filename)
    return(int(os.path.basename(folder)))

adls_name = 'mawahtensorflow'
adls_folder = 'test'
n_workers = 4
local_tmp_dir = '/tmp/resnet'

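dataset_dir = 'adl://{}.azuredatalakestore.net/{}'.format(adls_name, adls_folder)
# binaryFiles yields (file path, file contents as bytes) pairs; images sit in one subfolder per NLCD class
image_rdd = sc.binaryFiles('{}/*/*.png'.format(dataset_dir), minPartitions=n_workers).coalesce(n_workers)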

# Define correspondence of NLCD ids to labels of the trained model
nlcd_id_to_group = {21: 'Developed',
                    22: 'Developed',
                    23: 'Developed',
                    24: 'Developed',
                    11: 'Water/Wetlands',
                    12: 'Water/Wetlands',
                    95: 'Water/Wetlands',
                    41: 'Forest',
                    42: 'Forest',
                    43: 'Forest',
                    90: 'Forest',
                    31: 'Barren',
                    51: 'Shrubland',
                    52: 'Shrubland',
                    71: 'Grassland',
                    72: 'Grassland',
                    73: 'Grassland',
                    74: 'Grassland',
                    81: 'Cultivated',
                    82: 'Cultivated'}
group_to_label = {'Shrubland': 0,
                  'Forest': 1,
                  'Cultivated': 2,
                  'Barren': 3,
                  'Water/Wetlands': 4,
                  'Grassland': 5,
                  'Developed': 6}

# Map each NLCD id directly to the model's integer label, then broadcast the mapping to the workers
nlcd_id_to_label = {key: group_to_label[group] for key, group in nlcd_id_to_group.items()}
nlcd_id_to_label_bc = sc.broadcast(nlcd_id_to_label)

Starting Spark application


| ID | YARN Application ID | Kind | State | Current session? |
|---|---|---|---|---|
| 5 | application_1486565576928_0061 | pyspark3 | idle | ✔ |


SparkSession available as 'spark'.
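The labeling logic above relies on the folder layout: each image's parent folder is named after its NLCD class id. A worked example follows, using a hypothetical file path and a subset of the mapping copied from `nlcd_id_to_label` (Forest = 1, Cultivated = 2, Water/Wetlands = 4). `unmapped_ids` is a hypothetical helper that lists any folder id the mapping lacks; such an id would raise a `KeyError` inside a worker.

```python
import posixpath

# Subset of nlcd_id_to_label, copied here so the example runs on its own
nlcd_id_to_label_demo = {41: 1, 42: 1, 43: 1, 90: 1, 81: 2, 82: 2, 11: 4}

def nlcd_id_from_uri(uri):
    """Same logic as get_nlcd_id, using posixpath so '/' is the separator on any OS."""
    folder, _ = posixpath.split(uri)
    return int(posixpath.basename(folder))

def unmapped_ids(uris, mapping):
    """NLCD ids that appear in the file paths but have no entry in the mapping."""
    return sorted({nlcd_id_from_uri(u) for u in uris} - set(mapping))

uri = 'adl://myadls.azuredatalakestore.net/test/41/tile_0001.png'  # hypothetical file
nlcd_id = nlcd_id_from_uri(uri)          # folder name '41' -> 41
label = nlcd_id_to_label_demo[nlcd_id]   # 41 -> 'Forest' -> 1
```

On the cluster, `unmapped_ids(image_rdd.keys().collect(), nlcd_id_to_label)` would run the same check over the real file paths before scoring starts.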


<a name="cntk"></a>
### Score and evaluate with a trained Cognitive Toolkit (CNTK) model

#### Make the trained CNTK model available to all workers

In [15]:
from cntk import load_model

cntk_model_filepath = '{}/resnet20_237_improvedpreprocessing.dnn'.format(local_tmp_dir)
cntk_model_filepath_bc = sc.broadcast(cntk_model_filepath)
sc.addFile(cntk_model_filepath)
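`sc.addFile` copies the model file to every worker, and `SparkFiles.get(name)` resolves `name` against the directory holding those copies by joining the two paths. Joining onto an absolute path discards the root, so passing the full driver-side path points back at `/tmp/resnet/...` on the worker instead of at the shipped copy; passing the file's basename avoids this. The sketch reproduces the joining behavior with `posixpath`; the root directory and the `sparkfiles_name` helper are hypothetical.

```python
import posixpath

def sparkfiles_name(local_path):
    """Name to pass to SparkFiles.get() for a file added with sc.addFile(local_path)."""
    return posixpath.basename(local_path)

root = '/hypothetical/sparkfiles/root'  # stands in for SparkFiles.getRootDirectory()
model_path = '/tmp/resnet/resnet20_237_improvedpreprocessing.dnn'

# An absolute second argument replaces the root entirely
joined_with_full_path = posixpath.join(root, model_path)
# The basename lands inside the root, where addFile placed the copy
joined_with_name = posixpath.join(root, sparkfiles_name(model_path))
```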

#### Define functions to be run by worker nodes

In [21]:
def cntk_get_preprocessed_image(my_file):
    ''' Load an image as float32 and reorder its axes from HWC to the CHW layout used by the CNTK model '''
    image_data = np.array(Image.open(my_file), dtype=np.float32)
    image_data = np.ascontiguousarray(np.transpose(image_data, (2, 0, 1)))
    return(image_data)

def argsoftmax(x):
    ''' Apply softmax, then return the index of the most probable label '''
    # Subtracting the max avoids overflow in np.exp without changing the result
    exponentiated = np.exp(x - np.max(x))
    softmax = exponentiated / exponentiated.sum(axis=0)
    return(np.argmax(softmax))

def cntk_run_worker(files):
    ''' Scoring script run by each worker '''
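    # SparkFiles.get() expects the name of a file shipped with sc.addFile(), not its driver-side path
    cntk_model_filename = os.path.basename(cntk_model_filepath_bc.value)
    loaded_model = load_model(SparkFiles.get(cntk_model_filename))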
    nlcd_id_to_label = nlcd_id_to_label_bc.value
    
    # Iterate through the files. The first value in each tuple is the file name; the second is the image data
    for file in files:
        preprocessed_image = cntk_get_preprocessed_image(BytesIO(file[1]))
        dnn_output = loaded_model.eval({loaded_model.arguments[0]: [preprocessed_image]})
        true_label = nlcd_id_to_label[get_nlcd_id(file[0])]
        yield (file[0], true_label, argsoftmax(np.squeeze(dnn_output)))
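Two details of the worker functions can be checked without Spark or CNTK. Softmax is monotonic, so taking the argmax after softmax returns the same index as taking it on the raw network outputs; subtracting the maximum first keeps `np.exp` from overflowing on large values. The transpose in `cntk_get_preprocessed_image` moves the channel axis to the front. A minimal NumPy sketch with made-up inputs:

```python
import numpy as np

def stable_softmax(x):
    """Softmax over a 1-D array; shifting by the max keeps np.exp finite."""
    shifted = np.exp(x - np.max(x))
    return shifted / shifted.sum()

# Made-up network outputs; np.exp(1000.0) alone would overflow to inf
logits = np.array([2.0, 1000.0, -3.0, 999.0])
probs = stable_softmax(logits)
best = int(np.argmax(probs))  # same index as np.argmax(logits)

# HWC -> CHW, as in cntk_get_preprocessed_image (small made-up image size)
hwc = np.zeros((4, 5, 3), dtype=np.float32)
chw = np.ascontiguousarray(np.transpose(hwc, (2, 0, 1)))
```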

#### Score all test set images with the trained model

In [None]:
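from datetime import datetime

labeled_images = image_rdd.mapPartitions(cntk_run_worker)

# collect() triggers the distributed scoring job and returns every result to the driver
start = datetime.now()
cntk_results = labeled_images.collect()
stop = datetime.now()
print('Scored {} images'.format(len(cntk_results)))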
print(stop - start)
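`mapPartitions` is used rather than `map` so that each worker loads the model once per partition instead of once per image. The pure-Python sketch below imitates that pattern with two in-memory "partitions" and a stub loader that counts how often it is called; every name here is a hypothetical stand-in for the Spark and CNTK objects.

```python
def make_partition_scorer(load_model_fn, score_fn):
    """Build a mapPartitions-style function that loads the model once per partition."""
    def run_partition(records):
        model = load_model_fn()  # expensive step: once per partition, not once per image
        for filename, payload in records:
            yield filename, score_fn(model, payload)
    return run_partition

# Stub model: counts loads and "predicts" 1 for payloads longer than a threshold
load_count = {'n': 0}

def fake_load():
    load_count['n'] += 1
    return {'threshold': 10}

def fake_score(model, payload):
    return int(len(payload) > model['threshold'])

# Two partitions of (filename, bytes) records, mimicking binaryFiles output
partitions = [[('a.png', b'x' * 5), ('b.png', b'x' * 20)], [('c.png', b'x' * 15)]]
scorer = make_partition_scorer(fake_load, fake_score)
demo_results = [row for part in partitions for row in scorer(iter(part))]
```

Three records are scored but the loader runs only twice, once per partition; with Spark, the same function would be passed to `image_rdd.mapPartitions(...)`.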

#### Evaluate the model's performance

In [None]:
cntk_df = pd.DataFrame(cntk_results, columns=['filename', 'true_label', 'predicted_label'])
num_correct = sum(cntk_df['true_label'] == cntk_df['predicted_label'])
num_total = len(cntk_results)
print('Correctly predicted {} of {} images ({:0.2f}%)'.format(num_correct, num_total, 100 * num_correct / num_total))
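Overall accuracy can hide weak classes. A hedged sketch of per-class accuracy over the `(filename, true_label, predicted_label)` tuples returned by the scoring step, using only the standard library; `per_class_accuracy` is a hypothetical helper.

```python
from collections import Counter

def per_class_accuracy(rows):
    """rows: iterable of (filename, true_label, predicted_label). Returns {true_label: accuracy}."""
    totals, correct = Counter(), Counter()
    for _, true_label, predicted_label in rows:
        totals[true_label] += 1
        correct[true_label] += int(true_label == predicted_label)
    return {label: correct[label] / totals[label] for label in sorted(totals)}
```

For the CNTK run this would be `per_class_accuracy(cntk_results)`; for the full confusion matrix, `pd.crosstab(cntk_df['true_label'], cntk_df['predicted_label'])` tabulates true against predicted labels.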

<a name="tf"></a>
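### TensorFlow

#### Make the trained TensorFlow model available to all workers

The cell below ships a slightly modified version of the TF-Slim ResNet definition from the TensorFlow models git repository (`resnet_utils.py` and `resnet_v1.py`) to all workers and broadcasts the path of the local directory that holds the trained model.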

In [11]:
sc.addPyFile(os.path.join(local_tmp_dir, 'resnet_utils.py'))
sc.addPyFile(os.path.join(local_tmp_dir, 'resnet_v1.py'))
model_dir_bc = sc.broadcast(local_tmp_dir)

import tensorflow as tf
import functools
import resnet_v1
slim = tf.contrib.slim
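`tf_run_worker` restores weights with `tf.train.latest_checkpoint(model_dir)`, which reads the `checkpoint` state file in `model_dir` and returns `None` if that file is missing. Only the directory path is broadcast, so each worker needs its own local copy of the checkpoint files. A hedged sketch for checking this; the helper names are hypothetical.

```python
import os
import socket

def has_tf_checkpoint_state(model_dir):
    """True if model_dir contains the 'checkpoint' file that tf.train.latest_checkpoint reads."""
    return os.path.isfile(os.path.join(model_dir, 'checkpoint'))

def checkpoint_report(model_dir):
    """Return (hostname, found) for the machine this function runs on."""
    return socket.gethostname(), has_tf_checkpoint_state(model_dir)
```

On the cluster, `sc.parallelize(range(n_workers), n_workers).map(lambda _: checkpoint_report(model_dir_bc.value)).distinct().collect()` gives a rough picture; Spark does not guarantee that these tasks land on every node, so treat a clean result as a sanity check rather than proof.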

#### Define functions used by workers for scoring

In [12]:
def get_network_fn(num_classes, weight_decay=0.0, is_training=False):
    ''' Return a function that builds ResNet-50 v1 (from resnet_v1) inside the TF-Slim arg scope '''
    arg_scope = resnet_v1.resnet_arg_scope(weight_decay=weight_decay)
    func = resnet_v1.resnet_v1_50
    @functools.wraps(func)
    def network_fn(images):
        with slim.arg_scope(arg_scope):
            return func(images, num_classes, is_training=is_training)
    if hasattr(func, 'default_image_size'):
        network_fn.default_image_size = func.default_image_size
    return(network_fn)

def mean_image_subtraction(image, means):
    ''' Subtract the per-channel means from an HWC image tensor '''
    num_channels = image.get_shape().as_list()[-1]
    # Argument order (axis first) follows TensorFlow < 1.0; on TensorFlow >= 1.0 use
    # tf.split(image, num_channels, axis=2) and tf.concat(channels, axis=2)
    channels = tf.split(2, num_channels, image)
    for i in range(num_channels):
        channels[i] -= means[i]
    return(tf.concat(2, channels))

def get_preprocessing():
    ''' Return a function that resizes an image and subtracts the per-channel RGB means '''
    def preprocessing_fn(image, output_height=224, output_width=224):
        image = tf.expand_dims(image, 0)
        resized_image = tf.image.resize_bilinear(image, [output_height, output_width], align_corners=False)
        resized_image = tf.squeeze(resized_image)
        resized_image.set_shape([output_height, output_width, 3])
        image = tf.to_float(resized_image)
        return(mean_image_subtraction(image, [123.68, 116.78, 103.94]))
    return(preprocessing_fn)

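def tf_run_worker(files):
    ''' Scoring script run by each worker: build the graph, restore the checkpoint, then score every image in the partition '''
    nlcd_id_to_label = nlcd_id_to_label_bc.value
    model_dir = model_dir_bc.value
    class_count = 7  # number of labels in group_to_label
    results = []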
    
    with tf.Graph().as_default():
        network_fn = get_network_fn(num_classes=class_count, is_training=False)
        image_preprocessing_fn = get_preprocessing()
        
        # Input images are expected to be 224 x 224 RGB already
        current_image = tf.placeholder(tf.uint8, shape=(224, 224, 3))
        preprocessed_image = image_preprocessing_fn(current_image, 224, 224)
        image = tf.expand_dims(preprocessed_image, 0)
        logits, _ = network_fn(image)
        predictions = tf.argmax(logits, 1)
        
        with tf.Session() as sess:
            my_saver = tf.train.Saver()
            my_saver.restore(sess, tf.train.latest_checkpoint(model_dir))
            
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)
            try:
                for file in files:
                    imported_image_np = np.asarray(Image.open(BytesIO(file[1])), dtype=np.uint8)
                    result = sess.run(predictions, feed_dict={current_image: imported_image_np})
                    true_label = nlcd_id_to_label[get_nlcd_id(file[0])]
                    results.append([file[0], true_label, result[0]])
            finally:
                coord.request_stop()
            coord.join(threads)
    return(results)
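For debugging preprocessing outside TensorFlow, `mean_image_subtraction` amounts to subtracting one constant per RGB channel. A NumPy sketch of that step alone (it omits the bilinear resize); `subtract_channel_means` is a hypothetical helper, and the means are the values passed in `get_preprocessing`.

```python
import numpy as np

# Per-channel RGB means used in get_preprocessing above
CHANNEL_MEANS = np.array([123.68, 116.78, 103.94], dtype=np.float32)

def subtract_channel_means(image_hwc):
    """NumPy equivalent of mean_image_subtraction for one HWC image.

    Broadcasting subtracts CHANNEL_MEANS[c] from every pixel of channel c,
    matching the split / subtract / concat sequence in the TensorFlow version.
    """
    return image_hwc.astype(np.float32) - CHANNEL_MEANS
```

Broadcasting over the last axis keeps this a one-liner; the TensorFlow version splits and re-concatenates channels, but both produce the same values.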

#### Score all images with the trained TensorFlow model

In [None]:
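from datetime import datetime

labeled_images_tf = image_rdd.mapPartitions(tf_run_worker)

# collect() triggers the distributed scoring job and returns every result to the driver
start = datetime.now()
results_tf = labeled_images_tf.collect()
stop = datetime.now()
print('Scored {} images'.format(len(results_tf)))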
print(stop - start)

#### Evaluate the model's performance

In [None]:
tf_df = pd.DataFrame(results_tf, columns=['filename', 'true_label', 'predicted_label'])
num_correct = sum(tf_df['true_label'] == tf_df['predicted_label'])
num_total = len(results_tf)
print('Correctly predicted {} of {} images ({:0.2f}%)'.format(num_correct, num_total, 100 * num_correct / num_total))