In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from PIL import Image
import numpy as np
import io
import os
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

2024-05-28 14:34:43.276860: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
if SparkSession.builder.getOrCreate().sparkContext:
    SparkSession.builder.getOrCreate().sparkContext.stop()

spark = (SparkSession
             .builder
             .appName('test10-spark')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/28 14:29:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
PATH = os.getcwd()
PATH_Data = PATH+'/data/Test1'
PATH_Result = PATH+'/data/results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /Users/gaeldelescluse/Documents/OpenClassRooms/2.Projets/Projet11/ai-cloud-computing-spark/train
PATH_Data:   /Users/gaeldelescluse/Documents/OpenClassRooms/2.Projets/Projet11/ai-cloud-computing-spark/train/data/Test1
PATH_Result: /Users/gaeldelescluse/Documents/OpenClassRooms/2.Projets/Projet11/ai-cloud-computing-spark/train/data/results


In [4]:
sc = spark.sparkContext
spark

In [5]:
df = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [6]:
df.show()

                                                                                

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/Users/gaeld...|2021-09-12 19:25:42|  5656|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5627|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5613|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5611|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5611|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5606|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5606|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5602|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5599|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5597|[FF D8 FF E0 00 1...|
|file:/Users/gaeld...|2021-09-12 19:25:42|  5594|[FF D8 FF E0 00 1...|
|file:

In [8]:
df = df.withColumn('label', element_at(split(df['path'], '/'),-2))
print(df.printSchema())
print(df.select('path','label').show())

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------+--------------+
|                path|         label|
+--------------------+--------------+
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Braeburn|
|file:/Users/gaeld...|Apple Br

In [11]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5
[1m14536120/14536120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 0us/step


In [12]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [13]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [14]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [15]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



In [16]:
features_df = df.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [17]:
print(PATH_Result)

/Users/gaeldelescluse/Documents/OpenClassRooms/2.Projets/Projet11/ai-cloud-computing-spark/train/data/results


In [18]:
features_df.write.mode("overwrite").parquet(PATH_Result)

2024-05-28 14:36:51.919981: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1