# Script transfer learning et réduction de dimension

In [None]:
# Print a greeting (quick check that the cell runs).
print("Welcome to my EMR Notebook!")

In [None]:
%%info

In [None]:
# Bare expression: displays the `spark` object as the cell output.
spark

In [3]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Configuration des paths

In [4]:
# Bucket root, input images, and the parquet / PCA / CSV output locations.
PATH = 's3://projetfruits'
PATH_Data = f'{PATH}/data/input/Test'
PATH_Data_sample = f'{PATH}/data/input/sample'
PATH_Result = f'{PATH}/data/output'
PATH_Result_pca = f'{PATH}/data/output_pca'
PATH_Result_csv = f'{PATH}/data/output_csv'

# Output locations for the sample run.
PATH_Result_sample = f'{PATH}/data_sample/output'
PATH_Result_pca_sample = f'{PATH}/data_sample/output_pca'
PATH_Result_csv_sample = f'{PATH}/data_sample/output_csv'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Print the main S3 locations, one per line.
paths_summary = (
    'PATH:        ' + PATH
    + '\nPATH_Data:   ' + PATH_Data
    + '\nPATH_Result: ' + PATH_Result
    + '\nPATH_Result_pca: ' + PATH_Result_pca
    + '\nPATH_Result_CSV: ' + PATH_Result_csv
)
print(paths_summary)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://projetfruits
PATH_Data:   s3://projetfruits/data/input/Test
PATH_Result: s3://projetfruits/data/output
PATH_Result_pca: s3://projetfruits/data/output_pca

# Importation des images

Les images sont situées dans un bucket S3, en suivant le chemin : `s3://projetfruits/data/input/Test`

In [6]:
# images_sample = spark.read.format("binaryFile") \
#   .option("pathGlobFilter", "*.jpg") \
#   .option("recursiveFileLookup", "true") \
#   .load(PATH_Data_sample)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# images_sample.show(5)

In [None]:
# Load every *.jpg found recursively under PATH_Data as binary files
# (kept for comparison with the local run).
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu binaire, affiché en valeurs hexadécimales

In [7]:
# Print the first 5 rows of `images`.
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://projetfruits...|2023-06-18 09:35:06|125135|[FF D8 FF E0 00 1...|
|s3://projetfruits...|2023-06-18 09:35:06|124785|[FF D8 FF E0 00 1...|
|s3://projetfruits...|2023-06-18 09:35:04|123514|[FF D8 FF E0 00 1...|
|s3://projetfruits...|2023-06-18 09:35:08|122958|[FF D8 FF E0 00 1...|
|s3://projetfruits...|2023-06-18 09:35:05|122807|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

On conserve le chemin d'accès à l'image et on ajoute une colonne `label` contenant le label de chaque image (le nom du dossier qui la contient) :

`withColumn()`: est une fonction de transformation de DataFrame qui est utilisée pour modifier la valeur, convertir le type de données d'une colonne existante, créer une nouvelle colonne, etc.

In [8]:
# Derive the label from the second-to-last '/'-separated segment of the
# file path, i.e. the name of the directory that contains the image.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print by themselves and return None, so they are
# called directly: wrapping them in print() emitted a stray "None" line.
images.printSchema()
images.select('path', 'label').show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------------+-----------+
|path                                                    |label      |
+--------------------------------------------------------+-----------+
|s3://projetfruits/data/input/Test/apple_hit_1/r0_115.jpg|apple_hit_1|
|s3://projetfruits/data/input/Test/apple_hit_1/r0_119.jpg|apple_hit_1|
|s3://projetfruits/data/input/Test/apple_hit_1/r0_107.jpg|apple_hit_1|
|s3://projetfruits/data/input/Test/apple_hit_1/r0_143.jpg|apple_hit_1|
|s3://projetfruits/data/input/Test/apple_hit_1/r0_111.jpg|apple_hit_1|
+--------------------------------------------------------+-----------+
only showing top 5 rows

None

In [None]:
# images_sample = images_sample.withColumn('label', element_at(split(images_sample['path'], '/'),-2))
# print(images_sample.printSchema())
# print(images_sample.select('path','label').show(5,False))

# MobileNetV2

In [9]:
# Full MobileNetV2 (top included) for 224x224 RGB input, with the 'imagenet' weights.
model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [10]:
# Feature extractor: same input as `model`, output taken from its second-to-last layer.
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Spark Broadcast 

Dans Spark (RDD et DataFrame), les variables de diffusion (*broadcast*) sont des variables partagées en lecture seule, mises en cache et disponibles sur tous les nœuds d'un cluster afin que les tâches puissent y accéder et les utiliser.

Au lieu d'envoyer ces données avec chaque tâche, Spark distribue des variables de diffusion à la machine en utilisant des algorithmes de diffusion efficaces pour réduire les coûts de communication.

Au lieu de distribuer ces informations avec chaque tâche sur le réseau (surcharge et perte de temps), nous pouvons utiliser la variable de diffusion pour mettre en cache ces informations de recherche sur chaque machine et les tâches utilisent ces informations mises en cache lors de l'exécution des transformations.

## Comment fonctionne Spark Broadcast ?

Les variables de diffusion sont utilisées de la même manière pour RDD, DataFrame et Dataset. Lorsque vous exécutez des jobs Spark (RDD ou DataFrame) dans lesquels des variables de diffusion sont définies et utilisées, Spark effectue les opérations suivantes :

- Spark breaks the job into stages separated by distributed shuffles; actions are executed within each stage.
- Stages are in turn broken into tasks.
- Spark broadcasts the common (reusable) data needed by the tasks within each stage.
- The broadcast data is cached in serialized form and deserialized before each task is executed.

In [11]:
# Broadcast the feature-extractor weights through `sc`; model_fn() reads them back via `.value`.
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Print the layer-by-layer summary of the truncated model.
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [13]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor from the broadcast weights.

    The architecture is instantiated with ``weights=None`` so that the
    pretrained ImageNet file is not downloaded again by every caller: all
    weights of the returned model are overwritten from ``brodcast_weights``
    anyway.

    :return: a Keras ``Model`` taking the MobileNetV2 input and returning the
             output of its second-to-last layer, with every layer frozen.
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # Inference only: mark every layer as non-trainable.
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    # Same architecture as the model whose weights were broadcast, so the
    # weight list lines up one-to-one.
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.

    :param content: raw bytes of one encoded image file
    :return: the array produced by ``preprocess_input`` for the image
             resized to 224x224
    """
    # Force 3 channels: a grayscale, palette or RGBA file would otherwise
    # produce an array whose shape does not match the (224, 224, 3) model input.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run the model on a batch of raw images and return one feature vector per image.

    :param model: object exposing ``predict``, called on the stacked batch
    :param content_series: pd.Series of raw image bytes
    :return: pd.Series of flattened feature arrays
    """
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # A prediction may be a multi-dimensional tensor; flatten each one so it
    # can be stored as a flat array in a Spark DataFrame column.
    flat_features = []
    for prediction in predictions:
        flat_features.append(prediction.flatten())
    return pd.Series(flat_features)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Scalar-iterator pandas UDF that turns image bytes into feature arrays.

    The decorator declares the result as a Spark column of type array<float>.

    :param content_series_iter: iterator over batches of data, each batch
                                being a pandas Series of image data.
    """
    # Build the model once, before consuming the iterator, and reuse it for
    # every batch so the loading cost is not paid per batch.
    extractor = model_fn()
    for image_batch in content_series_iter:
        yield featurize_series(extractor, image_batch)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [15]:
# Print the location the features are written to.
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://projetfruits/data/output

In [16]:
# Repartition into 24 partitions, then keep path and label and compute the
# features from the binary content with the pandas UDF.
features_df = (
    images
    .repartition(24)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# features_df_sample = images_sample.repartition(24).select(col("path"),
#                                             col("label"),
#                                             featurize_udf("content").alias("features")
#                                            )

In [None]:
# features_df_sample.write.mode("overwrite").parquet(PATH_Result_sample)

In [17]:
# Write the features as parquet to PATH_Result in "overwrite" mode.
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [74]:
# Read the parquet output back into a pandas DataFrame using the pyarrow engine.
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [75]:
# Display the first rows of the pandas DataFrame.
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                           features
0  s3://projetfruits/data/input/Test/apple_hit_1/...  ...  [0.21089473, 0.08602117, 1.3680781, 0.0, 0.353...
1  s3://projetfruits/data/input/Test/apple_hit_1/...  ...  [0.00890907, 0.20073263, 2.0521815, 0.0, 0.176...
2  s3://projetfruits/data/input/Test/apple_hit_1/...  ...  [0.21464187, 0.24185033, 0.29451635, 0.0, 0.0,...
3  s3://projetfruits/data/input/Test/apple_hit_1/...  ...  [0.0, 0.4196838, 0.25495315, 0.0, 0.3626423, 0...
4  s3://projetfruits/data/input/Test/cabbage_whit...  ...  [0.0, 0.5918541, 1.0169322, 0.0030952473, 1.66...

[5 rows x 3 columns]

In [76]:
# Shape of the feature array stored in the row labelled 0.
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [77]:
# (rows, columns) of the pandas DataFrame.
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(3110, 3)

# ResNet50

In [None]:
from tensorflow.keras.applications.resnet50 import ResNet50

In [None]:
# Instantiate ResNet50 with include_top=False and print its layer summary.
model_resnet = ResNet50(include_top=False)
model_resnet.summary() 

In [None]:
# Broadcast the ResNet50 weights (not the MobileNetV2 `model`) so that
# model_fn() can load them into a freshly built network.
bc_model_weights = sc.broadcast(model_resnet.get_weights())


def model_fn():
    """
    Return a ResNet50 (include_top=False) loaded with the broadcast weights.

    NOTE: this redefines ``model_fn`` and therefore shadows the MobileNetV2
    version defined earlier in the notebook.
    """
    resnet = ResNet50(weights=None, include_top=False)
    resnet.set_weights(bc_model_weights.value)
    # Return the network built here; the original returned the global
    # MobileNetV2 `model` by mistake.
    return resnet

In [None]:
# Repartition into 16 partitions and apply featurize_udf to the image content;
# only `path` and the features are selected (no `label` column).
# NOTE(review): preprocess() applies the preprocess_input imported from
# mobilenet_v2 — confirm this is the intended preprocessing for ResNet50.
features_df_resnet = images.repartition(16).select(col("path"), featurize_udf("content").alias("features"))

# ACP

In [22]:
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.ml.feature import PCA

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [78]:
# Reload the saved features with Spark and print the first 6 rows.
df_for_pca = spark.read.parquet(PATH_Result)
df_for_pca.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------+--------------------+
|                path|      label|            features|
+--------------------+-----------+--------------------+
|s3://projetfruits...|apple_hit_1|[0.263347, 0.1623...|
|s3://projetfruits...|apple_hit_1|[0.58557224, 1.87...|
|s3://projetfruits...|apple_hit_1|[0.26966664, 0.97...|
|s3://projetfruits...|apple_hit_1|[1.0179651, 0.093...|
|s3://projetfruits...|apple_hit_1|[0.5515656, 0.021...|
|s3://projetfruits...|apple_hit_1|[0.34845087, 0.02...|
+--------------------+-----------+--------------------+
only showing top 6 rows

In [82]:
# conversion du type de la colonne 'features' en vecteur pyspark
df_for_pca = df_for_pca.withColumn('features', array_to_vector('features'))
df.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# StandardScaler is not among the names imported from pyspark.ml.feature
# elsewhere in the notebook, so it is imported here to keep this cell runnable.
from pyspark.ml.feature import StandardScaler

# Fit a scaler on 'features' with both centering (withMean) and scaling
# (withStd) enabled; its result goes to 'scaledFeatures'.
scaler = StandardScaler(
    inputCol = 'features', 
    outputCol = 'scaledFeatures',
    withMean = True,
    withStd = True
).fit(df_for_pca)

# when we transform the dataframe, the old
# feature will still remain in it
df_scaled = scaler.transform(df_for_pca)
df_scaled.show(6)

In [85]:
# PCA
# k=1280 requests 1280 components computed from 'scaledFeatures'; the
# projection is written to 'pca_features'.
pca = PCA(k=1280, inputCol="scaledFeatures", outputCol="pca_features")
# Fit the PCA on the scaled DataFrame.
model_pca = pca.fit(df_scaled)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Explained Variance Ratio [1.44961505e-01 1.15144214e-01 7.38885286e-02 5.09932777e-02
 3.92854630e-02 3.52634933e-02 2.92427913e-02 2.65665064e-02
 2.27421408e-02 2.14503369e-02 1.91034375e-02 1.71291278e-02
 1.60559494e-02 1.40448307e-02 1.22771976e-02 1.11807230e-02
 1.07439526e-02 1.00952885e-02 9.62175239e-03 9.03170276e-03
 8.35238400e-03 7.20685676e-03 6.94363588e-03 6.80979339e-03
 6.60176396e-03 6.26345089e-03 5.87395728e-03 5.62397441e-03
 5.15209631e-03 5.05334855e-03 4.72562421e-03 4.50104595e-03
 4.27514616e-03 4.09877532e-03 3.92757084e-03 3.81403976e-03
 3.53539137e-03 3.46280770e-03 3.27054083e-03 3.20247262e-03
 3.05860352e-03 2.91223264e-03 2.90376925e-03 2.86105559e-03
 2.70295579e-03 2.66640834e-03 2.57354994e-03 2.51597879e-03
 2.48221000e-03 2.37467662e-03 2.33495183e-03 2.25474774e-03
 2.16052045e-03 2.12848518e-03 2.07046965e-03 2.03657355e-03
 1.99707444e-03 1.95644681e-03 1.92090929e-03 1.89121152e-03
 1.84761902e-03 1.78972728e-03 1.76778673e-03 1.75651187e-03

In [None]:
# Keep the fitted model's explainedVariance for the plots that follow.
eigenvalues = model_pca.explainedVariance

In [86]:
import matplotlib.pyplot as plt
# Scree plot: one point per component, with the x axis starting at 1.
plt.plot(range(1, len(eigenvalues) + 1), eigenvalues, 'bo-')
plt.xlabel("Nombre de composantes")
plt.ylabel("Valeur propre")
plt.title("Éboulis des valeurs propres")
plt.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.9755732253410361

In [None]:
# Running total of the explained variance, computed in a single pass with
# np.cumsum instead of re-summing a growing slice for every component.
cumulative_variance = np.cumsum(eigenvalues.toArray())
# One point per component, with the x axis starting at 1.
plt.plot(range(1, len(cumulative_variance) + 1), cumulative_variance, 'bo-')
plt.xlabel("Nombre de composantes")
plt.ylabel("Variance expliquée cumulée")
plt.title("Variance expliquée cumulée")
plt.show()

In [None]:
# Apply the fitted PCA to the scaled data and print the first 6 rows.
df_pca = model_pca.transform(df_scaled)
df_pca.show(6)

In [None]:
# Explained variance of each component, printed as an array.
print('Explained Variance Ratio', model_pca.explainedVariance.toArray())

In [None]:
# Compute the cumulative explained variance.
cumulative_variance = np.cumsum(model_pca.explainedVariance.toArray())

In [None]:
# Find the first index where the cumulative explained variance reaches 99%.
# np.argmax returns 0 when no element satisfies the condition, which would
# wrongly report a single component; in that case fall back to the last index
# (i.e. keep every component).
threshold_reached = np.asarray(cumulative_variance) >= 0.99
if threshold_reached.any():
    index = int(np.argmax(threshold_reached))
else:
    index = len(threshold_reached) - 1

In [None]:
# `index` is zero-based, hence the +1 to report a number of components.
print('Le nombre de composante retenue : ', index + 1)

In [None]:
# Keep only 'path', 'label' and 'pca_features' (converted from vector to array);
# the 'features' column is not kept.
df_pca = df_pca.select(['path', 'label', vector_to_array('pca_features').alias('pca_features')])
df_pca.show(6)

In [None]:
# Save the processed data in parquet format (20 partitions, overwrite mode):
df_pca.repartition(20).write.mode("overwrite").parquet(PATH_Result_pca)

In [90]:
# not sure if this is the best way to do it
# X_pca = df_pca.rdd.map(lambda row: row.pca_features).collect()
# X_pca = np.array(X_pca)
# X_pca

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

array([[  6.622054  ,   0.35199402,  -0.8340074 , ...,   0.18379909,
          0.17600623,  -0.17411002],
       [  7.42664255,  -4.3644764 ,   2.71973628, ...,  -0.21253421,
         -0.39610376,  -0.46381433],
       [  6.54201508,   0.96564316,   1.04585803, ...,   0.51018851,
         -0.09006563,  -0.67336332],
       ...,
       [  2.75335649,   6.24051248, -11.76083331, ...,   0.44407091,
         -0.23837532,  -0.56726685],
       [  1.04280615,   6.64341848, -12.72842514, ...,  -0.19549823,
         -0.05322051,  -0.2028444 ],
       [ -0.10904562,   6.45431899, -13.03566351, ...,   0.2002107 ,
         -0.13538717,  -0.04478604]])

In [28]:
# Save the processed data in parquet format (20 partitions, overwrite mode):
df_pca.repartition(20).write.mode("overwrite").parquet(PATH_Result_pca)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Read the PCA results back into a pandas DataFrame.
df__pca = pd.read_parquet(PATH_Result_pca, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# Display the pandas DataFrame read from the PCA parquet output.
df__pca

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                   path  ...                                       pca_features
0     s3://projetfruits/data/input/Test/apple_golden...  ...  [5.074897954419245, 7.380378554618761, -9.0521...
1     s3://projetfruits/data/input/Test/carrot_1/r0_...  ...  [3.290617649943695, -12.6965803492289, -2.6680...
2     s3://projetfruits/data/input/Test/zucchini_dar...  ...  [3.9143467931413474, -12.762743617140975, -12....
3     s3://projetfruits/data/input/Test/pear_1/r0_11...  ...  [4.776133265656421, 7.377103263864188, -4.0594...
4     s3://projetfruits/data/input/Test/eggplant_vio...  ...  [2.759077231734376, -11.002419585909834, -6.80...
...                                                 ...  ...                                                ...
3105  s3://projetfruits/data/input/Test/pear_3/r0_31...  ...  [1.8465232983500581, 5.645154383883263, -1.589...
3106  s3://projetfruits/data/input/Test/apple_red_3/...  ...  [-5.745054683989173, 1.8257846187282498, 0

In [31]:
# Print the "spark.driver.host" entry of the Spark configuration.
print(spark.sparkContext.getConf().get("spark.driver.host"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ip-172-31-41-180.eu-west-3.compute.internal

In [32]:
# Print every entry of the active Spark configuration, one per line.
configurations = spark.sparkContext.getConf().getAll()
for conf_entry in configurations:
    print(conf_entry)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('spark.eventLog.enabled', 'true')
('spark.executor.memory', '9486M')
('spark.yarn.app.container.log.dir', '/var/log/hadoop-yarn/containers/application_1687080233630_0001/container_1687080233630_0001_01_000001')
('spark.app.id', 'application_1687080233630_0001')
('spark.driver.extraJavaOptions', "-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.se

# Test save au format csv Features

In [None]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col,lit
from pyspark.sql.types import StringType, FloatType

In [14]:
# Load the saved features with Spark. Note that `df` is rebound here: earlier
# in the notebook the same name held a pandas DataFrame.
df = spark.read.parquet(PATH_Result)

In [15]:
# Print the schema of the features DataFrame.
df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [16]:
# Print the first rows (show() with its default arguments).
df.show()

+--------------------+-----------+--------------------+
|                path|      label|            features|
+--------------------+-----------+--------------------+
|file:/home/daipc/...|     pear_1|[0.65473557, 0.17...|
|file:/home/daipc/...|     pear_1|[1.2782416, 0.171...|
|file:/home/daipc/...| zucchini_1|[0.0, 0.46113133,...|
|file:/home/daipc/...| zucchini_1|[0.0, 0.6455605, ...|
|file:/home/daipc/...|     pear_1|[0.6565265, 0.245...|
|file:/home/daipc/...| zucchini_1|[0.18821537, 0.84...|
|file:/home/daipc/...|     pear_1|[0.22975095, 0.30...|
|file:/home/daipc/...|     pear_1|[2.0570443, 0.408...|
|file:/home/daipc/...| zucchini_1|[0.0, 0.50699013,...|
|file:/home/daipc/...| zucchini_1|[0.0, 0.6695162, ...|
|file:/home/daipc/...|     pear_1|[2.3288968, 0.070...|
|file:/home/daipc/...| zucchini_1|[0.0, 0.2755962, ...|
|file:/home/daipc/...|     pear_1|[1.5394454, 0.438...|
|file:/home/daipc/...|     pear_1|[1.4804848, 0.192...|
|file:/home/daipc/...|     pear_1|[0.45501274, 0

In [19]:
# Number of features: length of the 'features' array in the first row.
num_features = len(df.select("features").first()[0]) 
num_features

1280

In [20]:
# UDF returning element i of array x converted to float (FloatType result).
extract_features = udf(lambda x, i: float(x[i]), FloatType())

In [21]:
# Display the UDF object.
extract_features

<function __main__.<lambda>(x, i)>

In [22]:
# Expand the 'features' array into one float column per element
# (feature_1 .. feature_N), keeping all existing columns.
# A single select() with native array indexing replaces the loop of
# withColumn() calls through a Python UDF: all columns are added in one
# projection and no row has to be handed over to Python.
df = df.select(
    "*",
    *[col("features")[i].alias(f"feature_{i+1}") for i in range(num_features)]
)

In [23]:
# Print the schema after the per-element columns were added.
df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- component_1: float (nullable = true)
 |-- component_2: float (nullable = true)
 |-- component_3: float (nullable = true)
 |-- component_4: float (nullable = true)
 |-- component_5: float (nullable = true)
 |-- component_6: float (nullable = true)
 |-- component_7: float (nullable = true)
 |-- component_8: float (nullable = true)
 |-- component_9: float (nullable = true)
 |-- component_10: float (nullable = true)
 |-- component_11: float (nullable = true)
 |-- component_12: float (nullable = true)
 |-- component_13: float (nullable = true)
 |-- component_14: float (nullable = true)
 |-- component_15: float (nullable = true)
 |-- component_16: float (nullable = true)
 |-- component_17: float (nullable = true)
 |-- component_18: float (nullable = true)
 |-- component_19: float (nullable = true)
 |-- component_20: float (nu

In [24]:
# Columns to save: 'path', 'label' and the feature_1 .. feature_N columns
# (the 'features' array column is left out).
selected_columns = ["path", "label"] + [f"feature_{i+1}" for i in range(num_features)]
df_selected = df.select(*selected_columns)

In [25]:
# Print the schema of the selection to be exported.
df_selected.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- component_1: float (nullable = true)
 |-- component_2: float (nullable = true)
 |-- component_3: float (nullable = true)
 |-- component_4: float (nullable = true)
 |-- component_5: float (nullable = true)
 |-- component_6: float (nullable = true)
 |-- component_7: float (nullable = true)
 |-- component_8: float (nullable = true)
 |-- component_9: float (nullable = true)
 |-- component_10: float (nullable = true)
 |-- component_11: float (nullable = true)
 |-- component_12: float (nullable = true)
 |-- component_13: float (nullable = true)
 |-- component_14: float (nullable = true)
 |-- component_15: float (nullable = true)
 |-- component_16: float (nullable = true)
 |-- component_17: float (nullable = true)
 |-- component_18: float (nullable = true)
 |-- component_19: float (nullable = true)
 |-- component_20: float (nullable = true)
 |-- component_21: float (nullable = true)
 |-- component_22: float (nu

In [32]:
# Method 01: header and overwrite mode passed as keyword arguments to csv().
output_path = PATH_Result_csv + "/features_csv_1"
df_selected.write.csv(output_path, header=True, mode="overwrite")

                                                                                

In [33]:
# Method 02: same settings, set through mode() and option() before csv().
output_path = PATH_Result_csv + "/features_csv_2"
df_selected.write.mode("overwrite").option("header","true").csv(output_path)

                                                                                

In [34]:
# Method 03: coalesce(1) reduces the DataFrame to a single partition before writing.
output_path = PATH_Result_csv + "/features_csv_3"
df_selected.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

                                                                                

In [36]:
# Import 01
output_path = PATH_Result_csv + "/features_csv_1"
# NOTE(review): the file name read here is literally ".csv" — confirm which
# file under the output directory is meant to be loaded.
Test_1 = pd.read_csv(output_path + "/" + ".csv")
Test_1.shape

(451, 1282)

In [37]:
# Import 02
output_path = PATH_Result_csv + "/features_csv_2"
# NOTE(review): the file name read here is literally ".csv" — confirm which
# file under the output directory is meant to be loaded.
Test_2 = pd.read_csv(output_path + "/" + ".csv")
Test_2.shape

(451, 1282)

In [38]:
# Import 03
output_path = PATH_Result_csv + "/features_csv_3"
# Stored under its own name: the original reused `Test_2`, overwriting the
# result of the previous import.
# NOTE(review): the file name read here is literally ".csv" — confirm which
# file under the output directory is meant to be loaded.
Test_3 = pd.read_csv(output_path + "/" + ".csv")
Test_3.shape

(451, 1282)

# Test save au format csv PCA

In [39]:
# 18:21:00
# Reload the PCA output with Spark (rebinds `df_pca`).
df_pca = spark.read.parquet(PATH_Result_pca)

In [40]:
# 18:21:00
# Print the schema of the reloaded PCA DataFrame.
df_pca.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_features: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [41]:
# 18:21:00
# Print the first rows (show() with its default arguments).
df_pca.show()

+--------------------+-----------+--------------------+
|                path|      label|        pca_features|
+--------------------+-----------+--------------------+
|file:/home/daipc/...|apple_red_2|[-4.4579277138267...|
|file:/home/daipc/...|     pear_1|[-6.9365886016231...|
|file:/home/daipc/...| zucchini_1|[9.39008334869121...|
|file:/home/daipc/...| zucchini_1|[6.63079878474374...|
|file:/home/daipc/...|   carrot_1|[16.4312965586575...|
|file:/home/daipc/...|apple_red_2|[-5.1023044471677...|
|file:/home/daipc/...| zucchini_1|[9.34122503348220...|
|file:/home/daipc/...|apple_red_2|[-1.1084710443346...|
|file:/home/daipc/...|apple_red_2|[-2.3547063856848...|
|file:/home/daipc/...|apple_red_2|[-6.1522940768051...|
|file:/home/daipc/...|apple_red_2|[-4.8419157363350...|
|file:/home/daipc/...|apple_red_2|[-3.0340259199311...|
|file:/home/daipc/...| zucchini_1|[10.3705548004934...|
|file:/home/daipc/...|     pear_1|[-5.2281617226676...|
|file:/home/daipc/...|apple_red_2|[-3.2346384619

In [106]:
# Number of principal components: length of the 'pca_features' array in the first row.
num_components = len(df_pca.select("pca_features").first()[0]) 
num_components

350

In [107]:
# UDF returning element i of array x converted to float (FloatType result).
extract_component = udf(lambda x, i: float(x[i]), FloatType())

In [108]:
# Display the UDF object.
extract_component

<function __main__.<lambda>(x, i)>

In [109]:
# Expand the 'pca_features' array into one column per component
# (component_1 .. component_N), keeping all existing columns.
# A single select() with native array indexing replaces the loop of
# withColumn() calls through a Python UDF. The cast keeps the float column
# type the FloatType UDF produced (the array elements are doubles).
df_pca = df_pca.select(
    "*",
    *[
        col("pca_features")[i].cast("float").alias(f"component_{i+1}")
        for i in range(num_components)
    ]
)

In [110]:
# Print the schema after the per-component columns were added.
df_pca.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_features: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- component_1: float (nullable = true)
 |-- component_2: float (nullable = true)
 |-- component_3: float (nullable = true)
 |-- component_4: float (nullable = true)
 |-- component_5: float (nullable = true)
 |-- component_6: float (nullable = true)
 |-- component_7: float (nullable = true)
 |-- component_8: float (nullable = true)
 |-- component_9: float (nullable = true)
 |-- component_10: float (nullable = true)
 |-- component_11: float (nullable = true)
 |-- component_12: float (nullable = true)
 |-- component_13: float (nullable = true)
 |-- component_14: float (nullable = true)
 |-- component_15: float (nullable = true)
 |-- component_16: float (nullable = true)
 |-- component_17: float (nullable = true)
 |-- component_18: float (nullable = true)
 |-- component_19: float (nullable = true)
 |-- component_20: floa

In [111]:
# Columns to save: 'path', 'label' and the component_1 .. component_N columns.
# `df_pca` is rebound to this selection, so the 'pca_features' array column is dropped.
selected_columns = ["path", "label"] + [f"component_{i+1}" for i in range(num_components)]
df_pca = df_pca.select(*selected_columns)

In [112]:
# Print the schema of the selection to be exported.
df_pca.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- component_1: float (nullable = true)
 |-- component_2: float (nullable = true)
 |-- component_3: float (nullable = true)
 |-- component_4: float (nullable = true)
 |-- component_5: float (nullable = true)
 |-- component_6: float (nullable = true)
 |-- component_7: float (nullable = true)
 |-- component_8: float (nullable = true)
 |-- component_9: float (nullable = true)
 |-- component_10: float (nullable = true)
 |-- component_11: float (nullable = true)
 |-- component_12: float (nullable = true)
 |-- component_13: float (nullable = true)
 |-- component_14: float (nullable = true)
 |-- component_15: float (nullable = true)
 |-- component_16: float (nullable = true)
 |-- component_17: float (nullable = true)
 |-- component_18: float (nullable = true)
 |-- component_19: float (nullable = true)
 |-- component_20: float (nullable = true)
 |-- component_21: float (nullable = true)
 |-- component_22: float (nu

In [113]:
# Method 01: header and overwrite mode passed as keyword arguments to csv().
output_path = PATH_Result_csv + "/pca_csv_1"
df_pca.write.csv(output_path, header=True, mode="overwrite")

In [114]:
# Method 02: same settings, set through mode() and option() before csv().
output_path = PATH_Result_csv + "/pca_csv_2"
df_pca.write.mode("overwrite").option("header","true").csv(output_path)

In [115]:
# Method 03: coalesce(1) reduces the DataFrame to a single partition before writing.
output_path = PATH_Result_csv + "/pca_csv_3"
df_pca.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

In [116]:
# Import 01
output_path = PATH_Result_csv + "/pca_csv_1"
# NOTE(review): "file_name.csv" looks like a placeholder — confirm the actual
# name of the file to read under the output directory.
Test_pca_1 = pd.read_csv(output_path + "/" + "file_name.csv")
Test_pca_1.shape

(451, 352)

In [117]:
# Import 02
output_path = PATH_Result_csv + "/pca_csv_2"
# NOTE(review): "file_name.csv" looks like a placeholder — confirm the actual
# name of the file to read under the output directory.
Test_pca_2 = pd.read_csv(output_path + "/" + "file_name.csv")
Test_pca_2.shape

(451, 352)

In [42]:
# Import 03
output_path = PATH_Result_csv + "/pca_csv_3"
# NOTE(review): "file_name.csv" looks like a placeholder — confirm the actual
# name of the file to read under the output directory.
Test_pca_3 = pd.read_csv(output_path + "/" + "file_name.csv")
Test_pca_3.shape

(451, 352)

In [43]:
# Display the DataFrame read from the third CSV export.
Test_pca_3

Unnamed: 0,path,label,component_1,component_2,component_3,component_4,component_5,component_6,component_7,component_8,...,component_341,component_342,component_343,component_344,component_345,component_346,component_347,component_348,component_349,component_350
0,file:/home/daipc/oc-projets/projetfruits/noteb...,apple_red_2,-4.457928,-6.700167,-0.450851,1.713894,-2.330002,2.673411,3.115550,-1.292409,...,-0.159494,0.175289,0.134298,0.157681,0.010945,0.288909,-0.228167,0.339564,-0.185704,-0.318244
1,file:/home/daipc/oc-projets/projetfruits/noteb...,pear_1,-6.936589,11.989290,-5.567602,8.594714,-3.599343,-1.505842,3.333837,-4.666601,...,-0.265635,0.141402,0.129588,0.328233,0.170131,0.332687,-0.335579,0.528246,-0.113957,-0.290963
2,file:/home/daipc/oc-projets/projetfruits/noteb...,zucchini_1,9.390083,5.995908,5.664782,6.812156,-1.462526,-0.506818,3.569033,0.886619,...,-0.148626,0.229828,0.117160,0.330004,0.199985,0.246670,-0.351446,0.399250,-0.124625,-0.296629
3,file:/home/daipc/oc-projets/projetfruits/noteb...,zucchini_1,6.630799,6.284658,6.851362,6.547358,-0.693735,-1.159967,1.619842,-0.251467,...,-0.265741,0.314162,0.087772,0.329134,0.192901,0.204236,-0.255134,0.374595,-0.087262,-0.435572
4,file:/home/daipc/oc-projets/projetfruits/noteb...,carrot_1,16.431297,-1.858502,-10.695012,6.315997,-3.137121,0.538829,3.244335,0.244492,...,-0.179208,0.219398,0.211376,0.286463,0.207592,0.172234,-0.252858,0.388376,-0.095368,-0.311801
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
446,file:/home/daipc/oc-projets/projetfruits/noteb...,apple_red_2,-4.865873,-7.940898,1.004774,9.549164,-3.581604,-1.334430,3.662225,-0.459957,...,-0.088795,0.102319,-0.007602,0.337858,0.213238,0.337695,-0.204959,0.485822,-0.223121,-0.298649
447,file:/home/daipc/oc-projets/projetfruits/noteb...,pear_1,-5.036658,4.622024,-2.007984,-0.107654,-6.804373,-4.180892,4.890408,-0.969034,...,-0.119309,0.013146,0.191909,0.223400,0.249209,0.132571,-0.307575,0.415025,-0.103356,-0.420846
448,file:/home/daipc/oc-projets/projetfruits/noteb...,apple_red_2,-1.090696,-4.355439,-0.504698,-3.186336,-0.039313,6.573718,2.880600,-0.868170,...,-0.065835,0.152010,0.128262,0.280048,0.151948,0.343342,-0.395873,0.442403,-0.154784,-0.385237
449,file:/home/daipc/oc-projets/projetfruits/noteb...,pear_1,-6.393210,10.634883,-4.994072,12.427262,0.522687,1.759581,2.512892,2.129437,...,-0.067878,0.237673,0.072029,0.264939,0.219583,0.300271,-0.466307,0.458801,-0.118503,-0.321207


In [120]:
# Check the Python type of the loaded object.
type(Test_pca_3)

pandas.core.frame.DataFrame

In [44]:
# List the column labels of the loaded DataFrame.
Test_pca_3.columns

Index(['path', 'label', 'component_1', 'component_2', 'component_3',
       'component_4', 'component_5', 'component_6', 'component_7',
       'component_8',
       ...
       'component_341', 'component_342', 'component_343', 'component_344',
       'component_345', 'component_346', 'component_347', 'component_348',
       'component_349', 'component_350'],
      dtype='object', length=352)