# **Projet 8 :** Déployez un modèle dans le cloud

## Notebook de test des scripts PySpark

### Import

In [2]:
%matplotlib inline
import os
import pandas as pd
from tqdm.notebook import tqdm

### Configuration

In [5]:
%load_ext autoreload
%autoreload 2

# configurations pandas
pd.set_option('display.max_rows', 250)
pd.set_option('display.max_columns', 250)

In [4]:
# Root folder of the Fruits-360 training images (each sub-folder name is used as a label below)
images_path = os.path.normpath(os.path.join(".", "data", "fruits-360_dataset", "fruits-360", "training"))

### Chargement des données avec pandas

Nous commençons par charger les images disponibles dans le dossier training dans un DataFrame pandas, afin d'analyser la taille et le mode des images.
Toutes les images sont en 100x100x3 et il y a exactement 131 classes.

In [6]:
from PIL import Image

# Inventory every image under images_path: one row per file with its class label
# (name of the containing folder), its size and its colour mode.
IMAGE_EXTENSIONS = (".jpg", ".png", ".bmp", ".gif")
DF_COLUMNS = ['filename', 'path', 'label', 'width', 'height', 'mode']

records, index = [], []
for i, (root, _subdirs, files) in tqdm(enumerate(os.walk(images_path))):
    label = os.path.basename(root)
    for j, file in enumerate(files):
        if not file.lower().endswith(IMAGE_EXTENSIONS):
            continue
        # Image.open is lazy (only the header is parsed); the context manager closes
        # the file handle immediately instead of leaving thousands of open handles
        # for the garbage collector.
        with Image.open(os.path.join(root, file)) as image:
            width, height = image.size
            mode = image.mode
        records.append([file, root, label, width, height, mode])
        # Keep the original "<directory index>.<file index>" row labels.
        index.append(f"{i}.{j}")

# Build the frame once: enlarging it row by row with df.loc[...] copies it on every insert.
df = pd.DataFrame(records, index=index, columns=DF_COLUMNS)
df.info()

0it [00:00, ?it/s]

<class 'pandas.core.frame.DataFrame'>
Index: 67692 entries, 1.0 to 131.474
Data columns (total 6 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   filename  67692 non-null  object
 1   path      67692 non-null  object
 2   label     67692 non-null  object
 3   width     67692 non-null  int64 
 4   height    67692 non-null  int64 
 5   mode      67692 non-null  object
dtypes: int64(2), object(4)
memory usage: 3.6+ MB


In [7]:
# Preview the first rows of the image inventory
df.head(n=5)

Unnamed: 0,filename,path,label,width,height,mode
1.0,0_100.jpg,data\fruits-360_dataset\fruits-360\training\ap...,apple_braeburn,100,100,RGB
1.1,100_100.jpg,data\fruits-360_dataset\fruits-360\training\ap...,apple_braeburn,100,100,RGB
1.2,101_100.jpg,data\fruits-360_dataset\fruits-360\training\ap...,apple_braeburn,100,100,RGB
1.3,102_100.jpg,data\fruits-360_dataset\fruits-360\training\ap...,apple_braeburn,100,100,RGB
1.4,103_100.jpg,data\fruits-360_dataset\fruits-360\training\ap...,apple_braeburn,100,100,RGB


In [5]:
# Summary statistics for every column, numeric and non-numeric alike
df.describe(include="all")

Unnamed: 0,filename,path,label,width,height,mode
count,67692,67692,67692,67692.0,67692.0,67692
unique,1717,131,131,,,1
top,14_100.jpg,data\fruits-360_dataset\fruits-360\training\gr...,grape_blue,,,RGB
freq,115,984,984,,,67692
mean,,,,100.0,100.0,
std,,,,0.0,0.0,
min,,,,100.0,100.0,
25%,,,,100.0,100.0,
50%,,,,100.0,100.0,
75%,,,,100.0,100.0,


In [6]:
sample_size = 1  # number of images sampled per class
SAMPLE_SEED = 42  # fixed seed so the sampled subset is identical on every re-run
# Draw `sample_size` random rows from each label group.
sample_df = df.groupby('label').apply(lambda x: x.sample(sample_size, random_state=SAMPLE_SEED))

In [17]:
## Export au format CSV, pour des raisons pratiques et pour le chargement des images dans le S3

# def _apply_fn(x):
#     x['image'] = os.path.normpath(os.path.join(x.path, x.filename))
#     return x
# df.apply(_apply_fn, axis=1)[['label', 'image']].to_csv("./data/fruits-360_dataset/fruits-360/training_images.csv", index=False)

### Chargement des données pyspark

In [7]:
## Imports
import io
import os
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import element_at, split, col, udf
from PIL import Image
from tensorflow.keras.applications.imagenet_utils import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications import ResNet50
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.ml.clustering import KMeans

In [8]:
## PYSPARK CONFIGURATION
import sys

# Point the driver and the Python workers at the interpreter running this kernel.
# A bare "python.exe" is resolved through PATH (possibly a different environment
# without tensorflow/PIL) and does not exist at all on non-Windows systems.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [16]:
## STEP 0: Create SparkSession
# Spark reads a bare number with a per-key default unit: spark.executor.heartbeatInterval
# is in milliseconds while spark.network.timeout is in seconds, so the former
# "600000"/"600000" pair meant 10 minutes vs. roughly 7 days. Explicit units remove the
# ambiguity. The heartbeat interval must stay well below the network timeout (Spark 3.x
# refuses to start when the timeout is not greater than the heartbeat interval).
SPARK_HEARTBEAT_INTERVAL = "60s"
SPARK_NETWORK_TIMEOUT = "600s"  # 10 minutes of tolerance for the slow per-image ResNet50 UDF

conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName('oc_ds_p8')
    .set("spark.executor.heartbeatInterval", SPARK_HEARTBEAT_INTERVAL)
    .set("spark.network.timeout", SPARK_NETWORK_TIMEOUT)
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
spark

In [17]:
## STEP 1: Load images

# Full path of every sampled image (folder + file name from sample_df)
images_paths = [
    os.path.join(folder, name)
    for folder, name in zip(sample_df['path'], sample_df['filename'])
]
images_df = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .load(images_paths)
    .cache()
)
# The label is the second-to-last '/'-separated component of the file URI (its parent folder)
images_df = images_df.withColumn('label', element_at(split(col('path'), '/'), -2))
images_df.printSchema()
print(images_df.count())
images_df.show(5)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

131
+--------------------+-------------------+------+--------------------+--------------+
|                path|   modificationTime|length|             content|         label|
+--------------------+-------------------+------+--------------------+--------------+
|file:/c:/Users/fl...|2021-09-12 19:26:52|  6837|[FF D8 FF E0 00 1...|pineapple_mini|
|file:/c:/Users/fl...|2021-09-12 19:26:22|  6433|[FF D8 FF E0 00 1...|        lychee|
|file:/c:/Users/fl...|2021-09-12 19:27:06|  6366|[FF D8 FF E0 00 1...|    watermelon|
|file:/c:/Users/fl...|2021-09-12 19:26:00|  6184|[FF D8 FF E0 00 1...|   cauliflower|
|file:/c:/Users/fl...|2021-09-12 19:27:00|  6112|[FF D8 FF E0 00 1...|      rambutan|
+--------------------+-------------------+------+--------------------+--------------+
only showing top 

In [18]:
## STEP 2: Featurize images with ResNet50 (good performance)
# Row-at-a-time Python UDF: each image's raw bytes become a DenseVector of pooled
# ResNet50 features. Weights are loaded once on the driver and broadcast to workers.

model = ResNet50(include_top=False, input_shape=(None, None, 3), weights="imagenet", pooling="avg")
bc_model_weights = sc.broadcast(model.get_weights())

# Worker-side cache for the network. PySpark ships the UDF together with the globals it
# references, so the network is built once per deserialized copy (per task) instead of
# once per image as before. Never call _get_worker_model() on the driver: a populated
# cache would then be pickled along with the UDF.
_worker_models = {}


def _get_worker_model():
    """Return the ResNet50 feature extractor, building it on first use."""
    net = _worker_models.get("resnet50")
    if net is None:
        # Same architecture as the driver model; input_shape=(None, None, 3) accepts any
        # image size, and the weights come from the broadcast variable.
        net = ResNet50(include_top=False, input_shape=(None, None, 3), weights=None, pooling="avg")
        net.set_weights(bc_model_weights.value)
        _worker_models["resnet50"] = net
    return net


@udf(returnType=VectorUDT())
def features_vectorizer_1(content):
    """Decode one image (raw file bytes) and return its pooled ResNet50 features.

    The image is converted to RGB first because the network expects 3 channels
    (grayscale/RGBA/palette images would otherwise produce a wrong input shape).
    """
    img = Image.open(io.BytesIO(content)).convert("RGB")
    arr = preprocess_input(img_to_array(img))
    features = _get_worker_model().predict(np.array([arr]))
    return DenseVector(features.flatten())

features_df = images_df.select('path', 'label', features_vectorizer_1('content').alias('features'))
features_df.printSchema()
print(features_df.count())
features_df.show(5)

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)

131
+--------------------+--------------+--------------------+
|                path|         label|            features|
+--------------------+--------------+--------------------+
|file:/c:/Users/fl...|pineapple_mini|[0.0,0.4202145636...|
|file:/c:/Users/fl...|        lychee|[0.0,0.0832873880...|
|file:/c:/Users/fl...|    watermelon|[0.04883571714162...|
|file:/c:/Users/fl...|   cauliflower|[0.0,3.6171073913...|
|file:/c:/Users/fl...|      rambutan|[0.0,0.2057400047...|
+--------------------+--------------+--------------------+
only showing top 5 rows



In [20]:
## STEP 3: Cluster images (optional: group the images using their features)

# One cluster per distinct label
k = features_df.select("label").distinct().count()
print('k', k)

kmeans = KMeans(k=k, seed=1, featuresCol='features', predictionCol='prediction')

kmeans_model = kmeans.fit(features_df)
cluster_df = kmeans_model.transform(features_df)

cluster_df.printSchema()
print(cluster_df.count())
cluster_df.show(5)

k 131
root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)

131
+--------------------+--------------+--------------------+----------+
|                path|         label|            features|prediction|
+--------------------+--------------+--------------------+----------+
|file:/c:/Users/fl...|pineapple_mini|[0.0,0.4202145636...|         1|
|file:/c:/Users/fl...|        lychee|[0.0,0.0832873880...|         2|
|file:/c:/Users/fl...|    watermelon|[0.04883571714162...|         3|
|file:/c:/Users/fl...|   cauliflower|[0.0,3.6171073913...|         4|
|file:/c:/Users/fl...|      rambutan|[0.0,0.2057400047...|         5|
+--------------------+--------------+--------------------+----------+
only showing top 5 rows



In [21]:
## Read back the result

result_columns = ['path', 'label', 'features', 'prediction']
result = cluster_df.select(*result_columns).toPandas()
result.head()

  Unsupported type in conversion to Arrow: VectorUDT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,path,label,features,prediction
0,file:/c:/Users/flori/DataProjects/P8_lotte_flo...,pineapple_mini,"[0.0, 0.42021456360816956, 5.729567050933838, ...",1
1,file:/c:/Users/flori/DataProjects/P8_lotte_flo...,lychee,"[0.0, 0.08328738808631897, 0.15632647275924683...",2
2,file:/c:/Users/flori/DataProjects/P8_lotte_flo...,watermelon,"[0.048835717141628265, 0.5986989736557007, 0.9...",3
3,file:/c:/Users/flori/DataProjects/P8_lotte_flo...,cauliflower,"[0.0, 3.617107391357422, 2.1287989616394043, 0...",4
4,file:/c:/Users/flori/DataProjects/P8_lotte_flo...,rambutan,"[0.0, 0.20574000477790833, 1.0261861085891724,...",5


### Exécution sur le JupyterHub du cluster EMR

In [4]:
# Exécution du code sur le JupyterHub du cluster EMR d'Amazon

# start pyspark session

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1658852978466_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1658852978466_0002,pyspark,idle,Link,Link,,✔


In [11]:
## STEP 1: Load images
# This cell runs in the EMR (Livy) PySpark session, which does not share the local
# kernel's namespace, so the column helpers it needs are imported here.
from pyspark.sql.functions import col, element_at, split

s3_uri = "s3://oc-p8/images/**"
images_df = spark.read.format("binaryFile").option("pathGlobFilter", "*.jpg").load(s3_uri).cache()
# The label is the parent folder of each image: second-to-last '/'-separated component.
images_df = images_df.withColumn('label', element_at(split(col('path'), '/'), -2))
images_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

In [12]:
## STEP 2: Featurize images
# Runs in the EMR PySpark session: everything used below is imported here because the
# local-kernel import cell is not executed in that session.
import io

import numpy as np
from PIL import Image
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.functions import udf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.imagenet_utils import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

sc = spark.sparkContext

model = ResNet50(include_top=False, input_shape=(None, None, 3), weights="imagenet", pooling="avg")
bc_model_weights = sc.broadcast(model.get_weights())

# Worker-side cache for the network. PySpark ships the UDF together with the globals it
# references, so the network is built once per deserialized copy (per task) instead of
# once per image as before. Never call _get_worker_model() on the driver: a populated
# cache would then be pickled along with the UDF.
_worker_models = {}


def _get_worker_model():
    """Return the ResNet50 feature extractor, building it on first use."""
    net = _worker_models.get("resnet50")
    if net is None:
        # Same architecture as the driver model; weights come from the broadcast variable.
        net = ResNet50(include_top=False, input_shape=(None, None, 3), weights=None, pooling="avg")
        net.set_weights(bc_model_weights.value)
        _worker_models["resnet50"] = net
    return net


@udf(returnType=VectorUDT())
def features_vectorizer_1(content):
    """Decode one image (raw file bytes) and return its pooled ResNet50 features.

    The image is converted to RGB first because the network expects 3 channels.
    """
    img = Image.open(io.BytesIO(content)).convert("RGB")
    arr = preprocess_input(img_to_array(img))
    features = _get_worker_model().predict(np.array([arr]))
    return DenseVector(features.flatten())

features_df = images_df.select('path', 'label', features_vectorizer_1('content').alias('features'))
features_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)

In [13]:
## STEP 3: Cluster images
# Runs in the EMR PySpark session, so KMeans is imported here rather than relying on
# the local-kernel import cell.
from pyspark.ml.clustering import KMeans

# One cluster per distinct label
k = features_df.select("label").distinct().count()
print('k' , k)

kmeans = KMeans(k=k, seed=1)
kmeans.setFeaturesCol('features')
kmeans.setPredictionCol('prediction')

# Only the feature vectors are needed to fit the model.
kmeans_model = kmeans.fit(features_df.select('features'))
cluster_df = kmeans_model.transform(features_df)
cluster_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

k 131
root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)

In [None]:
## Save the result to S3 as a CSV file
## The CSV file is available at: https://oc-p8.s3.eu-west-3.amazonaws.com/training_result.csv
# NOTE: toPandas() collects every row (feature vectors included) on the driver, and
# pandas writes "s3://" URLs through fsspec/s3fs, which must be installed on the driver.

result_columns = ['path', 'label', 'features', 'prediction']
result_pdf = cluster_df.select(*result_columns).toPandas()
result_pdf.to_csv("s3://oc-p8/training_result.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Export du notebook en HTML

In [8]:
# Render this notebook to HTML using the nbconvert settings in nbconvert/config_html.py
!jupyter nbconvert --config nbconvert/config_html.py

[NbConvertApp] Converting notebook P8_01_notebookexploration.ipynb to html
[NbConvertApp] Writing 637222 bytes to P8_01_notebookexploration.html
