# Configuration
**Make Spark available in Jupyter notebook**

In [2]:
import findspark
findspark.init()
import pyspark

**Enable access to s3 data from Spark**

In [2]:
# In order to be able to read data via S3A we need a couple of dependencies / 
# we need to make sure the hadoop-aws and aws-java-sdk packages are available when we load spark:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:3.3.1 pyspark-shell"

In [3]:
# We need the aws credentials in order to be able to access the s3 bucket. 
# We can use the configparser package to read the credentials from the standard aws file.
import configparser
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
aws_profile = 'default'
access_id = config.get(aws_profile, "aws_access_key_id") 
access_key = config.get(aws_profile, "aws_secret_access_key")

os.environ["AWS_ACCESS_KEY_ID"] = access_id
os.environ["AWS_SECRET_ACCESS_KEY"] = access_key

**Initiate a spark session**

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Fruits").getOrCreate()

22/01/13 13:10:56 WARN Utils: Your hostname, LAPTOP-87B7CMDN resolves to a loopback address: 127.0.1.1; using 172.27.249.164 instead (on interface eth0)
22/01/13 13:10:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/mnt/c/Users/user/Ubuntu/Programs/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mwary/.ivy2/cache
The jars for the packages stored in: /home/mwary/.ivy2/jars
com.amazonaws#aws-java-sdk-pom added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4719a0ef-6016-44c6-b6c9-4bbe59ee13d4;1.0
	confs: [default]
	found com.amazonaws#aws-java-sdk-pom;1.10.34 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 290ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	com.amazonaws#aws-java-sdk-pom;1.10.34 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.1 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|

In [11]:
spark

# Test: reading a piece of data from S3

(within S3 bucket: (1) a folder "Data" with 2 folders "Test" and "Training", and (2) a folder "Sample")

In [8]:
testpath = "s3a://mw-projet8/Sample/Apricot/3_100.jpg" # path to a specific image
testpath2 = "s3a://mw-projet8/Sample/Apricot" # path to a specific Sample folder containing a few images

In [9]:
image_df = spark.read.format("image").load(testpath2)

22/01/04 13:49:55 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [10]:
image_df.show()

                                                                                

+--------------------+
|               image|
+--------------------+
|{s3a://mw-projet8...|
|{s3a://mw-projet8...|
|{s3a://mw-projet8...|
|{s3a://mw-projet8...|
|{s3a://mw-projet8...|
+--------------------+



# Data processing

In [94]:
# additionnal librairies to be imported
from pyspark.sql.functions import split, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA

from keras.applications.vgg16 import VGG16, preprocess_input
from keras.preprocessing.image import load_img, img_to_array
#from tensorflow.keras.utils import load_img
#from keras.preprocessing.image import img_to_array

import numpy as np
import pandas as pd

import time

## On data stored on local machine

In [11]:
localtestpath = "/mnt/c/Users/user/Ubuntu/Projet8/Sample/*"

start_time = time.time()

# loading data
local_image_df = spark.read.format("image").load(localtestpath).limit(3).toDF('image') # need to limit the size to run the PCA (RAM)

# adding a column with image path
local_image_df = local_image_df.withColumn("path", local_image_df.image.origin)
local_image_df = local_image_df.withColumn("path", split(local_image_df.path, '//')[1])

# adding a column with class
local_image_df = local_image_df.withColumn("class", split(local_image_df.path, '/')[8]) # Don't know why, but [-2] returns 'null'


local_image_df.show()

duration = time.time() - start_time
print(duration)

+--------------------+--------------------+---------+
|               image|                path|    class|
+--------------------+--------------------+---------+
|{file:///mnt/c/Us...|/mnt/c/Users/user...|Raspberry|
|{file:///mnt/c/Us...|/mnt/c/Users/user...|Raspberry|
|{file:///mnt/c/Us...|/mnt/c/Users/user...|Raspberry|
+--------------------+--------------------+---------+

1.3302748203277588


In [12]:
# defining model to extract features from images
    # Chosen model = VGG16

model = VGG16(weights="imagenet", include_top=False, pooling='max', input_shape=(224, 224, 3))
print(model.summary())

2022-01-11 16:15:54.892235: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-01-11 16:15:54.892288: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-01-11 16:15:54.892306: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (LAPTOP-87B7CMDN): /proc/driver/nvidia/version does not exist
2022-01-11 16:15:54.892571: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


Model: "vgg16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 224, 224, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 224, 224, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 112, 112, 64)      0         
                                                                 
 block2_conv1 (Conv2D)       (None, 112, 112, 128)     73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 112, 112, 128)     147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 56, 56, 128)       0     

In [13]:
# broadcasting the model for faster runs
sc = spark.sparkContext
#sc.broadcast(model) # works with: return Vectors.dense(model.predict(image).ravel().tolist())
model_bc = sc.broadcast(model) # works with: return Vectors.dense(model_bc.value.predict(image).ravel().tolist())



2022-01-11 16:16:25.827796: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.


INFO:tensorflow:Assets written to: ram://6b4de984-d1ac-4c2e-b77e-e0bf739eaed5/assets


In [14]:
# extracting image features as vectors, stored in a new column within the df

start_time = time.time()

def LoadPreprocessFeaturizeKerasVGG16(path):
    image = img_to_array(load_img(path, target_size=(224, 224, 3)))
    image = np.expand_dims(image, axis=0)
    image = preprocess_input(image)
    return Vectors.dense(model_bc.value.predict(image).ravel().tolist()) 

extract_features = udf(lambda x: LoadPreprocessFeaturizeKerasVGG16(x), VectorUDT())

local_img_df_feat = local_image_df.withColumn('vgg16vec', extract_features('path')).select('path', 'class', 'vgg16vec')
    #alternative:
    #local_img_df_feat = local_image_df.select('path', 'class', extract_features('path').alias('vgg16features'))

local_img_df_feat.show()
local_img_df_feat.persist()

duration = time.time() - start_time
print(duration)

2022-01-11 16:18:06.646394: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-01-11 16:18:06.646439: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-01-11 16:18:26.685437: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-01-11 16:18:26.685476: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-01-11 16:18:26.685489: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (LAPTOP-87B7CMDN): /proc/driver/nvidia/version does not exist
2022-01-11 16:18:26.685667: I tensorflow/core/platform/cpu_fe

+--------------------+---------+--------------------+
|                path|    class|            vgg16vec|
+--------------------+---------+--------------------+
|/mnt/c/Users/user...|Raspberry|[30.6986541748046...|
|/mnt/c/Users/user...|Raspberry|[85.2619171142578...|
|/mnt/c/Users/user...|Raspberry|[40.4808235168457...|
+--------------------+---------+--------------------+

24.809859037399292


                                                                                

In [15]:
# performing PCA (https://spark.apache.org/docs/1.5.1/ml-features.html#pca)
    # for k=50, 
    # on full Sample dataset (with previous code run in undistributed manner to enable PCA run --RAM), 
    # pca_model.explainedVariance showed that k=10 explains 0.9110833438360058 of variance
    # cf below

start_time = time.time()

pca = PCA(k=10, inputCol="vgg16vec", outputCol="pcaFeatures")
pca_model = pca.fit(local_img_df_feat)
result = pca_model.transform(local_img_df_feat).select(["path", "class", "pcaFeatures"])
result.show()

duration = time.time() - start_time
print(duration)

22/01/11 16:18:58 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/11 16:18:58 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/01/11 16:18:58 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/01/11 16:18:58 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK


+--------------------+---------+--------------------+
|                path|    class|         pcaFeatures|
+--------------------+---------+--------------------+
|/mnt/c/Users/user...|Raspberry|[23.1685062618692...|
|/mnt/c/Users/user...|Raspberry|[-67.201651924885...|
|/mnt/c/Users/user...|Raspberry|[259.207172055180...|
+--------------------+---------+--------------------+

3.8878350257873535


In [16]:
# saving spark df as parquet

start = time.time()

#result.write.parquet("/mnt/c/Users/user/Ubuntu/Projet8/Sample/result.parquet")  # error if file already exists
result.write.format("parquet").mode('overwrite').save("/mnt/c/Users/user/Ubuntu/Projet8/SampleResult.parquet")

duration = time.time() - start_time
print(duration)

[Stage 22:>                                                         (0 + 1) / 1]

226.18474555015564


                                                                                

_==> Extra: PCA explained variance_

In [77]:
# for k=50
#pca_model.explainedVariance

DenseVector([0.2934, 0.1748, 0.1244, 0.0849, 0.0649, 0.0435, 0.0347, 0.0319, 0.0303, 0.0283, 0.0239, 0.0211, 0.0187, 0.0157, 0.0094, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

In [80]:
# for k=50
#expvar = 0
#for i in range (15):
#    expvar += pca_model.explainedVariance[i]
#    print(i+1, ':', expvar)

# 10 components are enough

1 : 0.29339395245505906
2 : 0.46821396420083106
3 : 0.5926431813750801
4 : 0.6774965829997943
5 : 0.7424071058821711
6 : 0.7859508398211603
7 : 0.8206208403371235
8 : 0.8525010127567214
9 : 0.882827138500174
10 : 0.9110833438360058
11 : 0.9350045396262557
12 : 0.9561443217788522
13 : 0.9748861674372339
14 : 0.9906359038878038
15 : 0.9999999999999952


## On data stored on S3
Below code = code run on EC2 instance, without limiting the dataset to 3 rows.

https://docs.databricks.com/applications/machine-learning/preprocess-data/transfer-learning-tensorflow.html
https://stackoverflow.com/questions/61096573/using-tensorflow-keras-model-in-pyspark-udf-generates-a-pickle-error
https://stackoverflow.com/questions/70346484/how-to-limit-tracing-when-doing-transfer-learning-in-pyspark

In [6]:
img_path = "s3a://mw-projet8/Sample/"

# Reading the sample set in a distributed way as binary files
images = spark.read.format("binaryFile")\
.option("pathGlobFilter", "*.jpg")\
.option("recursiveFileLookup", "true").load(img_path).limit(3).toDF('path', 'modificationTime', 'length', 'content')
print(type(images))
print(images.printSchema())

22/01/13 13:11:46 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


<class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)

None


In [10]:
images = images.withColumn("class", split(images.path, '/')[4]).select(['path', 'content', 'class'])
images.show()

[Stage 0:>                                                          (0 + 8) / 8]

+--------------------+--------------------+---------+
|                path|             content|    class|
+--------------------+--------------------+---------+
|s3a://mw-projet8/...|[FF D8 FF E0 00 1...|Raspberry|
|s3a://mw-projet8/...|[FF D8 FF E0 00 1...|Raspberry|
|s3a://mw-projet8/...|[FF D8 FF E0 00 1...|Raspberry|
+--------------------+--------------------+---------+



                                                                                

In [11]:
# Necessary libraries for image processing (including #those already imported)

# Garbage collector
import gc
gc.enable()

# Useful libraries
#import pandas as pd  #already imported
#import numpy as np  #already imported
import io

# ComputerVision tools
from PIL import Image
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# Spark tools
#from pyspark.sql.functions import split, udf  #already imported
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
#from pyspark.ml.linalg import Vectors, VectorUDT  #already imported
#from pyspark.ml.feature import PCA  #already imported

2022-01-13 13:13:01.518427: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-01-13 13:13:01.518554: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


In [13]:
sc = spark.sparkContext

# Loading the VVG16 model once to get the weights
# and check that the top layers are removed
model = VGG16(include_top=False, pooling='max', input_shape=(100,100,3))
print('VGG16 summary:')
print(model.summary())

# Broadcasting model weights
    # https://stackoverflow.com/questions/61096573/using-tensorflow-keras-model-in-pyspark-udf-generates-a-pickle-error
bc_model_weights = sc.broadcast(model.get_weights())
del model ; gc.collect()

def model_fn():
    """
    Returns a VGG16 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = VGG16(weights=None, include_top=False, pooling='max', input_shape=(100,100,3))
    model.set_weights(bc_model_weights.value)
    return model

def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurizes a pd.Series of raw images using the input model.
    :returns: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors
    # for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds] # not necessary with ``pooling='max'`` for VGG16
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF
    wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column
    of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator
                                over batches of data, where each batch
                                is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once
    # and then re-use it for multiple data batches.
    # This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

df = images.select(col("path"), col("class"), featurize_udf("content").alias("feats"))
df.show()

# adding a persist() step
df.persist()

VGG16 summary:
Model: "vgg16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 100, 100, 3)]     0         
                                                                 
 block1_conv1 (Conv2D)       (None, 100, 100, 64)      1792      
                                                                 
 block1_conv2 (Conv2D)       (None, 100, 100, 64)      36928     
                                                                 
 block1_pool (MaxPooling2D)  (None, 50, 50, 64)        0         
                                                                 
 block2_conv1 (Conv2D)       (None, 50, 50, 128)       73856     
                                                                 
 block2_conv2 (Conv2D)       (None, 50, 50, 128)       147584    
                                                                 
 block2_pool (MaxPooling2D)  (None, 25, 25, 12

2022-01-13 13:15:23.511702: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-01-13 13:15:23.511741: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-01-13 13:15:43.438808: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-01-13 13:15:43.438847: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-01-13 13:15:43.438863: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (LAPTOP-87B7CMDN): /proc/driver/nvidia/version does not exist
2022-01-13 13:15:43.439027: I tensorflow/core/platform/cpu_fe

+--------------------+---------+--------------------+
|                path|    class|               feats|
+--------------------+---------+--------------------+
|s3a://mw-projet8/...|Raspberry|[84.41844, 0.0, 2...|
|s3a://mw-projet8/...|Raspberry|[73.287865, 0.0, ...|
|s3a://mw-projet8/...|Raspberry|[55.54396, 0.0, 4...|
+--------------------+---------+--------------------+



                                                                                

In [14]:
# vectorizing features - i.e. requirement for PCA

vectorize_features = udf(lambda x: Vectors.dense(x), VectorUDT())

df = df.select(col("path"), col("class"), vectorize_features("feats").alias("vecfeats"))
df.show()

[Stage 8:>                                                          (0 + 1) / 1]

+--------------------+---------+--------------------+
|                path|    class|            vecfeats|
+--------------------+---------+--------------------+
|s3a://mw-projet8/...|Raspberry|[84.4184417724609...|
|s3a://mw-projet8/...|Raspberry|[73.2878646850586...|
|s3a://mw-projet8/...|Raspberry|[55.5439605712890...|
+--------------------+---------+--------------------+



                                                                                

In [16]:
# performing PCA (https://spark.apache.org/docs/1.5.1/ml-features.html#pca)
    # with k=10 --> ca. 91% of variance explained - cf above

pca = PCA(k=10, inputCol="vecfeats", outputCol="pcaFeats")
pca_model = pca.fit(df)
result = pca_model.transform(df).select(["path", "class", "pcaFeats"])
result.show()

22/01/13 13:16:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/13 13:16:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/01/13 13:16:21 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/01/13 13:16:21 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
[Stage 20:>                                                         (0 + 8) / 8]

+--------------------+---------+--------------------+
|                path|    class|            pcaFeats|
+--------------------+---------+--------------------+
|s3a://mw-projet8/...|Raspberry|[53.5481247404085...|
|s3a://mw-projet8/...|Raspberry|[335.795156953732...|
|s3a://mw-projet8/...|Raspberry|[104.780163123623...|
+--------------------+---------+--------------------+



                                                                                

In [17]:
# saving spark df 'result' as parquet

#result.write.parquet("/mnt/c/Users/user/Ubuntu/Projet8/Sample/result.parquet")  # error if file already exists
result.write.format("parquet").mode('overwrite').save("s3a://mw-projet8/SampleResult.parquet")


2022-01-13 13:22:17.931409: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-01-13 13:22:17.931818: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-01-13 13:22:38.849706: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-01-13 13:22:38.849847: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-01-13 13:22:38.850279: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (LAPTOP-87B7CMDN): /proc/driver/nvidia/version does not exist
2022-01-13 13:22:38.850868: I tensorflow/core/platform/cpu_fe