In [1]:
import pandas as pd
from PIL import Image
import numpy as np
import io

import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# Pyspark
import findspark
findspark.init()
import pyspark


from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, pandas_udf

import pyarrow

from contextlib import contextmanager
import time 
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

print('pyspark ==', pyspark.__version__)

pyspark == 3.2.1


In [2]:
@contextmanager
def timer(title):
    """Context manager printing the wall-clock duration of the wrapped block.

    The message "<title> - done in <N>s" is printed once the block has
    finished without raising.

    :param title: label placed at the start of the printed message.
    """
    started_at = time.time()
    yield
    elapsed = time.time() - started_at
    print("{} - done in {:.0f}s".format(title, elapsed))

**Spark** is a framework for parallelizing the processing of large datasets.
- It provides APIs for Java, Scala, Python and R, and an optimized engine that supports general execution graphs.
- It also provides high-level tools:
    - **Spark SQL** for structured data processing,
    - **MLlib** for machine learning,
    - **GraphX** for graph processing,
    - **Spark Streaming** for processing streaming data.

# SparkSession Creation - Configuration - Spark UI


**SparkSession**: the entry point of PySpark
- __builder__: the SparkSession generator.
- __getOrCreate()__: returns an already existing SparkSession, or creates a new one if none exists.
- __master()__: takes the master name as argument (yarn, mesos, or local[x], where x is an integer > 0). Ideally x equals the number of CPU cores; it is the number of partitions that Spark uses for RDDs, DataFrames and Datasets.
- __appName()__: the application's name.

In [3]:
# Build (or reuse) the local SparkSession.
# The '*' in "local[*]" represents all the cores of the CPU.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('P8_PySpark')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/24 08:39:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


__Configuration__
- __spark.sql.repl.eagerEval.enabled__: enables eager evaluation of PySpark DataFrames in Jupyter notebooks.
- __spark.sql.repl.eagerEval.maxNumRows__: number of rows to show.
- __spark.sql.execution.arrow.pyspark.enabled__: Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame using the toPandas() call and when creating a Spark DataFrame from a Pandas DataFrame with createDataFrame(pandas_df). To use Arrow when executing these calls, users must first set the Spark configuration "spark.sql.execution.arrow.pyspark.enabled" to "true". This is disabled by default.

In [4]:
# Session-level options set on the running SparkSession:
# eager evaluation of DataFrames in the notebook, limited to 5 displayed rows,
# and Arrow enabled for the PySpark <-> pandas conversions.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 5)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

__WEB interface__
- __Web User Interface__: the default port is 4040; it shows useful information about the application.
- __A list__ of scheduler stages and tasks
- __A summary__ of RDD sizes and memory usage
- __Environment information__
- __Executor information__

http://localhost:4040

In [5]:
# Bare expression: the notebook displays the SparkSession object as the cell output.
spark

# Data load

__"binaryFile" Format__

- __binaryFile__: Images are loaded using Spark's binary file data source. We could also use Spark's image data source, but the binary file source provides more flexibility in how images are pre-processed.
   - __path (StringType)__: Path to the file
   - __modificationTime (TimestampType)__: Time of file modification
   - __length (LongType)__: File size in bytes
   - __content (BinaryType)__: File content
   

In [6]:
# Glob pointing at the local image folders
img_path = 'data/*'

# Read the images through the "binaryFile" source:
# only *.jpg files are kept and sub-directories are searched recursively.
data = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(img_path)
)


In [7]:
# Print a preview of the loaded binaryFile DataFrame (path, modificationTime, length, content).
data.show()

                                                                                

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/Users/franc...|2021-09-12 19:27:02|  5597|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:27:02|  5582|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:27:02|  5193|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:27:02|  5098|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:26:16|  5062|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:26:16|  4969|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:26:16|  4961|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:26:16|  4939|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:25:46|  4154|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:25:46|  4133|[FF D8 FF E0 00 1...|
|file:/Users/franc...|2021-09-12 19:25:46|  4098|[FF D8 FF E0 00 1...|
|file:

- __printSchema__: prints the DataFrame schema as a tree

In [8]:
# Print the column names, types and nullability of the loaded DataFrame.
data.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



# Image's labels extraction

In [9]:
# Extraction of the label from the path string.
# The label is taken to be the name of the image's parent directory, i.e. the
# second-to-last '/'-separated component of the path. Indexing from the end
# (instead of a hard-coded position such as 6) keeps the extraction independent
# of where the project folder lives on disk.
# NOTE(review): assumes images are stored as .../<label>/<image>.jpg - confirm.
from pyspark.sql.functions import element_at

image_df = (
    data
    .withColumn('label', element_at(split(col('path'), '/'), -2))
    .select('path', 'content', 'label')
)
image_df.show()

+--------------------+--------------------+----------------+
|                path|             content|           label|
+--------------------+--------------------+----------------+
|file:/Users/franc...|[FF D8 FF E0 00 1...|      Strawberry|
|file:/Users/franc...|[FF D8 FF E0 00 1...|      Strawberry|
|file:/Users/franc...|[FF D8 FF E0 00 1...|      Strawberry|
|file:/Users/franc...|[FF D8 FF E0 00 1...|      Strawberry|
|file:/Users/franc...|[FF D8 FF E0 00 1...|            Kiwi|
|file:/Users/franc...|[FF D8 FF E0 00 1...|            Kiwi|
|file:/Users/franc...|[FF D8 FF E0 00 1...|            Kiwi|
|file:/Users/franc...|[FF D8 FF E0 00 1...|            Kiwi|
|file:/Users/franc...|[FF D8 FF E0 00 1...|AppleGrannySmith|
|file:/Users/franc...|[FF D8 FF E0 00 1...|AppleGrannySmith|
|file:/Users/franc...|[FF D8 FF E0 00 1...|AppleGrannySmith|
|file:/Users/franc...|[FF D8 FF E0 00 1...|AppleGrannySmith|
|file:/Users/franc...|[FF D8 FF E0 00 1...|       Blueberry|
|file:/Users/franc...|[F

# Dimension reduction

## Model preparation

Download a model file for featurization, and truncate the last layer(s). This notebook uses ResNet50.

Spark workers need to access the model and its weights.

- For moderately sized models (< 1GB in size), a good practice is to download the model to the Spark driver and then broadcast the weights to the workers. This notebook uses this approach.
- For large models (> 1GB), it is best to load the model weights from distributed storage to workers directly.


In [10]:
# Instantiate ResNet50 on the driver without its top layer(s) (include_top=False),
# so that it can be used as a feature extractor.
model = ResNet50(include_top=False)
model.summary()  # verify that the top layer is removed

2022-05-24 08:39:28.677317: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


Model: "resnet50"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, None, None,  0           []                               
                                 3)]                                                              
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, None, None,   0           ['input_1[0][0]']                
                                3)                                                                
                                                                                                  
 conv1_conv (Conv2D)            (None, None, None,   9472        ['conv1_pad[0][0]']              
                                64)                                                        

In [11]:
# Broadcast model weights to the workers.
# The weights of the driver-side model are read back through `bc_model_weights.value`.
bc_model_weights = spark.sparkContext.broadcast(model.get_weights())

def model_fn():
  """
  Build a ResNet50 without its top layer and load the broadcast weights into it.

  :return: the ResNet50 model carrying the weights held by `bc_model_weights`.
  """
  resnet = ResNet50(weights=None, include_top=False)
  resnet.set_weights(bc_model_weights.value)
  return resnet

**Define image loading and featurization logic in a Pandas UDF**

This notebook defines the logic in steps, building up to the Pandas UDF. The call stack is:

- pandas UDF
    - featurize a pd.Series of images 
        - preprocess one image

This notebook uses the newer Scalar Iterator pandas UDF to amortize the cost of loading large models on workers.


In [12]:
def preprocess(content):
  """
  Preprocesses raw image bytes for prediction.

  :param content: raw bytes of one encoded image file.
  :return: the array returned by `preprocess_input` for the image decoded as RGB
           and resized to 224x224.
  """
  # Force a 3-channel RGB image: grayscale, palette, RGBA or CMYK files would
  # otherwise produce arrays with a different channel count, which breaks both
  # the batch stacking and the model input.
  img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
  arr = img_to_array(img)
  return preprocess_input(arr)

def featurize_series(model, content_series):
  """
  Compute image features for a pd.Series of raw images with the given model.

  :param model: model whose `predict` method is applied to the stacked, preprocessed images.
  :param content_series: pd.Series of raw image contents, each passed to `preprocess`.
  :return: a pd.Series holding one flattened feature vector per image
  """
  batch = np.stack(content_series.map(preprocess))
  predictions = model.predict(batch)
  # Predictions may be multi-dimensional tensors; each one is flattened to a
  # vector so that it is easier to store in a Spark DataFrame.
  flat_features = []
  for prediction in predictions:
    flat_features.append(prediction.flatten())
  return pd.Series(flat_features)

In [13]:
from typing import Iterator
@pandas_udf('array<float>')
def featurize_udf(content_series_iter:Iterator[pd.Series]) -> Iterator[pd.Series]:
  """
  Scalar Iterator pandas UDF wrapping `featurize_series`.
  The decorator declares the returned Spark DataFrame column as 'array<float>'.

  :param content_series_iter: iterator over batches of data, where each batch
                              is a pandas Series of image data.
  :return: yields one pandas Series of image features per input batch.
  """
  # The model is built once and then shared by every batch of this iterator,
  # which amortizes the cost of loading a big model.
  featurizer = model_fn()
  for image_batch in content_series_iter:
    yield featurize_series(featurizer, image_batch)

## Apply featurization to the DataFrame of images


In [14]:
# Pandas UDFs on large records (e.g., very large images) can run into Out Of Memory (OOM) errors.
# If that happens, reduce the Arrow batch size via `maxRecordsPerBatch`
# (it is set to 1024 records per batch here).
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")


In [15]:
# Define the featurization over the entire Spark DataFrame: keep `path` and `label`,
# and add a `features` column computed by `featurize_udf` from the `content` column.
# NOTE: This can take a long time (when many fruit images are processed) since it applies a large model to the full dataset.
features_df = image_df.select(col("path"), col("label"), featurize_udf("content").alias("features"))

In [16]:
# Print a preview of the featurized DataFrame (path, label, features).
features_df.show()

2022-05-24 08:39:46.086507: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-24 08:39:56.076926: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-24 08:39:56.114818: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the ap

+--------------------+----------------+--------------------+
|                path|           label|            features|
+--------------------+----------------+--------------------+
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|            Kiwi|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|            Kiwi|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|            Kiwi|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|            Kiwi|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|AppleGrannySmith|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|AppleGrannySmith|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|AppleGrannySmith|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|AppleGrannySmith|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|       Blueberry|[0.0, 0.0, 0.0, 0...|
|file:/Users/franc...|  

## PCA - Dimension reduction

In [17]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
import seaborn as sns
import matplotlib.pyplot as plt

In [21]:

def pca_transformation(df, col_name:str, n_components:int=10, variance_plot:bool=False):
    """
    Apply PCA to the image features to reduce the number of dimensions.

    Parameters:
    df (pyspark DataFrame): dataframe holding the image information.
    col_name (str): name of the array column on which to apply the PCA.
    n_components (int): number of principal components to keep.
    variance_plot (bool): when True, plot the cumulative explained variance
        (in %) of the fitted components.

    Returns:
    The input dataframe with two extra columns: 'X_vectors' (the values of
    `col_name` as dense vectors) and 'X_vectors_pca' (their PCA projection).
    """
    # Local import so that this function is self-contained: native
    # array -> vector conversion, which avoids a row-by-row Python UDF.
    from pyspark.ml.functions import array_to_vector

    # Image data are converted into dense vector format
    df = df.withColumn('X_vectors', array_to_vector(col_name))

    # Fitting the PCA class
    pca = PCA(k=n_components, inputCol='X_vectors', outputCol='X_vectors_pca')
    model_pca = pca.fit(df)

    # Feature PCA transformation
    df = model_pca.transform(df)

    if variance_plot:
        # Show the cumulative explained variance of the model, starting at 0 % for 0 components
        cumulative_var = np.cumsum(model_pca.explainedVariance.toArray())
        plt.figure(figsize=(15, 10))
        sns.set_context(context='poster', font_scale=0.8)
        sns.lineplot(
            x=list(range(n_components + 1)),
            y=np.insert(cumulative_var, 0, 0) * 100,
            color='deepskyblue',
        )
        plt.xlabel('PCs')
        plt.ylabel('Variance (%)')
        plt.ylim(0, 100)
        plt.xlim(left=0)
        plt.show()

    return df

In [22]:
# Time the PCA step: the PCA is fitted on the `features` column of `features_df`
# and 10 principal components are kept.
with timer('Dimension Reduction - PCA'):
    df_final = pca_transformation(df=features_df, col_name='features', n_components=10)

2022-05-24 08:45:54.644738: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-24 08:46:08.678984: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-24 08:46:08.681497: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the ap

Dimension Reduction - PCA - done in 236s


22/05/24 08:49:45 WARN RowMatrix: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.


In [23]:
# Print a preview of the DataFrame returned by the PCA step.
df_final.show()

22/05/24 08:58:28 WARN DAGScheduler: Broadcasting large task binary with size 7.7 MiB
2022-05-24 08:58:33.591573: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
22/05/24 08:58:38 WARN DAGScheduler: Broadcasting large task binary with size 7.7 MiB
2022-05-24 08:58:46.681534: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-24 08:58:47.124010: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Net

+--------------------+----------------+--------------------+--------------------+--------------------+
|                path|           label|            features|           X_vectors|       X_vectors_pca|
+--------------------+----------------+--------------------+--------------------+--------------------+
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-311.87279540705...|
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-356.12003168267...|
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-327.85831097151...|
|file:/Users/franc...|      Strawberry|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-330.94629336428...|
|file:/Users/franc...|            Kiwi|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[63.6163608498191...|
|file:/Users/franc...|            Kiwi|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[64.6787560033586...|
|file:/Users/franc...|            Kiwi|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,

In [24]:
with timer('Write final results'):
    # Persist the full DataFrame as Parquet, overwriting any previous output
    df_final.write.mode('overwrite').parquet("resultats_parquet")

    # Keep only path, label and the PCA vector, then export them to CSV through pandas.
    # Note: `df_final` is rebound here to the 3-column selection.
    # NOTE(review): the Parquet write and toPandas() are separate Spark actions and nothing
    # is cached, so the upstream featurization is presumably recomputed for each - confirm.
    df_final = df_final.select('path', 'label', 'X_vectors_pca')
    df_final_pandas = df_final.toPandas()
    df_final_pandas.to_csv('results.csv', index=False)

22/05/24 08:58:56 WARN DAGScheduler: Broadcasting large task binary with size 7.9 MiB
2022-05-24 08:59:14.939762: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-24 08:59:15.147792: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-05-24 08:59:16.192999: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical op

Write final results - done in 48s


                                                                                