In [1]:
!pip install pyquickhelper --quiet
!pip install PIP
!pip install python-resize-image
!pip install opencv-python
!pip install pyspark


Collecting python-resize-image
  Downloading python_resize_image-1.1.19-py2.py3-none-any.whl (8.4 kB)
Installing collected packages: python-resize-image
Successfully installed python-resize-image-1.1.19
Collecting opencv-python
  Downloading opencv_python-4.5.1.48-cp38-cp38-manylinux2014_x86_64.whl (50.4 MB)
[K     |████████████████████████████████| 50.4 MB 90.4 MB/s eta 0:00:01
Installing collected packages: opencv-python
Successfully installed opencv-python-4.5.1.48


In [5]:
!conda list 

# packages in environment at /opt/conda/miniconda3:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                      1_llvm    conda-forge
_r-mutex                  1.0.1               anacondar_1    conda-forge
abseil-cpp                20200923.3           h9c3ff4c_0    conda-forge
aiohttp                   3.7.4            py38h497a2fe_0    conda-forge
alabaster                 0.7.12                     py_0    conda-forge
alembic                   1.5.5              pyhd8ed1ab_0    conda-forge
ansiwrap                  0.8.4                      py_0    conda-forge
anyio                     2.2.0            py38h578d9bd_0    conda-forge
appdirs                   1.4.4              pyh9f0ad1d_0    conda-forge
argh                      0.26.2          pyh9f0ad1d_1002    conda-forge
argon2-cffi               20.1.0           py38h497a2fe_2    c

In [6]:
!conda install opencv

Collecting package metadata (current_repodata.json): done
Solving environment: done

# All requested packages already installed.



In [116]:
##################
### PARAMETRES ###
##################

BUCKET = "p8_fruit"
OPENCV = 'sift' # Méthode de calcul des descripteurs ('sift' ou 'orb')

rs_ = 42 # Random state

# Format des titres
def title(title,level):
    if level == 1:
        print('\n'+'='*len(title),'\n'+title,'\n'+'='*len(title)+'\n')
    elif level == 2:
        print('\n'+title,'\n'+'='*len(title)+'\n')
    elif level == 3:
        print(title,'\n'+'-'*len(title)+'\n')

        
#############################
### IMPORT DES LIBRAIRIES ###
#############################


title("> > > > Import des librairies < < < <",1)

# Librairies système
import io
from io import StringIO
import os
import sys
import ast
from pyquickhelper.filehelper import remove_folder
import time
from time import strftime, gmtime

# Traitement des données
import pandas as pd
import numpy as np
from sklearn import preprocessing

# Traitement des images
from PIL import Image
#from resizeimage import resizeimage
import resizeimage
import cv2

# Librairies Pyspark
from pyspark import SparkContext
from pyspark.ml.feature import PCA
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, LongType, ArrayType, IntegerType
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.clustering import KMeans
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Librairies GCP
from google.cloud import storage

################################
### DEFINITION DES FONCTIONS ###
################################


title("> > > > Définition des fonctions < < < <",1)

list_elapsed = []
list_elapsed_mem = [] # Mémorisation d'un résultat
list_ope = []

def get_blob(name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(BUCKET)
    return bucket.blob(name)

def give_categ(Blob):
    name = Blob.name
    return name.split("/")[-2]

def get_df_link(bucket_name, prefix, delimiter=None):
    lst_name=[]
    lst_link=[]
    lst_blob=[]
    lst_categ=[]
    
    storage_client = storage.Client()

    # Note: Client.list_blobs requires at least package version 1.17.0.
    blobs = storage_client.list_blobs(
        bucket_name, prefix=prefix, delimiter=delimiter
    )
    

    for blob in blobs:
        lst_name.append(blob.name)
        lst_link.append(blob.self_link)
        lst_blob.append(blob)
        lst_categ.append(give_categ(blob))
    
    d = {'Name':lst_name,'Link':lst_link,'Categories':lst_categ,'Blob':lst_blob}
    df_pandas = pd.DataFrame(d)
    df_spark = spark.createDataFrame(df_pandas[['Name', 'Link', 'Categories']] )
    return(df_pandas, df_spark)

def download(Blob):
    return Blob.download_as_string(raw_download=True)

def image_to_list(Name):
    try:
        Blob = get_blob(Name)
        img = Blob.download_as_string(raw_download=True)
        image_img = Image.open(io.BytesIO(img))
        image = np.asarray(image_img).flatten().tolist()
        #image = Image.open(download(Blob))
        #image = np.asarray(image)
        #image = image.flatten().tolist()
        return image
    except:
        return[0]
    
def get_desc(Name):

    image = np.asarray(bytearray(get_blob(Name).download_as_string()), dtype="uint8")
    image = cv2.imdecode(image, cv2.IMREAD_UNCHANGED)
    sift = cv2.SIFT_create()
    keypoints, desc = sift.detectAndCompute(image, None)

    if desc is None:

        desc = 0
    else:
        desc = desc.astype(int).flatten().tolist()

    return desc



> > > > Import des librairies < < < < 


> > > > Définition des fonctions < < < < 



In [117]:
get_desc("Test2/Apple Red Yellow 2/106_100.jpg")

[31,
 2,
 0,
 0,
 0,
 0,
 0,
 5,
 161,
 33,
 0,
 0,
 0,
 0,
 0,
 5,
 178,
 80,
 0,
 0,
 0,
 0,
 0,
 0,
 26,
 7,
 0,
 0,
 0,
 0,
 0,
 0,
 35,
 0,
 0,
 0,
 0,
 0,
 0,
 18,
 150,
 23,
 0,
 1,
 1,
 0,
 0,
 8,
 178,
 76,
 0,
 0,
 0,
 0,
 0,
 10,
 53,
 15,
 0,
 0,
 0,
 0,
 0,
 2,
 24,
 0,
 0,
 3,
 2,
 0,
 0,
 19,
 158,
 5,
 1,
 8,
 5,
 0,
 0,
 15,
 178,
 11,
 0,
 0,
 0,
 0,
 0,
 69,
 52,
 3,
 0,
 0,
 0,
 0,
 0,
 15,
 8,
 1,
 1,
 7,
 4,
 0,
 0,
 7,
 163,
 2,
 1,
 10,
 5,
 0,
 0,
 28,
 178,
 0,
 0,
 0,
 0,
 0,
 0,
 63,
 27,
 0,
 0,
 0,
 0,
 0,
 0,
 6,
 81,
 20,
 0,
 0,
 0,
 0,
 0,
 1,
 199,
 59,
 0,
 0,
 0,
 0,
 0,
 0,
 98,
 14,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 81,
 8,
 0,
 0,
 0,
 1,
 3,
 7,
 199,
 31,
 0,
 0,
 0,
 0,
 0,
 4,
 165,
 8,
 0,
 0,
 0,
 0,
 0,
 4,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 60,
 1,
 0,
 0,
 0,
 4,
 5,
 15,
 199,
 8,
 0,
 0,
 0,
 0,
 0,
 26,
 166,
 3,
 0,
 0,
 0,
 0,
 0,
 7,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 48,
 0,
 0,
 0,
 2,
 7,
 2,
 15,
 199,
 0,
 0

In [110]:
sc =SparkContext()
spark = SparkSession(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=yarn) created by <module> at /opt/conda/miniconda3/lib/python3.8/site-packages/IPython/utils/py3compat.py:168 

In [104]:
df_pandas

Unnamed: 0,Name,Link,Categories,Blob
0,Test2/Apple Red Yellow 2/106_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Apple Red Yellow 2,"<Blob: p8_fruit, Test2/Apple Red Yellow 2/106_..."
1,Test2/Apple Red Yellow 2/107_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Apple Red Yellow 2,"<Blob: p8_fruit, Test2/Apple Red Yellow 2/107_..."
2,Test2/Apple Red Yellow 2/108_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Apple Red Yellow 2,"<Blob: p8_fruit, Test2/Apple Red Yellow 2/108_..."
3,Test2/Apple Red Yellow 2/109_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Apple Red Yellow 2,"<Blob: p8_fruit, Test2/Apple Red Yellow 2/109_..."
4,Test2/Apple Red Yellow 2/110_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Apple Red Yellow 2,"<Blob: p8_fruit, Test2/Apple Red Yellow 2/110_..."
...,...,...,...,...
521,Test2/Avocado/r_96_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Avocado,"<Blob: p8_fruit, Test2/Avocado/r_96_100.jpg, 1..."
522,Test2/Avocado/r_97_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Avocado,"<Blob: p8_fruit, Test2/Avocado/r_97_100.jpg, 1..."
523,Test2/Avocado/r_98_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Avocado,"<Blob: p8_fruit, Test2/Avocado/r_98_100.jpg, 1..."
524,Test2/Avocado/r_99_100.jpg,https://www.googleapis.com/storage/v1/b/p8_fru...,Avocado,"<Blob: p8_fruit, Test2/Avocado/r_99_100.jpg, 1..."


In [122]:
#descripteur
udf_desc= udf(get_desc, ArrayType(IntegerType()))
df_spark = df_spark.withColumn("descriptors", udf_desc("Name"))
df_spark = df_spark.filter(df_spark.descriptors.isNotNull())

#image
udf_image = udf(image_to_list, ArrayType(IntegerType()))
df_spark = df_spark.withColumn("image", udf_image(df_spark.Name))
df_spark= df_spark.filter(df_spark.image.isNotNull())

df_spark.show()

+--------------------+--------------------+------------------+--------------------+--------------------+
|                Name|                Link|        Categories|         descriptors|               image|
+--------------------+--------------------+------------------+--------------------+--------------------+
|Test2/Apple Red Y...|https://www.googl...|Apple Red Yellow 2|[31, 2, 0, 0, 0, ...|[255, 255, 255, 2...|
|Test2/Apple Red Y...|https://www.googl...|Apple Red Yellow 2|[87, 25, 3, 1, 7,...|[255, 255, 255, 2...|
|Test2/Apple Red Y...|https://www.googl...|Apple Red Yellow 2|[71, 62, 0, 0, 0,...|[255, 255, 255, 2...|
|Test2/Apple Red Y...|https://www.googl...|Apple Red Yellow 2|[0, 16, 143, 28, ...|[255, 255, 255, 2...|
|Test2/Apple Red Y...|https://www.googl...|Apple Red Yellow 2|[0, 25, 144, 19, ...|[255, 255, 255, 2...|
|Test2/Apple Red Y...|https://www.googl...|Apple Red Yellow 2|[104, 41, 7, 2, 4...|[255, 255, 255, 2...|
|Test2/Apple Red Y...|https://www.googl...|Apple Red Ye

In [123]:
df_spark.write.format("parquet").partitionBy("Categories").save("gs://"+ BUCKET +"/my_table")

In [33]:
def get_methods(object, spacing=20):
  methodList = []
  for method_name in dir(object):
    try:
        if callable(getattr(object, method_name)):
            methodList.append(str(method_name))
    except:
        methodList.append(str(method_name))
  processFunc = (lambda s: ' '.join(s.split())) or (lambda s: s)
  for method in methodList:
    try:
        print(str(method.ljust(spacing)) + ' ' +
              processFunc(str(getattr(object, method).__doc__)[0:90]))
    except:
        print(method.ljust(spacing) + ' ' + ' getattr() failed')
        
get_methods(df_pandas.Blob[1])

__class__            A wrapper around Cloud Storage's concept of an ``Object``. :type name: str :param
__delattr__          Implement delattr(self, name).
__dir__              Default dir() implementation.
__eq__               Return self==value.
__format__           Default object formatter.
__ge__               Return self>=value.
__getattribute__     Return getattr(self, name).
__gt__               Return self>value.
__hash__             Return hash(self).
__init__             property :attr:`name` Get the blob's name.
__init_subclass__    This method is called when a class is subclassed. The default implementation does nothing
__le__               Return self<=value.
__lt__               Return self<value.
__ne__               Return self!=value.
__new__              Create and return a new object. See help(type) for accurate signature.
__reduce__           Helper for pickle.
__reduce_ex__        Helper for pickle.
__repr__             None
__setattr__          Implement setattr(se

In [20]:
get_blob("Test2/Apple Red Yellow 2/106_100.jpg")

'Test2/Apple Red Yellow 2/106_100.jpg'