# Projet 8 - Déployez un modèle dans le cloud

## 1/ Chargement des librairies

In [15]:
from PIL import Image
from io import BytesIO, StringIO
import pandas as pd
import numpy as np
import io
import boto3

from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql.functions import col, pandas_udf, PandasUDFType, udf, split

## 2/ Création Session et Context Spark

In [2]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
          .master('local[*]') \
          .appName('sparkyfruitp8') \
          .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('ERROR')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/23 09:20:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')

## 3/ Chargement des images

### Lecture des images au format binaire dans notre bucket S3

In [4]:
s3path = 's3a://sparkyfruitp8/'

imgS3 = spark.read.format('binaryFile')\
.option('pathGlobFilter', '*.jpg')\
.option('recursiveFileLookup', 'true').load(s3path).toDF('path', 'modificationTime', 'length', 'content')

print(type(imgS3))
print(imgS3.printSchema())

<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)

None


In [5]:
imgS3.show(5)

                                                                                

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3a://sparkyfruit...|2022-11-16 10:36:06|  5969|[FF D8 FF E0 00 1...|
|s3a://sparkyfruit...|2022-11-16 10:36:05|  5768|[FF D8 FF E0 00 1...|
|s3a://sparkyfruit...|2022-11-16 10:36:06|  5654|[FF D8 FF E0 00 1...|
|s3a://sparkyfruit...|2022-11-16 10:36:06|  5449|[FF D8 FF E0 00 1...|
|s3a://sparkyfruit...|2022-11-16 10:36:04|  4984|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows



### Récupération du label

In [6]:
imgS3 = imgS3.withColumn('label', split(imgS3.path, '/')[3]).select(['path', 'content', 'label'])
imgS3.show(3)

+--------------------+--------------------+-----+
|                path|             content|label|
+--------------------+--------------------+-----+
|s3a://sparkyfruit...|[FF D8 FF E0 00 1...|Cocos|
|s3a://sparkyfruit...|[FF D8 FF E0 00 1...|Cocos|
|s3a://sparkyfruit...|[FF D8 FF E0 00 1...|Cocos|
+--------------------+--------------------+-----+
only showing top 3 rows



## 4/ Chargement du CNN

### Suppression de la dernière couche de classification et récupération des poids

In [7]:
model = VGG16(include_top=False, pooling='max')

bc_model_weights = sc.broadcast(model.get_weights())

2022-11-23 09:20:45.605376: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-11-23 09:20:45.605428: W tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: UNKNOWN ERROR (303)
2022-11-23 09:20:45.605454: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-172-31-41-170.eu-west-3.compute.internal): /proc/driver/nvidia/version does not exist
2022-11-23 09:20:45.605766: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


## 5/ Création des fonctions pour l'extraction des features

### CNN

In [8]:
def model_fn():
    """
    Returns a VGG16 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model_vgg = VGG16(weights=None, include_top=False, pooling='max')
    model_vgg.set_weights(bc_model_weights.value)
    return model_vgg

### Preprocessing des images

In [9]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input_ = np.stack(content_series.map(preprocess))
    preds = model.predict(input_)
    # For some layers, output features will be multi-dimensional 
    #tensors.
    # We flatten the feature tensors to vectors for easier storage
    #in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

### Fonction Pandas UDF

In [10]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping 
    our featurization function.
    The decorator specifies that this returns a 
    Spark DataFrame column of type ArrayType(FloatType).
  
    :param content_series_iter: This argument is an 
    iterator over batches of data, where each batch
    is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load 
    #the model once and then re-use it
    # for multiple data batches.  This amortizes the 
    #overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

## 6/ Extraction des features et enregistrement des résultats dans le bucket S3

In [11]:
features_df = imgS3.repartition(1).select(col('path'), col('label'), featurize_udf('content').alias('features'))

### Format parquet

In [12]:
features_df.write.mode('overwrite').parquet('s3a://sparkyfruitp8-results/results_udf')

2022-11-23 09:20:49.900184: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-11-23 09:20:50.171725: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-11-23 09:20:50.171838: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-11-23 09:20:51.072843: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-

### Format CSV

In [13]:
results_df = features_df.toPandas()

2022-11-23 09:21:03.479831: W tensorflow/tsl/framework/cpu_allocator_impl.cc:82] Allocation of 308281344 exceeds 10% of free system memory.
2022-11-23 09:21:03.694937: W tensorflow/tsl/framework/cpu_allocator_impl.cc:82] Allocation of 308281344 exceeds 10% of free system memory.
                                                                                

In [14]:
csv_buffer = StringIO()
results_df.to_csv(csv_buffer)
# boto client
client = boto3.client('s3')
# put the object
response = client.put_object(
    Body=csv_buffer.getvalue(),
    Bucket='sparkyfruitp8-results',
    Key='results_udf.csv')