# Image processing

In this notebook we will implement a UDF that loads a Keras model trained for binary image classification. It will classify each image into one of two classes: dog or cat.

We will implement the UDF in three different ways to see the difference in execution.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, pandas_udf

import os

import pandas as pd
import numpy as np
import tensorflow as tf

import io
import os
from typing import Iterator

from PIL import Image
from tensorflow.keras.preprocessing.image import img_to_array

keras = tf.keras

In [None]:
# Reuse the active Spark session if there is one; otherwise create it under the app name 'images'.
spark = SparkSession.builder.appName('images').getOrCreate()

In [None]:
# Show the version of the Spark session created above.
print(spark.version)

In [None]:
# Image geometry expected by the model: square images with three channels.
# The preprocessing step has to produce arrays of exactly this shape.
IMG_SIZE = 160
IMG_SHAPE = (IMG_SIZE, IMG_SIZE, 3)

# The project root is taken to be three directory levels above the
# current working directory.
base_path = os.getcwd()
project_path = '/'.join(base_path.split('/')[:-3])

# Location of the input images.
images_input_path = os.path.join(project_path, 'data/cat-dog-images')

# Model architecture (JSON) and trained weights (HDF5).
model_path = os.path.join(project_path, 'models/keras-model-1/model.json')
weights_path = os.path.join(project_path, 'models/keras-model-1/weights.h5')

### Read the images into a dataframe using the binaryFile format

In [None]:
# your code here

# Load every file under the input path with Spark's 'binaryFile' data source.
images = (
    spark.read
    .format('binaryFile')
    .load(images_input_path)
)

The Keras model is represented with the model.json file and weights.h5. Load the model from these files.

In [None]:
# your code here

# Read the serialized model architecture. The context manager guarantees the
# file handle is closed even if reading fails.
with open(model_path, 'r') as json_file:
    loaded_model_json = json_file.read()

# Rebuild the model from its JSON description, then load the trained weights.
loaded_model = keras.models.model_from_json(loaded_model_json)
loaded_model.load_weights(weights_path)

### Broadcast the model

In [None]:
# Pull the weight arrays out of the driver-side model so they can be shipped
# separately from the architecture.
weights = loaded_model.get_weights()

# Broadcast both pieces needed to rebuild the model: the weights and the
# JSON architecture string.
weights_bc = spark.sparkContext.broadcast(weights)
model_json_bc = spark.sparkContext.broadcast(loaded_model_json)

### Load the model

* two helper functions to
 * load the model
 * preprocess the bytes

In [None]:
def load_model():
    """Rebuild the Keras model from the broadcast variables and return it.

    The architecture is read from ``model_json_bc`` and the weights from
    ``weights_bc``.
    """
    # Recreate the architecture first, then restore the weights into it.
    restored = keras.models.model_from_json(model_json_bc.value)
    restored.set_weights(weights_bc.value)
    return restored


def preprocess(bytes):
    """Turn the raw bytes of one image file into a model-ready numpy array.

    Args:
        bytes: Raw content of an encoded image file. The parameter name
            shadows the builtin; it is kept so existing callers keep working.

    Returns:
        Array for an ``IMG_SIZE`` x ``IMG_SIZE`` RGB image, with pixel values
        divided by 255.
    """
    img = Image.open(io.BytesIO(bytes))
    # Force three colour channels. Grayscale, palette or RGBA files would
    # otherwise produce 1 or 4 channels, which does not match IMG_SHAPE.
    img = img.convert('RGB').resize((IMG_SIZE, IMG_SIZE))
    # Rescale the pixel values by 1/255.
    arr_rescaled = img_to_array(img) / 255.0
    return arr_rescaled

### Implement the UDF

* first we will try the vanilla UDF
* it will take as input the bytes and return a float

In [None]:
# your code here:

@udf('float')
def classify_udf(img_bytes):
    """Classify one image, given as raw bytes, and return the model output.

    This is the row-at-a-time UDF, so the model is rebuilt from the broadcast
    variables on every call.

    Args:
        img_bytes: Raw content of one image file.

    Returns:
        The first output value of the model for this image, as a Python float.
    """
    # load the model
    model = load_model()

    # preprocess the bytes
    arr = preprocess(img_bytes)

    # reshape the array: the model works on batches, so add a leading batch axis
    batch = np.expand_dims(arr, axis=0)

    # apply the model
    prediction = model.predict(batch)

    # extract and return the prediction; convert the numpy scalar to a plain
    # Python float so Spark can serialize it as its 'float' type
    return float(prediction[0, 0])

### Call the UDF

In [None]:
# your code here:


### Implement Pandas UDF

* now we will try the Pandas UDF
* it will take as input pd.Series and return another pd.Series

In [None]:
# your code here

@pandas_udf('float')
def classify_udf_pd(img_bytes: pd.Series) -> pd.Series:
    """Classify a batch of images and return one model output per image.

    The model is rebuilt from the broadcast variables once per call, i.e.
    once per batch handed to the UDF.

    Args:
        img_bytes: Series whose elements are the raw bytes of image files.

    Returns:
        Series with the first model output for each input image, in order.
    """
    model = load_model()

    # Preprocess every image and stack the results into a single batch array.
    data = np.stack(img_bytes.map(preprocess))

    predictions = model.predict(data)

    # Keep the first output column as the per-image result.
    return pd.Series(predictions[:, 0])

### Call the UDF

In [None]:
# your code here


### Implement Pandas iter UDF

* finally we will implement the Pandas iter UDF
* it will take as input iterator of pd.Series and return iterator of pd.Series
* for this purpose implement another helper function that will be called in the for-loop inside the UDF

In [None]:
# your code here

def process_batch(model, batch):
    """Run the model on one batch of raw image bytes.

    Args:
        model: Model object exposing ``predict``.
        batch: pandas Series whose elements are the raw bytes of image files.

    Returns:
        pandas Series holding the first output column of the predictions.
    """
    # Preprocess each element and stack the resulting arrays into one array.
    stacked = np.stack(batch.map(preprocess))

    # Apply the model to the whole stack.
    scores = model.predict(stacked, batch_size=32)

    # Keep the first output column and wrap it in a pandas Series.
    return pd.Series(scores[:, 0])


@pandas_udf('float')
def classify_udf_pd_iter(img_bytes: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Classify batches of images, loading the model once for all batches.

    The Pandas UDF should be of the type PandasUDFType.SCALAR_ITER, which is
    specified using the type hints.

    Args:
        img_bytes: Iterator of Series whose elements are raw image bytes.

    Yields:
        For each input Series, a Series with one model output per image.
    """
    # load the model once, before the loop, so every batch from this
    # iterator reuses it
    model = load_model()

    # iterate over the img_bytes
    for batch in img_bytes:
        yield process_batch(model, batch)

In [None]:
### Call the UDF

In [None]:
# your code here