In [1]:
import os
from pyspark.sql import SparkSession

In [2]:
PATH = os.getcwd()
PATH_Data = PATH+'/data/fruits-360_100x100/fruits-360/Training'
PATH_Result = PATH+'/data/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /workspace
PATH_Data:   /workspace/data/fruits-360_100x100/fruits-360/Training
PATH_Result: /workspace/data/Results


In [3]:
# Cr√©er le dossier ./spark-events s'il n'existe pas
local_eventlog_path = os.path.join(os.getcwd(), "spark-events")
os.makedirs(local_eventlog_path, exist_ok=True)

# Construire l'URI file:// pour Spark
eventlog_uri = "file://" + local_eventlog_path

In [4]:
spark = (SparkSession
             .builder
             .appName('P11-local-am√©lior√©') # mise √† jour du num√©ro du projet
             .master('local[*]')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .config("spark.eventLog.enabled", "true") # activation des logs SparkServer UI
             .config("spark.eventLog.dir", eventlog_uri)  # chemin local pour logs
             .config("spark.driver.memory", "4g") \
             .config("spark.executor.memory", "6g") \
             .config("spark.executor.cores", "4") \
             .config("spark.num.executors", "2") \
             .config("spark.default.parallelism", "8") \
             .config("spark.rpc.askTimeout", "300s") \
             .config("spark.network.timeout", "300s") \
             .config("spark.executor.heartbeatInterval", "180s") \
             .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/13 08:22:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

In [6]:
spark.sparkContext.getConf().getAll()

[('spark.eventLog.dir', 'file:///workspace/spark-events'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.network.timeout', '300s'),
 ('spark.rpc.askTimeout', '300s'),
 ('spark.driver.memory', '4g'),
 ('spark.default.parallelism', '8'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'P11-local-simplifi√©'),
 ('spark.driver.host', '3c2a5b8db2f2'),
 ('spark.sql.parquet.writeLegacyFormat', 'true'),
 ('spark.num.executors', '2'),
 ('spark.executor.cores', '4'),
 ('spark.executor.memory', '6g'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java

In [7]:
from pyspark.sql.types import StructType, StructField, StringType

# Pr√©paration des donn√©es : liste des tuples (path, label)
image_tuples = []

for label_name in os.listdir(PATH_Data):
    label_path = os.path.join(PATH_Data, label_name)
    if os.path.isdir(label_path):
        for fname in os.listdir(label_path):
            if fname.lower().endswith(('.jpg', '.jpeg', '.png')):
                full_path = os.path.join(label_path, fname)
                image_tuples.append((full_path, label_name))

# Cr√©ation du DataFrame Spark
schema = StructType([
    StructField("path", StringType(), True),
    StructField("label", StringType(), True)
])

df_spark = spark.createDataFrame(image_tuples, schema=schema)

# Affichage de quelques lignes
# df_spark.show(5, truncate=False)

In [8]:
from pyspark.sql import functions as F
# S√©lection de lignes au hasard
df_sample = df_spark.orderBy(F.rand()).limit(1000)

# Affichage des 5 premi√®res pour v√©rification
# df_sample.show(5, truncate=False)
# ‚ö†Ô∏è Mise en cache
# d√©finition d'un partitionnement .repartition(10)
df_sample = df_sample.repartition(10).cache()

In [9]:
df_sample.count()
print("Nombre de partitions apr√®s .repartition :", df_sample.rdd.getNumPartitions())

25/07/13 08:22:04 WARN TaskSetManager: Stage 0 contains a task of very large size (1086 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 8) / 8]

Nombre de partitions apr√®s .repartition : 10


                                                                                

In [10]:
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing.image import load_img, img_to_array

# ‚ö†Ô∏è Pas de Sequential ‚Üí √ßa cause souvent des erreurs silencieuses
def build_mobilenetv2_model_imagenet():
    base_model = tf.keras.applications.MobileNetV2(
        include_top=False, weights='imagenet', input_shape=(224, 224, 3)
    )
    x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
    model = tf.keras.models.Model(inputs=base_model.input, outputs=x)
    return model

In [11]:
model = build_mobilenetv2_model_imagenet()
weights = model.get_weights()
# üì° Diffusion les poids avec Spark
bc_model_weights = spark.sparkContext.broadcast(weights)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224_no_top.h5
[1m9406464/9406464[0m [32m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[37m[0m [1m0s[0m 0us/step


In [12]:
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np

# Reconstruction locale √† partir des poids broadcast√©s
def build_model_from_weights(weights):
    model = build_mobilenetv2_model_imagenet()
    model.set_weights(weights)
    return model

# UDF avec initialisation une seule fois par worker
model_instance = None  # stock√© localement sur le worker

def extract_features(path):
    global model_instance
    try:
        # Mod√®le reconstruit une seule fois par worker
        if model_instance is None:
            model_instance = build_model_from_weights(bc_model_weights.value)

        # Pr√©paration de l'image
        img = load_img(path, target_size=(224, 224))
        img_array = img_to_array(img)
        img_array = tf.keras.applications.mobilenet_v2.preprocess_input(img_array)
        img_array = np.expand_dims(img_array, axis=0)

        # Pr√©diction
        features = model_instance.predict(img_array, verbose=0)
        return Vectors.dense(features.flatten())

    except Exception as e:
        return Vectors.dense([0.0] * 1280)  # Fallback

In [13]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT
# üîÅ Int√©gration dans Spark avec UDF et ‚ö†Ô∏è Mise en cache
extract_udf = udf(extract_features, VectorUDT())
df_features = df_sample.withColumn("features", extract_udf("path")).cache()

# df_features.select("path", "label", "features").show(5, truncate=False)

In [14]:
from pyspark.ml.feature import StandardScaler

# Normalisation des features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(df_features)
df_scaled = scaler_model.transform(df_features)
# ‚ö†Ô∏è Mise en cache
df_scaled.cache()

                                                                                

DataFrame[path: string, label: string, features: vector, scaled_features: vector]

In [15]:
from pyspark.ml.feature import PCA
# PCA.1 Calcul de la PCA avec toutes les dimensions 
num_features = len(df_scaled.select("scaled_features").first()["scaled_features"])
pca = PCA(k=num_features, inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(df_scaled)

25/07/13 08:22:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/07/13 08:22:33 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/07/13 08:22:34 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [16]:
# PCA.2 R√©cup√©rer la variance expliqu√©e
explained_variance = pca_model.explainedVariance.toArray()

# 5. Trouver le nombre minimal de composantes capturant ‚â• 90 % de la variance
import numpy as np
cumulative_variance = np.cumsum(explained_variance)
k_optimal = int(np.argmax(cumulative_variance >= 0.9) + 1)

print(f"Nombre de composantes principales n√©cessaires pour capturer ‚â• 90% de la variance : {k_optimal}")

Nombre de composantes principales n√©cessaires pour capturer ‚â• 90% de la variance : 246


In [17]:
# PCA.3 Recalcul de la PCA avec le bon nombre de dimensions
pca_final = PCA(k=k_optimal, inputCol="scaled_features", outputCol="pca_features")
pca_model_final = pca_final.fit(df_scaled)
df_pca = pca_model_final.transform(df_scaled)

# ‚ö†Ô∏è Mise en cache
df_pca.cache()

DataFrame[path: string, label: string, features: vector, scaled_features: vector, pca_features: vector]

In [18]:
# Visualisation des premi√®res dimensions PCA dans panda
# df_pandas = df_pca.select("path", "label", "pca_features").limit(5).toPandas()
# df_pandas['pca_preview'] = df_pandas['pca_features'].apply(lambda x: x[:5])
# print(df_pandas[['label', 'pca_preview']])

In [19]:
# Sauvegarde du r√©sultat final (3 colonnes) en Parquet
df_pca.select("path", "label", "pca_features") \
      .write.mode("overwrite") \
      .parquet(PATH_Result)

25/07/13 08:22:44 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/07/13 08:22:45 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
                                                                                

In [20]:
# Lire pour tester que l'√©criture a bien fonctionn√©
df_parquet = spark.read.parquet(PATH_Result)
df_parquet.show(5, truncate=False)

+-----------------------------------------------------------------------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
spark.stop()