# Projet 8 : Déployez un modèle dans le cloud
## Notebook preprocessing et featuring

*Julie Neury-Ormanni*

### Import des librairies

In [2]:
#imports
import pandas as pd
from PIL import Image
import numpy as np
import time
import io
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql.functions import col, pandas_udf, element_at, PandasUDFType, split, udf
from pyspark.ml.linalg import Vectors, VectorUDT
import gc

from pyspark.ml.feature import PCA, VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector

### Import des images

In [4]:
path_image = 's3://neurybucket/Lychee_smaller/**'

images = spark.read.format("binaryFile") \
  .option("recursiveFileLookup", "true") \
  .load(path_image).limit(100)

images.show(n=5)

In [5]:
df.show()

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/C:/Users/ne...|2022-03-24 11:47:...|  6064|[FF D8 FF E0 00 1...|
|file:/C:/Users/ne...|2022-03-24 11:47:...|  5899|[FF D8 FF E0 00 1...|
+--------------------+--------------------+------+--------------------+



### Preprocessing des images et transfer learning

In [6]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
  # For some layers, output features will be multi-dimensional tensors.
  # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    #ouput = np.array(output)
    return pd.Series(output)

model = ResNet50(weights=None, include_top=False)
bc_model_weights = sc.broadcast(model.get_weights())

def model_fn():
    """
    Returns a ResNet50 model with top layer removed and broadcasted pretrained weights.
    """
    model = ResNet50(weights=None, include_top=False)
    model.set_weights(bc_model_weights.value)
    return model

### Extraction des features

In [7]:
from typing import Iterator, Tuple
@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).
    :param content_series_iter: This argument is an iterator over batches of data, where each batch
    is a pandas Series of image data.
    '''
  # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
  # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

In [8]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [9]:
df_feat = df.repartition(16).select(col("path"), featurize_udf("content").alias("feats"))

In [10]:
# Getting the label by splitting the path of the image and getting its last directory
df_feat = (df_feat.withColumn('label',element_at(split(df_feat['path'],"/"),-2)))

df_feat.show(n=3)

+--------------------+--------------------+--------------+
|                path|               feats|         label|
+--------------------+--------------------+--------------+
|file:/C:/Users/ne...|[9.599431, 2.8754...|Lychee_smaller|
|file:/C:/Users/ne...|[9.559483, 2.8543...|Lychee_smaller|
+--------------------+--------------------+--------------+



### Réduction des features par ACP

In [11]:
def pca_transformation(df, n_components=50, col_image='feats'):
    
    """
    Applique un algorithme de PCA sur l'ensemble des images pour réduire la dimension de chaque image 
    du jeu de données.
    
    Paramètres:
    df(pyspark dataFrame): contient une colonne avec les données images
    n_components(int): nombre de dimensions à conserver
    col_image(string): nom de la colonne où récupérer les données images
    """

    # Initilisation du temps de calcul
    start_time = time.time()

    # Les données images sont converties au format vecteur dense
    ud_f = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('feats', ud_f('feats'))
    
    standardizer = StandardScaler(inputCol='feats', outputCol="scaledFeatures",
                                  withStd=True, withMean=True)
    model_std = standardizer.fit(df)
    df = model_std.transform(df)

    # Entrainement de l'algorithme
    pca = PCA(k=n_components, inputCol='scaledFeatures', outputCol='pcaFeatures')
    model_pca = pca.fit(df)

    # Transformation des images sur les k premières composantes
    df = model_pca.transform(df)

    df = df.filter(df.pcaFeatures.isNotNull())
    
    # Affiche le temps de calcul
    print("Temps d'execution {:.2f} minutes".format((time.time() - start_time)/60))


    return df

In [12]:
print("---Réduction dimmensionnelle---")
pca_df = pca_transformation(df_feat)
pca_df.show(n=1)

---Réduction dimmensionnelle---


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\neury\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 584, in main
  File "C:\Users\neury\Anaconda3\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 562, in read_int
    length = stream.read(4)
  File "C:\Users\neury\Anaconda3\lib\socket.py", line 669, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out


### Export des features en parquet

In [12]:
pca_df.select(['path', 'label', 'pcaFeatures']).write.parquet(r'C:\Users\neury\Documents\Python\P8\output_features.parquet')