<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Introduction" data-toc-modified-id="Introduction-1">Introduction</a></span><ul class="toc-item"><li><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#time-monitoring" data-toc-modified-id="time-monitoring-1.0.0.1">time monitoring</a></span></li></ul></li></ul></li></ul></li><li><span><a href="#Spark-session" data-toc-modified-id="Spark-session-2">Spark session</a></span></li><li><span><a href="#S3-as-data-source" data-toc-modified-id="S3-as-data-source-3">S3 as data source</a></span></li><li><span><a href="#Load-data-&amp;-Featurizer" data-toc-modified-id="Load-data-&amp;-Featurizer-4">Load data &amp; Featurizer</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Use-of-a-CNN-as-feature-extractor" data-toc-modified-id="Use-of-a-CNN-as-feature-extractor-4.0.1">Use of a CNN as feature extractor</a></span></li></ul></li></ul></li><li><span><a href="#Reducer" data-toc-modified-id="Reducer-5">Reducer</a></span></li></ul></div>

# Introduction

This notebook has been derived from the P8_Spark_Local notebook, with a few adjustements due to the configuration of the Cloud environment itself, such as:
- working with S3,
- downgrade of a few packages leading to a slight different approach for initial DataFrame building.<br/>

The core part, i.e. featurizer and reducer, still provide results we can compare with our local execution.

In [1]:
import pyspark

In [2]:
from pyspark import SparkContext

In [3]:
pyspark.__version__

'2.3.4'

In [4]:
# usefull packages
import pandas as pd
import numpy as np
import time
from PIL import Image
from io import BytesIO
from io import StringIO

In [5]:
# context & session
from pyspark.sql import SparkSession

In [6]:
import pyarrow
pyarrow.__version__

'2.0.0'

In [7]:
# data handling
from pyspark.sql.functions import input_file_name, split
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# from typing import Iterator

In [8]:
# ml tasks
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import PCA

In [9]:
# transform
from pyspark.ml.linalg import Vectors, VectorUDT

In [10]:
# pip install tensorflow

In [11]:
# core featurizer
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img

#### time monitoring

In [12]:
start = time.perf_counter()
stop = time.perf_counter()
print(f'process, elapsed time: {stop - start:0.2f}s')

process, elapsed time: 0.00s


# Spark session

In [13]:
import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(sagemaker_pyspark.classpath_jars())

# See the SageMaker Spark Github to learn how to connect to EMR from a notebook instance
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath)\
    .master("local[*]").getOrCreate()
spark

In [14]:
# check wether arrow should be enabled by this setting
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')

# S3 as data source

In [15]:
import boto3

In [16]:
# Get resources stored in AWS S3 service
s3 = boto3.resource('s3')

In [17]:
# Print all existing buckets names (only one in this case)
for bucket in s3.buckets.all():
    print(bucket.name)

629250884158-sagemaker-eu-west-3
aws-emr-resources-629250884158-eu-west-1
aws-logs-629250884158-eu-west-1
aws-logs-629250884158-eu-west-3
ocproject-fruits


In [18]:
# Print n first files present in the bucket 'ocproject-fruits'
fruits_bucket = s3.Bucket('ocproject-fruits')
for file in fruits_bucket.objects.limit(3):
    label = file.key.split('/')[-2]
    print(label, file.key)

Apple_Braeburn AppleSample/Apple_Braeburn/321_100.jpg
Apple_Braeburn AppleSample/Apple_Braeburn/322_100.jpg
Apple_Braeburn AppleSample/Apple_Braeburn/323_100.jpg


# Load data & Featurizer
### Use of a CNN as feature extractor

In [19]:
# model for featurization, last layers truncated.
conv_base = VGG16(
    include_top=False,
    weights=None,
    pooling='max',
    input_shape=(100, 100, 3))

In [20]:
conv_base.summary()

Model: "vgg16"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 100, 100, 3)]     0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 100, 100, 64)      1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 100, 100, 64)      36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 50, 50, 64)        0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 50, 50, 128)       73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 50, 50, 128)       147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 25, 25, 128)       0     

In [21]:
# get the 2134 first cnn_features
start = time.perf_counter()
list_path_img = []
for file in fruits_bucket.objects.limit(2134):
    obj = fruits_bucket.Object(file.key)
    label = file.key.split('/')[-2]
    response = obj.get()
    file_stream = response['Body']
    img = Image.open(file_stream)
    # convert image to flatten array
    flat_array = np.array(img).ravel().tolist()
    tensor = np.array(flat_array).reshape(1, 100, 100, 3).astype(np.uint8)
    # preprocess input
    prep_tensor = preprocess_input(tensor)
    features = conv_base.predict(prep_tensor).ravel().tolist()
    # Store file key and features
    list_path_img.append((file.key, label, features))
stop = time.perf_counter()
print(f'process, elapsed time: {stop - start:0.2f}s')

process, elapsed time: 254.85s


In [22]:
# Create spark dataframe from previous list of tuples
df_img = spark.createDataFrame(list_path_img, ['origin', 'label', 'cnn_features'])
# Show spark dataframe
df_img.show()

+--------------------+--------------+--------------------+
|              origin|         label|        cnn_features|
+--------------------+--------------+--------------------+
|AppleSample/Apple...|Apple_Braeburn|[0.09752133488655...|
|AppleSample/Apple...|Apple_Braeburn|[0.09284240007400...|
|AppleSample/Apple...|Apple_Braeburn|[0.08956278115510...|
|AppleSample/Apple...|Apple_Braeburn|[0.09523339569568...|
|AppleSample/Apple...|Apple_Braeburn|[0.08958204090595...|
|AppleSample/Apple...|Apple_Braeburn|[0.09481547772884...|
|AppleSample/Apple...|Apple_Braeburn|[0.09361021220684...|
|AppleSample/Apple...|Apple_Braeburn|[0.09232464432716...|
|AppleSample/Apple...|Apple_Braeburn|[0.08942290395498...|
|AppleSample/Apple...|Apple_Braeburn|[0.08563047647476...|
|AppleSample/Apple...|Apple_Braeburn|[0.08661102503538...|
|AppleSample/Apple...|Apple_Braeburn|[0.08607620000839...|
|AppleSample/Apple...|Apple_Braeburn|[0.08676768094301...|
|AppleSample/Apple...|Apple_Braeburn|[0.08955263346433..

In [23]:
df_img.printSchema()

root
 |-- origin: string (nullable = true)
 |-- label: string (nullable = true)
 |-- cnn_features: array (nullable = true)
 |    |-- element: double (containsNull = true)



# Reducer

In [24]:
# from Array to Vectors for PCA
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_img = df_img.withColumn('cnn_vectors', array_to_vector_udf('cnn_features'))

In [25]:
df_img = df_img.select('origin', 'label', 'cnn_vectors')

In [26]:
# reduce with PCA - k=20 arbitrary setting
start = time.perf_counter()
pca = PCA(k=20, inputCol='cnn_vectors', outputCol='pca_vectors')
model = pca.fit(df_img)
stop = time.perf_counter()
print(f'pca - 20 pcs, elapsed time: {stop - start:0.2f}s')

pca - 20 pcs, elapsed time: 4.24s


In [27]:
# apply pca reduction
df_img = model.transform(df_img)

In [28]:
# from Vector to Array
vector_to_array_udf = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))

In [29]:
df_img = df_img.withColumn('arrays', vector_to_array_udf('pca_vectors'))

In [30]:
df_img = df_img.select('origin', 'label', 'arrays')

In [31]:
df_img.show()

+--------------------+--------------+--------------------+
|              origin|         label|              arrays|
+--------------------+--------------+--------------------+
|AppleSample/Apple...|Apple_Braeburn|[-4.95163, 1.7686...|
|AppleSample/Apple...|Apple_Braeburn|[-4.9488864, 1.75...|
|AppleSample/Apple...|Apple_Braeburn|[-4.952871, 1.757...|
|AppleSample/Apple...|Apple_Braeburn|[-4.9557786, 1.75...|
|AppleSample/Apple...|Apple_Braeburn|[-4.9466696, 1.74...|
|AppleSample/Apple...|Apple_Braeburn|[-4.948851, 1.748...|
|AppleSample/Apple...|Apple_Braeburn|[-4.9509406, 1.74...|
|AppleSample/Apple...|Apple_Braeburn|[-5.0252056, 1.67...|
|AppleSample/Apple...|Apple_Braeburn|[-5.036828, 1.660...|
|AppleSample/Apple...|Apple_Braeburn|[-5.025839, 1.673...|
|AppleSample/Apple...|Apple_Braeburn|[-5.0511184, 1.68...|
|AppleSample/Apple...|Apple_Braeburn|[-5.0435066, 1.68...|
|AppleSample/Apple...|Apple_Braeburn|[-5.062506, 1.681...|
|AppleSample/Apple...|Apple_Braeburn|[-5.075663, 1.691..

In [32]:
# Turn spark dataframe into pandas dataframe
results_df = df_img.toPandas()
results_df

Unnamed: 0,origin,label,arrays
0,AppleSample/Apple_Braeburn/321_100.jpg,Apple_Braeburn,"[-4.951630115509033, 1.7686342000961304, 0.467..."
1,AppleSample/Apple_Braeburn/322_100.jpg,Apple_Braeburn,"[-4.948886394500732, 1.7506670951843262, 0.475..."
2,AppleSample/Apple_Braeburn/323_100.jpg,Apple_Braeburn,"[-4.952870845794678, 1.7571872472763062, 0.468..."
3,AppleSample/Apple_Braeburn/324_100.jpg,Apple_Braeburn,"[-4.9557785987854, 1.7540638446807861, 0.45533..."
4,AppleSample/Apple_Braeburn/325_100.jpg,Apple_Braeburn,"[-4.946669578552246, 1.7488222122192383, 0.463..."
...,...,...,...
2129,AppleSample/Apple_Red_Yellow_2/r_90_100.jpg,Apple_Red_Yellow_2,"[-5.470077991485596, 1.0921685695648193, 0.767..."
2130,AppleSample/Apple_Red_Yellow_2/r_91_100.jpg,Apple_Red_Yellow_2,"[-5.456509590148926, 1.1047818660736084, 0.752..."
2131,AppleSample/Apple_Red_Yellow_2/r_92_100.jpg,Apple_Red_Yellow_2,"[-5.457823276519775, 1.1007816791534424, 0.764..."
2132,AppleSample/Apple_Red_Yellow_2/r_93_100.jpg,Apple_Red_Yellow_2,"[-5.4950642585754395, 1.0985331535339355, 0.79..."


In [33]:
# store the results into S3 Bucket, using boto3
# buffer
csv_buffer = StringIO()
results_df.to_csv(csv_buffer)
# boto client
client = boto3.client('s3')
# put the object
response = client.put_object(
    Body=csv_buffer.getvalue(),
    Bucket='ocproject-fruits',
    Key='results.csv')

In [34]:
# overview of the upload
response

{'ResponseMetadata': {'RequestId': '2TAK6XCN4J2K3VEG',
  'HostId': 'QWnRxhcm5kQhGYQ9atukdw1kEQeVgk1Zr5C6wPI30fHosCbYuLm7DjpqohEv371gz/6Q+ZL2f/w=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'QWnRxhcm5kQhGYQ9atukdw1kEQeVgk1Zr5C6wPI30fHosCbYuLm7DjpqohEv371gz/6Q+ZL2f/w=',
   'x-amz-request-id': '2TAK6XCN4J2K3VEG',
   'date': 'Tue, 02 Feb 2021 21:15:26 GMT',
   'etag': '"3dd7029ab67d1ac8613bbfc805ac45fa"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"3dd7029ab67d1ac8613bbfc805ac45fa"'}

In [None]:
spark.stop()