In [1]:
import os
SUBMIT_ARGS = "--packages databricks:spark-deep-learning:1.1.0-spark2.3-s_2.11 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[3]').appName("DL with Spark Deep Cognition").getOrCreate()
sc = spark.sparkContext

In [3]:
sc.defaultParallelism

3

In [4]:
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import lit
from sparkdl.image import imageIO
from functools import reduce
from pyspark.sql import DataFrame
# Read images using Spark
neutro_df = ImageSchema.readImages(r'dataset2-master/dataset2-master/images/TRAIN/NEUTROPHIL/').withColumn("label", lit(0))
mono_df = ImageSchema.readImages(r'dataset2-master/dataset2-master/images/TRAIN/MONOCYTE/').withColumn("label", lit(1))
eosin_df = ImageSchema.readImages(r'dataset2-master/dataset2-master/images/TRAIN/EOSINOPHIL/').withColumn("label", lit(2))
lympho_df = ImageSchema.readImages(r'dataset2-master/dataset2-master/images/TRAIN/LYMPHOCYTE/').withColumn("label", lit(3))
neutro_train, neutro_test, _ = neutro_df.randomSplit([0.1, 0.05, 0.85])  # use larger training sets (e.g. [0.6, 0.4] for getting more images)
mono_train, mono_test, _ = mono_df.randomSplit([0.1, 0.05, 0.85])     # use larger training sets (e.g. [0.6, 0.4] for getting more images)

eosin_train, eosin_test, _ = mono_df.randomSplit([0.1, 0.05, 0.85])     # use larger training sets (e.g. [0.6, 0.4] for getting more images)
lympho_train, lympho_test, _ = mono_df.randomSplit([0.1, 0.05, 0.85])     # use larger training sets (e.g. [0.6, 0.4] for getting more images)


df = [neutro_train,mono_train,eosin_train,lympho_train]
train_df = reduce(DataFrame.unionAll, df)

tdf = [neutro_test,mono_test,eosin_test,lympho_test]
test_df = reduce(DataFrame.unionAll, tdf)


# Under the hood, each of the partitions is fully loaded in memory, which may be expensive.
# This ensure that each of the paritions has a small size.
train_df = train_df.repartition(100)
test_df = test_df.repartition(100)


Using TensorFlow backend.


In [5]:
from py4j.java_gateway import *
port = launch_gateway()
gateway = JavaGateway(
gateway_parameters=GatewayParameters(port=port),
callback_server_parameters=CallbackServerParameters(port=0))
random = gateway.jvm.java.util.Random()

In [6]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

# model: InceptionV3
# extracting feature from images
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features",
                                 modelName="InceptionV3")

# used as a multi class classifier
lr = LogisticRegression(maxIter=5, regParam=0.03, 
                        elasticNetParam=0.5, labelCol="label") 

# define a pipeline model
sparkdn = Pipeline(stages=[featurizer, lr])
spark_model = sparkdn.fit(train_df)

In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

tested_df = spark_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label"))))

Test set accuracy = 0.7901701323251418


## Acknowledgement 
https://github.com/Shenggan/BCCD_Dataset
MIT License

In [8]:
#rename 's/^(.{1})//' * 