In [None]:
import os
import io

import pandas as pd
import numpy as np
import tensorflow as tf

from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from sklearn.decomposition import PCA as skl_PCA
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA as spa_PCA
from pyspark.ml.linalg import Vectors, VectorUDT, SparseVector, DenseVector
from pyspark.sql.functions import udf


In [None]:
PATH = os.getcwd()
# Input images and output parquet directory, both relative to the working directory.
PATH_Data = f"{PATH}/data/Test1"
PATH_Result = f"{PATH}/data/Results"
print(f"PATH:{PATH}\nPATH_Data:{PATH_Data}\nPATH_Result: {PATH_Result}")


# Creating the Spark session

In [None]:
# Single local Spark session; Parquet is written in Spark's legacy
# (Spark 1.4-compatible) layout via spark.sql.parquet.writeLegacyFormat.
spark = (
    SparkSession.builder
    .master('local')
    .appName("ds_p8")
    .config(key="spark.sql.parquet.writeLegacyFormat", value="true")
    .getOrCreate()
)


### Spark context creation

In [None]:
# Handle on the session's SparkContext; used below to broadcast the model weights.
sc = spark.sparkContext

# Last expression of the cell: the notebook renders the session's summary.
spark


# Loading data

In [None]:
# Read every *.jpg found (recursively) under PATH_Data as binary records;
# the "path" and "content" columns are used below.
binary_reader = spark.read.format("binaryFile").options(
    pathGlobFilter="*.jpg",
    recursiveFileLookup="true",
)
images = binary_reader.load(PATH_Data)


In [None]:
# The label is the name of each image's parent directory
# (second-to-last '/'-separated segment of its path).
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print their output themselves and return None,
# so they are not wrapped in print() (which would just emit a stray "None").
images.printSchema()
images.select('path', 'label').show(5, False)


# Model creation

In [None]:
# MobileNetV2 with ImageNet weights and its classification top included;
# the top layer is cut off in the next cell.
model = MobileNetV2(input_shape=(224, 224, 3), include_top=True, weights='imagenet')


In [None]:
# Feature extractor: same input as `model`, output taken from the penultimate
# layer (layers[-2]), i.e. without the final classification layer.
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)


In [None]:
# Layer-by-layer description of the truncated feature extractor.
new_model.summary()


In [None]:
# Broadcast the feature extractor's weights so that model_fn (run inside the
# pandas UDF) can rebuild the model from them. With master('local') there is
# little to gain from broadcasting, but model_fn does read this variable.
# NOTE: the misspelled name is kept because model_fn references it.

brodcast_weights = sc.broadcast(new_model.get_weights())


In [None]:
def model_fn():
    """
    Build the frozen MobileNetV2 feature extractor used inside the UDF.

    The network is MobileNetV2 (input 224x224x3, top included) truncated at
    its penultimate layer (``layers[-2]``), i.e. without the final
    classification layer. Its weights are copied from the ``brodcast_weights``
    broadcast variable, which holds the weights of the driver-side
    ``new_model``.

    Returns
    -------
    tensorflow.keras.Model
        Feature-extraction model with every layer set to non-trainable.
    """
    # weights=None: any pretrained weights loaded here would be overwritten by
    # set_weights() below, so loading the ImageNet weights on every worker is
    # wasted work. Only the architecture is needed.
    base_model = MobileNetV2(
        weights=None,
        include_top=True,
        input_shape=(224, 224, 3)
    )

    for layer in base_model.layers:
        layer.trainable = False

    feature_extractor = Model(
        inputs=base_model.input,
        outputs=base_model.layers[-2].output
    )

    # The actual weights come from the driver through the broadcast variable.
    feature_extractor.set_weights(brodcast_weights.value)
    return feature_extractor



In [None]:
def preprocess(content):
    """
    Decode raw image bytes and prepare them for MobileNetV2.

    Parameters
    ----------
    content : bytes
        Encoded image file contents (the ``content`` column read by the
        binaryFile source).

    Returns
    -------
    numpy.ndarray
        224x224x3 array passed through MobileNetV2's ``preprocess_input``.
    """
    # convert('RGB') guarantees 3 channels: grayscale ("L") or CMYK JPEGs would
    # otherwise produce arrays of a different shape, making np.stack fail in
    # featurize_series and the model reject the batch.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)


def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    Parameters
    ----------
    model : object with a ``predict`` method (the Keras feature extractor)
    content_series : pd.Series
        Raw encoded image bytes, one image per row.

    Returns
    -------
    pd.Series
        One flattened 1-D feature array per input row (empty Series for an
        empty input batch).
    """
    if content_series.empty:
        # np.stack raises on an empty sequence; an empty batch maps to an
        # empty result instead.
        return pd.Series([], dtype=object)
    # Named `batch` rather than `input` to avoid shadowing the builtin.
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])


@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar Iterator pandas UDF mapping batches of raw image bytes to feature arrays.

    The returned Spark column has type ArrayType(FloatType) (declared in the
    decorator). The model is built once per invocation, when the first batch
    is requested, and then reused for every batch of the iterator, so the
    cost of constructing it is amortized.

    Args:
    - content_series_iter: iterator over batches, each a pandas Series of raw
    image bytes.
    '''
    extractor = model_fn()
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)



# Feature extraction

In [None]:
# Featurize every image with MobileNetV2. The result is cached because
# features_df feeds several Spark actions below (show, toPandas, the vector
# conversion -> PCA fit/transform, the parquet write); without cache() every
# one of those actions would re-run the model inference from scratch.
features_df = (
    images.repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features")
    )
    .cache()
)


In [None]:
# Preview the first 10 featurized rows.
features_df.show(10)


# PCA over the features

## Spark's PCA vs scikit-learn's PCA
- scikit-learn's PCA is the better choice for small, non-distributed workloads because it is more configurable (e.g. solver choice, target explained variance), but in a distributed setting it would need workarounds such as Ray or Dask.
- Spark MLlib provides a PCA implementation that is less configurable than its scikit-learn counterpart, but is designed to work natively with Spark's distributed data formats.

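Source: [Apache Spark MLlib vs scikit-learn: building machine learning pipelines](https://towardsdatascience.com/apache-spark-mllib-vs-scikit-learn-building-machine-learning-pipelines-be49ecc69a82) (Medium). This is a general comparison of the two libraries, not an explanation specific to PCA.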


### However:

- `k`, the number of principal components, must be passed to Spark's PCA. There are two ways to choose it:
  - estimate it (for example, 25% of the number of features);
  - fit scikit-learn's PCA locally with a target explained variance (say 80% or more) on a sample of the data (or on the full data when it fits in driver memory), and read off how many components it keeps. This gives a well-founded value for `k`.

In [None]:
# Collect features to the driver for the local scikit-learn PCA that picks k.
# SAMPLE_FRACTION < 1.0 collects only a random sample (use it when the full
# dataset does not fit in driver memory); 1.0 collects every row.
SAMPLE_FRACTION = 1.0
SAMPLE_SEED = 42
feature_df_pd = (
    features_df if SAMPLE_FRACTION >= 1.0
    else features_df.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
).toPandas()


### Choosing k (number of components) with scikit-learn

- Let's aim for 95% explained variance; the `explained_variance` variable below can be adjusted if the results are not satisfactory.


In [None]:
# First rows of the collected features (path, label, features array).
feature_df_pd.head(n=5)


In [None]:
# A float between 0 and 1 for n_components makes scikit-learn keep just
# enough components to explain that fraction of the variance.
explained_variance = 0.95
sk_pca = skl_PCA(n_components=explained_variance)

features_matrix = np.stack(feature_df_pd["features"].values)
reduced_features_values = sk_pca.fit_transform(features_matrix)


In [None]:
# Feature dimensionality before and after the scikit-learn PCA.
n_original_features = np.stack(feature_df_pd["features"].values).shape[1]
print("original array size :", n_original_features)
n_reduced_features = reduced_features_values.shape[1]
print("reduced array size :", n_reduced_features)


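#### On the author's run, scikit-learn kept 138 principal components to reach 95% explained variance; that number is reused as `k` for Spark's PCA (the code reads it from `reduced_features_values`, so it follows the data)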

<hr>

# Converting arrays to vectors in the Spark DataFrame
(Spark's PCA expects a column of vectors, not arrays.)

In [None]:
def _list_to_vector(values):
    """Wrap a list of floats in a dense MLlib vector."""
    return Vectors.dense(values)


list_to_vector_udf = udf(_list_to_vector, VectorUDT())

# Same rows, with "features" turned from array<float> into a vector column.
df_spark_vector = features_df.select(
    col("path"),
    col("label"),
    list_to_vector_udf(col("features")).alias("features"),
)


In [None]:
# Check that "features" is now a vector column, as Spark's PCA requires.
df_spark_vector.printSchema()


In [None]:
# k = number of components scikit-learn needed for the target explained variance.
k_components = reduced_features_values.shape[1]

spark_pca = spa_PCA(k=k_components, inputCol="features", outputCol="reduced_features")
spark_pca


In [None]:
# Fit Spark's PCA and confirm the number of components it was configured with.
spark_pca_model = spark_pca.fit(df_spark_vector)
fitted_k = spark_pca_model.getK()
print(fitted_k)


In [None]:
# Append the "reduced_features" column. NOTE: this reassigns df_spark_vector,
# so the cell is not idempotent; re-run from the vector-conversion cell instead.
df_spark_vector = spark_pca_model.transform(df_spark_vector)


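# Converting vectors back to arrays and saving as Parquet
Vector columns would be read back in Python as dict-like structures rather than arrays, so both vector columns are converted to plain float arrays before writing.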

In [None]:
def _vector_to_list(vector):
    """Return an MLlib vector as a Python list; pass any other value through unchanged."""
    if not isinstance(vector, (DenseVector, SparseVector)):
        return vector
    return vector.toArray().tolist()


vector_to_array_udf = udf(_vector_to_list, ArrayType(FloatType()))

# Replace both vector columns, one after the other, with float arrays.
for vector_col in ("features", "reduced_features"):
    df_spark_vector = df_spark_vector.withColumn(
        vector_col, vector_to_array_udf(df_spark_vector[vector_col])
    )


In [None]:
# Persist path, label, features and reduced_features, replacing any previous output.
df_spark_vector.write.parquet(PATH_Result, mode="overwrite")


# Checking the saved format

In [None]:
# Read the Spark output back with pandas to check the array columns.
df = pd.read_parquet(path=PATH_Result, engine="pyarrow")


In [None]:
# First rows of the reloaded results.
df.head(n=5)


In [None]:
# Length of the original feature array for the row labelled 0.
df.loc[0, "features"].shape


In [None]:
# Length of the PCA-reduced feature array for the same row (should equal k).
df.loc[0, "reduced_features"].shape


The two cells above show the shape of the original feature array and of the PCA-reduced feature array for the row with index 0.