In [1]:
import pandas as pd
import numpy as np
import io
#import os
#import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from typing import Iterator
import time
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1674288775996_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
def load_data(PATH_Data):
  """
  Load the data in binary files
  """
  # Load images using Spark's binary file data source. You could alternatively use Spark's image data source, but the binary file data source provides more flexibility in how you preprocess images.
  start_time = time.time()
  images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)
  print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))
  return images

def get_labels(images):
  """
  Add a column label to the dataframe spark
  """
  start_time = time.time()
  images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
  print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))
  return images

def get_broadcast_weights():
  """ 
  Returns the broadcasted weights of a MobileNetV2 with top layer removed
  """
  start_time = time.time()
  model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))
  new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
  broadcast_weights = spark.sparkContext.broadcast(new_model.get_weights())
  print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))
  return broadcast_weights

def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

def pca_transform(df, n_components = 20):
  start_time = time.time()
  # Les données images sont converties au format vecteur dense
  ud_f = udf(lambda r: Vectors.dense(r), VectorUDT())
  df = df.withColumn('features', ud_f('features'))

  pca = PCA(k=10, inputCol='features', outputCol='pca_features')
  model = pca.fit(df)
  df = model.transform(df)

  print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))
  return df

def save_results(df, path):
  start_time = time.time()
  df.write.mode("overwrite").parquet(path)
  print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
start_time_full = time.time()

PATH = 's3://ma-projet8'
PATH_Data = PATH+'/images'
PATH_Results = PATH+'/Results'

print("-----Loading data-----")

images = load_data(PATH_Data)
print("-----Get labels-----")
images = get_labels(images)

print("-----Broadcast the weights of the model-----")
broadcast_weights = get_broadcast_weights()

print("-----Apply featurization to the DataFrame of images----- ")
start_time = time.time()
features_df = images.repartition(24).select(col("path"),
                                        col("label"),
                                        featurize_udf("content").alias("features")
                                       )
print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

print("-----PCA on the features-----")
features_df = pca_transform(features_df, n_components = 80)

df_final = features_df.select(col("path"), col("label"),col("pca_features"))
print("-----Saving the results-----")
save_results(df_final,PATH_Results)

full_time = time.time() - start_time_full
print("Exécution total pour", images.count(), "images :", full_time, "secondes" )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-----Loading data-----
Temps d'execution 11.45 secondes
-----Get labels-----
Temps d'execution 0.34 secondes
-----Broadcast the weights of the model-----
Temps d'execution 1.75 secondes
-----Apply featurization to the DataFrame of images----- 
Temps d'execution 0.23 secondes
-----PCA on the features-----
Temps d'execution 139.23 secondes
-----Saving the results-----
Temps d'execution 66.80 secondes
Exécution total pour 573 images : 219.82818984985352 secondes