In [3]:
pip install --upgrade pyspark

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/amazonei_tensorflow2_p36/bin/python -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [4]:
import pandas as pd
from PIL import Image
import numpy as np
import io

import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql.functions import col, pandas_udf, PandasUDFType

In [5]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

In [6]:
import boto3
import os 

def downloadDirectoryFroms3(bucketName, remoteDirectoryName):
    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(bucketName) 
    for obj in bucket.objects.filter(Prefix = remoteDirectoryName):
        if not os.path.exists(os.path.dirname(obj.key)):
            os.makedirs(os.path.dirname(obj.key))
        bucket.download_file(obj.key, obj.key) # save to same path

downloadDirectoryFroms3('dataocrbucket', 'fruits-360')

In [10]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load("fruits-360/Test")

In [11]:
display(images.limit(5))

DataFrame[path: string, modificationTime: timestamp, length: bigint, content: binary]

In [12]:
model = ResNet50(include_top=False)
model.summary()  # verify that the top layer is removed

Model: "resnet50"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, None, None,  0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, None, None, 3 0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, None, None, 6 9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, None, None, 6 256         conv1_conv[0][0]                 
___________________________________________________________________________________________

In [13]:
bc_model_weights = sc.broadcast(model.get_weights())

def model_fn():
  """
  Returns a ResNet50 model with top layer removed and broadcasted pretrained weights.
  """
  model = ResNet50(weights=None, include_top=False)
  model.set_weights(bc_model_weights.value)
  return model

In [14]:
def preprocess(content):
  """
  Preprocesses raw image bytes for prediction.
  """
  img = Image.open(io.BytesIO(content))
  arr = img_to_array(img)
  return preprocess_input(arr)

def featurize_series(model, content_series):
  """
  Featurize a pd.Series of raw images using the input model.
  :return: a pd.Series of image features
  """
  input = np.stack(content_series.map(preprocess))
  preds = model.predict(input)
  # For some layers, output features will be multi-dimensional tensors.
  # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
  output = [p.flatten() for p in preds]
  return pd.Series(output)

In [15]:
pip install PyArrow

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/amazonei_tensorflow2_p36/bin/python -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [16]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
  '''
  This method is a Scalar Iterator pandas UDF wrapping our featurization function.
  The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).
  
  :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
  '''
  # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
  # for multiple data batches.  This amortizes the overhead of loading big models.
  model = model_fn()
  for content_series in content_series_iter:
    yield featurize_series(model, content_series)



In [17]:
# Pandas UDFs on large records (e.g., very large images) can run into Out Of Memory (OOM) errors.
# If you hit such errors in the cell below, try reducing the Arrow batch size via `maxRecordsPerBatch`.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [25]:
# We can now run featurization on our entire Spark DataFrame.
# NOTE: This can take a long time (about 10 minutes) since it applies a large model to the full dataset.
features_df = images.repartition(16).select(col("path"),featurize_udf("content").alias("features"))
features_df.write.mode("overwrite").parquet("Test_features")

In [26]:
features_df.show()

+--------------------+--------------------+
|                path|            features|
+--------------------+--------------------+
|file:/home/ec2-us...|[0.0, 0.0, 1.7025...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|
|file:/home/ec2-us...|[0.0, 0.0,

In [27]:
from pandas import option_context

with option_context('display.max_colwidth', 400):
    display(features_df.head())

Row(path='file:/home/ec2-user/SageMaker/P8cloudDeployModel/fruits-360/Test/Beetroot/r_178_100.jpg', features=[0.0, 0.0, 1.7025721073150635, 0.0, 0.059667110443115234, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.7049663066864014, 0.0, 0.0, 0.0, 0.0, 2.5600314140319824, 4.4736328125, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.07053577899932861, 0.0, 9.763364791870117, 0.0, 0.9025743007659912, 9.88847827911377, 0.0, 0.0, 1.8362088203430176, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.4391789436340332, 2.2531633377075195, 0.0, 0.0, 1.8636975288391113, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.908503770828247, 0.49085044860839844, 0.11501181125640869, 0.0, 0.016127586364746094, 0.0, 0.0, 7.776346206665039, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 7.248244285583496, 0.0, 0.0, 0.0, 0.0, 0.0, 4.230730056762695, 1.9561779499053955, 0.0, 1.5292762517929077, 0.0, 0.0, 0.0, 0.0, 4.274105548858643, 0.0, 0.0, 0.0, 0.0, 14.22756290435791, 0.0, 0.0, 0.0, 0.6590266227722168, 0.0, 0.0, 0.0

In [28]:
def parse_category(path):
    if len(path) > 0:
        return path.split('/')[-2]
    else:
        return ''

In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

udf_parse_category = udf(parse_category, StringType())

df_with_labels = features_df.withColumn("label", udf_parse_category("path"))

In [30]:
df_with_labels.show()

+--------------------+--------------------+----------+
|                path|            features|     label|
+--------------------+--------------------+----------+
|file:/home/ec2-us...|[0.0, 0.0, 1.7025...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...| Blueberry|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...| Blueberry|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...| Blueberry|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...| Blueberry|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...| Blueberry|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...|  Beetroot|
|file:/home/ec2-us...|[0.0, 0.0, 0.0, 0...| Blueberry|
|file:/hom

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

sparse_format_udf = udf(lambda x: str(x), StringType())

df_csv = df_with_labels.withColumn('features', sparse_format_udf(col('features')))

df_csv.write.mode("overwrite").option("delimiter", "\t").csv("Test_df_to_csv")