In [None]:
import boto3
import botocore.session
from boto3.s3.transfer import TransferConfig
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, split, udf, \
                                  split, monotonically_increasing_id
from pyspark.ml.feature import StringIndexer
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType, ArrayType, DoubleType
import io
from PIL import Image
import numpy as np
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.feature import StandardScaler, PCA
from datetime import datetime
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
import re

In [None]:
# Silences TensorFlow's native logging through TF_CPP_MIN_LOG_LEVEL.
# NOTE(review): the former note described level '2' (INFO and WARNING messages
# not printed); the value '3' set here is expected to hide ERROR messages as
# well — confirm this is intended, or switch to '2'.
# The variable is set before Keras is imported, presumably so that it is
# already in place when TensorFlow gets loaded.

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

from keras.applications.vgg16 import VGG16

<h1>Setup

<h6>PySpark Config

In [None]:
# Resolves the AWS credentials made available to botocore

session = botocore.session.get_session()
credentials = session.get_credentials()

# Declares the JVM packages (AWS SDK + hadoop-aws) pyspark is launched with,
# so that the s3a file system can be used

os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages com.amazonaws:aws-java-sdk-pom:1.12.249,org.apache.hadoop:hadoop-aws:3.3.1 pyspark-shell'
)

In [None]:
# Defines a Boto3 Session from the resolved credentials.
# The session token is forwarded as well: it is None for long-lived access keys
# (same as omitting it) and required for temporary credentials (SSO, assumed
# role, instance profile), whose key pair is rejected without it.

boto_session = boto3.Session(
         aws_access_key_id=credentials.access_key,
         aws_secret_access_key=credentials.secret_key,
         aws_session_token=credentials.token)

# S3 resource and handle on the bucket that receives the PCA csv

s3 = boto_session.resource('s3')

bucket = s3.Bucket('oc-project-8-pca-csv')

In [None]:
# Builds (or reuses) the SparkSession: local master with 4 threads and
# S3A access configured with the AWS access key / secret key

spark = (
    SparkSession.builder
    .appName("ETL Pipeline")
    .master("local[4]")
    .config("spark.hadoop.fs.s3a.access.key", credentials.access_key)
    .config("spark.hadoop.fs.s3a.secret.key", credentials.secret_key)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

In [None]:
# Displays the SparkSession object (it is the last expression of the cell,
# hence rendered as the cell output)

spark

<h6>Bucket

In [None]:
# Sets the S3 bucket path (s3a scheme, glob pattern) the images are read from
# NOTE(review): the label extraction defined further down picks a path
# component by position, so it depends on the depth of this path — keep both
# in sync if the bucket layout changes

s3_object_path = "s3a://oc-project-8-bucket/Sub_S3/**"

<h6>TensorFlow Model (VGG16)

In [None]:
# Builds the VGG16 model and marks every layer as non trainable
# NB : the network is created without its top and with a max pooling as the
# ultimate layer, so that it can be used as a feature extraction "tool"

model = VGG16(
    include_top=False,
    pooling='max',
    weights="imagenet",
    input_shape=(224, 224, 3),
)

for vgg_layer in model.layers:
    vgg_layer.trainable = False

model.compile()
model.summary()

<h6>UDF

In [None]:
# Defines the helper functions and the Spark UDFs wrapping them

# Functions

def feature_extract(bytes_type):

    """Extracts a VGG16 feature vector from the raw bytes of one image.

    The image is decoded, forced to 3-channel RGB, resized to 224*224,
    preprocessed the way VGG16 expects and passed through the module-level
    ``model``.

    Args:
        bytes_type: encoded image file content (bytes), e.g. the ``content``
            column of a Spark ``binaryFile`` dataframe.

    Returns:
        list of float: the model output for this single image.
    """

    # Local import: keeps the function self-contained when it is shipped
    # inside a Spark UDF
    from keras.applications.vgg16 import preprocess_input

    with Image.open(io.BytesIO(bytes_type)) as picture:
        # Grayscale, palette or RGBA files would otherwise not match the
        # (224, 224, 3) input shape the model was built with
        rgb_picture = picture.convert('RGB').resize((224, 224))

    # float32 copy of the pixels: writable, as preprocess_input may work in place
    pixels = np.asarray(rgb_picture, dtype='float32')

    # The ImageNet weights expect inputs that went through preprocess_input;
    # raw 0-255 RGB pixels degrade the extracted features
    batch = preprocess_input(np.expand_dims(pixels, axis=0))

    # verbose=0: no progress bar printed for every single image
    vectorized = model.predict(batch, verbose=0).tolist()[0]

    return vectorized

def extract_label(path, label_index=4):

    """Extracts fruit label from the image path

    The path is split on '/' and the component found at ``label_index`` is
    returned. With the default (4) and a path shaped like
    's3a://<bucket>/<folder>/<label>/<file>' the components are
    ['s3a:', '', '<bucket>', '<folder>', '<label>', '<file>'], hence the
    label is returned.

    Args:
        path (str): full path of the image file.
        label_index (int): position of the label among the '/'-separated
            components; negative values count from the end (-2 is the
            folder holding the file). Defaults to 4.

    Returns:
        str: the selected path component.

    Raises:
        IndexError: if the path has no component at ``label_index``.
    """

    # Plain separator: str.split is enough, no regular expression needed
    components = path.split('/')

    return components[label_index]

# Spark UDFs wrapping the helpers (the return type tells Spark the column type)

feature_extract_UDF = udf(feature_extract, returnType=ArrayType(DoubleType()))

to_vector_UDF = udf(Vectors.dense, returnType=VectorUDT())

label_extract_UDF = udf(extract_label, returnType=StringType())

<h1>Preprocessing

In [None]:
# Retrieves the images from the S3 bucket with the binaryFile reader

df = spark.read.load(s3_object_path, format="binaryFile")

In [None]:
# Runs the feature extraction UDF on the raw content of every image

feature_extracted_df = df.select(
    feature_extract_UDF(col("content")).alias("features")
)

In [None]:
# Converts each array of features into a dense vector for further pyspark ml usage

vector_df = feature_extracted_df.select(
    to_vector_UDF(col('features')).alias('features')
)

<h1>Dimensionality Reduction

In [None]:
# Sets up a ML pipeline chaining standardisation and PCA (n_components = 150)

n_components = 150

std = StandardScaler(inputCol="features", outputCol="scaled")
pca = PCA(k=n_components, inputCol="scaled", outputCol="pca")
stages = [std, pca]

pipeline = Pipeline(stages=stages)

In [None]:
# Applies the ml pipeline

pca_df = pipeline.fit(vector_df).transform(vector_df)

In [None]:
# Transforms the single output column in as many column as there are Pcs Dataframe

pca_df_multiple_columns = pca_df.withColumn("pc", vector_to_array("pca"))\
                          .select([col("pc")[i] for i in range(n_components)])

In [None]:
# Retrieves the labels from the df

label_df = df.select(label_extract_UDF(df.path).alias("labels"))

In [None]:
# Gives ids to both dataframes for further join

pca_df_multiple_columns_id = \
    pca_df_multiple_columns.withColumn('id', monotonically_increasing_id())

label_df_id = label_df.withColumn('id', monotonically_increasing_id())

In [None]:
# Joins PCs dataframe with relative labels

labeled_pca_df = label_df_id.join(pca_df_multiple_columns_id, on=['id'], how='inner').drop('id')

In [None]:
# Writes the dataframe as a csv (with header) in the output bucket
# N.B : coalesce(1) is implemented to have one csv as an output

(
    labeled_pca_df
    .coalesce(1)
    .write
    .mode('overwrite')
    .format('csv')
    .option('header', True)
    .save("s3a://oc-project-8-pca-csv/run/")
)

In [None]:
# Retrieves the name of the created csv file (as pyspark doesn't have a way to
# name its written files). Only the keys starting with 'run/part' are listed
# instead of scanning every object of the bucket.

part_keys = [bucket_object.key
             for bucket_object in bucket.objects.filter(Prefix='run/part')]

# Fail with an explicit message rather than a NameError on the copy below
if not part_keys:
    raise FileNotFoundError(
        "No object starting with 'run/part' found in bucket "
        "'oc-project-8-pca-csv': the csv export must succeed before the copy")

# As before, the last listed key is kept should several part files exist
pca_csv_file_name = part_keys[-1]

# Copy the csv file under a constant name for easier retrieval
# (CopySource given as a dict: no manual 'bucket/key' string building)

s3.Object('oc-project-8-pca-csv', 'pca_csv.csv')\
  .copy_from(CopySource={'Bucket': 'oc-project-8-pca-csv',
                         'Key': pca_csv_file_name})