# Serve a pyfunc model via pyspark

This notebook shows how a registered model can be served via pyspark in order to parallelize predictions.

Note that the local setup here only illustrates the code and the concept. Parallelization only pays off on a cluster, where the work can be distributed across several processing units.

In [None]:
import os
import sys

import mlflow
import mlflow.pyfunc
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType, StructField, StructType

# Make the repo root importable so that the local config module can be found
sys.path.append("../")
from config import mlflow_server_uri

In [None]:
mlflow.set_tracking_uri(mlflow_server_uri)
mlflow.set_registry_uri(mlflow_server_uri)

## Init pyspark and load data

Please note the following pitfalls when running Spark locally:

* Make sure that the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variables are set correctly: both the driver and the workers should use the Python interpreter of the venv.
* In this repo, utils and config are imported as modules. They need to be copied into the venv's site-packages, since the driver and the workers look for packages there. The same applies when working with a cluster in the cloud: the modules must be available on every node.

In [None]:
python_path = os.path.abspath("../venv/bin/python")

# Point the Spark driver and workers to the venv interpreter
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path
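PYSPARK_PYTHON is picked up when the SparkContext is created, so a wrong path only surfaces later as a worker error. A minimal sketch of a guard that fails early if the venv interpreter does not exist; the helper name `set_pyspark_python` is hypothetical and not part of PySpark.

```python
import os


def set_pyspark_python(python_path: str) -> str:
    """Point PySpark's driver and workers at one interpreter (hypothetical helper).

    Raises FileNotFoundError right away instead of letting Spark fail later
    with a less obvious error when it launches its Python workers.
    """
    resolved = os.path.abspath(python_path)
    if not (os.path.isfile(resolved) and os.access(resolved, os.X_OK)):
        raise FileNotFoundError(f"No executable Python interpreter at {resolved}")
    # Call this before SparkSession.builder...getOrCreate()
    os.environ["PYSPARK_PYTHON"] = resolved
    os.environ["PYSPARK_DRIVER_PYTHON"] = resolved
    return resolved


# Usage in this notebook (assuming the venv lives in ../venv):
# set_pyspark_python("../venv/bin/python")
```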

In [None]:
# Adjust "python3.8" to the Python version of your venv
! cp -r ../utils ../venv/lib/python3.8/site-packages/utils
! cp -r ../config ../venv/lib/python3.8/site-packages/config
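The `cp` targets above hardcode `python3.8`. A sketch, assuming the venv interpreter from `python_path`, that asks that interpreter for its own site-packages directory via `sysconfig` and copies a package there with `shutil.copytree`; both helper names are hypothetical. A side benefit: `cp -r` into an already existing directory nests the copy (`utils/utils`), whereas `dirs_exist_ok=True` (Python 3.8+) merges into it.

```python
import os
import shutil
import subprocess


def site_packages_of(python_path: str) -> str:
    """Return the purelib (site-packages) directory of the given interpreter."""
    out = subprocess.run(
        [python_path, "-c", "import sysconfig; print(sysconfig.get_paths()['purelib'])"],
        check=True,
        capture_output=True,
        text=True,
    )
    return out.stdout.strip()


def copy_package(src_dir: str, site_packages: str) -> str:
    """Copy a local package directory into site_packages, merging if it already exists."""
    dst = os.path.join(site_packages, os.path.basename(os.path.normpath(src_dir)))
    shutil.copytree(src_dir, dst, dirs_exist_ok=True)
    return dst


# Usage in this notebook (hypothetical, replaces the cp commands):
# target = site_packages_of(python_path)
# for pkg in ("../utils", "../config"):
#     copy_package(pkg, target)
```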

In [None]:
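# "local[*]" uses one worker thread per CPU core; plain "local" runs a single thread (no parallelism)
spark_session: SparkSession = SparkSession.builder.master("local[*]").appName("HelloWorld").getOrCreate()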

In [None]:
SCHEMA: StructType = StructType([
        StructField("host_total_listings_count", FloatType(), True),
        StructField("neighbourhood_cleansed", IntegerType(), True),
        StructField("zipcode", IntegerType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("property_type", IntegerType(), True),
        StructField("room_type", IntegerType(), True),
        StructField("accommodates", FloatType(), True),
        StructField("bathrooms", FloatType(), True),
        StructField("bedrooms", FloatType(), True),
        StructField("beds", FloatType(), True),
        StructField("bed_type", IntegerType(), True),
        StructField("minimum_nights", FloatType(), True),
        StructField("number_of_reviews", FloatType(), True),
        StructField("review_scores_rating", FloatType(), True),
        StructField("review_scores_accuracy", FloatType(), True),
        StructField("review_scores_cleanliness", FloatType(), True),
        StructField("review_scores_checkin", FloatType(), True),
        StructField("review_scores_communication", FloatType(), True),
        StructField("review_scores_location", FloatType(), True),
        StructField("review_scores_value", FloatType(), True),
        StructField("price", FloatType(), True),
    ])

In [None]:
df = spark_session.read.csv("../data/airbnb-cleaned-mlflow.csv", header=True, schema=SCHEMA)
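With `header=True` and an explicit schema, Spark's CSV reader by default (`enforceSchema=True`) skips the header line and applies the schema by position, so a reordered column in the file would silently end up in the wrong field. Passing `enforceSchema=False` makes Spark validate the header instead. As an alternative, a minimal stdlib pre-check could compare the header with the expected field order; in the notebook that list could come from `SCHEMA.fieldNames()`, and the helper name `header_mismatches` is hypothetical.

```python
import csv
from typing import List


def header_mismatches(csv_path: str, expected_columns: List[str]) -> List[str]:
    """Compare a CSV header with the expected column order.

    Returns human-readable mismatch descriptions; an empty list means the
    header matches the expected columns position by position.
    """
    with open(csv_path, newline="") as f:
        header = next(csv.reader(f), [])
    problems = []
    for pos, (found, expected) in enumerate(zip(header, expected_columns)):
        if found != expected:
            problems.append(f"position {pos}: expected {expected!r}, found {found!r}")
    if len(header) != len(expected_columns):
        problems.append(
            f"column count: expected {len(expected_columns)}, found {len(header)}"
        )
    return problems


# Usage in this notebook (hypothetical):
# assert not header_mismatches("../data/airbnb-cleaned-mlflow.csv", SCHEMA.fieldNames())
```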

## Load the price regressor model

In [None]:
model_name = "My_airbnb_model"
stage = "Production"

In [None]:
loaded_model = mlflow.pyfunc.spark_udf(model_uri=f"models:/{model_name}/{stage}",
                                       spark=spark_session)
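Roughly speaking, `spark_udf` returns a Spark UDF (implemented as a pandas UDF) that applies the model's `predict` method to batches of rows on the workers; this is where the parallelism comes from. The following is only a stdlib analogy, not MLflow's implementation: it splits the rows into partitions and scores each partition as one batch, concurrently and in input order. `predict_batch` stands in for the loaded model and, like both helper names, is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def split_into_partitions(rows: Sequence[float], n_partitions: int) -> List[Sequence[float]]:
    """Split rows into at most n_partitions contiguous, near-equal chunks."""
    if n_partitions < 1:
        raise ValueError("n_partitions must be >= 1")
    size, rest = divmod(len(rows), n_partitions)
    parts, start = [], 0
    for i in range(n_partitions):
        # The first `rest` partitions get one extra row
        end = start + size + (1 if i < rest else 0)
        if end > start:
            parts.append(rows[start:end])
        start = end
    return parts


def predict_in_partitions(
    rows: Sequence[float],
    predict_batch: Callable[[Sequence[float]], List[float]],
    n_partitions: int = 4,
) -> List[float]:
    """Score each partition as one batch, concurrently, keeping input order."""
    parts = split_into_partitions(rows, n_partitions)
    with ThreadPoolExecutor(max_workers=len(parts) or 1) as pool:
        # pool.map yields results in submission order, so row order is preserved
        results = list(pool.map(predict_batch, parts))
    return [pred for batch in results for pred in batch]
```

On a real cluster the partitions are processed by separate executor processes on different machines rather than threads, which is why the speedup only materializes there.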

## Predict

In [None]:
df = df.persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
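# Called without arguments, the UDF takes its input column names from the model signature, so df must contain columns with exactly these names
df = df.withColumn("prediction", loaded_model())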

In [None]:
df.select("prediction").show()

## Stop spark session

In [None]:
spark_session.stop()

# Learnings


* Always stick to the latest version of MLflow. The model registry and the deployment features in particular are under active development.
* When building a custom pyfunc model, we recommend defining a signature for the model so that input types are checked.
* However, the PyFunc wrapper that enforces the signature's data types on the model input is extremely strict (!). Schema compatibility between the training and prediction datasets should therefore be ensured up front.
* On the plus side, error messages are now much more verbose than in earlier versions, which brings us back to the first point :-)
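Because the pyfunc wrapper rejects inputs whose types do not match the signature, it can pay off to compare the scoring data with the expected input schema before calling the UDF. A minimal sketch in plain Python: `expected` maps column names to type names, and `actual` mimics the `(name, type)` tuples returned by a Spark `DataFrame.dtypes`. Building `expected` from the model signature, and mapping Spark's and MLflow's type names onto one common vocabulary, are assumptions left to the reader; the helper name is hypothetical.

```python
from typing import Dict, List, Tuple


def schema_mismatches(
    expected: Dict[str, str], actual: List[Tuple[str, str]]
) -> List[str]:
    """List missing columns and type differences between expected and actual.

    Extra columns in `actual` (e.g. the target "price") are ignored, assuming
    only the signature's columns are passed to the model.
    """
    actual_types = dict(actual)
    problems = []
    for name, expected_type in expected.items():
        if name not in actual_types:
            problems.append(f"missing column {name!r}")
        elif actual_types[name] != expected_type:
            problems.append(
                f"{name!r}: expected {expected_type}, got {actual_types[name]}"
            )
    return problems
```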

## Further reading

https://mlflow.org/docs/latest/models.html#export-a-python-function-model-as-an-apache-spark-udf