## Deploy a model using a cloud service

### Summary

1. **Introduction**
2. **Technical solutions chosen**
3. **Deployment of local solution**
4. **Deployment of cloud solution**
5. **Conclusion**



### 1. Introduction

#### 1.1. Problem

The young AgriTech start-up "**Fruits**!" seeks to offer innovative solutions for fruit harvesting.

The company wants to preserve fruit biodiversity by developing intelligent picking robots that apply specific treatments to each fruit species.

To make itself known, the start-up first wants to release a mobile application for the general public that lets users take a photo of a fruit and obtain information about it.

For the start-up, this application would raise public awareness of fruit biodiversity and provide a first version of the fruit image classification engine.

In addition, the development of the mobile application will make it possible to build a first version of the necessary **Big Data** architecture.

#### 1.2. Objectives

1. Develop a first data processing chain, including **preprocessing** and a **dimension reduction** step.
2. Take into account that <u>the volume of data will increase very quickly</u> after delivery of this project, which involves:
   - deploying the data processing in a **Big Data** environment;
   - developing **pyspark** scripts to perform **distributed computing**.

#### 1.3. Project development steps

The project will be carried out in two stages, in two different environments. We will first develop and execute our code locally, working on a limited number of images to process.

Once the technical choices have been validated, we will deploy our solution in a Big Data environment in distributed mode.

<u>For this reason, this project will be divided into 3 parts</u>:
1. List of general technical choices retained
2. Deployment of the solution locally
3. Deployment of the solution in the cloud

### 2. Technical solutions chosen

#### 2.1. Distributed computing 

The project brief requires us to develop scripts in **pyspark** in order to <u>take into account the very rapid increase in the volume of data after delivery of the project</u>.

To quickly and simply understand what **pyspark** is and its operating principle, we advise you to read this article: [PySpark: Everything you need to know about the Python library](https://datascientest.com/pyspark)

<u>The beginning of the article tells us this </u>:
“*When we talk about database processing in Python, we immediately think of the pandas library. However, when dealing with databases that are too massive, calculations become too slow. Fortunately, there is another Python library, quite close to pandas, which allows you to process very large quantities of data: PySpark. Apache Spark is an open-source framework developed by the AMPLab at UC Berkeley for processing massive databases using distributed computing, a technique which consists of exploiting several computing units distributed in clusters for the benefit of a single project in order to divide the execution time of a query. Spark was developed in Scala and is at its best in its native language. However, the PySpark library offers to use it with the Python language, keeping performance similar to Scala implementations. PySpark is therefore a good alternative to the pandas library when you want to process data sets so large that they result in overly time-consuming calculations.*”

As we can see, **PySpark** is a way to communicate with **Spark** through the **Python** language. **Spark** itself is a tool that manages and coordinates the execution of tasks on data across a group of computers. <u>Spark (or Apache Spark) is an open-source, distributed, in-memory computing framework for processing and analyzing massive data</u>.

Another, [much more complete article explains how Spark works](https://www.veonum.com/apache-spark-pour-les-nuls/), as well as the role of the **Spark Session**, which we will use in this project.

<u>Here is also an excerpt</u>:

*Spark applications consist of a driver process and several worker processes. It can be configured to be the worker itself (local mode) or use as many as necessary to process the application, with Spark supporting automatic scaling by configuring a minimum and maximum number of workers.*

![Spark schema](img/spark-schema.png)

The driver (sometimes called “Spark Session”) distributes and schedules tasks among the different workers, which execute them and thereby enable distributed processing. It is responsible for running the code on the different machines.

*Each worker is a separate Java Virtual Machine (JVM) process whose number of CPUs and amount of memory allocated to it can be configured. Only one task can process one data split at a time.*
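To make the driver/worker vocabulary concrete, here is a plain-Python analogy, not Spark code: a "driver" splits the data into partitions, a pool of "workers" runs one task per partition, and the driver combines the partial results. The function names are illustrative, and threads stand in for what Spark runs as separate JVM processes, possibly on other machines.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, n_partitions):
    """Split a list into n_partitions contiguous chunks of near-equal size."""
    size, rest = divmod(len(data), n_partitions)
    partitions, start = [], 0
    for i in range(n_partitions):
        # The first `rest` partitions receive one extra element.
        end = start + size + (1 if i < rest else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def task(partition):
    """Work done by one task on one partition: here, a sum of squares."""
    return sum(x * x for x in partition)


def driver(data, n_partitions=4, n_workers=2):
    """Split the data, run one task per partition on a pool of workers, combine."""
    partitions = split_into_partitions(data, n_partitions)
    with ThreadPoolExecutor(max_workers=n_workers) as workers:
        partial_results = list(workers.map(task, partitions))
    return sum(partial_results)


print(driver(list(range(10))))  # 285
```

What Spark adds on top of this analogy is everything that makes it work at scale: workers on separate machines, scheduling close to the data, and recovery when a task fails.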

In both environments (Local and Cloud) we will therefore use **Spark** and we will exploit it through python scripts using **PySpark**.

In the <u>local version</u> of our script we **simulate distributed computing** in order to validate that our solution works. In the <u>cloud version</u> we **perform operations on a machine cluster**.

#### 2.2. Transfer learning

The project statement also asks us to carry out a first data processing chain which will include preprocessing and a dimension reduction step.

It also specifies that no model needs to be trained at this stage.

We decide to go with a **transfer learning** solution.

Put simply, **transfer learning** consists of reusing the knowledge already acquired by a trained model (here **MobileNetV2**) and adapting it to our problem.

We will feed our images to the model and <u>retrieve the output of its penultimate layer</u>. The last layer of the model is a softmax layer that classifies the images into ImageNet categories, which we do not need in this project.

The penultimate layer outputs a **reduced feature vector** of 1280 values per image.

This will make it possible to create a first version of the engine for classifying fruit images.

**MobileNetV2** was chosen for its <u>speed of execution</u>, which is well suited to processing a large volume of data, and for the <u>low dimensionality of its output feature vector</u> (1280 values).
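Where the 1280 values come from: in the Keras implementation of MobileNetV2, the layer just before the softmax is a global average pooling layer. For a 224×224 input, the last convolutional block outputs a 7×7×1280 tensor, and the pooling averages it over the two spatial axes, leaving one value per channel. A NumPy sketch, with random numbers standing in for real activations:

```python
import numpy as np

rng = np.random.default_rng(0)

# Stand-in for the last convolutional block's output for a batch of 2 images:
# shape (batch, height, width, channels) = (2, 7, 7, 1280).
activations = rng.random((2, 7, 7, 1280), dtype=np.float32)

# Global average pooling: mean over the spatial axes (height, width).
features = activations.mean(axis=(1, 2))

print(features.shape)  # (2, 1280): one 1280-value feature vector per image
```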

### 3. Solution deployment in local environment 

#### 3.1. Working environment 

Since we are working on macOS, there is no need to install a virtual machine.

#### 3.2. Install Spark 

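Spark runs on the Java Virtual Machine, so [Java must be installed](https://www.youtube.com/watch?v=n15tOYj6d4E) before Spark.

Once Java is installed, proceed to the Spark installation. With pip, Apache Spark is distributed as the `pyspark` package, which bundles Spark itself (it is not published under the name `spark`).

In [1]:
pip install pyspark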


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


#### 3.3. Install other packages

In [2]:
pip install pandas pillow tensorflow pyspark pyarrow findspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
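Before going further, a quick sanity check of the prerequisites can save time. The helper below is illustrative: it only checks that a `java` executable is on the PATH (not its version) and that each module can be found; note that the `pillow` package is imported as `PIL`.

```python
import importlib.util
import shutil


def check_environment(modules=("pandas", "PIL", "tensorflow", "pyspark", "pyarrow", "findspark")):
    """Report whether Java is on the PATH and whether each module is importable."""
    report = {"java": shutil.which("java") is not None}
    for name in modules:
        # find_spec returns None for a top-level module that is not installed
        report[name] = importlib.util.find_spec(name) is not None
    return report


for name, available in check_environment().items():
    print(f"{name:<12} {'OK' if available else 'MISSING'}")
```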


#### 3.4. Import libraries

In [3]:
```python
# General
import io
import os

# Data handling
import numpy as np
import pandas as pd

# Image processing
import tensorflow as tf
from PIL import Image
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# Big data
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, element_at, pandas_udf, PandasUDFType, split, udf
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
```

#### 3.5. Set PATH to load images and save results

In the local version, the images are stored in the same folder as this notebook, under `data/test1`.

A sample of 496 images from 3 classes (Apple Braeburn, Clementine and Banana) is processed in this step.

The result of the image processing will be stored in the folder `data/results`.

In [4]:
```python
PATH = os.getcwd()
PATH_Data = os.path.join(PATH, 'data', 'test1')
PATH_Result = os.path.join(PATH, 'data', 'results')
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')
```

PATH:        /Users/felipelima/Documents/projets/fruits-classification
PATH_Data:   /Users/felipelima/Documents/projets/fruits-classification/data/test1
PATH_Result: /Users/felipelima/Documents/projets/fruits-classification/data/results


#### 3.6. Creation of Spark session 

A Spark application is controlled by a driver process, accessed through a **SparkSession**. The SparkSession instance is the entry point through which Spark executes user-defined functions across the cluster, and each SparkSession corresponds to a Spark application.

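Here, a Spark session is created specifying:
- A name for the application (**P8**), which will be shown in the Spark web UI.
- That the application runs locally on all available processor cores, with `.master('local[*]')`. A fixed number of cores can be requested instead (e.g. `.master('local[4]')` for 4 cores), while `.master('local')` runs with a single worker thread and therefore no parallelism.
- The option `spark.sql.parquet.writeLegacyFormat`, which makes Spark write the **parquet** files used to store the results the way Spark 1.4 and earlier did (for example, decimals as fixed-length byte arrays), a layout used by other systems such as Apache Hive and Apache Impala.
- That an existing Spark session should be reused if there is one, and a new one created otherwise (`getOrCreate()`).

In [5]:
import findspark
findspark.init()

In [6]:
```python
spark = (SparkSession
         .builder
         .appName('P8')
         .master('local[*]')  # all available cores; 'local' alone would use a single thread
         .config("spark.sql.parquet.writeLegacyFormat", 'true')
         .getOrCreate()
)
```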

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/19 11:51:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


A variable **sc** is also created to hold the **SparkContext** of the session:

In [7]:
sc = spark.sparkContext

Display information about the running Spark session:

In [8]:
spark

#### 3.7. Data processing

In this step, the following actions will be taken:

1. Data preparation
   - Load the images into a **Spark DataFrame**
   - Associate the images with their labels
   - Preprocess the images by resizing them so that they are compatible with the model
2. Model preparation
   - Import the **MobileNetV2** model
   - Create a new model without the last layer of MobileNetV2
3. Define the image loading and featurization process using a **Pandas UDF**
4. Run the feature extraction
5. Save the results
6. Check that everything works by loading the saved data


##### 3.7.1. Load data

Images are loaded with Spark's `binaryFile` data source, which keeps the raw bytes of each file and therefore leaves full flexibility in how the images are preprocessed.

Two options are set before loading: `pathGlobFilter` restricts loading to files with the **jpg** extension, and `recursiveFileLookup` makes Spark load the files in all subfolders of the specified folder.

In [9]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

Display the first 5 rows, each containing:
- the image path
- the date and time of its last modification
- its size in bytes
- its raw binary content (displayed as hexadecimal bytes)

In [10]:
images.show(5)

```
+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/Users/felip...|2021-09-12 19:25:10|  5473|[FF D8 FF E0 00 1...|
|file:/Users/felip...|2021-09-12 19:25:10|  5473|[FF D8 FF E0 00 1...|
|file:/Users/felip...|2021-09-12 19:25:10|  5467|[FF D8 FF E0 00 1...|
|file:/Users/felip...|2021-09-12 19:25:10|  5465|[FF D8 FF E0 00 1...|
|file:/Users/felip...|2021-09-12 19:25:10|  5450|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
```



A column containing the labels is added, taken from the name of the folder that contains each image. The path and label of the first images are then displayed.

In [11]:
```python
# Label = name of the image's parent folder (one folder per fruit class)
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path', 'label').show(5, False)
```

```
root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
```
```
+------------------------------------------------------------------------------------------------------+--------------+
|path                                                                                                  |label         |
+------------------------------------------------------------------------------------------------------+--------------+
|file:/Users/felipelima/Documents/projets/fruits-classification/data/test1/Apple Braeburn/r_326_100.jpg|Apple Braeburn|
|file:/Users/felipelima/Documents/projets/fruits-classification/data/test1/Apple Braeburn/r_4_100.jpg  |Apple Braeburn|
|file:/Users/felipelima/Documents/projets/fruits-classification/data/test1/Apple Braeburn/r_8_100.jpg  |Apple Braeburn|
|file:/Users/felipelima/Documents/projets/fruits-classification/data/t
```
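The expression `element_at(split(path, '/'), -2)` relies on the one-folder-per-class layout of the dataset. Spark's `element_at` is 1-based and counts negative indices from the end, so `-2` selects the second-to-last path component, i.e. the parent folder. A plain-Python equivalent, for illustration:

```python
def label_from_path(path: str) -> str:
    """Plain-Python equivalent of element_at(split(path, '/'), -2)."""
    return path.split("/")[-2]


print(label_from_path("file:/data/test1/Apple Braeburn/r_4_100.jpg"))  # Apple Braeburn
```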

##### 3.7.2. Model preparation

The technique of **Transfer learning** will be used to extract features from images. 

The model **MobileNetV2** was chosen for its running speed compared to other models like *VGG16*.

[Check here](https://towardsdatascience.com/review-mobilenetv2-light-weight-model-image-classification-8febb490e61c) for more information about how MobileNetV2 is designed and works.


<u>Here is the diagram of its overall architecture:</u>

![Architecture de MobileNetV2](img/mobilenetv2_architecture.png)

The final layer, which classifies images into 1000 categories, will not be used. The goal of this project is to obtain the **feature vector of 1280 values** that precedes it, which will later be used to recognize the different fruits of the dataset through a classification engine.

Like other similar models, **MobileNetV2** expects input images of dimension (224, 224, 3) when used with all its layers. The images in the dataset all have dimension (100, 100, 3), so they must be **resized** before being fed to the model.
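A minimal sketch of the resize-and-scale step, using Pillow and NumPy only. The scaling `x / 127.5 - 1` maps pixel values from [0, 255] to [-1, 1], which is what `mobilenet_v2.preprocess_input` does; the function name is illustrative, and the JPEG in the usage example is synthetic.

```python
import io

import numpy as np
from PIL import Image


def resize_and_scale(content: bytes, size=(224, 224)) -> np.ndarray:
    """Decode image bytes, resize to `size` and scale pixels to [-1, 1]."""
    img = Image.open(io.BytesIO(content)).convert("RGB").resize(size)
    arr = np.asarray(img, dtype=np.float32)  # (224, 224, 3), values in [0, 255]
    return arr / 127.5 - 1.0                 # values in [-1, 1]


# Usage: a synthetic 100x100 JPEG, the size of the dataset's images.
buffer = io.BytesIO()
Image.new("RGB", (100, 100), color=(255, 128, 0)).save(buffer, format="JPEG")
x = resize_and_scale(buffer.getvalue())
print(x.shape)  # (224, 224, 3)
```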

<u>Running order:</u>
- The **MobileNetV2** model is loaded with the **pretrained weights** from **imagenet**, specifying the format of the input images.
- A new model is created with 
    - <u>Input:</u> the input of MobileNetV2 model
    - <u>Output:</u> the penultimate layer of MobileNetV2 model

In [12]:
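```python
import ssl
# Workaround for "certificate verify failed" errors when downloading the ImageNet
# weights (common with python.org installs on macOS). It disables HTTPS certificate
# verification for the whole process; running "Install Certificates.command",
# shipped with the python.org macOS installer, is the safer fix.
ssl._create_default_https_context = ssl._create_unverified_context

model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))
```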

In [13]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
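Keras also offers a shortcut that should yield the same 1280-value vector without cutting the model by hand: loading MobileNetV2 with `include_top=False` and `pooling='avg'` drops the classifier and appends global average pooling. A sketch; `weights=None` is used here only so that it runs without downloading anything, and `weights='imagenet'` would be used for real features.

```python
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2

# Without the classification head; pooling="avg" adds global average pooling on top.
# weights=None avoids a download in this sketch; use weights="imagenet" in practice.
feature_extractor = MobileNetV2(weights=None,
                                include_top=False,
                                pooling="avg",
                                input_shape=(224, 224, 3))

print(tuple(feature_extractor.output.shape))  # (None, 1280)
```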

Display the summary of the new model, where it is possible to check that the output is a <u>vector of 1280 values</u> (output shape `(None, 1280)`, `None` being the batch dimension):

In [14]:
new_model.summary()

```
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []

 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']

 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']
 on)

 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']
```

All workers must be able to access the model and its weights. 

A good practice is to load the model on the driver and then broadcast its weights to the different workers.

In [15]:
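```python
# Broadcast the pretrained weights from the driver to every worker
broadcast_weights = sc.broadcast(new_model.get_weights())
```

A function is then created that rebuilds the model on a worker and loads the broadcast weights into it.

In [16]:
```python
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    # weights=None builds the architecture only: the pretrained weights come from
    # the broadcast variable below, so they are not downloaded again on each worker
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model
```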

##### 3.7.3. Outline of the process of loading images and the application of their featurization through the use of Pandas UDF

The logic is defined step by step, from a single image up to the pandas UDF.

The functions stack as follows:
- Pandas UDF (`featurize_udf`)
    - Featurize a `pd.Series` of images (`featurize_series`)
        - Preprocess one image (`preprocess`)

In [17]:
```python
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # Decode, force 3 RGB channels and resize to MobileNetV2's 224x224 input size
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    # Scale pixel values the way MobileNetV2 expects
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
```



##### 3.7.4. Run feature extraction actions

With large records (e.g. very large images), pandas UDFs may run into *Out-Of-Memory (OOM) errors*. If this happens, try reducing the Arrow batch size with `spark.sql.execution.arrow.maxRecordsPerBatch`.

This setting is not needed in this project, but it is kept below (commented out) just in case.

In [18]:
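```python
# Uncomment to lower the Arrow batch size if OOM errors occur (Spark's default is 10000)
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
```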
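A back-of-the-envelope estimate of why the batch size matters here: `featurize_series` stacks a whole Arrow batch of preprocessed images into one float32 array before calling `predict`. Each 224×224×3 float32 image takes 602,112 bytes, so that array grows linearly with `maxRecordsPerBatch`: about 0.57 GiB for 1,024 images and about 5.6 GiB for 10,000. This rough sketch ignores the raw image bytes, the model and TensorFlow's own buffers.

```python
def preprocessed_batch_bytes(n_images: int, height=224, width=224, channels=3,
                             bytes_per_value=4) -> int:
    """Memory taken by a stacked float32 batch of preprocessed images."""
    return n_images * height * width * channels * bytes_per_value


for n in (1, 1024, 10_000):  # 10,000 is Spark's default maxRecordsPerBatch
    size = preprocessed_batch_bytes(n)
    print(f"{n:>6} images -> {size:>13,} bytes ({size / 2**30:.2f} GiB)")
```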

It is now possible to run featurization across the entire Spark DataFrame.

*Note:* This can take a long time depending on the volume of data to be processed.

The **Test** dataset contains **22819 images**. However, when running locally, only a sample of **496 images** will be treated.

In [19]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

As a reminder, here is the PATH where the results are written in **parquet** format, as a DataFrame with 3 columns:
1. Image path
2. Image label
3. Image feature vector

In [20]:
print(PATH_Result)

/Users/felipelima/Documents/projets/fruits-classification/data/results


<u>Saving processed data in **parquet** format:</u>

In [21]:
features_df.write.mode("overwrite").parquet(PATH_Result)

                                                                                

##### 3.7.5. Apply dimension reduction to the test data

In [22]:
```python
# Convert the features column (array<float>) to a Spark ML dense vector, as expected by PCA
dense_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
dense_df = features_df.select("path", "label", dense_vector("features").alias("dense_features"))
```

In [23]:
```python
# Apply PCA to the dense vectors, keeping the first 50 principal components
pca = PCA(k=50, inputCol="dense_features", outputCol="pca_features")
pca_model = pca.fit(dense_df)
result = pca_model.transform(dense_df).select("path", "label", "pca_features")
```
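The choice `k=50` can be checked against the data: a fitted Spark `PCAModel` exposes `explainedVariance`, the proportion of variance explained by each of its k components. The illustrative helper below finds how many leading components reach a target share; if the k fitted components never reach it, it returns `None`, meaning a larger k would be needed.

```python
import numpy as np


def smallest_k_for_variance(explained_variance, threshold=0.95):
    """Smallest number of leading components whose cumulative share of variance
    reaches `threshold`, or None if the given components never reach it."""
    cumulative = np.cumsum(np.asarray(explained_variance, dtype=float))
    if cumulative.size == 0 or cumulative[-1] < threshold:
        return None
    # First index where the cumulative share is >= threshold, converted to a count
    return int(np.searchsorted(cumulative, threshold)) + 1


# Usage in the notebook (fitted_pca_model is a hypothetical name for pca.fit's result):
# smallest_k_for_variance(fitted_pca_model.explainedVariance.toArray(), 0.90)
```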

24/02/19 11:52:24 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/02/19 11:52:51 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [24]:
result.show(5)


```
+--------------------+--------------+--------------------+
|                path|         label|        pca_features|
+--------------------+--------------+--------------------+
|file:/Users/felip...|Apple Braeburn|[-9.6213794259407...|
|file:/Users/felip...|Apple Braeburn|[-9.3530218036925...|
|file:/Users/felip...|    Clementine|[-6.6546919761487...|
|file:/Users/felip...|Apple Braeburn|[-11.345843230965...|
|file:/Users/felip...|Apple Braeburn|[-11.967175931100...|
+--------------------+--------------+--------------------+
only showing top 5 rows
```



                                                                                

In [25]:
```python
# Write the result after PCA to a parquet file
result.write.mode("overwrite").parquet(PATH_Result + "/pca_results")
```

                                                                                

In [26]:
print(PATH_Result + "/pca_results")

/Users/felipelima/Documents/projets/fruits-classification/data/results/pca_results


##### 3.7.6. Load saved data and validate results

<u>Let's load into a pandas DataFrame the data that have just been saved: </u>

In [27]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [28]:
df.head()

|   | path | label | features |
|---|------|-------|----------|
| 0 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [0.0, 0.0, 0.0, 0.0, 0.0, 0.78862435, 0.010513... |
| 1 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [0.0, 0.0, 0.0, 0.0, 0.0, 0.42206788, 0.008726... |
| 2 | file:/Users/felipelima/Documents/projets/fruit... | Clementine | [0.4255883, 0.0, 0.0, 0.0, 0.14050685, 0.0, 0.... |
| 3 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [1.0492134, 0.15010282, 0.0, 0.0, 0.0, 0.78402... |
| 4 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [0.5429632, 0.2340707, 0.0, 0.0, 0.0, 0.411553... |


<u>It is possible to see all categories present in the data sample: </u>

In [29]:
df['label'].unique()

array(['Apple Braeburn', 'Clementine', 'Banana'], dtype=object)

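<u>And the number of rows:</u> note that the sample contains 496 images while 992 rows are reported below. This is most likely because `pd.read_parquet` also reads the files of the `pca_results` subfolder, written afterwards inside the same results folder, as extra rows whose `features` column is empty.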

In [30]:
df.shape

(992, 3)

In [31]:
df.loc[0,'features'].shape

(1280,)

<u> Let's load the data after a PCA: </u>

In [32]:
df_pca = pd.read_parquet(PATH_Result + '/pca_results', engine='pyarrow')
df_pca['pca_features'] = df_pca['pca_features'].apply(lambda x: np.nan if pd.isnull(x) else x['values'])
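The `x['values']` lookup works because Spark ML vectors are stored in parquet as a struct with the fields `type` (0 for sparse, 1 for dense), `size`, `indices` and `values`; for a dense vector, `values` holds every entry. A sketch of a converter that also handles sparse vectors, in case some rows were stored that way (illustrative helper):

```python
import numpy as np


def spark_vector_to_numpy(v):
    """Convert a Spark ML vector read back from parquet (a dict with the keys
    'type', 'size', 'indices', 'values') to a dense NumPy array."""
    if v is None:
        return None
    values = np.asarray(v["values"], dtype=float)
    if v["type"] == 1:  # dense vector: `values` holds every entry
        return values
    dense = np.zeros(int(v["size"]))  # sparse vector: scatter values at indices
    dense[np.asarray(v["indices"], dtype=int)] = values
    return dense


# Usage (hypothetical alternative to the lambda above):
# df_pca['pca_features'] = df_pca['pca_features'].apply(spark_vector_to_numpy)
```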

In [33]:
df_pca.head()

|   | path | label | pca_features |
|---|------|-------|--------------|
| 0 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [-9.621379425940793, -3.6597103402224427, -3.5... |
| 1 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [-9.353021803692558, -4.09004641516716, -2.745... |
| 2 | file:/Users/felipelima/Documents/projets/fruit... | Clementine | [-6.654691976148705, -1.422289398607812, 2.437... |
| 3 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [-11.345843230965516, 0.39216842881092856, -8.... |
| 4 | file:/Users/felipelima/Documents/projets/fruit... | Apple Braeburn | [-11.967175931100384, -0.31898023746034265, -6... |


<u>The number of rows and the categories present in the data sample after PCA:</u>

In [34]:
df_pca.shape

(496, 3)

In [35]:
df_pca['label'].unique()

array(['Apple Braeburn', 'Clementine', 'Banana'], dtype=object)

<u> Let's check the column 'pca_features': </u>

In [36]:
df_pca.loc[0,'pca_features'].shape

(50,)

The processing step has now been validated locally on a data sample, with Spark running in local mode on a single machine to simulate a cluster.

The next step generalizes the processing by deploying it on a real cluster of machines, where it will handle the whole "Test" dataset of 22819 images.

In [None]:
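```python
# Export the PCA results to CSV outside the results folder, so that the parquet
# folders contain only parquet files and can still be read back with pd.read_parquet
df_pca.to_csv(os.path.join(PATH, 'data', 'pca_matrix.csv'), sep=';')
```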