#  **Pneumonia identification from X-Ray images**

Group:

Evaluation criteria:
* 5 points for the delivery of a meaningful Spark-based solution
* 2 points for the quality of the results obtained (using BigDL means a minimum of 1 point in this section).
* 2 points for style / code cleanliness / documentation
* 2 points for cross-evaluation of all members of the group among yourselves

## Introduction

## Code

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

### **Environment Preparation**

**Install bigdl-dllib**

You can install the latest pre-release version using `pip install --pre --upgrade bigdl-dllib`. This notebook instead installs the `bigdl-spark3` package (see the install cell below).

In [None]:
# Install latest pre-release version of bigdl-dllib with spark3
# Find the latest bigdl-dllib with spark3 from https://sourceforge.net/projects/analytics-zoo/files/dllib-py-spark3/ and install it
#!pip install https://sourceforge.net/projects/analytics-zoo/files/dllib-py-spark3/bigdl_dllib_spark3-0.14.0b20211107-py3-none-manylinux1_x86_64.whl

#exit() # restart the runtime to refresh installed pkg

In [1]:
!pip install bigdl-spark3
exit()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


#### **Step 0: Initialization of pyspark and bigdl**

First we import necessary libraries.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from bigdl.dllib.nncontext import *
from bigdl.dllib.keras.layers import *
from bigdl.dllib.keras.models import *
import bigdl.dllib.keras.Sequential
from bigdl.dllib.nnframes import *
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.feature.image import *

Init NNContext and create Spark session

In [2]:
sc = init_nncontext(cluster_mode="local") # run in local mode
spark = SparkSession(sc)

Current pyspark location is : /usr/local/lib/python3.8/dist-packages/pyspark/__init__.py
Start to getOrCreate SparkContext
pyspark_submit_args is:  --driver-class-path /usr/local/lib/python3.8/dist-packages/bigdl/share/dllib/lib/bigdl-dllib-spark_3.1.3-2.2.0-jar-with-dependencies.jar:/usr/local/lib/python3.8/dist-packages/bigdl/share/core/lib/all-2.2.0.jar:/usr/local/lib/python3.8/dist-packages/bigdl/share/orca/lib/bigdl-orca-spark_3.1.3-2.2.0-jar-with-dependencies.jar:/usr/local/lib/python3.8/dist-packages/bigdl/share/friesian/lib/bigdl-friesian-spark_3.1.3-2.2.0-jar-with-dependencies.jar pyspark-shell 
Successfully got a SparkContext


#### **Step 1: Load the data** 

Documentation

https://bigdl.readthedocs.io/en/latest/doc/DLlib/QuickStart/python-getting-started.html?highlight=NNImageReader#distributed-model-training

https://bigdl.readthedocs.io/en/latest/doc/DLlib/Overview/nnframes.html?highlight=NNImageReader#nnimagereader




In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


We load the images from our Google Drive with BigDL's `NNImageReader`.

In [19]:
normal_train = NNImageReader.readImages("/content/drive/MyDrive/chest_xray/train/NORMAL/", sc)
normal_test = NNImageReader.readImages("/content/drive/MyDrive/chest_xray/test/NORMAL/", sc)
normal_val = NNImageReader.readImages("/content/drive/MyDrive/chest_xray/val/NORMAL/", sc)

In [20]:
pneumonia_train = NNImageReader.readImages("/content/drive/MyDrive/chest_xray/train/PNEUMONIA/", sc)
pneumonia_test = NNImageReader.readImages("/content/drive/MyDrive/chest_xray/test/PNEUMONIA/", sc)
pneumonia_val = NNImageReader.readImages("/content/drive/MyDrive/chest_xray/val/PNEUMONIA/", sc)

In [21]:
normal_train

DataFrame[image: struct<origin:string,height:int,width:int,nChannels:int,mode:int,data:binary>]

In [22]:
#The data schema
normal_train.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)



#### **Step 2: Distributed Data Loading**


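We use the function `lit` to add a constant `label` column to each DataFrame: 0 = normal and 1 = pneumonia. Because the values are passed as `"0"` and `"1"`, the column is stored as a string, as the schema printed below shows.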


In [23]:
from pyspark.sql.functions import col,lit

label_normal_train = normal_train.select(col("image"),lit("0").alias("label"))
label_normal_test = normal_test.select(col("image"),lit("0").alias("label"))
label_normal_val = normal_val.select(col("image"),lit("0").alias("label"))

label_pneumonia_train = pneumonia_train.select(col("image"),lit("1").alias("label"))
label_pneumonia_test = pneumonia_test.select(col("image"),lit("1").alias("label"))
label_pneumonia_val = pneumonia_val.select(col("image"),lit("1").alias("label"))

In [24]:
label_normal_train.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- label: string (nullable = false)



Then, with `.union`, we combine the normal and pneumonia images into a single DataFrame for each of the training, test and validation sets.


In [25]:
train_ = label_normal_train.union(label_pneumonia_train)
test_ = label_normal_test.union(label_pneumonia_test)
val_ = label_normal_val.union(label_pneumonia_val)

In [26]:
#Print the schema of the "train_" dataframe.
train_.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- label: string (nullable = false)



#### **Step 3: Model Definition**


We use `Input` to define the input of the model; for the remaining layers, the input dimension is inferred automatically.
The hidden layers use the "relu" activation, which replaces every negative input value with zero. These activation layers make the model nonlinear, so it can represent more complex functions.

In [27]:
x1 = Input(shape=[8])
dense1 = Dense(12, activation="relu")(x1)
dense2 = Dense(8, activation="relu")(dense1)
dense3 = Dense(2)(dense2)
model = Model(input=x1, output=dense3)

creating: createZooKerasInput
creating: createZooKerasDense
creating: createZooKerasDense
creating: createZooKerasDense
creating: createZooKerasModel
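To illustrate what a dense layer followed by "relu" computes, here is a minimal sketch in plain Python. It is not BigDL code: the functions `dense` and `relu` and all weight values are hypothetical and were chosen only to make the arithmetic easy to follow.

```python
# Minimal sketch of a dense layer followed by ReLU, in plain Python.
# The weights and biases are made-up illustrative values,
# not parameters of the BigDL model defined above.

def relu(values):
    """Replace every negative value with zero."""
    return [max(0.0, v) for v in values]


def dense(inputs, weights, biases):
    """Compute one output per unit: dot(weight_row, inputs) + bias."""
    return [
        sum(w * x for w, x in zip(row, inputs)) + b
        for row, b in zip(weights, biases)
    ]


inputs = [1.0, -2.0, 0.5]
weights = [
    [0.5, 0.25, -1.0],  # unit 1
    [1.0, 0.5, 2.0],    # unit 2
]
biases = [0.0, 0.5]

pre_activation = dense(inputs, weights, biases)
activated = relu(pre_activation)

print("before relu:", pre_activation)
print("after relu: ", activated)
```

The first unit produces a negative value, which "relu" sets to zero, while the positive value of the second unit passes through unchanged.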


Now we use the model's `compile` function to set the loss function and the optimization method.

In [28]:
model.compile(optimizer = "adam", loss = "sparse_categorical_crossentropy")

creating: createAdam
creating: createZooKerasSparseCategoricalCrossEntropy
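As a rough illustration of the loss chosen above, the sketch below computes a sparse categorical cross-entropy by hand. "Sparse" means the label is an integer class index (0 or 1 here) rather than a one-hot vector. The sketch assumes the model scores are first turned into probabilities with a softmax; whether BigDL's implementation expects probabilities, log-probabilities or raw scores is not shown in this notebook, and the helper names and score values are hypothetical.

```python
import math


def softmax(scores):
    """Turn raw scores into probabilities that sum to 1."""
    shift = max(scores)  # subtract the maximum for numerical stability
    exps = [math.exp(s - shift) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def sparse_categorical_crossentropy(probs, label):
    """Negative log of the probability assigned to the true class index."""
    return -math.log(probs[label])


# Hypothetical scores of a 2-unit output layer: [normal, pneumonia]
undecided = softmax([0.0, 0.0])   # both classes equally likely
confident = softmax([2.0, 0.0])   # the model favours class 0 (normal)

loss_undecided = sparse_categorical_crossentropy(undecided, 0)
loss_correct = sparse_categorical_crossentropy(confident, 0)
loss_wrong = sparse_categorical_crossentropy(confident, 1)

print("undecided model:  ", loss_undecided)
print("confident, right: ", loss_correct)
print("confident, wrong: ", loss_wrong)
```

The loss is lowest when the model assigns a high probability to the true class and grows when it is confident about the wrong class.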


#### **Step 4: Distributed Model Training**



We use the BigDL library to perform deep learning on the image data.
We resize the images to 50x50 with the "ImageResize" class and mirror them with the "ImageMirror" class.
The call to `model.fit` raised an error that we attribute to the values passed to ImageResize, batch_size and nb_epoch. We tried several combinations without success, so the call is left commented out in the cell below.

In [36]:
from bigdl.dllib.feature.image import transforms
transformers = transforms.Compose([ImageResize(50, 50), ImageMirror()])
#model.fit(train_, label_cols=["label"], batch_size=1, nb_epoch=1, transform=transformers)

creating: createImageResize
creating: createImageMirror
creating: createCompose
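The error message is not reproduced here, so the following is only one possible thing to check, not a confirmed diagnosis. The model in Step 3 declares `Input(shape=[8])`, while an image resized to 50x50 contains far more than 8 values. The sketch below only does this arithmetic; `flattened_size` is a hypothetical helper, and the channel counts are assumptions (the actual value is the `nChannels` field of the image schema).

```python
def flattened_size(height, width, channels):
    """Number of scalar values in an image of the given shape."""
    return height * width * channels


model_input_size = 8  # from Input(shape=[8]) in Step 3

# Sizes after ImageResize(50, 50). The channel counts are assumptions.
size_one_channel = flattened_size(50, 50, 1)
size_three_channels = flattened_size(50, 50, 3)

for name, size in [("1 channel", size_one_channel),
                   ("3 channels", size_three_channels)]:
    matches = size == model_input_size
    print(f"{name}: {size} values, matches model input: {matches}")
```

In both cases the number of values per image differs from the declared input size, so the input shape of the model may need to match the shape of the transformed images.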


#### **Step 5: Model saving and loading**


In [None]:
# Save the model defined in Step 3
modelPath = "/tmp/demo/keras.model"
model.saveModel(modelPath)

In [None]:
# Load the saved model and run inference on the test set
loadModel = Model.loadModel(modelPath)
preDF = loadModel.predict(test_, prediction_col="predict", transform=transformers)

#### **Step 6: Distributed evaluation and inference**


In [None]:
# Inference on the test set, applying the same image transformations as in training
model.predict(test_, prediction_col="predict", transform=transformers)

In [None]:
# Evaluation on the labeled test set
model.evaluate(test_, batch_size=1, label_cols=["label"], transform=transformers)
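Since training did not run, we have no predictions to evaluate. As a sketch of how predictions could be summarized once they are available, the plain-Python example below computes accuracy, precision and recall for the pneumonia class. The `labels` and `predictions` lists are hypothetical values, not results of this notebook, and `confusion_counts` is an illustrative helper.

```python
def confusion_counts(labels, predictions, positive=1):
    """Count true/false positives and negatives for a binary problem."""
    tp = fp = tn = fn = 0
    for y, p in zip(labels, predictions):
        if p == positive and y == positive:
            tp += 1
        elif p == positive:
            fp += 1
        elif y == positive:
            fn += 1
        else:
            tn += 1
    return tp, fp, tn, fn


# Hypothetical data: 0 = normal, 1 = pneumonia
labels = [0, 0, 1, 1, 1, 0]
predictions = [0, 1, 1, 1, 0, 0]

tp, fp, tn, fn = confusion_counts(labels, predictions)
accuracy = (tp + tn) / len(labels)
precision = tp / (tp + fp) if (tp + fp) else 0.0
recall = tp / (tp + fn) if (tp + fn) else 0.0

print(f"tp={tp} fp={fp} tn={tn} fn={fn}")
print(f"accuracy={accuracy:.3f} precision={precision:.3f} recall={recall:.3f}")
```

For a medical task such as this one, recall on the pneumonia class is worth reporting alongside accuracy, because a missed pneumonia case is a false negative.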

#### **Step 7: Checkpointing and resuming training**


In [None]:
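# Write checkpoints to cpPath during training
cpPath = "/tmp/demo/cp"
model.set_checkpoint(cpPath)

# To resume training, load a saved checkpoint.
# `path` is not defined in this notebook; set it to the location of the saved checkpoint first.
loadModel = Model.loadModel(path)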

#### **Step 8: Monitor your training**


In [None]:
# Write training summaries to the current directory under the app name "dllib_demo"
model.set_tensorboard("./", "dllib_demo")

#### **Step 9: Transfer learning and finetuning**


In [None]:
# `layer_names` is not defined in this notebook; it is the list of names of the layers to freeze
model.freeze(layer_names)

In [None]:
# Make the same layers trainable again
model.unFreeze(layer_names)

#### **Step 10: Hyperparameter tuning**

* optimizer

DLLib supports a list of optimization methods. For more details, please refer to the optimization documentation.

* learning rate scheduler

DLLib supports a list of learning rate schedulers. For more details, please refer to the lr_scheduler documentation.

* batch size

DLLib supports setting the batch size during training and prediction. We can adjust the batch size to tune the model’s accuracy.

* regularizer

DLLib supports a list of regularizers. For more details, please refer to the regularizer documentation.

* clipping

DLLib supports gradient clipping operations. For more details, please refer to the gradient_clip documentation.
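
To make three of the items above more concrete, here is a minimal plain-Python sketch of what a step learning rate schedule, the batch size and gradient clipping by L2 norm do numerically. None of this is the DLLib API: the function names, the decay rule and all numbers are hypothetical, and DLLib's own schedulers and clipping operations may behave differently.

```python
import math


def step_decay(base_lr, epoch, step_size, gamma):
    """Multiply the learning rate by `gamma` every `step_size` epochs."""
    return base_lr * gamma ** (epoch // step_size)


def iterations_per_epoch(num_samples, batch_size):
    """Number of batches needed to see every sample once."""
    return math.ceil(num_samples / batch_size)


def clip_by_l2_norm(gradients, max_norm):
    """Rescale the gradient vector so its L2 norm is at most `max_norm`."""
    norm = math.sqrt(sum(g * g for g in gradients))
    if norm <= max_norm:
        return list(gradients)
    scale = max_norm / norm
    return [g * scale for g in gradients]


# Hypothetical values, chosen only for illustration
lr_at_epoch_25 = step_decay(base_lr=0.1, epoch=25, step_size=10, gamma=0.5)
batches = iterations_per_epoch(num_samples=1000, batch_size=32)
clipped = clip_by_l2_norm([3.0, 4.0], max_norm=1.0)  # original norm is 5

print("learning rate at epoch 25:", lr_at_epoch_25)
print("batches per epoch:", batches)
print("clipped gradient:", clipped)
```

A smaller batch size means more parameter updates per epoch, and clipping keeps the direction of the gradient while limiting its length, which can help when training is unstable.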