In [33]:
import io
import os

import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

In [34]:
# Every location used by the notebook is derived from the kernel's working directory.
PATH = os.getcwd()
PATH_Data = f'{PATH}/Test1'                   # directory the images are loaded from
PATH_Result = f'{PATH}/Results_test'          # Parquet output for the extracted features
PATH_Result_PCA = f'{PATH}/Results_pca_test'  # Parquet output for the PCA result
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')

PATH:        /home/nox/OpenClassroom/P9
PATH_Data:   /home/nox/OpenClassroom/P9/Test1
PATH_Result: /home/nox/OpenClassroom/P9/Results_test


In [35]:
# Build (or reuse) the Spark session for this notebook: application name 'P8',
# 'local' master, and Parquet files written in the legacy format.
spark = SparkSession.builder \
    .appName('P8') \
    .master('local') \
    .config("spark.sql.parquet.writeLegacyFormat", 'true') \
    .getOrCreate()

In [36]:
# SparkContext behind the session; used further down to broadcast the model weights.
sc = spark.sparkContext

In [37]:
# Bare expression: shows the SparkSession object as the cell output.
spark

In [38]:
# Load every *.jpg found under PATH_Data (sub-directories included) with the
# binaryFile source, which yields path, modificationTime, length and content columns.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

In [39]:
# Bare expression: shows the DataFrame's column names and types.
images

DataFrame[path: string, modificationTime: timestamp, length: bigint, content: binary]

In [40]:
# The label is the second-to-last component of the '/'-separated path,
# i.e. the name of the directory that directly contains the image.
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# printSchema() and show() write to stdout themselves and return None, so they are
# called directly: wrapping them in print() only added a stray "None" line.
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------------------------------+--------------+
|path                                                              |label         |
+------------------------------------------------------------------+--------------+
|file:/home/nox/OpenClassroom/P9/Test1/Apple Braeburn/r_4_100.jpg  |Apple Braeburn|
|file:/home/nox/OpenClassroom/P9/Test1/Apple Braeburn/r_326_100.jpg|Apple Braeburn|
|file:/home/nox/OpenClassroom/P9/Test1/Apple Braeburn/r_8_100.jpg  |Apple Braeburn|
|file:/home/nox/OpenClassroom/P9/Test1/Apple Braeburn/r_324_100.jpg|Apple Braeburn|
|file:/home/nox/OpenClassroom/P9/Test1/Apple Braeburn/r_327_100.jpg|Apple Braeburn|
+------------------------------------------------------------------+--------------+
only showing top 5 rows

None


In [41]:
# MobileNetV2 with ImageNet weights, built with its top included (include_top=True)
# for 224x224x3 inputs.
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [42]:
# Feature extractor: same input as `model`, but the output is taken from the
# second-to-last layer instead of the final one.
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [43]:
# Broadcast the extractor's weights through the SparkContext; model_fn() reads them
# back with brodcast_weights.value.
brodcast_weights = sc.broadcast(new_model.get_weights())

In [44]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights into it.

    The network is instantiated with ``weights=None``: every weight of the returned
    model is overwritten by ``set_weights`` just below, so loading the ImageNet
    weights again on each call would be redundant work that the broadcast is meant
    to avoid.

    :return: a Keras ``Model`` taking the MobileNetV2 input and returning the output
             of the network's second-to-last layer, with all layers marked
             non-trainable.
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    # Cut the network just before its final layer.
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    # Weights come from the driver-side extractor via the broadcast variable.
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [45]:
def preprocess(content):
    """
    Decode raw image bytes and prepare them for the model.

    :param content: encoded image file content (bytes).
    :return: the 224x224 image as an array, passed through ``preprocess_input``.
    """
    # Force three channels before resizing: a grayscale or CMYK JPEG would otherwise
    # produce a 1- or 4-channel array, which breaks np.stack on a mixed batch and
    # does not match the model's (224, 224, 3) input.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Compute one feature vector per raw image in a pandas Series.

    :param model: object exposing ``predict``, applied to the stacked batch.
    :param content_series: pd.Series of raw image bytes.
    :return: pd.Series whose items are the flattened predictions.
    """
    # Preprocess each image, then stack the results into a single batch array.
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Some layers output multi-dimensional tensors; flatten each one to a vector
    # so it is easy to store in a Spark DataFrame.
    return pd.Series([prediction.flatten() for prediction in predictions])

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar Iterator pandas UDF around featurize_series.
    The decorator declares the result as a Spark column of type array<float>.

    :param content_series_iter: iterator over batches of data, each batch being a
                              pandas Series of image data.
    '''
    # Build the model once per call and reuse it for every batch, so the cost of
    # loading it is spread over all the batches of the iterator.
    extractor = model_fn()
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)



In [46]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [47]:
# Spread the images over 20 partitions, then keep path and label and add the
# feature vector computed from the raw file content.
features_df = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

In [48]:
# Mark the DataFrame for caching, then count its rows; the count is the cell output.
features_df.cache()
features_df.count() 

2023-09-04 13:23:18.634417: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-09-04 13:23:18.635728: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-04 13:23:18.662572: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-04 13:23:18.662959: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-09-04 13:23:19.753709: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] could not op

330

In [49]:
# Look at the first row (path, label, features).
features_df.head()

Row(path='file:/home/nox/OpenClassroom/P9/Test1/Apple Braeburn/r_42_100.jpg', label='Apple Braeburn', features=[0.8058841228485107, 0.07883615046739578, 0.0, 0.0, 0.0, 0.6389163136482239, 0.6069908738136292, 0.0, 0.34067487716674805, 0.13478685915470123, 0.0, 0.029806507751345634, 0.0, 0.0, 0.0, 0.2536926567554474, 0.0, 0.0, 0.12674415111541748, 0.0365186408162117, 0.0, 0.057646285742521286, 0.0, 0.0, 0.0, 0.0, 0.27735090255737305, 0.0, 0.0, 0.3822418451309204, 0.0, 0.0, 0.010006563737988472, 0.0, 0.0, 0.38149359822273254, 3.085493326187134, 1.5804065465927124, 0.12194456160068512, 0.0, 0.3676198124885559, 0.0, 0.09849461913108826, 0.10106868296861649, 0.0, 0.0, 0.10051421821117401, 0.20787304639816284, 0.21403343975543976, 0.0, 0.0, 0.767139196395874, 0.0448344387114048, 0.033961761742830276, 0.0, 1.5115808248519897, 0.0, 0.0059667606838047504, 0.0, 0.029972698539495468, 0.0, 0.38268178701400757, 0.10392799228429794, 0.0, 0.4198763370513916, 0.2493441104888916, 0.42211952805519104, 0.

In [50]:
# Show the directory the features are written to in the next cell.
print(PATH_Result)

/home/nox/OpenClassroom/P9/Results_test


In [51]:
# Write the features as Parquet under PATH_Result, replacing any existing output.
features_df.write.mode("overwrite").parquet(PATH_Result)

In [52]:
# Load a Parquet result directory into pandas using the pyarrow engine.
# NOTE(review): this relative path is hard-coded and is not one of the PATH_* locations
# defined at the top — presumably the output of a separate run on AWS EMR; confirm.
df = pd.read_parquet('./EMR/Result_pca_aws/', engine='pyarrow')

In [53]:
# Preview the first rows of the loaded result.
df.head()

Unnamed: 0,path,label,features,pca_features
0,s3://oc-p9-data/EMR/Test/Watermelon/r_66_100.jpg,Watermelon,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
1,s3://oc-p9-data/EMR/Test/Watermelon/r_73_100.jpg,Watermelon,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
2,s3://oc-p9-data/EMR/Test/Pineapple Mini/140_10...,Pineapple Mini,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
3,s3://oc-p9-data/EMR/Test/Pineapple Mini/129_10...,Pineapple Mini,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
4,s3://oc-p9-data/EMR/Test/Cauliflower/r_181_100...,Cauliflower,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."


In [54]:
# Preview up to 20 rows of the loaded result.
df.head(20)

Unnamed: 0,path,label,features,pca_features
0,s3://oc-p9-data/EMR/Test/Watermelon/r_66_100.jpg,Watermelon,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
1,s3://oc-p9-data/EMR/Test/Watermelon/r_73_100.jpg,Watermelon,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
2,s3://oc-p9-data/EMR/Test/Pineapple Mini/140_10...,Pineapple Mini,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
3,s3://oc-p9-data/EMR/Test/Pineapple Mini/129_10...,Pineapple Mini,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
4,s3://oc-p9-data/EMR/Test/Cauliflower/r_181_100...,Cauliflower,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
5,s3://oc-p9-data/EMR/Test/Watermelon/249_100.jpg,Watermelon,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
6,s3://oc-p9-data/EMR/Test/Raspberry/111_100.jpg,Raspberry,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
7,s3://oc-p9-data/EMR/Test/Raspberry/88_100.jpg,Raspberry,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
8,s3://oc-p9-data/EMR/Test/Cauliflower/r2_226_10...,Cauliflower,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
9,s3://oc-p9-data/EMR/Test/Pineapple/3_100.jpg,Pineapple,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."


In [55]:
# Column dtypes as seen by pandas.
df.dtypes

path            object
label           object
features        object
pca_features    object
dtype: object

In [56]:
# Inspect the 'features' value of the row labelled 0.
df.loc[0,'features']

{'type': 1,
 'size': None,
 'indices': None,
 'values': array([1.31945956, 0.27603984, 0.        , ..., 0.60496193, 0.        ,
        0.        ])}

In [57]:
def array_to_vector(array):
    """
    Convert a sequence of numbers into a dense Spark ML vector.

    :param array: the values of one row's 'features' array.
    :return: the result of ``Vectors.dense(array)``.
    """
    return Vectors.dense(array)

# Column-level wrapper around array_to_vector, declared as returning VectorUDT.
array_to_vector_udf = udf(array_to_vector, VectorUDT())

# Replace the array-typed 'features' column with its vector form, which is the
# column the PCA step below takes as input.
features_df_transformed = features_df.withColumn("features", array_to_vector_udf(features_df["features"]))


In [58]:
# Set the number of principal components you want, e.g., 50
num_components = 50

pca = PCA(k=num_components, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(features_df_transformed)
df_pca = pca_model.transform(features_df_transformed)

                                                                                

In [59]:
# Print a preview of the PCA result.
df_pca.show()


+--------------------+--------------+--------------------+--------------------+
|                path|         label|            features|        pca_features|
+--------------------+--------------+--------------------+--------------------+
|file:/home/nox/Op...|Apple Braeburn|[0.80588412284851...|[-8.4536423602631...|
|file:/home/nox/Op...|Apple Braeburn|[0.87095409631729...|[-6.9772924263674...|
|file:/home/nox/Op...|Apple Braeburn|[0.67813092470169...|[-7.9456001513673...|
|file:/home/nox/Op...|Apple Braeburn|[0.91799819469451...|[-8.7203823230851...|
|file:/home/nox/Op...|    Clementine|[1.72573387622833...|[-0.7750212816482...|
|file:/home/nox/Op...|    Clementine|[0.13881731033325...|[3.03507497863312...|
|file:/home/nox/Op...|    Clementine|[1.33992254734039...|[-1.1517348866723...|
|file:/home/nox/Op...|    Clementine|[0.27237147092819...|[2.18096316602275...|
|file:/home/nox/Op...|Apple Braeburn|[0.75762093067169...|[-8.6475609154594...|
|file:/home/nox/Op...|Apple Braeburn|[0.

In [60]:
# Write the PCA result as Parquet under PATH_Result_PCA, replacing any existing output.
df_pca.write.mode("overwrite").parquet(PATH_Result_PCA)

                                                                                

In [61]:
# Bring a single row over as a pandas DataFrame for inspection.
first_row_pca = df_pca.limit(1).toPandas()

display(first_row_pca)

Unnamed: 0,path,label,features,pca_features
0,file:/home/nox/OpenClassroom/P9/Test1/Apple Br...,Apple Braeburn,"[0.8058841228485107, 0.07883615046739578, 0.0,...","[-8.453642360263169, -0.8938787056680216, -0.3..."


In [62]:
# Compare the vector sizes before and after PCA, using the single row fetched above.
features_length = len(first_row_pca['features'][0])
pca_features_length = len(first_row_pca['pca_features'][0])

print(f"Length of 'features' vector: {features_length}")
print(f"Length of 'pca_features' vector: {pca_features_length}")


Length of 'features' vector: 1280
Length of 'pca_features' vector: 50


In [63]:
# Explained variance of each fitted component, accumulated left to right.
explained_variances = pca_model.explainedVariance
cumulative_variance = explained_variances.toArray().cumsum()
# Report the last cumulative value and the number of fitted components, instead of
# the hard-coded "50" / index 49: the message stays right if num_components changes
# and no longer raises IndexError when fewer than 50 components are fitted.
print(f"Cumulative variance explained by the first {cumulative_variance.size} components: {cumulative_variance[-1]:.4f}")


Cumulative variance explained by the first 50 components: 0.5933


In [65]:
#spark.stop()