# PROJET : Déployer un modèle de Deep Learning dans le Cloud 

**Autor** : Louis BIRENHOLZ    
**Date** : 15/07/2020  

Pour ce projet, nous cherchons à venir en aide à une start-up de l'**AgriTech** voulant développer des solutions innovantes pour la récolte de fruit. Cette solution passe par la création d'une application permettant de classer des fruits en différentes catégories via une photo. Cette application permettrait de sensibiliser le grand public à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.

**Nous cherchons à anticipier le fait que le volume de données va augmenter très rapidement**, ainsi nous travaillerons via des outils **Big Data** pour se préparer au passage à l'échelle.

Ce notebook réalise une partie de preprocessing des données dans un environnement **Big Data** via le package **SparkDL**.

## Librairies

In [1]:
# Classic lib.
import os
import sys
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
import pandas as pd
import numpy as np

# Spark.
import findspark
import spark
findspark.init()

# Pyspark.
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Settings & SparkSession.

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2,databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11 pyspark-shell'
# SparkSession.
spark = SparkSession.builder.master('local[*]').appName('P8').getOrCreate()

In [3]:
spark

## Loading Data

In [4]:
# OTHER LIBRAIRIES.

# ML.
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import PCA

# DL.
from sparkdl import DeepImageFeaturizer

# Functions.
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf, lit

Using TensorFlow backend.


### Load data from S3 bucket

Pour des raisons de taille des datasets, cette partie est réalisée sur une partie réduite des données.

In [5]:
# Training reduced is the original training set with less images.
path = "s3a://projet8louis/Training_reduced"

# Read data in a DataFrame.
df_training = ImageSchema.readImages(path, recursive=True)

# Print the schema.
df_training.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)



In [6]:
# How many images in the dataset ?
df_training.count()

135

In [7]:
# Dimension of images.
df_training.select('image.height', 'image.width').take(1)

[Row(height=100, width=100)]

In [8]:
# Head.
df_training.show(5)

+--------------------+
|               image|
+--------------------+
|[s3a://projet8lou...|
|[s3a://projet8lou...|
|[s3a://projet8lou...|
|[s3a://projet8lou...|
|[s3a://projet8lou...|
+--------------------+
only showing top 5 rows



### Feature Extraction & PCA

Extraction des features via **DeepImageFeaturizer** utilisé sur le réseau préentrainé ResNet50. On extrait **2048 features**.

On réalise en une réduction dimensionnelle via une **PCA**.

In [9]:
# Feature extraction with ResNet50.
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="ResNet50")

# Df with extracted features.
df_extract = featurizer.transform(df_training).cache()

# Dimension.
print(len(df_extract.select('features').take(1)[0][0]))

2048


In [10]:
# PCA.
pca = PCA(k=2048, inputCol="features", outputCol="pca")
model = pca.fit(df_extract)

# How many components are necessary to explain 95% of the inertia ?
for c,i in enumerate(np.cumsum(model.explainedVariance)):
    if i>=0.95:
        print('{} composantes principales sont nécessaires'.format(c))
        break

12 composantes principales sont nécessaires


In [10]:
# PCA à 12 composantes.
pca = PCA(k=12, inputCol="features", outputCol="pca")
model = pca.fit(df_extract)

# Dataframe with principal components
df_pca = model.transform(df_extract)

In [11]:
def split_path(path):
    
    """ Cette fonction permet de trouver le label de l'image dans son path."""
    
    path_split = path.split(os.sep)
    return path_split[4]

In [12]:
# Création de la udf pour labéliser les images.
split_path_udf = udf(lambda z: split_path(z), StringType())

# Dataframe with principal component and labels.
df_final = df_pca.select('image.origin','pca', split_path_udf('image.origin').alias('label'))

In [13]:
df_final.show()

+--------------------+--------------------+--------------+
|              origin|                 pca|         label|
+--------------------+--------------------+--------------+
|s3a://projet8loui...|[-16.262893200262...|Apple Braeburn|
|s3a://projet8loui...|[-16.546459805582...|Apple Braeburn|
|s3a://projet8loui...|[-16.402209911536...|Apple Braeburn|
|s3a://projet8loui...|[-16.892880706972...|Apple Braeburn|
|s3a://projet8loui...|[-16.834413596396...|Apple Braeburn|
|s3a://projet8loui...|[-16.408049713471...|Apple Braeburn|
|s3a://projet8loui...|[-16.087327103765...|Apple Braeburn|
|s3a://projet8loui...|[-17.259288461497...|Apple Braeburn|
|s3a://projet8loui...|[-16.678418207681...|Apple Braeburn|
|s3a://projet8loui...|[-16.395502838567...|Apple Braeburn|
|s3a://projet8loui...|[-16.640781726634...|Apple Braeburn|
|s3a://projet8loui...|[-14.619737210615...|Apple Braeburn|
|s3a://projet8loui...|[-15.973752392968...|Apple Braeburn|
|s3a://projet8loui...|[-16.322170235248...|Apple Braebur

### Exportation des données après preprocessing/PCA au format .parquet

In [14]:
# Write on the S3 bucket (au format .parquet).
df_final.write.parquet("s3a://projet8louis/df_pca_label.parquet")

### Lecture des données stockées sur S3 

In [6]:
# Read data
path = 's3a://projet8louis/df_pca_label.parquet/part-00000-b531bd24-b68c-4554-945e-1d9964f8e871-c000.snappy.parquet'
df_from_S3 = pd.read_parquet(path, engine='fastparquet')

# Delete useless columns.
del df_from_S3['pca.size']
del df_from_S3['pca.indices']

In [16]:
print("La dimension du dataframe est : {}".format(df_from_S3.shape))
print("Les labels uniques sont :")
for i in df_from_S3.label.unique():
    print(i)

La dimension du dataframe est : (135, 4)
Les labels uniques sont :
Apple Braeburn
Apple Red 1
Apricot
Banana
Cauliflower
Corn
Kiwi
Nectarine
Peach


In [8]:
# Show.
df_from_S3.head(20)

Unnamed: 0,origin,label,pca.type,pca.values
0,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.26289320026262, 0.36708255983012333, -4.4..."
1,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.546459805582025, -0.9776640009524855, -4...."
2,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.402209911536417, -1.0792357804785273, -3...."
3,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.892880706972086, -0.9529154085854423, -3...."
4,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.83441359639642, -1.0352978724660957, -4.1..."
5,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.40804971347165, -1.0185130291765203, -3.9..."
6,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.087327103765027, -1.1945299833525476, -3...."
7,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-17.259288461497622, -1.0901764297647694, -4...."
8,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.67841820768182, -1.1133903000954481, -4.0..."
9,s3a://projet8louis/Training_reduced/Apple Brae...,Apple Braeburn,1.0,"[-16.39550283856705, -1.1034178963896029, -3.7..."
