<div style='text-align:center; font-size:24px'>
<B>Notebook de traitement distribué sur le Cloud</B>
<br>Projet N° 9 - <I>Réalisez un traitement Big Data sur le Cloud

<div style='text-align:right; font-size:20px; font-weight:bold; font-style: italic' > 
par Jean Vallée

<div style='background-color: darkblue; text-align: center'>.

- Ce notebook est utilisé pour le traitement sur le Cloud, pour traiter le jeu complet d'images.
- Sélectionnez un Kernel PySpark

# Session Spark
Le lancement de Spark échoue parfois, alors exécuter cette 1e case individuellement

In [1]:
pass         # Displays Spark-session info and links to UI & logs
if ( 'spark' in globals() ) and ( spark.version ) :
    print(f'OK. Spark is active. Version: {spark.version}')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1725619841828_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

OK. Spark is active. Version: 3.5.1-amzn-0

## Lancement

- Sur un poste local, on lance la session Spark puis on crée le contexte
- Sur le Cloud, EMR se charge de ces opérations

Informations de configuration Spark

In [368]:
cluster_name = 'emr-1core-2task'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [369]:
spark_conf = sc.getConf()
print('Executor Cores :',  spark_conf.get('spark.executor.cores')  )
print('Executor Memory :', spark_conf.get('spark.executor.memory') )
print('Driver Memory :',   spark_conf.get('spark.driver.memory')   )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Cores : 2
Executor Memory : 4743M
Driver Memory : 1000M

### Dossiers de travail

In [370]:
path_s3         = 's3://jv-s3/'
path_s3_output  = path_s3 + 'output/'
path_s3_images  = path_s3 + 'data/fruits-360/'
path_output     = './output/' 
print( 'path_s3_images:  '  + path_s3_images + \
      '\npath_s3_output:  ' + path_s3_output + \
      '\npath_output:     ' + path_output )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

path_s3_images:  s3://jv-s3/data/fruits-360/
path_s3_output:  s3://jv-s3/output/
path_output:     ./output/

# Modules & fonctions
- pour un traitement local, leur installation est faîte sur un terminal 
- pour le Cloud, elle est faîte à l'amorçage “bootstrap” du cluster

In [371]:
%%sh
pip list | grep -iE 'pandas|numpy|pip|ipython|spark|tensorflow'

ipython                      7.25.0
ipython-genutils             0.2.0
numpy                        1.26.4
pandas                       1.4.4
pip                          24.0
sparkmagic                   0.20.0


## Import

In [372]:
import pandas as pd
import numpy as np
import io
import os
import cv2
import time
import boto3
import subprocess

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [373]:
import tensorflow as tf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [374]:
from pyspark.sql.functions import pandas_udf, PandasUDFType, element_at, split, col 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Fonctions de calcul du temps d'exécution
- Calcule le temps d'exécution des commandes les plus longues
- Crée un tableau récapitulatif qui sera affiché à la fin du notebook

In [375]:
df_duration = pd.DataFrame(columns=['cluster', 'job_group_id', 'nb_images', 'nb_secs', 'duration'])
timestamp = time.strftime('%Y%m%d_%H%M')
def get_time() : 
    return int(time.time())
def get_start_time(job_group_id) : 
    sc.setJobGroup(job_group_id, job_group_id)
    return get_time()
def get_HHMMSS(secs) : 
    HH, MM, SS = secs // 3600, (secs % 3600) // 60, secs % 60 # // returns integer
    if HH == 0 : return f'         {MM:02}m {SS:02}s' 
    else :       return f'{HH:02}h {MM:02}m {SS:02}s'
def get_duration(job_group_id, start_time) :
    nb_secs = get_time() - start_time
    duration = get_HHMMSS(nb_secs)
    print('Execution time = ', duration)
    global df_duration
    dict_row = pd.DataFrame({'job_group_id': [job_group_id], 'nb_secs': [nb_secs], 'duration': [duration]})
    df_duration = pd.concat([df_duration, dict_row], ignore_index=True)
    sc.cancelJobGroup(job_group_id)
def get_total_duration() :
    df_total_duration = df_duration.groupby('nb_images').agg({'nb_secs': 'sum'}).reset_index()
    df_total_duration['duration'] = df_total_duration['nb_secs'].apply(lambda x : get_HHMMSS(x))
    return df_total_duration

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Fonction d'exécution de commande Shell

In [376]:
def run_cmd(str_cmd) :
    print('Command =', str_cmd)
    result = subprocess.run(str_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    ret = result.returncode
    if ret != 0 : 
        print( 'ERROR : code =', result.returncode, '\n', result.stderr.decode('utf-8'))
        return 'ERROR'
    else : # ret == 0 
        print( 'OK, success\n', result.stdout.decode('utf-8'))    
        return 'OK'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Pré-traitement des DataFrame Spark

In [377]:
global_start_time = get_time()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Fichiers sur AWS S3
_Boto 3_ permet la gestion de fichiers sur S3 : les lister, copier, déplacer, supprimer, etc.

In [378]:
def list_files_in_bucket(bucket_name):
    paginator = s3.get_paginator('list_objects_v2')
    page_iterator = paginator.paginate(Bucket=bucket_name)   
    full_list = []    
    for partial_list_i in page_iterator:
        if 'Contents' in partial_list_i :
            full_list.extend(partial_list_i['Contents'])
    return full_list

s3 = boto3.client('s3')
my_bucket = 'jv-s3'
li_dict_images = list_files_in_bucket(my_bucket)
print('nb_files in bucket =', len(li_dict_images))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

nb_files in bucket = 10862

### Jeu de données original

In [379]:
def count_files_in_folder(li_dict_paths, target_path, suffix) :
    nb_files = 0
    for dict_path in li_dict_paths :  
        if dict_path['Key'].startswith(target_path) & dict_path['Key'].endswith(suffix) :
            nb_files += 1
    return nb_files
nb_images = count_files_in_folder(li_dict_images, 'data/fruits-360/', '.jpg')
print('nb_images =', nb_images)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

nb_images = 10812

<div style='background-color: blue; text-align: center'>.

## Chargement des fichiers image

In [380]:
sdf_images = spark.read.format('binaryFile') \
  .option('pathGlobFilter', '*.jpg') \
  .option('recursiveFileLookup', 'true') \
  .load(path_s3_images)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Nombre d'images variable
Normalement on traite un nombre unique d'images. Mais on peut fait varier ce nombre pour l'extrapolation de la durée en fonction du nombre d'images 

## _Feature Engineering_

### Ajout de _image_id_
On remplace le champ _path_ par un identifiant de l'image; L'identifiant est bien plus court que le chemin des fichiers

In [381]:
from pyspark.sql.functions import monotonically_increasing_id

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [382]:
sdf_images = sdf_images.withColumn( 'image_id', monotonically_increasing_id() )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Ajout de _class_

In [383]:
sdf_images = sdf_images.withColumn('class', element_at(split(sdf_images['path'], '/'),-2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Ajout de _class_id_

In [384]:
from pyspark.ml.feature import StringIndexer

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Codage des classes en indexes entiers

In [385]:
start_time = get_start_time('add_class_id')
indexer = StringIndexer(inputCol='class', outputCol='class_id')
sdf_images = indexer.fit(sdf_images).transform(sdf_images)
get_duration('add_class_id', start_time)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution time =           00m 22s

In [386]:
print(sdf_images.printSchema())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- image_id: long (nullable = false)
 |-- class: string (nullable = true)
 |-- class_id: double (nullable = false)

None

## Suppression de colonnes inutiles

In [387]:
sdf_images = sdf_images.drop(*['path', 'modificationTime', 'length'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [388]:
print(sdf_images.printSchema())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- content: binary (nullable = true)
 |-- image_id: long (nullable = false)
 |-- class: string (nullable = true)
 |-- class_id: double (nullable = false)

None

In [389]:
start_time = get_start_time('get_spark_df_sample')
print(sdf_images.drop('content').show(5, False))
get_duration('get_spark_df_sample', start_time)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----+--------+
|image_id|class|class_id|
+--------+-----+--------+
|0       |Cocos|7.0     |
|1       |Cocos|7.0     |
|2       |Cocos|7.0     |
|3       |Cocos|7.0     |
|4       |Cocos|7.0     |
+--------+-----+--------+
only showing top 5 rows

None
Execution time =           00m 01s

## Aggrégats
### Distribution de classes

In [390]:
start_time = get_start_time('get_class_distribution')
df_count_per_class = sdf_images.groupBy('class_id', 'class').count().toPandas()
df_count_per_class['class_id'] = df_count_per_class['class_id'].astype(int)
get_duration('get_class_distribution', start_time)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution time =           00m 12s

In [391]:
print( df_count_per_class.sort_values('class_id').to_string(index=False) )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 class_id              class  count
        0             Plum 3    900
        1     Cherry Rainier    738
        2       Cantaloupe 1    492
        3     Cherry Wax Red    492
        4              Lemon    492
        5              Peach    492
        6             Banana    490
        7              Cocos    490
        8        Grape White    490
        9    Grapefruit Pink    490
       10        Huckleberry    490
       11           Maracuja    490
       12         Pear Abate    490
       13       Pear Monster    490
       14      Pear Williams    490
       15          Corn Husk    462
       16               Corn    450
       17         Potato Red    450
       18 Apple Crimson Snow    444
       19             Plum 2    420
       20      Cucumber Ripe    392
       21         Peach Flat    178

### Nombre de classes

In [392]:
nb_classes = len(df_count_per_class)
print('nb_classes =', nb_classes)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

nb_classes = 22

### Nombre d'images

In [393]:
nb_images = df_count_per_class['count'].sum()
print('nb_images =', nb_images)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

nb_images = 10812

# Remplacement d'attributs originaux
Le modèle de classification choisi requiert une résolution de 224x224x3 couleurs de pixel d’image, ce qui représente 150 258 attributs 

## Génération d'attributs plus représentatifs
Le modèle choisi pour générer des attributs plus représentatifs des images est le modèle neuronal pré-entraîné "Mobil Net V2"

In [394]:
import keras as k

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Récupération du modèle tronqué
- récupère le modèle pré-entraîné 
- le tronque en enlevant la toute dernière couche
- diffuse les poids du modèle
    - depuis le Driver dans le noeud Master
    - vers les Executors dans les noeuds Core

#### Récupération du modèle complet

In [395]:
model_cnn = k.applications.mobilenet_v2.MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Troncature du modèle
La dernière couche est enlevée pour l'adapter à notre jeu d'images

In [396]:
model_cnn_truncated = k.Model(inputs=model_cnn.input,
                  outputs=model_cnn.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

En sortie du modèle tronqué on récupère 1280 attributs par image

In [397]:
model_cnn_truncated.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "functional_5"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃    Param # ┃ Connected to      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ input_layer_5       │ (None, 224, 224,  │          0 │ -                 │
│ (InputLayer)        │ 3)                │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │        864 │ input_layer_5[0]… │
│                     │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │        128 │ Conv1[0][0]       │
│ (BatchNormalizatio… │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │          0

#### Diffusion des poids du modèle

In [398]:
sc.broadcast(model_cnn_truncated.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.broadcast.Broadcast object at 0x7ff36d117130>

### Fonction de changement de résolution des images

In [399]:
def preprocess_image(content):
    '''
    Preprocesses raw image bytes for prediction.
    '''
    #img_image = Image.open(io.BytesIO(content)).resize([224, 224])
    np_image_original = np.frombuffer(content, np.uint8)
    img_original = cv2.imdecode(np_image_original, cv2.IMREAD_COLOR)
    img_resized  = cv2.resize(img_original, (224, 224))
    np_image_resized = k.preprocessing.image.img_to_array(img_resized)
    return k.applications.mobilenet_v2.preprocess_input(np_image_resized)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Fonctions de génération d’attributs par lots
- Traitement distribuée par lots de DataFrames d’images
    - divise l’ensemble de DataFrames en lots
    - itère sur les lots
    - récupère les résultats de l’ensemble de lots 

#### Fonction de prédiction d'attributs
- fonction Spark UDF de traitement distribué de Pandas
- traite les DataFrames par lots
- charge le modèle une seule fois
- appelle get_predicted_features() par lots 

In [400]:
def get_predicted_features(model, content_series):
    '''
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    '''
    input = np.stack(content_series.map(preprocess_image))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Fonction d'appel de _get_predicted_features()_ par lots d'images
- traite un seul lot 
- applique preprocess() au lot
- applique predict() au lot
- convertit un lot de tensors générés en lot de tableaux Numpy d’attributs

In [401]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def get_predicted_features_udf(content_series_iter):
    '''
    UDF function Scalar Iterator that wraps get_predicted_features
    Returns a Spark DataFrame column of type array(float).

    :param content_series_iter: iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.

    for content_series in content_series_iter:
        yield get_predicted_features(model_cnn_truncated, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



### Récupération d'attributs
- Traitement distribué de 24 lots d'images
- récupère les résultats de l’ensemble de lots
- le nombre d’attributs d’une image est 1280

In [402]:
start_time = get_start_time('get_cnn_features')
sdf_features_cnn = sdf_images.repartition(24) \
                    .select(col('image_id'),
                            col('class'),
                            col('class_id'),
                            get_predicted_features_udf('content').alias('features_cnn_array')
                           )
get_duration('get_cnn_features', start_time) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution time =           00m 00s

In [403]:
sdf_features_cnn.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- image_id: long (nullable = false)
 |-- class: string (nullable = true)
 |-- class_id: double (nullable = false)
 |-- features_cnn_array: array (nullable = true)
 |    |-- element: float (containsNull = true)

## Vectorisation d'attributs
- une conversion des tableaux Numpy en vecteurs est requise par PCA

In [404]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.functions import array_to_vector

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Le champ _features_ contient le tableau et _features_vector_ le vecteur

In [405]:
sdf_features_vector = sdf_features_cnn.withColumn('features_cnn_vector', array_to_vector('features_cnn_array'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [406]:
sdf_features_vector = sdf_features_vector.drop('features_cnn_array')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [407]:
sdf_features_vector.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- image_id: long (nullable = false)
 |-- class: string (nullable = true)
 |-- class_id: double (nullable = false)
 |-- features_cnn_vector: vector (nullable = true)

## Partition du jeu de données
Le jeux de données est divisé en jeu d’entraînement et de test

In [408]:
sdf_train, sdf_test = sdf_features_vector.randomSplit([0.8, 0.2], seed=0)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On choisit un calcul approximatif du nombre d'images au lieu du réel qui est beaucoup plus long

In [409]:
print('sdf_train approximative size :', int(nb_images*.8))
print('sdf_test  approximative size :', int(nb_images*.2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

sdf_train approximative size : 8649
sdf_test  approximative size : 2162

In [410]:
sdf_train.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- image_id: long (nullable = false)
 |-- class: string (nullable = true)
 |-- class_id: double (nullable = false)
 |-- features_cnn_vector: vector (nullable = true)

## _Pipeline_

On regroupe les fonctionnalités suivantes dans une _pipeline_
- Normalisation des attributs par StandardScaler()
- Réduction du nombre d’attributs
- Classification par régression logistique
- le détail des étapes est décrit en [Annexe](#Annexe)

Ces fonctions sont prises du module pyspark.ml pour un traitement distribué

In [411]:
from pyspark.ml                import Pipeline
from pyspark.ml.feature        import PCA, StandardScaler # (mean=0, std=1)
from pyspark.ml.classification import LogisticRegression

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [412]:
nb_features_pca = 10

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Etapes

In [413]:
scaler = StandardScaler(        inputCol='features_cnn_vector',    outputCol='features_scaled_vector')
pca    = PCA(k=nb_features_pca, inputCol='features_scaled_vector', outputCol='features_pca')
lr     = LogisticRegression( featuresCol='features_scaled_vector',  labelCol='class_id')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [414]:
model_pipeline = Pipeline(stages=[scaler, pca, lr])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Entraînement

In [415]:
start_time = get_start_time('train_model')
model_pipeline = model_pipeline.fit(sdf_train)  # Fit pipeline to data
get_duration('train_model', start_time) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution time =           07m 38s

### Prédictions

In [416]:
sdf_train_pred = model_pipeline.transform(sdf_train)
sdf_train_pred.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- image_id: long (nullable = false)
 |-- class: string (nullable = true)
 |-- class_id: double (nullable = false)
 |-- features_cnn_vector: vector (nullable = true)
 |-- features_scaled_vector: vector (nullable = true)
 |-- features_pca: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

In [417]:
sdf_test_pred  = model_pipeline.transform(sdf_test)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Sauvegarde des attributs issus de PCA

La sauvegarde u format CSV requiert 
- une ligne d'entêtes
- les vecteurs de 10 attributs issus du PCA convertis en 10 colonnes

## Répertoires de travail et de sauvegarde pérenne

In [418]:
path_livy_result  = 'file:///tmp/results/'
path_local_result = '/tmp/results/'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [419]:
print(path_s3_output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://jv-s3/output/

## Contenu du DataFrame

In [420]:
sdf_train_pred.select("features_pca").printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- features_pca: vector (nullable = true)

In [421]:
sdf_test_pred.select("features_pca").printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- features_pca: vector (nullable = true)

## Conversion des vecteurs en colonnes

In [422]:
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Convert the vector to an array (list)
udf_vector_to_array = udf(lambda v: v.toArray().tolist() if isinstance(v, DenseVector) else v, ArrayType(DoubleType()))

# Apply the UDF to convert vector to array
sdf_train_array = sdf_train_pred.withColumn("features_pca", udf_vector_to_array(col("features_pca")))
sdf_test_array  = sdf_test_pred .withColumn("features_pca", udf_vector_to_array(col("features_pca")))
sdf_features_array = sdf_train_array.union(sdf_test_array)

# Flatten the array into multiple columns
sdf_features_cols = sdf_features_array.select(["image_id"] + [col("features_pca")[i].alias(f"pca_feature_{i+1}") for i in range(nb_features_pca)])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Génération du CSV
- un fichier temporaire local avec les attributs PCA est généré par partition
- le fichier CSV résulte de la concatenation des fichiers temporaires
- ce CSV est copie vers S3

Génération de fichiers temporaires localement sur le cluster

In [456]:
start_time = get_start_time('save_pca_features')    
sdf_features_cols.write.mode('overwrite').csv(path_livy_result)
get_duration('save_pca_features', start_time) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution time =           02m 47s

Création du fichier CSV de résultats

In [458]:
li_fields = sdf_features_cols.columns
str_li_fields = ','.join(li_fields)
temp_files_mask = path_local_result + 'part-*.csv'
coalesced_file  = path_local_result + cluster_name + '_results_' + timestamp + '.csv'   

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [459]:
str_command = f'echo {str_li_fields} > {coalesced_file}' 
ret = run_cmd(str_command)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = echo image_id,pca_feature_1,pca_feature_2,pca_feature_3,pca_feature_4,pca_feature_5,pca_feature_6,pca_feature_7,pca_feature_8,pca_feature_9,pca_feature_10 > /tmp/results/emr-1core-2task_results_20240906_1223.csv
OK, success

In [470]:
if ret == 'OK' :
    str_command = f'cat {temp_files_mask} >> {coalesced_file} ; if [ $? -eq 0 ]; then rm -rf {temp_files_mask} ; fi' 
    ret = run_cmd(str_command)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = cat /tmp/results/part-*.csv >> /tmp/results/emr-1core-2task_results_20240906_1223.csv ; if [ $? -eq 0 ]; then rm -rf /tmp/results/part-*.csv ; fi
OK, success

In [473]:
if ret == 'OK' :
    str_command = f'wc -l {coalesced_file}' 
    ret = run_cmd(str_command)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = wc -l /tmp/results/emr-1core-2task_results_20240906_1223.csv
OK, success
 3402 /tmp/results/emr-1core-2task_results_20240906_1223.csv

In [474]:
if ret == 'OK' :
    str_command = f'aws s3 cp {coalesced_file} {path_s3_output}'
    run_cmd(str_command)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = aws s3 cp /tmp/results/emr-1core-2task_results_20240906_1223.csv s3://jv-s3/output/
OK, success
 Completed 256.0 KiB/675.1 KiB (4.5 MiB/s) with 1 file(s) remainingCompleted 512.0 KiB/675.1 KiB (8.7 MiB/s) with 1 file(s) remainingCompleted 675.1 KiB/675.1 KiB (5.2 MiB/s) with 1 file(s) remainingupload: ../../../../../../../tmp/results/emr-1core-2task_results_20240906_1223.csv to s3://jv-s3/output/emr-1core-2task_results_20240906_1223.csv

'OK'

# Evaluation

In [429]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [430]:
start_time = get_start_time('get_metrics')
for str_metric in ['accuracy', 'recallByLabel'] :
    evaluator_metric = MulticlassClassificationEvaluator(
        labelCol='class_id', predictionCol='prediction', metricName=str_metric )
    float_metric = np.round(evaluator_metric.evaluate(sdf_test_pred), 3)
    print(f'{str_metric}: {float_metric}')
get_duration('get_metrics', start_time)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

accuracy: 1.0
recallByLabel: 1.0
Execution time =           03m 35s

# Sauvegarde de rapports Spark UI

Récupération d'URL 

In [431]:
url_spark_ui = sc.uiWebUrl
print('url_spark_ui = ', url_spark_ui)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

url_spark_ui =  http://ip-172-31-93-96.ec2.internal:34171

Téléchargement des pages HTML de Spark UI

In [432]:
li_reports = 'jobs stages environment executors SQL'
report_local_dir = path_output + 'reports/'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [433]:
str_command = f'for report in {li_reports} ; do wget --quiet --page-requisites --convert-links --no-parent --directory-prefix={report_local_dir} \
    --no-host-directories {url_spark_ui}/$report/ ; done'
ret = run_cmd(str_command)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = for report in jobs stages environment executors SQL ; do wget --quiet --page-requisites --convert-links --no-parent --directory-prefix=./output/reports/     --no-host-directories http://ip-172-31-93-96.ec2.internal:34171/$report/ ; done
OK, success

In [434]:
if ret == 'OK' :
    str_command = f'ls {report_local_dir}*/index.html' 
    ret = run_cmd(str_command)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = ls ./output/reports/*/index.html
OK, success
 ./output/reports/SQL/index.html
./output/reports/environment/index.html
./output/reports/executors/index.html
./output/reports/jobs/index.html
./output/reports/stages/index.html

Les rapports Spark UI sont archivés dans un fichier ZIP

In [435]:
file_zipped = path_output + cluster_name + '_reports_' + timestamp + '.tar.gz'
if ret == 'OK' :
    str_command = f'tar -czvf {file_zipped} -C {report_local_dir} . '
    ret = run_cmd(str_command)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = tar -czvf ./output/emr-1core-2task_reports_20240906_1223.tar.gz -C ./output/reports/ . 
OK, success
 ./
./jobs/
./jobs/index.html
./stages/
./stages/index.html
./environment/
./environment/index.html
./executors/
./executors/index.html
./SQL/
./SQL/index.html

In [436]:
if ret == 'OK' :
    run_cmd(f'aws s3 cp {file_zipped} {path_s3_output}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = aws s3 cp ./output/emr-1core-2task_reports_20240906_1223.tar.gz s3://jv-s3/output/
OK, success
 Completed 55.1 KiB/55.1 KiB (413.3 KiB/s) with 1 file(s) remainingupload: output/emr-1core-2task_reports_20240906_1223.tar.gz to s3://jv-s3/output/emr-1core-2task_reports_20240906_1223.tar.gz

'OK'

<div style='background-color: green; text-align: center'>.

# Fin du traitement

## Evaluation de durée de traitement

### Durée globale

In [437]:
print('Duration of execution of all cells =', get_HHMMSS(get_time() - global_start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Duration of execution of all cells =          14m 36s

### Durée des principales opérations

#### Ajout du nombre d'images à df_duration

Normalement on traite un nombre unique d'images. Mais on peut fait varier ce nombre :

    - pour l'extrapolation de la durée en fonction du nombre d'images 
    - on exécutera le traitement depuis la section "Chargement des fichiers image" pour chaque nombre d'images choisi

In [438]:
df_duration['nb_images'] = df_duration['nb_images'].fillna(nb_images).astype('int')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Ajout du nom du cluster à df_duration

Le nom du cluster contient l'information du nombre et du type des noeuds 

In [439]:
df_duration['cluster'] = cluster_name

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Durée totale

Somme des durées d'exécution des principales opérations seulement (le cas échéant, regroupées par nombre d'images)

In [440]:
get_total_duration()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   nb_images  nb_secs          duration
0      10812      861           14m 21s

#### Durée des principales opérations
Le cas échéant, triées par nombre d'images

In [441]:
df_duration[['job_group_id', 'nb_images', 'duration']]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

             job_group_id  nb_images          duration
0            add_class_id      10812           00m 22s
1     get_spark_df_sample      10812           00m 01s
2  get_class_distribution      10812           00m 12s
3        get_cnn_features      10812           00m 00s
4             train_model      10812           07m 38s
5       save_pca_features      10812           02m 33s
6             get_metrics      10812           03m 35s

#### Opérations les plus coûteuses
Durée des principales opérations triées par coût décroissant (durée moyenne si nombre d'images variable)

In [442]:
df_duration['nb_secs'] = df_duration['nb_secs'].astype('int')
df_duration.groupby('job_group_id').agg({'nb_secs': 'mean'}).reset_index().sort_values('nb_secs', ascending=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

             job_group_id  nb_secs
6             train_model      458
3             get_metrics      215
5       save_pca_features      153
0            add_class_id       22
1  get_class_distribution       12
4     get_spark_df_sample        1
2        get_cnn_features        0

#### Durée de l'opération principale
L'opération principale est la celle qui contient l'extraction d'atributs CNN et la réduction PCA de dimensions. Il s'agit du job group "train_model" 

In [443]:
df_duration[df_duration['job_group_id'] == 'train_model'].sort_values('nb_images', ascending=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

           cluster job_group_id  nb_images  nb_secs          duration
4  emr-1core-2task  train_model      10812      458           07m 38s

#### Sauvegarde dans CSV

In [444]:
path_duration = path_s3_output + cluster_name + '_duration_' + timestamp + '.csv'
df_duration.to_csv(path_duration, index=False) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Extrapolation pour montée en échelle
Estimation de la durée d'exécution de l'opération principale pour un nombre d'images plus important

In [445]:
def extrapolate_duration(df_in, label_x, label_y, li_x) :
    np_coeff = np.polyfit(df_in[label_x], df_in[label_y], 1) # 1:linear, 2:quadratic
    np_equation = np.poly1d(np_coeff) 
    li_x = list(df_in[label_x]) + li_x
    li_y = np_equation(li_x)  # predict list of y 
    return pd.DataFrame({
        label_x : li_x,
        'estimated_secs' : [int(y)  for y in li_y],
        'estimated_' + label_y : [get_HHMMSS(int(y)) for y in li_y]
    }).sort_values(label_x)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [446]:
li_nb_images = [20000, 40000, 60000, 80000, 100000]
df_duration_train = df_duration[df_duration['job_group_id'] == 'train_model']
df_duration_extrapolated = extrapolate_duration(df_duration_train, 'nb_images', 'nb_secs', li_nb_images)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### Ajout du nom du cluster à df_duration

Le nom du cluster contient l'information du nombre et du type des noeuds 

In [447]:
df_duration_extrapolated['cluster'] = cluster_name

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Sauvegarde dans CSV

In [448]:
path_duration_extrapolated = path_s3_output + cluster_name + '_duration_extrapolated_' + timestamp + '.csv'
df_duration_extrapolated.to_csv(path_duration_extrapolated, index=False) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Envoi de notification de fin
- Une notification par e-mail signale la fin du traitement par le notebook
- Cette notification se déclénche suite à la suppression d'un fichier sur S3

In [449]:
file_flag = 'COMPLETED.flag'
run_cmd(f'touch {file_flag}')                # local file created
run_cmd(f'aws s3 cp {file_flag} {path_s3}')  # file copied to S3
run_cmd(f'aws s3 rm {path_s3}{file_flag}')  # file removed from S3

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Command = touch COMPLETED.flag
OK, success
 
Command = aws s3 cp COMPLETED.flag s3://jv-s3/
OK, success
 upload: ./COMPLETED.flag to s3://jv-s3/COMPLETED.flag

Command = aws s3 rm s3://jv-s3/COMPLETED.flag
OK, success
 delete: s3://jv-s3/COMPLETED.flag

'OK'

<div style='background-color: orange; text-align: center'>.

# Annexe : Détail des étapes du _pipeline_
Voici les étapes avec vérification qui ont servi à préparer le _pipeline_

## Normalisation des attributs
- Par StandardScaler() : moyenne nulle et déviation standard=1
- entraînement sur jeu d’entraînement
- transformation sur les 2 jeux 

## Réduction du nombre d’attributs
    - par PCA de pyspark.ml pour un traitement distribué
    - les 10 premières valeurs propres deviennent les  nouveaux attributs
    - application du modèle PCA aux vecteurs
    - le nouveau JDD est plus réduit et plus représentatif que l’original
        - on passe de 1280 attributs à 10

#### Sélection de premières valeurs propres
Les 10 premières valeurs propres deviennent les nouveaux attributs

### Application de PCA aux vecteurs
Traitement des attributs générés par le modèle pré-entraîné

### Valeurs propres

### Valeurs propres cumulées

## Classification par régression logistique

## Entraînement et test

### Entraînement sur jeu d'entraînement

### Prédiction sur jeu de test

<div style='background-color: red; text-align: center'>.

# Brouillons

## Chargement des données enregistrées et validation du résultat