# Deploy a model in the cloud

### Context:
The company Fruits! wants to develop and deploy intelligent fruit-picking robots that can:

- Take a picture of one or more fruits
- Identify each fruit using a classification model

### Objectives:

- Build a big-data environment that lets the application scale (exponentially growing data volume, distributed computation)
- Implement a first data-processing pipeline with preprocessing and a dimensionality-reduction step

In [1]:
# Start spark execution by running this cell
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1677747094314_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


<SparkContext master=yarn appName=livy-session-0>

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1677747094314_0001,pyspark,idle,Link,Link,,✔


In [3]:
spark

<pyspark.sql.session.SparkSession object at 0x7fb444610190>

## 1.0 Load the libraries

In [6]:
# Load required libraries

# General
import pandas as pd
from PIL import Image
import PIL
import numpy as np
import io
import os
import sys

# Tensorflow / Keras
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# Pyspark
import pyspark
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

# Spark ML
from pyspark.ml.image import ImageSchema

# Dimension reduction - PCA
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector

In [5]:
# check versions of the dependencies
print('Version of the libraries used :')
print('Python        : ' + sys.version)
print('Tensorflow    : ' + tf.__version__)
print('Pyspark       : ' + pyspark.__version__)
print('PIL           : ' + PIL.__version__)
print('Numpy         : ' + np.__version__)
print('Pandas        : ' + pd.__version__)

Version of the libraries used :
Python        : 3.7.16 (default, Dec 15 2022, 23:24:54) 
[GCC 7.3.1 20180712 (Red Hat 7.3.1-15)]
Tensorflow    : 2.4.1
Pyspark       : 3.2.1+amzn.0.dev0
PIL           : 9.4.0
Numpy         : 1.20.0
Pandas        : 1.3.5

## 2.0 Create data path

In [7]:
# Define the input and output paths
PATH = 's3://datasetsp8'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results1'
PATH_Result_PCA = PATH+'/pca_results'
print('PATH:        '+PATH+'\nPATH_Data:   '+PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        s3://datasetsp8
PATH_Data:   s3://datasetsp8/Test
PATH_Result: s3://datasetsp8/Results1

## 3.0 Data processing

### 3.1 Data loading

In [8]:
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(PATH_Data)

In [9]:
# show top 5 observations
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://datasetsp8/T...|2023-03-01 15:45:48|125135|[FF D8 FF E0 00 1...|
|s3://datasetsp8/T...|2023-03-01 15:45:48|124785|[FF D8 FF E0 00 1...|
|s3://datasetsp8/T...|2023-03-01 15:45:48|123514|[FF D8 FF E0 00 1...|
|s3://datasetsp8/T...|2023-03-01 15:45:49|122958|[FF D8 FF E0 00 1...|
|s3://datasetsp8/T...|2023-03-01 15:45:48|122807|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
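
The `content` column holds the raw bytes of each file. The leading `FF D8 FF` visible in every row is the JPEG start-of-image signature, so a quick sanity check on the loaded data could look like the sketch below. The helper `looks_like_jpeg` is hypothetical and is not part of the notebook's pipeline.

```python
# Hypothetical helper for illustration; not used elsewhere in the notebook.
JPEG_SIGNATURE = bytes([0xFF, 0xD8, 0xFF])

def looks_like_jpeg(content: bytes) -> bool:
    """Return True when the raw bytes begin with the JPEG start-of-image marker."""
    return content[:3] == JPEG_SIGNATURE

# First bytes shown in the `content` column above: FF D8 FF E0 00
sample = bytes([0xFF, 0xD8, 0xFF, 0xE0, 0x00])
print(looks_like_jpeg(sample))          # True
print(looks_like_jpeg(b"\x89PNG\r\n"))  # False
```

This only inspects the header; it does not prove that the rest of the file decodes correctly.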

In [10]:
# Derive the label from the parent folder name (second-to-last path segment)
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+-------------------------------------------+-----------+
|path                                       |label      |
+-------------------------------------------+-----------+
|s3://datasetsp8/Test/apple_hit_1/r0_115.jpg|apple_hit_1|
|s3://datasetsp8/Test/apple_hit_1/r0_119.jpg|apple_hit_1|
|s3://datasetsp8/Test/apple_hit_1/r0_107.jpg|apple_hit_1|
|s3://datasetsp8/Test/apple_hit_1/r0_143.jpg|apple_hit_1|
|s3://datasetsp8/Test/apple_hit_1/r0_111.jpg|apple_hit_1|
+-------------------------------------------+-----------+
only showing top 5 rows
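
The label comes from splitting the path on `/` and taking the second-to-last element, which is the folder that contains the image. The same logic can be checked in plain Python without a cluster. The function name `label_from_path` is an illustrative assumption, not something the notebook defines.

```python
def label_from_path(path: str) -> str:
    """Return the parent folder name, i.e. the second-to-last '/'-separated part."""
    return path.split("/")[-2]

# Path taken from the output above
print(label_from_path("s3://datasetsp8/Test/apple_hit_1/r0_115.jpg"))  # apple_hit_1
```

This relies on the dataset layout of one folder per class; a file stored directly under `Test/` would be labelled `Test`.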

## 4.0 Transfer learning

In [11]:
model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [12]:
model_new = Model(inputs=model.input, outputs=model.layers[-2].output)

In [13]:
model_new.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [14]:
# Broadcast the model weights so that every Spark worker can access them
brodcast_weights = sc.broadcast(model_new.get_weights())

### 4.1 Define image loading and featurization with a pandas UDF

In [15]:
def new_model_fun():
    """
    Returns MobileNetV2 without its final classification layer, with all layers frozen
    and the broadcast ImageNet-pretrained weights loaded.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

def preprocess_image(image):
    """
    Decode raw image bytes, resize the image to 224x224, and apply MobileNetV2 preprocessing.
    """
    img = Image.open(io.BytesIO(image)).resize([224, 224])
    ird = img_to_array(img)
    return preprocess_input(ird)


def series_featurization(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess_image))
    
    # make predictions
    predictions = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in predictions]
    return pd.Series(output)


@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def udf_featurization(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, 
    where each batch is a pandas Series of image data.
    '''
   
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = new_model_fun()
    for content_series in content_series_iter:
        yield series_featurization(model, content_series)
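
The load-once, reuse-per-batch idea behind the Scalar Iterator UDF can be illustrated without Spark or TensorFlow. The sketch below is a plain-Python analogy under stated assumptions: `load_model` and `featurize_batches` are hypothetical stand-ins, and the "model" simply doubles its input.

```python
from typing import Iterator, List

load_count = 0  # counts how many times the stand-in model is built

def load_model():
    """Stand-in for an expensive model load; the returned 'model' doubles its input."""
    global load_count
    load_count += 1
    return lambda x: 2 * x

def featurize_batches(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    model = load_model()  # runs once per iterator, not once per batch
    for batch in batches:
        yield [model(x) for x in batch]

results = list(featurize_batches(iter([[1, 2], [3], [4, 5, 6]])))
print(results)     # [[2, 4], [6], [8, 10, 12]]
print(load_count)  # 1
```

Three batches are processed with a single model load, which is the amortization the comments in `udf_featurization` describe.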

In [16]:
# Featurization on the Spark DataFrame.

df_features = images.repartition(24).select(col("path"), col("label"),
                        udf_featurization("content").alias("features"))

In [17]:
print(PATH_Result)

s3://datasetsp8/Results1

In [18]:
# show some rows in the df_features
df_features.show()

+--------------------+--------------------+--------------------+
|                path|               label|            features|
+--------------------+--------------------+--------------------+
|s3://datasetsp8/T...|         apple_hit_1|[0.18946813, 0.03...|
|s3://datasetsp8/T...|         apple_hit_1|[0.06689602, 1.05...|
|s3://datasetsp8/T...|         apple_hit_1|[0.06757998, 0.35...|
|s3://datasetsp8/T...|         apple_hit_1|[0.76107204, 0.01...|
|s3://datasetsp8/T...|     cabbage_white_1|[0.0, 0.3348133, ...|
|s3://datasetsp8/T...|         apple_hit_1|[0.5018424, 0.233...|
|s3://datasetsp8/T...|         apple_hit_1|[0.15540709, 0.13...|
|s3://datasetsp8/T...|         apple_hit_1|[0.052988023, 0.3...|
|s3://datasetsp8/T...|         apple_hit_1|[0.9340065, 0.076...|
|s3://datasetsp8/T...|     cabbage_white_1|[0.0, 0.43177053,...|
|s3://datasetsp8/T...|              pear_3|[0.46841124, 0.12...|
|s3://datasetsp8/T...|              pear_3|[0.54661757, 0.21...|
|s3://datasetsp8/T...|   

In [19]:
df_features.count()

25722

In [20]:
# save the featurized results
df_features.write.mode("overwrite").parquet(PATH_Result)

## 5.0 Dimension reduction - PCA

In [23]:
def pca_preprocessing(dataframe):
    '''
    Convert the feature arrays to dense vectors, then standardize them
    (zero mean, unit standard deviation).
    '''

    # vectorisation - conversion of image data to dense vector
    vector_transform = udf(lambda r: Vectors.dense(r), VectorUDT())
    dataframe = dataframe.withColumn('features_vec', vector_transform('features'))

    # standardization of the vectorized features
    scaler = StandardScaler(inputCol="features_vec", outputCol="scaledFeatures", withStd=True, withMean=True)
    model_scaler = scaler.fit(dataframe)
    df_scaled = model_scaler.transform(dataframe)

    return df_scaled 

***Vectorization and standardization of the featurized dataframe***

In [24]:
pca_features = pca_preprocessing(df_features)

In [26]:
pca_features.show(5)

+--------------------+---------------+--------------------+--------------------+--------------------+
|                path|          label|            features|        features_vec|      scaledFeatures|
+--------------------+---------------+--------------------+--------------------+--------------------+
|s3://datasetsp8/T...|    apple_hit_1|[0.18946813, 0.03...|[0.18946813046932...|[-0.4065063206989...|
|s3://datasetsp8/T...|    apple_hit_1|[0.06689602, 1.05...|[0.06689602136611...|[-0.6386809222005...|
|s3://datasetsp8/T...|    apple_hit_1|[0.06757998, 0.35...|[0.06757997721433...|[-0.6373853813334...|
|s3://datasetsp8/T...|    apple_hit_1|[0.76107204, 0.01...|[0.76107203960418...|[0.67621886687546...|
|s3://datasetsp8/T...|cabbage_white_1|[0.0, 0.3348133, ...|[0.0,0.3348132967...|[-0.7653945540076...|
+--------------------+---------------+--------------------+--------------------+--------------------+
only showing top 5 rows
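
With `withMean=True` and `withStd=True`, each feature is centered on its mean and divided by its standard deviation, which is why `scaledFeatures` contains negative values even though the raw features do not. A minimal sketch of that arithmetic on a single toy feature column follows. It assumes the sample (n - 1) standard deviation, which is what Spark's `StandardScaler` documentation describes; `standardize` is a hypothetical helper.

```python
from statistics import mean, stdev

def standardize(values):
    """Center on the mean and divide by the sample standard deviation (n - 1)."""
    mu = mean(values)
    sigma = stdev(values)
    return [(v - mu) / sigma for v in values]

# Toy column: mean 4.0, sample standard deviation 2.0
scaled = standardize([2.0, 4.0, 6.0])
print(scaled)  # [-1.0, 0.0, 1.0]
```

Standardizing before PCA keeps features with large raw ranges from dominating the principal components.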

***Get the optimal number of PCA components***

In [27]:
# get the optimal number of components at 95% cumulative variance
def n_components(df, max_k=500, threshold=0.95):
    '''Return the smallest number of principal components whose cumulative
    explained variance reaches the threshold (95% by default).'''

    pca = PCA(k=max_k, inputCol="scaledFeatures", outputCol="pcaFeatures")
    pc_model = pca.fit(df)
    cumulative = np.cumsum(pc_model.explainedVariance.toArray())

    # 0-based indices of the components at which the threshold is reached
    reached = np.nonzero(cumulative >= threshold)[0]
    if len(reached) == 0:
        # threshold not reached within max_k components: keep them all
        return max_k

    # convert the first 0-based index to a component count
    k = int(reached[0]) + 1
    print("{} principal components explain {:.0%} of the total variance".format(k, threshold))

    return k
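
The selection rule can be checked on a toy list of explained-variance ratios. The sketch below is an illustration under assumptions: the ratios are made up, and `components_for_variance` is a hypothetical name. The count starts at 1 because a 0-based position is one less than the number of components it covers.

```python
from itertools import accumulate

def components_for_variance(ratios, threshold=0.95):
    """Return the smallest number of leading components whose ratios sum to at least threshold."""
    for count, total in enumerate(accumulate(ratios), start=1):
        if total >= threshold:
            return count
    return len(ratios)  # threshold never reached: keep every component

ratios = [0.5, 0.25, 0.125, 0.0625, 0.0625]  # made-up values, cumulative: 0.5, 0.75, 0.875, 0.9375, 1.0
print(components_for_variance(ratios, threshold=0.9))   # 4
print(components_for_variance(ratios, threshold=0.95))  # 5
```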

In [28]:
components_opt = n_components(pca_features)

In [29]:
# fit the PCA model with the selected number of components
pca = PCA(k=components_opt, inputCol="scaledFeatures", outputCol='pca_vectors')
pca_model = pca.fit(pca_features)

# Transform the data
data_reduced = pca_model.transform(pca_features)

In [30]:
# Display the reduced DataFrame
data_reduced.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                path|               label|            features|        features_vec|      scaledFeatures|         pca_vectors|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|s3://datasetsp8/T...|         apple_hit_1|[0.18946813, 0.03...|[0.18946813046932...|[-0.4065063206989...|[-10.729452774796...|
|s3://datasetsp8/T...|         apple_hit_1|[0.06689602, 1.05...|[0.06689602136611...|[-0.6386809222005...|[-8.6950177584278...|
|s3://datasetsp8/T...|         apple_hit_1|[0.06757998, 0.35...|[0.06757997721433...|[-0.6373853813334...|[-2.1701611848710...|
|s3://datasetsp8/T...|         apple_hit_1|[0.76107204, 0.01...|[0.76107203960418...|[0.67621886687546...|[1.16963547295904...|
|s3://datasetsp8/T...|     cabbage_white_1|[0.0, 0.3348133, ...|[0.0,0.3348132967...|[-0.7653945540076..

In [31]:
# save the reduced data
data_reduced.write.mode("overwrite").parquet(PATH_Result_PCA)
