# P8 Déployer un modèle dans le cloud - Local PySpark and AWS S3 Bucket Dataset 

In [1]:
import requests

import PIL.Image
import boto3
import requests
from tqdm import tqdm

import pyspark
import numpy as np
import torch

import os
import io
from io import BytesIO

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

import datetime 

from encoder import Encoder

## AWS S3 bucket

In [2]:
# Bucket that holds both the images to process and the results of each run.
S3_BUCKET_NAME = "cloud-fruits-p8-bucket"

# Key prefix (folder) where the images waiting to be processed are stored.
s3_input_images_to_process_bucket_folder = "input_images_to_process"

# Output layout used below: <output root>/<batch timestamp>/<images folder | features folder>/...
s3_output_features_and_images_processed_folder = "output_features_and_images_processed"
s3_output_images_processed_folder = "images_processed"
s3_output_features_to_classify_folder = "features_to_classify"

# Name given to the parquet part file once it has been written by Spark.
S3_OUTPUT_FEATURES_PARQUET_FILENAME = "cloud-fruits-p8-features.parquet"

## Read data from input folder

In [3]:
# Connect to S3 storage
s3_bucket = boto3.resource('s3').Bucket(S3_BUCKET_NAME)

print("... list images from Fruits 360 dataset in AWS S3 bucket input_images_to_process")

# List the images of the Fruits 360 dataset waiting in the input folder.
# - The trailing "/" restricts the listing to that folder: a bare prefix would also
#   match any sibling key that merely starts with the same characters.
# - endswith() keeps only keys whose extension is .jpg; a substring test would also
#   accept keys such as "photo.jpg.bak".
image_keys = [
    image.key
    for image in s3_bucket.objects.filter(Prefix=f"{s3_input_images_to_process_bucket_folder}/")
    if image.key.endswith('.jpg')
]
print(f"Contains {len(image_keys)} images.")
print(image_keys)

... list images from Fruits 360 dataset in AWS S3 bucket input_images_to_process
Contains 50 images.
['input_images_to_process/Apricot_179_100.jpg', 'input_images_to_process/Apricot_r_108_100.jpg', 'input_images_to_process/Apricot_r_109_100.jpg', 'input_images_to_process/Apricot_r_110_100.jpg', 'input_images_to_process/Apricot_r_111_100.jpg', 'input_images_to_process/Apricot_r_112_100.jpg', 'input_images_to_process/Apricot_r_283_100.jpg', 'input_images_to_process/Apricot_r_284_100.jpg', 'input_images_to_process/Apricot_r_285_100.jpg', 'input_images_to_process/Apricot_r_316_100.jpg', 'input_images_to_process/Apricot_r_318_100.jpg', 'input_images_to_process/Apricot_r_319_100.jpg', 'input_images_to_process/Banana_10_100.jpg', 'input_images_to_process/Banana_11_100.jpg', 'input_images_to_process/Banana_152_100.jpg', 'input_images_to_process/Banana_153_100.jpg', 'input_images_to_process/Banana_154_100.jpg', 'input_images_to_process/Banana_279_100.jpg', 'input_images_to_process/Banana_280_10

## Process data

In [4]:
def start_spark(master: str = "local", app_name: str = "Cloud-Fruits-P8") -> "SparkSession":
    """
    Build the Spark session used by this notebook (via SparkSession.builder.getOrCreate()).
    Args:
        master (str): Spark master URL handed to the builder; defaults to "local".
        app_name (str): application name handed to the builder; defaults to "Cloud-Fruits-P8".
    Returns: (SparkSession) my Spark session
    """
    return (
        SparkSession.builder
        .master(master)
        .appName(app_name)
        .getOrCreate()
    )

In [5]:
def load_image(image_key: str, bucket_name: str) -> PIL.Image.Image:
    """
    Load an image from AWS S3 storage.
    Args:
        image_key (str): image key in AWS S3 bucket
        bucket_name (str): S3 bucket name
    Returns: (PIL.Image.Image) RGB image data
    """
    buffer = BytesIO()
    boto3.resource('s3').Bucket(bucket_name).Object(image_key).download_fileobj(buffer)
    # The download leaves the cursor after the last byte written: rewind before decoding.
    buffer.seek(0)
    # convert('RGB') decodes the pixels immediately and guarantees the RGB mode
    # promised by this function, whatever mode the stored file uses.
    return PIL.Image.open(buffer).convert('RGB')

In [6]:
def _label_from_key(image_key: str) -> str:
    """
    Derive the image label from its S3 key.
    The file name is expected to look like "<label>_<n>_<size>.jpg" or
    "<label>_r_<n>_<size>.jpg" (e.g. "Apricot_r_108_100.jpg" -> "Apricot").
    When the file name carries no label (e.g. "r_0_100.jpg"), the parent folder
    name is used instead.
    Args:
        image_key (str): image key in AWS S3 bucket
    Returns: (str) the image label
    """
    import re  # local import: re is not imported at the top of the notebook

    parts = image_key.split('/')
    stem = parts[-1].rsplit('.', 1)[0]
    # Drop the trailing "[_r<k>]_<n>_<size>" suffix; whatever precedes it is the label.
    label = re.sub(r'(?:^|_)(?:r\d*_)?\d+_\d+$', '', stem)
    if label:
        return label
    # No label in the file name: fall back to the parent folder, or to the bare
    # stem when the key has no folder at all.
    return parts[-2] if len(parts) > 1 else stem


def process_image(image_key: str) -> "Row":
    """
    Encodes an image into a fixed-length vector of float values using a pre-trained DNN encoder.
    Reads the module-level names S3_BUCKET_NAME and broadcastEncoder, which must be
    defined before this function is called.
    Args:
        image_key (str): image key in AWS S3 bucket
    Returns: (Row) the image encoding, represented by a Row with fields:
        origin (str): image original path (i.e., the image key in S3 bucket)
        label (str): the image label, derived from the file name (see _label_from_key)
        x0 (float): first feature of image encoding
        (...)
        x_(n-1) (float): last feature of image encoding
    """
    print(f"...... process image {image_key}")
    # The keys are flat ("<input folder>/<label>_<n>_100.jpg"): the parent folder is
    # the same for every image, so the label has to come from the file name.
    image_label = _label_from_key(image_key)
    image = load_image(image_key=image_key, bucket_name=S3_BUCKET_NAME)
    encoding = broadcastEncoder.value.encode(image)
    features = {f'x{i}': value for i, value in enumerate(encoding)}
    return Row(origin=image_key, label=image_label, **features)

In [7]:
if len(image_keys) > 0:

    spark = start_spark()
    s3_client = boto3.client('s3')

    currentDate = datetime.datetime.today()
    # Batch folder name, formatted once so the write path and the listing prefix cannot diverge.
    batch_folder = currentDate.strftime("%Y%m%d-%H%M%S-batch")
    # Key prefix under which Spark writes the parquet output of this batch.
    s3_prefix = s3_output_features_and_images_processed_folder + "/" + batch_folder + "/" + s3_output_features_to_classify_folder

    try:
        print("... load image encoder")
        # Module-level name on purpose: process_image() looks up broadcastEncoder as a global.
        broadcastEncoder = spark.sparkContext.broadcast(Encoder())

        print("... encode images and write output to file in output folder in AWS S3 bucket")
        output = spark.sparkContext.parallelize(image_keys).map(process_image).toDF()

        # coalesce(1) consolidates the DataFrame into one partition before writing;
        # the rename step below expects exactly one parquet part file.
        print(f"... Write result as parquet file in AWS S3 bucket {s3_output_features_and_images_processed_folder}")
        output.coalesce(1).write.parquet(f's3a://{S3_BUCKET_NAME}/{s3_prefix}') # save as parquet file
    finally:
        # Stop Spark even when encoding or writing fails. The rename below only
        # needs boto3, so Spark can be released before it.
        spark.sparkContext.stop()

    # list parquet file(s) from output folder
    parquet_files = [parquet_file for parquet_file in s3_bucket.objects.filter(Prefix=s3_prefix) if parquet_file.key.endswith('.parquet')]
    print(f"... Will rename file {parquet_files}")

    if len(parquet_files) == 1:
        parquet_file = parquet_files[0]
        filename = parquet_file.key.split('/')[-1]
        folder = parquet_file.key.split('/')[-2]
        try:
            output_key = f'{s3_output_features_and_images_processed_folder}/{batch_folder}/{folder}/{S3_OUTPUT_FEATURES_PARQUET_FILENAME}'
            print(f"... rename file {filename} to {output_key}")
            # S3 has no rename operation: copy to the new key, then delete the original.
            copy_source = {'Bucket': S3_BUCKET_NAME, 'Key': parquet_file.key}
            s3_client.copy_object(Bucket = S3_BUCKET_NAME, CopySource = copy_source, Key = output_key)
            s3_client.delete_object(Bucket = S3_BUCKET_NAME, Key = parquet_file.key)
            print(f"... File {filename} successfully renamed to {output_key}")
        except Exception as e:
            # Best effort: the features are already written, only the final name is missing.
            print(f"{filename} not renamed correctly. The error raised is: ", e)
    else:
        print(f"... expected exactly 1 parquet file under {s3_prefix}, found {len(parquet_files)}: nothing renamed")

else:
    print("... no file to process, end of script")
    

... load image encoder
... encode images and write output to file in output folder in AWS S3 bucket


...... process image input_images_to_process/Apricot_179_100.jpg    (0 + 1) / 1]
[W NNPACK.cpp:51] Could not initialize NNPACK! Reason: Unsupported hardware.
                                                                                

... Write result as parquet file in AWS S3 bucket output_features_and_images_processed


...... process image input_images_to_process/Apricot_179_100.jpg    (0 + 1) / 1]
[W NNPACK.cpp:51] Could not initialize NNPACK! Reason: Unsupported hardware.
...... process image input_images_to_process/Apricot_r_108_100.jpg
                                                                                

... Will rename file [s3.ObjectSummary(bucket_name='cloud-fruits-p8-bucket', key='output_features_and_images_processed/20220628-114300-batch/features_to_classify/part-00000-910b7666-12a0-4daf-adaf-73d0c150b4cf-c000.snappy.parquet')]
... rename file part-00000-910b7666-12a0-4daf-adaf-73d0c150b4cf-c000.snappy.parquet from output_features_and_images_processed/20220628-114300-batch/features_to_classify/cloud-fruits-p8-features.parquet
... File part-00000-910b7666-12a0-4daf-adaf-73d0c150b4cf-c000.snappy.parquet successfully renamed to output_features_and_images_processed/20220628-114300-batch/features_to_classify/cloud-fruits-p8-features.parquet


## Move data to processed folder

In [8]:
if len(image_keys) > 0:

    # Connect to S3 storage
    s3_bucket = boto3.resource('s3').Bucket(S3_BUCKET_NAME)
    s3_client = boto3.client('s3')

    # Only move the images listed in image_keys, i.e. the ones that were handed to the
    # encoder. An image uploaded to the input folder after that listing has not been
    # processed and must stay there for the next run.
    processed_keys = set(image_keys)
    s3_bucket_images = [image for image in s3_bucket.objects.filter(Prefix=s3_input_images_to_process_bucket_folder) if image.key in processed_keys]
    # Destination folder of this batch; currentDate comes from the processing cell above.
    batch_folder = f'{currentDate.strftime("%Y%m%d-%H%M%S")}-batch'
    print(f"... will move {len(s3_bucket_images)} files from Fruits 360 dataset in AWS S3 bucket, from {s3_input_images_to_process_bucket_folder} to {s3_output_features_and_images_processed_folder}/{s3_output_images_processed_folder}")
    for s3_bucket_image in s3_bucket_images:
        filename = s3_bucket_image.key.split('/')[-1]
        output_key = f'{s3_output_features_and_images_processed_folder}/{batch_folder}/{s3_output_images_processed_folder}/{filename}'
        try:
            print(f'Will move file from {s3_bucket_image.key} to {output_key}')

            # S3 has no move operation: copy to the new key, then delete the original.
            copy_source = {'Bucket': S3_BUCKET_NAME, 'Key': s3_bucket_image.key}
            s3_client.copy_object(Bucket = S3_BUCKET_NAME, CopySource = copy_source, Key = output_key)
            s3_client.delete_object(Bucket = S3_BUCKET_NAME, Key = s3_bucket_image.key)
            print(f'File {filename} successfully moved to {output_key}')
        except Exception as e:
            # Keep going with the other images, but report why this one failed.
            print(f"Exception thrown. {filename} not moved correctly. The error raised is: ", e)

else:
    print("... no file to process, end of script")
    

... will move 50 files from Fruits 360 dataset in AWS S3 bucket, from input_images_to_process to output_features_and_images_processed/images_processed
Will move file from input_images_to_process/Apricot_179_100.jpg to output_features_and_images_processed/20220628-114300-batch/images_processed/Apricot_179_100.jpg
File Apricot_179_100.jpg successfully moved to output_features_and_images_processed/20220628-114300-batch/images_processed/Apricot_179_100.jpg
Will move file from input_images_to_process/Apricot_r_108_100.jpg to output_features_and_images_processed/20220628-114300-batch/images_processed/Apricot_r_108_100.jpg
File Apricot_r_108_100.jpg successfully moved to output_features_and_images_processed/20220628-114300-batch/images_processed/Apricot_r_108_100.jpg
Will move file from input_images_to_process/Apricot_r_109_100.jpg to output_features_and_images_processed/20220628-114300-batch/images_processed/Apricot_r_109_100.jpg
File Apricot_r_109_100.jpg successfully moved to output_featur

File Kiwi_221_100.jpg successfully moved to output_features_and_images_processed/20220628-114300-batch/images_processed/Kiwi_221_100.jpg
Will move file from input_images_to_process/Kiwi_222_100.jpg to output_features_and_images_processed/20220628-114300-batch/images_processed/Kiwi_222_100.jpg
File Kiwi_222_100.jpg successfully moved to output_features_and_images_processed/20220628-114300-batch/images_processed/Kiwi_222_100.jpg
Will move file from input_images_to_process/Kiwi_r_180_100.jpg to output_features_and_images_processed/20220628-114300-batch/images_processed/Kiwi_r_180_100.jpg
File Kiwi_r_180_100.jpg successfully moved to output_features_and_images_processed/20220628-114300-batch/images_processed/Kiwi_r_180_100.jpg
Will move file from input_images_to_process/Kiwi_r_182_100.jpg to output_features_and_images_processed/20220628-114300-batch/images_processed/Kiwi_r_182_100.jpg
File Kiwi_r_182_100.jpg successfully moved to output_features_and_images_processed/20220628-114300-batch/i