# Notebook mis sur le serveur EC2
## avec récupération des images sur un serveur S3
## puis sauvegarde des résultats du préprocessing sur le S3

In [None]:
import findspark
findspark.init('/home/ubuntu/spark-3.3.0-bin-hadoop3/')

In [2]:
import numpy as np
import pandas as pd
import time
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.sql.types import ArrayType, IntegerType, FloatType, StringType
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

from tensorflow.keras.utils import load_img
from tensorflow.keras.utils import img_to_array
from keras.applications.vgg16 import preprocess_input
from keras.applications.vgg16 import VGG16

import configparser # Nécessaire pour retouver les codes AWS
import boto3 # Pour la communication avec S3
import os # Pour la variable d'environnement "PYSPARK_SUBMIT_ARGS"
from io import StringIO # Nécessaire pour l'export

2022-10-04 16:57:24.975131: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-10-04 16:57:25.379934: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-10-04 16:57:26.140152: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-10-04 16:57:26.140205: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or 

In [3]:
# Ajout d'une variable d'environnement
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:3.3.0 pyspark-shell"

In [4]:
# Création d'une session spark
spark = (SparkSession
             .builder.master('local[*]')
             .appName('p8')
             .config('spark.executor.memory', '14g')
             .config('spark.network.timeout', '1000000') 
             .getOrCreate()
            )

spark.sparkContext.setLogLevel("OFF")

spark

22/10/04 16:57:28 WARN Utils: Your hostname, Franck-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.116 instead (on interface enxc025a5dc297d)
22/10/04 16:57:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/agent/Logiciels/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/agent/.ivy2/cache
The jars for the packages stored in: /home/agent/.ivy2/jars
com.amazonaws#aws-java-sdk-pom added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-23bb8d8c-6655-421f-a42b-338896dc20ea;1.0
	confs: [default]
	found com.amazonaws#aws-java-sdk-pom;1.10.34 in central
	found org.apache.hadoop#hadoop-aws;3.3.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 155ms :: artifacts dl 5ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	com.amazonaws#aws-java-sdk-pom;1.10.34 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.0 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|

22/10/04 16:57:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Configuration communication avec S3 et récupération des photos

In [5]:
# Récupération des acces AWS S3 (fonctionne car awscli installé et fichier ~/.aws/credentials présent)
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
aws_profile = 'default'
access_id = config.get(aws_profile, "aws_access_key_id") 
access_key = config.get(aws_profile, "aws_secret_access_key")

In [6]:
# Configuration chemins S3
DIR_PATH = 'Training/'
# DIR_PATH = 'Test_s3/'
REGION_NAME = 'eu-west-1'
BUCKET_NAME = 'oc-p8'

dataset_path = 's3://' + BUCKET_NAME + '/'

In [7]:
# Création session S3
session = boto3.session.Session(aws_access_key_id=access_id, aws_secret_access_key=access_key)
s3_client = session.client(service_name='s3', region_name=REGION_NAME)

# Liste de toutes les photos du répertoire (list_objets_v2 limite à 1000 images)
# https://stackoverflow.com/questions/54314563/how-to-get-more-than-1000-objects-from-s3-by-using-list-objects-v2
liste_s3 = s3_client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=DIR_PATH)
len(liste_s3['Contents'])

1000

In [8]:
# Création d'une liste avec les photos
liste_photos = []

for file in liste_s3['Contents']:
    photo = file['Key']
    liste_photos.append(photo)

In [9]:
# Définition d'un dataframe spark avec la liste des photos

zipped = zip(liste_photos)
cols = ['path']

df_images = spark.createDataFrame(zipped, cols)
df_images.show()

+--------------------+
|                path|
+--------------------+
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
|Training/apple_6/...|
+--------------------+
only showing top 20 rows



In [10]:
def find_classe(path):
    '''
    Défini la classe du fruit en fonction du nom de répertoire
    '''
    classe_detaillee = path.split('/')[-2] # Nom du repertoire complet
    classe_fruit = classe_detaillee.split('_')[0] #Nom de la classe
    return classe_fruit

In [11]:
# Ajout de la classe en tant que colonne du dataframe
udf_fruits = udf(find_classe, StringType())
df_images = df_images.withColumn('classe', udf_fruits('path'))
df_images.show()

+--------------------+------+
|                path|classe|
+--------------------+------+
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
|Training/apple_6/...| apple|
+--------------------+------+
only showing top 20 rows



## Transfer Learning

In [12]:
# Création du modèle avec suppression de la dernière couche et remplacement avec une couche de pooling
model_vgg = VGG16(weights="imagenet", include_top=False, pooling='max', input_shape=(224, 224, 3))
print(model_vgg.summary())

2022-10-04 16:57:35.766428: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:980] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-10-04 16:57:35.843682: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudnn.so.8'; dlerror: libcudnn.so.8: cannot open shared object file: No such file or directory
2022-10-04 16:57:35.843700: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1934] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...
2022-10-04 16:57:35.844452: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN

Model: "vgg16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0     

In [13]:
# Sérialise le modèle pour une execution plus rapide
sc = spark.sparkContext
model_bc = sc.broadcast(model_vgg)





INFO:tensorflow:Assets written to: ram://df6eac6c-846b-4bc0-9557-9aea752b2a9f/assets


INFO:tensorflow:Assets written to: ram://df6eac6c-846b-4bc0-9557-9aea752b2a9f/assets


In [14]:
# Création des features VGG
# Pas réussi avec le dataframe spark donc on repasse par une session S3

s3 = boto3.resource('s3',
                    REGION_NAME,
                    aws_access_key_id=access_id,
                    aws_secret_access_key=access_key)
bucket = s3.Bucket('oc-p8')

vgg_features=[]

for photo in liste_s3['Contents']:
    file = photo['Key']
    obj = bucket.Object(file)
    img_body = obj.get()['Body'] # Méthode pour récupérer les photos
    
    img = Image.open(img_body).resize((224, 224))
    img_arr = np.expand_dims(img, axis=0)
    features = model_bc.value.predict(img_arr,verbose=0)
    vector_feature = Vectors.dense(features.ravel().tolist())    
    vgg_features.append(vector_feature)





In [15]:
# Création d'un dataframe spark temporaire pour récuérer les photos
temp_vgg = spark.createDataFrame([(f,) for f in vgg_features], ['VGG_features'])

# Ajout des colonnes index pour préparer la jointure
df_images = df_images.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
temp_vgg = temp_vgg.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

# Jointure des 2 dataframes
df_images = df_images.join(temp_vgg, df_images.row_index == temp_vgg.row_index).drop('row_index')

In [16]:
# Vectorisation des features
to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())
df_images_vgg = df_images.select('path', 'classe', to_vector('VGG_features').alias('VGG_features'))

## Standardisation

In [17]:
# Standardisation des données
standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='VGG_features',
                              outputCol='features_scaled')
std = standardizer.fit(df_images_vgg)
df_image_std = std.transform(df_images_vgg)
df_image_std = df_image_std.select('path', 'classe', 'features_scaled')

df_image_std.show()

                                                                                

+--------------------+------+--------------------+
|                path|classe|     features_scaled|
+--------------------+------+--------------------+
|Training/apple_6/...| apple|[0.13037561896527...|
|Training/apple_6/...| apple|[-0.1536597553272...|
|Training/apple_6/...| apple|[-0.4919848263588...|
|Training/apple_6/...| apple|[-0.4908032568120...|
|Training/apple_6/...| apple|[-0.2817242142361...|
|Training/apple_6/...| apple|[-0.2731925101659...|
|Training/apple_6/...| apple|[0.12042080215325...|
|Training/apple_6/...| apple|[0.00159375186577...|
|Training/apple_6/...| apple|[0.40183381419129...|
|Training/apple_6/...| apple|[0.35310259910108...|
|Training/apple_6/...| apple|[0.41057468968808...|
|Training/apple_6/...| apple|[0.01257500274985...|
|Training/apple_6/...| apple|[0.79925479351586...|
|Training/apple_6/...| apple|[1.20735223107430...|
|Training/apple_6/...| apple|[0.87150011413529...|
|Training/apple_6/...| apple|[0.85806682630126...|
|Training/apple_6/...| apple|[0

                                                                                

## PCA

In [18]:
t0 = time.time()

pca = PCA(k=512, inputCol="features_scaled", outputCol="Features_PCA")
pca_model = pca.fit(df_image_std)

result = pca_model.transform(df_image_std).select('path', 'classe', 'Features_PCA')

result.show()

                                                                                

+--------------------+------+--------------------+
|                path|classe|        Features_PCA|
+--------------------+------+--------------------+
|Training/apple_6/...| apple|[10.1501684291780...|
|Training/apple_6/...| apple|[8.17683279765479...|
|Training/apple_6/...| apple|[10.0884364030640...|
|Training/apple_6/...| apple|[8.66049061761517...|
|Training/apple_6/...| apple|[8.41705675960248...|
|Training/apple_6/...| apple|[7.57242713155589...|
|Training/apple_6/...| apple|[9.03474106119812...|
|Training/apple_6/...| apple|[9.22834823859929...|
|Training/apple_6/...| apple|[8.77502046560159...|
|Training/apple_6/...| apple|[8.95114979715183...|
|Training/apple_6/...| apple|[8.89578226750046...|
|Training/apple_6/...| apple|[9.18700034422483...|
|Training/apple_6/...| apple|[8.58090088847377...|
|Training/apple_6/...| apple|[9.85677766824411...|
|Training/apple_6/...| apple|[9.51503747037526...|
|Training/apple_6/...| apple|[9.63721351710998...|
|Training/apple_6/...| apple|[9

                                                                                

## Export du fichier vers S3

In [19]:
csv_buffer = StringIO()

result_df = result.toPandas()
result_df.to_csv(csv_buffer)

s3.Object(BUCKET_NAME, 'output/result_df.csv').put(Body=csv_buffer.getvalue()) # Sauvegarde du fichier csv sur S3

                                                                                

{'ResponseMetadata': {'RequestId': '527YG9PMNEB379RZ',
  'HostId': '1zEmchfxk3o6exeWxy6BYcCtnKr8by61OI3/ydD9b7iFOKY2dbfnP2yHbELRrjz5yWf48eOg3og=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '1zEmchfxk3o6exeWxy6BYcCtnKr8by61OI3/ydD9b7iFOKY2dbfnP2yHbELRrjz5yWf48eOg3og=',
   'x-amz-request-id': '527YG9PMNEB379RZ',
   'date': 'Tue, 04 Oct 2022 15:01:08 GMT',
   'etag': '"7d21eeddfaef59a269c3cd0a07ddf233"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"7d21eeddfaef59a269c3cd0a07ddf233"'}