### Import libraries

In [None]:
from PIL import Image
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import PCA
import pandas as pd
from pyspark.sql.functions import col, udf, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.linalg import Vectors, VectorUDT
from tensorflow.keras import Model
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import s3fs
import numpy as np
import io
import os

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Loading the Spark session (not needed here because the Spark session is loaded with the kernel)

In [None]:
# Reuse the active Spark session if one exists, otherwise build one with
# dynamic allocation and the external shuffle service enabled.
spark = (
    SparkSession.builder
    .appName("FruitClassification")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.shuffle.service.enabled", "true")
    .getOrCreate()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create PATH vars

In [None]:
# Root bucket and the derived locations for input images, Parquet results and plots.
PATH = 's3://p8-data-nm'
PATH_Data = f'{PATH}/Test'
PATH_Result = f'{PATH}/Results'
PATH_plots = f'{PATH}/Plots'
print(f'PATH:        {PATH}',
      f'PATH_Data:   {PATH_Data}',
      f'PATH_Result: {PATH_Result}',
      sep='\n')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://p8-data-nm
PATH_Data:   s3://p8-data-nm/Test
PATH_Result: s3://p8-data-nm/Results

### Load images

In [None]:
# Read every *.jpg below PATH_Data (searching sub-folders) as raw binary rows.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Create label name from folder name

In [None]:
# The label is the image's parent folder, i.e. the second-to-last '/'-separated
# segment of the path (.../Test/Watermelon/r_106_100.jpg -> Watermelon).
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() write to stdout themselves and return None, so
# wrapping them in print() only appended a stray "None" line after each output.
images.printSchema()
images.select('path', 'label').show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------+----------+
|path                                         |label     |
+---------------------------------------------+----------+
|s3://p8-data-nm/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8-data-nm/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8-data-nm/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8-data-nm/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8-data-nm/Test/Watermelon/r_95_100.jpg |Watermelon|
+---------------------------------------------+----------+
only showing top 5 rows

None

### Model loading

In [None]:
# Driver-side MobileNetV2 with ImageNet weights, built with its top layer included.
mobilenet = MobileNetV2(
    weights="imagenet",
    include_top=True,
    input_shape=(224, 224, 3),
)
# Use the output of the second-to-last layer as the model output, which drops
# the final layer of the network.
model = Model(
    inputs=mobilenet.input,
    outputs=mobilenet.layers[-2].output,
)
model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5
Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None,

### Broadcasting the weights

In [None]:
# Broadcast the driver model's weights through the SparkContext; model_fn()
# reads them back via broadcast_weights.value when it rebuilds the model.
broadcast_weights = spark.sparkContext.broadcast(model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Define Functions

In [None]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights.

    The network is created without pretrained weights, all of its layers are
    marked non-trainable, its last layer is cut off, and the values held in
    `broadcast_weights` are then assigned to the resulting model.

    :return: the Keras model whose output is the second-to-last layer
    """
    backbone = MobileNetV2(
        weights=None,
        include_top=True,
        input_shape=(224, 224, 3),
    )
    for backbone_layer in backbone.layers:
        backbone_layer.trainable = False
    feature_extractor = Model(
        inputs=backbone.input,
        outputs=backbone.layers[-2].output,
    )
    feature_extractor.set_weights(broadcast_weights.value)
    return feature_extractor

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.

    :param content: encoded image file contents as bytes
    :return: the array produced by `preprocess_input` for the image after it
             has been converted to RGB and resized to 224x224
    """
    img = Image.open(io.BytesIO(content))
    # Force three channels: grayscale, CMYK or RGBA files would otherwise give
    # arrays whose last dimension is not 3, which breaks both np.stack() over a
    # mixed batch and the (224, 224, 3) model input.
    img = img.convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run the given model over a pd.Series of raw images.

    :param model: object exposing `predict`, applied to the stacked batch
    :param content_series: pd.Series of raw image contents
    :return: a pd.Series holding one flattened feature vector per image
    """
    batch = np.stack([preprocess(raw) for raw in content_series])
    predictions = model.predict(batch)
    # Some layers emit multi-dimensional tensors; each one is flattened to a
    # vector so it can be stored in a Spark DataFrame column.
    flattened = [prediction.flatten() for prediction in predictions]
    return pd.Series(flattened)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Scalar Iterator pandas UDF around `featurize_series`.

    The decorator declares the returned column type as 'array<float>'.

    :param content_series_iter: iterator over batches of data, where each batch
                                is a pandas Series of image data.
    """
    # The model is created once, before the first batch, and shared by every
    # batch this iterator receives, so the loading cost is paid a single time.
    extractor = model_fn()
    for batch in content_series_iter:
        yield featurize_series(extractor, batch)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [None]:
# Spread the images over 24 partitions, then keep path and label next to the
# feature array computed from the raw file content.
features_df = (
    images
    .repartition(24)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### PCA

In [None]:
# Spark ML's PCA reads a Vector column, while the featurizer emits array<float>.
to_vector_udf = udf(lambda x: Vectors.dense(x), VectorUDT())
# DataFrames are evaluated lazily, so without caching every action on this
# frame (pca.fit below and the later Parquet write of df_pca) would re-run the
# image featurization from scratch. cache() keeps the vectors after the first
# computation.
features_df = features_df.withColumn("features", to_vector_udf(col("features"))).cache()

# Project the feature vectors onto their first 3 principal components.
pca = PCA(k=3, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(features_df)
df_pca = pca_model.transform(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Store the PCA result as Parquet under PATH_Result; "overwrite" replaces
# whatever an earlier run left at that location.
df_pca.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Show the columns of the PCA result and their types.
df_pca.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- pca_features: vector (nullable = true)

In [None]:
# Read the Parquet results back into a pandas DataFrame using pyarrow.
# NOTE(review): this loads every stored column, including the full feature
# vectors; only 'label' and 'pca_features' are used by the plotting function.
# Consider columns=[...] if memory becomes a concern.
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Inspect the PCA column: each entry comes back as a dict rather than a flat
# array; save_pca_3d() reads the components from its "values" key.
print(df['pca_features'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0        {'type': 1, 'size': None, 'indices': None, 'va...
1        {'type': 1, 'size': None, 'indices': None, 'va...
2        {'type': 1, 'size': None, 'indices': None, 'va...
3        {'type': 1, 'size': None, 'indices': None, 'va...
4        {'type': 1, 'size': None, 'indices': None, 'va...
                               ...                        
22683    {'type': 1, 'size': None, 'indices': None, 'va...
22684    {'type': 1, 'size': None, 'indices': None, 'va...
22685    {'type': 1, 'size': None, 'indices': None, 'va...
22686    {'type': 1, 'size': None, 'indices': None, 'va...
22687    {'type': 1, 'size': None, 'indices': None, 'va...
Name: pca_features, Length: 22688, dtype: object

In [None]:
def save_pca_3d(df, label_col="label", feature_col="pca_features"):
    """
    Save a 3D visualization of PCA-transformed data.

    The figure is written to '/tmp/plot_pca_fruits.png' and then uploaded to
    PATH_plots + '/plot_pca_fruits.png' through s3fs. The input DataFrame is
    left unmodified.

    :param df: Pandas DataFrame containing PCA-transformed features.
    :param label_col: Column name representing class labels.
    :param feature_col: Column name containing the PCA feature vectors. Each
                        entry is expected to be a dict whose "values" item holds
                        at least three components; rows that do not match are
                        skipped and reported with a warning.
    """
    import warnings

    def _first_three(vec):
        # Return [PC1, PC2, PC3] for one stored vector, or None when unusable.
        if not isinstance(vec, dict):
            return None
        values = vec.get("values")
        if values is None or len(values) < 3:
            return None
        return [float(values[0]), float(values[1]), float(values[2])]

    # Work on local arrays instead of adding helper columns to the caller's frame.
    parsed = [_first_three(vec) for vec in df[feature_col]]
    keep = np.array([point is not None for point in parsed], dtype=bool)
    n_skipped = len(parsed) - int(keep.sum())
    if n_skipped:
        warnings.warn(
            f"save_pca_3d: skipped {n_skipped} row(s) whose '{feature_col}' "
            "entry has no usable 'values'"
        )
    coords = np.array(
        [point for point in parsed if point is not None], dtype=float
    ).reshape(-1, 3)
    point_labels = df[label_col].to_numpy()[keep]

    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111, projection='3d')

    labels = pd.unique(point_labels)
    # pyplot.get_cmap replaces the deprecated matplotlib.cm.get_cmap; max(..., 1)
    # avoids requesting a zero-sized colormap when there is nothing to plot.
    colors = plt.get_cmap("tab10", max(len(labels), 1))

    for i, label in enumerate(labels):
        mask = point_labels == label
        ax.scatter(coords[mask, 0], coords[mask, 1], coords[mask, 2],
                   label=label, color=colors(i), s=5, alpha=0.5)

    ax.view_init(elev=20, azim=45)

    ax.set_title("PCA Visualization (3D)")
    ax.set_xlabel("Principal Component 1")
    ax.set_ylabel("Principal Component 2")
    ax.set_zlabel("Principal Component 3")
    save_path = '/tmp/plot_pca_fruits.png'
    # Save this figure explicitly rather than whatever pyplot considers current.
    fig.savefig(save_path)
    fs = s3fs.S3FileSystem()
    fs.put(save_path, PATH_plots+'/plot_pca_fruits.png')


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Build the 3D PCA scatter plot from df (default label/feature columns) and
# upload the saved image to S3.
save_pca_3d(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…