In [1]:
import findspark
findspark.init()
findspark.find()
#print(findspark.find())

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import os
#from fnmatch import fnmatch
from PIL import Image
import time, datetime

from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.sql import Row

import boto3

In [3]:
# Data-source switch: True reads the images from the S3 bucket,
# False walks the local `root` folder instead.
s3 = True

# Local folder holding the training images (walked only when `s3` is False).
root = r'C:\Users\nisae\OneDrive\Documents\DataScientist\P8_Poitier_Nicolas\archive\fruits-360_dataset\fruits-360\Training'

# AWS region, bucket name and key prefix under which the images are stored.
region_name = "eu-west-3"
bucket_name = 'p8imagestrain'
root_s3 = 'images'

# URI prefix put in front of every image key: s3://<bucket_name>/<root_s3>/
folder = "s3://{}/{}/".format(bucket_name, root_s3)

# connection s3 et création root_s3
def load_aws_key():
    '''Make the AWS access key ID and secret access key available in the environment.

    If both AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are already set to
    non-empty values, they are used as-is. Otherwise the pair is read from
    ``keys.txt`` in the current working directory (first line: key ID,
    second line: secret key) and exported to those two variables.

    Returns:
        tuple: ``(ID, KEY)`` as strings.

    Raises:
        OSError: the environment is not populated and ``keys.txt`` cannot be opened.
        IndexError: ``keys.txt`` contains fewer than two lines.
    '''
    ID = os.environ.get("AWS_ACCESS_KEY_ID")
    KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if ID and KEY:
        # Credentials supplied by the environment take precedence over the file.
        return ID, KEY

    with open('keys.txt', 'r') as f:
        lines = f.read().splitlines()
    # strip() removes stray surrounding whitespace that would corrupt the keys.
    ID = lines[0].strip()
    KEY = lines[1].strip()
    os.environ["AWS_ACCESS_KEY_ID"] = ID
    os.environ["AWS_SECRET_ACCESS_KEY"] = KEY
    return ID, KEY
# Export the AWS key pair to the environment before any client is built.
load_aws_key()


# boto3 session authenticated with the key pair read from the environment.
session = boto3.session.Session(aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
                                        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"]                                
                               )
# Parameters of the IAM role requested through STS below.
role_session_name = "test-s3"
role_arn = "arn:aws:iam::660870459774:role/test_Nisaesno"

# S3 client created from `session`; load_Dataframe uses it to list the image keys.
s3_client = session.client(
        service_name='s3', 
        region_name=region_name
    
    )

# NOTE(review): despite its name, `sts_client` is bound to the value returned by
# assume_role(), not to the STS client itself.
sts_client = session.client(
        service_name='sts', 
        region_name=region_name    
    ).assume_role(
        RoleArn=role_arn,
        RoleSessionName=role_session_name
    )
# 'Credentials' entry of the assume_role() response.
# NOTE(review): nothing in the visible cells passes these credentials to
# `s3_client`, which was created from the key-pair session - confirm whether
# that is intended.
credentials = sts_client['Credentials']
# Session token taken from those credentials; `test` is not referenced again
# in the visible cells.
test = credentials.get("SessionToken")



In [14]:
def time_value(temps):
    """Format a duration given in seconds as an ``h:m:s`` string.

    Hours and minutes are truncated to integers; the seconds part is inserted
    unchanged, so a float duration keeps all of its decimals.

    Args:
        temps: duration in seconds (int or float).

    Returns:
        str: ``"<hours>:<minutes>:<seconds>"`` without zero padding.
    """
    minutes, seconds = divmod(temps, 60)
    hours = 0
    # Hours are only split off once a full hour has been reached.
    if minutes >= 60:
        hours, minutes = divmod(minutes, 60)
    return f"{int(hours)}:{int(minutes)}:{seconds}"

In [15]:
    # Reuse the running SparkContext or start one, then set its log level to WARN.
    sc = SparkContext.getOrCreate()
    sc.setLogLevel('WARN')
    # Session handle used below to create and read DataFrames.
    spark = SparkSession.builder.appName("name").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/07 14:21:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [16]:
def image_data(img_path, size=(25, 25)):
    """Load one image and return its resized pixels as a flat list of ints.

    When the module-level flag ``s3`` is true, ``img_path`` is expected to be
    an ``s3://<bucket_name>/<key>`` URI and the object is fetched from the
    bucket with boto3; otherwise ``img_path`` is opened as a local file.

    Args:
        img_path: S3 URI or local path of the image.
        size: ``(width, height)`` the image is resized to. Defaults to
            ``(25, 25)``, the value that used to be hard-coded.

    Returns:
        list[int]: RGB values of the resized image, flattened in NumPy's
        default order (``size[0] * size[1] * 3`` entries).
    """
    source = img_path
    body = None
    if s3:
        prefix = "s3://" + bucket_name + "/"
        # Strip the URI prefix only at the start of the string; a blanket
        # replace() would also alter keys that happen to contain it.
        key = img_path[len(prefix):] if img_path.startswith(prefix) else img_path
        # Named `s3_object` so the builtin `object` is no longer shadowed.
        s3_object = boto3.resource("s3", region_name=region_name).Bucket(bucket_name).Object(key)
        body = s3_object.get()['Body']
        source = body

    try:
        with Image.open(source) as image:
            # Force three channels so every image yields a vector of the same
            # length; images that are already RGB are unaffected.
            resized = image.convert("RGB").resize(tuple(size))
            return np.asarray(resized).flatten().tolist()
    finally:
        # Release the stream returned by S3 once the pixels have been read.
        if body is not None:
            body.close()

In [17]:
def extract_categ(path):
    """Return the name of the folder that directly contains the file.

    Both ``/`` and ``\\`` are treated as separators, so S3 URIs and Windows
    paths are handled alike.

    Args:
        path: file path or URI with at least one separator.

    Returns:
        str: the second-to-last path component.

    Raises:
        IndexError: ``path`` contains no separator.
    """
    # Normalise on "/" so that a single split covers both separator styles.
    segments = path.replace("\\", "/").split("/")
    # The last segment is the file name; the one before it is its folder.
    return segments[-2]

In [18]:
def load_Dataframe(root, pattern=None):
    """Build a Spark DataFrame with one row per training image.

    Image locations come from the S3 bucket when the module-level flag ``s3``
    is true, otherwise from a recursive walk of ``root``.

    Args:
        root: local folder to walk; ignored when ``s3`` is true.
        pattern: fnmatch-style file-name pattern for the local walk. When
            omitted, the notebook-level ``pattern`` variable is used if it
            exists, else ``"*.jpg"``.

    Returns:
        pyspark.sql.DataFrame with the columns ``path`` (string), ``categ``
        (string, computed by extract_categ), ``data`` (array of int, computed
        by image_data) and ``image_dense`` (ML vector built from ``data``).
    """
    # Imported locally: the notebook's top-level fnmatch import is commented
    # out, which made the local branch fail with NameError.
    from fnmatch import fnmatch

    data = []
    if s3:
        prefix = root_s3 + "/"
        # A single list_objects_v2 call returns at most 1000 keys; the
        # paginator follows the continuation tokens so that no image is
        # silently left out.
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            # "Contents" is absent when nothing matches the prefix.
            for entry in page.get("Contents", []):
                key = entry["Key"]
                if key.endswith('.jpg'):
                    # Swap the leading key prefix for the full s3:// folder URI.
                    data.append(folder + key[len(prefix):])
    else:
        if pattern is None:
            # Backward compatibility with cells that define `pattern` globally.
            pattern = globals().get("pattern", "*.jpg")
        for path, subdirs, files in os.walk(root):
            for name in files:
                if fnmatch(name, pattern):
                    data.append(os.path.join(path, name))

    # Distribute the list of image paths as an RDD of single-field rows.
    rdd = sc.parallelize(data)
    row_rdd = rdd.map(lambda x: Row(x))
    # Explicit schema, so that an empty listing gives an empty DataFrame
    # instead of a schema-inference error.
    schema = StructType([StructField("path", StringType(), True)])
    df = spark.createDataFrame(row_rdd, schema)

    # Category label: name of the folder that contains the image.
    udf_categ = udf(extract_categ, StringType())
    df = df.withColumn("categ", udf_categ('path'))

    # Flattened pixel values of the resized image.
    udf_data = udf(image_data, ArrayType(IntegerType()))
    df = df.withColumn("data", udf_data('path'))

    # Same values as a dense ML vector, the input type of StandardScaler / PCA.
    ud_f = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('image_dense', ud_f('data'))

    return df

In [19]:
# Start the wall-clock timer for this cell.
debut = time.time()

# Local training folder and file-name pattern; load_Dataframe only reads them
# in its local (non-S3) branch.
root = r'C:\Users\nisae\OneDrive\Documents\DataScientist\P8_Poitier_Nicolas\archive\fruits-360_dataset\fruits-360\Training'
pattern = "*.jpg"

# Build the image DataFrame and print its first rows.
df = load_Dataframe(root)
df.show()
#df.write.parquet(path='results', mode='overwrite')


# Report the elapsed time both as h:m:s and as raw seconds.
temps_d_execution = time.time()-debut
print("temps d'exécution ",time_value(temps_d_execution)," en secondes ",temps_d_execution)

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------+--------------------+--------------------+
|                path|         categ|                data|         image_dense|
+--------------------+--------------+--------------------+--------------------+
|s3://p8imagestrai...|Apple_Braeburn|[254, 255, 255, 2...|[254.0,255.0,255....|
|s3://p8imagestrai...|Apple_Braeburn|[253, 254, 254, 2...|[253.0,254.0,254....|
|s3://p8imagestrai...|Apple_Braeburn|[253, 254, 254, 2...|[253.0,254.0,254....|
|s3://p8imagestrai...|Apple_Braeburn|[254, 254, 254, 2...|[254.0,254.0,254....|
|s3://p8imagestrai...|Apple_Braeburn|[252, 254, 252, 2...|[252.0,254.0,252....|
|s3://p8imagestrai...|Apple_Braeburn|[253, 254, 254, 2...|[253.0,254.0,254....|
|s3://p8imagestrai...|Apple_Braeburn|[255, 254, 254, 2...|[255.0,254.0,254....|
|s3://p8imagestrai...|Apple_Braeburn|[252, 255, 253, 2...|[252.0,255.0,253....|
|s3://p8imagestrai...|Apple_Braeburn|[253, 254, 254, 2...|[253.0,254.0,254....|
|s3://p8imagestrai...|Apple_Braeburn|[25

                                                                                

In [20]:
debut = time.time()

# Start from a frame without this cell's own output columns so that the cell
# can be re-run: drop() ignores columns that are absent, whereas Spark ML
# refuses an output column that already exists.
df_features = df.drop("scaledFeatures", "pcaFeatures")

# Standardise every pixel feature (zero mean, unit standard deviation).
standardizer = StandardScaler(inputCol="image_dense", outputCol="scaledFeatures",
                                  withStd=True, withMean=True)
model_std = standardizer.fit(df_features)
df_scaled = model_std.transform(df_features)

# Fit the PCA on the standardised features.
n_components=50
pca = PCA(k=n_components, inputCol='scaledFeatures', outputCol='pcaFeatures')
model_pca = pca.fit(df_scaled)

# Project the images onto the first k principal components; `df` keeps its
# name and columns so that the following cells work unchanged.
df = model_pca.transform(df_scaled)

df.show()

temps_d_execution = time.time()-debut
print("temps d'exécution ",time_value(temps_d_execution)," en secondes ",temps_d_execution)

22/11/07 14:26:39 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/07 14:26:39 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/11/07 14:27:04 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/11/07 14:27:04 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
[Stage 10:>                                                         (0 + 1) / 1]

+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|                path|         categ|                data|         image_dense|      scaledFeatures|         pcaFeatures|
+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|s3://p8imagestrai...|Apple_Braeburn|[254, 255, 255, 2...|[254.0,255.0,255....|[-0.6571120881328...|[23.0010123601878...|
|s3://p8imagestrai...|Apple_Braeburn|[253, 254, 254, 2...|[253.0,254.0,254....|[-1.8943212370559...|[4.13462872721743...|
|s3://p8imagestrai...|Apple_Braeburn|[253, 254, 254, 2...|[253.0,254.0,254....|[-1.8943212370559...|[3.94216639184256...|
|s3://p8imagestrai...|Apple_Braeburn|[254, 254, 254, 2...|[254.0,254.0,254....|[-0.6571120881328...|[3.61615728793495...|
|s3://p8imagestrai...|Apple_Braeburn|[252, 254, 252, 2...|[252.0,254.0,252....|[-3.1315303859791...|[3.51636772888551...|
|s3://p8imagestrai...|Ap

                                                                                

In [26]:
# Start the wall-clock timer for this cell.
debut = time.time()
# Keep only the image path, its category and the PCA features.
new_df = df.select("path","categ","pcaFeatures")
new_df.show()
# Report the elapsed time both as h:m:s and as raw seconds.
temps_d_execution = time.time()-debut
print("temps d'exécution ",time_value(temps_d_execution)," en secondes ",temps_d_execution)

[Stage 15:>                                                         (0 + 1) / 1]

+--------------------+--------------+--------------------+
|                path|         categ|         pcaFeatures|
+--------------------+--------------+--------------------+
|s3://p8imagestrai...|Apple_Braeburn|[23.0010123601878...|
|s3://p8imagestrai...|Apple_Braeburn|[4.13462872721743...|
|s3://p8imagestrai...|Apple_Braeburn|[3.94216639184256...|
|s3://p8imagestrai...|Apple_Braeburn|[3.61615728793495...|
|s3://p8imagestrai...|Apple_Braeburn|[3.51636772888551...|
|s3://p8imagestrai...|Apple_Braeburn|[3.22406923270634...|
|s3://p8imagestrai...|Apple_Braeburn|[3.11046513743157...|
|s3://p8imagestrai...|Apple_Braeburn|[3.06724582288118...|
|s3://p8imagestrai...|Apple_Braeburn|[3.19975526764517...|
|s3://p8imagestrai...|Apple_Braeburn|[2.85242114741134...|
|s3://p8imagestrai...|Apple_Braeburn|[2.85189221708245...|
|s3://p8imagestrai...|Apple_Braeburn|[17.7109297837460...|
|s3://p8imagestrai...|Apple_Braeburn|[2.65970576763842...|
|s3://p8imagestrai...|Apple_Braeburn|[2.63597610261637..

                                                                                

In [27]:
# Time the export of the reduced features.
debut = time.time()
# Destination: the S3 bucket (s3a:// URI) when `s3` is set, else a local folder.
chemin_save = ('s3a://'+ bucket_name + '/result/preprocessed_parquet') if s3 else 'preprocessed_parquet'
print(chemin_save)
# Write as parquet, replacing whatever is already stored at that location.
new_df.write.parquet(chemin_save, mode='overwrite')
temps_d_execution = time.time()-debut
print("temps d'exécution ",time_value(temps_d_execution)," en secondes ",temps_d_execution)

s3a://p8imagestrain/result/preprocessed_parquet


22/11/07 14:36:22 WARN DAGScheduler: Broadcasting large task binary with size 1002.9 KiB
                                                                                

temps d'exécution  0:0:34.06857919692993  en secondes  34.06857919692993


In [28]:
# Time the read-back of the parquet export.
debut = time.time()
# Reload the dataset written to `chemin_save` and print its first rows.
df_parquet = spark.read.parquet(chemin_save)
df_parquet.show()
temps_d_execution = time.time()-debut
print("temps d'exécution ",time_value(temps_d_execution)," en secondes ",temps_d_execution)

+--------------------+------------------+--------------------+
|                path|             categ|         pcaFeatures|
+--------------------+------------------+--------------------+
|s3://p8imagestrai...|Apple_Crimson_Snow|[-26.468702948483...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-13.358247264670...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-13.653927582205...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-13.622335603041...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-13.927992678380...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-25.985058139381...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-14.159648916978...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-14.566198228096...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-14.611084101068...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-15.154419570234...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-15.183692560593...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-15.400038299591...|
|s3://p8imagestrai...|Apple_Crimson_Snow|[-15.699666202