# ResNet50 With Pandas

# Launch Spark

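Three settings must be added to the Spark configuration to enable Apache Arrow, which is disabled by default. The two `ARROW_PRE_0_15_IPC_FORMAT` environment variables keep PyArrow 0.15 and later compatible with the Arrow IPC format that Spark 2.x expects. These settings can be applied directly to the configuration object without modifying SparkLauncher, although you can add them to SparkLauncher instead if you prefer.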

```python
    # Apache Arrow Config
    conf.set('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', '1')
    conf.set('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', '1')
    conf.set('spark.sql.execution.arrow.enabled', 'true')
```

In [None]:
import import_ipynb
from data603 import SparkLauncher

# Start from the base Spark configuration provided by SparkLauncher.
conf = SparkLauncher.get_spark_conf()

# Settings layered on top of the base configuration, applied in this order:
#   * spark.yarn.dist.files copies the ResNet50 weights file to every node
#     of the cluster;
#   * the ARROW_PRE_0_15_IPC_FORMAT variables (YARN application master and
#     executors) plus spark.sql.execution.arrow.enabled turn on Apache Arrow,
#     which is disabled by default.
for _key, _value in (
    ('spark.yarn.dist.files', 'keras_data/resnet50_weights_tf_dim_ordering_tf_kernels.h5'),
    ('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', '1'),
    ('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', '1'),
    ('spark.sql.execution.arrow.enabled', 'true'),
):
    conf.set(_key, _value)

# Launch the cluster with the assembled configuration.
spark = SparkLauncher.get_spark_session(pack_venv = False, conf = conf)


importing Jupyter notebook from /scratch/data603/rgite1/data603/SparkLauncher.ipynb
Creating Spark Configuration
Creating Spark Configuration
Setting Environment Variables
Creating Spark Session: rgite1_data603_spark_session


# Read Dataframe

In [None]:
# Load the table of images and bounding boxes from parquet.
image_chips = spark.read.format("parquet").load("/user/shreyaa2/image_face.parquet")

In [None]:
import os
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [None]:
def extract_chip(data, xmin, xmax, ymin, ymax):
    """Crop one bounding box out of an encoded image and return it as JPEG bytes.

    Parameters
    ----------
    data : bytes
        The full image, in any format Pillow can open.
    xmin, xmax, ymin, ymax : float or str
        Box edges as fractions of the image width (x) and height (y).
        Anything ``float()`` accepts works, so numeric strings are fine too.

    Returns
    -------
    bytes
        The cropped region encoded as JPEG.
    """
    from PIL import Image
    import io, math

    with Image.open(io.BytesIO(data)) as img:
        width, height = img.size

        # Convert the fractional box edges to pixel coordinates.
        left = math.floor(float(xmin) * width)
        upper = math.floor(float(ymin) * height)
        right = math.floor(float(xmax) * width)
        lower = math.floor(float(ymax) * height)

        # A very thin box can floor to zero width or height, and Pillow cannot
        # encode an empty image; keep at least one pixel in each direction.
        right = max(right, left + 1)
        lower = max(lower, upper + 1)

        chip = img.crop(box = (left, upper, right, lower))

        # JPEG has no alpha channel or palette, so modes such as RGBA, LA or P
        # cannot be saved directly; convert anything but RGB/L to RGB first.
        if chip.mode not in ("RGB", "L"):
            chip = chip.convert("RGB")

        buff = io.BytesIO()
        chip.save(buff, format = "JPEG")

    return buff.getvalue()

# Register extract_chip as a Spark UDF. F.udf defaults to a string return
# type, so BinaryType is declared explicitly for the raw JPEG bytes.
udf_extract_chip = F.udf(extract_chip, returnType = BinaryType())

# Crop each row's bounding box out of its full image into "chip_data".
image_chips = image_chips.withColumn(
    "chip_data",
    udf_extract_chip(F.col("Data"), F.col("XMin"), F.col("XMax"), F.col("YMin"), F.col("YMax")),
)

In [None]:
# Remove the full-image bytes now that each row carries its cropped chip, so
# whole images are not shipped through the later pandas UDF. Use the exact
# column name ("Data", as read when building chip_data): drop() is a silent
# no-op for an unknown name, which a case mismatch would become if
# spark.sql.caseSensitive were enabled.
image_chips = image_chips.drop('Data')

# Add a Grouping Column

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType

def group_id(n):
    """Return the bucket for row counter ``n``: ``n % 10``, i.e. one of 0..9."""
    return n % 10

# Spark UDF wrapper around group_id, producing an integer column.
udf_group_id = udf(group_id, IntegerType())

# "n": a unique, increasing 64-bit id per row. Spark does not promise these
# ids are consecutive or start at 1 (the partition index sits in the upper
# bits), so "n" is a row tag, not a row count.
# "grp": n modulo 10, which spreads rows over the buckets 0-9.
image_chips = (
    image_chips
    .withColumn("n", monotonically_increasing_id())
    .withColumn("grp", udf_group_id(col("n")))
)

# Create Output Column(s)

In [None]:
# Placeholder result columns that evaluate_chip overwrites. They must exist
# now because the pandas UDF is declared with image_chips.schema, so their
# types (string label, double confidence) define its output schema.
image_chips = (
    image_chips
    .withColumn('prediction_label', lit(""))
    .withColumn('prediction_confidence', lit(0.0))
)

# Create the Pandas UDF

Create the grouped-map pandas UDF with the `@pandas_udf` decorator.

In [None]:
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(image_chips.schema, PandasUDFType.GROUPED_MAP)
def evaluate_chip(pdf):
    """Label every image chip in one group with ResNet50's top ImageNet class.

    Used as a grouped-map pandas UDF: ``pdf`` is a pandas DataFrame holding
    one group's rows. Each ``chip_data`` value holds encoded image bytes (the
    JPEG chips produced by ``extract_chip``). The ``prediction_label`` and
    ``prediction_confidence`` columns are overwritten with the top-1 class
    name and its score, and ``pdf`` is returned so its columns match
    ``image_chips.schema``.
    """
    import io
    import os

    import numpy as np
    from keras.applications.resnet50 import ResNet50
    from keras.applications.resnet50 import preprocess_input
    from keras.applications.resnet50 import decode_predictions
    from keras.preprocessing.image import load_img
    from keras.preprocessing.image import img_to_array

    # Nothing to classify: skip the costly model load. The placeholder
    # output columns are already present in pdf.
    if pdf.empty:
        return pdf

    # Load the model once per group. The weights file was distributed to the
    # working directory through spark.yarn.dist.files.
    model = ResNet50(weights = f'{os.getcwd()}/resnet50_weights_tf_dim_ordering_tf_kernels.h5',
                     include_top = True)

    # Images per model.predict call. Batching amortizes per-call overhead,
    # while only one batch of decoded images is held in memory at a time.
    batch_size = 32

    chips = pdf['chip_data'].tolist()
    prediction_label = []
    prediction_confidence = []
    for start in range(0, len(chips), batch_size):
        # Decode each chip, resize it to ResNet50's 224x224 input, and stack
        # the slice into a single batch array.
        batch = np.stack([
            img_to_array(load_img(io.BytesIO(chip), target_size = (224, 224)))
            for chip in chips[start:start + batch_size]
        ])
        yhat = model.predict(preprocess_input(batch))

        # decode_predictions yields, per image, a list of
        # (class_id, class_name, score) tuples; top=1 keeps only the best.
        for top in decode_predictions(yhat, top = 1):
            _, class_name, score = top[0]
            prediction_label.append(class_name)
            prediction_confidence.append(score)

    # Results were collected in row order, so assign them column-wise.
    pdf['prediction_label'] = prediction_label
    pdf['prediction_confidence'] = prediction_confidence

    return pdf

# Group the Image Dataframe, Apply Pandas UDF

Group the rows by the `grp` column so that each group becomes one processing chunk, then call `apply` to run the pandas UDF on every chunk.

In [None]:
# Split the rows by their grp bucket and run the grouped-map pandas UDF on
# each bucket; the combined output replaces image_chips.
image_chips = image_chips.groupBy(col('grp')).apply(evaluate_chip)

In [None]:
# Print the column names, types and nullability to confirm the pandas UDF
# output matches the expected schema.
image_chips.printSchema()

root
 |-- ImageID: string (nullable = true)
 |-- XMin: double (nullable = true)
 |-- XMax: double (nullable = true)
 |-- YMin: double (nullable = true)
 |-- YMax: double (nullable = true)
 |-- chip_data: binary (nullable = true)
 |-- n: long (nullable = false)
 |-- grp: integer (nullable = true)
 |-- prediction_label: string (nullable = false)
 |-- prediction_confidence: double (nullable = false)



# View the Result!

In [None]:
# Run the pipeline and display the first 50 rows (long cell values truncated).
image_chips.show(n = 50, truncate = True)

+----------------+--------+--------+--------+--------+--------------------+---+---+-------------------+---------------------+
|         ImageID|    XMin|    XMax|    YMin|    YMax|           chip_data|  n|grp|   prediction_label|prediction_confidence|
+----------------+--------+--------+--------+--------+--------------------+---+---+-------------------+---------------------+
|000caf6b38df250f|0.200625| 0.22625|0.327471|0.344221|[FF D8 FF E0 00 1...|  1|  1|             hamper|  0.35466238856315613|
|000caf6b38df250f|    0.73| 0.76625|0.177554|0.201005|[FF D8 FF E0 00 1...| 11|  1|             hamper|   0.5753193497657776|
|000caf6b38df250f|  0.1375|0.254375|0.409548|0.490787|[FF D8 FF E0 00 1...| 21|  1|             nipple|  0.21943114697933197|
|000caf6b38df250f| 0.65125|  0.7975|0.056951|0.306533|[FF D8 FF E0 00 1...| 31|  1|                wig|  0.20163631439208984|
|000caf6b38df250f|0.065625|   0.185|0.681742|0.922948|[FF D8 FF E0 00 1...| 41|  1|        Windsor_tie|   0.3849346339

In [None]:
# Show chips whose predicted label contains "Human face".
# NOTE(review): ResNet50's labels are ImageNet class names (e.g. "wig",
# "Windsor_tie"), and ImageNet has no "Human face" class, so this filter is
# expected to match nothing; the captured output is indeed empty.
image_chips.where(col('prediction_label').contains("Human face")).show()

+-------+----+----+----+----+---------+---+---+----------------+---------------------+
|ImageID|XMin|XMax|YMin|YMax|chip_data|  n|grp|prediction_label|prediction_confidence|
+-------+----+----+----+----+---------+---+---+----------------+---------------------+
+-------+----+----+----+----+---------+---+---+----------------+---------------------+



# Analyze the Result!

In [None]:
# Among high-confidence predictions (> 0.90), count chips per label, keep
# labels seen more than 100 times, and order them from most to least common.
ic_summary = (
    image_chips
    .where(col('prediction_confidence') > 0.90)
    .groupBy('prediction_label')
    .count()
    .where(col('count') > 100)
    .orderBy(col('count').desc())
)

In [None]:
# Display the five most frequent confident labels.
ic_summary.show(n = 5)

+----------------+-----+
|prediction_label|count|
+----------------+-----+
+----------------+-----+



In [None]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()