# 1. Imports

In [None]:
import pandas as pd
import numpy as np
import io
import boto3

from PIL import Image

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, split, udf
from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml.linalg import Vectors, VectorUDT

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# 2. Featurization

In [None]:
# Resolve AWS credentials once on the driver through boto3's default lookup chain
# and freeze them: `current_credentials` is read by `preprocess`, which runs on the
# Spark executors, so plain immutable values are what get shipped there.
# NOTE(review): if these are temporary (role) credentials they expire; a very long
# job may need to refresh them.
session = boto3.Session()
credentials = session.get_credentials()
if credentials is None:
    raise RuntimeError(
        "No AWS credentials found: configure environment variables, "
        "~/.aws/credentials or an instance role before running this notebook."
    )
current_credentials = credentials.get_frozen_credentials()

In [None]:
# Reuse the active SparkSession if one exists, otherwise start one named 'P8'.
spark = (
    SparkSession.builder
    .appName('P8')
    .getOrCreate()
)

In [None]:
# Notebook display only: evaluating the session renders its summary as cell output.
spark

In [None]:
# The SparkContext backing the session (needed later to broadcast the model weights).
# `spark.sparkContext` already is the active context; calling the `getOrCreate`
# classmethod on it again is redundant.
sc = spark.sparkContext

In [None]:
# Driver-side S3 client built from the frozen credentials. The session token is
# forwarded as well: temporary credentials (e.g. from an instance role) are rejected
# without it, and it is simply None for long-term access keys.
s3 = boto3.client('s3',
                  aws_access_key_id=current_credentials.access_key,
                  aws_secret_access_key=current_credentials.secret_key,
                  aws_session_token=current_credentials.token)

In [None]:
# Collect the S3 key of every image in the source bucket.
# A single listing call returns at most 1000 keys, so page through all of them.
# An empty page has no 'Contents' entry at all. Keys ending in '/' are zero-byte
# "folder" placeholders rather than images and would break image decoding, so skip them.
img_path = []
for page in s3.get_paginator('list_objects_v2').paginate(Bucket='fruits-oc-projet-8'):
    for obj in page.get('Contents', []):
        if not obj['Key'].endswith('/'):
            img_path.append(obj['Key'])

In [None]:
# One row per image key. Building a list (instead of rebinding `img_path` to a
# one-shot `map` iterator) keeps the key list intact, so the cell can be re-run.
# The explicit DDL schema also works when the key list is empty, where schema
# inference would fail.
df = spark.createDataFrame([(key,) for key in img_path], 'img_path string')

In [None]:
# Category = first component of the S3 key, i.e. the text before the first '/'.
df = df.withColumn('category', split(col('img_path'), '/')[0])

In [None]:
# Preview the first rows (Spark's defaults spelled out: 20 rows, truncated cells).
df.show(n=20, truncate=True)

In [None]:
# ImageNet-pretrained ResNet50 used as a fixed feature extractor: the classifier
# head is dropped (include_top=False) and the output is global-average-pooled.
model = ResNet50(weights='imagenet', include_top=False, pooling='avg')
# Print the layer list to confirm no classification layers remain.
model.summary()

In [None]:
# Ship the driver model's weights to every executor once, via a broadcast variable.
bc_model_weights = sc.broadcast(model.get_weights())

def model_fn():
    """
    Returns a ResNet50 model with top layer removed, average pooling, and the
    broadcasted pretrained weights loaded.

    Called inside the pandas UDF, i.e. on the executors. The architecture is built
    with ``weights=None`` because every weight is immediately replaced by the
    broadcast copy, so downloading the ImageNet weights again on each worker
    would be wasted work (and an extra network dependency).
    """
    model = ResNet50(weights=None, pooling='avg', include_top=False)
    model.set_weights(bc_model_weights.value)
    return model

In [None]:
def preprocess(img_path, bucket='fruits-oc-projet-8', target_size=(224, 224)):
    """
    Download one image from S3 and turn it into a ResNet50-ready array.

    :param img_path: S3 object key of the image inside ``bucket``.
    :param bucket: S3 bucket holding the images.
    :param target_size: (width, height) passed to ``PIL.Image.resize``.
    :return: array of shape (height, width, 3) after ResNet50 ``preprocess_input``.
    """
    # The resource is created inside the function (it runs on Spark executors),
    # presumably so no boto3 object has to be pickled into the UDF closure.
    # The session token is forwarded so temporary credentials also work.
    s3 = boto3.resource('s3',
                        aws_access_key_id=current_credentials.access_key,
                        aws_secret_access_key=current_credentials.secret_key,
                        aws_session_token=current_credentials.token)

    body = s3.Object(bucket_name=bucket, key=img_path).get()['Body']
    try:
        # Buffer the bytes: the S3 stream is not a regular seekable file and
        # must be closed to release the HTTP connection.
        raw = io.BytesIO(body.read())
    finally:
        body.close()

    # Force 3 channels so grayscale / RGBA / palette images produce the same
    # array shape as every other image in the batch (required by np.stack).
    img = Image.open(raw).convert('RGB').resize(tuple(target_size))
    return preprocess_input(img_to_array(img))

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of S3 image keys using the input model.

    :param model: Keras model whose ``predict`` is applied to the stacked batch.
    :param content_series: pd.Series of S3 object keys; each is loaded and
        preprocessed by ``preprocess``.
    :return: a pd.Series with one flattened feature vector per input key
        (empty when ``content_series`` is empty).
    """
    # np.stack raises on an empty sequence, so answer an empty batch directly.
    if content_series.empty:
        return pd.Series([], dtype=object)
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])

In [None]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar-iterator pandas UDF that maps each incoming batch of image keys to a
    batch of feature vectors, returned to Spark as ArrayType(FloatType).

    The model is built a single time per iterator of batches and then reused for
    every batch in it, which amortizes the cost of loading a large network.
    '''
    extractor = model_fn()
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)

In [None]:
# Upper bound on the number of rows in each Arrow batch handed to a pandas UDF.
# Large records (such as decoded images) can exhaust memory; if the featurization
# cell fails with Out Of Memory errors, lower this value.
spark.conf.set(key="spark.sql.execution.arrow.maxRecordsPerBatch", value="1024")

In [None]:
def _to_dense_vector(values):
    """Wrap a list of floats into a pyspark.ml DenseVector."""
    return Vectors.dense(values)

# Spark UDF converting the array<float> features column into an ML vector column,
# the column type consumed by StandardScaler and PCA.
list_to_vector_udf = udf(_to_dense_vector, VectorUDT())

In [None]:
# Run the ResNet50 featurization over the whole DataFrame.
# NOTE: this applies a large model to every image and can take a long time.
# The result is cached because several later actions (show, scaler/PCA fitting,
# transform, toPandas) would otherwise each re-download and re-featurize every image.
features_df = (
    df.repartition(16)
    .select(col('img_path'), col('category'), featurize_udf('img_path').alias('features'))
    .withColumn("features_udf", list_to_vector_udf(col("features")))
    .cache()
)

In [None]:
# The raw array<float> column is superseded by its vector version; remove it.
features_df = features_df.drop(col('features'))

In [None]:
# Preview the featurized rows (Spark's defaults spelled out: 20 rows, truncated cells).
features_df.show(n=20, truncate=True)

# 3. Dimension Reduction

In [None]:
# Standardize every feature dimension before applying PCA
# (StandardScaler's documented defaults: scale to unit std, no mean-centering).
scaler = StandardScaler(inputCol="features_udf", outputCol="features_udf_sca")
scaler_model = scaler.fit(features_df)
features_df = scaler_model.transform(features_df)

In [None]:
# Project the standardized features onto their first 50 principal components.
pca = PCA(k=50, inputCol="features_udf_sca", outputCol="pcaFeatures_udf")
# Named `pca_model` rather than `model`, so the ResNet50 `model` whose weights are
# broadcast earlier is not silently overwritten when cells are re-run.
pca_model = pca.fit(features_df)
result = pca_model.transform(features_df)

In [None]:
# Keep only the identifiers and the reduced feature vector.
result = result.select('img_path', 'category', 'pcaFeatures_udf')

In [None]:
# Preview the reduced rows (Spark's defaults spelled out: 20 rows, truncated cells).
result.show(n=20, truncate=True)

# 4. Uploading results to S3

In [None]:
# Collect the reduced result to the driver as a pandas DataFrame for the CSV export.
# NOTE(review): assumes the result (key, category, 50-component vector per image)
# fits in driver memory.
result_pd = result.toPandas()

In [None]:
# Create the output bucket. Re-running the notebook must not fail just because
# the bucket already exists and belongs to us.
try:
    s3.create_bucket(Bucket='ocprojet8result',
                     CreateBucketConfiguration={'LocationConstraint': 'eu-west-3'})
except s3.exceptions.BucketAlreadyOwnedByYou:
    print("Bucket 'ocprojet8result' already exists; reusing it.")

In [None]:
# Serialize the pandas result to CSV in memory and upload it as result.csv.
csv_buffer = io.StringIO()
result_pd.to_csv(csv_buffer)

# No ACL is requested. Buckets created with current S3 defaults have ACLs disabled
# (Object Ownership "bucket owner enforced") and Block Public Access enabled, so
# ACL='public-read' is rejected; it would also make the results world-readable.
# Share the file through a presigned URL or a bucket policy instead.
response = s3.put_object(
    Body=csv_buffer.getvalue().encode('utf-8'),
    Bucket='ocprojet8result',
    Key='result.csv',
    ContentType='text/csv',
)