# Handling Volume with Apache Spark

Use Apache Spark to run image classification on preprocessed images.

## License

MIT License

Copyright (c) 2019 PT Bukalapak.com

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

## Software Versions

In [1]:
import sys
print("Python %s" % sys.version)
import base64
import pickle
import shutil

Python 3.7.3 | packaged by conda-forge | (default, Jul  1 2019, 21:52:21) 
[GCC 7.3.0]


In [2]:
import numpy as np
print("NumPy %s" % np.__version__)

NumPy 1.15.4


In [3]:
import tensorflow as tf
print("TensorFlow %s" % tf.__version__)

TensorFlow 1.14.0


In [4]:
import pyspark
print("PySpark %s" % pyspark.__version__)
from pyspark.sql import SparkSession, Row

PySpark 2.3.3


## Settings

In [5]:
print("Built With CUDA:", tf.test.is_built_with_cuda())

Built With CUDA: False


In [6]:
print("Keras FP Precision:", tf.keras.backend.floatx())

Keras FP Precision: float32


In [7]:
LOCAL_PROJECT_URL = '/home/jovyan/work/'

## Create Spark Context

Configure Spark to run locally with two worker threads (`local[2]`).

In [8]:
APP_NAME = "bukalapak-core-ai.big-data-3v.volume-spark-feature-extraction"
spark = SparkSession \
    .builder \
    .appName(APP_NAME) \
    .config("spark.master", "local[2]") \
    .getOrCreate()

In [9]:
sc = spark.sparkContext

In [10]:
sc

In [11]:
sc.getConf().getAll()

[('spark.app.name',
  'bukalapak-core-ai.big-data-3v.volume-spark-feature-extraction'),
 ('spark.driver.port', '34019'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '6cf0dbfb5e10'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1564465161105'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.master', 'local[2]')]

## Generate Preprocessed Images for Input

Generate preprocessed images and store them as an ORC file. Here the file is written to the local file system through a `file:` path; in practice, preprocessed images normally come from a previous process.

In [12]:
# ped: preprocessed image
def convert_image_url_to_ped(x):
    import base64, pickle
    from PIL import Image as pil_image
    import tensorflow as tf
    # Open the image at the given path (x is a one-element list) as a PIL image
    image_url = x[0]
    image_pil = pil_image.open(image_url)
    # Make sure the image is RGB (e.g. not grayscale or RGBA)
    if image_pil.mode != 'RGB':
        image_pil = image_pil.convert('RGB')
    # Resize to 224x224; PIL sizes are (width, height), Keras sizes are (height, width)
    target_size = (224, 224)
    width_height_tuple = (target_size[1], target_size[0])
    if image_pil.size != width_height_tuple:
        image_pil = image_pil.resize(width_height_tuple, pil_image.NEAREST)
    # Convert to a float32 array and apply VGG16 preprocessing
    # (RGB -> BGR and ImageNet mean subtraction)
    image_np = tf.keras.preprocessing.image.img_to_array(image_pil)
    image_np = tf.keras.applications.vgg16.preprocess_input(image_np)
    # Serialise the array to a base64 string so it fits in a string column
    image_ped = base64.b64encode(pickle.dumps(image_np)).decode('UTF-8')

    # [tid, iid, l, i_ped]; the label is hard-coded for this demo
    return ["local_disk", image_url, "jeket", image_ped]
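
To make the preprocessing step concrete, the following plain-Python sketch reproduces, for a single pixel, what `preprocess_input` does to VGG16 input, assuming Keras' default `"caffe"` mode: the channels are flipped from RGB to BGR and the per-channel ImageNet mean is subtracted, with no scaling to [0, 1]. The helper `vgg16_preprocess_pixel` and the constant `IMAGENET_MEAN_BGR` are illustrative names, not part of Keras.

```python
# Sketch of VGG16 "caffe"-mode preprocessing for one pixel (assumed Keras default).
# Per-channel ImageNet means, listed in BGR order as Keras applies them.
IMAGENET_MEAN_BGR = (103.939, 116.779, 123.68)

def vgg16_preprocess_pixel(rgb):
    """Convert one (R, G, B) pixel to VGG16 input: RGB -> BGR, then subtract the mean."""
    bgr = rgb[::-1]  # reverse the channel order
    return tuple(round(c - m, 3) for c, m in zip(bgr, IMAGENET_MEAN_BGR))

print(vgg16_preprocess_pixel((255, 0, 0)))  # pure red -> (-103.939, -116.779, 131.32)
```

Values can therefore be negative, which is why the preprocessed array is stored as floats rather than as image bytes.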

In [13]:
def convert_image_url_to_ped_orc(image_url_pathfilename, image_ped_orc_pathfilename):
    image_url_file_rdd = sc.textFile(image_url_pathfilename)
    print("        Number of Partitions:", image_url_file_rdd.getNumPartitions())
    # textFile yields one line per record; split('\n') wraps each path in a one-element list
    image_url_list_rdd = image_url_file_rdd.map(lambda x: x.split('\n'))
    image_ped_list_rdd = image_url_list_rdd.map(lambda x: convert_image_url_to_ped(x))
    image_ped_dict_rdd = image_ped_list_rdd.map(lambda x: Row(tid=x[0], 
                                                              iid=x[1], 
                                                              l=x[2], 
                                                              i_ped=x[3]))
    image_ped_dict_df = spark.createDataFrame(image_ped_dict_rdd)
    image_ped_dict_df.write.save(image_ped_orc_pathfilename, format="orc")

In [14]:
# Input file path
image_url_pathfilename = "file:{0}data/image_paths_10.txt".format(LOCAL_PROJECT_URL)
# Output file path
image_ped_orc_directory = "{0}data/images_ped.orc".format(LOCAL_PROJECT_URL)
image_ped_orc_pathfilename = "file:{0}".format(image_ped_orc_directory)
# Remove existing output directory
shutil.rmtree(image_ped_orc_directory, ignore_errors=True)
# Start preprocessing
convert_image_url_to_ped_orc(image_url_pathfilename, image_ped_orc_pathfilename)

        Number of Partitions: 2


## Making Inference

Run inference with a VGG16 model [1] pretrained on ImageNet.

In [15]:
# Input: an iterator over Rows [tid, iid, l, i_ped] from one partition of the
#        preprocessed image table
# Output: an iterator over lists [tid, iid, pred] for the inference result table
# where:
#  - tid: table ID
#  - iid: image ID
#  - l: label
#  - i_ped: preprocessed image
#  - pred: output of the prediction layer (the inference result)

def make_inference(xs):
    import base64, pickle
    import numpy as np
    import tensorflow as tf
    # Extract image information and decode the preprocessed images
    inference_lists = []
    images_array = []
    for x in xs:
        inference_lists.append([x.tid, x.iid])
        images_array.append(pickle.loads(base64.b64decode(x.i_ped.encode('UTF-8'))))
    # Skip empty partitions: there is nothing to predict
    if not images_array:
        return iter([])
    images_np = np.array(images_array)
    # Load the VGG16 model once per partition
    vgg = tf.keras.applications.vgg16.VGG16(weights='imagenet', include_top=True)
    # Run inference on all images in the partition (Keras batches them internally)
    inference = vgg.predict(images_np)
    # Append the serialised prediction vector to each output list
    if len(inference_lists) != len(inference):
        raise ValueError('Number of image information lists does not match ' +
                         'the number of predictions')
    for i in range(len(inference_lists)):
        inference_lists[i].append(
            base64.b64encode(pickle.dumps(inference[i])).decode('UTF-8')
        )

    return iter(inference_lists)

In [16]:
def convert_image_ped_orc_to_infr_orc(image_ped_orc_pathfilename,
                                      image_infr_orc_pathfilename):
    image_ped_dict_df = spark.read.orc(image_ped_orc_pathfilename)
    image_ped_dict_rdd = image_ped_dict_df.rdd
    print("        Number of Partitions:", image_ped_dict_rdd.getNumPartitions())
    image_infr_list_rdd = image_ped_dict_rdd.mapPartitions(make_inference)
    image_infr_dict_rdd = image_infr_list_rdd.map(lambda x: Row(tid=x[0],
                                                                iid=x[1],
                                                                pred=x[2]))
    image_infr_dict_df = spark.createDataFrame(image_infr_dict_rdd)
    image_infr_dict_df.write.save(image_infr_orc_pathfilename, format="orc")
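
`make_inference` is applied with `mapPartitions` rather than `map`, so the VGG16 model is loaded once per partition instead of once per image. With `local[2]`, `sc.textFile` asks for at least `min(defaultParallelism, 2)` = 2 partitions, which is consistent with the "Number of Partitions: 2" printed by these cells. The sketch below simulates the difference in plain Python; `load_model`, the dummy model, and the partition layout are hypothetical stand-ins for Spark and Keras.

```python
# Plain-Python simulation of map vs mapPartitions (no Spark involved).
load_count = 0

def load_model():
    """Hypothetical stand-in for VGG16(weights='imagenet'); counts how often it is called."""
    global load_count
    load_count += 1
    return lambda batch: [len(x) for x in batch]  # dummy "prediction" per record

def infer_partition(records):
    """mapPartitions style: one model load and one predict call per partition."""
    records = list(records)
    if not records:  # empty partitions produce no output and load no model
        return iter([])
    model = load_model()
    return iter(model(records))

def infer_record(record):
    """map style: one model load per record."""
    return load_model()([record])[0]

# Ten records split across two partitions, as with local[2]
partitions = [["a", "bb", "ccc", "dd", "e"], ["ff", "g", "hh", "iii", "j"]]

load_count = 0
batched = [y for part in partitions for y in infer_partition(iter(part))]
loads_with_map_partitions = load_count

load_count = 0
per_record = [infer_record(r) for part in partitions for r in part]
loads_with_map = load_count

print(batched == per_record, loads_with_map_partitions, loads_with_map)  # True 2 10
```

Both styles produce the same predictions; only the number of model loads differs, which matters when each load reads hundreds of megabytes of weights.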

In [17]:
# Input file path
image_ped_orc_directory = "{0}data/images_ped.orc".format(LOCAL_PROJECT_URL)
image_ped_orc_pathfilename = "file:{0}".format(image_ped_orc_directory)
# Output file path
image_infr_orc_directory = "{0}data/images_infr.orc".format(LOCAL_PROJECT_URL)
image_infr_orc_pathfilename = "file:{0}".format(image_infr_orc_directory)
# Remove existing output directory
shutil.rmtree(image_infr_orc_directory, ignore_errors=True)
# Start making inference
convert_image_ped_orc_to_infr_orc(image_ped_orc_pathfilename, 
                                  image_infr_orc_pathfilename)

        Number of Partitions: 2


## Check Result

In [18]:
# Read inference result
image_infr_orc_directory = "{0}data/images_infr.orc".format(LOCAL_PROJECT_URL)
image_infr_orc_pathfilename = "file:{0}".format(image_infr_orc_directory)
image_infr_dict_df = spark.read.orc(image_infr_orc_pathfilename)

In [19]:
image_infr_dict_rdd = image_infr_dict_df.rdd
inference_lists = image_infr_dict_rdd.take(1)[0]

In [20]:
inference_lists[0]

'./data/images/jacket copy 0.png'
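
Index 0 holds the image path rather than `tid` because, in PySpark 2.x, `Row(**kwargs)` sorts its fields by name, so the DataFrame written by `convert_image_ped_orc_to_infr_orc` has columns in the order `iid`, `pred`, `tid`. The sketch below mimics that ordering with a plain dict; `row_field_order` is a hypothetical helper, not PySpark code.

```python
# Mimic PySpark 2.x Row(**kwargs) field ordering (sorted by field name).
def row_field_order(**kwargs):
    """Return (field names, values) in the order a PySpark 2.x Row would store them."""
    names = sorted(kwargs)
    return names, [kwargs[n] for n in names]

names, values = row_field_order(tid="local_disk",
                                iid="./data/images/jacket copy 0.png",
                                pred="<base64 pickle>")
print(names)  # ['iid', 'pred', 'tid']
```

Accessing fields by name (for example `row.iid` and `row.pred`) avoids depending on this ordering.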

In [21]:
inference_lists[1][:100]

'gANjbnVtcHkuY29yZS5tdWx0aWFycmF5Cl9yZWNvbnN0cnVjdApxAGNudW1weQpuZGFycmF5CnEBSwCFcQJDAWJxA4dxBFJxBShL'

In [22]:
def convert_pbs_to_infr_tensor(pbs):
    # pbs: pickled, base64-encoded string; returns a (1, 1000) batch for decode_predictions
    return np.array(
        [list(pickle.loads(base64.b64decode(pbs.encode('UTF-8'))))])
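
Both `i_ped` and `pred` are stored with the same encoding: pickle the array, base64-encode the bytes, and decode to a UTF-8 string. A minimal round-trip sketch follows, with a plain list standing in for the NumPy array; `encode_pbs` and `decode_pbs` are illustrative names. The `gANj` prefix in the earlier output is the base64 form of a protocol-3 pickle header (`\x80\x03c`), the default protocol in Python 3.7.

```python
import base64
import pickle

def encode_pbs(obj):
    """Object -> pickled, base64-encoded str (fits in a string column)."""
    return base64.b64encode(pickle.dumps(obj)).decode('UTF-8')

def decode_pbs(pbs):
    """Inverse of encode_pbs: str -> original object."""
    return pickle.loads(base64.b64decode(pbs.encode('UTF-8')))

scores = [0.1, 0.7, 0.2]      # stand-in for a prediction vector
pbs = encode_pbs(scores)
restored = decode_pbs(pbs)
batch = [list(restored)]      # same (1, N) batch shape convert_pbs_to_infr_tensor builds
print(type(pbs).__name__, restored == scores, len(batch), len(batch[0]))  # str True 1 3
```

Because `pickle.loads` can execute arbitrary code, this encoding is only appropriate for data produced by a trusted pipeline such as this one.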

In [23]:
tf.keras.applications.vgg16.decode_predictions(
    convert_pbs_to_infr_tensor(inference_lists[1]))

[[('n04370456', 'sweatshirt', 0.971396),
  ('n04599235', 'wool', 0.015504221),
  ('n03595614', 'jersey', 0.005360415),
  ('n02963159', 'cardigan', 0.0013214782),
  ('n03594734', 'jean', 0.0011332377)]]

Expected result:
```
[[('n04370456', 'sweatshirt', 0.971396),
  ('n04599235', 'wool', 0.015504221),
  ('n03595614', 'jersey', 0.005360415),
  ('n02963159', 'cardigan', 0.0013214782),
  ('n03594734', 'jean', 0.0011332377)]]
```
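
`decode_predictions` maps each 1000-way score vector to its top-scoring ImageNet classes. The sketch below shows the idea for a single vector, assuming a tiny hypothetical `CLASS_INDEX` in place of the full 1000-class index that Keras downloads; the WordNet IDs come from the output above, but the scores are made up for illustration. The real function takes a 2D batch, returns one list per row, and uses `top=5` by default.

```python
# Hypothetical 4-class index: class position -> (WordNet ID, class name)
CLASS_INDEX = {
    0: ("n04370456", "sweatshirt"),
    1: ("n04599235", "wool"),
    2: ("n03595614", "jersey"),
    3: ("n02963159", "cardigan"),
}

def decode_top_k(scores, top=2):
    """Return [(wnid, name, score), ...] for the `top` highest scores, best first."""
    best = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top]
    return [CLASS_INDEX[i] + (scores[i],) for i in best]

print(decode_top_k([0.01, 0.02, 0.9, 0.07], top=2))
# [('n03595614', 'jersey', 0.9), ('n02963159', 'cardigan', 0.07)]
```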

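## Close Spark Context

`spark.stop()` also stops the underlying SparkContext, so calling `sc.stop()` first is redundant but harmless.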

In [24]:
sc.stop()

In [25]:
spark.stop()

## References

1. K. Simonyan and A. Zisserman. 2014. Very Deep Convolutional Networks for Large-Scale Image Recognition. CoRR, abs/1409.1556. http://arxiv.org/abs/1409.1556.