# Start a Spark session

In [1]:
import pyspark

from pyspark.sql import SparkSession

# Create (or reuse, if one already exists) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("Images dimension reduction")
    .getOrCreate()
)

# Connect to AWS S3 and get resources

In [2]:
import os
import boto3

# High-level boto3 resource interface to the AWS S3 service
s3 = boto3.resource(service_name='s3')

In [3]:
# Print the name of every existing bucket (only one in this case)
for bucket_name in (bucket.name for bucket in s3.buckets.all()):
    print(bucket_name)

oc-p8-bucket


In [4]:
# Handle on the project bucket (reused by the following cells)
fruits_bucket = s3.Bucket('oc-p8-bucket')
# Print the key of every object stored in 'oc-p8-bucket'
for obj_summary in fruits_bucket.objects.all():
    print(obj_summary.key)

reduced_fruits/Apple_Braeburn/0_100.jpg
reduced_fruits/Apple_Braeburn/10_100.jpg
reduced_fruits/Apple_Braeburn/1_100.jpg
reduced_fruits/Apple_Braeburn/2_100.jpg
reduced_fruits/Avocado/12_100.jpg
reduced_fruits/Avocado/13_100.jpg
reduced_fruits/Avocado/14_100.jpg
reduced_fruits/Avocado/15_100.jpg
reduced_fruits/Cauliflower/58_100.jpg
reduced_fruits/Cauliflower/61_100.jpg
reduced_fruits/Cauliflower/68_100.jpg
reduced_fruits/Cauliflower/69_100.jpg


# Read images from S3 and process

In [5]:
from PIL import Image
from io import BytesIO
import numpy as np

# Read every JPEG object of the bucket and keep (S3 key, flattened pixel list)
# pairs; the list of tuples is turned into a Spark dataframe in the next cell.
IMAGE_EXTENSIONS = (".jpg", ".jpeg")

df_data = []  # list of tuples; each tuple represents one row
for file in fruits_bucket.objects.all():
    # Skip non-image objects (e.g. folder placeholders, or result files uploaded
    # back into this bucket such as the pca_res.csv written at the end of this
    # notebook): PIL cannot decode them and the whole loop would abort on re-run.
    if not file.key.lower().endswith(IMAGE_EXTENSIONS):
        continue
    obj = fruits_bucket.Object(file.key)
    response = obj.get()
    file_stream = response['Body']
    try:
        # Buffer the object in memory so PIL gets a seekable stream
        image_bytes = file_stream.read()
    finally:
        file_stream.close()  # close the underlying response stream
    with Image.open(BytesIO(image_bytes)) as im:
        # Force 3 channels so every image flattens to the same length
        # (a grayscale or CMYK JPEG would otherwise yield a different size,
        # which breaks the fixed-size vectors PCA needs later on)
        im_arr = np.array(im.convert("RGB"))
    # Flatten image array
    im_arr_flat = np.ravel(im_arr)
    # Store file key and image flattened array as tuple in list
    df_data.append((file.key, im_arr_flat.tolist()))

In [7]:
# Build a Spark dataframe from the (key, pixels) tuples; only column names are
# given, so Spark infers the column types from the data
tuple_columns = ["path", "img_array"]
df_tuple = spark.createDataFrame(data=df_data, schema=tuple_columns)
# Display the first rows (long values truncated)
df_tuple.show(truncate=True)

+--------------------+--------------------+
|                path|           img_array|
+--------------------+--------------------+
|reduced_fruits/Ap...|[254, 255, 255, 2...|
|reduced_fruits/Ap...|[255, 255, 255, 2...|
|reduced_fruits/Ap...|[254, 255, 255, 2...|
|reduced_fruits/Ap...|[255, 255, 255, 2...|
|reduced_fruits/Av...|[255, 255, 255, 2...|
|reduced_fruits/Av...|[255, 255, 255, 2...|
|reduced_fruits/Av...|[255, 255, 255, 2...|
|reduced_fruits/Av...|[255, 255, 255, 2...|
|reduced_fruits/Ca...|[255, 255, 255, 2...|
|reduced_fruits/Ca...|[255, 255, 255, 2...|
|reduced_fruits/Ca...|[254, 255, 250, 2...|
|reduced_fruits/Ca...|[254, 255, 251, 2...|
+--------------------+--------------------+



In [8]:
# Print schema of dataframe: types were inferred by createDataFrame because
# only column names were passed
df_tuple.printSchema()

root
 |-- path: string (nullable = true)
 |-- img_array: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [9]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Create a new column in the dataframe corresponding to images labels:
# the label is the name of the folder that contains the image.
def get_label(value):
    """Return the parent "folder" name of an S3 key, used as the image label.

    Example: ``"reduced_fruits/Avocado/12_100.jpg"`` -> ``"Avocado"``.

    A null path (the ``path`` column is nullable) yields ``None`` so the UDF
    produces a null label instead of failing the Spark task.
    Raises IndexError when the key contains no "/".
    """
    if value is None:
        return None
    return value.split('/')[-2]

# Wrap get_label as a Spark UDF returning strings, then use it to derive the
# "label" column from the "path" column
udf_get_label = F.udf(get_label, returnType=StringType())
df_labelised = df_tuple.withColumn("label", udf_get_label(F.col("path")))

# Show new dataframe: schema (now with the string "label" column) and first rows
df_labelised.printSchema()
df_labelised.show(truncate=True)

root
 |-- path: string (nullable = true)
 |-- img_array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- label: string (nullable = true)

+--------------------+--------------------+--------------+
|                path|           img_array|         label|
+--------------------+--------------------+--------------+
|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|
|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|
|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|
|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|
|reduced_fruits/Ca...|[255, 255, 255, 2...|   Cauliflower|
|reduced_fruits/Ca...|[255, 255, 255, 2...|   Cauliflower|
|reduced_fruits/Ca...|[254, 255, 250, 2...|   Cauliflower|
|reduced_

In [13]:
# Show full (untruncated) paths next to their extracted labels to check get_label
df_labelised.select("path", "label").show(truncate=False)

+----------------------------------------+--------------+
|path                                    |label         |
+----------------------------------------+--------------+
|reduced_fruits/Apple_Braeburn/0_100.jpg |Apple_Braeburn|
|reduced_fruits/Apple_Braeburn/10_100.jpg|Apple_Braeburn|
|reduced_fruits/Apple_Braeburn/1_100.jpg |Apple_Braeburn|
|reduced_fruits/Apple_Braeburn/2_100.jpg |Apple_Braeburn|
|reduced_fruits/Avocado/12_100.jpg       |Avocado       |
|reduced_fruits/Avocado/13_100.jpg       |Avocado       |
|reduced_fruits/Avocado/14_100.jpg       |Avocado       |
|reduced_fruits/Avocado/15_100.jpg       |Avocado       |
|reduced_fruits/Cauliflower/58_100.jpg   |Cauliflower   |
|reduced_fruits/Cauliflower/61_100.jpg   |Cauliflower   |
|reduced_fruits/Cauliflower/68_100.jpg   |Cauliflower   |
|reduced_fruits/Cauliflower/69_100.jpg   |Cauliflower   |
+----------------------------------------+--------------+



In [10]:
from pyspark.sql.types import ArrayType, IntegerType

# Downsample images for memory use reduction purpose: keep every `step`-th
# value of the flattened pixel array.
def downsample_array(value, step=30):
    """Return every ``step``-th element of ``value``; ``None`` passes through.

    Args:
        value: flattened pixel list (or ``None`` for a null array cell).
        step: sampling stride, 30 by default (the value used so far).

    NOTE(review): if the arrays are flattened H x W x 3 RGB images (30000
    values per image is consistent with 100 x 100 x 3), a stride that is a
    multiple of 3 -- such as the default 30 -- only ever picks values from the
    first channel. Confirm this is intended.
    """
    if value is None:
        return None
    return value[::step]

# Register downsample_array as a UDF producing integer arrays and add the
# downsampled "features_ds" column
udf_downsample_array = F.udf(downsample_array, returnType=ArrayType(IntegerType()))
df_labelised = df_labelised.withColumn(
    "features_ds",
    udf_downsample_array(F.col("img_array")),
)

In [15]:
# Show new dataframe: the schema now includes the integer array "features_ds"
df_labelised.printSchema()
df_labelised.show(truncate=True)

root
 |-- path: string (nullable = true)
 |-- img_array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- label: string (nullable = true)
 |-- features_ds: array (nullable = true)
 |    |-- element: integer (containsNull = true)

+--------------------+--------------------+--------------+--------------------+
|                path|           img_array|         label|         features_ds|
+--------------------+--------------------+--------------+--------------------+
|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|[254, 255, 253, 2...|
|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|[255, 255, 250, 2...|
|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|[254, 254, 255, 2...|
|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|[255, 254, 254, 2...|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|
|reduced_fruits/Av...|[2

In [16]:
# Fetch only the first row of each column: first() retrieves a single row,
# whereas collect()[0] would ship the whole dataframe (with its 30000-element
# image arrays) to the driver just to keep one row.
first_row_coll = df_labelised.select("img_array").first()
first_row_coll_ds = df_labelised.select("features_ds").first()

In [17]:
# Check success of downsampling of images: print the full length, then the
# downsampled length, of the first image's array
for checked_row in (first_row_coll, first_row_coll_ds):
    print(len(checked_row[0]))

30000
1000


In [11]:
from pyspark.ml.linalg import Vectors, VectorUDT

# Convert the downsampled integer arrays to dense ML vectors, the input type
# consumed by the PCA estimator below.
def list_to_vector(values):
    """Convert a list of numbers to a DenseVector; ``None`` passes through.

    Vectors.dense already converts the whole list to float64 in one numpy
    call, so no per-element np.double conversion is needed.
    """
    if values is None:
        return None
    return Vectors.dense(values)

list_to_vector_udf = F.udf(list_to_vector, VectorUDT())
df_with_vectors = df_labelised.withColumn("featuresDouble", list_to_vector_udf("features_ds"))

In [19]:
# Print schema of new dataframe (adds the vector column "featuresDouble")
df_with_vectors.printSchema()

root
 |-- path: string (nullable = true)
 |-- img_array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- label: string (nullable = true)
 |-- features_ds: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- featuresDouble: vector (nullable = true)



In [20]:
# Show only the new vector column (values truncated)
df_with_vectors.select("featuresDouble").show(truncate=True)

+--------------------+
|      featuresDouble|
+--------------------+
|[254.0,255.0,253....|
|[255.0,255.0,250....|
|[254.0,254.0,255....|
|[255.0,254.0,254....|
|[255.0,255.0,255....|
|[255.0,255.0,255....|
|[255.0,255.0,255....|
|[255.0,255.0,255....|
|[255.0,255.0,255....|
|[255.0,255.0,255....|
|[254.0,255.0,254....|
|[254.0,255.0,255....|
+--------------------+



# Dimension reduction: PCA

In [12]:
# Apply PCA on vectors representing downsampled images

from pyspark.ml.feature import PCA

# NOTE(review): with n images the centred covariance has rank at most n - 1
# (n = 12 in this run), so components beyond that carry no variance; k=100 is
# kept because the following cells expect 100-dimensional outputs.
pca = PCA(
    k=100,
    inputCol="featuresDouble",
    outputCol="pcaFeatures",
)
model = pca.fit(dataset=df_with_vectors)

# Project every image, then keep only the projected features
projected = model.transform(df_with_vectors)
result = projected.select("pcaFeatures")

# Show PCA result
result.show(truncate=True)

+--------------------+
|         pcaFeatures|
+--------------------+
|[1806.57083104063...|
|[1641.46618310563...|
|[1800.10984031156...|
|[1790.00542616467...|
|[-1546.4888593670...|
|[-1567.9106858745...|
|[-1586.4709611181...|
|[-1591.5630895189...|
|[123.313254464571...|
|[128.661121751511...|
|[99.1668768156642...|
|[78.6374048875883...|
+--------------------+



In [22]:
# Retrieve just one projected row; first() avoids collecting the whole result
first_row_pca = result.first()

In [23]:
# Check PCA components number: length of the first projected vector
n_pca_components = len(first_row_pca["pcaFeatures"])
print(n_pca_components)

100


In [24]:
# Count elements of dataframes (first the input vectors)
n_rows_with_vectors = df_with_vectors.count()
n_rows_with_vectors

12

In [25]:
# Number of projected rows; should equal the input count above
n_rows_result = result.count()
n_rows_result

12

In [13]:
# Create index in dataframe with images arrays: append an "id" column whose
# values are increasing and unique but not necessarily consecutive
df_with_vectors_index = df_with_vectors.select(
    "*", F.monotonically_increasing_id().alias("id")
)

In [27]:
# Print schema: the appended "id" column is a non-nullable long
df_with_vectors_index.printSchema()

root
 |-- path: string (nullable = true)
 |-- img_array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- label: string (nullable = true)
 |-- features_ds: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- featuresDouble: vector (nullable = true)
 |-- id: long (nullable = false)



In [28]:
# Show new dataframe, including the generated "id" column
df_with_vectors_index.show(truncate=True)

+--------------------+--------------------+--------------+--------------------+--------------------+---+
|                path|           img_array|         label|         features_ds|      featuresDouble| id|
+--------------------+--------------------+--------------+--------------------+--------------------+---+
|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|[254, 255, 253, 2...|[254.0,255.0,253....|  0|
|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|[255, 255, 250, 2...|[255.0,255.0,250....|  1|
|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|[254, 254, 255, 2...|[254.0,254.0,255....|  2|
|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|[255, 254, 254, 2...|[255.0,254.0,254....|  3|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|[255.0,255.0,255....|  4|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|[255.0,255.0,255....|  5|
|reduced_fruits/Av...|[255, 255, 255, 2...|       Avoca

In [14]:
# Create index in dataframe with PCA results.
# NOTE(review): monotonically_increasing_id() values depend on partition ids,
# so these ids only line up with those of df_with_vectors_index while both
# dataframes keep the same partitioning.
result_index = result.select("*", F.monotonically_increasing_id().alias("id"))
# Show new indexed dataframe
result_index.printSchema()
result_index.show(truncate=True)

root
 |-- pcaFeatures: vector (nullable = true)
 |-- id: long (nullable = false)

+--------------------+---+
|         pcaFeatures| id|
+--------------------+---+
|[1806.57083104063...|  0|
|[1641.46618310563...|  1|
|[1800.10984031156...|  2|
|[1790.00542616467...|  3|
|[-1546.4888593670...|  4|
|[-1567.9106858745...|  5|
|[-1586.4709611181...|  6|
|[-1591.5630895189...|  7|
|[123.313254464571...|  8|
|[128.661121751511...|  9|
|[99.1668768156642...| 10|
|[78.6374048875883...| 11|
+--------------------+---+



In [15]:
# Build the global dataframe by running the fitted PCA model directly over the
# indexed image dataframe, so each pcaFeatures value is computed from the very
# row it sits next to. This replaces joining two separately generated
# monotonically_increasing_id() columns (ids that depend on partitioning and
# only match by coincidence of identical layouts) and avoids a shuffle join.
df_pca = (
    model.transform(df_with_vectors_index)
    # same column order as an id-join would give: id first, pcaFeatures last
    .select("id", *df_with_vectors.columns, "pcaFeatures")
)

In [31]:
# Print schema of the global dataframe (image columns plus pcaFeatures, keyed by id)
df_pca.printSchema()

root
 |-- id: long (nullable = false)
 |-- path: string (nullable = true)
 |-- img_array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- label: string (nullable = true)
 |-- features_ds: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- featuresDouble: vector (nullable = true)
 |-- pcaFeatures: vector (nullable = true)



In [32]:
# Show global dataframe; row order is not guaranteed here (rows are sorted by
# id in the next cell)
df_pca.show(truncate=True)

+---+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
| id|                path|           img_array|         label|         features_ds|      featuresDouble|         pcaFeatures|
+---+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|  0|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|[254, 255, 253, 2...|[254.0,255.0,253....|[1806.57083104063...|
|  7|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|[255.0,255.0,255....|[-1591.5630895189...|
|  6|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|[255.0,255.0,255....|[-1586.4709611181...|
|  9|reduced_fruits/Ca...|[255, 255, 255, 2...|   Cauliflower|[255, 255, 255, 2...|[255.0,255.0,255....|[128.661121751511...|
|  5|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|[255.0,255.0,255....|[-1567.9106858

In [16]:
# Sort the global dataframe by ascending id
df_pca_ordered = df_pca.sort(F.col("id").asc())

In [34]:
# Show dataframe, now ordered by id
df_pca_ordered.show(truncate=True)

+---+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
| id|                path|           img_array|         label|         features_ds|      featuresDouble|         pcaFeatures|
+---+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+
|  0|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|[254, 255, 253, 2...|[254.0,255.0,253....|[1806.57083104063...|
|  1|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|[255, 255, 250, 2...|[255.0,255.0,250....|[1641.46618310563...|
|  2|reduced_fruits/Ap...|[254, 255, 255, 2...|Apple_Braeburn|[254, 254, 255, 2...|[254.0,254.0,255....|[1800.10984031156...|
|  3|reduced_fruits/Ap...|[255, 255, 255, 2...|Apple_Braeburn|[255, 254, 254, 2...|[255.0,254.0,254....|[1790.00542616467...|
|  4|reduced_fruits/Av...|[255, 255, 255, 2...|       Avocado|[255, 255, 255, 2...|[255.0,255.0,255....|[-1546.4888593

In [35]:
# Check number of elements in global dataframe (should equal the number of input images)
n_rows_pca = df_pca_ordered.count()
n_rows_pca

12

In [17]:
# The CSV writer cannot serialise vector columns, so add a string rendering
# of the PCA features
df_pca_ordered_str = df_pca_ordered.withColumn(
    "pcaFeaturesStr",
    F.col("pcaFeatures").cast(StringType()),
)

In [18]:
# Check that "pcaFeaturesStr" was added as a string column
df_pca_ordered_str.printSchema()

root
 |-- id: long (nullable = false)
 |-- path: string (nullable = true)
 |-- img_array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- label: string (nullable = true)
 |-- features_ds: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- featuresDouble: vector (nullable = true)
 |-- pcaFeatures: vector (nullable = true)
 |-- pcaFeaturesStr: string (nullable = true)



In [38]:
# Show dataframe columns to be stored in csv file (label + stringified PCA vector)
df_pca_ordered_str.select("label", "pcaFeaturesStr").show(truncate=True)

+--------------+--------------------+
|         label|      pcaFeaturesStr|
+--------------+--------------------+
|Apple_Braeburn|[1806.57083104063...|
|Apple_Braeburn|[1641.46618310563...|
|Apple_Braeburn|[1800.10984031156...|
|Apple_Braeburn|[1790.00542616467...|
|       Avocado|[-1546.4888593670...|
|       Avocado|[-1567.9106858745...|
|       Avocado|[-1586.4709611181...|
|       Avocado|[-1591.5630895189...|
|   Cauliflower|[123.313254464571...|
|   Cauliflower|[128.661121751511...|
|   Cauliflower|[99.1668768156642...|
|   Cauliflower|[78.6374048875883...|
+--------------+--------------------+



# PCA result storage

In [19]:
# Save data in a single CSV file. Spark writes a directory "pca_res.csv/"
# containing the data; coalesce(1) makes it a single part file.
# mode("overwrite") lets the notebook be re-run: the default save mode
# ("error") raises if the output path already exists.
(
    df_pca_ordered_str
    .select("label", "pcaFeaturesStr")
    .coalesce(1)
    .write
    .mode("overwrite")
    .csv("pca_res.csv")
)

In [22]:
# List the CSV part file Spark wrote in the storage directory. os.listdir
# returns entries in arbitrary order, and the directory can also contain
# bookkeeping files (typically "_SUCCESS" and ".crc" checksums), so filter
# for the part-*.csv data file instead of blindly taking entry [0].
sorted(
    name for name in os.listdir("pca_res.csv")
    if name.startswith("part-") and name.endswith(".csv")
)[0]

'part-00000-955ff586-2622-49b0-8b8c-4e03f72f3753-c000.csv'

In [23]:
# Set path of CSV file of interest: the single part-*.csv file in the output
# directory (os.listdir order is arbitrary and the directory may also hold
# non-data files, so entry [0] is not reliably the CSV).
csv_part_files = sorted(
    name for name in os.listdir("pca_res.csv")
    if name.startswith("part-") and name.endswith(".csv")
)
# coalesce(1) should yield exactly one part file; fail loudly otherwise
assert len(csv_part_files) == 1, csv_part_files
fn_path = os.path.join("pca_res.csv", csv_part_files[0])
fn_path

'pca_res.csv/part-00000-955ff586-2622-49b0-8b8c-4e03f72f3753-c000.csv'

In [24]:
# Upload the local CSV part file to the "oc-p8-bucket" bucket under the key "pca_res.csv"
s3.meta.client.upload_file(
    Filename=fn_path,
    Bucket="oc-p8-bucket",
    Key="pca_res.csv",
)