In [23]:
%reset

Once deleted, variables cannot be recovered. Proceed (y/[n])? y


In [24]:
import io
from typing import Iterator

import numpy as np
import pandas as pd
from PIL import Image
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Root of the S3 bucket: images are read from input/, results go to output/.
PATH = 's3://alex-oc-test-21/'
PATH_Data = f'{PATH}input/'
PATH_Result = f'{PATH}output/'

# Echo the resolved locations, one per line, with the labels column-aligned.
print('\n'.join((
    'PATH:        ' + PATH,
    'PATH_Data:   ' + PATH_Data,
    'PATH_Result: ' + PATH_Result,
)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://alex-oc-test-21/
PATH_Data:   s3://alex-oc-test-21/input/
PATH_Result: s3://alex-oc-test-21/output/

In [26]:
# Recursively load every *.jpg below PATH_Data, one row per file
# (columns: path, modificationTime, length, content).
images = (
    spark.read
    .format("binaryFile")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*.jpg")
    .load(PATH_Data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# The class label is the name of the directory holding each image,
# i.e. the second-to-last component of its path.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print by themselves and return None, so wrapping
# them in print() only added stray "None" lines to the cell output.
images.printSchema()
images.select('path', 'label').show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------------+-------+
|path                                               |label  |
+---------------------------------------------------+-------+
|s3://alex-oc-test-21/input/Test1/Apricot/11_100.jpg|Apricot|
|s3://alex-oc-test-21/input/Test1/Apricot/10_100.jpg|Apricot|
|s3://alex-oc-test-21/input/Test1/Apricot/12_100.jpg|Apricot|
|s3://alex-oc-test-21/input/Test1/Apricot/0_100.jpg |Apricot|
|s3://alex-oc-test-21/input/Test1/Apricot/2_100.jpg |Apricot|
+---------------------------------------------------+-------+
only showing top 5 rows

None

In [28]:
# Full MobileNetV2 (classification head included) with ImageNet weights,
# built for 224x224 inputs with 3 channels.
model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Same network minus its final (classification) layer: the output of the
# penultimate layer is used as the image feature vector.
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# Print the layer-by-layer architecture of the truncated model.
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "functional_1"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃    Param # ┃ Connected to      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ input_layer_1       │ (None, 224, 224,  │          0 │ -                 │
│ (InputLayer)        │ 3)                │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │        864 │ input_layer_1[0]… │
│                     │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │        128 │ Conv1[0][0]       │
│ (BatchNormalizatio… │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │          0

In [31]:
# Broadcast the truncated model's weights from the driver so executors can
# rebuild the same model; model_fn() loads them via broadcast_weights.value.
broadcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcast pretrained weights.

    Rebuilds the same architecture as the driver-side ``new_model`` (full
    MobileNetV2 including its classification head, then truncated at the
    penultimate layer), freezes the layers, and loads the weights held in the
    Spark broadcast variable ``broadcast_weights``.

    :return: a Keras ``Model`` whose output is MobileNetV2's penultimate layer.
    """
    # weights=None: only the architecture is needed here because every weight
    # is overwritten by set_weights() below. Requesting 'imagenet' made each
    # executor download/load the pretrained file just to discard it.
    model = MobileNetV2(
        weights=None,
        include_top=True,
        input_shape=(224, 224, 3)
    )
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output
    )
    # broadcast_weights holds new_model.get_weights() taken on the driver.
    new_model.set_weights(broadcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.

    :param content: the raw bytes of one image file (the ``content`` column
                    produced by Spark's ``binaryFile`` reader).
    :return: the image as a float array resized to 224x224 with 3 channels
             (channels-last under Keras' default data format), transformed
             by MobileNetV2's ``preprocess_input``.
    """
    # The context manager closes the decoder's handle once we have our copy.
    with Image.open(io.BytesIO(content)) as img:
        # Force 3 channels: grayscale, CMYK or RGBA images would otherwise give
        # arrays whose last dimension is not 3, which the model rejects and
        # which makes np.stack() fail on mixed batches.
        rgb_img = img.convert('RGB').resize((224, 224))
    arr = img_to_array(rgb_img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: a Keras model whose ``predict`` accepts a batch of
                  preprocessed images.
    :param content_series: a pd.Series of raw image bytes.
    :return: a pd.Series of image features: one flattened numpy array per
             input image, in the same order as ``content_series``.
    """
    if content_series.empty:
        # np.stack() raises on an empty sequence; an empty batch has no features.
        return pd.Series([], dtype=object)
    batch = np.stack(content_series.map(preprocess))
    # verbose=0 keeps per-batch progress bars out of the executor logs.
    preds = model.predict(batch, verbose=0)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.

    The Iterator[pd.Series] -> Iterator[pd.Series] type hints select the
    scalar-iterator UDF form (the replacement for the deprecated
    PandasUDFType.SCALAR_ITER argument). The decorator specifies that this
    returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    :return: (yields) one pandas Series of feature arrays per input batch.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [34]:
# Redistribute the rows into 20 partitions, then keep path and label and
# featurize the raw image bytes into a "features" array column.
images_with_array_features = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Define a UDF to convert ArrayType to VectorUDT
def array_to_vector(array):
    """
    Convert one array of floats into a dense ML vector.

    :param array: a sequence of numbers, or None for a null array value.
    :return: a DenseVector with the same values, or None when ``array`` is
             None -- Vectors.dense(None) would build a malformed 0-d vector
             that fails when Spark serializes it.
    """
    if array is None:
        return None
    return Vectors.dense(array)

# Register the UDF: wrap array_to_vector as a Spark UDF whose result column
# has type VectorUDT (the vector type expected by pyspark.ml estimators such as PCA).
array_to_vector_udf = udf(array_to_vector, VectorUDT())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Apply the UDF to create a new column of type Vector.
# cache(): this frame is consumed by PCA.fit() (which may scan its input more
# than once) and later again by the transform + parquet write. Without caching,
# every such Spark action re-runs the MobileNetV2 featurization over all images.
images_with_array_and_vector_features = (
    images_with_array_features
    .withColumn("vector_features", array_to_vector_udf("features"))
    .cache()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Fit a PCA reducing "vector_features" to 10 principal components,
# written to the "transformed_features" column.
pca = PCA(k=10, inputCol="vector_features", outputCol="transformed_features")
pca_model = pca.fit(images_with_array_and_vector_features)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
# Apply the fitted projection, adding the "transformed_features" column.
images_with_array_and_vector_and_transformed_features = pca_model.transform(
    images_with_array_and_vector_features
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
# Check the resulting columns and their types.
images_with_array_and_vector_and_transformed_features.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- vector_features: vector (nullable = true)
 |-- transformed_features: vector (nullable = true)

In [40]:
# Keep only what gets persisted: image path, label and PCA features.
images_with_transformed_features = (
    images_with_array_and_vector_and_transformed_features
    .select('path', 'label', 'transformed_features')
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
# Persist the reduced features as parquet under PATH_Result, replacing any previous output.
images_with_transformed_features.write.parquet(PATH_Result, mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
# Read the parquet output back into a pandas DataFrame on the driver to verify it.
df_from_file = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
# Peek at the first rows; the Spark ML vectors come back as dicts with
# 'type', 'size', 'indices' and 'values' keys.
df_from_file.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                               transformed_features
0  s3://alex-oc-test-21/input/Test1/Apricot/23_10...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
1  s3://alex-oc-test-21/input/Test1/Apricot/25_10...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
2  s3://alex-oc-test-21/input/Test1/Apricot/104_1...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
3  s3://alex-oc-test-21/input/Test1/Banana Lady F...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
4  s3://alex-oc-test-21/input/Test1/Banana Lady F...  ...  {'type': 1, 'size': None, 'indices': None, 'va...

[5 rows x 3 columns]

In [44]:
# Inspect one stored PCA vector; its 'values' entry holds the k=10 components.
df_from_file.loc[0, 'transformed_features']

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'type': 1, 'size': None, 'indices': None, 'values': array([-12.90199294,   2.26495812,  -8.49400134,   2.07394065,
         0.21992382,   3.95324557,  -1.46668882,  -3.03114165,
         6.37412001,  -0.90625153])}