# 1. Introduction

## 1.1 Préambule

La jeune start-up "Fruits!", spécialisée dans l'AgriTech, ambitionne de révolutionner la récolte des fruits grâce à des solutions innovantes L’entreprise se fixe pour objectif de préserver la biodiversité fruitière en concevant des robots cueilleurs intelligents capables de traiter chaque espèce de fruit de manière spécifique.

Dans un premier temps, la start-up prévoit de lancer une application mobile grand public. Cette application permettra aux utilisateurs de photographier un fruit et d'accéder instantanément à des informations détaillées sur celui-ci.

Ce projet vise à sensibiliser le grand public à l'importance de la biodiversité des fruits, tout en servant de base pour le développement d’un moteur de classification d’images spécialisé. Par ailleurs, le déploiement de cette application contribuera à la mise en place d’une première version de l’architecture Big Data nécessaire à l’évolution des technologies de la start-up.

## 1.2 Objectifs 

1. Concevoir une chaîne initiale de traitement des données intégrant une phase de prétraitement ainsi qu’une étape de réduction de dimensionnalité.
2. Anticiper la croissance rapide du volume de données après la mise en service du projet, ce qui nécessite :
    - Le déploiement des processus de traitement dans un environnement Big Data adapté
    - Le développement de scripts en PySpark pour permettre des calculs distribués et optimiser la gestion des données à grande échelle

## 1.3 Déroulement des étapes du projet

Le projet sera réalisé en deux phases distinctes, chacune dans un environnement différent. Dans un premier temps, le développement et l'exécution du code se feront en local, en travaillant sur un volume limité d'images. Cette étape permettra de valider les choix techniques initiaux. Une fois ces choix validés, la solution sera déployée dans un environnement Big Data (AWS) en mode distribué pour gérer des volumes de données plus importants.

En conséquence, le projet se structurera en trois étapes principales:
- Établissement des choix techniques généraux retenus
- Développement et déploiement de la solution en local
- Déploiement de la solution dans un environnement cloud

# 2.Choix techniques généraux retenus

## 2.1 Calcul distribué

Le projet nécessite le développement de scripts en PySpark afin d’anticiper et de gérer efficacement l’augmentation rapide du volume de données prévue après la livraison. PySpark offre une interface en langage Python pour interagir avec Spark, un outil puissant pour coordonner et exécuter des tâches sur des données réparties entre plusieurs ordinateurs.
Apache Spark, un framework open source de calcul distribué in-memory, est conçu pour le traitement et l’analyse de données massives, offrant rapidité et scalabilité.

Le fonctionnement de Spark repose sur :
- Le driver (ou Spark Session) qui planifie et distribue les tâches entre différents exécuteurs, assurant l'exécution du code sur plusieurs machines
- Les exécuteurs, processus distincts configurables en termes de CPU et de mémoire, qui exécutent les tâches de traitement sur des fractions de données

Pour ce projet, Spark sera utilisé dans les deux environnements, local et cloud, via des scripts PySpark :
- En environnement local, nous simulerons le calcul distribué pour valider la solution et ses performances
- En environnement cloud, nous déploierons la solution sur un cluster de machines, réalisant les opérations à grande échelle avec une véritable architecture distribuée

## 2.2. Transfert Learning

Le projet exige également la mise en place d’une première chaîne de traitement des données, incluant une phase de prétraitement et une étape de réduction de dimensionnalité. Il est spécifié qu’il n’est pas nécessaire d’entraîner un modèle à ce stade. Ainsi, nous avons opté pour une approche de transfert learning. Le transfert learning consiste à exploiter les connaissances déjà acquises par un modèle pré-entraîné (dans notre cas, MobileNetV2) et à les adapter à notre problématique.

Concrètement, nous fournirons nos images au modèle et extrairons les données de l’avant-dernière couche. Cette couche produit un vecteur réduit de dimension (1,1,1280), qui est idéal pour nos besoins. La dernière couche, étant une couche softmax dédiée à la classification, sera ignorée, car nous ne visons pas la classification à ce stade. Cette méthode nous permettra de créer une première version fonctionnelle du moteur de classification d’images de fruits.

Le choix de MobileNetV2 s’est imposé pour plusieurs raisons :
- Sa rapidité d’exécution, essentielle pour traiter un grand volume de données
- La faible dimensionnalité du vecteur de caractéristiques en sortie (1,1,1280), qui facilite le traitement et la gestion des données

# 3. Déploiement de la solution en local

## 3.1. Importation des librairies

In [1]:
import os
import shutil
import io
import numpy as np
import pandas as pd
from PIL import Image
from fpdf import FPDF

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, split, element_at, pandas_udf, PandasUDFType, udf
)
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT

import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.utils import img_to_array
from tensorflow.keras.models import Model


2024-12-04 12:19:21.831495: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## 3.2. Définition des PATHs

Pour cette première itération, nous travaillons sur un échantillon représentant 5% des images (3110 images au total). Ces images sont situées dans le répertoire fruits-360-original-size/Test. Les résultats du traitement seront enregistrés dans le dossier "Resultats".

In [None]:
PATH = os.getcwd()
PATH_Data = PATH+'/data/fruits-360_dataset/fruits-360/Test'
PATH_Result = PATH+'/Resultats_2'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /Users/amira/Documents/OPC/Projet9
PATH_Data:   /Users/amira/Documents/OPC/Projet9/data/fruits-360_dataset/fruits-360/Test
PATH_Result: /Users/amira/Documents/OPC/Projet9/Resultats_2


## 3.3. Création de la SparkSession

L’application Spark est orchestrée via une SparkSession, qui sert de point d’entrée pour exécuter les fonctions définies par l’utilisateur sur un cluster. Chaque SparkSession est associée à une application Spark unique, permettant une gestion centralisée des tâches.

Dans le code, une SparkSession est configurée avec les paramètres suivants :

- Nom de l’application : "P8", visible dans l’interface utilisateur Web de Spark
- Mode d’exécution : défini sur local[*], ce qui indique que Spark peut utiliser tous les cœurs disponibles du processeur pour l’exécution locale
- Adresse et port du driver :
  - spark.driver.bindAddress : défini sur 192.168.1.14, spécifiant l’adresse IP pour la communication du driver
  - spark.driver.port : fixé à 50400, assurant une liaison explicite et stable pour les connexions
- Timeout réseau :
  - spark.network.timeout : réglé sur 600s pour prévenir les déconnexions dues à une inactivité prolongée entre le driver et les exécuteurs
  - spark.executor.heartbeatInterval : défini sur 100s, spécifiant la fréquence à laquelle les exécuteurs envoient des signaux d’activité au driver pour maintenir la connexion active

Ces configurations permettent une exécution locale optimisée et robuste, tout en réduisant les risques de perte de connexion dans des environnements instables.

In [None]:
spark = (SparkSession
         .builder
         .appName('P8')
         .master('local[*]')
         .config("spark.driver.bindAddress", "192.168.1.14")  
         .config("spark.driver.port", "50400")               
         .config("spark.network.timeout", "600s")            
         .config("spark.executor.heartbeatInterval", "100s") 
         .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/03 19:19:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Nous créons également la variable "sc" qui est un SparkContext issue de la variable spark:

In [None]:
sc = spark.sparkContext

In [None]:
spark

## 3.4. Traitement des données

Dans la suite de notre flux de travail, nous procéderons successivement aux étapes suivantes :

- Préparation des données:

    - Importer les images dans un DataFrame à l’aide d’un pandas UDF
    - Associer chaque image à son label correspondant
    - Prétraiter les images en les redimensionnant pour les rendre compatibles avec notre modèle
    - Préparation du modèle

- Importer le modèle MobileNetV2:
    - Créer un nouveau modèle en supprimant la dernière couche de MobileNetV2
    - Définition du processus de chargement des images et application de leur featurisation à l’aide de pandas UDF

- Exécution des actions d’extraction des features.

- Enregistrement des résultats des actions réalisées.

- Vérification du bon fonctionnement en chargeant les données enregistrées.

### 3.4.1. Chargement des données

Les images sont chargées au format binaire, ce qui permet une plus grande flexibilité dans leur prétraitement. Avant de procéder au chargement, nous spécifions que seules les images ayant l'extension .jpg doivent être prises en compte. Nous indiquons également que tous les objets présents dans les sous-dossiers du répertoire spécifié doivent être chargés.

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

                                                                                

Nous affichons les 5 premières images contenant:
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

 Nous conservons que le path de l'image et nous ajoutons une colonne contenant les labels de chaque image

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------------------------------------------------------------+----------+
|path                                                                                                    |label     |
+--------------------------------------------------------------------------------------------------------+----------+
|file:/Users/amira/Documents/OPC/Projet9/data/fruits-360_dataset/fruits-360/Test/Watermelon/r_106_100.jpg|Watermelon|
|file:/Users/amira/Documents/OPC/Projet9/data/fruits-360_dataset/fruits-360/Test/Watermelon/r_109_100.jpg|Watermelon|
|file:/Users/amira/Documents/OPC/Projet9/data/fruits-360_dataset/fruits-360/Test/Watermelon/r_108_100.jpg|Watermelon|
|file:/Users/amira/Documents/OPC/Projet9/data/fruits-360_dataset/fruits-360/Test/W

### 3.4.2. Préparation du modèle

Dans ce projet, j’utiliserai la technique de transfert learning pour extraire les caractéristiques des images. J’ai opté pour le modèle MobileNetV2, reconnu pour sa rapidité d’exécution, notamment par rapport à des modèles comme VGG16.

Voici un schéma représentant son fonctionnement (M. Akay et al., "Deep Learning Classification of Systemic Sclerosis Skin Using the MobileNetV2 Model," in IEEE Open Journal of Engineering in Medicine and Biology, vol. 2, pp. 104-110, 2021, doi: 10.1109/OJEMB.2021.3066097.):

![Description MobileNetV2](https://i.imgur.com/w2EGdt0.png)


Le modèle intègre une dernière couche dédiée à la classification en 1000 catégories, que nous n'utiliserons pas dans notre cas.
L’objectif est de récupérer le vecteur de caractéristiques de dimensions (1,1,1280). Ce vecteur servira à alimenter un moteur de classification qui permettra d’identifier les différents fruits de notre jeu de données. Comme pour d’autres modèles similaires, MobileNetV2 exige des images d’entrée au format (224,224,3). Étant donné que nos images ont une dimension de (100,100,3), nous devrons d'abord les redimensionner pour qu'elles soient compatibles.

Les étapes principales :

- Charger le modèle MobileNetV2 avec des poids précalculés basés sur ImageNet, tout en adaptant le format des images d’entrée
- Construire un nouveau modèle comprenant :
   - En entrée: les entrées originales de MobileNetV2
   - En sortie: l'avant-dernière couche de MobileNetV2

In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

Voici le résumé de notre nouveau modèle, où l'on peut observer que le vecteur de sortie est bien de dimension (1, 1, 1280):

In [None]:
new_model.summary()

Tous les workeurs doivent avoir accès au modèle ainsi qu'à ses poids. Il est recommandé de charger d'abord le modèle sur le serveur principal, puis de distribuer les poids aux différents workeurs.

In [None]:
brodcast_weights = sc.broadcast(new_model.get_weights())

Mettons cela sous forme de fonction:

In [None]:
def model_fn():
    """
    Fonction de création du modèle avec les poids diffusés
    """
    model = MobileNetV2(weights=None, 
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value) 
    return new_model


### 3.4.3. Chargement des images et featurisation via pandas UDF

Nous traitons et extrayons les features des images brutes en utilisant un modèle de prédiction :

- preprocess(content) : prépare une image brute (en redimensionnant et prétraitant) pour la prédiction

- featurize_series(model, content_series) : applique le modèle sur une série d'images prétraitées, extrait les caractéristiques, et retourne une série de vecteurs de features.

- featurize_udf(content_series_iter) : définit une fonction UDF Spark qui traite des lots d'images, applique le modèle et renvoie les caractéristiques sous forme de DataFrame Spark.

En résumé, nous transformons les images brutes en vecteurs de caractéristiques prêts pour la prédiction, le tout dans un cadre Spark distribué.

In [None]:
def preprocess(content):
    """
    Prétraitement des images pour les adapter au modèle MobileNetV2
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Extraction des features pour une série d'images
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Utilisation de pandas_udf pour permettre l'extraction de caractéristiques dans PySpark
    """
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



### 3.4.4. Exécution des actions d'extraction de features

Les Pandas UDF peuvent provoquer des erreurs de type Out Of Memory (OOM) lorsqu'elles traitent de grands ensembles de données, comme des images volumineuses. Pour éviter cela, nous optons pour un échantillonnage à hauteur de 1,5% du jeu de données d'une taille totale de 22 688, ce qui correspond à 338 images.

In [None]:
jpg_files = [f for root, dirs, files in os.walk(PATH_Data) for f in files if f.endswith('.jpg')]
print(f"Nombre total de fichiers .jpg : {len(jpg_files)}")

Nombre total de fichiers .jpg : 22688


In [None]:
print(f"Nombre total d'images détectées par Spark : {images.count()}")

[Stage 4:>                                                          (0 + 1) / 1]

Nombre total d'images détectées par Spark : 22688


                                                                                

In [None]:
sampled_images = images.sample(withReplacement=False, fraction=0.015)
print(f"Nombre d'images échantillonnées : {sampled_images.count()}")

[Stage 7:>                                                          (0 + 1) / 1]

Nombre d'images échantillonnées : 338


                                                                                

Les images échantillonnées sont repartitionnées (20 partitions) et transformées en un DataFrame contenant :
- path : le chemin de l'image
- label : son label
- features : des features extraites via la fonction featurize_udf

Puis, les chemins des images sont collectés, nettoyés (suppression du préfixe "file:"), et convertis en liste. Enfin, un répertoire (sampled_images_2) est créé. Les images existantes dans les chemins collectés sont copiées vers ce répertoire.

In [None]:
features_df = sampled_images.repartition(20).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features")
)

In [None]:
# Sauvegarder les images échantillonnées
image_paths = sampled_images.select("path").rdd.flatMap(lambda x: x).collect()
image_paths = [img_path.replace("file:", "") for img_path in image_paths]  

sampled_images_dir = os.path.join(PATH, "sampled_images_2")
os.makedirs(sampled_images_dir, exist_ok=True)

for img_path in image_paths:
    if os.path.exists(img_path):  
        shutil.copy(img_path, sampled_images_dir)
    else:
        print(f"Fichier introuvable : {img_path}")


                                                                                

In [None]:
print(PATH_Result)

/Users/amira/Documents/OPC/Projet9/Resultats_2


### 3.4.5 Réduction de dimension (ACP)

Nous appliquons une réduction de dimensionnalité via l'ACP en plusieurs étapes:

- Conversion en DenseVector: une UDF transforme les caractéristiques (array<float>) en objets DenseVector
- Initialisation de l'ACP: le modèle est d'abord ajusté pour évaluer la variance expliquée cumulée
- Optimisation: le nombre minimal de composantes expliquant au moins 75% de la variance est déterminé
- Application finale: l'ACP est réajustée avec ce nombre optimal de composantes et appliquée aux données

In [None]:
#Conversion des features en DenseVector
def array_to_dense_vector(arr):
    """
    Convertit un array<float> en DenseVector.
    """
    if isinstance(arr, list):
        return Vectors.dense(arr)
    else:
        raise ValueError("Les données ne sont pas au format attendu : array<float>")

array_to_dense_vector_udf = udf(array_to_dense_vector, VectorUDT())

#Application de l'UDF pour transformer la colonne 'features' en DenseVector
print("=== Conversion en DenseVector ===")
features_df = features_df.withColumn("features_vector", array_to_dense_vector_udf("features"))


=== Conversion en DenseVector ===


In [None]:
#Ajustement de l'ACP pour initialiser avec toutes les composantes disponibles
pca = PCA(k=features_df.select("features_vector").first()[0].size,  
          inputCol="features_vector", 
          outputCol="pca_features")

# Ajustement du modèle PCA
pca_model = pca.fit(features_df)

#Calcul de la variance cumulée
explained_variance = np.cumsum(pca_model.explainedVariance)  
print("Variance Expliquée Cumulative : ", explained_variance)

#Recherche du plus petit k pour lequel la variance cumulée >= 75%
threshold = 0.75 
k_optimal = np.argmax(explained_variance >= threshold) + 1  
print(f"Nombre de composantes nécessaires pour expliquer au moins 75% de la variance : {k_optimal}")

# Réajustement de l'ACP avec le k déterminé
pca = PCA(k=k_optimal, inputCol="features_vector", outputCol="pca_features")
pca_model = pca.fit(features_df)

# Application de l'ACP avec le k déterminé
pca_result = pca_model.transform(features_df)

2024-12-03 19:30:32.572679: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step0 + 1) / 1]
2024-12-03 19:32:19.017188: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step0 + 1) / 1]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step + 4) / 20]
2024-12-03 19:32:54.156171: I tensorflow/core/platform/cpu_feat

Variance Expliquée Cumulative :  [0.0307839  0.05603421 0.07534274 ... 1.00012384 1.00012384 1.        ]
Nombre de composantes nécessaires pour expliquer au moins 75% de la variance : 86


2024-12-03 19:37:08.745462: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step0 + 1) / 1]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step0 + 1) / 1]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step + 4) / 20]
2024-12-03 19:38:34.386009: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-12-03 19:38:34.388734: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available

Sauvegarde et lecture :
Les résultats transformés sont sauvegardés au format Parquet et peuvent être chargés avec Pandas

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

2024-12-03 19:53:46.521424: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-12-03 19:53:46.519931: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-12-03 19:53:46.528976: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-12-03 19:53:46.550397: I tensorflow/core/platform/cpu_featu

## 3.5. Chargement des données enregistrées et validation du résultat

Nous chargeons les données enregistrées dans un DataFrame Pandas, et affichons les 5 premières lignes

In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [None]:
df.head()

Unnamed: 0,path,label,features,features_vector
0,file:/Users/amira/Documents/OPC/Projet9/data/f...,cucumber_3,"[1.2083813, 0.47967055, 0.11343843, 0.07406247...","{'type': 1, 'size': None, 'indices': None, 'va..."
1,file:/Users/amira/Documents/OPC/Projet9/data/f...,apple_braeburn_1,"[0.13303593, 0.0, 0.026936326, 0.0, 0.0, 1.795...","{'type': 1, 'size': None, 'indices': None, 'va..."
2,file:/Users/amira/Documents/OPC/Projet9/data/f...,apple_red_1,"[0.5185862, 0.0, 0.0, 0.0, 0.17228875, 0.0, 0....","{'type': 1, 'size': None, 'indices': None, 'va..."
3,file:/Users/amira/Documents/OPC/Projet9/data/f...,apple_braeburn_1,"[0.49638048, 0.37547272, 0.0, 0.0, 0.0, 0.0807...","{'type': 1, 'size': None, 'indices': None, 'va..."
4,file:/Users/amira/Documents/OPC/Projet9/data/f...,apple_golden_1,"[0.0, 0.20505804, 0.82092077, 0.0, 0.0, 0.0, 0...","{'type': 1, 'size': None, 'indices': None, 'va..."


Nous validons que la dimension du vecteur de caractéristiques des images est bien de dimension 1280

In [None]:
df.loc[0,'features'].shape

(1280,)

Nous avons validé le processus en local sur un jeu de données réduit, en simulant un cluster de machines en répartissant la charge de travail sur plusieurs cœurs d’un même processeur.
Nous allons maintenant passer à l’étape suivante en généralisant ce processus. La solution sera déployée sur un véritable cluster de machines, et nous traiterons l’ensemble des 22688 images présentes dans le dossier "Test".

# 4. Déploiement de la solution sur le cloud

Les étapes suivantes ont été mises en place afin de déploier la solution sur le cloud AWS:
- Création d'un S3 nommé opc-p9-ba : un compartiment S3 a été créé sur AWS avec le nom opc-p9-ba. Ce compartiment sera utilisé pour stocker les images et autres fichiers nécessaires au projet.

- Chargement des images sur ce S3 : les images pertinentes pour le projet ont été téléchargées dans le compartiment opc-p9-ba. Ces images seront ensuite utilisées pour le traitement et l'analyse au sein de l'environnement EMR.

- Configuration de l'EMR: l'EMR P8_Fruits a été configuré avec les paramètres appropriés pour assurer le bon déroulement du traitement des données. Les paramètres sont les suivants:
    - utilisation de la version EMR 7.5.0 
    - installation de Hadoop 3.4.0, JupyterHub 1.5.0, Spark 3.5.2, TensorFlow 2.16.1
    - proupes d'instances uniformes Primaire m5.xlarge et Unité principale m5.xlarge et Tâche 1 sur 1 m5.xlarge
    - volume racine EBS 15 Gio IOPS 3000 Débit 124 Mio/s
    - paramètres du logiciel:
    [
  {
    "Classification": "jupyter-s3-conf",
    "Properties": {
      "s3.persistence.bucket": "opc-p9-ba",
      "s3.persistence.enabled": "true"
    }
  }
]

- Installation des packages via un fichier bootstrap-emr.sh: un fichier bootstrap-emr.sh a été créé pour automatiser l'installation des packages nécessaires. Ce script permet de mettre à jour les outils Python, puis d'installer toutes les bibliothèques nécessaires, y compris pillow pour la gestion des images, numpy et scikit-learn pour le machine learning, ainsi que boto3 et s3fs pour l'interaction avec AWS S3. Ce fichier contient les instructions suivantes :

![Détails installations](https://i.imgur.com/NYgVKgx.png)


- Création d'un tunnel SSH avec FoxyProxy: un tunnel SSH a été configuré à l'aide de FoxyProxy pour permettre un accès sécurisé et direct à l'environnement EMR

- Exécution du code avec JupyterHub hébergé par EMR dans un environnement PySpark: le code suivant est ensuite exécuté sur l'environnement EMR via JupyterHub. Cela permet de tirer parti de la puissance de calcul distribuée pour le traitement de données volumineuses et l'exécution de modèles machine learning à grande échelle

## 4.1. Démarrage de la session Spark

In [None]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1733303984191_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [None]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1733303984191_0001,pyspark,idle,Link,Link,,✔


## 4.2. Importation des librairies

In [2]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf, base64
import boto3
import glob

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import StringType


## 4.3. Définition des PATHs

Nous accédons à nos données sur S3

In [None]:
PATH = 's3://opc-p9-ba'
PATH_Data = PATH
PATH_Result = PATH+'/Resultats'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://opc-p9-ba
PATH_Data:   s3://opc-p9-ba
PATH_Result: s3://opc-p9-ba/Resultats

## 4.4. Traitement des données

### 4.4.1. Chargement des données

Nous chargeons les fichiers image depuis le stockage S3 en utilisant Pyspark, et nous observons les cinq premières lignes

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://opc-p9-ba/Wa...|2024-12-03 18:48:38|  7353|[FF D8 FF E0 00 1...|
|s3://opc-p9-ba/Wa...|2024-12-03 18:48:38|  7350|[FF D8 FF E0 00 1...|
|s3://opc-p9-ba/Wa...|2024-12-03 18:48:38|  7349|[FF D8 FF E0 00 1...|
|s3://opc-p9-ba/Wa...|2024-12-03 18:48:38|  7348|[FF D8 FF E0 00 1...|
|s3://opc-p9-ba/Wa...|2024-12-03 18:48:39|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

Nous conservons uniquement le path de l'image et nous ajoutons une colonne contenant les labels de chaque image

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------+----------+
|path                                   |label     |
+---------------------------------------+----------+
|s3://opc-p9-ba/Watermelon/r_106_100.jpg|Watermelon|
|s3://opc-p9-ba/Watermelon/r_109_100.jpg|Watermelon|
|s3://opc-p9-ba/Watermelon/r_108_100.jpg|Watermelon|
|s3://opc-p9-ba/Watermelon/r_107_100.jpg|Watermelon|
|s3://opc-p9-ba/Watermelon/r_95_100.jpg |Watermelon|
+---------------------------------------+----------+
only showing top 5 rows

None

### 4.4.2. Préparation du modèle

Nous préparons le modèle MobileNetV2 de la même façon que précédemment

In [None]:
# Téléchargement du modèle pré-entraîné MobileNetV2 pour extraire des features
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

[1m       0/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m0s[0m 0s/step
[1m   32768/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m40s[0m 3us/step
[1m   65536/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m34s[0m 2us/step
[1m   81920/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m38s[0m 3us/step
[1m  147456/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m27s[0m 2us/step
[1m  221184/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m21s[0m 2us/step
[1m  327680/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m16s[0m 1us/s

In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "functional_1"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃ Param # ┃ Connected to         ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━┩
│ input_layer         │ (None, 224, 224,  │       0 │ -                    │
│ (InputLayer)        │ 3)                │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │     864 │ input_layer[0][0]    │
│                     │ 32)               │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │     128 │ Conv1[0][0]          │
│ (BatchNormalizatio… │ 32)               │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │       0 │ 

Nous définissons les fonctions pour le modèle et la préprocessing des images. Ces fonctions incluent la préparation des images et l'extraction des features avec MobileNetV2 comme précédemment

In [None]:
def model_fn():
    # Création du modèle MobileNetV2 avec des couches gelées (non entraînables)
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
def preprocess(content):
# Prétraitement des images pour les adapter au modèle MobileNetV2
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img).astype('float32')  
    return preprocess_input(arr)


def featurize_series(model, content_series):
    # Extraction des features pour une série d'images
    input = np.stack([np.asarray(preprocess(x), dtype=np.float32) for x in content_series])
    preds = model.predict(input)
    output = [p.flatten().tolist() for p in preds]
    return pd.Series(output)


@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
# Utilisation de pandas_udf pour permettre l'extraction de caractéristiques dans PySpark
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



### 4.4.3. Extraction des features

Nous utilisons la fonction UDF pour générer un DataFrame contenant les caractéristiques extraites de l'ensemble des images, contrairement à la démarche locale où cela avait été effectué uniquement sur un échantillon d'images.

In [None]:
features_df = images.repartition(24).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.4.4.  Réduction de dimension (ACP)

Nous faisons une réduction de dimension en utilisant la méthode ACP comme précédemment

In [None]:
# Conversion des caractéristiques en DenseVector
def array_to_dense_vector(arr):
    if isinstance(arr, list):
        return Vectors.dense(arr)
    else:
        raise ValueError("Les données ne sont pas au format attendu : array<float>")

array_to_dense_vector_udf = udf(array_to_dense_vector, VectorUDT())
features_df = features_df.withColumn("features_vector", array_to_dense_vector_udf("features"))

print("=== Vérification des Données après Conversion ===")
features_df.select("features_vector").show(1, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

=== V?rification des Donn?es apr?s Conversion ===
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Utilisation de PCA pour réduire la dimensionnalité des caractéristiques en conservant au moins 75% de la variance
pca = PCA(k=features_df.select("features_vector").first()[0].size,  
          inputCol="features_vector", 
          outputCol="pca_features")

print("=== Ajustement du Modèle PCA ===")
pca_model = pca.fit(features_df)

explained_variance = np.cumsum(pca_model.explainedVariance)  
print("Variance Expliquée Cumulative : ", explained_variance)

threshold = 0.75 
k_optimal = np.argmax(explained_variance >= threshold) + 1 
print(f"Nombre de composantes nécessaires pour expliquer au moins 75% de la variance : {k_optimal}")

pca = PCA(k=k_optimal, inputCol="features_vector", outputCol="pca_features")
pca_model = pca.fit(features_df)

print("=== Application de PCA ===")
pca_result = pca_model.transform(features_df)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

=== Ajustement du Mod?le PCA ===
Variance Expliqu?e Cumulative :  [0.1014099  0.18146972 0.24497273 ... 0.99999945 0.99999982 1.        ]
Nombre de composantes n?cessaires pour expliquer au moins 75% de la variance : 57
=== Application de PCA ===

Nous sauvegardons les résultats au format Parquet 

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 4.5. Chargement des données et validation du résultat

In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                        path  ...                                    features_vector
0     s3://opc-p9-ba/Watermelon/r_70_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
1     s3://opc-p9-ba/Watermelon/r_51_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
2  s3://opc-p9-ba/Pineapple Mini/275_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
3  s3://opc-p9-ba/Pineapple Mini/269_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
4      s3://opc-p9-ba/Watermelon/183_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...

[5 rows x 4 columns]

In [None]:
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [None]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(22688, 4)

Voici les étapes pour récupérer les résultats dans un fichier unique CSV:

- Transformation des images en base64: les images sont converties en une représentation base64 pour faciliter leur gestion et leur exportation dans des formats textuels comme CSV

- Jointure des features ACP avec les images base64: nous associons les résultats ACP aux images en fonction du chemin (path). Cela permet d'avoir à la fois les features réduites (ACP) et les images sous forme de base64 dans un seul DataFrame, facilitant l'analyse et l'exportation

- Conversion des caractéristiques ACP en chaîne JSON: nous convertissons les caractéristiques ACP (structurées) en chaînes JSON pour les rendre facilement exportables au format CSV. Cela permet de conserver la structure des données ACP tout en les rendant lisibles et exportables

- Exportation en CSV: le DataFrame final est sauvegardé sous forme de fichier CSV, permettant une utilisation ultérieure ou un partage facile des résultats

- Téléchargement des résultats depuis S3 vers un dossier local: les fichiers CSV générés sont téléchargés depuis le compartiment S3 pour être combinés localement

- Fusion des fichiers CSV et sauvegarde finale: les différents fichiers CSV sont fusionnés en un seul fichier pour simplifier l'analyse et éviter de travailler avec plusieurs petits fichiers. Le fichier fusionné est ensuite téléchargé sur S3 pour garantir une copie de sauvegarde dans le cloud, accessible à tout moment.

In [None]:
# Transformation des images en une représentation `base64`
images_base64 = images.select(
    col("path"),
    col("label"),
    base64(col("content")).alias("image_base64")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Jointure des caractéristiques PCA avec les données `base64` et exportation au format CSV
images_base64 = images.select(
    col("path"),
    col("label").alias("image_label"),  
    base64(col("content")).alias("image_base64")
)

def struct_to_json(pca_features):
    if pca_features:
        return str(pca_features.values)  
    return None

struct_to_json_udf = udf(struct_to_json, StringType())

export_df = pca_result.withColumn("pca_features_str", struct_to_json_udf(col("pca_features"))) \
    .drop("features", "features_vector", "pca_features")

final_df = export_df.join(images_base64, on="path", how="inner")

# Sauvegarde en CSV
export_path_csv = PATH_Result + "/final_output_csv"
final_df.write.mode("overwrite") \
    .option("header", "true") \
    .csv(export_path_csv)

print(f"Fichier CSV enregistré sur {export_path_csv}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Fichier CSV enregistr? sur s3://opc-p9-ba/Resultats/final_output_csv

Téléchargement des fichiers CSV depuis S3 

In [3]:
# Téléchargement des résultats du répertoire S3 vers un dossier local
bucket_name = "opc-p9-ba"
s3_folder = "Resultats/final_output_csv/"
local_folder = "./local_csv/"

os.makedirs(local_folder, exist_ok=True)

s3 = boto3.client('s3')

objects = s3.list_objects_v2(Bucket=bucket_name, Prefix=s3_folder)

for obj in objects.get('Contents', []):
    file_key = obj['Key']
    if file_key.endswith('.csv'):  
        local_file_path = os.path.join(local_folder, os.path.basename(file_key))
        s3.download_file(bucket_name, file_key, local_file_path)
        print(f"Téléchargé : {file_key}")

Téléchargé : Resultats/final_output_csv/part-00000-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00001-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00002-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00003-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00004-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00005-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00006-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00007-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00008-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_output_csv/part-00009-f0e03654-1ae0-4583-87fc-e14e57103622-c000.csv
Téléchargé : Resultats/final_o

Fusion des fichiers CSV

In [None]:
# Les fichiers CSV sont combinés en un seul fichier final, qui est sauvegardé localement et sur S3
folder_path = "./local_csv/"

all_csv_files = glob.glob(os.path.join(folder_path, "part-*.csv"))

df_list = [pd.read_csv(file) for file in all_csv_files]
merged_df = pd.concat(df_list, axis=0)

output_file = "./Bahou_Amira_2_images_112024.csv"
merged_df.to_csv(output_file, index=False)

print(f"Fichier fusionné sauvegardé localement : {output_file}")

Fichier fusionné sauvegardé localement : ./Bahou_Amira_images_112024.csv


In [None]:
output_s3_key = "Resultats/final_output_csv/Bahou_Amira_2_images_112024.csv"

s3.upload_file(output_file, bucket_name, output_s3_key)

print(f"Fichier fusionné sauvegardé sur S3 : s3://{bucket_name}/{output_s3_key}")

Fichier fusionné sauvegardé sur S3 : s3://opc-p9-ba/Resultats/final_output_csv/Bahou_Amira_images_112024.csv


# 5. Conclusion

Nous avons réalisé ce projet en deux étapes, en tenant compte des contraintes imposées.

Dans un premier temps, nous avons développé notre solution localement. La première phase a consisté à installer l'environnement Spark. Ce dernier dispose d'un paramètre permettant de travailler en mode local, simulant ainsi du calcul distribué en considérant chaque cœur de processeur comme un worker indépendant. Nous avons testé notre solution sur un jeu de données plus petit, l'objectif étant de valider son bon fonctionnement.
Pour le modèle, nous avons opté pour du transfer learning avec MobileNetV2, choisi pour sa légèreté, sa rapidité d'exécution et la faible taille de son vecteur de sortie. Les résultats ont été sauvegardés sur disque sous forme de partitions au format parquet. La solution a parfaitement fonctionné en local.

La deuxième phase a consisté à mettre en place un cluster de calculs réel, afin d'anticiper une augmentation future de la charge de travail. Nous avons choisi d'utiliser Amazon Web Services (AWS), qui permet de louer à la demande de la puissance de calcul à un coût abordable. Le service EC2 d'AWS, qui fait partie des solutions Infrastructure as a Service (IaaS), a été retenu. Nous avons également adopté un service de niveau supérieur (Platform as a Service - PaaS) : EMR. Ce service nous a permis de lancer un cluster de serveurs sur lesquels nous avons installé et configuré plusieurs programmes et librairies nécessaires à notre projet, tels que Spark, Hadoop, JupyterHub et TensorFlow. Cette approche nous a permis non seulement d'être plus rapides et efficaces dans la mise en place, mais aussi de garantir le bon fonctionnement de la solution, validée par les ingénieurs d'Amazon. L'installation des packages nécessaires a été réalisée facilement sur l'ensemble des machines du cluster.
Avec peu de modifications, nous avons pu exécuter notre notebook exactement comme en local, cette fois-ci sur toutes les images du dossier "Test". Pour le stockage des données, nous avons opté pour Amazon S3, qui offre un stockage évolutif et à faible coût. S3 permet une gestion efficace des données, avec un espace de stockage potentiellement illimité et des coûts en fonction de l'espace utilisé.

Grâce à cette solution, nous serons capables de faire face à l'augmentation de la charge de travail en redimensionnant facilement notre cluster. Bien que cela augmente les coûts, ils resteront bien inférieurs à ceux associés à l'achat de matériel ou à la location de serveurs dédiés.