FRUITS wants to develop a mobile application that would allow users to take a picture of a fruit and obtain information about this fruit. This application would make it possible to set up a first version of the fruit image classification engine.

Our mission is therefore to develop in a Big Data (AWS) environment a first data processing chain which will include preprocessing and a dimension reduction step (PCA).

In [2]:
# spark initialisation
import findspark
findspark.init("/home/bitnami/spark-3.0.0-bin-hadoop3.2")
import pyspark
import pyarrow
# context & session
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
# usefull packages
import pandas as pd
import numpy as np
import random
import time
import os
# deal with image
from PIL import Image
# data handling
from pyspark.sql.functions import element_at, split, col,size
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
from typing import Iterator
# ml tasks
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
# transform
from pyspark.ml.linalg import Vectors, VectorUDT
# core featurizer
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img
import io
from pyspark.ml.linalg import DenseVector, VectorUDT

In [3]:
import os
import boto3
import pickle

In [4]:
with open('credential_pickle.pickle', 'rb') as file:
    credential = pickle.Unpickler(file).load()

### Spark configuration

In [5]:
conf=SparkConf()
conf.set('spark.jars.packages','org.apache.hadoop:hadoop-aws:3.2.0')
conf.set('spark.executor.extraJavaOptions','-Dcom.amazonaws.services.s3.enableV4=true')
conf.set('spark.driver.extraJavaOptions','-Dcom.amazonaws.services.s3.enableV4=true')
conf.set("spark.hadoop.fs.s3a.aws.access.key", credential['AWS_ACCESS_KEY_ID'])
conf.set("spark.hadoop.fs.s3a.aws.secret.key", credential['AWS_SECRET_ACCESS_KEY'])
conf.set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
conf.set('fs.s3a.endpoint', "s3-eu-west-3.amazonaws.com")

<pyspark.conf.SparkConf at 0x7f0bb94c4cd0>

In [6]:
spark = SparkSession.builder.config(conf=conf).appName('Projet8_new').getOrCreate()
spark._sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

22/12/06 19:41:38 WARN Utils: Your hostname, ip-172-31-45-74 resolves to a loopback address: 127.0.0.1; using 172.31.45.74 instead (on interface eth0)
22/12/06 19:41:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/bitnami/.ivy2/cache
The jars for the packages stored in: /home/bitnami/.ivy2/jars
:: loading settings :: url = jar:file:/home/bitnami/spark-3.0.0-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f09c3ee6-30ee-4512-86f9-01712fdccea9;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
:: resolution report :: resolve 521ms :: artifacts dl 10ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.0 from central in [default]
	--

In [7]:
spark

### AWS S3 

In [8]:
s3 = boto3.resource('s3')

Let's define a function to display the present s3 buckets:

In [9]:
def list_buckets(ressource):
    """Print all existing buckets names"""
    for bucket in ressource.buckets.all():
        print(bucket.name)

Show the buckets present in S3:

In [10]:
list_buckets(s3)

test-bucket-lemishkot


## Pre-processing of the images

### Load categories of the fruits

In [11]:
def list_categories(bucket, prefix):
    """Function to create a list of all directories containing the image."""
    client=boto3.client('s3')
    sub_folders =[]
    result=client.list_objects(Bucket=bucket ,Prefix=prefix, Delimiter='/')
    for o in result.get('CommonPrefixes'):
        sub_folders.append(list(o.get('Prefix').split(" ")))
    print("Number of images's category is:", len(sub_folders))
    return sub_folders

In [12]:
bucket='test-bucket-lemishkot'
prefix="sample/"
list_cat = list_categories(bucket, prefix)
list_cat

Number of images's category is: 5


[['sample/Apricot/'],
 ['sample/Avocado/'],
 ['sample/Banana/'],
 ['sample/Blueberry/'],
 ['sample/Strawberry/']]

**Test - Random choice of a fruit category:**

In [13]:
fruit_category = random.choice(list_cat)[0]
fruit_category

'sample/Banana/'

### Loading of the images and extraction of the labels

S3 path of the images of the chosen category:

In [14]:
path = "s3a://test-bucket-lemishkot/" + fruit_category
path

's3a://test-bucket-lemishkot/sample/Banana/'

In [15]:
images_df = spark.read.format("binaryFile").option("pathGlobFilter", "*.jpg").load(path).cache()
images_df = images_df.withColumn('label', element_at(split(col('path'), '/'),-2))
images_df.printSchema()
images_df.show(5)

22/12/06 19:41:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)



                                                                                

+--------------------+-------------------+------+--------------------+------+
|                path|   modificationTime|length|             content| label|
+--------------------+-------------------+------+--------------------+------+
|s3a://test-bucket...|2022-11-28 14:18:56|  3643|[FF D8 FF E0 00 1...|Banana|
|s3a://test-bucket...|2022-11-28 14:18:57|  3583|[FF D8 FF E0 00 1...|Banana|
|s3a://test-bucket...|2022-11-28 14:18:57|  3574|[FF D8 FF E0 00 1...|Banana|
|s3a://test-bucket...|2022-11-28 14:18:57|  3484|[FF D8 FF E0 00 1...|Banana|
|s3a://test-bucket...|2022-11-28 14:18:57|  3425|[FF D8 FF E0 00 1...|Banana|
+--------------------+-------------------+------+--------------------+------+
only showing top 5 rows



### Creation of image features via CNN Transfer Learning (VGG16)

In [16]:
model = VGG16(
    include_top=False,
    weights=None,
    pooling='max',
    input_shape=(100, 100, 3))

2022-12-06 19:42:03.592748: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-12-06 19:42:03.597250: W tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: UNKNOWN ERROR (303)
2022-12-06 19:42:03.600428: I tensorflow/compiler/xla/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (ip-172-31-45-74): /proc/driver/nvidia/version does not exist
2022-12-06 19:42:03.627110: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-12-06 19:42:04.717842: W tensorflow/tsl/framework/cpu_allocator

In [17]:
model.summary()

Model: "vgg16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 100, 100, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 100, 100, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 100, 100, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 50, 50, 64)        0         
                                                                 
 block2_conv1 (Conv2D)       (None, 50, 50, 128)       73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 50, 50, 128)       147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 25, 25, 128)       0     

In [18]:
# Function for the extractions of the features in the form of list, which takes as the arguments the 
# fruit cathegory (see line 13) and the bucket
def Featurizer(category,bucket):
    """Function to load the images of the chosen category
    and extraction of features from images via the pre-trained model VGG16"""
    start = time.perf_counter()
    feat = []
    for file in bucket.objects.filter(Prefix=category):
        #load image
        obj = bucket.Object(file.key)
        label = file.key.split('/')[-2]
        response = obj.get()
        file_stream = response['Body']
        img = Image.open(file_stream)
        # convert image to flatten array
        flat_array = np.array(img).ravel().tolist()
        tensor = np.array(flat_array).reshape(1, 100, 100, 3).astype(np.uint8)
        #  vgg 16 preprocess input
        prep_tensor = preprocess_input(tensor)
        # vgg16 extraction features
        features = model.predict(prep_tensor).ravel().tolist()
        # Store file key, flat array and features
        feat.append((features))
    stop = time.perf_counter()
    print(f'process, elapsed time: {stop - start:0.2f}s')
    return feat

In [19]:
fruits_bucket = s3.Bucket('test-bucket-lemishkot')
features = Featurizer(fruit_category,fruits_bucket)

process, elapsed time: 4.73s


In [20]:
# Build a new column in a dataframe with the extracted features
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window

b = spark.createDataFrame([(l,) for l in features], ['features'])
images_df = images_df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))

images_df = images_df.join(b, images_df.row_idx == b.row_idx).\
             drop("row_idx")
images_df.show()

22/12/06 19:42:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+--------------------+-------------------+------+--------------------+------+--------------------+
|                path|   modificationTime|length|             content| label|            features|
+--------------------+-------------------+------+--------------------+------+--------------------+
|s3a://test-bucket...|2022-11-28 14:18:56|  3643|[FF D8 FF E0 00 1...|Banana|[0.18677103519439...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3583|[FF D8 FF E0 00 1...|Banana|[0.21086037158966...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3574|[FF D8 FF E0 00 1...|Banana|[0.22115191817283...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3484|[FF D8 FF E0 00 1...|Banana|[0.19759744405746...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3425|[FF D8 FF E0 00 1...|Banana|[0.19242888689041...|
|s3a://test-bucket...|2022-11-28 14:18:58|  3408|[FF D8 FF E0 00 1...|Banana|[0.19521124660968...|
|s3a://test-bucket...|2022-11-28 14:18:58|  3348|[FF D8 FF E0 00 1...|Banana|[0.16757418215274...|
|s3a://tes

### Dimension reduction via PCA

**Operation that transforms feature lists into vectors**

In [21]:
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

**Convert features into vectors**

In [22]:
df_vectors = images_df.withColumn('vectors', array_to_vector_udf('features'))

In [23]:
df_vectors.show(5, True)

22/12/06 19:42:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+-------------------+------+--------------------+------+--------------------+--------------------+
|                path|   modificationTime|length|             content| label|            features|             vectors|
+--------------------+-------------------+------+--------------------+------+--------------------+--------------------+
|s3a://test-bucket...|2022-11-28 14:18:56|  3643|[FF D8 FF E0 00 1...|Banana|[0.18677103519439...|[0.18677103519439...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3583|[FF D8 FF E0 00 1...|Banana|[0.21086037158966...|[0.21086037158966...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3574|[FF D8 FF E0 00 1...|Banana|[0.22115191817283...|[0.22115191817283...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3484|[FF D8 FF E0 00 1...|Banana|[0.19759744405746...|[0.19759744405746...|
|s3a://test-bucket...|2022-11-28 14:18:57|  3425|[FF D8 FF E0 00 1...|Banana|[0.19242888689041...|[0.19242888689041...|
+--------------------+------------------

                                                                                

**PCA dimension reduction model**

In [24]:
pca = PCA(k=10, inputCol='vectors', outputCol='pca_vectors')
model_pca = pca.fit(df_vectors)

22/12/06 19:42:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/12/06 19:42:27 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/12/06 19:42:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/12/06 19:42:27 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


In [25]:
# applying the model
df_pca = model_pca.transform(df_vectors)

One should use the function limit in order to improve the visualization

In [27]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df_pca.limit(5)

22/12/06 19:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

path,modificationTime,length,content,label,features,vectors,pca_vectors
s3a://test-bucket...,2022-11-28 14:18:56,3643,[FF D8 FF E0 00 1...,Banana,[0.18677103519439...,[0.18677103519439...,[1.80532790014871...
s3a://test-bucket...,2022-11-28 14:18:57,3583,[FF D8 FF E0 00 1...,Banana,[0.21086037158966...,[0.21086037158966...,[1.76155586971014...
s3a://test-bucket...,2022-11-28 14:18:57,3574,[FF D8 FF E0 00 1...,Banana,[0.22115191817283...,[0.22115191817283...,[1.80613388469232...
s3a://test-bucket...,2022-11-28 14:18:57,3484,[FF D8 FF E0 00 1...,Banana,[0.19759744405746...,[0.19759744405746...,[1.84911991928906...
s3a://test-bucket...,2022-11-28 14:18:57,3425,[FF D8 FF E0 00 1...,Banana,[0.19242888689041...,[0.19242888689041...,[1.92510207279949...


In [28]:
# Inverse Transformation (from Vectors to Arrays)
vector_to_array_udf = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))

In [29]:
final_df = df_pca.withColumn('pca_features', vector_to_array_udf('pca_vectors'))

In [30]:
final_df.limit(5)

22/12/06 19:42:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

path,modificationTime,length,content,label,features,vectors,pca_vectors,pca_features
s3a://test-bucket...,2022-11-28 14:18:56,3643,[FF D8 FF E0 00 1...,Banana,[0.18677103519439...,[0.18677103519439...,[1.80532790014871...,"[1.8053279, -0.10..."
s3a://test-bucket...,2022-11-28 14:18:57,3583,[FF D8 FF E0 00 1...,Banana,[0.21086037158966...,[0.21086037158966...,[1.76155586971014...,"[1.7615559, -0.14..."
s3a://test-bucket...,2022-11-28 14:18:57,3574,[FF D8 FF E0 00 1...,Banana,[0.22115191817283...,[0.22115191817283...,[1.80613388469232...,"[1.8061339, -0.18..."
s3a://test-bucket...,2022-11-28 14:18:57,3484,[FF D8 FF E0 00 1...,Banana,[0.19759744405746...,[0.19759744405746...,[1.84911991928906...,"[1.8491199, -0.23..."
s3a://test-bucket...,2022-11-28 14:18:57,3425,[FF D8 FF E0 00 1...,Banana,[0.19242888689041...,[0.19242888689041...,[1.92510207279949...,"[1.9251021, -0.32..."


### Save resulting file to S3

In [31]:
# saving the final df in parquet format into S3 bucket
final_df.write.format("parquet").mode('overwrite').save('s3a://test-bucket-lemishkot/results_parquet')

22/12/06 19:42:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/12/06 19:42:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

### Read and charge the resulting file

In [32]:
final_df_parquet = spark.read.format('parquet').load('s3a://test-bucket-lemishkot/results_parquet')

In [33]:
final_df_parquet.limit(5)

path,modificationTime,length,content,label,features,vectors,pca_vectors,pca_features
s3a://test-bucket...,2022-11-28 14:18:56,3643,[FF D8 FF E0 00 1...,Banana,[0.18677103519439...,[0.18677103519439...,[1.80532790014871...,"[1.8053279, -0.10..."
s3a://test-bucket...,2022-11-28 14:18:57,3583,[FF D8 FF E0 00 1...,Banana,[0.21086037158966...,[0.21086037158966...,[1.76155586971014...,"[1.7615559, -0.14..."
s3a://test-bucket...,2022-11-28 14:18:57,3574,[FF D8 FF E0 00 1...,Banana,[0.22115191817283...,[0.22115191817283...,[1.80613388469232...,"[1.8061339, -0.18..."
s3a://test-bucket...,2022-11-28 14:18:57,3484,[FF D8 FF E0 00 1...,Banana,[0.19759744405746...,[0.19759744405746...,[1.84911991928906...,"[1.8491199, -0.23..."
s3a://test-bucket...,2022-11-28 14:18:57,3425,[FF D8 FF E0 00 1...,Banana,[0.19242888689041...,[0.19242888689041...,[1.92510207279949...,"[1.9251021, -0.32..."
