# Importations et définitions de variables

In [1]:
from pyspark.ml.image import ImageSchema
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from io import BytesIO
import cv2
import json
import boto.s3, boto.s3.key
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.mllib.linalg.distributed import RowMatrix
import numpy as np
import os
from pyspark.sql import Row
import boto3

2022-04-29 13:53:28.763175: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-04-29 13:53:28.763222: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [2]:
# sc: Contexte Spark
sc = SparkContext('local')
# spark: Session Spark
spark = SparkSession(sc)

22/04/29 13:54:00 WARN Utils: Your hostname, DESKTOP-OL5H896 resolves to a loopback address: 127.0.1.1; using 172.29.208.25 instead (on interface eth0)
22/04/29 13:54:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/29 13:54:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Lecture des données

On définie les variables principales, notamment le nom du bucket et le chemin vers les images.

In [3]:
# bucket: Nom du bucket s3
bucket = 'ib-ds-p8'
# prefix_bucket: Préfixe pour les chemins de fichiers sur le serveur S3
prefix_bucket = 's3a://' + bucket + '/'
# data_dir: Répertoire contenant les dossiers d'entraînement et de test, notamment
data_dir = 'archive/fruits-360_dataset/fruits-360'
# train_dir: Répertoire contenant les dossiers d'entraînement pour chaque fruit
train_dir = data_dir + '/Training'


On crée un RDD à partir des chemins des images, qu'on récupère à l'aide de boto3.

In [4]:
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(bucket)
# file_paths: Chemins des images des fruits/légumes.
file_paths = [obj.key for obj in my_bucket.objects.filter(Prefix=(train_dir+'/'))]
# n_images: Nombre d'images à traiter
n_images = 10
file_paths = file_paths[:n_images]
# rdd: Contient la base de données des chemins sous forme distribuée
rdd = sc.parallelize(file_paths)

In [30]:
fruit_names = [path.split('/')[-2] for path in file_paths]

In [5]:
# s3_client: Client s3 qui va nous permettre de lire les images
s3_client = boto3.client('s3')

In [6]:
print("On va pré-traiter %d images" % len(file_paths))

On va pré-traiter 5 images


# Pré-traitement des données

## Featurisation

Pour featuriser les images, on va utiliser le réseau de neurones ResNet50 sans sa la dernière couche. Pour cela, on va enregistrer les poids du réseau, de façon à les télécharger une seule fois.

In [7]:
# model: Contient le modèle ResNet50, sans la dernière couche
model = ResNet50(include_top=False)

# bc_model_weights: Variable partagée par tous les clusters
bc_model_weights = sc.broadcast(model.get_weights())

def model_fn():
  """Fonction permettant d'accéder au modèle ResNet50 sans re-télécharger les poids"""
  model = ResNet50(weights=None, include_top=False)
  model.set_weights(bc_model_weights.value)
  return model

2022-04-29 13:54:33.220872: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-04-29 13:54:33.231527: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/ilyes/OC/P8/venv/lib/python3.9/site-packages/cv2/../../lib64:
2022-04-29 13:54:33.231652: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcublas.so.11'; dlerror: libcublas.so.11: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/ilyes/OC/P8/venv/lib/python3.9/site-packages/cv2/../../lib64:
2022-04-29 13:54:33.231716: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcublasLt.so.11'; dle

On configure Spark pour transférer les bases de données de Pandas à Pyspark: 

In [8]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

On définit les fonctions servant à la lecture des fichiers:

In [9]:
def path_to_bytes_local(file_path):
    """Sert uniquement à tester le fonctionnement du prétraitement sans utiliser le serveur S3"""
    with open(file_path, 'rb') as f:
        body = f.read()
    return body

In [10]:
def path_to_bytes(file_path):
    """Sert à lire le contenu d'un fichier dans le bucket ib-ds-p8"""
    # bucket: Nom du bucket contenant les données
    bucket = 'ib-ds-p8'
    # s3_client: Client s3 
    s3_client = boto3.client('s3')
    # bytesio: Flux de bits
    bytesio = BytesIO()
    # Lit le contenu du fichier de chemin file_path dans bytesio
    s3_client.download_fileobj(bucket, file_path, bytesio)
    # Se place au début du contenu pour la lecture
    bytesio.seek(0)
    return bytesio.read()

On parcours le RDD, en remplacant les noms de fichiers par le contenu des fichiers.

In [11]:
# rdd: Contient à présent la base de données du contenu de chaque image
rdd = rdd.map(path_to_bytes)

On définit les fonctions pour décoder le contenu des fichiers, et les featuriser:

In [12]:
def bytes_to_array(bytes_str):
    """Transforme une liste de bits en matrice RGB d'une image 100x100px"""
    # arr: Transformation de la suite de bits en matrice 
    arr = np.frombuffer(bytes_str, dtype=np.uint8)
    # arr: Transformation de la matrice au format RGB
    arr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    arr = arr.astype(np.float32)  # cast en flottant
    return arr

In [13]:
def featurize_series(model, content_series):
  """Featurise une série d'images grâce au modèle"""
  # input: Contient la série des matrices d'images
  input = content_series.map(bytes_to_array)
  # On va formater les matrices d'images en entrées pour ResNet50
  input = np.array(input.to_list(), ndmin=4)
  input = preprocess_input(input)
  # preds: Contient les images featurisées
  preds = model.predict(input)
  preds = pd.Series([pred.flatten() for pred in preds])
  return preds

In [14]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
  """
  Itérateur permettant de Pré-traiter l'ensemble des séries d'image
  :param content_series_iter: Itérateur sur des séries d'images
  """
  # model: Contient le modèle enregistré
  model = model_fn()
  # On parcours les séries d'images en les featurisant
  for content_series in content_series_iter:
    yield featurize_series(model, content_series)



Pour appliquer ces transformation, on repasse en format dataframe.

In [15]:
# data: RDD des images sous forme de dataframe à une colonne
data = rdd.map(Row).toDF()

                                                                                

In [16]:
# data: Dataframe des images featurisées
data = data.withColumn('_1', featurize_udf('_1'))

## Réduction de dimension: SVD

Nos données sont des images. La featurisation produit donc des vecteurs creux. Un outils adapté est donc la décomposition par valeurs singulières.

On commence par obtenir la taille d'un vecteur d'une image, en appliquant la featurisation sur une image d'exemple:

In [17]:
# test_file: Fichier de l'image d'exemple
test_file = 'archive/fruits-360_dataset/fruits-360/Training/Apple_Braeburn/0_100.jpg'
# img_bytes: Contenu du fichier de l'image d'exemple
img_bytes = path_to_bytes(test_file)
# features: Vecteur des features de l'image d'exemple
features = featurize_series(model, pd.Series([img_bytes]))
# size_vect: Nombre de features
size_vect = len(features[0])

In [18]:
print('La featurisation retourne des vecteurs de taille %d' % size_vect)

La featurisation retourne des vecteurs de taille 32768


On constate que la featurisation n'a pas diminué la taille de nos données.

In [19]:
data_rdd = data.rdd.map(lambda vector: list(vector))

On eregistre nos données featurisées dans le cache.

In [20]:
data_rdd.cache()

PythonRDD[13] at RDD at PythonRDD.scala:53

Pour faire la décomposition par valeur singulière, on va utiliser la classe RowMatrix.

In [21]:
mat = RowMatrix(data_rdd)

Ici, on prend un nombre faible de de composantes, pour les besoins de la démonstration. Mais pour contrôler la représentativité de la réduction de dimension, il faudrai prendre le nombre de composantes maximal. De cette façon, on aurais toutes les valeurs singulières et on connaîtrais ainsi le pourcentage de la variance du système représentée.

In [22]:
# n_comps: Nombre de variables après réduction de dimension
n_comps = min([n_images, size_vect, 2])

In [23]:
svd = mat.computeSVD(n_comps, computeU=True)

2022-04-29 13:54:58.007844: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/ilyes/OC/P8/venv/lib/python3.9/site-packages/cv2/../../lib64:
2022-04-29 13:54:58.007895: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-04-29 13:55:00.447345: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:922] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2022-04-29 13:55:00.447511: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/ilyes/OC/P8/venv/lib/python3.9/site-packages/cv

On enregistre les résultats dans le S3.

In [33]:
# Pour pouvoir accéder à la région eu-west-3:
os.environ['BOTO_USE_ENDPOINT_HEURISTICS'] = 'True'
# conn: Connexion à la région eu-west-
conn = boto.s3.connect_to_region("eu-west-3")
# conn_bucket: Connexion au bucket ib-ds-p8
conn_bucket = conn.get_bucket("ib-ds-p8")

On enregistre la nouvelle matrice de nos données:

In [25]:
# key: Fichier qui va contenir la matrice de nos données
key = boto.s3.key.Key(conn_bucket, "svd_U.csv")
key.set_contents_from_string(svd.U.rows.map(Row).toDF().toPandas().to_csv())

  Unsupported type in conversion to Arrow: VectorUDT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


233

Et on enregistre nos valeurs singulières:

In [26]:
# key: Fichier qui va contenir les valeurs singulières
key = boto.s3.key.Key(conn_bucket, "svd_s.txt")
key.set_contents_from_string(json.dumps(svd.s.toArray().tolist(), indent=2))

45

On enregistre aussi les catégories attendues:

In [34]:
# key: Fichier qui va contenir les valeurs singulières
key = boto.s3.key.Key(conn_bucket, "categories.txt")
key.set_contents_from_string(json.dumps(fruit_names, indent=2))

102

On peut à présent lire les résultats enregistrés dans notre serveur S3

In [36]:
s3_client = boto3.client('s3')

In [27]:
bytesio = BytesIO()
svd_u = 'svd_U.csv'
s3_client.download_fileobj(bucket, svd_u, bytesio)
bytesio.seek(0)
import pandas as pd
df = pd.read_csv(bytesio)
print("Données featurisées et de dimensions réduites:")
df

Données featurisées et de dimensions réduites:


Unnamed: 0.1,Unnamed: 0,_1
0,0,"[-0.4138718114327048,-0.9093679869284459]"
1,1,"[-0.4588569134322118,0.20633828442354954]"
2,2,"[-0.4580016557118478,0.23349066398283208]"
3,3,"[-0.4509786456302524,0.21738387605356027]"
4,4,"[-0.4527838350113478,0.1694129186208554]"


In [28]:
bytesio = BytesIO()
svd_s = 'svd_s.txt'
s3_client.download_fileobj(bucket, svd_s, bytesio)
bytesio.seek(0)
svalues = json.loads(bytesio.read().decode('utf-8'))
print("Valeurs singulières:")
print(svalues)

Valeurs singulières:
[821.5190352618974, 176.44595993169142]


In [37]:
bytesio = BytesIO()
fruit_names = 'categories.txt'
s3_client.download_fileobj(bucket, fruit_names, bytesio)
bytesio.seek(0)
categories = json.loads(bytesio.read().decode('utf-8'))
print("Categories: attendues")
print(categories)

Categories: attendues
['Apple_Braeburn', 'Apple_Braeburn', 'Apple_Braeburn', 'Apple_Braeburn', 'Apple_Braeburn']
