# Code that works with Google Colab

In [None]:
# Mount Google Drive at /content/drive so the image dataset stored under
# /content/drive/MyDrive can be read by Spark in this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col, split,udf
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, FloatType
from typing import Iterator
import pandas as pd

In [None]:
import tensorflow as tf

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img

In [None]:
# Reuse (or create) the local SparkContext, then build a SparkSession on top of it.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [None]:
# Load every image found (recursively) under the dataset folder with Spark's
# "image" data source; keep only 10 rows so the demo pipeline stays quick.
df = (
    spark.read
    .format("image")
    .option("recursiveFileLookup", "true")
    .load("/content/drive/MyDrive/A/*")
    .limit(10)
)

df.show()

+--------------------+
|               image|
+--------------------+
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
|{file:///content/...|
+--------------------+



In [None]:
# Sanity check: number of image rows loaded (capped by the .limit(10) on the read).
df.count()

10

In [None]:
# Inspect the image struct produced by the data source (origin, height, width,
# nChannels, mode, data).
df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [None]:
from pyspark.sql.functions import split, element_at


# The image origin ends with ".../<Data Set>/<Fruit>/<File Name>"; split it on
# "/" once and expose the last three components as label columns.
origin_parts = split(col('image.origin'), "/")
for new_column, position in (('Data Set', -3), ('Fruit', -2), ('File Name', -1)):
    df = df.withColumn(new_column, element_at(origin_parts, position))

df.show()

+--------------------+--------+------------------+--------------+
|               image|Data Set|             Fruit|     File Name|
+--------------------+--------+------------------+--------------+
|{file:///content/...|       A|Banana_Lady_Finger| r2_95_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger|r2_154_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger| r2_92_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger| r2_86_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger| r2_87_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger|r2_109_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger| r2_93_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger| r2_89_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger|r2_183_100.jpg|
|{file:///content/...|       A|Banana_Lady_Finger|r2_199_100.jpg|
+--------------------+--------+------------------+--------------+



In [None]:
import pyspark.sql.functions as F
from pyspark.ml.image import ImageSchema
from pyspark.ml.linalg import DenseVector, VectorUDT, Vectors

# Flatten each decoded image (ImageSchema.toNDArray) into a single Spark ML
# DenseVector of raw pixel values.
# NOTE(review): this 'vecs' column is not used by the feature/PCA steps and is
# dropped before the CSV export; it only serves as a preview here.
img2vec = F.udf(lambda x: DenseVector(ImageSchema.toNDArray(x).flatten()), VectorUDT())
df = df.withColumn('vecs', img2vec("image"))

df.show()

+--------------------+--------+------------------+--------------+--------------------+
|               image|Data Set|             Fruit|     File Name|                vecs|
+--------------------+--------+------------------+--------------+--------------------+
|{file:///content/...|       A|Banana_Lady_Finger| r2_95_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_Finger|r2_154_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_Finger| r2_92_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_Finger| r2_86_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_Finger| r2_87_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_Finger|r2_109_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_Finger| r2_93_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_Finger| r2_89_100.jpg|[255.0,255.0,255....|
|{file:///content/...|       A|Banana_Lady_

In [None]:
# Build the feature extractor once on the driver. ImageNet-pretrained weights
# are needed for meaningful features: with weights=None Keras initialises the
# network randomly, so the extracted vectors would carry no learned information.
neural_network = ResNet50(
    include_top=False,      # drop the ImageNet classification head
    weights='imagenet',
    pooling='max',          # global max pooling -> one flat vector per image
    input_shape=(100, 100, 3))

# Ship the weights to executors once; executor-side code rebuilds the
# architecture (neural_network_init) and loads these values, so workers never
# need to download the pretrained weights themselves.
neural_network_weights = spark.sparkContext.broadcast(neural_network.get_weights())


In [None]:
def neural_network_init():
    """Rebuild the ResNet50 feature extractor, e.g. inside a Spark task.

    The architecture is recreated with ``weights=None`` (nothing is downloaded)
    and then filled with the weights broadcast from the driver through the
    module-level ``neural_network_weights`` broadcast variable.

    Returns:
        The Keras ResNet50 model (no top, global max pooling, 100x100x3 input).
    """
    model = ResNet50(include_top=False, weights=None, pooling='max',
                     input_shape=(100, 100, 3))
    model.set_weights(neural_network_weights.value)
    return model

In [None]:
import numpy as np

def gettensorfrompath(image_path, target_size=None):
    """Load an image from a ``file:`` URI (or plain path) as a ResNet50 input.

    Args:
        image_path: Image location, e.g. the ``origin`` field produced by
            Spark's image data source (``file:///content/...``).
        target_size: Optional ``(height, width)`` forwarded to ``load_img`` to
            resize the image. ``None`` (the default) keeps the original size.

    Returns:
        The image as an array of shape (height, width, 3), transformed by
        ResNet50's ``preprocess_input``.
    """
    # Strip only the leading URI scheme. The previous str.replace removed
    # "file:" anywhere in the path, corrupting paths that contain it later on.
    # The remaining leading slashes ("///content/...") resolve to a single "/"
    # on POSIX systems.
    prefix = "file:"
    path = image_path[len(prefix):] if image_path.startswith(prefix) else image_path
    img = load_img(path, target_size=target_size)
    x = img_to_array(img)
    x = preprocess_input(x)
    return x

@pandas_udf('array<double>')
def featurize(images_data_iter: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
    """Compute ResNet50 feature vectors for batches of Spark image structs.

    Scalar-iterator pandas UDF: the struct column passed in (``image``) arrives
    as an iterator of pandas DataFrames, and one pandas Series of feature
    arrays is yielded per batch, in the same row order.

    Args:
        images_data_iter: Iterator of batches; each batch must have an
            ``origin`` column holding the image file URI.

    Yields:
        pd.Series of 1-D numpy arrays, one per input row.
    """
    # Build the model once per iterator (i.e. once per task), not once per batch.
    neural_network = neural_network_init()
    for image_data_series in images_data_iter:
        if image_data_series.empty:
            # np.stack cannot stack zero arrays; emit an empty result batch.
            yield pd.Series([], dtype=object)
            continue
        # Load and preprocess every image of the batch from its path.
        tensors = image_data_series['origin'].map(gettensorfrompath)
        batch = np.stack(tensors.tolist())
        # One predict call for the whole batch (predict also accepts batch_size).
        features = neural_network.predict(batch)
        yield pd.Series([p.flatten() for p in features])

In [None]:
# Run the ResNet50 pandas UDF on every image. Cache the result because the
# feature column feeds several later actions (show, PCA fit and transform).
featurized_df = df.withColumn('ResNet50_Features', featurize('image')).cache()

In [None]:
# Preview the extracted features. This is the first action on featurized_df,
# so it also triggers the model inference.
featurized_df.show()

+--------------------+--------+------------------+--------------+--------------------+--------------------+
|               image|Data Set|             Fruit|     File Name|                vecs|   ResNet50_Features|
+--------------------+--------+------------------+--------------+--------------------+--------------------+
|{file:///content/...|       A|Banana_Lady_Finger| r2_95_100.jpg|[255.0,255.0,255....|[16.2176494598388...|
|{file:///content/...|       A|Banana_Lady_Finger|r2_154_100.jpg|[255.0,255.0,255....|[16.9476146697998...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_92_100.jpg|[255.0,255.0,255....|[16.2326622009277...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_86_100.jpg|[255.0,255.0,255....|[16.1318664550781...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_87_100.jpg|[255.0,255.0,255....|[16.2308292388916...|
|{file:///content/...|       A|Banana_Lady_Finger|r2_109_100.jpg|[255.0,255.0,255....|[15.9981527328491...|
|{file:///content/...|      

In [None]:
def _to_dense_vector(values):
    """Wrap a list of floats in a Spark ML DenseVector (input format for PCA)."""
    return Vectors.dense(values)


array_to_vector_udf = udf(_to_dense_vector, VectorUDT())

In [None]:
# PCA needs an ML vector column, so convert the array<double> features.
vectorized_df = featurized_df.withColumn('ResNet50_Vectors', array_to_vector_udf('ResNet50_Features'))


In [None]:
# Preview the vectorised features next to the original array column.
vectorized_df.show()

+--------------------+--------+------------------+--------------+--------------------+--------------------+--------------------+
|               image|Data Set|             Fruit|     File Name|                vecs|   ResNet50_Features|    ResNet50_Vectors|
+--------------------+--------+------------------+--------------+--------------------+--------------------+--------------------+
|{file:///content/...|       A|Banana_Lady_Finger| r2_95_100.jpg|[255.0,255.0,255....|[16.2176494598388...|[16.2176494598388...|
|{file:///content/...|       A|Banana_Lady_Finger|r2_154_100.jpg|[255.0,255.0,255....|[16.9476146697998...|[16.9476146697998...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_92_100.jpg|[255.0,255.0,255....|[16.2326622009277...|[16.2326622009277...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_86_100.jpg|[255.0,255.0,255....|[16.1318664550781...|[16.1318664550781...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_87_100.jpg|[255.0,255.0,255....|[16.2308292

In [None]:
from pyspark.ml.feature import PCA

# Reduce the ResNet50 features to 10 principal components.
pca = (
    PCA()
    .setK(10)
    .setInputCol('ResNet50_Vectors')
    .setOutputCol('PCA_Vectors')
)
model = pca.fit(vectorized_df)

In [None]:
# Project every feature vector onto the fitted principal components.
final_df = model.transform(vectorized_df)

In [None]:
# Preview the PCA-reduced vectors.
final_df.show()

+--------------------+--------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|               image|Data Set|             Fruit|     File Name|                vecs|   ResNet50_Features|    ResNet50_Vectors|         PCA_Vectors|
+--------------------+--------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|{file:///content/...|       A|Banana_Lady_Finger| r2_95_100.jpg|[255.0,255.0,255....|[16.2176494598388...|[16.2176494598388...|[-166.77434302816...|
|{file:///content/...|       A|Banana_Lady_Finger|r2_154_100.jpg|[255.0,255.0,255....|[16.9476146697998...|[16.9476146697998...|[-100.32605925791...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_92_100.jpg|[255.0,255.0,255....|[16.2326622009277...|[16.2326622009277...|[-169.58092669487...|
|{file:///content/...|       A|Banana_Lady_Finger| r2_86_100.jpg|[255.0,255.0,255....|[16.1318664550

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    """Serialise a sequence of values as ``"[a,b,c]"`` (comma-separated, no spaces).

    Used as a Spark UDF to turn vector cells into strings before the
    DataFrame is exported to CSV.

    Args:
        my_list: Any iterable of values (e.g. a DenseVector), or ``None`` for
            a null cell.

    Returns:
        The bracketed string, or ``None`` when *my_list* is ``None``. Null
        cells therefore stay null instead of failing the Spark task.
    """
    if my_list is None:
        return None
    return '[' + ','.join(str(elem) for elem in my_list) + ']'

# Spark UDF turning each PCA vector into its "[a,b,...]" string form.
array_to_string_udf = udf(array_to_string, returnType=StringType())

df1 = final_df.withColumn(
    'Features',
    array_to_string_udf(col('PCA_Vectors')),
)

In [None]:
# Keep only the label columns and the stringified PCA features for export.
df2 = df1.drop("image", "vecs", "ResNet50_Features", "ResNet50_Vectors", "PCA_Vectors")

In [None]:
# Preview the exported rows.
df2.show()

+--------+------------------+--------------+--------------------+
|Data Set|             Fruit|     File Name|            Features|
+--------+------------------+--------------+--------------------+
|       A|Banana_Lady_Finger| r2_95_100.jpg|[-166.77434302816...|
|       A|Banana_Lady_Finger|r2_154_100.jpg|[-100.32605925791...|
|       A|Banana_Lady_Finger| r2_92_100.jpg|[-169.58092669487...|
|       A|Banana_Lady_Finger| r2_86_100.jpg|[-163.36378067045...|
|       A|Banana_Lady_Finger| r2_87_100.jpg|[-164.51476954341...|
|       A|Banana_Lady_Finger|r2_109_100.jpg|[-136.75514207152...|
|       A|Banana_Lady_Finger| r2_93_100.jpg|[-168.24304255516...|
|       A|Banana_Lady_Finger| r2_89_100.jpg|[-165.09575395126...|
|       A|Banana_Lady_Finger|r2_183_100.jpg|[-122.18477178633...|
|       A|Banana_Lady_Finger|r2_199_100.jpg|[-118.93139648601...|
+--------+------------------+--------------+--------------------+



In [None]:
# Write DataFrame data to CSV. Spark writes a directory of part files; include
# a header row so the column names ("Data Set", "Fruit", ...) are preserved.
df2.write.csv("/content/drive/MyDrive/data_to_csv", header=True)

In [None]:
# Shut down the SparkContext; `sc`/`spark` must be recreated before reuse.
sc.stop()

# Code that works with AWS

In [None]:
#pip install pandas
#pip install tensorflow
#pip install pillow
#pip install boto3

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col, split,udf
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, FloatType
from typing import Iterator

import pandas as pd
import numpy as np
import tensorflow as tf
from PIL import Image

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img

# Reuse (or create) the SparkContext, then build a SparkSession on top of it.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

from pyspark.sql.functions import split, element_at
import pyspark.sql.functions as F
from pyspark.ml.image import ImageSchema
from pyspark.ml.linalg import DenseVector, VectorUDT, Vectors

# Driver-side ResNet50 feature extractor. ImageNet-pretrained weights are
# needed for meaningful features: weights=None leaves the network randomly
# initialised, so the extracted vectors would carry no learned information.
neural_network = ResNet50(
    include_top=False,      # drop the ImageNet classification head
    weights='imagenet',
    pooling='max',          # global max pooling -> one flat vector per image
    input_shape=(100, 100, 3))

# Broadcast the weights so executor-side code can rebuild the model via
# neural_network_init() without downloading them.
neural_network_weights = spark.sparkContext.broadcast(neural_network.get_weights())


import boto3
# Get a handle on the S3 bucket that holds the input images. The CSV results
# are written back to this same bucket at the end of the cell.
s3 = boto3.resource('s3')
files = s3.Bucket('cdk-hnb659fds-assets-127319039344-eu-west-3')

def neural_network_init():
    """Rebuild the ResNet50 feature extractor from the broadcast weights.

    Intended for code running on executors. The broadcast is created on the
    driver (``neural_network_weights``) and only read here, because the
    SparkContext is not available on workers.
    NOTE(review): not called by the code in this section; the S3 loop uses the
    driver-side ``neural_network`` directly.

    Returns:
        The Keras ResNet50 model (no top, global max pooling, 100x100x3 input).
    """
    model = ResNet50(include_top=False, weights=None, pooling='max',
                     input_shape=(100, 100, 3))
    model.set_weights(neural_network_weights.value)
    return model

from contextlib import closing

from PIL import UnidentifiedImageError

# Extract ResNet50 features for (at most) the first 500 objects of the bucket.
# NOTE(review): inference runs serially on the driver, one image per predict
# call; Spark is only used afterwards for PCA and the CSV export.
list_path_img = []
for file in files.objects.limit(500):
    key_parts = file.key.split('/')
    # Skip "folder" marker keys and objects at the bucket root: the fruit label
    # comes from the parent prefix, which such keys do not have.
    if file.key.endswith('/') or len(key_parts) < 2:
        continue
    Fruit = key_parts[-2]
    response = files.Object(file.key).get()
    # Close the S3 body stream once the image has been decoded.
    with closing(response['Body']) as file_stream:
        try:
            # Force 3-channel RGB at the model's 100x100 input size. Images that
            # are already RGB and 100x100 keep their pixel values unchanged.
            img = Image.open(file_stream).convert('RGB').resize((100, 100))
        except UnidentifiedImageError:
            # Not an image (e.g. CSV parts previously written to this bucket).
            continue
    # Add the batch axis -> shape (1, 100, 100, 3), dtype uint8.
    tensor = np.array(img, dtype=np.uint8)[np.newaxis, ...]
    prep_tensor = preprocess_input(tensor)
    features = neural_network.predict(prep_tensor, verbose=0).ravel().tolist()
    # Store file key, label and features
    list_path_img.append((file.key, Fruit, features))


# One row per image: S3 key, fruit label and raw ResNet50 features.
df_img = spark.createDataFrame(list_path_img, ['origin', 'Fruit', 'cnn_features'])


def _to_dense_vector(values):
    """Wrap a list of floats in a Spark ML DenseVector (input format for PCA)."""
    return Vectors.dense(values)


array_to_vector_udf = udf(_to_dense_vector, VectorUDT())

vectorized_df = df_img.withColumn('ResNet50_Vectors', array_to_vector_udf(col('cnn_features')))

vectorized_df.show()

from pyspark.ml.feature import PCA

# Reduce the ResNet50 features to 10 principal components and project them.
pca = (
    PCA()
    .setK(10)
    .setInputCol('ResNet50_Vectors')
    .setOutputCol('PCA_Vectors')
)
model = pca.fit(vectorized_df)
final_df = model.transform(vectorized_df)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    """Serialise a sequence of values as ``"[a,b,c]"`` (comma-separated, no spaces).

    Used as a Spark UDF to turn vector cells into strings before the
    DataFrame is exported to CSV.

    Args:
        my_list: Any iterable of values (e.g. a DenseVector), or ``None`` for
            a null cell.

    Returns:
        The bracketed string, or ``None`` when *my_list* is ``None``. Null
        cells therefore stay null instead of failing the Spark task.
    """
    if my_list is None:
        return None
    return '[' + ','.join(str(elem) for elem in my_list) + ']'

# Spark UDF turning each PCA vector into its "[a,b,...]" string form.
array_to_string_udf = udf(array_to_string, returnType=StringType())
df1 = final_df.withColumn('Features', array_to_string_udf(col('PCA_Vectors')))

# Keep only the label columns and the stringified features for export.
# "image" and "vecs" are not in this DataFrame; drop() ignores unknown names.
columns_to_drop = ("image", "vecs", "cnn_features", "ResNet50_Vectors", "PCA_Vectors")
df2 = df1.drop(*columns_to_drop)

# Write DataFrame data to CSV. Spark writes a directory of part files; include
# a header row so the column names (origin, Fruit, Features) are preserved.
df2.write.csv("s3://cdk-hnb659fds-assets-127319039344-eu-west-3/final_data", header=True)

# Shut down the SparkContext once the export has been written.
sc.stop()