**<h1>Glasses or No Glasses distributed (Image Dataset)</h1>**

In this notebook we will try to face the [Glasses or No Glasses](https://www.kaggle.com/jeffheaton/glasses-or-no-glasses) challenge by Kaggle in a distributed way, mainly using [PySpark](https://spark.apache.org/docs/latest/api/python/index.html), a Python interface for Apache Spark. The challenge provides two datasets, a numerical one and an image one. We will deal with the image dataset, using it to train a CNN.
Refer to [this notebook](https://github.com/flaviofuria/GlassesOrNoGlasses/blob/main/glasses_no_glasses.ipynb), in which we faced the challenge (using both the dataset) with the classic data analysis/machine learning tools, to understand all the analysis, the reasoning and the choice of the network architecture. For the distributed version of the MLP applied on the numerical dataset, refer to [this notebook](https://github.com/flaviofuria/GlassesOrNoGlasses/blob/main/pyspark_numerical.ipynb) instead.

---
First we need some minutes to install [Elephas](https://github.com/maxpumperla/elephas), an extension of [Keras](https://keras.io/) that brings deep learning to Spark.

In [1]:
!pip -q install elephas
!pip -q install tensorflow==2.6.0
!pip -q install keras==2.6.0

[K     |████████████████████████████████| 4.1 MB 7.5 MB/s 
[K     |████████████████████████████████| 212.4 MB 53 kB/s 
[K     |████████████████████████████████| 198 kB 64.0 MB/s 
[?25h  Building wheel for elephas (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 458.3 MB 9.5 kB/s 
[K     |████████████████████████████████| 4.0 MB 21.7 MB/s 
[?25h  Building wheel for clang (setup.py) ... [?25l[?25hdone
  Building wheel for wrapt (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
elephas 3.0.0 requires h5py==3.3.0, but you have h5py 3.1.0 which is incompatible.[0m
[K     |████████████████████████████████| 1.3 MB 5.2 MB/s 
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This

**<h4>Running Spark</h4>**

The following steps are required to run Spark.

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

!pip install -q findspark
import findspark
findspark.init("/content/spark-3.0.3-bin-hadoop2.7")
findspark.find()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

**<h4>Downloading the data</h4>**


Now we download the data using the Kaggle API. To do so, we replace `KAGGLE_USERNAME` and `KAGGLE_KEY` variables with our Kaggle credentials. Then, we unzip the data.

In [3]:
os.environ['KAGGLE_USERNAME'] = "xxxxxxxxxxxx"                     
os.environ['KAGGLE_KEY'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

!kaggle datasets download -d jeffheaton/glasses-or-no-glasses
!unzip -o -q /content/glasses-or-no-glasses.zip

Downloading glasses-or-no-glasses.zip to /content
100% 6.11G/6.11G [02:08<00:00, 25.8MB/s]
100% 6.11G/6.11G [02:09<00:00, 50.9MB/s]


We delete the stuff we don't need anymore, in order to save some space.

In [4]:
!rm glasses-or-no-glasses.zip
!rm spark-3.0.3-bin-hadoop2.7.tgz
!rm -r sample_data

**<h4>Reproducibility</h4>**

Next, we set all the `seed` variables, in order to allow reproducibility of our experiments.

In [5]:
seed = 0

os.environ["PYTHONHASHSEED"] = "0"

import random
random.seed(seed)

import numpy as np
np.random.seed(seed)

import tensorflow as tf
tf.random.set_seed(seed)

**<h4>Downloading our own labels</h4>**

We can start to get all the data we need, starting from labels. Being that the available labels in the original dataset were wrong and incomplete, we decided to write our own label structure. We download it from the project Github repository and store them as a `Spark DataFrame`, specifying the `schema`, i.e. the structure of the DataFrame we want to create.

In [6]:
correct_labels_url = "https://raw.githubusercontent.com/flaviofuria/GlassesOrNoGlasses/main/labels.csv"
spark.sparkContext.addFile(correct_labels_url)

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, ArrayType
schema = StructType([StructField("index", IntegerType(), False), StructField("label", DoubleType(), True)])

from pyspark import SparkFiles
labels_df = spark.read.csv(SparkFiles.get("labels.csv"), header=True, schema=schema)

In [7]:
labels_df.limit(5).toPandas()

Unnamed: 0,index,label
0,1,0.0
1,2,1.0
2,3,1.0
3,4,0.0
4,5,0.0


**<h4>Creating the Dataset</h4>**

Next, we store the actual dataset in another Spark DataFrame. In this case we let Spark automatically infer the schema.

In [8]:
images_df = spark.read.format("binaryFile").load("/content/faces-spring-2020/faces-spring-2020/", inferSchema=True)

In [9]:
images_df.limit(5).toPandas()

Unnamed: 0,path,modificationTime,length,content
0,file:/content/faces-spring-2020/faces-spring-2...,2020-04-18 00:09:48,1738798,"[137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,..."
1,file:/content/faces-spring-2020/faces-spring-2...,2020-04-18 00:07:40,1635390,"[137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,..."
2,file:/content/faces-spring-2020/faces-spring-2...,2020-04-18 00:08:30,1633872,"[137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,..."
3,file:/content/faces-spring-2020/faces-spring-2...,2020-04-18 00:07:22,1633602,"[137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,..."
4,file:/content/faces-spring-2020/faces-spring-2...,2020-04-18 00:10:06,1632559,"[137, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13,..."


**<h4>Preprocessing</h4>**

At this point the dataset is just made of binary files, but we need images. To be more precise, we need pixel values to feed the CNN. Thus, we need to preprocess the data. The steps we need to take are:
1. We need to link images and labels. The labels indices are associated to the unique number in the path of each image. Thus, we read the path of every image and get this number, storing it in the `id` column of the dataframe. We use `UDFs` (user defined function), that allow us to perform operations not available among Spark SQL functions.
2. Now that we have ids, we use them to `join` images and labels. This operation is heavy but required to make the CNN learn in a supervised way and, of course, to subsequently evaluate the predictions.
3. We `filter` the dataset, removing images with an undefined label (images are created by GANs and some errors can make impossible to assign a unique label to them).
4. We use another UDF that given a `bytearray` returns a `DenseVector` of size `width*height` (our experiments suggested to use `128` for both and to use the grayscale version of the original RGB images). In this step we also normalize each pixel value, dividing it by `255`.


In [10]:
import re
def get_id_from_path(path:str) -> int:
  return int(re.findall(r"\d+", path)[2])


from pyspark.ml.linalg import Vectors, VectorUDT
import cv2
import io
def get_dense_from_data(img_data):
  width, height, num_channels = 128, 128, 1
  return Vectors.dense((cv2.resize(cv2.imdecode(np.array(img_data), cv2.IMREAD_GRAYSCALE), (width, height))/255).flatten())


from pyspark.sql.functions import udf
get_id_from_path_udf = udf(lambda x: get_id_from_path(x), IntegerType())
get_dense_from_data_udf = udf(lambda x: get_dense_from_data(x), VectorUDT())

We now apply all the transformations we have just shown, obtaining the preprocessed dataset.

In [11]:
indexed_images_df = images_df.withColumn("id", get_id_from_path_udf("path"))
labeled_images_df = indexed_images_df.join(labels_df, indexed_images_df.id==labels_df.index, "inner")
no_undefined_images_df = labeled_images_df.filter((labeled_images_df.label==0.0)|(labeled_images_df.label==1.0)).drop("index")
featurized_images_df = no_undefined_images_df.withColumn("features", get_dense_from_data_udf("content"))

In [12]:
featurized_images_df.show(5)

+--------------------+-------------------+-------+--------------------+----+-----+--------------------+
|                path|   modificationTime| length|             content|  id|label|            features|
+--------------------+-------------------+-------+--------------------+----+-----+--------------------+
|file:/content/fac...|2020-04-18 00:09:48|1738798|[89 50 4E 47 0D 0...|3643|  1.0|[0.20784313725490...|
|file:/content/fac...|2020-04-18 00:07:40|1635390|[89 50 4E 47 0D 0...|2815|  1.0|[0.21568627450980...|
|file:/content/fac...|2020-04-18 00:08:30|1633872|[89 50 4E 47 0D 0...| 315|  1.0|[0.25490196078431...|
|file:/content/fac...|2020-04-18 00:07:22|1633602|[89 50 4E 47 0D 0...|2690|  0.0|[0.49803921568627...|
|file:/content/fac...|2020-04-18 00:10:06|1632559|[89 50 4E 47 0D 0...|3760|  1.0|[0.17254901960784...|
+--------------------+-------------------+-------+--------------------+----+-----+--------------------+
only showing top 5 rows



**<h4>Train/Test split</h4>**

We move to the usual `train/test split`: rows are not sorted by id but we still shuffle the dataset and then randomly split it with a ratio of `0.8/0.2`, obtaining a training and a test dataframes.

---
The task we are going to perform is pretty heavy, thus we suggest to limit the total number of samples if you are running this on Colaboratory.

In [13]:
split = .2

from pyspark.sql.functions import rand
#featurized_images_df = featurized_images_df.limit(1000)         #    tune the limit value and uncomment this if you want to train on a smaller dataset
featurized_images_df = featurized_images_df.orderBy(rand(seed))
cached_featurized_images_df = featurized_images_df.cache()

train_df, test_df = cached_featurized_images_df.randomSplit([1-split, split], seed=seed)

cached_train_df = train_df.cache()
cached_test_df = test_df.cache()

**<h4>Model</h4>**

These two functions are used to build a `Keras model` and a `Elephas Estimator`, respectively. An estimator is an algorithm that produces a `Transformer` after training on a DataFrame, while the Transformer can transform a DataFrame of features in a DataFrame of predictions. Elephas builds an Estimator from a Keras Model, giving the possibility to perform distributed deep learning. In particular, distribution is obtained by serializing the model, sending it to the workers, let them train their chunk and send gradients to the driver. A `master model` on the driver uses these gradients together with an optimizer in order to update its weights.

In [14]:
from tensorflow.keras.models import Sequential
from elephas.ml_model import ElephasEstimator

def build_model(layers_list, optimizer="adam", loss="binary_crossentropy", metrics=["acc"]):
  model = Sequential()
  for layer in layers_list:
    model.add(layer)
  model.compile(optimizer, loss, metrics)
  return model


def build_estimator(model, epochs, batch_size, categorical, num_workers=1, loss="binary_crossentropy",
                    nb_classes=None, optimizer=tf.keras.optimizers.serialize(tf.keras.optimizers.Adam()), split=.2,
                    featuresCol="features", labelCol="label", metrics=["acc"], frequency="batch", mode="asynchronous"):
  return ElephasEstimator(keras_model_config=model.to_json(), num_workers=num_workers, loss=loss, categorical=categorical,
                          nb_classes=nb_classes, optimizer_config=optimizer, validation_split=split, epochs=epochs, batch_size=batch_size,
                          featuresCol=featuresCol, labelCol=labelCol, metrics=metrics, frequency=frequency, mode=mode)   

This is the CNN we are going to train with our dataset. In our experiments (refer to [this notebook (manca link)](https://)) it has proven to give great performance and still being not so long to train. The first layer (`Reshape`) is required because we can't store a multi-dimensional array in a Spark Dataframe. Thus, we need to take the DenseVector and reshape it in order to feed it into the CNN.

In [15]:
width, height, num_channels = 128, 128, 1

from tensorflow.keras.layers import Reshape, Conv2D, MaxPooling2D, Flatten, Dense
layers_list = [Reshape(target_shape=(width, height, num_channels), input_shape=(width*height, 1)),
               Conv2D(kernel_size=3, filters=8, activation='relu', input_shape=(width, height, num_channels)),
               MaxPooling2D(strides=2),
               Conv2D(kernel_size=5, filters=16, activation='relu'),
               MaxPooling2D(strides=2),
               Flatten(),
               Dense(32, activation='relu'),
               Dense(16, activation='relu'),
               Dense(1, activation="sigmoid")]

model = build_model(layers_list)
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
reshape (Reshape)            (None, 128, 128, 1)       0         
_________________________________________________________________
conv2d (Conv2D)              (None, 126, 126, 8)       80        
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 63, 63, 8)         0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 59, 59, 16)        3216      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 29, 29, 16)        0         
_________________________________________________________________
flatten (Flatten)            (None, 13456)             0         
_________________________________________________________________
dense (Dense)                (None, 32)                4

**<h4>Elephas Estimator</h4>**

We set some of the `hyperparameters` and then build the estimator. It takes the Keras model, together with these hyperparameters, the name of the features and labels columns of the training dataframe and three other parameters that actually control the distribution: 
* `frequency`, i.e. how often updates are sent to the master node. It can be done after every `batch` or `epoch`.
* `mode`, which can be `synchronous`, if workers send their updates all at the same time, `asynchronous` if they send updates whenever they are ready (locks are used to not lose any update) and `hogwild`, which is similar to the second one but without the locks (losing some updates is ok if this speeds the computation up). Asynchronous is our choice.
* the number of actual workers that will participate to the distributed training step.

In [16]:
batch_size = 128
epochs = 10

estimator = build_estimator(model=model, epochs=epochs, batch_size=batch_size, num_workers=4, mode="asynchronous", categorical=False)

**</h4>Fit/Transform Step</h4>**

We `fit` on the training set and then `transform` on both the training and test set, in order to obtain predictions. We capture the output because there is a lot of logging info that could cause Colab to crash.

In [17]:
%%capture
fitted_model = estimator.fit(cached_train_df)

In [18]:
train_predictions = fitted_model.transform(cached_train_df).select("prediction", "label")
test_predictions = fitted_model.transform(cached_test_df).select("prediction", "label")

**<h4>Results</h4>**

We used a `sigmoid` activation function for the last layer, thus the prediction, which is a value in the unit interval, can be interpreted as the probability to be a positive sample (label 1.0). We give the label `1.0` to samples for which this value is greater than or equal to `0.5`, otherwise we give it `0.0`. The threshold is tunable but if the results are satisfying, we can leave it as is.

In [19]:
train_predictions = train_predictions.cache()
test_predictions = test_predictions.cache()

from pyspark.sql.functions import when, sum, col
train_predictions = train_predictions.withColumn("predicted_label", when(train_predictions.prediction[0] >= 0.5, 1.0).otherwise(0.0))
test_predictions = test_predictions.withColumn("predicted_label", when(test_predictions.prediction[0] >= 0.5, 1.0).otherwise(0.0))

from pyspark.mllib.evaluation import MulticlassMetrics
train_metrics = MulticlassMetrics(train_predictions.select("predicted_label", "label").rdd.map(tuple))
test_metrics = MulticlassMetrics(test_predictions.select("predicted_label", "label").rdd.map(tuple))

train_accuracy = train_metrics.accuracy
test_accuracy = test_metrics.accuracy

In [21]:
print(f"Training accuracy: {round(train_accuracy, 4)}")
print(f"Test accuracy:     {round(test_accuracy, 4)}")

Training accuracy: 0.9982
Test accuracy:     0.9969


Pyspark and Elephas do not allow us to get the loss, thus we will compute it on our own. The model has been trained using `binary crossentropy`/`log loss`:
$Loss(y,p)=y_{i}*log(p_{i})+(1-y_{i})*log(1-p_{i})).\;\;$ We compute this value for all samples and then we average it for all the set

---
We do the same also for `stable binary crossentropy`, which is the loss used by Tensorflow to avoid computing the logarithm when the predicted probability is too close to 0, which can lead to numerical instability:<br>
$Loss(y,z)=max(z,0)-zy+log(1+e^{-|z|})\;\;$ with `z` being the output of the network, thus we obtain it by computing the inverse of the sigmoid function (i.e. the `logit`).

In [24]:
from pyspark.sql.functions import log, mean, abs
from functools import reduce
from math import e

def logit(p):
  return log(p/(1-p))

binary_crossentropy = reduce(lambda p,y: -(y*log(p[0]) + (1-y)*log(1-p[0])), map(col, ["prediction", "label"]))
stable_binary_crossentropy = reduce(lambda p,y: when(logit(p[0])>0, logit(p[0])).otherwise(0) - (logit(p[0])*y) + log(1+e**(-abs(logit(p[0])))), map(col, ["prediction", "label"]))

train_predictions = train_predictions.withColumn("binary_crossentropy", binary_crossentropy)
train_predictions = train_predictions.withColumn("stable_binary_crossentropy", stable_binary_crossentropy)
test_predictions = test_predictions.withColumn("binary_crossentropy", binary_crossentropy)
test_predictions = test_predictions.withColumn("stable_binary_crossentropy", stable_binary_crossentropy)

train_binary_crossentropy = train_predictions.select("binary_crossentropy").agg(mean("binary_crossentropy"))
train_stable_binary_crossentropy = train_predictions.select("stable_binary_crossentropy").agg(mean("stable_binary_crossentropy"))
test_binary_crossentropy = test_predictions.select("binary_crossentropy").agg(mean("binary_crossentropy"))
test_stable_binary_crossentropy = test_predictions.select("stable_binary_crossentropy").agg(mean("stable_binary_crossentropy"))


train_bin_cross = train_binary_crossentropy.take(1)[0][0]
train_stable_bin_cross = train_stable_binary_crossentropy.take(1)[0][0]
test_bin_cross = test_binary_crossentropy.take(1)[0][0]
test_stable_bin_cross = test_stable_binary_crossentropy.take(1)[0][0]

In [25]:
print(f"Training set log loss: {round(train_bin_cross, 4)}")
print(f"Training set log loss: {round(train_stable_bin_cross, 4)}")
print('')
print(f"Test set log loss:     {round(test_bin_cross, 4)}")
print(f"Test set log loss:     {round(test_stable_bin_cross, 4)}")

Training set log loss: 0.0099
Training set log loss: 0.0099

Test set log loss:     0.0118
Test set log loss:     0.0118


We don't have instability issues, thus the results are just the same. However, they're different from what we got in the non-distributed implementation. One of the reasons is of course that the the training/test set are not exactly the same, since in both cases we have shuffled the original dataset and then randomly splitted it. Also, the fact that every worker locally computes its gradients and then sends them to the master node, which in turn makes an overall computation with the received ones, could have led to some discrepancy.