# BigDL installation, Spark Session Creation and Loading Required Libraries

In [1]:
!pip install --pre --upgrade bigdl-dllib-spark3

exit() # restart the runtime to refresh installed pkg

Collecting bigdl-dllib-spark3
  Downloading bigdl_dllib_spark3-2.5.0b20240324-py3-none-manylinux1_x86_64.whl.metadata (1.1 kB)
Collecting pyspark==3.4.1 (from bigdl-dllib-spark3)
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting conda-pack==0.3.1 (from bigdl-dllib-spark3)
  Downloading conda_pack-0.3.1-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting bigdl-core==2.4.0.dev0 (from bigdl-dllib-spark3)
  Downloading bigdl_core-2.4.0.dev0-py3-none-manylinux2010_x86_64.whl.metadata (291 bytes)
Downloading bigdl_dllib_spark3-2.5.0b20240324-py3-none-manylinux1_x86_64.whl (64.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.2/64.2 MB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading bigdl_core-2.4.0.dev0-py3-none-manylinux2010_x86_64.whl (51.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━

In [1]:
import os

from bigdl.dllib.nn.criterion import *
from bigdl.dllib.nn.layer import *
from bigdl.dllib.optim.optimizer import Adam
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType, StringType

from bigdl.dllib.nncontext import *
from bigdl.dllib.feature.image import *
from bigdl.dllib.nnframes import *

from optparse import OptionParser

In [None]:
# Driver resources requested for the Spark session.
spark_conf = SparkConf()
spark_conf.set("spark.driver.memory", "20g")
spark_conf.set("spark.driver.cores", 6)

# Context created in local cluster mode; `sc` is handed to the image reader
# in the data-loading cell.
sc = init_nncontext(spark_conf, cluster_mode="local")

# Data Upload and Pre-processing

In [2]:
!pip install -q kaggle
from google.colab import drive
drive.mount('/content/drive')
from google.colab import files
files.upload()
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!cat ~/.kaggle/kaggle.json
import kagglehub

Mounted at /content/drive


Saving kaggle.json to kaggle.json
{"username":"<redacted>","key":"<redacted>"}

In [6]:
import os

from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType, StructField, StructType

# Numeric class labels shared by the train, validation and test splits.
# They start at 1 rather than 0 -- presumably because BigDL's
# ClassNLLCriterion expects 1-based class indices (confirm in the BigDL docs).
NORMAL_LABEL = 1.0
PNEUMONIA_LABEL = 2.0

# Height and width (in pixels) requested from the reader for every image.
READ_SIZE = 300


def read_labeled_images(directory, label):
    """Read the images under ``directory`` and attach a constant label.

    Args:
        directory: Path of the folder holding the images of one class.
        label: Numeric class label stored, as a double, in the ``label``
            column of every row.

    Returns:
        The DataFrame produced by ``NNImageReader.readImages`` with an
        additional ``label`` column.
    """
    # image_codec=1 is passed through unchanged from the original notebook
    # (presumably OpenCV's "read as colour" flag -- confirm in the BigDL docs).
    images = NNImageReader.readImages(
        directory, sc, resizeH=READ_SIZE, resizeW=READ_SIZE, image_codec=1
    )
    return images.withColumn("label", lit(label).cast(DoubleType()))


# Download (or reuse the cached copy of) the Kaggle chest X-ray dataset.
imagepath = kagglehub.dataset_download("paultimothymooney/chest-xray-pneumonia")
data_path = os.path.join(imagepath, 'chest_xray')
print(os.listdir(data_path))

train_path = os.path.join(data_path, 'train')
val_path = os.path.join(data_path, 'val')
test_path = os.path.join(data_path, 'test')

pneumonia_path = os.path.join(train_path, 'PNEUMONIA')
normal_path = os.path.join(train_path, 'NORMAL')

# Training set: pneumonia rows followed by normal rows.
imageDF_train_p = read_labeled_images(pneumonia_path, PNEUMONIA_LABEL)
imageDF_train_n = read_labeled_images(normal_path, NORMAL_LABEL)
imageDF_train = imageDF_train_p.union(imageDF_train_n)

# Validation set, labelled exactly like the training set.
imageDF_val_p = read_labeled_images(os.path.join(val_path, 'PNEUMONIA'), PNEUMONIA_LABEL)
imageDF_val_n = read_labeled_images(os.path.join(val_path, 'NORMAL'), NORMAL_LABEL)
imageDF_val = imageDF_val_p.union(imageDF_val_n)

# Test set, labelled exactly like the training set.
imageDF_test_p = read_labeled_images(os.path.join(test_path, 'PNEUMONIA'), PNEUMONIA_LABEL)
imageDF_test_n = read_labeled_images(os.path.join(test_path, 'NORMAL'), NORMAL_LABEL)
imageDF_test = imageDF_test_p.union(imageDF_test_n)

Resuming download from 55574528 bytes (2407790907 bytes left)...
Resuming download from https://www.kaggle.com/api/v1/datasets/download/paultimothymooney/chest-xray-pneumonia?dataset_version_number=2 (55574528/2463365435) bytes left.


100%|██████████| 2.29G/2.29G [00:24<00:00, 96.5MB/s]

Extracting files...





['train', 'val', 'test', 'chest_xray', '__MACOSX']




In [None]:
# Print the column layout of the combined training DataFrame.
imageDF_train.printSchema()


root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- label: double (nullable = false)



# Model and Pipeline Definition

In [4]:
# Training hyperparameters consumed by the classifier definition.
learning_rate = 0.002  # step size handed to the classifier
nb_epoch = 20          # maximum number of training epochs
batch_size = 56        # samples per optimisation batch


In [7]:
# Weights file loaded below; it must already be present on the runtime's
# local disk (the existence check assumes a local filesystem path).
MODEL_PATH = "/content/analytics-zoo_inception-v1_imagenet_0.1.0.model"
# Length of the vector the pre-trained network writes to the "embedding"
# column, and number of target classes (normal / pneumonia).
EMBEDDING_DIM = 1000
NUM_CLASSES = 2

if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(
        "Pre-trained model not found at %s; upload it to the runtime "
        "before running this cell." % MODEL_PATH
    )

# Image preprocessing applied before the pre-trained network: resize,
# centre-crop, per-channel normalisation, then conversion to a tensor.
transformer = ChainedPreprocessing([
    RowToImageFeature(),
    ImageResize(256, 256),
    ImageCenterCrop(224, 224),
    ImageChannelNormalize(123.0, 117.0, 104.0),
    ImageMatToTensor(),
    ImageFeatureToTensor(),
])

# Stage 1: the pre-trained network reads "image" and writes "embedding".
preTrainedNNModel = (
    NNModel(Model.loadModel(MODEL_PATH), transformer)
    .setFeaturesCol("image")
    .setPredictionCol("embedding")
)

# Stage 2: a single linear layer + log-softmax trained on the embeddings.
lrModel = Sequential().add(Linear(EMBEDDING_DIM, NUM_CLASSES)).add(LogSoftMax())

# Parenthesised chain instead of backslashes: the original ended with a
# dangling "\" that only parsed because a blank line happened to follow it.
classifier = (
    NNClassifier(lrModel, ClassNLLCriterion(), SeqToTensor([EMBEDDING_DIM]))
    .setLearningRate(learning_rate)
    .setOptimMethod(Adam())
    .setBatchSize(batch_size)
    .setMaxEpoch(nb_epoch)
    .setFeaturesCol("embedding")
    .setCachingSample(False)
)

pipeline = Pipeline(stages=[preTrainedNNModel, classifier])

creating: createRowToImageFeature
creating: createImageResize
creating: createImageCenterCrop
creating: createImageChannelNormalize
creating: createImageMatToTensor
creating: createImageFeatureToTensor
creating: createChainedPreprocessing
creating: createTensorToSample
creating: createChainedPreprocessing
creating: createNNModel
creating: createSequential
creating: createLinear
creating: createLogSoftMax
creating: createClassNLLCriterion
creating: createSeqToTensor
creating: createScalarToTensor
creating: createFeatureLabelPreprocessing
creating: createNNClassifier
creating: createAdam


# Model training

In [8]:
# Fit the pipeline on the training DataFrame; the returned fitted pipeline
# is what the later cells call `transform` on.
model = pipeline.fit(imageDF_train)

creating: createToTuple
creating: createChainedPreprocessing


# Validation and Prediction Error

In [10]:
# Score the validation images; cached because the evaluation cell reads the
# same DataFrame again.
predictionDF = model.transform(imageDF_val).cache()
# Preview roughly 10% of the rows. The seed keeps the preview repeatable for
# the same data and partitioning instead of changing on every run.
predictionDF.sample(withReplacement=False, fraction=0.1, seed=42).show()

+--------------------+-----+--------------------+----------+
|               image|label|           embedding|prediction|
+--------------------+-----+--------------------+----------+
|{file:/root/.cach...|  1.0|[7.494957E-5, 8.2...|       1.0|
+--------------------+-----+--------------------+----------+



In [11]:
# Accuracy of the validation predictions, reported as an error rate.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictionDF)
# expected error should be less than 10%
print("Validation Error = %g " % (1.0 - accuracy))



Validation Error = 0.3125 


In [12]:
# Score the test images; cached because the evaluation cell reads the same
# DataFrame again.
testDF = model.transform(imageDF_test).cache()
# Preview roughly 10% of the rows. The seed keeps the preview repeatable for
# the same data and partitioning instead of changing on every run.
testDF.sample(withReplacement=False, fraction=0.1, seed=42).show()

+--------------------+-----+--------------------+----------+
|               image|label|           embedding|prediction|
+--------------------+-----+--------------------+----------+
|{file:/root/.cach...|  2.0|[1.2876233E-4, 2....|       2.0|
|{file:/root/.cach...|  2.0|[6.001197E-5, 1.1...|       2.0|
|{file:/root/.cach...|  2.0|[3.436058E-5, 7.2...|       2.0|
|{file:/root/.cach...|  2.0|[3.246968E-5, 3.9...|       2.0|
|{file:/root/.cach...|  2.0|[4.0650742E-5, 7....|       2.0|
|{file:/root/.cach...|  2.0|[2.8867991E-5, 1....|       2.0|
|{file:/root/.cach...|  2.0|[7.053932E-5, 1.8...|       2.0|
|{file:/root/.cach...|  2.0|[4.9989183E-5, 5....|       2.0|
|{file:/root/.cach...|  2.0|[3.5116405E-5, 1....|       2.0|
|{file:/root/.cach...|  2.0|[7.788882E-5, 9.6...|       2.0|
|{file:/root/.cach...|  2.0|[2.4530487E-4, 5....|       2.0|
|{file:/root/.cach...|  2.0|[2.2130687E-4, 1....|       1.0|
|{file:/root/.cach...|  2.0|[5.505311E-5, 2.9...|       2.0|
|{file:/root/.cach...|  

In [13]:
# Accuracy of the test-set predictions, reported as an error rate.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(testDF)
# expected error should be less than 10%
# Labelled "Test Error": this metric is computed on testDF, not on the
# validation predictions, so the original "Validation Error" label was wrong.
print("Test Error = %g " % (1.0 - accuracy))

Validation Error = 0.322115 
