# MLflow and PySpark integration:

source example: https://dev.to/bookletai/a-true-end-to-end-ml-example-lead-scoring-3162

## train locally a model as example:

In [10]:
import pandas as pd

## Import dataset
leads_dataset = pd.read_csv('Leads.csv')
leads_dataset.columns = map(str.lower, leads_dataset.columns)
leads_dataset.dropna(inplace=True)

In [11]:
# Create data pre-processing steps before plugging into model
leads_categorical_columns = ['lead origin',
                             'lead source',
                             'last activity',
                             'specialization',
                             'what is your current occupation',
                             'what matters most to you in choosing a course',
                             'city',
                             'last notable activity']

leads_numeric_columns = ['totalvisits',
                         'total time spent on website',
                         'page views per visit']

leads_response_columns = ['converted']

In [12]:
#split data for training, remove extras
from sklearn.model_selection import train_test_split

leads_x = leads_dataset.drop(leads_response_columns, axis=1)
leads_y = leads_dataset[leads_response_columns]

leads_x_train, leads_x_test, leads_y_train, leads_y_test = train_test_split(leads_x,
                                                                            leads_y,
                                                                            train_size=0.7,
                                                                            test_size=0.3,
                                                                            random_state=5050)


In [13]:
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaler = scaler.fit(leads_x_train[leads_numeric_columns])


In [14]:
def pre_process_leads_data(df,
                           numeric_columns,
                           categorical_columns,
                           fitted_scaler,
                           train_df_columns = None):
    ## create new df with selected columns
    df.columns = map(str.lower, df.columns)
    _df = df[set(numeric_columns + categorical_columns)].copy()
    
    ## scale the numeric columns with the pre-built scaler
    _df[numeric_columns] = fitted_scaler.transform(_df[numeric_columns])
         
    # First, make categorical text lowercase
    _df[categorical_columns] = _df[categorical_columns].apply(lambda x: x.str.lower())
    # Next, create one-hot-encoded variables, add to dataframe, drop old columns
    _df_dummies = pd.get_dummies(_df[categorical_columns], drop_first=True)
    _df = pd.concat([_df, _df_dummies], axis=1)
    _df.drop(categorical_columns, axis=1, inplace = True)

    if train_df_columns:
        _df = _df.reindex(columns=train_df_columns, fill_value=0)

    return _df

In [15]:
leads_x_train_clean = pre_process_leads_data(df = leads_x_train,
                                            numeric_columns = leads_numeric_columns,
                                            categorical_columns = leads_categorical_columns,
                                            fitted_scaler = scaler)

leads_x_test_clean = pre_process_leads_data(df = leads_x_test,
                                           numeric_columns = leads_numeric_columns,
                                           categorical_columns = leads_categorical_columns,
                                           fitted_scaler = scaler,
                                           train_df_columns = leads_x_train_clean.columns.tolist())

In [43]:
leads_x_test_clean.to_csv('tablon_para_predict.csv', sep=",", index=False)

In [16]:
## Train the random forest model
from sklearn.ensemble import RandomForestClassifier


num_estimators = 100
min_samples = 4

rf = RandomForestClassifier(n_estimators=num_estimators,
                            min_samples_split=min_samples)
rf.fit(leads_x_train_clean, leads_y_train.values.ravel())

RandomForestClassifier(min_samples_split=4)

In [20]:
from sklearn.metrics import accuracy_score, roc_auc_score

leads_y_test_predicted = rf.predict(leads_x_test_clean)

accuracy = accuracy_score(leads_y_test, leads_y_test_predicted)
auc_score = roc_auc_score(leads_y_test, leads_y_test_predicted)

print(accuracy)
print(auc_score)

0.7855917667238422
0.7859899447670301


## connect to mlflow

In [21]:
import mlflow

# connect to MLflow
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("LeadScoringProcessed") # creates an experiment if it doesn't exist


INFO: 'LeadScoringProcessed' does not exist. Creating a new experiment


## create a MLflow Model with pyfunc

In [22]:
import mlflow.pyfunc

class leadsModel(mlflow.pyfunc.PythonModel):
   
    ## defining objects needed for leadsModel prediction. 
    def __init__(self,
                 train_df_columns,
                 model,
                 leads_categorical_columns,
                 leads_numeric_columns,
                 fitted_scaler,
                 pre_process_leads_data):
        
        ## Setting up all needed objects
        self.train_df_columns = train_df_columns
        self.model = model
        self.leads_categorical_columns = leads_categorical_columns
        self.leads_numeric_columns = leads_numeric_columns
        self.fitted_scaler = fitted_scaler
        self.pre_process_leads_data = pre_process_leads_data
    
    ## define function with processing and feeding data into prediction at the end
    def predict(self,context,model_input):
        
        # make sure all inputted columns are lowercase
        model_input.columns = map(str.lower, model_input.columns)
        
        # run inputted dataset through our processing function
        # note: we are excluding the response columns here since not needed for deploy
        model_input_processed = self.pre_process_leads_data(
                                   df = model_input,
                                   numeric_columns = self.leads_numeric_columns,
                                   categorical_columns = self.leads_categorical_columns,
                                   fitted_scaler = self.fitted_scaler,
                                   train_df_columns = self.train_df_columns)       
        
        # finally input the cleaned/adjusted dataset into our model for prediction
        return self.model.predict(model_input_adjusted)

  and should_run_async(code)


In [23]:
# define specific python and package versions for environment
mlflow_conda_env = {
 'name': 'mlflow-env',
 'channels': ['defaults'],
 'dependencies': ['python=3.6.2', {'pip': ['mlflow==1.6.0','scikit-learn','cloudpickle==1.3.0']}]
}

In [26]:
# start mlflow run, log parameters, metrics, and the model
with mlflow.start_run(run_name="Leads Model with Processing") as run:
    # log the parameters that we defined for the model training
    mlflow.log_param("num_estimators", num_estimators)
    mlflow.log_param("min_samples", min_samples)
    
    # log the performance metrics that we calculated earlier
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("auc_score", auc_score)
    
    # log model with all objects referenced in the leadsModel class
    mlflow.pyfunc.log_model(
        artifact_path = "leads_pyfunc",
        python_model = leadsModel(train_df_columns = leads_x_train_clean.columns.tolist(),
                                  model = rf,
                                  leads_categorical_columns = leads_categorical_columns,
                                  leads_numeric_columns = leads_numeric_columns,
                                  fitted_scaler = scaler,
                                  pre_process_leads_data = pre_process_leads_data
                                 ),
        conda_env = mlflow_conda_env
    )
    
    # save run_id and experiment_id for deployment
    run_id = run.info.run_uuid
    experiment_id = run.info.experiment_id
    
    # end the mlflow run!
    mlflow.end_run()

## Consume model with Pyspark UDF

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

In [44]:
model_path = 's3://mlflow/1/f639a287b7e140e2b27f5a0d86299a10/artifacts/leads_pyfunc/'# 's3://<bucket>/mlflow/artifacts/1/0f8691808e914d1087cf097a08730f17/artifacts/model' 
prediction_data_path = 'tablon_para_predict.csv'
wine_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("header", "true").option('delimiter', ',').load(prediction_data_path)
         
df.withColumn('prediction', wine_udf(*train_cols)).show(100, False)

Py4JJavaError: An error occurred while calling o263.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 12, e76dc0194b9f, executor driver): java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
	at io.netty.util.internal.PlatformDependent.directBuffer(PlatformDependent.java:490)
	at io.netty.buffer.NettyArrowBuf.getDirectBuffer(NettyArrowBuf.java:243)
	at io.netty.buffer.NettyArrowBuf.nioBuffer(NettyArrowBuf.java:233)
	at io.netty.buffer.ArrowBuf.nioBuffer(ArrowBuf.java:245)
	at org.apache.arrow.vector.ipc.message.ArrowRecordBatch.computeBodyLength(ArrowRecordBatch.java:222)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:240)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:132)
	at org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:120)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:94)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.writeIteratorToStream(ArrowPythonRunner.scala:101)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:383)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:218)
