**Table of contents**<a id='toc0_'></a>    
- 1. [Launch a Spark session](#toc1_)    
- 2. [Libraries imports](#toc2_)    
- 3. [Define paths to the S3 bucket](#toc3_)    
- 4. [Load Images](#toc4_)    
- 5. [Build model for feature extraction and broadcast its weights.](#toc5_)    
- 6. [Functions](#toc6_)    
- 7. [Loading the results](#toc7_)    


# 1.  [&#9650;](#toc0_) <a id='toc1_'></a>Launch a Spark session

This notebook is designed to run on a Spark kernel inside an Amazon EMR cluster.

In that setup, the next cell automatically starts a Spark session and displays its main attributes.

In [None]:
%%info

# 2.  [&#9650;](#toc0_) <a id='toc2_'></a>Libraries imports

In [5]:
import pandas as pd
import numpy as np
import os
from PIL import Image
import io

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

from pyspark.sql.functions import col, element_at, split
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml import Pipeline

# 3.  [&#9650;](#toc0_) <a id='toc3_'></a>Define paths to the S3 bucket

In [6]:
PATH = 's3://oc-cloud-computing'
IMAGE_SUBSET_PATH = os.path.join(PATH, "images_subset")
RESULT_PATH = os.path.join(PATH, 'results')

print(
    f'PATH: {PATH}\n'
    f'IMAGE_SUBSET_PATH: {IMAGE_SUBSET_PATH}\n'
    f'RESULT_PATH: {RESULT_PATH}'
)

PATH: s3://oc-cloud-computing
IMAGE_SUBSET_PATH: s3://oc-cloud-computing/images_subset
RESULT_PATH: s3://oc-cloud-computing/results
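
`os.path.join` produces the URIs above on Linux, which is what EMR nodes run, but it joins with the separator of the local OS, so the same cell would insert backslashes on Windows. A minimal sketch, assuming the notebook may also be run from another platform, is to join with `posixpath.join`, which always uses `/`. The `s3_join` helper name is hypothetical.

```python
import posixpath


def s3_join(base, *parts):
    """Join S3 URI segments with '/' whatever the local OS is (hypothetical helper)."""
    return posixpath.join(base, *parts)


PATH = "s3://oc-cloud-computing"
IMAGE_SUBSET_PATH = s3_join(PATH, "images_subset")
RESULT_PATH = s3_join(PATH, "results")

print(IMAGE_SUBSET_PATH)
print(RESULT_PATH)
```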


# 4.  [&#9650;](#toc0_) <a id='toc4_'></a>Load Images

In [7]:
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(IMAGE_SUBSET_PATH)
    
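# Add a label column from the image path (name of the parent folder)
# and keep the path, the label and the raw image bytes ("content"),
# which the featurization step needs later on.
images = (
    images
    .withColumn('label', element_at(split(images['path'], '/'), -2))
    .select(
        col("path"),
        col("label"),
        col("content"),
    )
)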

NameError: name 'spark' is not defined

In [None]:
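# Inspect path and label (show() prints the table itself and returns None,
# so it must not be wrapped in print())
images.select("path", "label").show(10, False)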

+------------------------------------------------------------------------------------------------+------------------+
|path                                                                                            |label             |
+------------------------------------------------------------------------------------------------+------------------+
|file:/home/louberehc/OCR/projets/8_cloud_computing/images_subset/Huckleberry/r_162_100.jpg      |Huckleberry       |
|file:/home/louberehc/OCR/projets/8_cloud_computing/images_subset/Huckleberry/r_138_100.jpg      |Huckleberry       |
|file:/home/louberehc/OCR/projets/8_cloud_computing/images_subset/Huckleberry/r_136_100.jpg      |Huckleberry       |
|file:/home/louberehc/OCR/projets/8_cloud_computing/images_subset/Huckleberry/r_311_100.jpg      |Huckleberry       |
|file:/home/louberehc/OCR/projets/8_cloud_computing/images_subset/Huckleberry/r_322_100.jpg      |Huckleberry       |
|file:/home/louberehc/OCR/projets/8_cloud_computing/imag
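
The label column relies on the images being stored in one folder per class. In Spark, `element_at` with a negative index counts from the end of the array, so `-2` selects the parent folder of each file. The following plain-Python sketch reproduces that logic on one of the paths shown above; `label_from_path` is a hypothetical name used only for illustration.

```python
def label_from_path(path):
    """Return the parent folder name, i.e. the second-to-last '/'-separated segment."""
    return path.split("/")[-2]


example = (
    "file:/home/louberehc/OCR/projets/8_cloud_computing/"
    "images_subset/Huckleberry/r_162_100.jpg"
)
print(label_from_path(example))
```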

# 5.  [&#9650;](#toc0_) <a id='toc5_'></a>Build model for feature extraction and broadcast its weights.

In [None]:
# Download MobileNetV2 with ImageNet weights
# (loaded from the local Keras cache if already present)
model = MobileNetV2(
    weights='imagenet',
    include_top=True,
    input_shape=(224, 224, 3)
)



In [None]:
# Create a model without the last (classification) layer:
# its output is the 1280-dimensional feature vector.
new_model = Model(
    inputs=model.input,
    outputs=model.layers[-2].output
)

In [None]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [None]:
# Make a broadcast object to broadcast the model weights to
# each worker.
broadcast_weights = sc.broadcast(new_model.get_weights())

In [None]:
broadcast_weights

<pyspark.broadcast.Broadcast at 0x7f06cf8c75e0>

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

# 6.  [&#9650;](#toc0_) <a id='toc6_'></a>Functions

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with the top (classification) layer removed
    and the broadcast pretrained weights loaded.
    """
    # weights=None builds the architecture only, so workers do not
    # download the ImageNet weights again: they come from the broadcast.
    model = MobileNetV2(
        weights=None,
        include_top=True,
        input_shape=(224, 224, 3),
    )
    
    for layer in model.layers:
        layer.trainable = False
        
    new_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output,
    )
    new_model.set_weights(broadcast_weights.value)
    return new_model


def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # Force 3 channels so grayscale or RGBA files still match the model input.
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # Stack the preprocessed images into one batch of shape (n, 224, 224, 3).
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)


@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This function is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that it returns a Spark DataFrame column of type ArrayType(FloatType).
    
    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                                is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
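
The benefit of the Scalar Iterator form is that the expensive setup runs once per iterator rather than once per batch. The sketch below illustrates that pattern in plain Python, without Spark or TensorFlow: `load_model` is a hypothetical stand-in for `model_fn`, and the "features" are dummy values (the byte length of each item).

```python
def load_model(counter):
    """Stand-in for an expensive model load; records how many times it runs."""
    counter["loads"] += 1
    return lambda batch: [len(item) for item in batch]  # dummy "features"


def featurize_batches(batch_iter, counter):
    """Mimic the Scalar Iterator UDF: load once, then process every batch."""
    model = load_model(counter)  # executed once, on the first batch request
    for batch in batch_iter:
        yield model(batch)


counter = {"loads": 0}
batches = [[b"ab", b"cde"], [b"f"], [b"ghij", b"kl"]]
results = list(featurize_batches(iter(batches), counter))

print(results)
print(counter)
```

Three batches are processed, but the load counter stays at one, which is the amortization the comment in `featurize_udf` refers to.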

In [None]:
# Build a DF with the path, label and features of each image
features_df = (
    images
    .repartition(16)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features")
    )
)

# Spark MLlib estimators expect a Vector column,
# so convert the array<float> features into dense vectors.
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

features_df = (
    features_df
    .select(
        col("path"),  
        col("label"),
        list_to_vector_udf(features_df["features"]).alias("features")
   )
)

# Define a pipeline that standardizes the features
# and projects them onto the first 300 principal components.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

pca = PCA(
    k=300,
    inputCol=scaler.getOutputCol(),
    outputCol="pca_features",
)

pipeline = Pipeline(stages=[scaler, pca])
# Use a distinct name so the fitted pipeline does not shadow the Keras `model`.
pipeline_model = pipeline.fit(features_df)
features_df = pipeline_model.transform(features_df)

# Write results
(
    features_df
    .drop('scaled_features')
    .write.mode("overwrite")
    .parquet(RESULT_PATH)
)

I have not tried to optimize this yet, and it is not fast: it took 2 minutes to compute the features and their PCA projection for 50 images.

# 7.  [&#9650;](#toc0_) <a id='toc7_'></a>Loading the results

In [None]:
df = pd.read_parquet(RESULT_PATH, engine='pyarrow')
df.shape

(50, 4)

In [None]:
df.columns

Index(['path', 'label', 'features', 'pca_features'], dtype='object')

In [None]:
df.loc[0, 'features']["values"].shape

(1280,)
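
The `["values"]` lookup is needed because Spark stores an ML vector column in Parquet as a struct with the fields `type`, `size`, `indices` and `values`. For a dense vector (`type` 1), `values` holds the whole vector. A minimal sketch of a converter that also handles the sparse case, assuming each cell is read back as a dict-like object with those four fields; `vector_cell_to_list` is a hypothetical helper, and the two cells below are made-up examples.

```python
def vector_cell_to_list(cell):
    """Convert a serialized Spark ML vector (dict-like struct) to a dense list of floats."""
    if cell["type"] == 1:  # dense vector: `values` is the full vector
        return [float(v) for v in cell["values"]]
    dense = [0.0] * cell["size"]  # sparse vector: scatter `values` at `indices`
    for i, v in zip(cell["indices"], cell["values"]):
        dense[i] = float(v)
    return dense


# Made-up cells mimicking the struct layout.
dense_cell = {"type": 1, "size": None, "indices": None, "values": [0.5, 0.0, 2.0]}
sparse_cell = {"type": 0, "size": 4, "indices": [1, 3], "values": [7.0, 9.0]}

print(vector_cell_to_list(dense_cell))
print(vector_cell_to_list(sparse_cell))
```

With the DataFrame loaded above, this could be applied as `df["features"].map(vector_cell_to_list)`.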

In [None]:
df.loc[0, 'pca_features']["values"].shape

(300,)

In [None]:
df.loc[0, 'pca_features']["values"]

array([ 3.98710174e+00,  1.10383853e+01,  2.38947932e+01, -7.69875881e+00,
        2.37189645e+00, -4.17323052e+00, -8.75384999e-01,  7.58112684e+00,
        8.46811258e-01, -1.63814549e-01, -1.94542496e+00, -1.54144326e-01,
       -2.80772266e+00, -3.84350279e+00, -3.65789436e-01,  4.48795007e-03,
       -5.93215632e-01,  7.51930420e-01, -1.21083240e+00,  3.10585734e-01,
       -9.34863532e-01, -8.04337676e-01, -1.59773086e+00, -1.16569854e+01,
        4.49314885e+00,  3.54640000e+00,  6.11986943e+00, -1.55038903e+00,
       -1.06345044e+01,  1.89692596e+00,  6.92818642e+00, -6.49194137e-02,
       -8.11849837e-01, -3.15826681e-01,  3.81113795e+00,  9.31022245e-01,
       -1.36885986e+00, -6.04712899e-01, -2.13370353e-01, -2.28626013e+00,
        6.87706819e-01,  1.70728531e-01, -1.34922969e+00,  5.75543040e-01,
        1.64755088e-01, -1.87366122e-01,  3.14558154e-01,  4.69869065e-01,
       -1.30640922e-01, -8.37625319e-07, -9.38967464e-07,  1.56347147e-07,
       -4.47211194e-07, -

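Results have the expected dimensions: 1280 raw features and 300 PCA components per image. With only 50 images, however, the centered data spans at most 49 dimensions, so the components from the 50th onward are numerically zero (values around 1e-7 in the output above).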
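
The PCA output above turns to values close to zero after the first 49 components. This is expected when there are fewer samples than requested components: once the columns are centered, 50 samples span at most 49 directions. The NumPy sketch below reproduces the effect on synthetic random data (not the notebook's features), using an SVD-based PCA as a stand-in for the Spark `StandardScaler` and `PCA` stages.

```python
import numpy as np

rng = np.random.default_rng(0)
n_samples, n_features, k = 50, 1280, 300
X = rng.normal(size=(n_samples, n_features))  # synthetic stand-in for the features

# Standardize each column: zero mean, unit sample standard deviation.
Xs = (X - X.mean(axis=0)) / X.std(axis=0, ddof=1)

# PCA via SVD: the rows of vt are the principal directions.
_, s, vt = np.linalg.svd(Xs, full_matrices=True)
components = vt[:k]            # first k principal directions
projected = Xs @ components.T  # one k-dimensional row per sample

# Count the singular values that are not numerically zero.
n_informative = int(np.sum(s > 1e-8 * s[0]))
print(projected.shape, n_informative)
```

In practice this suggests choosing `k` below the number of images, or fitting the PCA on a larger sample, before reading anything into the later components.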

In [None]:
sc.stop()