# Project 8: Deploying a Model in the Cloud


# 1 - Project presentation

Fruits is a start-up that wants to raise its profile by releasing a mobile application that lets users photograph a fruit and obtain information about it.

For the start-up, this application would raise public awareness of fruit biodiversity and provide a first version of its fruit image classification engine.
The objective of this project is to build a Big Data environment that covers image preprocessing and a dimensionality reduction step.

![Fruits.JPG](attachment:Fruits.JPG)


![application.jpg](attachment:application.jpg)

# 2 - Load Packages

In [1]:
# To use Spark within a Jupyter notebook
import findspark
findspark.init()
import pyspark

# General-purpose libraries
import pandas as pd
from PIL import Image
import numpy as np
from io import BytesIO
import os
import warnings
warnings.filterwarnings('ignore')
import boto3

# TensorFlow / Keras
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model

# PySpark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, lit, udf
from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT


2022-09-15 13:14:27.041475: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-09-15 13:14:28.991816: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-09-15 13:14:28.991850: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-09-15 13:14:29.166376: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-09-15 13:14:32.597261: W tensorflow/stream_executor/platform/de

# 3 - Spark session instantiation & configuration

In [2]:
# Sparksession instantiation
spark = SparkSession.builder.config("spark.driver.memory", "32g").appName("Fruits_Vegetables_recognition").getOrCreate()

# Get the SparkContext attached to the session
sc = spark.sparkContext
sc.setLogLevel("WARN")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/15 13:14:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# AWS access configuration
credential = pd.read_csv("../p8_user_credentials.csv")
ID = credential["Access key ID"][0]
KEY = credential["Secret access key"][0]

In [4]:
# Hadoop configuration for the s3a:// connector
# (fs.s3a.impl is set once; the earlier NativeS3FileSystem value was immediately overwritten)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ID)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3-eu-west-3.amazonaws.com")

# 4 -  Functions 

## 4 - 1 - Load images and categories functions 

In [5]:
def parse_path_img(folder, BUCKET):
    """
    List the fruit images stored under the 'sample' prefix of an S3 bucket
    args:
        folder : s3a:// URI prefix prepended to each object key
        BUCKET : name of the S3 bucket
    return : three lists of strings (full paths, fruit categories, S3 object keys)
    """

    session = boto3.session.Session(aws_access_key_id=ID, aws_secret_access_key=KEY)
    s3 = session.resource('s3')
    my_bucket = s3.Bucket(BUCKET)

    prefix = 'sample'

    lst_path = []
    lst_category = []
    keys = []
    for s3_file in my_bucket.objects.filter(Prefix=prefix):
        # The category is the name of the folder that contains the image
        category = s3_file.key.split('/')[-2]
        lst_path.append(folder + s3_file.key)
        lst_category.append(category)
        keys.append(s3_file.key)
    return lst_path, lst_category, keys

        
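def load_data(lst_path, lst_category, lst_keys=None):
    """
    Get a pyspark dataframe from the lists of paths, keys and fruit categories
    args :
        lst_path : list of all fruit paths
        lst_category : list of fruit categories
        lst_keys : list of S3 object keys (defaults to the notebook-level `keys` list)
    return : a spark dataframe
    """
    # The original cell read `keys` from the notebook's global scope;
    # keep that behaviour as a fallback when no list is passed explicitly
    if lst_keys is None:
        lst_keys = keys
    # Specify column names
    columns = ['path', 'keys', 'fruit_category']
    # Create a dataframe by zipping the three lists
    dataframe = spark.createDataFrame(list(zip(lst_path, lst_keys, lst_category)), columns)

    return dataframe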
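
`parse_path_img` derives each fruit category from the second-to-last segment of the S3 object key. The snippet below is a minimal sketch of that rule in isolation, without any S3 access. The helper name `category_from_key` and the example keys are hypothetical and only serve as an illustration.

```python
def category_from_key(key):
    """Return the name of the folder that directly contains the file (hypothetical helper)."""
    parts = key.split("/")
    if len(parts) < 2:
        # A key without a parent folder carries no category information
        raise ValueError(f"key has no parent folder: {key!r}")
    return parts[-2]


# Made-up keys that follow the 'sample/<category>/<file>' layout seen in this notebook
example_keys = ["sample/Apricot/img_a.jpg", "sample/Banana/img_b.jpg"]
example_categories = [category_from_key(k) for k in example_keys]
print(example_categories)
```

Raising an error for keys without a parent folder is a choice made for this sketch; the notebook's own loop would instead fail with an `IndexError` on such a key.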

## 4 - 2 - Read images functions

In [6]:
def read_image_from_s3(key):
    """Load an image file from S3 and flatten it into a pixel vector.

    Relies on the notebook-level variables ID, KEY and BUCKET.

    Parameters
    ----------
    key : string Path of the object in the S3 bucket

    Returns
    -------
    list of int Flattened pixel values of the image
    """
    session = boto3.session.Session(aws_access_key_id=ID, aws_secret_access_key=KEY)
    s3 = session.resource('s3', region_name='eu-west-3')
    bucket = s3.Bucket(BUCKET)
    # Avoid shadowing the built-in name `object`
    s3_object = bucket.Object(key)
    response = s3_object.get()
    file_stream = response['Body']
    im = Image.open(file_stream)
    return np.array(im).flatten().tolist()

## 4 - 3 - Dimensionality reduction functions

In [7]:
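def get_pca_features(df):
    """
    Prepare the image vectors for PCA
    args :
        df : spark dataframe with an 'image_vector' column of pixel values
    return : the dataframe with 'image_vector' converted to a dense vector
             and a standardized 'scaledFeatures' column
    Note : the PCA step itself is commented out here and run in a separate cell
    """
    # To convert images to a dense vector
    img_udf = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('image_vector', img_udf('image_vector'))

    # To center and scale every pixel position across the images
    scale = StandardScaler(inputCol="image_vector", outputCol="scaledFeatures", withStd=True, withMean=True)
    model_scale = scale.fit(df)
    df = model_scale.transform(df)

    # Instantiate Spark PCA model
    #pca = PCA(k=4, inputCol="scaledFeatures", outputCol="pca_features")
    # To train PCA model
    #pca_model = pca.fit(df)
    #pca_model.explainedVariance
    # To transform images into principal components
    #df = pca_model.transform(df)

    return df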
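
`get_pca_features` standardizes every pixel position with `StandardScaler(withStd=True, withMean=True)`. As a rough illustration of what that means, the sketch below centers and scales the columns of a tiny made-up matrix in plain Python. The function name `standardize_columns` and the toy values are assumptions, and mapping constant columns to 0.0 is a choice made for this sketch rather than a statement about Spark internals.

```python
from statistics import mean, stdev


def standardize_columns(rows):
    """Center each column on its mean and divide by its sample standard deviation."""
    columns = list(zip(*rows))
    means = [mean(c) for c in columns]
    stds = [stdev(c) for c in columns]  # sample standard deviation (n - 1 denominator)
    return [
        # A constant column has zero deviation; map it to 0.0 instead of dividing by zero
        [(v - m) / s if s > 0 else 0.0 for v, m, s in zip(row, means, stds)]
        for row in rows
    ]


# Toy data: 3 "images" with 2 pixel values each; the second pixel is constant
toy_rows = [[255, 10], [253, 10], [247, 10]]
scaled_rows = standardize_columns(toy_rows)
for row in scaled_rows:
    print(row)
```

After scaling, the first column has mean 0 and unit sample standard deviation, while the constant second column carries no information and stays at 0.0. Background pixels that are identical across all fruit images would behave like that second column.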

# 5 - Main program execution

In [8]:
# To define the bucket name
BUCKET = "projet8sabrine"
# To define the folder path of the images
folder = "s3a://" + BUCKET + "/"

In [9]:
# List of paths and categories of all images
lst_path, lst_category, keys = parse_path_img(folder, BUCKET)
print(len(lst_path), "images of different kinds of fruits were downloaded.")

20 images of different kinds of fruits were downloaded.


In [10]:
#To show spark details
print("-------------Images list & categories extraction------------\n")
images = load_data(lst_path, lst_category)
images.show()

-------------Images list & categories extraction------------




+--------------------+--------------------+--------------+
|                path|                keys|fruit_category|
+--------------------+--------------------+--------------+
|s3a://projet8sabr...|sample/Apricot/0_...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/10...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/11...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/12...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/13...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/14...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/15...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/16...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/17...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/18...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/19...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/1_...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/20...|       Apricot|
|s3a://projet8sabr...|sample/Apricot/21...|       Aprico

                                                                                

In [11]:
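print("-------------To read images------------\n")
from pyspark.sql.types import ArrayType, IntegerType
# To create a User Defined Function (udf) that adds a column containing the image pixel vector.
# Without an explicit return type, udf() defaults to a string column.
image_udf = udf(lambda img: read_image_from_s3(img), ArrayType(IntegerType()))
# To add a column of image vectors to the spark dataframe
images = images.withColumn('image_vector', image_udf("keys"))
images.show()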

-------------To read images------------




+--------------------+--------------------+--------------+--------------------+
|                path|                keys|fruit_category|        image_vector|
+--------------------+--------------------+--------------+--------------------+
|s3a://projet8sabr...|sample/Apricot/0_...|       Apricot|[255, 251, 248, 2...|
|s3a://projet8sabr...|sample/Apricot/10...|       Apricot|[253, 253, 255, 2...|
|s3a://projet8sabr...|sample/Apricot/11...|       Apricot|[247, 255, 255, 2...|
|s3a://projet8sabr...|sample/Apricot/12...|       Apricot|[255, 254, 255, 2...|
|s3a://projet8sabr...|sample/Apricot/13...|       Apricot|[255, 254, 252, 2...|
|s3a://projet8sabr...|sample/Apricot/14...|       Apricot|[251, 255, 250, 2...|
|s3a://projet8sabr...|sample/Apricot/15...|       Apricot|[255, 252, 248, 2...|
|s3a://projet8sabr...|sample/Apricot/16...|       Apricot|[253, 253, 255, 2...|
|s3a://projet8sabr...|sample/Apricot/17...|       Apricot|[242, 255, 255, 2...|
|s3a://projet8sabr...|sample/Apricot/18.

                                                                                

In [12]:
print("-------------Dimensionality reduction------------\n")
images = get_pca_features(images)  
images.show()    

-------------Dimensionality reduction------------



                                                                                

+--------------------+--------------------+--------------+--------------------+--------------------+
|                path|                keys|fruit_category|        image_vector|      scaledFeatures|
+--------------------+--------------------+--------------+--------------------+--------------------+
|s3a://projet8sabr...|sample/Apricot/0_...|       Apricot|[255.0,251.0,248....|[0.92729738283398...|
|s3a://projet8sabr...|sample/Apricot/10...|       Apricot|[253.0,253.0,255....|[0.48040707785374...|
|s3a://projet8sabr...|sample/Apricot/11...|       Apricot|[247.0,255.0,255....|[-0.8602638370869...|
|s3a://projet8sabr...|sample/Apricot/12...|       Apricot|[255.0,254.0,255....|[0.92729738283398...|
|s3a://projet8sabr...|sample/Apricot/13...|       Apricot|[255.0,254.0,252....|[0.92729738283398...|
|s3a://projet8sabr...|sample/Apricot/14...|       Apricot|[251.0,255.0,250....|[0.03351677287351...|
|s3a://projet8sabr...|sample/Apricot/15...|       Apricot|[255.0,252.0,248....|[0.927297382

In [13]:
images.count()

20

In [14]:
# Instantiate Spark PCA model
pca = PCA(k=4, inputCol="scaledFeatures", outputCol="pca_features")
# To train PCA model
pca_model = pca.fit(images)
#pca_model.explainedVariance
# To transform images into principal components
df = pca_model.transform(images)

                                                                                

22/09/15 13:15:12 WARN RowMatrix: 30000 columns will require at least 7200 megabytes of memory!


----------------------------------------                            (0 + 2) / 2]
Exception occurred during processing of request from ('127.0.0.1', 36754)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ubuntu/anaconda3/lib/python3.9/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/p

Py4JError: An error occurred while calling o130.fit
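
The `RowMatrix` warning above reports that 30000 columns need at least 7200 megabytes. That figure is consistent with one dense 30000 x 30000 matrix of 8-byte doubles, as the short calculation below shows. The function name is hypothetical, and reading 30000 as 100 x 100 pixels x 3 colour channels is an assumption about the images, not something the notebook states.

```python
def dense_square_matrix_megabytes(n_columns, bytes_per_value=8):
    """Memory of a dense n x n matrix of fixed-size values, in units of 10**6 bytes."""
    return n_columns * n_columns * bytes_per_value / 1_000_000


# Assumed image shape: 100 x 100 pixels with 3 colour channels
n_features = 100 * 100 * 3
print(n_features)                                 # 30000
print(dense_square_matrix_megabytes(n_features))  # 7200.0
```

Because the estimate grows with the square of the number of columns, halving the vector length would cut this requirement to a quarter.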

In [None]:
images.printSchema()

In [None]:
images.rdd.getNumPartitions()