# Étape 1 : Préparer l’environnement

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing.image import img_to_array, load_img
import numpy as np
import os

Créez une session Spark pour démarrer PySpark :

In [8]:
# Créez une session Spark
spark = SparkSession.builder.appName("Manipulez des images avec PySpark").getOrCreate()

# Étape 2 : Charger des images de chats et de chiens

Le dataset Dogs vs Cats est disponible sur Kaggle : [Dogs vs Cats Dataset](https://www.kaggle.com/c/dogs-vs-cats/data).

Téléchargez et extrayez les images dans un dossier (par exemple, cats_dogs/).

**N'hésitez pas à réduire la taille du jeu de données** dans un premier temps. Quand on cuisine un code, on a pas besoin de tout le dataset ! Ce n'est qu'une fois que le code est bon qu'on test avec le dataset dans son ensemble. Dans mon cas, je n'ai conservé que 21 images par classe dans mon fichier cats_dogs_mini.zip.

Puisque j'ai uploadé un fichier zippé, je dois d'abord le dézipper.

In [9]:
%%capture
!unzip /content/chest_x_ray_mini.zip

In [10]:
# Définissez le chemin vers votre dataset (à adapter selon votre structure de fichiers)
DATASET_PATH = "/content/chest_x_ray_mini"

Cette fonction charge une image, la redimensionne (pour correspondre à l’entrée du CNN), et la prétraite.

In [11]:
# Chargez les chemins des fichiers et les classes (cat/dog)
def extract_label(filepath):
    return "pneumonia" if "bacteria" in filepath.split("/")[-1] else "normal"

def get_image_paths(directory):
    image_paths = []
    for root, dirs, files in os.walk(directory):
      for filename in files:
        if filename.endswith(".jpeg"):
            image_paths.append(os.path.join(root, filename))
    return image_paths

## Calcul distribué :

Cette partie du code est prit en charge automtiquement par PySpark pour être distribuée si possible. En effet, le DataFrame Spark est réparti entre les nœuds du cluster.

Chaque nœud traite un sous-ensemble des données (par exemple, un lot d'images).

Chargez les images et leurs étiquettes ("chat" ou "chien") dans un DataFrame :

In [12]:
# Liste des chemins
image_paths = get_image_paths(DATASET_PATH)

# Créer un DataFrame Spark à partir des chemins
image_df = spark.createDataFrame([(path, extract_label(path)) for path in image_paths], ["path", "label"])

In [13]:
# Affichez quelques lignes du DataFrame
image_df.show(5, truncate=False)

+------------------------------------------------------------+---------+
|path                                                        |label    |
+------------------------------------------------------------+---------+
|/content/chest_x_ray_mini/pneumonia/person9_bacteria_38.jpeg|pneumonia|
|/content/chest_x_ray_mini/pneumonia/person5_bacteria_15.jpeg|pneumonia|
|/content/chest_x_ray_mini/pneumonia/person7_bacteria_24.jpeg|pneumonia|
|/content/chest_x_ray_mini/pneumonia/person7_bacteria_29.jpeg|pneumonia|
|/content/chest_x_ray_mini/pneumonia/person6_bacteria_22.jpeg|pneumonia|
+------------------------------------------------------------+---------+
only showing top 5 rows



## Calcul distribué :
Les opérations comme `groupBy` sont distribuées, chaque partition effectuant un traitement local avant agrégation globale.

In [14]:
# Comptez le nombre d'images par classe
image_df.groupBy("label").count().show()

+---------+-----+
|    label|count|
+---------+-----+
|pneumonia|   20|
|   normal|   20|
+---------+-----+



# Étape 3 : Catégorisation automatique

In [15]:
# Charger le modèle pré-entrainé
model = MobileNetV2(weights="imagenet", include_top=False, pooling="avg")

  model = MobileNetV2(weights="imagenet", include_top=False, pooling="avg")


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
[1m9406464/9406464[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [16]:
# Fonction pour extraire les caractéristiques d'une image
def extract_features(path):
    try:
        img = load_img(path, target_size=(224, 224))
        img_array = img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array = tf.keras.applications.mobilenet_v2.preprocess_input(img_array)
        features = model.predict(img_array)
        return features.flatten().tolist()
    except Exception as e:
        return None

## Calcul distribué :
L'UDF `extract_features` est appliquée en parallèle sur chaque partition des données.

In [17]:
# Déclarez la fonction utilisateur (UDF)
from pyspark.sql.types import ArrayType, FloatType
feature_udf = udf(extract_features, ArrayType(FloatType()))

In [18]:
# Ajoutez une colonne avec les caractéristiques extraites
image_df = image_df.withColumn("features", feature_udf(col("path")))

In [19]:
# Affichez le DataFrame avec les nouvelles colonnes
image_df.show(5)

+--------------------+---------+--------------------+
|                path|    label|            features|
+--------------------+---------+--------------------+
|/content/chest_x_...|pneumonia|[0.49183893, 0.0,...|
|/content/chest_x_...|pneumonia|[0.058421895, 0.0...|
|/content/chest_x_...|pneumonia|[0.7625137, 0.0, ...|
|/content/chest_x_...|pneumonia|[0.38426328, 0.0,...|
|/content/chest_x_...|pneumonia|[0.63174874, 0.05...|
+--------------------+---------+--------------------+
only showing top 5 rows



In [20]:
# Réalisez une classification simple avec les caractéristiques extraites.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

In [21]:
# Convertir les caractéristiques en format de vecteurs compatibles avec Spark ML
def convert_array_to_vector(features):
    return Vectors.dense(features)

In [22]:
# Convertir les caractéristiques et les labels en un format utilisable par Spark ML
array_to_vector_udf = udf(lambda features: Vectors.dense(features) if features else None, VectorUDT())
vectorized_df = image_df.withColumn("features_vec", array_to_vector_udf(col("features")))

In [23]:
# Indexer les labels pour la classification
from pyspark.ml.feature import StringIndexer
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
data = label_indexer.fit(vectorized_df).transform(vectorized_df)

In [24]:
# Diviser les données en ensembles d'entraînement et de test
train, test = data.randomSplit([0.8, 0.2], seed=42)

## Calcul distribué :
Spark ML divise les données d'entraînement entre les nœuds pour entraîner le modèle de manière parallèle.

In [25]:
%%time
# Entraîner un modèle de régression logistique
lr = LogisticRegression(featuresCol="features_vec", labelCol="label_index")
model = lr.fit(train)

CPU times: user 294 ms, sys: 35.1 ms, total: 329 ms
Wall time: 52.4 s


In [26]:
%%time
# Évaluer le modèle sur les données de test
predictions = model.transform(test)
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Précision du modèle : {accuracy}")

Précision du modèle : 0.8
CPU times: user 106 ms, sys: 11.9 ms, total: 117 ms
Wall time: 17.3 s
