## Federer Vs Djokovic Image Classification using Deep Learning with Spark and Tensorflow


In [1]:
spark = SparkSession.builder \
   .master("local[*]") \
   .appName("ImageClassification") \
   .config("spark.executor.memory", "16gb") \
   .config("spark.driver.memory", "16G") \
   .config("spark.driver.offHeap.enabled", "true") \
   .config("spark.driver.offHeap.size", "16G") \
   .config("spark.executor.maxResultSize", "16gb") \
   .getOrCreate()

In [2]:
## .config("spark.memory.offHeap.enabled",true)
###     .config("spark.memory.offHeap.size","16g")  
### --executor-memory 64G

In [14]:
sc = spark.sparkContext
sc

In [15]:
import tensorflow as tf
tf.__version__

'1.4.0'

In [16]:
import pyspark.sql.functions as f
import sparkdl as dl

### Load Images of Djokovic and Federer

In [17]:
dfDjokovic = dl.readImages('./tennis/large/djokovic150/').withColumn('label', f.lit(0))
dfFederer = dl.readImages('./tennis/large/federer150/').withColumn('label', f.lit(1))

In [18]:
dfDjokovic.show(n=10,truncate=False)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [19]:
dfFederer.show(n=10,truncate=False)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [20]:
trainDFdjokovic, testDFdjokovic = dfDjokovic.randomSplit([80.00, 20.00], seed =12)
trainDFfederer, testDFfederer = dfFederer.randomSplit([80.00, 20.00], seed=12)

In [21]:
print('The number of images in trainDFdjokovic is {}'.format(trainDFdjokovic.toPandas().shape[0]))
print('The number of images in testDFdjokovic is {}'.format(testDFdjokovic.toPandas().shape[0]))
print('The number of images in trainDFfederer is {}'.format(trainDFfederer.toPandas().shape[0]))
print('The number of images in testDFfederer is {}'.format(testDFfederer.toPandas().shape[0]))

The number of images in trainDFdjokovic is 110
The number of images in testDFdjokovic is 27
The number of images in trainDFfederer is 110
The number of images in testDFfederer is 27


In [22]:
trainDF = trainDFdjokovic.unionAll(trainDFfederer)
testDF = testDFdjokovic.unionAll(testDFfederer)

In [23]:
print('The number of images in the training data is {}' .format(trainDF.toPandas().shape[0]))
print('The number of images in the testing  data is {}' .format(testDF.toPandas().shape[0]))

The number of images in the training data is 220
The number of images in the testing  data is 54


### Training with Deep Image Featurizer + Logistic Regression

In [24]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
## Xception InceptionV3
vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName='InceptionV3')
logreg = LogisticRegression(maxIter=10,regParam=0.01, elasticNetParam=0.1, labelCol = "label", featuresCol="features")
pipeline = Pipeline(stages=[vectorizer, logreg])

pipeline_model = pipeline.fit(trainDF)

INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.


### Save MODELS

In [25]:
lrModel = pipeline_model
print(lrModel)  # summary only

PipelineModel_4d2182c2608e78e1f2aa


In [26]:
lrModel.stages[1].write().overwrite().save('lr-large')

### Reload LR Model

In [27]:
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr_test = LogisticRegressionModel.load('./lr-large')

# Use a featurizer to use trained features from an existing model
featurizer_test = dl.DeepImageFeaturizer(inputCol = "image", outputCol = "features", modelName = "InceptionV3")

# Pipeline both entities
p_lr_test = PipelineModel(stages=[featurizer_test, lr_test])

# Test and evaluate
tested_lr_test = p_lr_test.transform(testDF)
evaluator_lr_test = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Logistic Regression Model: Test set accuracy = " + str(evaluator_lr_test.evaluate(tested_lr_test.select("prediction", "label"))))

tested_lr_test.select("label", "probability", "prediction").show(20, False)

INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.
Logistic Regression Model: Test set accuracy = 0.7962962962962963
+-----+------------------------------------------+----------+
|label|probability                               |prediction|
+-----+------------------------------------------+----------+
|0    |[0.8586630930379967,0.1413369069620032]   |0.0       |
|0    |[0.8102327864123758,0.18976721358762408]  |0.0       |
|0    |[0.9923176089958384,0.007682391004161615] |0.0       |
|0    |[0.36591086096397507,0.6340891390360249]  |1.0       |
|0    |[0.99719731178771,0.0028026882122901154]  |0.0       |
|0    |[0.7808065939840823,0.21919340601591766]  |0.0       |
|0    |[0.9970887542727666,0.0029112457272334703]|0.0       |
|0    |[0.8802493559546595,0.11975064404534048]  |0.0       |
|0    |[0.8122615553370044,0.18773844466299563]  |0.0       |
|0    |[0.2469409900620115,0.753059009937988

### Decision Tree Classifier

In [28]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
## Xception InceptionV3
vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName='InceptionV3')

dt = DecisionTreeClassifier(labelCol = "label", featuresCol="features", maxDepth = 3)

dt_pipeline = Pipeline(stages=[vectorizer, dt])

dt_pipeline_model = dt_pipeline.fit(trainDF)


INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.


In [29]:
# Test and evaluate
tested_dt_test = dt_pipeline_model.transform(testDF)
evaluator_dt_test = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Decision Tree Model: Test set accuracy = " + str(evaluator_dt_test.evaluate(tested_dt_test.select("prediction", "label"))))

tested_dt_test.select("label", "probability", "prediction").show(20, False)

INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.
Decision Tree Model: Test set accuracy = 0.5925925925925926
+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.98,0.02]                             |0.0       |
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.8,0.2]                               |0.0       |
|0    |[0.98,0.02]                             |0.0       |
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.96,0.04]                             |0.0       |
|0    |[0.54,0.46]                             |0.0       |
|0    |[0.54,0.46]                             |0.0       |
|0    |[0.0,1.0]                               |1.0       |
|0    |[0.0615

### Save DT model

In [30]:
dtModel = dt_pipeline_model
print(dtModel)  # summary only
dtModel.stages[1].write().overwrite().save('dt')

PipelineModel_434498cc68f9d9612b19


### Reload DT Model

### Random Forest Classifier

In [32]:
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml import Pipeline
## Xception InceptionV3
vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName='InceptionV3')

rf = RandomForestClassifier(labelCol = "label", featuresCol="features")

rf_pipeline = Pipeline(stages=[vectorizer, rf])

rf_pipeline_model = rf_pipeline.fit(trainDF)





INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.


In [33]:
# Test and evaluate
tested_rf_test = rf_pipeline_model.transform(testDF)
evaluator_rf_test = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Random Forest Model: Test set accuracy = " + str(evaluator_rf_test.evaluate(tested_rf_test.select("prediction", "label"))))

tested_dt_test.select("label", "probability", "prediction").show(20, False)

INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.
Random Forest Model: Test set accuracy = 0.6666666666666666
+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.98,0.02]                             |0.0       |
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.8,0.2]                               |0.0       |
|0    |[0.98,0.02]                             |0.0       |
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.96,0.04]                             |0.0       |
|0    |[0.54,0.46]                             |0.0       |
|0    |[0.54,0.46]                             |0.0       |
|0    |[0.0,1.0]                               |1.0       |
|0    |[0.0615

### Save RF Model

### Reload RF Model

In [None]:
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr_test = LogisticRegressionModel.load('./lr')

# Use a featurizer to use trained features from an existing model
featurizer_test = dl.DeepImageFeaturizer(inputCol = "image", outputCol = "features", modelName = "InceptionV3")

# Pipeline both entities
p_test = PipelineModel(stages=[featurizer_test, lr_test])

# Test and evaluate
tested_df_test = p_test.transform(testDF)
evaluator_test = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Test set accuracy = " + str(evaluator_test.evaluate(tested_df_test.select("prediction", "label"))))

tested_df_test.select("label", "probability", "prediction").show(20, False)

### Gradient-Boosted Tree Classifier

In [34]:
from pyspark.ml.classification import GBTClassifier

from pyspark.ml import Pipeline
## Xception InceptionV3
vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName='InceptionV3')

gbt = GBTClassifier(maxIter=10)

gbt_pipeline = Pipeline(stages=[vectorizer, gbt])

gbt_pipeline_model = gbt_pipeline.fit(trainDF)





INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.


In [35]:
# Test and evaluate
tested_gbt_test = gbt_pipeline_model.transform(testDF)
evaluator_gbt_test = MulticlassClassificationEvaluator(metricName = "accuracy")
print("GBT Model: Test set accuracy = " + str(evaluator_gbt_test.evaluate(tested_gbt_test.select("prediction", "label"))))

tested_dt_test.select("label", "probability", "prediction").show(20, False)

INFO:tensorflow:Froze 376 variables.
Converted 376 variables to const ops.
INFO:tensorflow:Froze 0 variables.
Converted 0 variables to const ops.
GBT Model: Test set accuracy = 0.6296296296296297
+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.98,0.02]                             |0.0       |
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.8,0.2]                               |0.0       |
|0    |[0.98,0.02]                             |0.0       |
|0    |[0.06153846153846154,0.9384615384615385]|1.0       |
|0    |[0.96,0.04]                             |0.0       |
|0    |[0.54,0.46]                             |0.0       |
|0    |[0.54,0.46]                             |0.0       |
|0    |[0.0,1.0]                               |1.0       |
|0    |[0.06153846153846

In [36]:
tested_gbt_test.select("label", "probability", "prediction").show(20, False)

+-----+----------------------------------------+----------+
|label|probability                             |prediction|
+-----+----------------------------------------+----------+
|0    |[0.07005430994087057,0.9299456900591294]|1.0       |
|0    |[0.9341221756527827,0.06587782434721734]|0.0       |
|0    |[0.06587782434721742,0.9341221756527825]|1.0       |
|0    |[0.4604947255782175,0.5395052744217825] |1.0       |
|0    |[0.9224798711479536,0.0775201288520464] |0.0       |
|0    |[0.06587782434721742,0.9341221756527825]|1.0       |
|0    |[0.9306233339041042,0.06937666609589577]|0.0       |
|0    |[0.9341221756527827,0.06587782434721734]|0.0       |
|0    |[0.9269531963068626,0.07304680369313743]|0.0       |
|0    |[0.06587782434721744,0.9341221756527825]|1.0       |
|0    |[0.06587782434721742,0.9341221756527825]|1.0       |
|0    |[0.9341221756527827,0.06587782434721734]|0.0       |
|0    |[0.08074506050044718,0.9192549394995528]|1.0       |
|0    |[0.06587782434721742,0.9341221756

In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

binaryevaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
binary_rate = binaryevaluator.evaluate(prediction)*100
print("accuracy: {}%" .format(round(binary_rate,2)))

accuracy: 58.33%


### KERAS - Save Model

In [1]:
from keras.applications import InceptionV3


  from ._conv import register_converters as _register_converters
Using TensorFlow backend.
  return f(*args, **kwds)
