# Sur le cloud (avec Google Dataproc + Storage).

### 1. Importation des bibliothèques.

In [20]:
import pandas as pd
from PIL import Image
import numpy as np
import io

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, adjusted_rand_score
from sklearn.preprocessing import LabelEncoder

from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.utils import to_categorical, set_random_seed
from tensorflow.keras.optimizers import Nadam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

from pyspark.sql.functions import col, udf, element_at, split
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql import SparkSession

### 2. Démarrage d'une session Spark.

**YARN** est le gestionnaire de ressources par défaut du cluster Dataproc : il n'est donc pas nécessaire de le préciser avec `.master('yarn')`.

In [2]:
# Start a Spark session (or reuse the running one). No .master() is set, so
# the cluster's default resource manager (YARN) is used.
spark = SparkSession.builder.appName('P8').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/27 19:27:29 INFO SparkEnv: Registering MapOutputTracker
24/09/27 19:27:29 INFO SparkEnv: Registering BlockManagerMaster
24/09/27 19:27:29 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/09/27 19:27:29 INFO SparkEnv: Registering OutputCommitCoordinator


In [3]:
# Shortcut to the SparkContext behind the session (used later for sc.broadcast).
sc = spark.sparkContext

In [4]:
# Display the active SparkSession as this cell's output.
spark

### 3. Entraînement du modèle avec un échantillon de trois classes.

In [5]:
# Reduced three-class sample (train / test splits) stored on Google Cloud Storage.
train_dir, test_dir = (
    'gs://chakir-p8/data/train_local',
    'gs://chakir-p8/data/test_local',
)

In [6]:
def create_model(base_model, nbr_de_classes, hidden_units=1024):
    """Build a classifier by stacking a new head on a frozen pretrained backbone.

    Args:
        base_model: Keras model used as a feature extractor (here MobileNetV2
            built with ``include_top=False``). It is frozen in place, so every
            model built from the same instance shares the frozen backbone.
        nbr_de_classes: number of output classes (units of the softmax layer);
            must be at least 2.
        hidden_units: width of the intermediate ReLU ``Dense`` layer
            (default 1024, the previously hard-coded value).

    Returns:
        An uncompiled ``Sequential`` model:
        base_model -> GlobalAveragePooling2D -> Dense(hidden_units, relu)
        -> Dense(nbr_de_classes, softmax).

    Raises:
        ValueError: if ``nbr_de_classes`` < 2 or ``hidden_units`` < 1.
    """
    if nbr_de_classes < 2:
        raise ValueError(f'nbr_de_classes must be >= 2, got {nbr_de_classes}')
    if hidden_units < 1:
        raise ValueError(f'hidden_units must be >= 1, got {hidden_units}')

    # Freeze the whole backbone in one statement (Keras transfer-learning idiom):
    # setting `trainable` on a model propagates to all of its layers, exactly
    # like the former per-layer loop.
    base_model.trainable = False

    # Classification head trained on top of the pooled backbone features
    return Sequential([
        base_model,
        GlobalAveragePooling2D(),
        Dense(hidden_units, activation='relu'),
        Dense(nbr_de_classes, activation='softmax'),
    ])

In [7]:
def metrics_display(true_labels, predicted_labels):
    """Print, and return, classification scores of predictions against ground truth.

    Precision, recall and F1 are macro-averaged (unweighted mean over classes);
    the adjusted Rand index (ARI) is reported alongside them.

    Args:
        true_labels: ground-truth labels (array-like).
        predicted_labels: predicted labels, same length as ``true_labels``.

    Returns:
        dict with keys 'accuracy', 'precision', 'recall', 'f1' and 'ARI'.
        Existing callers that ignore the return value are unaffected.
    """
    accuracy = accuracy_score(true_labels, predicted_labels)
    # zero_division=0 gives the same numbers as the default ('warn' -> 0) for
    # classes that are never predicted / never present, without emitting an
    # UndefinedMetricWarning for each of them (likely with many classes).
    precision = precision_score(true_labels, predicted_labels, average='macro', zero_division=0)
    recall = recall_score(true_labels, predicted_labels, average='macro', zero_division=0)
    f1 = f1_score(true_labels, predicted_labels, average='macro', zero_division=0)
    ARI = adjusted_rand_score(true_labels, predicted_labels)
    print(f'accuracy = {accuracy}   precision = {precision}   recall={recall}   f1 = {f1}   ARI = {ARI}')
    return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1, 'ARI': ARI}

In [8]:
# Load image files and tag each one with its class label
def get_image_paths_and_labels(spark_session, dir_path):
    """Read every ``*.jpg`` under ``dir_path`` (recursively) into a Spark DataFrame.

    The ``binaryFile`` source yields one row per file, including its ``path``
    and raw ``content`` bytes. A ``label`` column is appended holding the
    second-to-last ``/``-separated component of ``path``, i.e. the name of
    the file's parent directory.
    """
    reader = (spark_session.read
              .format('binaryFile')
              .option('pathGlobFilter', '*.jpg')
              .option('recursiveFileLookup', 'true'))
    parent_dir = element_at(split(col('path'), '/'), -2)
    return reader.load(dir_path).withColumn('label', parent_dir)

In [9]:
# Preprocess a single encoded image
def preprocess_image(binary_image, target_size=(224, 224)):
    """Decode raw image bytes into a flat, MobileNetV2-preprocessed float vector.

    Args:
        binary_image: encoded image bytes (the ``content`` column of a Spark
            ``binaryFile`` DataFrame).
        target_size: (width, height) given to ``PIL.Image.resize``; the
            default (224, 224) matches the MobileNetV2 input used here.

    Returns:
        list[float] of length width * height * 3: the output of
        ``mobilenet_v2.preprocess_input`` flattened in C order, so callers can
        rebuild the image with ``reshape(height, width, 3)``.
    """
    # The context manager closes the decoder/buffer once pixels are loaded.
    with Image.open(io.BytesIO(binary_image)) as image:
        # Force 3 channels: grayscale ('L'), palette ('P'), RGBA or CMYK files
        # would otherwise yield 1 or 4 channels and break reshape(..., 3)
        # downstream. For images already in RGB this is a no-op copy.
        rgb_image = image.convert('RGB').resize(target_size)
    img_array = img_to_array(rgb_image)
    img_array = preprocess_input(img_array)
    return img_array.flatten().tolist()

# Spark UDF applying preprocess_image to each row's bytes; returns array<float>.
preprocess_image_udf = udf(preprocess_image, ArrayType(FloatType()))

In [10]:
def _collect_features_and_labels(images_df, image_shape=(224, 224, 3)):
    """Collect preprocessed images and labels from Spark into driver-side NumPy arrays.

    Args:
        images_df: Spark DataFrame with a 'preprocessed_image' column (flat
            array<float> of length prod(image_shape)) and a 'label' column.
        image_shape: shape each flat vector is reshaped to.

    Returns:
        (features, labels): float32 array of shape (n, *image_shape) and a
        1-D array of labels, row-aligned with ``features``.
    """
    # NOTE(review): collect() brings every preprocessed image to the driver; on the
    # full dataset this exceeded spark.driver.maxResultSize (8 GiB). Scaling further
    # needs the heavy work (e.g. backbone feature extraction) done on the executors.
    rows = images_df.select('preprocessed_image', 'label').collect()

    # Fill a preallocated float32 array: no temporary list of per-image arrays and
    # half the memory of NumPy's default float64. The values come from a FloatType
    # column, so narrowing to float32 loses nothing.
    features = np.empty((len(rows), *image_shape), dtype=np.float32)
    for i, row in enumerate(rows):
        features[i] = np.asarray(row['preprocessed_image'], dtype=np.float32).reshape(image_shape)
    labels = np.array([row['label'] for row in rows])
    return features, labels


def train_and_test_model_spark(train_dir,
                               test_dir,
                               model,
                               spark_session,
                               seed_value=42,
                               target_size=(224, 224),
                               optimizer=Nadam,
                               learning_rate=0.001,
                               epochs=10):
    """Load images with Spark, train ``model`` on the driver, then print test metrics.

    Args:
        train_dir: path (e.g. a gs:// URI) searched recursively for *.jpg files;
            each file's parent directory name is its class label.
        test_dir: same layout, for the test images.
        model: uncompiled Keras model whose softmax size equals the number of
            training classes (see create_model).
        spark_session: active SparkSession used to read the images.
        seed_value: seed passed to ``set_random_seed`` for reproducibility.
        target_size: not used by this function; images are resized by
            ``preprocess_image_udf`` and reshaped here to 224x224x3. Kept for
            interface compatibility.
        optimizer: Keras optimizer class, instantiated with ``learning_rate``.
        learning_rate: initial learning rate (halved by ReduceLROnPlateau).
        epochs: maximum number of training epochs.

    Raises:
        ValueError: if no image is found in one of the directories, or if the
            test set contains a label absent from the training set
            (raised by ``LabelEncoder.transform``).
    """
    # Seed the random generators for reproducible results
    set_random_seed(seed_value)

    # List the images and derive their labels from the parent directory
    train_df = get_image_paths_and_labels(spark_session, train_dir)
    test_df = get_image_paths_and_labels(spark_session, test_dir)

    # Preprocess the images on the executors
    train_df = train_df.withColumn('preprocessed_image', preprocess_image_udf(col('content')))
    test_df = test_df.withColumn('preprocessed_image', preprocess_image_udf(col('content')))

    # Bring the data to the driver as NumPy arrays for TensorFlow
    X_train, train_labels = _collect_features_and_labels(train_df)
    X_test, test_labels = _collect_features_and_labels(test_df)
    if len(X_train) == 0:
        raise ValueError(f'No .jpg image found under {train_dir}')
    if len(X_test) == 0:
        raise ValueError(f'No .jpg image found under {test_dir}')

    # Integer-encode, then one-hot encode the labels
    encoder = LabelEncoder()
    y_train = encoder.fit_transform(train_labels)
    y_test = encoder.transform(test_labels)
    # Pass num_classes explicitly: otherwise to_categorical infers the width from
    # max(label) + 1, so a test set missing the last class(es) would get fewer
    # columns than the model's softmax and break validation.
    num_classes = len(encoder.classes_)
    y_train_cnn = to_categorical(y_train, num_classes=num_classes)
    y_test_cnn = to_categorical(y_test, num_classes=num_classes)

    # Check the X_train and X_test shapes
    print("X_train shape:", X_train.shape)
    print("X_test shape:", X_test.shape)

    # Compile the model
    model.compile(optimizer=optimizer(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Training callbacks.
    # NOTE(review): the test set doubles as validation data, so EarlyStopping
    # (restore_best_weights) and ReduceLROnPlateau are driven by it and the
    # metrics printed below are optimistically biased; a split held out from
    # the training data would give an unbiased estimate. Also, with the
    # default epochs=10, patience=10 can never stop training early.
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=0.00001)

    # Train the model
    model.fit(X_train, y_train_cnn,
              validation_data=(X_test, y_test_cnn),
              callbacks=[early_stopping, reduce_lr],
              epochs=epochs)

    # Predictions: index of the most probable class
    predictions = model.predict(X_test)
    y_pred = np.argmax(predictions, axis=1)

    # Model evaluation
    metrics_display(y_test, y_pred)

In [11]:
# ImageNet-pretrained MobileNetV2 backbone without its classification top,
# expecting 224x224x3 inputs.
base_model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet',
)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [14]:
# Classifier for the three-class sample.
model = create_model(base_model, nbr_de_classes=3)

In [15]:
# Broadcast the current model weights to the executors as a read-only variable.
# NOTE(review): this runs before training, so it holds the untrained head
# weights; `brodcast_weights` is not referenced by any later visible cell.
brodcast_weights = sc.broadcast(value=model.get_weights())

In [16]:
# Train on the three-class sample and print shapes, training log and test metrics.
train_and_test_model_spark(train_dir, test_dir, model, spark_session=spark)

24/09/27 19:29:43 ERROR ClusterManager: Could not initialize cluster nodes=[cluster-p8-w-1.europe-west1-b.c.eloquent-branch-436714-u0.internal, cluster-p8-w-0.europe-west1-b.c.eloquent-branch-436714-u0.internal] nodeHostName=cluster-p8-m.europe-west1-b.c.eloquent-branch-436714-u0.internal nodeHostAddress=10.132.0.21 currentNodeIndex=null
                                                                                

X_train shape: (1381, 224, 224, 3)
X_test shape: (461, 224, 224, 3)
Epoch 1/10
[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 163ms/step - accuracy: 0.8937 - loss: 0.3220 - val_accuracy: 1.0000 - val_loss: 7.3474e-05 - learning_rate: 0.0010
Epoch 2/10
[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 130ms/step - accuracy: 1.0000 - loss: 1.2164e-05 - val_accuracy: 1.0000 - val_loss: 2.8806e-05 - learning_rate: 0.0010
Epoch 3/10
[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 130ms/step - accuracy: 1.0000 - loss: 5.3822e-06 - val_accuracy: 1.0000 - val_loss: 2.0219e-05 - learning_rate: 0.0010
Epoch 4/10
[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 136ms/step - accuracy: 1.0000 - loss: 3.4096e-06 - val_accuracy: 1.0000 - val_loss: 1.4014e-05 - learning_rate: 0.0010
Epoch 5/10
[1m44/44[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 135ms/step - accuracy: 1.0000 - loss: 2.4729e-06 - val_accuracy: 1.0000 - val_loss: 

On peut observer que l'entraînement a été dix fois plus rapide qu'en local !

### 4. Entraînement du modèle avec le dataset complet.

In [17]:
# Full dataset (train / test splits) stored on Google Cloud Storage.
train_dir, test_dir = (
    'gs://chakir-p8/data/train',
    'gs://chakir-p8/data/test',
)

In [19]:
# Classifier for the full dataset: there are 131 fruit classes.
# The same (frozen) base_model instance as the three-class model is reused.
model = create_model(base_model, nbr_de_classes=131)
model.summary()

In [21]:
# Broadcast the current model weights to the executors as a read-only variable.
# NOTE(review): this runs before training, so it holds the untrained head
# weights; `brodcast_weights` is not referenced by any later visible cell.
brodcast_weights = sc.broadcast(value=model.get_weights())

In [23]:
# Train on the full dataset and print shapes, training log and test metrics.
train_and_test_model_spark(train_dir, test_dir, model, spark_session=spark)

24/09/27 19:59:33 WARN AsyncEventQueue: Dropped 22523 events from dataprocEvent since Fri Sep 27 19:48:14 UTC 2024.
24/09/27 20:00:10 WARN YarnAllocator: Container from a bad node: container_1727464744947_0001_01_000025 on host: cluster-p8-w-0.europe-west1-b.c.eloquent-branch-436714-u0.internal. Exit status: 137. Diagnostics: [2024-09-27 20:00:10.661]Container killed on request. Exit code is 137
[2024-09-27 20:00:10.661]Container exited with a non-zero exit code 137. 
[2024-09-27 20:00:10.662]Killed by external signal
.
24/09/27 20:00:10 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 25 for reason Container from a bad node: container_1727464744947_0001_01_000025 on host: cluster-p8-w-0.europe-west1-b.c.eloquent-branch-436714-u0.internal. Exit status: 137. Diagnostics: [2024-09-27 20:00:10.661]Container killed on request. Exit code is 137
[2024-09-27 20:00:10.661]Container exited with a non-zero exit code 137. 
[2024-09-27 20:00:10.662]Killed by ex

Py4JJavaError: An error occurred while calling o201.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 826 tasks (8.0 GiB) is bigger than spark.driver.maxResultSize (8.0 GiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2449)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2470)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2489)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2514)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4148)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:547)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


24/09/27 20:07:19 WARN TaskSetManager: Lost task 838.1 in stage 7.0 (TID 2422) (cluster-p8-w-1.europe-west1-b.c.eloquent-branch-436714-u0.internal executor 49): TaskKilled (Stage cancelled: Job aborted due to stage failure: Total size of serialized results of 826 tasks (8.0 GiB) is bigger than spark.driver.maxResultSize (8.0 GiB))
24/09/27 20:07:19 ERROR AsyncEventQueue: Listener ExecutorAllocationListener threw an exception
java.util.NoSuchElementException: key not found: Stage 7 (Attempt 0)
	at scala.collection.MapLike.default(MapLike.scala:236) ~[scala-library-2.12.18.jar:?]
	at scala.collection.MapLike.default$(MapLike.scala:235) ~[scala-library-2.12.18.jar:?]
	at scala.collection.AbstractMap.default(Map.scala:65) ~[scala-library-2.12.18.jar:?]
	at scala.collection.mutable.HashMap.apply(HashMap.scala:69) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.ExecutorAllocationManager$ExecutorAllocationListener.onTaskEnd(ExecutorAllocationManager.scala:837) ~[spark-core_2.12-3.5.1.jar: