In [1]:
# spark
import findspark
findspark.init()
import pyspark
pyspark.__version__

'3.1.2'

In [2]:
# Librairies

import os
import sys
import time
import datetime
from datetime import datetime
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import warnings

# Pyspark
import pyspark
from pyspark.sql.functions import element_at, split, col, pandas_udf, PandasUDFType, udf
from pyspark.sql.types import StringType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Tensorflow Keras
import tensorflow as tf
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img

import pyarrow

# Images
import PIL
from PIL import Image

from pyspark.ml.image import ImageSchema

# # Dimension reduction - PCA
# from pyspark.ml.feature import PCA
# from pyspark.ml.feature import StandardScaler
# from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector

# # Modeling
# from pyspark.ml.feature import StringIndexer, VectorAssembler
# from pyspark.ml.classification import DecisionTreeClassifier
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from typing import Iterator
warnings.filterwarnings('ignore')

In [3]:
# Versions
print('Libraries used:')
print('Python        : ' + sys.version)
print('pyspark       : ' + pyspark.__version__)
print('PIL           : ' + PIL.__version__)
print('Numpy         : ' + np.__version__)
print('Pandas        : ' + pd.__version__)
print('Matplotlib    : ' + matplotlib.__version__)
print('Pyarrow       : ' + pyarrow.__version__)

Libraries used:
Python        : 3.8.8 (tags/v3.8.8:024d805, Feb 19 2021, 13:18:16) [MSC v.1928 64 bit (AMD64)]
pyspark       : 3.1.2
PIL           : 9.0.1
Numpy         : 1.22.3
Pandas        : 1.4.1
Matplotlib    : 3.5.1
Pyarrow       : 7.0.0


In [4]:
# os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable

In [5]:
import pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql("select 'spark' as hello ")

df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [6]:
# Local path
PATH = os.getcwd()
PATH_Data = PATH+'/Sample'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        c:\Users\steph\Documents\Formation_Data_Scientist\P8_Lanchec_Stephane
PATH_Data:   c:\Users\steph\Documents\Formation_Data_Scientist\P8_Lanchec_Stephane/Sample
PATH_Result: c:\Users\steph\Documents\Formation_Data_Scientist\P8_Lanchec_Stephane/Results


In [7]:
import boto3
# Get resources stored in AWS S3 service
s3 = boto3.resource('s3')

In [8]:
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)
sc = spark.sparkContext

In [9]:
nums = sc.parallelize([1, 2, 3, 4])
nums.map(lambda x: x*x).collect()

[1, 4, 9, 16]

In [10]:
data_path = "Sample/**"

In [11]:
start_time = datetime.now()
df = spark.read.format("image").load(data_path) 
# extract label from image.origin
df = df.withColumn('label', element_at(split(df['image.origin'], "/"), -2))
df.select(
    'image.origin',
    'image.height',
    'image.width',
    'image.nChannels',
    'image.mode',
    'image.data',
    'label'
    ).show(6, True)
elapsed = datetime.now() - start_time
print(f'Running time (min): {elapsed}')

+--------------------+------+-----+---------+----+--------------------+---------------+
|              origin|height|width|nChannels|mode|                data|          label|
+--------------------+------+-----+---------+----+--------------------+---------------+
|file:///c:/Users/...|   714|  721|        3|  16|[FF FF FF FF FF F...|cabbage_white_1|
|file:///c:/Users/...|   713|  715|        3|  16|[FF FF FF FF FF F...|cabbage_white_1|
|file:///c:/Users/...|   711|  713|        3|  16|[FF FF FF FF FF F...|cabbage_white_1|
|file:///c:/Users/...|   798|  323|        3|  16|[FF FF FF FF FF F...|     cucumber_1|
|file:///c:/Users/...|   799|  320|        3|  16|[FF FF FF FF FF F...|     cucumber_1|
|file:///c:/Users/...|   800|  319|        3|  16|[FF FF FF FF FF F...|     cucumber_1|
+--------------------+------+-----+---------+----+--------------------+---------------+
only showing top 6 rows

Running time (min): 0:00:14.496817


In [12]:
df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)
 |-- label: string (nullable = true)



### Binary file

In [13]:
start_time = datetime.now()
# Chargement des images du train set au format "binaryFile"
df_binary = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(data_path)
# extract label from image.origin
df_binary = df_binary.withColumn("Label", element_at(split(df_binary["path"], "/"), -2))
df_binary.select(
    'path',
    'Label',
    ).show()
elapsed = datetime.now() - start_time
print(f'Running time (min): {elapsed}')

+--------------------+---------------+
|                path|          Label|
+--------------------+---------------+
|file:/c:/Users/st...|cabbage_white_1|
|file:/c:/Users/st...|cabbage_white_1|
|file:/c:/Users/st...|cabbage_white_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|       carrot_1|
|file:/c:/Users/st...|       carrot_1|
|file:/c:/Users/st...|       carrot_1|
|file:/c:/Users/st...| apple_rotten_1|
|file:/c:/Users/st...| apple_rotten_1|
|file:/c:/Users/st...| apple_rotten_1|
+--------------------+---------------+

Running time (min): 0:00:00.320974


In [14]:
df_images = df_binary.select("path", "Label")
df_images.show()

+--------------------+---------------+
|                path|          Label|
+--------------------+---------------+
|file:/c:/Users/st...|cabbage_white_1|
|file:/c:/Users/st...|cabbage_white_1|
|file:/c:/Users/st...|cabbage_white_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|     cucumber_1|
|file:/c:/Users/st...|       carrot_1|
|file:/c:/Users/st...|       carrot_1|
|file:/c:/Users/st...|       carrot_1|
|file:/c:/Users/st...| apple_rotten_1|
|file:/c:/Users/st...| apple_rotten_1|
|file:/c:/Users/st...| apple_rotten_1|
+--------------------+---------------+



### InceptionV3 for feature extraction

Inception V3 can work any size of image as long as your image has 3 channels. Because ImageNet images consist of 3 channels. The reason it can work with any size is that convolutions do not care about image-sizes. For this, you need to set include_top = False, otherwise your image size should match with model's defined size, (299,299,3).

In [15]:
# Instanciation du modèle
model = InceptionV3(
        include_top=False,  
        weights='imagenet',  # Trained on Imagenet
        input_shape=(100,100,3), # Image 100x100 (channel=3)
        pooling='max' # Max pooling
)

In [16]:
# Description des caractéristiques du modèle
model.summary()

Model: "inception_v3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 49, 49, 32)   864         ['input_1[0][0]']                
                                                                                                  
 batch_normalization (BatchNorm  (None, 49, 49, 32)  96          ['conv2d[0][0]']                 
 alization)                                                                                       
                                                                                       

In [17]:
# Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids.
# Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser
# ensuite les poids aux différents workeurs.
# get weights as broadcasted variable over nodes (provide a copy to each node)
model_weights = spark.sparkContext.broadcast(model.get_weights())

### Through Images only

In [18]:
def model_fn():
  """
  Renvoie un modèle Inception3 avec la couche supérieure supprimée et les poids pré-entraînés sur imagenet diffusés.
  """
  model = InceptionV3(
        include_top=False,  # Couche softmax de classification supprimée
        weights='imagenet',  # Poids pré-entraînés sur Imagenet
        pooling='max') # Utilisation du max de pooling
  
  model.set_weights(model_weights.value)
  
  return model

In [19]:
# Redimensionnement des images en 299x299
def preprocess(content):
    """
    Prétraite les octets de l'image brute pour la prédiction.
    param : content : objet image, obligatoire
    return : image redimensionnée en Array
    """
    # lecture + redimension (299x299) pour Xception
    img = PIL.Image.open(io.BytesIO(content)).resize([299, 299])
    # transforme l'image en Array     
    arr = img_to_array(img)
    return preprocess_input(arr)

In [25]:
# function to get tensors from batch path
def gettensorfrompath(image_path):
    # path = image_path.replace("file://", "")
    img = load_img(image_path)
    x = img_to_array(img)
    x = preprocess_input(x)
    return x

In [26]:
# target pandas user defined function to make operation on dataframe with pyspark.sql
@pandas_udf('array<double>')
def featurize(images_data_iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # load model outside of for loop
    model = model_fn()
    for image_data_series in images_data_iter:
        image_path_series = image_data_series['origin']
        # Apply functions to entire series at once
        x = image_path_series.map(gettensorfrompath)
        x = np.stack(list(x.values))
        # option is to enable batch_size
        features = model.predict(x)
        features_flat = [p.flatten() for p in features]
        yield pd.Series(features_flat)

In [27]:
# apply featurization
featurized_df = df.withColumn('cnn_features', featurize('image')).cache()

In [28]:
start = time.perf_counter()
featurized_df.show(3)
stop = time.perf_counter()
print(f'data load with spark.read, elapsed time: {stop - start:0.2f}s')

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\sql\pandas\serializers.py", line 273, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\sql\pandas\serializers.py", line 81, in dump_stream
    for batch in iterator:
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\sql\pandas\serializers.py", line 266, in init_stream_yield_batches
    for series in iterator:
  File "C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py", line 356, in func
  File "C:\Users\steph\AppData\Local\Temp\ipykernel_13088\3299834618.py", line 9, in featurize
  File "c:\Users\steph\Documents\Formation_Data_Scientist\P8_Lanchec_Stephane\venv\lib\site-packages\pandas\core\series.py", line 4237, in map
    new_values = self._map_values(arg, na_action=na_action)
  File "c:\Users\steph\Documents\Formation_Data_Scientist\P8_Lanchec_Stephane\venv\lib\site-packages\pandas\core\base.py", line 880, in _map_values
    new_values = map_f(values, mapper)
  File "pandas\_libs\lib.pyx", line 2870, in pandas._libs.lib.map_infer
  File "C:\Users\steph\AppData\Local\Temp\ipykernel_13088\1412985192.py", line 4, in gettensorfrompath
  File "c:\Users\steph\Documents\Formation_Data_Scientist\P8_Lanchec_Stephane\venv\lib\site-packages\keras\preprocessing\image.py", line 313, in load_img
    return image.load_img(path, grayscale=grayscale, color_mode=color_mode,
  File "c:\Users\steph\Documents\Formation_Data_Scientist\P8_Lanchec_Stephane\venv\lib\site-packages\keras_preprocessing\image\utils.py", line 113, in load_img
    with open(path, 'rb') as f:
OSError: [Errno 22] Invalid argument: 'file:///c:/Users/steph/Documents/Formation_Data_Scientist/P8_Lanchec_Stephane/Sample/cabbage_white_1/r0_172.jpg'


### ---------------------

In [None]:
# Extraction des features par le modèle dans un vecteur
def featurize_series(model, content_series):
  """
  Featurise une pd.Series d'images brutes en utilisant le modèle d'entrée.
  param : 
    model : modèle à utiliser pour l'extraction, obligatoire.
    content_series : image redimensionnée (299, 299, 3) en Array
  :return: les features importantes de l'image en pd.Series.
  """
  input = np.stack(content_series.map(preprocess))
  # Prédiction du modèle
  preds = model.predict(input)
  # Pour certaines couches, les caractéristiques de sortie seront des tenseurs multidimensionnels.
  # Nous aplatissons les tenseurs de caractéristiques en vecteurs pour faciliter le stockage dans
  # les DataFrames de Spark.
  output = [p.flatten() for p in preds]
  
  return pd.Series(output)

In [None]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Cette méthode est un Scalar Iterator pandas UDF enveloppant notre fonction de featurisation.
    Le décorateur spécifie que cette méthode renvoie une colonne Spark DataFrame de type ArrayType(FloatType).

    :param content_series_iter : Cet argument est un itérateur sur des lots de données, où chaque lot est une série pandas de données d'image.
    '''
  # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
  # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

In [None]:
# Les UDF de Pandas sur de grands enregistrements (par exemple, de très grandes images) peuvent rencontrer des erreurs de type Out Of Memory (OOM).
# Si vous rencontrez de telles erreurs dans la cellule ci-dessous, essayez de réduire la taille du lot Arrow via `maxRecordsPerBatch`.
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
# Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.
# REMARQUE : Cela peut prendre beaucoup de temps (environ 10 minutes) car il applique un grand modèle à l'ensemble des données.
# features_df = df_binary.repartition(16).select(col("path"), col('Label'), featurize_udf("content").alias("features"))
features_df = df_binary.select(col("path"), col('Label'), featurize_udf("content").alias("features"))

#### Solution 2

In [None]:
features_df2 = df_binary.repartition(16).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [None]:
# Images number
features_df2.count()

In [None]:
# Exemple des features?
features_df2.show()

In [None]:
features_df2.write.mode("overwrite").parquet(PATH_Result)

In [None]:
start = time.perf_counter()
# features_df.printSchema()
features_df2.show()
stop = time.perf_counter()
print(f'data load with spark.read, elapsed time: {stop - start:0.2f}s')