# Project 8: Deploy a model with a big data architecture in AWS

*Pierre-Eloi Ragetly*

This notebook extracts features from an image dataset with a pretrained CNN and then performs a dimensionality reduction on them with PySpark.


## Setup

First, let's import modules.

In [1]:
# Standard libraries
import os
import io
from typing import Iterator

# Import numpy and pandas for data manipulation
import numpy as np
import pandas as pd

# image preprocessing
from PIL import Image, ImageOps
from tensorflow.keras.preprocessing.image import img_to_array

# Import deep learning models with tensorflow
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

# Import pyspark library
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, pandas_udf, udf
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml import Pipeline

2023-01-28 12:49:11.693166: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


All dataset folder names must be renamed to avoid loading issues with PySpark: every space (' ') is replaced by '_'.

In [2]:
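def rename_folders(path):
    """
    Replace all spaces (' ') with '_' in the names of the subdirectories
    of `path`, to avoid loading issues with PySpark.
    """
    subfolders = [d for d in os.scandir(path) if d.is_dir()]
    for d in subfolders:
        # Rename only the folder name itself, not the parent path
        # (which may legitimately contain spaces)
        new_path = os.path.join(path, d.name.replace(' ', '_'))
        if new_path != d.path:
            os.rename(d.path, new_path)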

In [3]:
path = os.getcwd()
local_path = os.path.join(path, 'dataset/local_test')
data_path = os.path.join(path, 'dataset/Test')

rename_folders(local_path)
rename_folders(data_path)

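To finish this setup, let's create a Spark session. We specify:
1. an application name;
2. a local master (`local`, i.e. a single worker thread), so the app runs on the local machine;
3. the `spark.sql.parquet.writeLegacyFormat` option, which makes Spark write Parquet files the way Spark 1.4 and earlier did, which can improve compatibility with other Parquet readers;
4. `getOrCreate()`, which returns the existing session if there is one and creates a new one otherwise.

We also retrieve the underlying SparkContext from the `spark` session object.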

In [4]:
spark = (SparkSession.builder
         .appName('DS_P8')
         .master('local')
         .config("spark.sql.parquet.writeLegacyFormat", 'true')
         .getOrCreate()
         )

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/28 12:49:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

## Load images

Since Spark 2.4, images in compressed formats (JPEG, PNG, etc.) can be read with `spark.read.format('image').load('path')`.  
The images are decoded with the Java ImageIO library, and the resulting DataFrame has a dedicated schema: a single StructType column `image` whose six fields hold the image metadata and the decoded pixels (in OpenCV-compatible BGR order).

However, data manipulation is much easier with the **binaryFile** format (available since Spark 3.0). Instead of creating a single `image` column with six subcolumns, it creates four columns containing the raw content and metadata of each file:
- path: `StringType` *image file path*
- modificationTime: `TimestampType` *last modification time of the image*
- length: `LongType` *size of the file in bytes*
- content: `BinaryType` *raw file bytes (for a .jpg, the still-compressed JPEG data)*

We choose the binaryFile format because it avoids working with a nested struct column; the images are decoded later, in Python.
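To make the four binaryFile columns concrete, here is a minimal stdlib-only sketch that builds the same four fields for one local file. The helper `binary_file_row` is hypothetical and only approximates what Spark does internally (the `file:` prefix mimics how Spark reports local paths). The point is that `content` holds the bytes exactly as stored on disk, so a JPEG still starts with its `FF D8` marker and must be decoded afterwards.

```python
import os
from datetime import datetime


def binary_file_row(file_path: str) -> dict:
    """Approximate the four fields of a Spark binaryFile row for one file.

    Hypothetical helper for illustration only: Spark builds these rows
    itself when reading with spark.read.format("binaryFile").
    """
    stat = os.stat(file_path)
    with open(file_path, "rb") as f:
        content = f.read()  # raw, still-compressed file bytes
    return {
        "path": "file:" + os.path.abspath(file_path),
        "modificationTime": datetime.fromtimestamp(stat.st_mtime),
        "length": stat.st_size,  # number of bytes on disk
        "content": content,
    }
```

In the notebook, decoding these bytes into pixels is the job of `preprocess_img`.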

In [6]:
def load_img(path):
    """
    Load all .jpg images saved in a directory to a binary Spark DataFrame.
    """
    images = spark.read.format("binaryFile") \
        .option("pathGlobFilter", "*.jpg") \
        .option("recursiveFileLookup", "true") \
        .load(path)
    return images

In [7]:
df_img = load_img(local_path)

In [8]:
df_img.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



In [9]:
df_img.rdd.getNumPartitions()

31

In [10]:
n_img = df_img.count()

## Create a label column

Here, the label is the fruit category.  
In the dataset, the same category is sometimes split into several subfolders (e.g. Cherry_1 and Cherry_2). To give them the same label, we extract it with a regular expression that keeps the folder name up to its last letter, thereby dropping a trailing *_number* suffix if there is one.
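The regular expression can be checked outside Spark with Python's `re` module, since this pattern behaves the same way in the Java regex engine used by `regexp_extract` and in Python's. In the sketch below, `extract_label` is a hypothetical helper that mimics `regexp_extract(path, regex, 2)`, including its empty-string result when nothing matches; the sample paths are made up.

```python
import re

# Same pattern as in the notebook; group 2 is the label
LABEL_REGEX = r'(.*)/(.*[a-zA-Z])(.*)/'


def extract_label(path: str) -> str:
    """Mimic regexp_extract(path, LABEL_REGEX, 2): '' when there is no match."""
    match = re.search(LABEL_REGEX, path)
    return match.group(2) if match else ''


print(extract_label('file:/data/local_test/Cherry_1/r_0_100.jpg'))  # Cherry
print(extract_label('file:/data/local_test/Apple_Golden/0_100.jpg'))  # Apple_Golden
```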

In [11]:
# Group 2 = parent folder name up to its last letter (drops any '_<number>' suffix)
regex = r'(.*)/(.*[a-zA-Z])(.*)/'
df_img = df_img.withColumn('label', regexp_extract('path', regex, 2))

## Feature extraction

### Prepare model

Let's extract image features with transfer learning.

We will choose the **MobileNetV2** model. The MobileNetV2 architecture is built from [inverted residual blocks with linear bottlenecks](https://towardsdatascience.com/mobilenetv2-inverted-residuals-and-linear-bottlenecks-8a4362f4ffd5), also known as MBConv blocks, which deliver high accuracy while keeping the number of parameters and operations low. When the input and output of a block have the same shape, a skip connection links the beginning and the end of the block, as in residual networks.

MobileNetV2 follows a narrow->wide->narrow approach, the inverse of the original residual block (wide->narrow->wide), hence the word "inverted". The first 1x1 convolution widens the block by an expansion factor (6 in most blocks); the following 3x3 depthwise convolution filters each channel separately, which requires far fewer parameters than a standard 3x3 convolution. Finally, a linear 1x1 convolution (without activation) projects the result back to a narrow bottleneck with the block's number of output channels.
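The parameter savings can be checked with a little arithmetic. The sketch below counts the convolution weights of one inverted residual block, excluding biases (these convolutions are bias-free) and BatchNormalization parameters (listed as separate layers in the summary). The helper names are hypothetical; the values used to check them come from the `Param #` column of the model summary below, e.g. `block_7_expand` has 24576 weights and `block_7_depthwise` 3456.

```python
def inverted_residual_params(c_in: int, c_out: int, t: int = 6, kernel: int = 3) -> dict:
    """Count the convolution weights of one inverted residual block.

    Biases and BatchNormalization parameters are not included.
    """
    hidden = c_in * t  # 1x1 expansion: narrow -> wide
    return {
        "expand": c_in * hidden,                # 1x1 conv, c_in -> hidden channels
        "depthwise": kernel * kernel * hidden,  # one k x k filter per channel
        "project": hidden * c_out,              # linear 1x1 conv, hidden -> c_out
    }


def standard_conv_params(channels: int, kernel: int = 3) -> int:
    """Weights of a regular k x k convolution mapping channels -> channels."""
    return kernel * kernel * channels * channels


print(inverted_residual_params(c_in=64, c_out=64))
# {'expand': 24576, 'depthwise': 3456, 'project': 24576}
print(standard_conv_params(384))  # 1327104
```

A regular 3x3 convolution on the 384 expanded channels would need 1,327,104 weights, against 3,456 for the depthwise one.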

In [12]:
model = MobileNetV2(weights='imagenet',
                    include_top=False,
                    input_shape=(224, 224, 3))
model.summary()


Model: "mobilenetv2_1.00_224"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                              

 block_3_expand_relu (ReLU)     (None, 56, 56, 144)  0           ['block_3_expand_BN[0][0]']      
                                                                                                  
 block_3_pad (ZeroPadding2D)    (None, 57, 57, 144)  0           ['block_3_expand_relu[0][0]']    
                                                                                                  
 block_3_depthwise (DepthwiseCo  (None, 28, 28, 144)  1296       ['block_3_pad[0][0]']            
 nv2D)                                                                                            
                                                                                                  
 block_3_depthwise_BN (BatchNor  (None, 28, 28, 144)  576        ['block_3_depthwise[0][0]']      
 malization)                                                                                      
                                                                                                  
 ...
                                                                                                  
 block_7_expand (Conv2D)        (None, 14, 14, 384)  24576       ['block_6_project_BN[0][0]']     
                                                                                                  
 block_7_expand_BN (BatchNormal  (None, 14, 14, 384)  1536       ['block_7_expand[0][0]']         
 ization)                                                                                         
                                                                                                  
 block_7_expand_relu (ReLU)     (None, 14, 14, 384)  0           ['block_7_expand_BN[0][0]']      
                                                                                                  
 block_7_depthwise (DepthwiseCo  (None, 14, 14, 384)  3456       ['block_7_expand_relu[0][0]']    
 nv2D)
 ...
 block_10_depthwise_relu (ReLU)  (None, 14, 14, 384)  0          ['block_10_depthwise_BN[0][0]']  
                                                                                                  
 block_10_project (Conv2D)      (None, 14, 14, 96)   36864       ['block_10_depthwise_relu[0][0]']
                                                                                                  
 block_10_project_BN (BatchNorm  (None, 14, 14, 96)  384         ['block_10_project[0][0]']       
 alization)                                                                                       
                                                                                                  
 block_11_expand (Conv2D)       (None, 14, 14, 576)  55296       ['block_10_project_BN[0][0]']    
                                                                                                  
 ...

 block_14_depthwise (DepthwiseC  (None, 7, 7, 960)   8640        ['block_14_expand_relu[0][0]']   
 onv2D)                                                                                           
                                                                                                  
 block_14_depthwise_BN (BatchNo  (None, 7, 7, 960)   3840        ['block_14_depthwise[0][0]']     
 rmalization)                                                                                     
                                                                                                  
 block_14_depthwise_relu (ReLU)  (None, 7, 7, 960)   0           ['block_14_depthwise_BN[0][0]']  
                                                                                                  
 block_14_project (Conv2D)      (None, 7, 7, 160)    153600      ['block_14_depthwise_relu[0][0]']
                                                                                                  
 ...

The last layer of the model (not visible in the truncated summary above) outputs a (7, 7, 1280) tensor, i.e. 7 x 7 x 1280 = **62'720** features per image.

The model weights will be stored in a **broadcast variable** to reduce communication costs.  
Broadcast variables are read-only shared variables: they are cached on every node of the cluster, where tasks can access them. Instead of sending this data along with every task, Spark distributes broadcast variables to each machine once, using efficient broadcast algorithms.
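A rough back-of-the-envelope sketch of why broadcasting helps. PySpark pickles the value of a broadcast variable once, and each executor fetches and caches it, whereas a value captured in a task closure is serialized with every task. The weights below are hypothetical zero-filled stand-ins with two of the kernel shapes from the summary above (`Conv1` and `block_7_expand`), and `shipped_bytes` is a simplified cost model, not Spark's actual transfer logic.

```python
import pickle

import numpy as np

# Hypothetical stand-in for model.get_weights(): zero-filled float32 kernels
# shaped like Conv1 (3x3, 3 -> 32 channels) and block_7_expand (1x1, 64 -> 384)
weights = [
    np.zeros((3, 3, 3, 32), dtype=np.float32),
    np.zeros((1, 1, 64, 384), dtype=np.float32),
]

# The value of a broadcast variable is pickled once on the driver
payload_bytes = len(pickle.dumps(weights, protocol=pickle.HIGHEST_PROTOCOL))


def shipped_bytes(payload: int, n_tasks: int, n_executors: int, broadcast: bool) -> int:
    """Simplified estimate of the bytes sent to the workers.

    Without broadcast, the value travels with every task;
    with broadcast, each executor fetches it once and caches it.
    """
    return payload * (n_executors if broadcast else n_tasks)


# e.g. 31 partitions (one task each) processed by 4 hypothetical executors
print(shipped_bytes(payload_bytes, n_tasks=31, n_executors=4, broadcast=False))
print(shipped_bytes(payload_bytes, n_tasks=31, n_executors=4, broadcast=True))
```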

In [13]:
bc_model_weights = sc.broadcast(model.get_weights())

In [14]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights=None,
                        include_top=False,
                        input_shape=(224, 224, 3))
    model.set_weights(bc_model_weights.value)
    return model

### Prepare data

Like other CNN models such as ResNet50 and VGG16, MobileNetV2 expects (224, 224, 3) input images in its default configuration. All images of the dataset are (100, 100, 3), so they must be resized before being fed to the model. We also apply the MobileNetV2-specific preprocessing.
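Besides the resizing, the MobileNetV2 preprocessing rescales pixel values from [0, 255] to [-1, 1] (x / 127.5 - 1). A NumPy-only sketch of that scaling, handy for checking values without TensorFlow; `mobilenet_v2_scale` is a hypothetical name, and the sketch assumes a float RGB array such as the one returned by `img_to_array`.

```python
import numpy as np


def mobilenet_v2_scale(arr: np.ndarray) -> np.ndarray:
    """Rescale pixel values from [0, 255] to [-1, 1], keeping the array shape."""
    return arr.astype(np.float32) / 127.5 - 1.0


pixels = np.array([0.0, 127.5, 255.0])
print(mobilenet_v2_scale(pixels))  # [-1.  0.  1.]
```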

In [15]:
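def preprocess_img(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # Decode the JPEG bytes and make sure the image has 3 (RGB) channels
    img = Image.open(io.BytesIO(content)).convert('RGB')
    # Upscale from 100x100 to the 224x224 input size expected by the model
    img = img.resize((224, 224))
    arr = img_to_array(img)
    # MobileNetV2-specific preprocessing (pixel scaling to [-1, 1])
    return preprocess_input(arr)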

### Define featurization in a Pandas UDF

In [16]:
def featurize_series(model, series):
    """
    Featurize a pd.Series of raw images using the MobileNetV2 model.
    For some layers, output features will be multi-dimensional tensors.
    Feature tensors are flattened to vectors for easier storage in Spark DataFrames.
    -----------    
    Return: a pd.Series of image features
    """
    X = np.stack(series.map(preprocess_img))
    preds = model.predict(X)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

We will use an iterator pandas UDF. This type of UDF is useful when its execution requires initializing some state, such as loading a machine learning model once and then applying inference to every input batch, which is the case here.

As explained in the [spark documentation](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.pandas_udf.html), specifying Python type hints is preferred over specifying the pandas UDF type through *functionType*, which will be deprecated in future releases. We use the type hint `Iterator[pandas.Series] -> Iterator[pandas.Series]`.  
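The iterator pattern can be reproduced without Spark to see why the model is loaded only once per iterator. In this sketch, `load_fake_model` is a hypothetical stand-in for `model_fn()` whose "model" simply doubles its input; in the notebook, Spark supplies the batches of `pd.Series` itself.

```python
from typing import Iterator

import pandas as pd

load_count = 0  # how many times the (fake) model has been loaded


def load_fake_model():
    """Hypothetical stand-in for model_fn(): expensive in real life."""
    global load_count
    load_count += 1
    return lambda s: s * 2  # the "model" doubles its input


def iterator_udf_body(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Same structure as featurize_udf, without Spark."""
    model = load_fake_model()  # loaded once per iterator...
    for batch in batch_iter:
        yield model(batch)     # ...and reused for every batch


batches = [pd.Series([1, 2]), pd.Series([3]), pd.Series([4, 5, 6])]
outputs = list(iterator_udf_body(iter(batches)))
print(load_count)  # 1
```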

In [17]:
@pandas_udf('array<float>')
def featurize_udf(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column
    of type ArrayType(FloatType).
    With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    for multiple data batches.
    This amortizes the overhead of loading big models.
    """
    model = model_fn()
    for s in batch_iter:
        yield featurize_series(model, s)

### Apply featurization to the DataFrame of images

In [18]:
df_features = df_img.select('label',
                            featurize_udf('content').alias('features')
                           )

## Add a dimensionality reduction step

### Sparse random projection

**Principal Component Analysis** (PCA) is by far the most popular dimensionality reduction algorithm.  
However, it becomes impractical when the number of features is very high, which is the case here, because the dense covariance matrix it relies on no longer fits in memory. With 7 x 7 x 1280 = **62'720 features**, that matrix holds 62'720 x 62'720 double-precision values of 8 bytes each, i.e. about **31.5 GB**! So we need another technique to reduce dimensions.
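The memory estimate is plain arithmetic, reproduced below under the same assumption of 8-byte float64 entries. Note the difference between decimal gigabytes (GB) and binary gibibytes (GiB).

```python
n_features = 7 * 7 * 1280          # flattened MobileNetV2 output per image
bytes_per_value = 8                # float64 entries of the covariance matrix
cov_bytes = n_features ** 2 * bytes_per_value

print(n_features)                  # 62720
print(cov_bytes)                   # 31470387200
print(cov_bytes / 1e9)             # ~31.47 GB
print(cov_bytes / 2 ** 30)         # ~29.3 GiB
```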

Instead of PCA, we will use **Sparse Random Projection**. In short, given a dataset $X$ of size $n \times m$, we multiply it by a sparse random matrix $R$ of size $m \times k$ (with $k \ll m$) and obtain a new matrix $X' = XR$ of much smaller size $n \times k$. Only two parameters are required to build $R$:
- the number of features $m$ (automatically determined by the `fit()` method)
- the number of components $k$, chosen with the **Johnson-Lindenstrauss lemma**: it gives the minimal $k$ that preserves pairwise squared distances within a factor $1 \pm \varepsilon$ with high probability, and it depends only on the number of samples $n$ and on $\varepsilon$
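For reference, scikit-learn documents the bound used by `johnson_lindenstrauss_min_dim` as $k \geq 4 \ln(n) / (\varepsilon^2/2 - \varepsilon^3/3)$, where $n$ is the number of samples; the number of features does not appear in it. The pure-Python sketch below re-implements that formula (`jl_min_dim` is our own name, not sklearn's). With 62'720 and $\varepsilon = 0.1$ it returns 9468, the value obtained later in this notebook, which shows that the code passes the number of features where sklearn expects the number of samples.

```python
import math


def jl_min_dim(n_samples: int, eps: float) -> int:
    """Minimal number of components given by the Johnson-Lindenstrauss bound."""
    denominator = eps ** 2 / 2 - eps ** 3 / 3
    return int(4 * math.log(n_samples) / denominator)


print(jl_min_dim(62720, eps=0.1))      # 9468
print(jl_min_dim(1_000_000, eps=0.5))  # 663
```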

Sparse random matrices are an alternative to dense Gaussian random projection matrices: they guarantee a similar embedding quality while being much more memory efficient and allowing faster computation of the projected data.
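A NumPy sketch of the idea, assuming the entry distribution that scikit-learn documents for `SparseRandomProjection` with `density='auto'`: with $s = \sqrt{m}$, each entry of $R$ is $\pm\sqrt{s/k}$ with probability $1/(2s)$ each, and 0 otherwise. For readability the matrix is stored as a dense array here, whereas a real implementation keeps it in a sparse format; the function name and the toy sizes are illustrative.

```python
import numpy as np


def sparse_random_matrix(m: int, k: int, seed: int = 0) -> np.ndarray:
    """Return an m x k sparse random projection matrix (stored densely)."""
    rng = np.random.default_rng(seed)
    s = np.sqrt(m)  # inverse density: about 1 entry in s is non-zero
    values = rng.choice(
        [-1.0, 0.0, 1.0],
        size=(m, k),
        p=[1 / (2 * s), 1 - 1 / s, 1 / (2 * s)],
    )
    # Scaling chosen so that E[||x R||^2] = ||x||^2
    return values * np.sqrt(s / k)


# Toy data: n = 20 samples with m = 2000 features, projected to k = 500
rng = np.random.default_rng(1)
X = rng.normal(size=(20, 2000))
R = sparse_random_matrix(2000, 500)
X_proj = X @ R  # shape (20, 500)

# Squared pairwise distances should be roughly preserved (ratios near 1)
ratios = [
    np.sum((X_proj[i] - X_proj[j]) ** 2) / np.sum((X[i] - X[j]) ** 2)
    for i in range(len(X))
    for j in range(i + 1, len(X))
]
print(X_proj.shape, min(ratios), max(ratios))
```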

In [19]:
import scipy.sparse
from sklearn.random_projection import johnson_lindenstrauss_min_dim
from sklearn.random_projection import SparseRandomProjection

n_features = len(df_features.first()['features'])
# Note: the first parameter of johnson_lindenstrauss_min_dim is n_samples.
# Passing n_features gives a larger, more conservative k as long as n_img < n_features.
k = johnson_lindenstrauss_min_dim(n_features, eps=0.1)
# fit() only needs the input shape, so an all-zero sparse dummy matrix is enough;
# the fitted projection can then be broadcast to the workers
dummy_X = scipy.sparse.csr_matrix((n_img, n_features), dtype=np.float32)
srp = SparseRandomProjection(n_components=k, random_state=42)
srp.fit(dummy_X)
bc_srp = sc.broadcast(srp)


In [20]:
def dim_red_series(model, series):
    """
    Reduce the dimension of a pd.Series of features using Sparse Random Projection.
    -----------    
    Return: a pd.Series of image features
    """
    X = np.stack(series)
    X_tr = model.transform(X)
    return pd.Series(X_tr.tolist())

In [21]:
@pandas_udf('array<float>')
def reduce_udf(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """This method is a Scalar Iterator pandas UDF
    wrapping our dimensionality reduction function.
    The decorator specifies that this returns a Spark DataFrame column
    of type ArrayType(FloatType).
    With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    for multiple data batches.
    This amortizes the overhead of loading big models.
    """
    model = bc_srp.value
    for s in batch_iter:
        yield dim_red_series(model, s)

In [22]:
result = df_features.select('label',
                            reduce_udf('features').alias('srp_features')
                           )

len(result.first()['srp_features'])


9468

## Save results

Results are saved in the Parquet format, a compressed columnar format that is efficient to store and to read.

In [23]:
result_path = os.path.join(path, 'dataset/results/data_parquet')

In [24]:
result.repartition(1).write.mode("overwrite").parquet(result_path)


## Load parquet data

In [25]:
df = pd.read_parquet(result_path, engine='pyarrow')
df.head(5)

|   | label | srp_features |
|---|---|---|
| 0 | Apple_Golden | [-3.2093186, 1.396389, 9.263401, -0.4009514, -... |
| 1 | Apple_Golden | [-3.871591, 0.19321874, 7.828599, -0.7790657, ... |
| 2 | Apple_Golden | [-1.8690182, 1.2320036, 8.495892, -0.560804, -... |
| 3 | Apple_Golden | [-2.9468348, 0.66861093, 7.8678336, -0.5279124... |
| 4 | Apple_Golden | [-2.7231698, 1.4083207, 8.18139, 0.07034999, -... |
