# P8 Déployez un modèle dans le cloud

In [1]:
# Librairies classiques
import pandas as pd
import numpy as np
import os
import io
import time
import argparse

# Lecture d'images
import PIL
from PIL import Image

# Pyspark
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *

from pyspark.sql.functions import element_at, split
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.functions import element_at, split

from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# tensorflow
import tensorflow as tf
from tensorflow.keras.applications.xception import Xception, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img
tf.__version__

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1619440062961_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'2.1.0'

## Chargement des données

In [2]:
data_source = 's3://p8bucket/images/*'
output_uri = 's3://p8bucket/resultats'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Chargement des données en format "binaryFile"

In [3]:
# Chargement des données
# En format "binaryFile"
df_binary = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(data_source)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df_binary.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8bucket/ima...|2021-04-23 06:33:02|  6989|[FF D8 FF E0 00 1...|
|s3://p8bucket/ima...|2021-04-23 06:34:21|  6987|[FF D8 FF E0 00 1...|
|s3://p8bucket/ima...|2021-04-23 06:33:14|  6984|[FF D8 FF E0 00 1...|
|s3://p8bucket/ima...|2021-04-23 06:33:00|  6982|[FF D8 FF E0 00 1...|
|s3://p8bucket/ima...|2021-04-23 06:33:01|  6973|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

### Extraction de la classe de chaque image de fruit

In [5]:
# On extrait la classe de chaque image de fruit
df_binary = df_binary.withColumn('classe', element_at(split(df_binary['path'], "/"), -2))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Extraction des features

In [6]:
## Extraction des features
# Modèle
model = Xception(
        include_top=False, # top layer supprimé
        weights=None,
        input_shape=(100,100,3),
        pooling='max'
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
## Extraction des features    

bc_model_weights = spark.sparkContext.broadcast(model.get_weights())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
## Fonctions utiles   

# https://docs.databricks.com/_static/notebooks/deep-learning/deep-learning-transfer-learning-keras.html
def model_fn():
    """
    Returns a Xception model with top layer 
    removed and broadcasted pretrained weights.
    """
    model = Xception(
                    include_top=False, # top layer supprimé
                    weights=None,
                    pooling='max')
    model.set_weights(bc_model_weights.value)
    return model

def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # lecture + redimension (299x299) pour Xception
    img = PIL.Image.open(io.BytesIO(content)).resize([299, 299])
    arr = img_to_array(img) # image en array
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).
  
    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                            is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [9]:
## Extraction des features
        
# Pandas UDFs on large records (e.g., very large images) can run into Out Of Memory (OOM) errors.
# If you hit such errors in the cell below, try reducing the Arrow batch size via `maxRecordsPerBatch`.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

# We can now run featurization on our entire Spark DataFrame.
# NOTE: This can take a long time (about 10 minutes) since it applies a large model to the full dataset.
features_df = df_binary.repartition(16).select(col("path"), col('classe'),featurize_udf("content").alias("X_features"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Réduction de dimension PCA

In [10]:
##  Réduction de dimension PCA   

def pca_transformation(df, n_components=8):
        
    """
    Applique un algorithme de PCA sur l'ensemble des images pour réduire la dimension de chaque image 
    du jeu de données.
    
    Paramètres:
    df(pyspark dataFrame): contient une colonne avec les données images
    n_components(int): nombre de dimensions à conserver
    """
    # Initilisation du temps de calcul
    start_time = time.time()

    # Les données images sont converties au format vecteur dense
    to_vector_udf = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('X_vectors', to_vector_udf('X_features'))
    df = df.select('path', 'classe', 'X_vectors')

    # Entrainement de l'algorithme
    pca = PCA(k=n_components, inputCol='X_vectors', outputCol='X_vectors_pca')
    model_pca = pca.fit(df)

    # Transformation des images sur les k premières composantes
    df = model_pca.transform(df)
    
    # Affiche le temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
df_final = pca_transformation(features_df, n_components=8)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Temps d'execution 1569.21 secondes

In [12]:
df_final.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+--------------------+--------------------+--------------------+
|                path|    classe|          X_features|           X_vectors|       X_vectors_pca|
+--------------------+----------+--------------------+--------------------+--------------------+
|s3://p8bucket/ima...|Watermelon|[5.9446484E-5, 7....|[5.94464836467523...|[-7.0447401522481...|
|s3://p8bucket/ima...|Watermelon|[6.372116E-5, 8.8...|[6.37211633147671...|[-0.0010249877359...|
|s3://p8bucket/ima...|Watermelon|[8.1717444E-5, 9....|[8.17174441181123...|[-9.3966337203747...|
|s3://p8bucket/ima...|Watermelon|[7.3510615E-5, 7....|[7.35106150386855...|[-9.1395292732477...|
|s3://p8bucket/ima...|Watermelon|[7.339455E-5, 7.3...|[7.33945489628240...|[-8.6866075690433...|
|s3://p8bucket/ima...|Watermelon|[8.35455E-5, 8.20...|[8.35454993648454...|[-9.5380504757187...|
|s3://p8bucket/ima...|Watermelon|[7.2197174E-5, 8....|[7.21971737220883...|[-8.3666898956553...|
|s3://p8bucket/ima...|Watermel

## Enregistrement des données au format parquet

In [13]:
## Enregistrement des données prétraitées (parquet)    
df_final.write.mode('overwrite').parquet(output_uri)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Mini classification

In [14]:
### Mini-classification
print("Mini-classification")
## Préparation des données
# drop the original data features column
df = df_final.drop('path', 'X_features', 'X_vectors')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Mini-classification

In [15]:
# Encodage de la cible
# estimator
l_indexer = StringIndexer(inputCol="classe", outputCol="labelIndex")
df = l_indexer.fit(df).transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# data splitting
(train,test) = df.randomSplit([0.7,0.3])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Régression logistique

In [17]:
print("Regression logistique")
# train our model using training data
lr = LogisticRegression(featuresCol = 'X_vectors_pca', labelCol = 'labelIndex', maxIter=10)
lrModel = lr.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Regression logistique

In [18]:
# test our model and make predictions using testing data
predictions = lrModel.transform(test)
predictions.select("prediction", "labelIndex").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       4.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
+----------+----------+
only showing top 5 rows

In [19]:
# evaluate the performance of the classifier
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex",predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)

evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex",predictionCol="prediction", metricName='weightedPrecision')
precision = evaluator.evaluate(predictions)
print("Precision = %g " % precision)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy = 0.910008 
Precision = 0.909711

### Random forest

In [20]:
print("Random forest")
# train our model using training data
rf = RandomForestClassifier(labelCol="labelIndex",featuresCol='X_vectors_pca', numTrees=10)
rfModel = rf.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Random forest

In [21]:
# test our model and make predictions using testing data
predictions = rfModel.transform(test)
predictions.select("prediction", "labelIndex").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
|       0.0|       0.0|
+----------+----------+
only showing top 5 rows

In [22]:
# evaluate the performance of the classifier
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex",predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)

evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex",predictionCol="prediction", metricName='weightedPrecision')
precision = evaluator.evaluate(predictions)
print("Precision = %g " % precision)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy = 0.926829 
Precision = 0.944467