# P8 : Model deployment on the cloud

In [127]:
import os
import io
import pyspark.sql
from pyspark import SparkContext
from pyspark.sql import SparkSession
import findspark
findspark.init()
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, split
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.feature import StringIndexer, StandardScaler
from pyspark.ml.feature import PCA
import tensorflow as tf
from tensorflow.keras.applications import resnet50
from tensorflow.keras.preprocessing.image import img_to_array
from PIL import Image
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

## Set-up bucket and spark

Set-up spark session and spark context.

In [2]:
spark = SparkSession \
    .builder \
    .appName("computer_vision") \
    .master("local") \
    .config("spark.hadoop.google.cloud.auth.service.account.enable","true") \
    .config("google.cloud.auth.service.account.json.keyfile", "/path/json") \
    .getOrCreate()

In [3]:
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

In [4]:
#!gsutil ls gs://name_bucket/img/*

## Load images from google storage

In [5]:
path_img = "gs://p8_computer_vision/img/*"
img_all = spark.read.format("image").load(path_img)
img_all.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)



In [6]:
img_df = img_all.select("image.origin", "image.width", "image.height", "image.data")
img_df = img_df.withColumn("target", split(col("origin"), "/") \
    .getItem(4))
img_df.show()

+--------------------+-----+------+--------------------+------------------+
|              origin|width|height|                data|            target|
+--------------------+-----+------+--------------------+------------------+
|gs://p8_computer_...|  763|   722|[FF FF FF FF FF F...|           cabbage|
|gs://p8_computer_...|  724|   706|[FF FF FF FF FF F...|           cabbage|
|gs://p8_computer_...|  754|   720|[FF FF FF FF FF F...|           cabbage|
|gs://p8_computer_...|  566|   567|[FF FF FF FF FF F...|apple_crimson_snow|
|gs://p8_computer_...|  421|   527|[FF FF FF FF FF F...|              pear|
|gs://p8_computer_...|  639|   558|[FF FF FF FF FF F...|apple_crimson_snow|
|gs://p8_computer_...|  526|   431|[FF FF FF FF FF F...|              pear|
|gs://p8_computer_...|  583|   578|[FF FF FF FF FF F...|apple_crimson_snow|
|gs://p8_computer_...|  510|   414|[FF FF FF FF FF F...|              pear|
+--------------------+-----+------+--------------------+------------------+



In [123]:
path_img = "gs://p8_computer_vision/img/*"
img_all = spark.read.format("binaryfile").load(path_img)
img_all.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



In [124]:
img_df = img_all.select("path", "content")
img_df = img_df.withColumn("target", split(col("path"), "/") \
    .getItem(4))
img_df.show()

+--------------------+--------------------+------------------+
|                path|             content|            target|
+--------------------+--------------------+------------------+
|gs://p8_computer_...|[FF D8 FF E0 00 1...|           cabbage|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|           cabbage|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|           cabbage|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|apple_crimson_snow|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|              pear|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|apple_crimson_snow|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|              pear|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|apple_crimson_snow|
|gs://p8_computer_...|[FF D8 FF E0 00 1...|              pear|
+--------------------+--------------------+------------------+



## Preprocess images

In [69]:
test = img_df.collect()[0]['data']

In [72]:
type(test)

bytearray

In [113]:
image_stream = io.BytesIO(test)

In [115]:
ggg = Image.frombytes('RGB', (763, 722), test)

TypeError: argument 1 must be read-only bytes-like object, not bytearray

In [108]:
for i,item in enumerate(image_stream):
    image_stream[i] = 255-item

TypeError: unsupported operand type(s) for -: 'int' and 'bytes'

In [102]:
grr = np.frombuffer(image_stream.read(), np.uint8)

In [109]:
grr2 = 255-grr

In [104]:
type(grr)

numpy.ndarray

In [110]:
img = cv2.imdecode(grr2, flags=1)

In [64]:
gray = cv2.cvtColor(grr, cv2.IMREAD_GRAYSCALE)

plt.imshow(gray)
plt.title('my picture')
plt.show()

error: OpenCV(4.5.5) /io/opencv/modules/imgproc/src/color.simd_helpers.hpp:92: error: (-2:Unspecified error) in function 'cv::impl::{anonymous}::CvtHelper<VScn, VDcn, VDepth, sizePolicy>::CvtHelper(cv::InputArray, cv::OutputArray, int) [with VScn = cv::impl::{anonymous}::Set<3, 4>; VDcn = cv::impl::{anonymous}::Set<1>; VDepth = cv::impl::{anonymous}::Set<0, 2, 5>; cv::impl::{anonymous}::SizePolicy sizePolicy = cv::impl::<unnamed>::NONE; cv::InputArray = const cv::_InputArray&; cv::OutputArray = const cv::_OutputArray&]'
> Invalid number of channels in input image:
>     'VScn::contains(scn)'
> where
>     'scn' is 1


In [46]:
img = cv2.imdecode(grr, 1)

In [47]:
img

In [39]:
img = cv2.imdecode(np.frombuffer(image_stream.read(), np.uint8), 1)

In [40]:
img

In [None]:
image_stream.write(connection.read(image_len))
image_stream.seek(0)
file_bytes = np.asarray(bytearray(image_stream.read()), dtype=np.uint8)
img = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR)

In [32]:
img = cv2.imdecode(np.frombuffer(test_converted.read(), np.uint8), 1)

In [33]:
img2 = cv2.imread(img)

In [34]:
img2

In [15]:
def preprocess_img(bytes_img, height, width, border_color, final_size):
    """
    """
    img = Image.open(io.BytesIO(bytes_img))
    if width == height:
        img_border=img
    elif height > width:
        border = round((height - width) / 2)
        img_border = cv2.copyMakeBorder(img, 0, 0, border, border, 
                                        cv2.BORDER_CONSTANT, None, value = border_color)
    else:
        border = round((width - height) / 2)
        img_border = cv2.copyMakeBorder(img, border, border, 0, 0, 
                                        cv2.BORDER_CONSTANT, None, value = border_color)
    img_resized = cv2.resize(img_border, final_size, interpolation = cv2.INTER_AREA)

    return img_resized

In [16]:
img_df2 = img_df.withColumn("preprocessed_data", preprocess_img(img_df, (255, 255, 255), (224, 224)))

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

In [None]:
img_df.select()

In [97]:
def preprocess(bytes_img):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(bytes_img)).resize([224, 224])
    arr = img_to_array(img)
    return arr

In [98]:
test2 = preprocess(test)

UnidentifiedImageError: cannot identify image file <_io.BytesIO object at 0x7f7ccfb9e720>

## Load model

We will use the pre-trained ResNet50 model without top layers.

In [10]:
resnet = resnet50.ResNet50(weights="imagenet", include_top=False)
resnet.summary()

Model: "resnet50"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, None, None,  0           []                               
                                 3)]                                                              
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, None, None,   0           ['input_1[0][0]']                
                                3)                                                                
                                                                                                  
 conv1_conv (Conv2D)            (None, None, None,   9472        ['conv1_pad[0][0]']              
                                64)                                                        

In [116]:
bc_model_weights = spark.sparkContext.broadcast(resnet.get_weights())

In [117]:
def model_fn():
    """
    Returns a ResNet50 model with top layer removed and broadcasted pretrained weights.
    """
    model = resnet50.ResNet50(weights=None, include_top=False) 
    model.set_weights(bc_model_weights.value)
    return model

In [118]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return resnet50.preprocess_input(arr) 

In [129]:
def featurize_series(model, content_series):
    
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    
    input = np.stack(content_series.map(preprocess)) 
    preds = model.predict(input)
    output = [p.flatten() for p in preds]
    output = pd.Series(output)
    return output

In [121]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).
  
    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                                is a pandas Series of image data.
    '''
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



In [130]:
features_df = img_df.select(col("path"), col("target"), featurize_udf("content").alias("features"))
features_df.printSchema()
features_df.show()

root
 |-- path: string (nullable = true)
 |-- target: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/fbajolet/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/home/fbajolet/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/fbajolet/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 258, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/home/fbajolet/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 88, in dump_stream
    for batch in iterator:
  File "/home/fbajolet/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 251, in init_stream_yield_batches
    for series in iterator:
  File "/home/fbajolet/spark-3.0.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 359, in func
    for result_batch, result_type in result_iter:
  File "<ipython-input-121-f701bb698283>", line 12, in featurize_udf
  File "<ipython-input-119-f0a88bcfec58>", line 11, in featurize_series
NameError: name 'pd' is not defined
