# Projet 8 - <font color='green'>Notebook</font> - Déployez un modèle dans le cloud
Dans ce projet, nous mettons en œuvre une architecture et une application big data dans le but de construire un moteur de classification d'images de fruits.
Le [jeu de données](https://www.kaggle.com/moltean/fruits) comprend des images de différents fruits sous différents angles.
La mission consiste à prendre en considération le volume important de données pour réaliser une application en pyspark à exécuter dans le cloud sur une architecture big data.
L'architecture big data est mise en place sur AWS en utilisant S3 pour le stockage des données et un cluster EMR pour l'exécution de l'application.
L'application réalise l'acquisition des images et leur prétraitement comprenant l'extraction des features et la réduction de dimension.

<a id="toc"></a>
**SOMMAIRE**

-  [Préparation du cluster - architecture](#cluster)
-  [Session spark](#session)
-  [Configuration d'exécution](#config)
-  [Lecture des données d'entrée](#read)
-  [Fonctions de création du modèle CNN et d'extraction des features](#cnn)
-  [Extraction itérative des features des images](#features)
-  [Enregistrement du dataset](#dataset)
-  [Standardisation des features de X](#standardization)
-  [Réduction de dimension avec un PCA](#pca)

&nbsp;

<a id="cluster"></a>
### Préparation du cluster - architecture

Le cluster cible consiste en une architecture Big Data EMR sur AWS avec les caractéristiques suivantes:
- Localisation dans la région eu-west-3 (Paris) afin que la proximité minimise le temps de latence.
- Gestion de la sécurité:
  - Compte IAM mis en place, dont les crédendials figurent dans le fichier USER/.aws/credentials pour l'exécution en mode local (PC Windows 10 en stand alone).
  - Paire de clés EC2 créée au prélable sur la région.
  
  &nbsp;
- Version EMR 6.8.0, la plus récente, afin de disposer des versions les plus récentes des libraires Hadoop 3.2.1 et spark 3.3.0.
- Afin d'exécuter ce notebook, les librairies JupyterEnterpriseGateway 2.1.0 et Livy 0.7.1 sont également incluses ; le noyau à utiliser avec jupyterLab est 'pyspark'.
- L'architecture du cluster est basée sur des instances uniformes à usage général et constituée de :
  - <font color='green'>1 nœud maitre</font> : m5.xlarge constitué de 4 cœurs virtuels, 16GB de RAM et 64 GB de stockage local (EBS) ;
  - <font color='green'>2 nœuds esclaves</font>, dont pour chacune de ces instances d'exécution: m5.xlarge constitué de 4 cœurs virtuels, 16GB de RAM et 64 GB de stockage local (EBS).
  
  Notons que cette architecture est surdimensionnée pour les seules opérations de ce notebook, mais qu'elle correspond à la plus petite configuration à usage général disponible sur la région et permettra de traiter un nombre important d'images avec un nœud maitre pouvant disposant d'un volume mémoire important pour les opérations de sérialisation et des nœud d'exécution évolutifs par mise à l'échelle horizontale.
 
- OS Amazon Linux (red hat).
- Amorçage du cluster pour installer des librairies complémentaires sur chaque instance (fichier bootstrap.sh).


*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="session"></a>
### Session spark

Il s'agit de la création de la SparkSession en fonction de l'architecture cible du cluster AWS, en spécifiant certains paramètres importants de configuration, puis utilisation du sparkContext pour régler le niveau de verbosité de l'application.
S'agissant d'un notebook, l'initialisation de la session Spark s'effectue avec Apache Livy et permet de spécifier certaines configurations.

Les paramètres de [configuration](https://spark.apache.org/docs/3.3.0/configuration.html#content):
- '<font color='green'>spark.driver.maxResultSize</font>' est fixé par défaut à 1GB, ce qui peut devenir insuffisant pour effectuer des traitements nécessitant la sérialisation de gros dataframes (ex: PCA avec un grand nombre de features) ;
- '<font color='green'>spark.driver.memory</font>' peut être à régler pour permettre au driver de disposer de suffisamment de mémoire pour sérialiser (collect) les données. Sa valeur maximum est limitée à la mémoire maximum disponible pour spark, fixée en particulier par 'spark.memory.fraction', dont la valeur par défaut est 0.6. Nous recherchons pour ce projet une valeur maximale afin de tirer partie de la RAM disponible pour gérer un dataset aussi volumineux que possible et minimiser le temps de sérialisation.
&nbsp;
- '<font color='green'>spark.executor.memory</font>' et '<font color='green'>spark.executor.cores</font>', valeurs par instance (nœud d'exécution), qui dépend du nombre de cœurs et de la RAM disponible par exécuteur. Nous recherchons une valeur relativement modeste pour 'spark.executor.memory' afin que lors d'une mise à l'échelle nous puissions maximiser la parallélisation en fonction du nombre de vcpu disponibles.

**Note pour un fonctionnement sans Apache Livy** (ex: ligne de commande spark-submit): certaines valeurs (deploy) comme "spark.driver.maxResultSize" et "spark.driver.memory" ne peuvent pas être modifiées dans l'application, ce qui rend nécessaire de les spécifier dans 'SPARK_HOME/conf/spark-defaults.conf' ou au lancement de l'application avec la ligne de commande 'spark-submit --conf spark.driver.maxResultSize=0 main.py' (si l'application est lancée depuis le script python main.py). Dans l'environnement EMR avec AWS linux, la modification de ce paramètre s'effectue sur le nœud maitre avec 'sudo vim /etc/spark/conf/spark-defaults.conf' en ajoutant 'spark.driver.maxResultSize  0'. La valeur '0' permet de ne pas limiter sa valeur, avec le risque de rencontrer une erreur si la taille d'une donnée sérialisée sur le nœud maitre fait dépasser sa capacité mémoire ; l'alternative consisterait à lui donner une valeur fixée, par exemple la même que celle donnée à 'spark.driver.memory'.
Si le fichier 'spark-defaults.conf' n'existe pas encore :
- *sur le cluster AWS*, exécuter un premier 'spark-submit' crée ce fichier qu'on peut ensuite modifier ,
- *en local*, l'installation de pyspark avec pip n'installe pas ce fichier et il faut créer le répertoire 'conf' et y placer le fichier 'spark-defaults.conf' ;

&nbsp;
Nous configurons par ailleurs 'Apache arrow' ainsi que recommandé pour l'utilisation de Pandas et Numpy et de vectorisation UDFs (User Defined Functions).

In [1]:
%%configure -f
{"conf":{"spark.driver.cores": "1",
         "spark.driver.memory": "8g",
         "spark.driver.maxResultSize": "0",
         "spark.executor.instances": "2",
         "spark.executor.cores": "4",
         "spark.executor.memory": "1g"}}

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('ocp8') \
    .getOrCreate()
prev = spark.conf.get(
    "spark.sql.execution.arrow.pyspark.enabled")  # get previous conf
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled",
               True)  # Arrow optimization for pyspark.sql.DataFrame
conf = spark.sparkContext.getConf().getAll()
print("Création d'une session spark avec la configuration:")
for item in conf:
    print(item)
sc = spark.sparkContext
sc.setLogLevel('ERROR')

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6,application_1666957824279_0007,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cr?ation d'une session spark avec la configuration:
('spark.eventLog.enabled', 'true')
('spark.driver.extraJavaOptions', "-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -XX:OnOutOfMemoryError='kill -9 %p'")
('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES', 'http://ip-172-31-16-49

*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="config"></a>
### Configuration d'exécution

Utilisation de 2 constantes pour spécifier cette configuration :
- Mode local (PC windows 10 en stand alone) vs le mode d'exécution sur AWS EMR
- Mode debug: dans ce mode, le nombre d'images et de features sont réduits pour exécuter rapidement l'application et investiguer notamment des configurations de gestion mémoire du cluster (nota: la configuration matérielle des instances permet d'aller jusqu'à au moins 10000 features).

In [3]:
# Configuration d'exécution
LOCAL = False  # False si exécution sur aws EMR
DEBUG = False  # Mode debug
print(f"Exécution du script en local={LOCAL} et debug={DEBUG}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Ex?cution du script en local=False et debug=False

*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="read"></a>
### Lecture des données d'entrée

Les images sont stockées sur S3, compartiment "ocp8project", répertoire "input_data/" et chaque fruit fait l'objet d'un répertoire à son nom contenant les images de ce fruit.

In [4]:
import boto3

# Structure de stockage du projet dans S3
print("Structure de stockage des données dans S3:")
AWS_S3_BUCKET = "ocp8project"
print(f"AWS_S3_BUCKET={AWS_S3_BUCKET}")
INPUT_DATA_FOLDER = "input_data/"
print(f"INPUT_DATA_FOLDER={INPUT_DATA_FOLDER}")
OUTPUT_DATA_FOLDER = "output_data/"
print(f"OUTPUT_DATA_FOLDER={OUTPUT_DATA_FOLDER}")

# Création d'un client pour l'accès à S3
s3_client = boto3.client("s3")

# Lecture des données d'entrées
objects = s3_client.list_objects_v2(Bucket=AWS_S3_BUCKET,
                                    Prefix=INPUT_DATA_FOLDER)
n_objects = objects['KeyCount']
print(
    f"Nombre d'objets dans s3://{AWS_S3_BUCKET}/{INPUT_DATA_FOLDER}: {n_objects}"
)
if n_objects <= 1:
    print("Erreur: le répertoire des données d'entrée est vide")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Structure de stockage des donn?es dans S3:
AWS_S3_BUCKET=ocp8project
INPUT_DATA_FOLDER=input_data/
OUTPUT_DATA_FOLDER=output_data/
Nombre d'objets dans s3://ocp8project/input_data/: 91

*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="cnn"></a>
### Fonctions de création du modèle CNN et d'extraction des features

Nous utilisons le modèle EfficientNet pour extraire les features avec du transfer-learning, en utilisant la totalité du modèle à l'exception de la couche de classification.
Nous y ajoutons une couche 'GlobalAveragePooling2D' afin de réduire chaque feature de la dimension 7*7 à 1. Le nombre de features est donc de 1280.
La fonction d'extraction des features de chaque image utilise en entrée l'image telle que chargée par PIL et produit en sortie un vecteur pyspark.ml.linalg.Vectors.

In [5]:
from keras.applications import EfficientNetB0
from keras.layers import GlobalAveragePooling2D
from keras.models import Model


def get_model_from_EfficientNetB0(layer='top_activation'):
    """
    Contruit le modèle basé sur EfficientNetB0 jusqu'à la couche
        spécifiée, selon le niveau attendu des features à extraire,
        puis ajoute une couche GlobalAveragePooling2D pour prendre
        la valeur moyenne (de la matrice 7*7) de chaque feature.
    :param layer: str, nom de la dernière couche keras layer du
        modèle EfficientNetB0 à utiliser, par défaut la dernière
        couche avant celle de classification.
    :return: keras.models.Model, modèle produisant les features.
    """
    # Modèle de base EfficientNet sans la couche de classification
    base_model = EfficientNetB0(weights="imagenet",
                                include_top=False,
                                input_shape=(224, 224, 3))
    #print(base_model.summary())  # Donne les noms de chaque layer

    # Sélection du modèle jusqu'à une couche spécifique selon le niveau  attendu des features
    x = base_model.get_layer(layer).output
    x = GlobalAveragePooling2D()(x)
    model = Model(inputs=base_model.input, outputs=x)
    return model


from keras.utils import load_img, img_to_array
from keras.applications.efficientnet import preprocess_input
from pyspark.ml.linalg import Vectors


def feature_extraction(model,
                       img_path=None,
                       img=None,
                       debug=False,
                       debug_feat_size=10):
    """
    Extrait les features de l'image avec le modèle (1280 features avec
        EfficientNetB0).
    :param model: keras.models.Model, modèle produisant les features,
        issu de la fonction get_model_from_EfficientNetB0.
    :param img_path: str, chemin de l'image dont les features seront
        extraites par la fonction.
    :param img: PIL image au format (224, 224).
    :param debug: bool, mode debug, default=False.
    :param debug_feat_size: int, dimension du vecteur de sortie pour
        le mode debug, défault=10.
    :return: pyspark.ml.linalg.Vectors, vecteur 1D des features de
        l'image.
    """
    if img_path is not None:
        # Charge et redim filtre par défaut
        img = load_img(img_path, target_size=(224, 224), keep_aspect_ratio=True)
    elif img is None:
        print('Aucune image spécifiée en entrée')
        return None
    img = img_to_array(img)  # Conversion de l'image en np.array
    img = img.reshape(
        (1, img.shape[0], img.shape[1], img.shape[2]))  # Reshape format CNN
    img = preprocess_input(img)  # Preprocessing efficientnet
    # Prédiction + reshape 1D + format liste + Vectorisation
    features = Vectors.dense(model.predict(img).ravel().tolist()[:debug_feat_size]) \
        if debug else Vectors.dense(model.predict(img).ravel().tolist())
    return features

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="features"></a>
### Extraction itérative des features des images

L'extraction s'effectue image par image selon le processus :
- Lecture de chaque fichier de INPUT_DATA_FOLDER: label et image du fruit ;
- Codage des labels dans un dictionnaire {"nom_fruit": int} ;
- Pour chaque fichier image (filename!=''), chargement de l'image et affichage de son nom, format et dimensions ;
- Redimensionnement de l'image à la dimension d'entrée du modèle ;
- Création d'un dataframe contenant le label de l'image ;
- Création d'un dataframe contenant le vecteur des features de l'image ;
- Concaténation progressive dans un dataframe y (labels) et un dataframe X (features).

En mode DEBUG, ce processus se limite à 'max_img' images et 'max_feat' features.

In [6]:
from PIL import Image

print(
    "Lecture des images et extraction des features avec le CNN 'EfficientNetB0':"
)
labels = dict()
label_count = 0
img_count = 0
max_img = 10 if DEBUG else n_objects  # remplacer 'n_objects' par une constante selon la capacité du cluster
max_feat = 10 if DEBUG else 1280
model = get_model_from_EfficientNetB0()

for obj in objects['Contents']:
    label = obj['Key'].split('/')[-2]
    filename = obj['Key'].split('/')[-1]
    if filename != '':

        # Codage du label
        if label not in labels.keys():
            labels.update({label: label_count})
            label_count += 1

        # Lecture et redimensionnement de l'image
        file = s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=obj['Key'])
        img = Image.open(file['Body'])
        print(f"{label}: {filename}, {img.format}, {img.size}, {img.mode}")
        if img.size != (224, 224):
            img = img.resize((224, 224))

        # labels et features des images
        sdf_label = spark.createDataFrame([(labels[label],)], ['label'])
        features = feature_extraction(model,
                                      img=img,
                                      debug=DEBUG,
                                      debug_feat_size=max_feat)
        sdf_features = spark.createDataFrame([(features,)], ['features'])
        if img_count == 0:
            y = sdf_label
            X = sdf_features
            features_size = len(features)
        else:
            y = y.union(sdf_label)
            X = X.union(sdf_features)
        img_count += 1
    if img_count == max_img:
        break  # mode debug

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Lecture des images et extraction des features avec le CNN 'EfficientNetB0':
Corn: 0_100.jpg, JPEG, (100, 100), RGB
Corn: 199_100.jpg, JPEG, (100, 100), RGB
Corn: r2_123_100.jpg, JPEG, (100, 100), RGB
Corn: r2_12_100.jpg, JPEG, (100, 100), RGB
Corn: r2_135_100.jpg, JPEG, (100, 100), RGB
Corn: r2_147_100.jpg, JPEG, (100, 100), RGB
Corn: r2_182_100.jpg, JPEG, (100, 100), RGB
Corn: r2_194_100.jpg, JPEG, (100, 100), RGB
Corn: r2_24_100.jpg, JPEG, (100, 100), RGB
Corn: r2_36_100.jpg, JPEG, (100, 100), RGB
Corn: r2_48_100.jpg, JPEG, (100, 100), RGB
Corn: r2_87_100.jpg, JPEG, (100, 100), RGB
Corn: r2_99_100.jpg, JPEG, (100, 100), RGB
Corn: r_0_100.jpg, JPEG, (100, 100), RGB
Corn: r_100_100.jpg, JPEG, (100, 100), RGB
Corn: r_119_100.jpg, JPEG, (100, 100), RGB
Corn: r_129_100.jpg, JPEG, (100, 100), RGB
Corn: r_136_100.jpg, JPEG, (100, 100), RGB
Corn: r_169_100.jpg, JPEG, (100, 100), RGB
Corn: r_175_100.jpg, JPEG, (100, 100), RGB
Corn: r_188_100.jpg, JPEG, (100, 100), RGB
Corn: r_18_100.jpg, JPEG

*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="dataset"></a>
### Enregistrement du dataset

Sont enregistrés en sortie dans le répertoire "data_output" du projet "ocp8project" :
- Le dictionnaire des labels de fruit au format json:  labels.json ;
- Le dataframe des labels du dataset au format parquet: y.parquet ;
- Le dataframe des features du dataset au format parquet: X.parquet.

In [7]:
import json

print(f"Sauvegarde du dataset constitué de {img_count} images de {label_count}"
      f" fruits, vectorisées en {features_size} features")
labels_file = 'labels.json'
with open(labels_file, "w") as file:
    json.dump(labels, file)
print(f"Sauvegarde du dictionnaire des labels dans "
      f"s3://{AWS_S3_BUCKET}/{OUTPUT_DATA_FOLDER}{labels_file}")
s3_client.upload_file(labels_file, AWS_S3_BUCKET,
                      OUTPUT_DATA_FOLDER + labels_file)

if not LOCAL:
    print(f"Sauvegarde du dataset-labels au format '.parquet' dans "
          f"s3://{AWS_S3_BUCKET}/{OUTPUT_DATA_FOLDER}y.parquet")
    y.write.mode('overwrite').parquet('s3://' + AWS_S3_BUCKET + '/' +
                                      OUTPUT_DATA_FOLDER + 'y.parquet')
    print(f"Sauvegarde du dataset-features au format '.parquet' dans "
          f"s3://{AWS_S3_BUCKET}/{OUTPUT_DATA_FOLDER}X.parquet")
    X.write.mode('overwrite').parquet('s3://' + AWS_S3_BUCKET + '/' +
                                      OUTPUT_DATA_FOLDER + 'X.parquet')

# Nettoyage mémoire
import gc
del y, img
gc.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Sauvegarde du dataset constitu? de 90 images de 3 fruits, vectoris?es en 1280 features
Sauvegarde du dictionnaire des labels dans s3://ocp8project/output_data/labels.json
Sauvegarde du dataset-labels au format '.parquet' dans s3://ocp8project/output_data/y.parquet
Sauvegarde du dataset-features au format '.parquet' dans s3://ocp8project/output_data/X.parquet
9016

*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="standardization"></a>
### Standardisation des features de X

Avant de procéder à une réduction de dimension avec PCA, les features sont standardisées et la fonction de mise à l'échelle enregistrée dans le répertoire "data_output" du projet "ocp8project" avec la fonction 'save' de pyspark.

In [8]:
from pyspark.ml.feature import StandardScaler

print(f"Mise à l'échelle des features avec StandardScaler:")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures").fit(X)
X = scaler.transform(X).select('scaledFeatures')
print(
    f"Données X mises à l'échelle avec StandardScaler: X:{X.count()} lignes * {len(X.columns)} colonne"
)

# Sauvegarde de scaler et nettoyage mémoire
scaler_file = 'std_scaler'
if LOCAL:
    scaler.write().overwrite().save(scaler_file)
else:
    print(
        f"Sauvegarde du scaler dans s3://{AWS_S3_BUCKET}/{OUTPUT_DATA_FOLDER}{scaler_file}"
    )
    scaler.write().overwrite().save('s3://' + AWS_S3_BUCKET + '/' +
                                    OUTPUT_DATA_FOLDER + scaler_file)

del scaler
gc.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Mise ? l'?chelle des features avec StandardScaler:
Donn?es X mises ? l'?chelle avec StandardScaler: X:90 lignes * 1 colonne
Sauvegarde du scaler dans s3://ocp8project/output_data/std_scaler
123

*[retour SOMMAIRE](#toc)*

&nbsp;

<a id="pca"></a>
### Réduction de dimension avec un PCA

Le jeu de données X contenant les features est réduit avec un PCA et enregistré dans le répertoire "data_output" du projet "ocp8project" au format parquet: X_pca.parquet.
Le nombre de features résultantes est fixé à 1/10ème du nombre de feature initial, ce qui est largement suffisant pour les 3 * 30 images de ce projet, mais sera à revoir lors d'une mise à l'échelle, en fonction de la variance expliquée.
La variance expliquée totale est affichée et son vecteur enregistré pour permettre l'analyse en fonction du nombre d'images et ajuster en conséquence la réduction.

In [9]:
# Réduction de dimension de X avec PCA
from pyspark.ml.feature import PCA

n_features = max(2, max_feat // 10) if DEBUG else features_size // 10
print(f"Réduction de dimension de X avec PCA (k={n_features}):")
pca = PCA(k=n_features, inputCol='scaledFeatures',
          outputCol='pcaFeatures').fit(X)
X = pca.transform(X).select('pcaFeatures')
print(f"Dataset réduit (X):{X.count()} lignes * {len(X.columns)} colonne")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

R?duction de dimension de X avec PCA (k=128):
Dataset r?duit (X):90 lignes * 1 colonne

In [10]:
# Écriture de X dans S3
if not LOCAL:
    print(f"Sauvegarde du dataset-features réduit dans "
          f"s3://{AWS_S3_BUCKET}/{OUTPUT_DATA_FOLDER}Xpca.parquet")
    X.write.mode('overwrite').parquet('s3://' + AWS_S3_BUCKET + '/' +
                                      OUTPUT_DATA_FOLDER + 'Xpca.parquet')

del X
gc.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Sauvegarde du dataset-features r?duit dans s3://ocp8project/output_data/Xpca.parquet
253

In [11]:
# Variance expliquée
import pickle

print(f"Variance expliquée totale: {100*pca.explainedVariance.sum():.2f}%")
explained_var = {
    'explained_var_vec': pca.explainedVariance,
    'explained_var_vsum': pca.explainedVariance.sum()
}
explained_var_file = 'explained_var.pkl'

with open(explained_var_file, 'wb') as file:
    pickle.dump(explained_var, file)
print(f"Sauvegarde de la variance expliquée dans s3://"
      f"{AWS_S3_BUCKET}/{OUTPUT_DATA_FOLDER}{explained_var_file}")
s3_client.upload_file(explained_var_file, AWS_S3_BUCKET,
                      OUTPUT_DATA_FOLDER + explained_var_file)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Variance expliqu?e totale: 100.00%
Sauvegarde de la variance expliqu?e dans s3://ocp8project/output_data/explained_var.pkl

In [12]:
# Sauvegarde PCA et nettoyage mémoire
pca_file = 'pca'
if LOCAL:
    pca.write().overwrite().save(pca_file)
else:
    print(f"Sauvegarde du PCA dans s3://{AWS_S3_BUCKET}/{OUTPUT_DATA_FOLDER}{pca_file}")
    pca.write().overwrite().save('s3://' + AWS_S3_BUCKET + '/' + OUTPUT_DATA_FOLDER + pca_file)

del pca
gc.collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Sauvegarde du PCA dans s3://ocp8project/output_data/pca
204

In [13]:
# Reset spark configuration
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled",
               prev)  # Restaure conf précédente

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

*[retour SOMMAIRE](#toc)*

&nbsp;