# Projet 8 : Déployer un modèle dans le cloud

## I - Présentation du projet

Ce projet s'inscrit dans le cadre du développement d'une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

L'objectif de ce projet est de développer un environnement Big Data qui comprendra le preprocessing et une étape de réduction de dimension.

![fruits.png](attachment:fruits.png)

## Banque d'images

Le jeu de données est un ensemble d'images de fruits et de labels associés :
https://www.kaggle.com/moltean/fruits

### Chargement des librairies

In [None]:
import time
import pandas as pd
from PIL import Image
import numpy as np
import io

import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql.functions import col, pandas_udf, PandasUDFType

In [0]:
# data handling
from pyspark.sql.functions import element_at, split
from pyspark.sql.functions import pandas_udf, PandasUDFType
# from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from typing import Iterator

In [0]:
# display the TensorFlow version in use (shown as the cell's output)
tf.__version__

In [0]:
# S3 glob pattern of the input images; passed to spark.read(...).load() below
path_work = "s3://elfenaouy/Images/*"

In [0]:
# load the files matching path_work into a Spark DataFrame using the
# 'binaryFile' reader, and time the call
# (folder names are expected to have had their spaces removed beforehand)
# NOTE(review): inferschema is described as optional here -- confirm whether
# the binaryFile source uses this option at all.
start = time.perf_counter()
df_img = spark.read.format('binaryFile').load(path_work, inferschema=True) 
stop = time.perf_counter()
print(f'data load with spark.read, elapsed time: {stop - start:0.2f}s')

In [0]:
# number of rows in df_img, i.e. number of images loaded from path_work
df_img.count()

In [0]:
# time how long it takes to display the first 20 rows of df_img
start = time.perf_counter()
df_img.show(20)
stop = time.perf_counter()
# the message names the operation actually timed (it previously repeated the
# "data load" text of the loading cell)
print(f'df_img.show(20), elapsed time: {stop - start:0.2f}s')

In [0]:
# print the schema (column names and types) of df_img
df_img.printSchema()

In [0]:
# show the 'path' column of the first row, untruncated (truncate=False) and in
# vertical layout (vertical=True)
df_img.select('path').show(1, False, True)

In [0]:
# extract the label from the file path: split 'path' on "/" and keep the
# second-to-last component (the name of the folder containing the image)
df_img = df_img.withColumn('label', element_at(split(df_img['path'], "/"), -2))

In [0]:
# show the first row only (path, raw binary content and label), with long
# values truncated
df_img.select('path','content','label').show(1, True)

# Use of a CNN as feature extractor

In [0]:
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql.functions import col, pandas_udf, PandasUDFType

In [0]:
# driver-side ResNet50 built without its top classification layers
# (include_top=False); its weights are broadcast to the workers below
# NOTE(review): relies on the default `weights` argument -- presumably the
# pretrained ImageNet weights; confirm.
model = ResNet50(include_top=False)

In [0]:
# broadcast the model weights through the SparkContext so that model_fn() can
# rebuild the network from bc_model_weights.value
bc_model_weights = sc.broadcast(model.get_weights())

In [0]:
def model_fn():
    """Rebuild the headless ResNet50 and load the broadcast weights into it.

    Returns:
        A ``ResNet50(include_top=False)`` instance whose weights were set
        from ``bc_model_weights.value``.
    """
    # weights=None: nothing is loaded at construction time; the weights come
    # from the broadcast variable instead.
    resnet = ResNet50(include_top=False, weights=None)
    resnet.set_weights(bc_model_weights.value)
    return resnet

In [0]:
def preprocess(content):
    """Decode raw image bytes into an array ready for ResNet50.

    Args:
        content: encoded image file contents as bytes (the ``content``
            column of the image DataFrame).

    Returns:
        The result of ``preprocess_input`` applied to the image after it has
        been converted to RGB and resized to 224x224.
    """
    # Force three channels before resizing: grayscale, palette or RGBA files
    # would otherwise produce 1- or 4-channel arrays, which cannot be stacked
    # with RGB ones nor fed to ResNet50. RGB inputs are left unchanged.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)
    



In [0]:
def featurize_series(model, content_series):
    """Run the model on a pandas Series of raw images and return the features.

    Args:
        model: object exposing ``predict`` (the network from ``model_fn``).
        content_series: pandas Series of raw image bytes.

    Returns:
        A pandas Series holding one flattened prediction array per image.
    """
    # Preprocess every image and stack the results into a single batch.
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Flatten each prediction to 1-D (the UDF declares an array<float> result).
    return pd.Series([prediction.flatten() for prediction in predictions])
    

In [0]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """Scalar-iterator pandas UDF that featurizes batches of raw images.

    Args:
        content_series_iter: iterator over pandas Series of image bytes.

    Yields:
        For each incoming Series, the Series returned by
        ``featurize_series``.
    """
    # Build the model once, before the loop, and reuse it for every batch
    # received through this iterator.
    extractor = model_fn()
    for batch in content_series_iter:
        yield featurize_series(extractor, batch)
    

In [0]:
# keep path and label, and add a 'features' column computed by applying
# featurize_udf to the raw image bytes in 'content'
features_df = df_img.select(col("path"),col("label"), featurize_udf("content").alias("features"))

In [0]:
# display the first 20 rows of features_df
features_df.show(20)

# Réduction de dimension, PCA (API Spark MLlib) :


In [0]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
# convert the 'features' array column to a dense ML vector ('features_vec'),
# the column later given to PCA as inputCol
to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())
sparkDF = features_df.select('path', 'label','features', to_vector("features").alias("features_vec"))

In [0]:
# display the first rows of sparkDF, including the features_vec column
sparkDF.show()

In [0]:
from pyspark.ml.feature import PCA
# PCA estimator reading 'features_vec' and writing 'pca_Features', keeping
# k=100 principal components; fit() returns the fitted model
pcaSparkEstimator = PCA(inputCol="features_vec", outputCol="pca_Features", k=100)
pca = pcaSparkEstimator.fit(sparkDF)



In [0]:
# apply the fitted PCA model: adds the 'pca_Features' column to sparkDF
pca_matrix=pca.transform(sparkDF)

In [0]:
import seaborn as sns
import matplotlib.pyplot as plt
# cumulative sum of the explained variance of the fitted PCA components
# NOTE(review): sns and plt are imported but not used in the visible cells.
var = pca.explainedVariance.cumsum()


In [0]:
# print the cumulative explained variance stored in var
print(var)

# Sauvegarde des résultats sur le bucket S3

In [0]:
# Save the PCA-transformed Spark DataFrame to S3 as a single CSV object
#https://stackoverflow.com/questions/38154040/save-dataframe-to-csv-directly-to-s3-python
#https://sagemaker-examples.readthedocs.io/en/latest/introduction_to_amazon_algorithms/pca_mnist/pca_mnist.html
from io import StringIO # python3; python2: BytesIO 

# boto3 was used below without ever being imported, which raised a NameError
import boto3

bucket = 'elfenaouy' # already created on S3
csv_buffer = StringIO()
# toPandas() collects the whole DataFrame on the driver; the CSV text is then
# written to an in-memory buffer rather than to a local file
pca_matrix.toPandas().to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
# upload the buffer content as the object 'pca_matrix_1.csv' in the bucket
s3_resource.Object(bucket, 'pca_matrix_1.csv').put(Body=csv_buffer.getvalue())