# Project 8: Deploy a model in the cloud

Notebook for processing images with Spark through pyspark.
This notebook largely reuses the work already done by the apprentice (link to that document: https://s3.eu-west-1.amazonaws.com/course.oc-static.com/projects/Data_Scientist_P8/Mode_ope%CC%81ratoire.zip), adapting it and adding a dimensionality-reduction step (PCA).


# Summary
 - <a href="#C1">Starting Spark Session</a>
 - <a href="#C2">Import and settings</a>
    - <a href="#C2.1">Import required items</a>
    - <a href="#C2.2">Settings and assumptions</a>
 - <a href="#C3">Data processing</a>
    - <a href="#C3.1">Prepare the data</a>
    - <a href="#C3.2">Prepare the model</a>
    - <a href="#C3.3">Featurize images</a>
    - <a href="#C3.4">Reduce dimensions</a>
    - <a href="#C3.5">Save the data and validate</a>

# <a name="C1"> Starting Spark Session

Running the first code cell launches the Spark application: it starts the driver process that controls the application and makes a SparkSession available as `spark`.

In [1]:
# Running this cell starts the Spark application; we also store the start date and time
import datetime
start = datetime.datetime.now()
print("start :", start.strftime("%Y-%m-%d %H:%M:%S"))

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1674810531637_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


start : 2023-01-27 09:25:44

<u>We retrieve the **SparkContext** into the variable "**sc**"</u>:

In [2]:
sc = spark.sparkContext

Let's display information about the Spark session:

In [3]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1674810531637_0001,pyspark,idle,Link,Link,,✔


# <a name="C2"> Import and settings
## <a name="C2.1"> Import required items

In [4]:
# Import required libraries
import pandas as pd
import numpy as np
from PIL import Image
import io

from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT

## <a name="C2.2"> Settings and assumptions

Define the paths to the input images and to the folder where the processed features will be stored.

In [5]:
PATH = "s3://brunolo-p8-data/"
PATH_Data = PATH + "Test1/"
PATH_Result = PATH + "Results/"
print(
    "PATH:        "
    + PATH
    + "\nPATH_Data:   "
    + PATH_Data
    + "\nPATH_Result: "
    + PATH_Result
)

PATH:        s3://brunolo-p8-data/
PATH_Data:   s3://brunolo-p8-data/Test1/
PATH_Result: s3://brunolo-p8-data/Results/

# <a name="C3"> Data processing

1. Prepare the data
    1. Load the images into a **Spark DataFrame** in binary format
    2. Associate images with their **label**
    3. Preprocess the images by **resizing them so that they are compatible with our model**
2. Prepare the model
    1. Import model **MobileNetV2**
    2. Create a **new model** without MobileNetV2 final layer
3. Featurize images
    1. Define the process to load images and featurize them using pandas UDF
    2. Execute features extraction actions
4. Reduce features dimensions using PCA
5. Save the results in parquet files and validate the result by loading them in a dataframe

## <a name="C3.1"> Prepare the data
    
Let's read all the .jpg files located in the data path and its sub-folders, and load them in binary format.

In [6]:
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

<u>Show the first 5 images</u>, with their:
 - image path
 - last modification date and time
 - length (in bytes)
 - binary content

In [7]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://brunolo-p8-d...|2023-01-17 15:25:12|  6989|[FF D8 FF E0 00 1...|
|s3://brunolo-p8-d...|2023-01-17 15:25:11|  6973|[FF D8 FF E0 00 1...|
|s3://brunolo-p8-d...|2023-01-17 15:25:12|  6924|[FF D8 FF E0 00 1...|
|s3://brunolo-p8-d...|2023-01-17 15:25:11|  6894|[FF D8 FF E0 00 1...|
|s3://brunolo-p8-d...|2023-01-17 15:25:11|  6890|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
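The `content` column holds the raw file bytes. Every row above starts with `FF D8 FF E0`: `FF D8` is the JPEG start-of-image marker, and `FF E0` is the APP0 (JFIF) segment marker that follows it. A minimal sketch of a sanity check on those bytes (the helper `looks_like_jpeg` is hypothetical, not part of the original notebook):

```python
def looks_like_jpeg(content: bytes) -> bool:
    """Return True if the bytes start like a JPEG file.

    A JPEG file begins with the start-of-image marker FF D8, and the next
    marker always starts with FF, hence the 3-byte prefix check.
    """
    return len(content) >= 3 and content[:3] == b"\xff\xd8\xff"


# First bytes visible in the `content` column above: FF D8 FF E0
print(looks_like_jpeg(bytes.fromhex("FFD8FFE0")))  # True
print(looks_like_jpeg(b"\x89PNG\r\n\x1a\n"))  # False (PNG signature)
```

Such a function could, for instance, be wrapped in a Spark `udf` to filter out corrupted or mislabelled files before featurization.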

Let's extract the fruit **label** from the name of the folder containing each image (the second-to-last element of the path):

In [8]:
images = images.withColumn("label", element_at(split(images["path"], "/"), -2))
# printSchema() and show() print their output themselves and return None,
# so they must not be wrapped in print()
images.printSchema()
images.select("path", "label").show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+---------------------------------------------------+----------+
|path                                               |label     |
+---------------------------------------------------+----------+
|s3://brunolo-p8-data/Test1/Watermelon/r_46_100.jpg |Watermelon|
|s3://brunolo-p8-data/Test1/Watermelon/r_44_100.jpg |Watermelon|
|s3://brunolo-p8-data/Test1/Watermelon/r_50_100.jpg |Watermelon|
|s3://brunolo-p8-data/Test1/Watermelon/r_116_100.jpg|Watermelon|
|s3://brunolo-p8-data/Test1/Watermelon/r_118_100.jpg|Watermelon|
+---------------------------------------------------+----------+
only showing top 5 rows
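`split(path, "/")` returns the path segments, and `element_at(..., -2)` picks the second-to-last one (Spark's `element_at` is 1-based and counts from the end for negative indexes), i.e. the name of the folder containing the image. The same rule in plain Python, as a sketch for checking it locally (`label_from_path` is a hypothetical helper):

```python
def label_from_path(path: str) -> str:
    """Return the name of the folder containing the file.

    Mirrors element_at(split(path, "/"), -2): Python's index -2 also
    selects the second-to-last path segment.
    """
    return path.split("/")[-2]


print(label_from_path("s3://brunolo-p8-data/Test1/Watermelon/r_46_100.jpg"))  # Watermelon
print(label_from_path("s3://brunolo-p8-data/Test1/Banana/44_100.jpg"))  # Banana
```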

## <a name="C3.2"> Prepare the model

We're going to use the **transfer learning** technique to extract features from the images, with a fast, lightweight model: **MobileNetV2**. At this stage, we do not want to train a prediction model; we only focus on extracting features.
More information about MobileNetV2: https://towardsdatascience.com/review-mobilenetv2-light-weight-model-image-classification-8febb490e61c

 1. Let's load the **MobileNetV2** model with weights **pre-trained** on **ImageNet**, specifying our image format as input
 2. Then create a new model with:
  - <u>as input</u>: the MobileNetV2 input
  - <u>as output</u>: the penultimate layer (a 1280-dimensional vector)
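In the Keras implementation of MobileNetV2 with `include_top=True`, the last layer is the 1000-class `Dense` classifier and the penultimate one is a global average pooling layer: for a 224x224 input it averages the final 7x7x1280 feature map over its 7x7 spatial positions, which yields the 1280-dimensional vector. A pure-Python sketch of that pooling step on a nested-list tensor of shape H x W x C, for illustration only:

```python
def global_average_pooling(feature_map):
    """Average an H x W x C nested list over H and W, returning C values."""
    height = len(feature_map)
    width = len(feature_map[0])
    channels = len(feature_map[0][0])
    sums = [0.0] * channels
    for row in feature_map:
        for pixel in row:
            for c, value in enumerate(pixel):
                sums[c] += value
    # One mean per channel
    return [s / (height * width) for s in sums]


# A toy 2 x 2 map with 3 channels (MobileNetV2 produces 7 x 7 x 1280)
toy = [
    [[1.0, 0.0, 2.0], [3.0, 0.0, 2.0]],
    [[5.0, 4.0, 2.0], [7.0, 0.0, 2.0]],
]
print(global_average_pooling(toy))  # [4.0, 1.0, 2.0]
```

Keras can also build a similar truncated network directly with `MobileNetV2(include_top=False, pooling="avg")`, which should give the same 1280-dimensional output; cutting the full model at `layers[-2]`, as done below, is the approach kept in this notebook.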

In [9]:
model = MobileNetV2(weights="imagenet", include_top=True, input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [10]:
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

Display the summary of our new model; its final output is a 1280-dimensional vector:

In [11]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

Now that the model is loaded on the driver, let's broadcast its weights to the workers: each executor then receives a read-only copy once and keeps it in cache, instead of the weights being shipped with every task.

In [12]:
brodcast_weights = sc.broadcast(new_model.get_weights())

<u>Let's create a function that rebuilds this truncated model on each worker and loads the broadcast weights into it</u>:

In [13]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
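    # Build the architecture only: the pretrained values come from the broadcast
    # variable, so there is no need to download the ImageNet weights on each worker
    model = MobileNetV2(weights=None, include_top=True, input_shape=(224, 224, 3))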
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input, outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

## <a name="C3.3"> Featurize images

Let's define the functions we'll use to:
 - load the images, resize them and prepare them with the MobileNetV2 preprocessing
 - pass each preprocessed image through the model and get its output vector

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
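    # Force 3 channels (RGB) so that every array has shape (224, 224, 3),
    # even for grayscale or RGBA files
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))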
    arr = img_to_array(img)
    return preprocess_input(arr)


def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
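    # Stack the preprocessed images into one (batch_size, 224, 224, 3) array
    # (named `batch` to avoid shadowing the built-in `input`)
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)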
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)


@pandas_udf("array<float>", PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    """
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
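For MobileNetV2, Keras' `preprocess_input` scales pixel values from [0, 255] to [-1, 1] (its "tf" mode, `x / 127.5 - 1`). A pure-Python sketch of that scaling on a flat list of pixel values, to make the expected input range explicit (the function name is hypothetical; the real `preprocess_input` works on NumPy arrays):

```python
def scale_to_minus_one_one(pixels):
    """Map pixel values in [0, 255] to [-1, 1] with x / 127.5 - 1."""
    return [p / 127.5 - 1.0 for p in pixels]


print(scale_to_minus_one_one([0, 127.5, 255]))  # [-1.0, 0.0, 1.0]
```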

If we face an "Out Of Memory" error while running the pandas UDF, we can enable the following option, which reduces the Arrow batch size (the number of records passed to the UDF at once). So far, we are only dealing with a **sample of 210 images**, so this command is disabled.

In [15]:
#spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
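A Scalar Iterator pandas UDF is invoked once per partition and receives an iterator over that partition's Arrow batches, each holding at most `spark.sql.execution.arrow.maxRecordsPerBatch` records (10,000 by default). With the 210 images of this sample repartitioned into 24 partitions (`repartition(24)` below), `model_fn()` should therefore run about 24 times, and each partition fits in a single batch. The sketch below simulates that bookkeeping in plain Python, assuming an even round-robin spread of rows (Spark's actual `repartition` distribution can differ slightly); all names are hypothetical:

```python
import math


def round_robin(items, n_partitions):
    """Spread items over n_partitions lists, one at a time in turn."""
    partitions = [[] for _ in range(n_partitions)]
    for i, item in enumerate(items):
        partitions[i % n_partitions].append(item)
    return partitions


def simulate_iterator_udf(n_rows, n_partitions, max_records_per_batch):
    """Count model loads and batches for a Scalar Iterator UDF.

    Assumes the model is loaded once per non-empty partition and that each
    partition is cut into batches of at most max_records_per_batch rows.
    """
    partitions = round_robin(list(range(n_rows)), n_partitions)
    model_loads = 0
    batches = 0
    for part in partitions:
        if not part:
            continue
        model_loads += 1  # model_fn() runs once per partition
        batches += math.ceil(len(part) / max_records_per_batch)
    return model_loads, batches


print(simulate_iterator_udf(210, 24, 10_000))  # (24, 24)
print(simulate_iterator_udf(210, 24, 4))  # (24, 66)
```

Lowering the batch size only splits each partition into more, smaller batches; the model is still loaded once per partition, which is the point of the iterator variant.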

Let's now apply the featurization to our Spark DataFrame of 210 images (this only defines the transformation: Spark evaluates it lazily, when an action such as `show()` is called),

In [16]:
features_df = images.repartition(24).select(
    col("path"), col("label"), featurize_udf("content").alias("features")
)

and show the first 5 rows:

In [17]:
features_df.show(5)

+--------------------+-----------+--------------------+
|                path|      label|            features|
+--------------------+-----------+--------------------+
|s3://brunolo-p8-d...|     Lychee|[1.1398207, 2.386...|
|s3://brunolo-p8-d...|    Avocado|[1.5401926, 0.023...|
|s3://brunolo-p8-d...|     Banana|[0.044500094, 0.3...|
|s3://brunolo-p8-d...|Onion White|[0.02180182, 0.00...|
|s3://brunolo-p8-d...|     Banana|[1.3179387, 0.003...|
+--------------------+-----------+--------------------+
only showing top 5 rows

## <a name="C3.4"> Reduce dimensions

Let's now reduce our 1280 feature dimensions using Principal Component Analysis (PCA).

Some sources on PCA with Spark:
 - http://ethen8181.github.io/machine-learning/big_data/spark_pca.html#Spark-PCA
 - https://stackoverflow.com/questions/42138482/how-do-i-convert-an-array-i-e-list-column-to-vector
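PCA centres the data, computes its covariance matrix and keeps the eigenvectors with the largest eigenvalues; each eigenvalue divided by their sum is the proportion of variance explained by that component (what Spark's `pca.explainedVariance` reports). A minimal 2-D sketch in plain Python, where the eigenvalues of the symmetric 2x2 covariance matrix have a closed form. It only illustrates the idea and is not how Spark computes PCA on 1280 dimensions; `pca_2d` is a hypothetical name:

```python
import math


def pca_2d(points):
    """Eigenvalues and explained-variance ratios of 2-D points.

    Uses the sample covariance (n - 1 denominator) and the closed-form
    eigenvalues of a symmetric 2x2 matrix, largest first.
    """
    n = len(points)
    mx = sum(x for x, _ in points) / n
    my = sum(y for _, y in points) / n
    sxx = sum((x - mx) ** 2 for x, _ in points) / (n - 1)
    syy = sum((y - my) ** 2 for _, y in points) / (n - 1)
    sxy = sum((x - mx) * (y - my) for x, y in points) / (n - 1)
    half_trace = (sxx + syy) / 2
    root = math.sqrt(((sxx - syy) / 2) ** 2 + sxy ** 2)
    eigenvalues = [half_trace + root, half_trace - root]
    total = sum(eigenvalues)
    if total == 0:  # all points identical: no variance to explain
        return eigenvalues, [0.0, 0.0]
    return eigenvalues, [ev / total for ev in eigenvalues]


# Points on the line y = 2x: one component explains all the variance
eigenvalues, ratios = pca_2d([(0, 0), (1, 2), (2, 4)])
print(ratios)  # [1.0, 0.0]
```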

In [18]:
# First step is to convert our features arrays into vectors
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = features_df.select(
    features_df["path"],
    features_df["label"],
    features_df["features"],
    array_to_vector_udf(features_df["features"]).alias("vectorFeatures"),
)
# show the 5 first values :
df_with_vectors.show(5)

+--------------------+-----------+--------------------+--------------------+
|                path|      label|            features|      vectorFeatures|
+--------------------+-----------+--------------------+--------------------+
|s3://brunolo-p8-d...|Onion White|[0.02180182, 0.00...|[0.02180182002484...|
|s3://brunolo-p8-d...|     Banana|[1.3179387, 0.003...|[1.31793868541717...|
|s3://brunolo-p8-d...|   Chestnut|[0.0, 0.013976299...|[0.0,0.0139762992...|
|s3://brunolo-p8-d...|     Lychee|[1.1398207, 2.386...|[1.13982069492340...|
|s3://brunolo-p8-d...|    Avocado|[1.5401926, 0.023...|[1.54019260406494...|
+--------------------+-----------+--------------------+--------------------+
only showing top 5 rows

In [19]:
# Second step: scale the data before applying PCA
scaler = StandardScaler(
    inputCol="vectorFeatures", outputCol="scaledFeatures", withMean=True, withStd=True
).fit(df_with_vectors)

# transform() adds the scaledFeatures column; the original columns are kept
df_scaled = scaler.transform(df_with_vectors)
# show the 5 first values :
df_scaled.show(5)

+--------------------+-----------+--------------------+--------------------+--------------------+
|                path|      label|            features|      vectorFeatures|      scaledFeatures|
+--------------------+-----------+--------------------+--------------------+--------------------+
|s3://brunolo-p8-d...|Onion White|[0.02180182, 0.00...|[0.02180182002484...|[-0.7192914667640...|
|s3://brunolo-p8-d...|     Banana|[1.3179387, 0.003...|[1.31793868541717...|[1.43083285175990...|
|s3://brunolo-p8-d...|   Chestnut|[0.0, 0.013976299...|[0.0,0.0139762992...|[-0.7554578814962...|
|s3://brunolo-p8-d...|     Lychee|[1.1398207, 2.386...|[1.13982069492340...|[1.13535801946945...|
|s3://brunolo-p8-d...| Watermelon|[0.37178153, 0.18...|[0.37178152799606...|[-0.1387201542807...|
+--------------------+-----------+--------------------+--------------------+--------------------+
only showing top 5 rows
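With `withMean=True, withStd=True`, `StandardScaler` maps each feature x to (x - mean) / std, where, according to the Spark documentation, std is the corrected (n - 1) sample standard deviation. A pure-Python sketch of the same transformation on one feature column (`standardize` is a hypothetical helper; the zero-variance handling is an assumption about Spark's behaviour):

```python
import statistics


def standardize(values):
    """Scale one feature column to zero mean and unit (sample) variance.

    Mirrors StandardScaler(withMean=True, withStd=True) for a single
    feature: (x - mean) / std, with std the corrected (n - 1) sample
    standard deviation. Needs at least two values. A constant column has
    std == 0; this sketch then returns 0.0 for every value.
    """
    mean = statistics.mean(values)
    std = statistics.stdev(values)
    if std == 0:
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


print(standardize([1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
print(standardize([0.0, 0.0, 0.0]))  # [0.0, 0.0, 0.0]
```

The constant case matters here: after the ReLU activations, some of the 1280 features can be 0 for many images (see the `0.0` values above).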

In [20]:
# finally, let's apply the PCA starting with 100 components
n_components = 100
pca = PCA(k=n_components, inputCol="scaledFeatures", outputCol="pcaFeatures").fit(
    df_scaled
)

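# Keep the smallest number of components explaining at least 95% of the variance
cum_var = pca.explainedVariance.toArray().cumsum()
# searchsorted returns the first index where cum_var >= 0.95; if 95% is not
# reached within the first 100 components, keep all 100 components
if cum_var[-1] >= 0.95:
    n_components = int(np.searchsorted(cum_var, 0.95)) + 1
print("Components number = ", n_components)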

# Redo the PCA with this number of components
pca = PCA(k=n_components, inputCol="scaledFeatures", outputCol="pcaFeatures").fit(
    df_scaled
)
df_pca = pca.transform(df_scaled)
# show the 5 first values :
df_pca.show(5)

Components number =  80
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|                path|      label|            features|      vectorFeatures|      scaledFeatures|         pcaFeatures|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|s3://brunolo-p8-d...|     Lychee|[1.1398207, 2.386...|[1.13982069492340...|[1.13535801946945...|[2.87088800181359...|
|s3://brunolo-p8-d...|    Avocado|[1.5401926, 0.023...|[1.54019260406494...|[1.79952351100379...|[6.05244754539655...|
|s3://brunolo-p8-d...|Onion White|[0.02180182, 0.00...|[0.02180182002484...|[-0.7192914667640...|[7.74666580032709...|
|s3://brunolo-p8-d...|     Banana|[1.3179387, 0.003...|[1.31793868541717...|[1.43083285175990...|[-15.535467873425...|
|s3://brunolo-p8-d...|   Chestnut|[0.0, 0.013976299...|[0.0,0.0139762992...|[-0.7554578814962...|[5.31951416610905...|
+--------------------+--
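The rule used in the cell above (keep the smallest number of components whose cumulative explained variance reaches 95%) can be isolated in a small pure-Python function over the proportions returned by `pca.explainedVariance`, which makes it easy to test. A sketch (`n_components_for` is hypothetical) that returns `None` when the threshold is never reached, for instance if the initial 100-component PCA explains less than 95% of the variance:

```python
from itertools import accumulate


def n_components_for(explained_variance, threshold=0.95):
    """Smallest k such that the first k proportions sum to >= threshold.

    Returns None when the threshold is not reached, e.g. when the initial
    PCA was fitted with too few components.
    """
    for k, cumulative in enumerate(accumulate(explained_variance), start=1):
        if cumulative >= threshold:
            return k
    return None


print(n_components_for([0.5, 0.3, 0.1, 0.06, 0.04]))  # 4
print(n_components_for([0.5, 0.3]))  # None
```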

## <a name="C3.5"> Save the data and validate

Finally, we keep only the image path, its label and the reduced features, and save them in **parquet** format to the results path defined above:

In [21]:
df_final = df_pca.select(["path", "label", "pcaFeatures"])
df_final.write.mode("overwrite").parquet(PATH_Result)

To validate the process, let's load the parquet files back into a pandas DataFrame,

In [22]:
df = spark.read.parquet(PATH_Result).toPandas()

and show the first 5 rows:

In [23]:
df.head(5)

                                                path  ...                                        pcaFeatures
0  s3://brunolo-p8-data/Test1/Onion White/r2_28_1...  ...  [0.44842627166352284, 13.118890304136217, -17....
1  s3://brunolo-p8-data/Test1/Onion White/r_94_10...  ...  [8.055837387538725, 10.077144095271168, -7.978...
2      s3://brunolo-p8-data/Test1/Chestnut/5_100.jpg  ...  [6.063769121067797, 11.7585377943831, 1.202264...
3       s3://brunolo-p8-data/Test1/Banana/44_100.jpg  ...  [-19.214138470010784, -1.0043655529803803, -4....
4      s3://brunolo-p8-data/Test1/Lychee/321_100.jpg  ...  [3.931219298749094, -25.090702091287827, -4.91...

[5 rows x 3 columns]

<u>Let's check that the dimension of our features matches the number of components found by the PCA (80)</u>:

In [24]:
np.array(df.loc[0, "pcaFeatures"]).shape

(80,)

And that the number of images matches our sample size (210):

In [25]:
df.shape

(210, 3)
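The two checks above (feature dimension and row count) could be bundled into one helper that inspects every row rather than only the first. A sketch on plain Python lists, assuming each row's `pcaFeatures` is a sequence of floats as in the pandas DataFrame above (`validate_features` and the toy rows are hypothetical):

```python
def validate_features(feature_rows, expected_rows, expected_dim):
    """Return a list of problems found; an empty list means the check passed."""
    problems = []
    if len(feature_rows) != expected_rows:
        problems.append(f"expected {expected_rows} rows, got {len(feature_rows)}")
    for i, features in enumerate(feature_rows):
        if len(features) != expected_dim:
            problems.append(
                f"row {i}: expected {expected_dim} values, got {len(features)}"
            )
    return problems


# Toy data: three rows of 2 values each, one of them malformed
rows = [[0.1, 0.2], [0.3, 0.4], [0.5]]
print(validate_features(rows, expected_rows=3, expected_dim=2))
# ['row 2: expected 2 values, got 1']
```

In this notebook, the call would look like `validate_features(list(df["pcaFeatures"]), 210, 80)`.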

In [26]:
# End of the processes
end = datetime.datetime.now()
print("start :", start.strftime("%Y-%m-%d %H:%M:%S"))
print("end :", end.strftime("%Y-%m-%d %H:%M:%S"))
delta = end - start
# total_seconds() covers the whole duration (.seconds drops whole days)
print("duration in minutes :", round(delta.total_seconds() / 60, 2))
print("duration details :")
delta

start : 2023-01-27 09:25:44
end : 2023-01-27 09:29:08
duration in minutes : 3.4
duration details :
datetime.timedelta(seconds=204, microseconds=129420)
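`timedelta.seconds` only holds the seconds left over once whole days are removed (0 to 86,399), whereas `total_seconds()` returns the full duration as a float. For a run of about 3 minutes like this one both give the same rounded result, but the difference shows for jobs lasting more than a day:

```python
import datetime

short = datetime.timedelta(seconds=204, microseconds=129420)  # the run above
long_run = datetime.timedelta(days=1, seconds=30)

print(round(short.seconds / 60, 2), round(short.total_seconds() / 60, 2))  # 3.4 3.4
print(long_run.seconds, long_run.total_seconds())  # 30 86430.0
```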