# Deep Learning

### Downloads flower photos

In [None]:
#!wget http://download.tensorflow.org/example_images/flower_photos.tgz
#!tar xzf flower_photos.tgz

In [None]:
# Root folder of the extracted flower_photos.tgz archive.
img_dir = "./flower_photos"

### Creates Spark session with appropriate packages to read JPEGs

In [None]:
from pyspark.sql import SparkSession

# `SparkSession.builder` is the documented entry point for building a session;
# getOrCreate() returns the already-running session when there is one.
# NOTE(review): no extra packages are configured here — presumably the image /
# deep-learning packages are supplied when the kernel is launched; confirm.
spark = SparkSession.builder.getOrCreate()

### Loads Data

In [None]:
# Build the labelled image DataFrames used for transfer learning
# (this preparation is longer than the transfer learning itself below!).
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import lit

# Tulips are labelled 1 and daisies 0.
tulips_df = (
    ImageSchema.readImages(img_dir + "/tulips")
    .withColumn("label", lit(1))
)
daisy_df = (
    ImageSchema.readImages(img_dir + "/daisy")
    .withColumn("label", lit(0))
)

### Train / Test split

In [None]:
# Fixed seed so the 60/40 split is repeatable from one run to the next on the same data.
split_seed = 42
split_weights = [0.6, 0.4]

# Each class is split separately so both classes appear in train and test.
tulips_train, tulips_test = tulips_df.randomSplit(split_weights, seed=split_seed)
daisy_train, daisy_test = daisy_df.randomSplit(split_weights, seed=split_seed)

# union() replaces unionAll(), which has been deprecated since Spark 2.0.
# Under the hood, each of the partitions is fully loaded in memory, which may be expensive.
# Repartitioning ensures that each of the partitions has a small size.
train_df = tulips_train.union(daisy_train).repartition(100)
test_df = tulips_test.union(daisy_test).repartition(100)

### Uses InceptionV3's second-to-last layer as features for a logistic regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

# Stage 1: turn the "image" column into a "features" column using InceptionV3.
featurizer = DeepImageFeaturizer(
    inputCol="image",
    outputCol="features",
    modelName="InceptionV3",
)

# Stage 2: regularised logistic regression trained against the "label" column.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.05,
    elasticNetParam=0.3,
    labelCol="label",
)

p = Pipeline(stages=[featurizer, lr])

# Fit both stages on the training images.
p_model = p.fit(train_df)

### Checks performance of the model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out images with the fitted pipeline.
tested_df = p_model.transform(test_df)

# Accuracy is evaluated on the "prediction" and "label" columns only.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
test_accuracy = evaluator.evaluate(tested_df.select("prediction", "label"))
print("Test set accuracy = " + str(test_accuracy))

In [None]:
# Peek at the scored test rows (show() prints the first 20 rows by default).
tested_df.show()

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import expr, udf

def _p1(v):
    return float(v.array[1])
# Expose the helper as a Spark SQL UDF that returns a double.
p1 = udf(_p1, DoubleType())

# Add the index-1 probability as column "p_1", then list the 10 test images whose
# probability is furthest from the true label (worst predictions first).
df = tested_df.withColumn("p_1", p1(tested_df["probability"]))
misprediction_gap = expr("abs(p_1 - label)")
wrong_df = df.orderBy(misprediction_gap.desc())
wrong_df.select("image", "p_1", "label").limit(10).show()

### Copies some photos to a sample folder

In [None]:
# Folder that receives the handful of sample images copied from the data set.
sample_img_dir = "./flower_photos/sample"

In [None]:
#!mkdir ./flower_photos/sample

In [None]:
#!cp ./flower_photos/daisy/100080576_f52e8ee070_n.jpg ./flower_photos/sample
#!cp ./flower_photos/daisy/10140303196_b88d3d6cec.jpg ./flower_photos/sample
#!cp ./flower_photos/tulips/100930342_92e8746431_n.jpg ./flower_photos/sample
#!cp ./flower_photos/tulips/10094729603_eeca3f2cb6.jpg ./flower_photos/sample

### Makes predictions using InceptionV3

In [None]:
from sparkdl import DeepImagePredictor

# Read the sample images into an image DataFrame.
image_df = ImageSchema.readImages(sample_img_dir)

# InceptionV3 predictor writing its output to "predicted_labels".
# NOTE(review): decodePredictions=True with topK=10 presumably yields the ten
# best decoded labels per image — confirm against the sparkdl documentation.
predictor = DeepImagePredictor(
    inputCol="image",
    outputCol="predicted_labels",
    modelName="InceptionV3",
    decodePredictions=True,
    topK=10,
)
predictions_df = predictor.transform(image_df)

# Collect to pandas so the notebook renders the result as a table.
predictions_df.select("image", "predicted_labels").toPandas()

In [None]:
# Run the fitted pipeline on the sample images. Daisies carry label 0, so the
# daisy probability is reported as 1 minus the index-1 probability.
df = p_model.transform(image_df)
p_daisy = (1 - p1(df["probability"])).alias("p_daisy")
df.select("image", p_daisy).toPandas()

### Uses a Keras pretrained model as a Transformer

In [None]:
from keras.applications import InceptionV3

# Build InceptionV3 with its ImageNet weights and persist it to a local file,
# so that later cells can load the model by path.
model = InceptionV3(weights="imagenet")
model.save("/tmp/model-full.h5")

In [None]:
from keras.applications.inception_v3 import preprocess_input
from keras.preprocessing.image import img_to_array, load_img
import numpy as np
from pyspark.sql.types import StringType
from sparkdl import KerasImageFileTransformer

def loadAndPreprocessKerasInceptionV3(uri):
    """Load one image and prepare it as a single-image batch for InceptionV3.

    Args:
        uri: Location of the image file, passed straight to Keras ``load_img``.

    Returns:
        The result of ``preprocess_input`` applied to the image array after a
        leading axis has been added.
    """
    # 299x299 is the image size used for InceptionV3.
    resized = load_img(uri, target_size=(299, 299))
    pixels = img_to_array(resized)
    # Add a leading axis so the array holds a batch of one image.
    batch = np.expand_dims(pixels, axis=0)
    return preprocess_input(batch)

# Spark transformer around the saved Keras model: it takes its input from the
# "uri" column, loads images with `loadAndPreprocessKerasInceptionV3`, and
# writes the model output to the "predictions" column.
transformer = KerasImageFileTransformer(
    inputCol="uri",
    outputCol="predictions",
    modelFile="/tmp/model-full.h5",  # local file path for model
    imageLoader=loadAndPreprocessKerasInceptionV3,
    outputMode="vector",
)

In [None]:
import os
from pyspark.sql import SQLContext

sc = spark.sparkContext
# Kept because later cells run their SQL queries through `sqlContext`.
sqlContext = SQLContext(sc)

# os.listdir() returns names in arbitrary order; sorting keeps the order of the
# rows in `uri_df` independent of the filesystem, which matters because a later
# cell inspects the first row.
files = [
    os.path.join(sample_img_dir, file_name)
    for file_name in sorted(os.listdir(sample_img_dir))
]
# One string column named "uri", created directly from the active session.
uri_df = spark.createDataFrame(files, StringType()).toDF("uri")

keras_pred_df = transformer.transform(uri_df)

In [None]:
# Collect the file paths and the model outputs to the driver as a pandas DataFrame.
results = keras_pred_df.select("uri", "predictions").toPandas()

In [None]:
# Position of the largest entry in the first row's prediction vector.
first_prediction = results["predictions"].iloc[0]
np.argmax(first_prediction)

### Uses a regular Keras model as a Transformer

In [None]:
from sparkdl import KerasTransformer
from keras.models import Sequential
from keras.layers import Dense
import numpy as np
# Explicit names instead of a wildcard import, so it is clear where each type comes from.
from pyspark.sql.types import ArrayType, FloatType, StructField, StructType
from pyspark.sql import SQLContext

sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Generate random input data; a dedicated, seeded generator makes the rows
# identical on every run without touching NumPy's global random state.
num_features = 10
num_examples = 100
rng = np.random.RandomState(42)
input_data = [{"features": rng.randn(num_features).tolist()} for _ in range(num_examples)]
schema = StructType([StructField("features", ArrayType(FloatType()), True)])
input_df = sqlContext.createDataFrame(input_data, schema)

# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

# Create transformer and apply it to our input data
transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)
final_df = transformer.transform(input_df)

In [None]:
# Display the input features next to the model's "predictions" column.
final_df.show()

### Deploys a Keras pretrained model as a UDF to be used in SQL queries

In [None]:
from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

# Register an in-memory Keras model under the SQL function name "inceptionV3_udf".
registerKerasImageUDF("inceptionV3_udf", InceptionV3(weights="imagenet"))
# A model can also be given as a path to a saved model file; this file is
# written by the earlier cell that calls model.save('/tmp/model-full.h5').
registerKerasImageUDF("my_custom_keras_model_udf", "/tmp/model-full.h5")

def keras_load_img(fpath):
    """Load the image at ``fpath``, resized to 299x299, as a ``uint8`` array.

    Args:
        fpath: Path of the image file, passed straight to Keras ``load_img``.

    Returns:
        The image converted with ``img_to_array`` and cast to ``numpy.uint8``.
    """
    # Imports are local to the function; presumably so that it is
    # self-contained when shipped to Spark workers — confirm.
    import numpy as np
    from keras.preprocessing.image import img_to_array, load_img

    resized = load_img(fpath, target_size=(299, 299))
    pixels = img_to_array(resized)
    return pixels.astype(np.uint8)

# Register a fresh InceptionV3 instance together with `keras_load_img`, which
# loads an image from a file path and resizes it to 299x299.
registerKerasImageUDF("inceptionV3_udf_with_preprocessing", InceptionV3(weights="imagenet"), keras_load_img)

In [None]:
sample_img_dir = './flower_photos/sample'
from pyspark.ml.image import ImageSchema

# Read the sample images and expose them to SQL under the name "sample_images".
image_df = ImageSchema.readImages(sample_img_dir)
# createOrReplaceTempView() replaces registerTempTable(), which has been
# deprecated since Spark 2.0; the view is replaced if the cell is run again.
image_df.createOrReplaceTempView("sample_images")

In [None]:
# Apply the UDF registered from the in-memory InceptionV3 model to the `image`
# column of the "sample_images" table.
sqlContext.sql("SELECT inceptionV3_udf(image) as predictions from sample_images").show()

In [None]:
# Same query, using the UDF that was registered from the saved model file.
sqlContext.sql("SELECT my_custom_keras_model_udf(image) as predictions from sample_images").show()

In [None]:
# Same query, using the UDF that was registered with the `keras_load_img` loader.
# NOTE(review): `keras_load_img` takes a file path, so this UDF presumably
# expects a path column rather than the `image` struct — confirm.
sqlContext.sql("SELECT inceptionV3_udf_with_preprocessing(image) as predictions from sample_images").show()