In [None]:
#Program to classify steve jobs and zukerbergs images using spark-dl lib
#$ cat jupyter.sh
#function dlnotebook()
#{
#export PYSPARK_PYTHON=python3
#export PYSPARK_DRIVER_PYTHON=/hadoop/softwares/anaconda3/bin/jupyter
#export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --ip=190.165.0.102"
#$SPARK_HOME/bin/pyspark --packages databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11
#}
#dlnotebook
#####################################
#for input data set 
#curl -O https://github.com/zsellami/images_classification/blob/master/personalities.zip
#unzip personalities.zip
#hdfs dfs -put personalities /tmp
#######################################################################################

In [2]:
import sys
import pyspark
from pyspark.sql import SparkSession,HiveContext
from pyspark import SparkConf,SparkContext
from pyspark.sql.functions import *
import sparkdl as dl
from keras.applications import InceptionV3

Using TensorFlow backend.


In [3]:
spark = SparkSession.builder\
.master("yarn") \
.appName("ImageClassification") \
.config("spark.executor.memory", "16gb") \
.config("spark.driver.memory", "16G") \
.config("spark.driver.offHeap.enabled", "true") \
.config("spark.driver.offHeap.size", "32G") \
.config("spark.executor.maxResultSize", "16gb") \
.config("spark.sql.broadcastTimeout","360000") \
.getOrCreate()
   

In [8]:
img_dir = "/tmp/personalities/"

#Read images and Create training & test DataFrames for transfer learning
from pyspark.ml.image import ImageSchema
jobs_df =  ImageSchema.readImages(img_dir + "/jobs").withColumn("label", lit(0))
zuckerberg_df =  ImageSchema.readImages(img_dir + "/zuckerberg").withColumn("label", lit(1))
jobs_train, jobs_test = jobs_df.randomSplit([0.8, 0.2])
zuckerberg_train, zuckerberg_test = zuckerberg_df.randomSplit([0.8, 0.2])

#dataframe for training a classification model
trainDF = jobs_train.unionAll(zuckerberg_train)
#dataframe for testing the classification model
testDF = jobs_test.unionAll(zuckerberg_test)
trainDF.show()#select('label').distinct().show()

+--------------------+-----+
|               image|label|
+--------------------+-----+
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    0|
|[hdfs://Namenode:...|    1|
|[hdfs://Namenode:...|    1|
|[hdfs://Namenode:...|    1|
|[hdfs://Namenode:...|    1|
|[hdfs://Namenode:...|    1|
|[hdfs://Namenode:...|    1|
+--------------------+-----+
only showing top 20 rows



In [16]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label",featuresCol="features")  
p = Pipeline(stages=[featurizer, lr])
model = p.fit(trainDF)    # train_images_df is a dataset of images and labels

In [17]:
lrmodel=model
lrmodel.stages[1].write().overwrite().save('lr')

In [27]:
# Inspect training error
df = lrmodel.transform(testDF)
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Training set accuracy = 0.8571428571428571


In [28]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName='InceptionV3')

dt = DecisionTreeClassifier(labelCol = "label", featuresCol="features", maxDepth = 3)

dt_pipeline = Pipeline(stages=[vectorizer, dt])

dt_model = dt_pipeline.fit(trainDF)

In [29]:
# Inspect training error
df = dt_model.transform(testDF)
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Training set accuracy = 0.5714285714285714


In [30]:
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml import Pipeline
## Xception InceptionV3
vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName='InceptionV3')
rf = RandomForestClassifier(labelCol = "label", featuresCol="features")
rf_pipeline = Pipeline(stages=[vectorizer, rf])
rfmodel = rf_pipeline.fit(trainDF)

In [31]:
# Inspect training error
df = rfmodel.transform(testDF)
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Training set accuracy = 0.7142857142857143


In [32]:
from pyspark.ml.classification import GBTClassifier

from pyspark.ml import Pipeline
## Xception InceptionV3
vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName='InceptionV3')
gbt = GBTClassifier(maxIter=10)
gbt_pipeline = Pipeline(stages=[vectorizer, gbt])
gbt_model = gbt_pipeline.fit(trainDF)


In [33]:
# Inspect training error
df = gbt_model.transform(testDF)
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Training set accuracy = 0.5714285714285714


In [37]:
prediction= predictionAndLabels#.select("prediction")
from pyspark.ml.evaluation import BinaryClassificationEvaluator
binaryevaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
binary_rate = binaryevaluator.evaluate(prediction)*100
print("accuracy: {}%" .format(binary_rate))

accuracy: 70.0%
