# Set up

Python library imports:

In [None]:
# File system management
import os
from pathlib import Path
import io

# Data manipulation
import numpy as np
import pandas as pd
from typing import Iterator

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Image manipulation
from PIL import Image

# Dimension reduction
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

# Tensorflow
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# Pyspark
from pyspark.ml.feature import PCA as pyPCA
from pyspark.ml.functions import array_to_vector
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# Render matplotlib figures inline in the notebook output
%matplotlib inline
# Apply seaborn's theme with the "Set2" colour palette
sns.set_theme(palette="Set2")

Define the working location:

In [None]:
# Root URI of the project storage
PATH_PROJ = "gs://bucket-openclassrooms-p8"

# Input images and output results both live under <root>/data
PATH_DATA = f"{PATH_PROJ}/data/training"
PATH_RESULTS = f"{PATH_PROJ}/data/results"

# Data processing

## Functions

### MobileNetV2 model

In [None]:
def model_create(show_summary=False, freeze_layers=True):
    """Create a MobileNetV2 feature extractor with the top layer removed.

    The full ImageNet model is loaded (top included) and a new model is built
    whose output is the second-to-last layer of that network.

    Args:
        show_summary: When truthy, print the summary of the returned model.
        freeze_layers: When truthy (default), mark every layer of the base
            model as non-trainable.

    Returns:
        Keras Model taking a (224, 224, 3) input and returning the output of
        the second-to-last MobileNetV2 layer.
    """
    # Load default model
    model_base = MobileNetV2(weights="imagenet", include_top=True, input_shape=(224, 224, 3))

    # Freeze layers (optional so callers can request a trainable model)
    if freeze_layers:
        for layer in model_base.layers:
            layer.trainable = False

    # Create model without top layer
    model_new = Model(inputs=model_base.input, outputs=model_base.layers[-2].output)

    # summary() prints by itself and returns None; wrapping it in print()
    # would add a stray "None" line to the output.
    if show_summary:
        model_new.summary()

    return model_new

### Image preprocessing

In [None]:
def preprocess(content):
    """Decode raw image bytes into an array ready for the model.

    Args:
        content: Raw bytes of an encoded image file.

    Returns:
        Numpy array returned by ``preprocess_input`` for the 224x224 RGB image.
    """
    # Force three channels: grayscale, palette, RGBA or CMYK files would
    # otherwise yield an array whose last dimension is not 3.
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

In [None]:
def featurize_series(model, content_series):
    """Run the model on a batch of raw images, one feature vector per image.

    Args:
        model: CNN model exposing ``predict``
        content_series: pd.Series whose items are each passed to ``preprocess``

    Returns:
        pd.Series in which each item is the flattened model output for one image
    """
    # Preprocess every image, then assemble the results into one batch array.
    batch = np.stack(content_series.map(preprocess))
    # Outputs may be multi-dimensional tensors depending on the layer;
    # flattening them gives plain vectors that are easier to store in a
    # Spark DataFrame column.
    return pd.Series([features.flatten() for features in model.predict(batch)])

## Distributed model inference

### Create the Spark session

In [None]:
# Reuse the session provided by the cloud notebook when there is one;
# otherwise create it, so the notebook also runs where no `spark` variable
# has been injected.
spark = SparkSession.builder.getOrCreate()

# Create sparkContext
sc = spark.sparkContext

# Set log level
sc.setLogLevel("ERROR")

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/08 15:39:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Broadcast the model weights

In [None]:
# Instantiate the feature extractor twice: once with the default arguments
# and once asking for unfrozen layers.
# NOTE(review): the second call requires model_create to accept a
# `freeze_layers` keyword — confirm its signature supports it.
model_with = model_create()
model_without = model_create(freeze_layers=False)

In [None]:
# Build the model once here (printing its summary) and wrap its weights in a
# Spark broadcast variable so the UDF can read them through `.value`.
broadcast_weights = spark.sparkContext.broadcast(
    model_create(show_summary=True).get_weights()
)

In [None]:
@F.pandas_udf("array<float>")
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Scalar Iterator pandas UDF around ``featurize_series``.

    The decorator declares the result as a Spark column of type
    ``array<float>``.

    Args:
        content_series_iter: Iterator over batches of data, each batch being
                            a pandas Series of image data.

    Yields:
        pd.Series of image features, one per input batch
    """
    # The model is built a single time per UDF invocation and shared by all
    # the batches the iterator delivers, which spreads the cost of loading it.
    extractor = model_create()
    # Load the weights carried by the broadcast variable into the fresh model
    extractor.set_weights(broadcast_weights.value)
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)

### Load the images

In [None]:
# Read every *.jpg found under PATH_DATA (sub-folders included) as binary files
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_DATA)
)

In [None]:
# Use the second-to-last path segment (the image's parent folder) as its label
images = images.withColumn('label', F.element_at(F.split(images['path'], '/'), -2))

# Display first images.
# printSchema() and show() print by themselves and return None, so they are
# called directly: wrapping them in print() only added stray "None" lines.
images.printSchema()
images.select('path', 'label').show(5, False)
print("Number of images loaded : ", images.count())

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None


                                                                                

+-----------------------------------------------------------------------------------------+-----------+
|path                                                                                     |label      |
+-----------------------------------------------------------------------------------------+-----------+
|file:/Users/victor/Documents/OPENCLASSROOMS/projet_8/data/training/apple_hit_1/r0_116.jpg|apple_hit_1|
|file:/Users/victor/Documents/OPENCLASSROOMS/projet_8/data/training/apple_hit_1/r0_114.jpg|apple_hit_1|
|file:/Users/victor/Documents/OPENCLASSROOMS/projet_8/data/training/apple_hit_1/r0_108.jpg|apple_hit_1|
|file:/Users/victor/Documents/OPENCLASSROOMS/projet_8/data/training/apple_hit_1/r0_118.jpg|apple_hit_1|
|file:/Users/victor/Documents/OPENCLASSROOMS/projet_8/data/training/apple_hit_1/r0_120.jpg|apple_hit_1|
+-----------------------------------------------------------------------------------------+-----------+
only showing top 5 rows

None




Number of images loaded :  6231


                                                                                

### Run the model inference

In [None]:
# Select roughly 10% of the dataset (fixed seed for reproducibility) for local test
images_sample = images.sample(fraction=0.1, seed=42)

# show() and printSchema() return None; call them directly instead of passing
# their result to display(), which only rendered "None".
images_sample.show(5)
images_sample.printSchema()
print("Number of images : ", images_sample.count())

In [None]:
# Create the image features
features_df = images_sample.repartition(20).select(
    F.col("path"),
    F.col("label"),
    featurize_udf("content").alias('features'),
)

# Create the vectors
features_df = features_df.withColumn('features_vec', array_to_vector("features"))

# Keep the computed features around: this DataFrame is consumed by several
# actions, and without caching each of them would re-run the model inference.
features_df = features_df.cache()

# show() and printSchema() return None; call them directly instead of passing
# their result to display(), which only rendered "None".
features_df.show(5)
features_df.printSchema()

2023-03-08 15:40:56.922692: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz


+--------------------+-----------+--------------------+--------------------+
|                path|      label|            features|        features_vec|
+--------------------+-----------+--------------------+--------------------+
|file:/Users/victo...|apple_hit_1|[0.3222898, 0.459...|[0.32228979468345...|
|file:/Users/victo...|apple_hit_1|[1.1753603, 0.059...|[1.17536032199859...|
|file:/Users/victo...| cucumber_3|[1.5150166, 0.201...|[1.51501655578613...|
|file:/Users/victo...|apple_red_3|[0.5331319, 0.003...|[0.53313189744949...|
|file:/Users/victo...|     pear_3|[0.6971466, 0.223...|[0.69714659452438...|
+--------------------+-----------+--------------------+--------------------+
only showing top 5 rows



                                                                                

None

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- features_vec: vector (nullable = true)



None

23/03/08 23:02:22 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 918965 ms exceeds timeout 120000 ms
23/03/08 23:02:22 WARN SparkContext: Killing executors is not supported by current scheduler.
23/03/08 23:02:30 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 58816)
Traceback (most recent call last):
  File "/Users/victor/miniconda3/envs/openclassrooms/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/victor/miniconda3/envs/openclassrooms/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Users/victor/miniconda3/envs/openclassrooms/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/victor/miniconda3/envs/openclassrooms/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/Users/victor/miniconda3/envs/openclassrooms/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/Users/victor/miniconda3/envs/openclassrooms/lib/pyth

# Dimension reduction

In [None]:
# Number of principal components to keep
PCA_K: int = 100

In [None]:
# Create pyspark PCA model
pca = pyPCA(k=PCA_K, inputCol='features_vec', outputCol='features_pca')

# Fit model
pca_model = pca.fit(features_df)

# Transform data
pca_data = pca_model.transform(features_df)

# Inspect the transformed data rather than the PCA input.
# show() and printSchema() return None, so they are called directly instead
# of being passed to display().
pca_data.show(5)
pca_data.printSchema()

23/03/08 13:05:07 ERROR RetryingBlockTransferor: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
java.io.IOException: Connecting to nc-ass-vip.sdv.fr/212.95.74.75:56997 failed in the last 4750 ms, fail this connection directly
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:126)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
	at org.apache.spark.network.shuffle.RetryingBlockTransferor.lambda$initiateRetry$0(RetryingBlockTransferor.java:206)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo

[Stage 27:>                                                         (0 + 4) / 4]

In [None]:
# Save results as parquet files.
# Write the PCA output rather than its input so that the reduced
# 'features_pca' column is persisted alongside the existing columns.
pca_data.write.mode("overwrite").parquet(PATH_RESULTS)