In [13]:
%pip install --upgrade pip
%pip cache purge

# General
import pandas as pd
import numpy as np
import io
import os
from PIL import Image

# Neural network (mobilenet)
%pip install tensorflow
%pip install tensorflow-gpu
import tensorflow as tf
print('\n', 'Tensorflow version ' + tf.__version__, '\n')
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# map reduce
# %pip install pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

# Acces bucket
from google.cloud import storage

# PCA
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT

# Visu
import pprint
import matplotlib.pyplot as plt

print('\n', 'Done')


Note: you may need to restart the kernel to use updated packages.
Files removed: 125
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting tensorflow-gpu
  Downloading tensorflow-gpu-2.12.0.tar.gz (2.6 kB)
  Preparing metadata (setup.py) ... [?25lerror
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mpython setup.py egg_info[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m [31m[39 lines of output][0m
  [31m   [0m Traceback (most recent call last):
  [31m   [0m   File "/opt/conda/lib/python3.10/site-packages/setuptools/_vendor/packaging/requirements.py", line 35, in __init__
  [31m   [0m     parsed = _parse_requirement(requirement_string)
  [31m   [0m   File "/opt/conda/lib/python3.10/site-packages/setuptools/_vendor/packaging/_parser.py", line 64, in parse_requirement
  [31m   [0m     return _parse_requirement(Tokenizer(sour

In [2]:
# test

def list_folders(bucket_name, prefix):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)  # Filter blobs by prefix

    folders = set()
    for blob in blobs:
        folder = blob.name.split('/')[1]  # Get the folder name inside Test1
        if folder:
            folders.add(folder)

    return folders


bucket_name = 'data-fruits-p8'
prefix = 'Test1/'
folders = list_folders(bucket_name, prefix)

print(f'Number of folders on Google Cloud Storage: {len(folders)}', '\n') # 131

print("15 first folders in bucket:", '\n')
for i, folder in enumerate(list(folders)[:10]):
    print(f'{i+1}   {folder}')


Number of folders on Google Cloud Storage: 131 

15 first folders in bucket: 

1   Quince
2   Mulberry
3   Kaki
4   Tomato Maroon
5   Plum
6   Tomato not Ripened
7   Cherry Wax Red
8   Huckleberry
9   Apple Red 1
10   Avocado ripe


In [3]:
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)

sc = spark.sparkContext

spark


24/05/12 15:21:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load("gs://data-fruits-p8/Test1")

type(images)


                                                                                

pyspark.sql.dataframe.DataFrame

In [5]:
images.show(5, True)


                                                                                

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|gs://data-fruits-...|2024-05-12 13:22:...|  7353|[FF D8 FF E0 00 1...|
|gs://data-fruits-...|2024-05-12 13:22:...|  7350|[FF D8 FF E0 00 1...|
|gs://data-fruits-...|2024-05-12 13:22:...|  7349|[FF D8 FF E0 00 1...|
|gs://data-fruits-...|2024-05-12 13:22:...|  7348|[FF D8 FF E0 00 1...|
|gs://data-fruits-...|2024-05-12 13:22:...|  7328|[FF D8 FF E0 00 1...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows



In [6]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))


root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------+----------+
|path                                              |label     |
+--------------------------------------------------+----------+
|gs://data-fruits-p8/Test1/Watermelon/r_106_100.jpg|Watermelon|
|gs://data-fruits-p8/Test1/Watermelon/r_109_100.jpg|Watermelon|
|gs://data-fruits-p8/Test1/Watermelon/r_108_100.jpg|Watermelon|
|gs://data-fruits-p8/Test1/Watermelon/r_107_100.jpg|Watermelon|
|gs://data-fruits-p8/Test1/Watermelon/r_95_100.jpg |Watermelon|
+--------------------------------------------------+----------+
only showing top 5 rows

None


In [7]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

# new_model.summary()


In [8]:
brodcast_weights = sc.broadcast(new_model.get_weights())


In [9]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)

    return new_model


In [10]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)

    return preprocess_input(arr)


def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input) 

    return pd.Series(preds)


@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER) # type hints instead of type ?
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for i, content_series in enumerate(content_series_iter):
        print('\n', i)
        yield featurize_series(model, content_series)




In [11]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

features_df.persist()


DataFrame[path: string, label: string, features: array<float>]

In [12]:
PATH_extraction = "gs://data-fruits-p8/feature-extraction"

features_df.write.mode("overwrite").parquet(PATH_extraction)


2024-05-12 15:24:36.533688: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:24:36.533692: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:24:36.536255: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:24:36.536667: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:24:36.542595: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:24:36.542595: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:24:36.543668: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not b

24/05/12 15:26:23 WARN BlockManager: Putting block rdd_21_7 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_1/3857089266.py", line 36, in featurize_udf
  File "/tmp/ipykernel_1/3857089266.py", line 19, in featurize_series
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/series.py", line 584, in __init__
    data = sanitize_array(data, index, dtype, copy)
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/construction.py", line 659, in sanitize_array
    subarr = _sanitize_ndim(subarr, data, dtype, index, allow_2d=allow_2d)
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/construction.py", line 718, in _sanitize_ndim
    raise ValueError(
ValueError: Data must be 1-dimensional, got ndarray of shape (1165, 1280) instead
.
24/05/12 15:26:23 WARN BlockManager: Putting block rdd_21_4 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call las

[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m93s[0m 2s/step6) / 20]
[1m37/37[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m93s[0m 2s/step
[Stage 4:>                                                         (0 + 5) / 20]

24/05/12 15:26:24 WARN BlockManager: Putting block rdd_21_5 failed due to exception org.apache.spark.TaskKilledException.
24/05/12 15:26:24 WARN BlockManager: Block rdd_21_5 could not be removed as it was not found on disk or in memory
24/05/12 15:26:24 WARN TaskSetManager: Lost task 5.0 in stage 4.0 (TID 847) (5024d3e999d0 executor driver): TaskKilled (Stage cancelled)
24/05/12 15:26:24 WARN BlockManager: Putting block rdd_21_6 failed due to exception org.apache.spark.TaskKilledException.
24/05/12 15:26:24 WARN BlockManager: Block rdd_21_6 could not be removed as it was not found on disk or in memory
24/05/12 15:26:24 WARN TaskSetManager: Lost task 6.0 in stage 4.0 (TID 848) (5024d3e999d0 executor driver): TaskKilled (Stage cancelled)


2024-05-12 15:26:25.003020: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:26:25.007860: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:26:25.030101: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:26:25.034490: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-05-12 15:26:25.070040: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-05-12 15:26:25.095084: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optim

Py4JJavaError: An error occurred while calling o93.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:650)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:309)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 1 times, most recent failure: Lost task 4.0 in stage 4.0 (TID 846) (5024d3e999d0 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_1/3857089266.py", line 36, in featurize_udf
  File "/tmp/ipykernel_1/3857089266.py", line 19, in featurize_series
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/series.py", line 584, in __init__
    data = sanitize_array(data, index, dtype, copy)
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/construction.py", line 659, in sanitize_array
    subarr = _sanitize_ndim(subarr, data, dtype, index, allow_2d=allow_2d)
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/construction.py", line 718, in _sanitize_ndim
    raise ValueError(
ValueError: Data must be 1-dimensional, got ndarray of shape (1140, 1280) instead

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1538)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1465)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1529)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1352)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2717)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2653)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2652)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2652)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2913)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2855)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2844)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:959)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:257)
	... 42 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_1/3857089266.py", line 36, in featurize_udf
  File "/tmp/ipykernel_1/3857089266.py", line 19, in featurize_series
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/series.py", line 584, in __init__
    data = sanitize_array(data, index, dtype, copy)
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/construction.py", line 659, in sanitize_array
    subarr = _sanitize_ndim(subarr, data, dtype, index, allow_2d=allow_2d)
  File "/opt/conda/lib/python3.10/site-packages/pandas/core/construction.py", line 718, in _sanitize_ndim
    raise ValueError(
ValueError: Data must be 1-dimensional, got ndarray of shape (1140, 1280) instead

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:101)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:50)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1538)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1465)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1529)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1352)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [None]:
df = pd.read_parquet(PATH_extraction, engine='pyarrow')
type(df)


In [None]:
df.head()


In [None]:
# Define a UDF to convert array-like objects to DenseVectors
array_to_dense_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Apply the UDF to the 'features' column
vectors_df = features_df.withColumn('vectors', array_to_dense_vector_udf(col('features')))

vectors_df.show()
vectors_df.printSchema()


In [None]:
# Perform PCA
pca = PCA(k=180, inputCol="vectors", outputCol="pca_features") # environ 0.9 de variance expliquée
model_pca = pca.fit(vectors_df)
result_pca = model_pca.transform(vectors_df)

# Output result
result_pca.show(truncate=False)


In [None]:
print(model_pca.explainedVariance)

# Plot
explained_variance_ratios = model_pca.explainedVariance.toArray()

# Compute cumulative explained variance
cumulative_variance = np.cumsum(explained_variance_ratios)

plt.figure(figsize=(8, 6))
plt.plot(range(1, len(cumulative_variance) + 1), cumulative_variance, marker='o', linestyle='-')
plt.title('Cumulative Variance Explained by PCA Components')
plt.xlabel('Number of Components')
plt.ylabel('Cumulative Variance Explained')
plt.grid(True)
plt.show()


In [None]:
PATH_Result_PCA = "gs://data-fruits-p8/pca"

# delete feature extraction col & vectors (étapes intermédiaires, plus nécessaire.)
result_pca = result_pca.drop('features', 'vectors')
result_pca.show()

result_pca.write.mode("overwrite").parquet(PATH_Result_PCA)




24/05/12 15:26:26 WARN ArrowPythonRunner: Incomplete task 2.0 in stage 4 (TID 844) interrupted: Attempting to kill Python Worker
24/05/12 15:26:26 WARN BlockManager: Putting block rdd_21_2 failed due to exception org.apache.spark.TaskKilledException.
24/05/12 15:26:26 WARN BlockManager: Block rdd_21_2 could not be removed as it was not found on disk or in memory
24/05/12 15:26:26 WARN ArrowPythonRunner: Incomplete task 0.0 in stage 4 (TID 842) interrupted: Attempting to kill Python Worker
24/05/12 15:26:26 WARN BlockManager: Putting block rdd_21_0 failed due to exception org.apache.spark.TaskKilledException.
24/05/12 15:26:26 WARN BlockManager: Block rdd_21_0 could not be removed as it was not found on disk or in memory
24/05/12 15:26:26 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 842) (5024d3e999d0 executor driver): TaskKilled (Stage cancelled)
24/05/12 15:26:26 WARN TaskSetManager: Lost task 2.0 in stage 4.0 (TID 844) (5024d3e999d0 executor driver): TaskKilled (Stage cancell