In [1]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession, functions as F
from pyspark.sql.types import BinaryType, ArrayType, IntegerType, StringType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2 as cv

In [2]:
# Ask spark-submit to pull the AWS SDK and hadoop-aws packages, which provide the
# s3a filesystem used in this notebook to read from and write to S3.
os.environ.update({
    'PYSPARK_SUBMIT_ARGS': '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell',
})

In [3]:
# Read the AWS credentials from environment variables so they never appear in the
# notebook. A missing variable raises KeyError here, before any Spark work starts.
access_key_ID = os.environ['AccessKeyId']
access_secret_key = os.environ['AccessKeySecret']

# SparkSession set up (local mode, all cores).
# Every fs.s3a.* Hadoop option, including the regional endpoint, is passed through the
# public 'spark.hadoop.*' configuration prefix instead of the private sc._jsc handle.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('images_preprocessing_aws')
    .config('spark.driver.memory', '2g')
    .config('spark.hadoop.fs.s3a.access.key', access_key_ID)
    .config('spark.hadoop.fs.s3a.secret.key', access_secret_key)
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    .config('spark.hadoop.fs.s3a.endpoint', 's3.us-east-2.amazonaws.com')
    .getOrCreate()
)

# Trying to optimise memory usage.
# NOTE(review): this option caps Arrow record batches; the UDFs in this notebook are
# plain Python UDFs, so it presumably has no effect here - confirm before relying on it.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100")

# Access the SparkContext owned by the session (no second getOrCreate() needed).
sc = spark.sparkContext
# Enable AWS Signature V4 in the AWS SDK through a JVM system property.
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')

In [4]:
# S3 locations (s3a scheme): the bucket root, used as the base for output paths,
# and the folder that holds the sample fruit images.
path_s3_root = 's3a://oc-projet8-yannick/'
path_s3 = 's3a://oc-projet8-yannick/fruits_images_samples_t2micro/'

### **Loading the images into a Spark DataFrame**

In [5]:
# Load everything matched by the glob through Spark's 'image' data source,
# then preview the rows and print the resulting schema.
df_images = spark.read.load(f'{path_s3}/*', format='image')
df_images.show()
df_images.printSchema()

+--------------------+
|               image|
+--------------------+
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
|[s3a://oc-projet8...|
+--------------------+

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



### **Let's get the DataFrame into the format we want: add path and label columns and drop the 'image' column**

In [6]:
# Flatten the nested 'image' struct into top-level columns and drop the struct itself:
#   - image_path_s3 : the origin URI with its 's3a://' scheme removed
#   - height / width / nChannels / data : copied from the struct fields of the same name
#   - label : name of the folder that directly contains the image file
origin_without_scheme = F.split(F.col('image.origin'), 's3a://')[1]
df_images = df_images.select(
    origin_without_scheme.alias('image_path_s3'),
    F.col('image.height').alias('height'),
    F.col('image.width').alias('width'),
    F.col('image.nChannels').alias('nChannels'),
    # element_at(..., -2) picks the second-to-last path segment (the parent folder),
    # so the label no longer depends on how deep the dataset sits inside the bucket.
    F.element_at(F.split(origin_without_scheme, '/'), -2).alias('label'),
    F.col('image.data').alias('data'),
)
# display the new dataframe with our images data
df_images.show()

+--------------------+------+-----+---------+--------------+--------------------+
|       image_path_s3|height|width|nChannels|         label|                data|
+--------------------+------+-----+---------+--------------+--------------------+
|oc-projet8-yannic...|   100|  100|        3|   Apple_Red_1|[FF FD FF FF FD F...|
|oc-projet8-yannic...|   100|  100|        3|   Apple_Red_1|[FF FF FF FF FF F...|
|oc-projet8-yannic...|   100|  100|        3|Apple_Braeburn|[FF FF FE FF FF F...|
|oc-projet8-yannic...|   100|  100|        3|Cherry_Rainier|[FF FF F8 FF FF F...|
|oc-projet8-yannic...|   100|  100|        3|Apple_Braeburn|[FF FF FE FF FF F...|
|oc-projet8-yannic...|   100|  100|        3|Cherry_Rainier|[FF FE FF FF FE F...|
|oc-projet8-yannic...|   100|  100|        3|       Avocado|[FF FF FF FF FF F...|
|oc-projet8-yannic...|   100|  100|        3|       Avocado|[FF FF FF FF FF F...|
|oc-projet8-yannic...|   100|  100|        3|  Cactus_fruit|[FF FF FF FF FF F...|
|oc-projet8-yann

### Define a preprocessing function, apply it as a UDF (for distributed computation) and add the result as a new DataFrame column

In [7]:
# function definition
def preprocess_images(data_bytes, height=100, width=100, target_size=(32, 32)):
    '''Preprocess the raw pixel bytes of one image with OpenCV.

    Steps: reshape to (height, width, 3), convert BGR to grayscale, apply a 3x3
    Gaussian blur, equalize the histogram, resize to ``target_size`` and flatten.

    Parameters
    ----------
    data_bytes : bytes or bytearray
        Raw 8-bit pixel buffer of a 3-channel image (the ``data`` column of the
        Spark DataFrame). The channel order is treated as BGR.
    height, width : int, optional
        Dimensions of the source image in pixels. Defaults to 100 x 100.
    target_size : tuple of int, optional
        ``(width, height)`` handed to ``cv.resize``. Defaults to (32, 32).

    Returns
    -------
    list of int
        Flattened grayscale pixel values as a plain Python list (not a numpy
        array), so that Spark can map it to ``ArrayType(IntegerType())``.

    Raises
    ------
    ValueError
        If the buffer length does not equal ``height * width * 3``.
    '''
    n_channels = 3  # the grayscale conversion below expects a 3-channel BGR image
    # frombuffer reads the buffer as uint8 for both bytes and bytearray inputs
    pixels = np.frombuffer(data_bytes, dtype=np.uint8)
    expected_size = height * width * n_channels
    if pixels.size != expected_size:
        raise ValueError(
            f'expected {expected_size} bytes for a {height}x{width}x{n_channels} image, '
            f'got {pixels.size}'
        )
    img_data_bgr = pixels.reshape(height, width, n_channels)
    # getting gray images
    img_data_gray = cv.cvtColor(img_data_bgr, cv.COLOR_BGR2GRAY)
    # Blurring
    img_data_gray = cv.GaussianBlur(img_data_gray, (3, 3), 0)
    # Equalization
    img_data_gray = cv.equalizeHist(img_data_gray)
    # Resize
    img_data_gray = cv.resize(img_data_gray, target_size)
    # flatten, then convert to a plain Python list for Spark
    return img_data_gray.flatten().tolist()

# Wrap the preprocessing function as a Spark UDF whose return type is an array of integers
udf_img_preprocess = F.udf(f=preprocess_images, returnType=ArrayType(IntegerType()))

In [8]:
# Add 'preprocess_data': the flattened pixel list the preprocessing UDF returns for 'data'
df_images = df_images.withColumn('preprocess_data', udf_img_preprocess(F.col('data')))
df_images.show()

+--------------------+------+-----+---------+--------------+--------------------+--------------------+
|       image_path_s3|height|width|nChannels|         label|                data|     preprocess_data|
+--------------------+------+-----+---------+--------------+--------------------+--------------------+
|oc-projet8-yannic...|   100|  100|        3|   Apple_Red_1|[FF FD FF FF FD F...|[255, 255, 228, 2...|
|oc-projet8-yannic...|   100|  100|        3|   Apple_Red_1|[FF FF FF FF FF F...|[255, 255, 255, 2...|
|oc-projet8-yannic...|   100|  100|        3|Apple_Braeburn|[FF FF FE FF FF F...|[255, 255, 247, 2...|
|oc-projet8-yannic...|   100|  100|        3|Cherry_Rainier|[FF FF F8 FF FF F...|[225, 225, 255, 2...|
|oc-projet8-yannic...|   100|  100|        3|Apple_Braeburn|[FF FF FE FF FF F...|[255, 255, 255, 2...|
|oc-projet8-yannic...|   100|  100|        3|Cherry_Rainier|[FF FE FF FF FE F...|[228, 255, 255, 2...|
|oc-projet8-yannic...|   100|  100|        3|       Avocado|[FF FF FF FF 

### **Formatting the preprocessing results before performing PCA**

In [9]:
# The PCA model takes a vector column as input, so this UDF converts the
# preprocessed array into a dense vector.
def _array_to_dense_vector(values):
    '''Return ``values`` as a dense ML vector.'''
    return Vectors.dense(values)

to_vector = F.udf(_array_to_dense_vector, VectorUDT())

In [10]:
# Add the vector version of the preprocessed data and show both columns side by side
df_images = df_images.withColumn('preprocess_data_vector', to_vector(F.col('preprocess_data')))
df_images.select('preprocess_data', 'preprocess_data_vector').show()

+--------------------+----------------------+
|     preprocess_data|preprocess_data_vector|
+--------------------+----------------------+
|[255, 255, 228, 2...|  [255.0,255.0,228....|
|[255, 255, 255, 2...|  [255.0,255.0,255....|
|[255, 255, 247, 2...|  [255.0,255.0,247....|
|[225, 225, 255, 2...|  [225.0,225.0,255....|
|[255, 255, 255, 2...|  [255.0,255.0,255....|
|[228, 255, 255, 2...|  [228.0,255.0,255....|
|[255, 255, 255, 2...|  [255.0,255.0,255....|
|[255, 255, 255, 2...|  [255.0,255.0,255....|
|[255, 255, 255, 2...|  [255.0,255.0,255....|
|[255, 255, 255, 2...|  [255.0,255.0,255....|
+--------------------+----------------------+



### **Performing PCA with our vector data**

In [11]:
# Number of principal components to keep
k_number = 10
# Configure the PCA estimator (input and output columns) in one call, then fit it
pca = PCA(k=k_number, inputCol="preprocess_data_vector", outputCol="pca_data_vector")
model = pca.fit(df_images)

In [12]:
# Project the image vectors with the fitted PCA model and compare input and output columns
df_PCA = model.transform(df_images)
df_PCA.select('preprocess_data_vector', 'pca_data_vector').show()
# Sum of the explained-variance values reported by the fitted model
print('Explained_Variance_Ratio:', model.explainedVariance.sum())

+----------------------+--------------------+
|preprocess_data_vector|     pca_data_vector|
+----------------------+--------------------+
|  [255.0,255.0,228....|[-661.95527199896...|
|  [255.0,255.0,255....|[-682.96144962879...|
|  [255.0,255.0,247....|[-289.36356177605...|
|  [225.0,225.0,255....|[-1684.4265943424...|
|  [255.0,255.0,255....|[-276.34928556947...|
|  [228.0,255.0,255....|[-1654.2687498265...|
|  [255.0,255.0,255....|[-2686.0682303350...|
|  [255.0,255.0,255....|[-2710.1059529450...|
|  [255.0,255.0,255....|[-3746.6211286444...|
|  [255.0,255.0,255....|[-3731.0603346980...|
+----------------------+--------------------+

Explained_Variance_Ratio: 0.9999999999999826


### **Saving our output file on S3 (by s3a)**

#### **Parquet**

In [13]:
# Write path, label and PCA vector to Parquet under the bucket root, replacing any previous output
(
    df_PCA
    .select('image_path_s3', 'label', 'pca_data_vector')
    .write
    .parquet(f'{path_s3_root}output/PCA_parquet', mode='overwrite')
)

#### **CSV**

In [14]:
# The CSV writer cannot store a vector column, so the PCA vector is first
# converted to its string representation.
def _vector_as_text(vector):
    '''Return the ``str()`` form of ``vector``.'''
    return str(vector)

vector_to_string = F.udf(_vector_as_text, StringType())
df_PCA = df_PCA.withColumn('pca_data_vector_str', vector_to_string(F.col('pca_data_vector')))

# Save path, label and the stringified PCA vector as CSV with a header row,
# replacing any previous output.
(
    df_PCA
    .select('image_path_s3', 'label', 'pca_data_vector_str')
    .write
    .option("header", True)
    .csv(f'{path_s3_root}output/PCA_results_CSV', mode='overwrite')
)