In [1]:

from google.cloud import storage
import pandas as pd
import io
from io import BytesIO
from PIL import Image
import numpy as np
from pyspark.ml.image import ImageSchema


In [29]:
storage_client = storage.Client()

In [31]:
BUCKET_NAME = "datastorageocp8"

bucket = storage_client.get_bucket(BUCKET_NAME)

file_name = list(bucket.list_blobs(prefix='images/'))
for name in file_name:
	print(name.name)
    

images/
images/0_100.jpg
images/17_100.jpg
images/185_100.jpg
images/275_100.jpg
images/27_100.jpg


In [40]:
df_im = spark.read.format("image").option("dropInvalid", True).load("gs://datastorageocp8/images/")
df_im.select("image.origin", "image.width", "image.height").show(truncate=False)

+---------------------------------------+-----+------+
|origin                                 |width|height|
+---------------------------------------+-----+------+
|gs://datastorageocp8/images/185_100.jpg|100  |100   |
|gs://datastorageocp8/images/275_100.jpg|100  |100   |
|gs://datastorageocp8/images/0_100.jpg  |100  |100   |
|gs://datastorageocp8/images/17_100.jpg |100  |100   |
|gs://datastorageocp8/images/27_100.jpg |100  |100   |
+---------------------------------------+-----+------+



In [3]:
#pip install tensorflow

In [38]:
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

model = ResNet50(
    include_top=False,
    weights=None,
    pooling='max',
    input_shape=(32, 32, 3))
model.summary()


Model: "resnet50"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 32, 32, 3)]  0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 38, 38, 3)    0           input_2[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 16, 16, 64)   9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, 16, 16, 64)   256         conv1_conv[0][0]                 
___________________________________________________________________________________________

## EXTRACTION DES FEATURES

In [35]:
from keras.preprocessing import image
from PIL import Image
import io
import numpy as np

In [33]:
len(file_name)

6

In [39]:
file_name = list(bucket.list_blobs(prefix='images/'))
resnet_features=[]
for i in range(1, len(file_name)):
    img = Image.open(io.BytesIO(bucket.blob(file_name[i].name).download_as_string())).resize((32,32))
    
    # convert image to array
    x = image.img_to_array(img).reshape((-1,32,32,3))
    x=np.array(x)
   
    # preprocess input
    x = preprocess_input(x)
    
    resnet_feature = model.predict(x).ravel().tolist()
    # add path, labels and features
    
    resnet_features.append(resnet_feature)

In [41]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

b = spark.createDataFrame([(l,) for l in resnet_features], ['features'])
df_im = df_im.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))

images_df = df_im.join(b, df_im.row_idx == b.row_idx).\
             drop("row_idx")
images_df.show()

+--------------------+--------------------+
|               image|            features|
+--------------------+--------------------+
|[gs://datastorage...|[3.52233266830444...|
|[gs://datastorage...|[3.61504364013671...|
|[gs://datastorage...|[4.07907867431640...|
|[gs://datastorage...|[4.30316352844238...|
|[gs://datastorage...|[3.46667790412902...|
+--------------------+--------------------+



# Réduction de dimension , PCA (API saprkMLib):


In [43]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
#convert array to vecteur dense
to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())
sparkDF = images_df.select('image','features', to_vector("features").alias("features_vec"))

In [44]:
sparkDF.show(2)

+--------------------+--------------------+--------------------+
|               image|            features|        features_vec|
+--------------------+--------------------+--------------------+
|[gs://datastorage...|[3.52233266830444...|[3.52233266830444...|
|[gs://datastorage...|[3.61504364013671...|[3.61504364013671...|
+--------------------+--------------------+--------------------+
only showing top 2 rows



In [45]:
from pyspark.ml.feature import PCA
#https://calvinfeng.gitbook.io/machine-learning-notebook/sagemaker/population_segmentation
pcaSparkEstimator = PCA(inputCol="features_vec", outputCol="pca_Features", k=100)
pca = pcaSparkEstimator.fit(sparkDF)
pca_matrix=pca.transform(sparkDF)


In [46]:
pca_matrix.show(5)

+--------------------+--------------------+--------------------+--------------------+
|               image|            features|        features_vec|        pca_Features|
+--------------------+--------------------+--------------------+--------------------+
|[gs://datastorage...|[3.52233266830444...|[3.52233266830444...|[19.0452276147048...|
|[gs://datastorage...|[3.61504364013671...|[3.61504364013671...|[27.6882923256063...|
|[gs://datastorage...|[4.07907867431640...|[4.07907867431640...|[-2.4947937234883...|
|[gs://datastorage...|[4.30316352844238...|[4.30316352844238...|[10.4793721015670...|
|[gs://datastorage...|[3.46667790412902...|[3.46667790412902...|[26.2998957087526...|
+--------------------+--------------------+--------------------+--------------------+



# Sauvegarde des résultats sur le bucket

In [54]:
pca_matrix.toPandas().to_csv("pca_matrix.csv")

In [55]:
BUCKET_NAME = "datastorageocp8"

bucket = storage_client.get_bucket(BUCKET_NAME)
filename = "%s%s" % ('',"pca_matrix.csv")
blob = bucket.blob(filename)
with open("pca_matrix.csv", 'rb') as f:
    blob.upload_from_file(f)
    
    print("csv Uploaded : ")
    

csv Uploaded : 
