# Understanding Parallelization of Machine Learning Algorithms in Apache Spark™

## Pandas UDFs for Training Many Models in Parallel

This example uses the Bank Marketing dataset. Given a set of features about a customer, can we predict whether that person will open a term deposit account?

Original source: [UCI Machine Learning Repository: Bank Marketing Data Set](https://archive.ics.uci.edu/ml/datasets/bank+marketing)

[Moro et al., 2014] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014.

In [0]:
train_data_path_2 = "dbfs:/ml-workshop-datasets/employee/delta/trainingData"

In [0]:
%scala
val tags = com.databricks.logging.AttributionContext.current.tags
val name = tags.getOrElse(com.databricks.logging.BaseTagDefinitions.TAG_USER, java.util.UUID.randomUUID.toString.replace("-", ""))
var username = if (name != "unknown") name else dbutils.widgets.get("databricksUsername")
spark.conf.set("my.fq_username", username)

In [0]:
username = spark.conf.get("my.fq_username")
user_name = username.replace("@databricks.com", "").replace(".","_")
print("User Name:", user_name)

mlflow_exp = "/Users/"+username+"/example_experiment"
print("MLFlow Experiment:", mlflow_exp)
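
The cell above strips only the `@databricks.com` domain from the username. As a minimal sketch, assuming usernames are e-mail addresses from any domain, the same two values could be derived with small helpers. The names `safe_user_name` and `experiment_path` are hypothetical and do not appear in the original notebook.

```python
def safe_user_name(username: str) -> str:
    """Drop the e-mail domain (if any) and replace dots with underscores."""
    local_part = username.split("@", 1)[0]
    return local_part.replace(".", "_")


def experiment_path(username: str, name: str = "example_experiment") -> str:
    """Build a per-user MLflow experiment path of the form /Users/<username>/<name>."""
    return f"/Users/{username}/{name}"


# Example with a made-up address (an assumption for illustration only)
print("User Name:", safe_user_name("jane.doe@example.com"))
print("MLflow Experiment:", experiment_path("jane.doe@example.com"))
```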

### 1. Read in the training data

In [0]:
trainingData = spark.read.format("delta").load(train_data_path_2)

In [0]:
display(trainingData)

### 2. MLflow Setup

In [0]:
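# MLflow in a pandas UDF
import mlflow
# Capture the API token and host on the driver. The pandas UDF runs on the
# workers, which need both values to reach the MLflow tracking server.
cntx = dbutils.entry_point.getDbutils().notebook().getContext()
api_token = cntx.apiToken().get()
api_url = cntx.apiUrl().get()
mlflow.set_tracking_uri("databricks")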

### 3. Define the function to run on each campaign group

In [0]:
# We need to define the return schema
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType, StructType, StructField
schema = StructType([StructField('Predictions', DoubleType(), True), 
                     StructField('campaign', DoubleType(), True)
                    ])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def train_rf_groups(data):
  # Library imports
  import pandas as pd
  import mlflow
  import mlflow.sklearn
  import os
  import sklearn
  from sklearn.model_selection import train_test_split
  from sklearn.ensemble import RandomForestClassifier
  from sklearn.metrics import roc_auc_score
  
  
  # ML Flow setup
  os.environ["DATABRICKS_TOKEN"] = api_token
  os.environ["DATABRICKS_HOST"] = api_url
  mlflow.set_experiment(mlflow_exp)
  
  # get relevant info from the input dataframe
  campaign = data['campaign'].iloc[0]
  
  # Train, test split
  train, test = train_test_split(data)

  # The target column is "label"; the columns listed here are the features
  feature_cols = ["age", "balance", "previous", "day", "duration", "pdays"]
  train_x = train[feature_cols]
  test_x = test[feature_cols]
  # Use 1-D Series for the labels, which is the shape scikit-learn expects for y
  train_y = train["label"]
  test_y = test["label"]
  
  # log the run into mlflow
  with mlflow.start_run() as run:
    # could also apply feature engineering steps here as well
    
    rf = RandomForestClassifier(n_estimators=100, max_depth=7,random_state=0)
    rf.fit(train_x, train_y)

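    # Cast to float so the column matches the DoubleType declared in the return schema
    predicted_qualities = pd.DataFrame({"Predictions": rf.predict(test_x).astype("float64")})
    # Note: this AUC is computed from hard class predictions, not predicted probabilities
    auc = roc_auc_score(test_y, predicted_qualities["Predictions"])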
    
    # Log mlflow attributes for mlflow UI
    mlflow.log_param("max_depth", 7)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("ROC AUC", auc)
    mlflow.sklearn.log_model(rf, "model")
    
  # Reuse the campaign id read above; label-based data['campaign'][0] fails if the index has no 0
  predicted_qualities['campaign'] = float(campaign)
    
  return predicted_qualities
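
Conceptually, a grouped map splits the rows by a key, calls the function once per group with all of that group's rows, and concatenates whatever each call returns. The following is a minimal, standard-library sketch of that behavior. It runs sequentially on one machine, whereas Spark distributes the groups across workers, and the names `grouped_map` and `summarize` are hypothetical stand-ins rather than part of any Spark API.

```python
from collections import defaultdict


def grouped_map(rows, key, fn):
    """Split rows by key, call fn once per group, and concatenate the outputs."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    out = []
    for group_rows in groups.values():
        out.extend(fn(group_rows))
    return out


def summarize(group_rows):
    """Stand-in for a per-group training function: returns one output row per group."""
    campaign = group_rows[0]["campaign"]
    positives = sum(r["label"] for r in group_rows)
    return [{"campaign": campaign, "n_rows": len(group_rows), "positives": positives}]


# Toy rows (made up for illustration)
rows = [
    {"campaign": 1.0, "label": 0},
    {"campaign": 1.0, "label": 1},
    {"campaign": 2.0, "label": 1},
]
result = grouped_map(rows, "campaign", summarize)
print(result)
```

Because each call sees only its own group, anything the function needs (imports, credentials, the experiment name) has to be available inside the function, which is why the training function above imports its libraries and sets the environment variables itself.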
  

### Perform some cleaning steps on the data

In [0]:
# First, limit to those campaigns that have enough data to train a model
# Find the relevant campaign ids
from pyspark.sql.functions import count, col
campaigns = (trainingData.groupby("campaign")
             .agg(count("campaign").alias("n_rows"))
             .where(col("n_rows") > 300)
             .select("campaign"))

# Now limit the training data to just those campaigns
trainingData_campaigns = trainingData.join(campaigns, "campaign")
display(trainingData_campaigns)

In [0]:
# A Spark feature vector can't be converted to pandas, so drop it here; any feature engineering has to happen inside the pandas_udf
trainingData_campaigns = trainingData_campaigns.drop("features")
display(trainingData_campaigns)

In [0]:
display(trainingData_campaigns.groupBy("campaign").count())

### 4. Apply the function to the different campaign groups

In [0]:
output = trainingData_campaigns.groupby("campaign").apply(train_rf_groups)
display(output)
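
From Spark 3.0 onward, the same grouped call can also be written with `applyInPandas`, which takes a plain Python function and a schema instead of a `pandas_udf`-decorated function. The wrapper below is a minimal sketch under that assumption. `train_per_campaign` is a hypothetical helper name, and `train_fn` is assumed to be an undecorated function that accepts one group as a pandas DataFrame and returns a pandas DataFrame matching the schema.

```python
def train_per_campaign(df, train_fn, schema="Predictions double, campaign double"):
    """Run train_fn once per campaign group via groupby(...).applyInPandas.

    df is expected to be a Spark DataFrame with a "campaign" column.
    schema is a DDL string describing the columns train_fn returns.
    """
    return df.groupby("campaign").applyInPandas(train_fn, schema=schema)


# Usage in the notebook (assumes an undecorated copy of the training function):
# output = train_per_campaign(trainingData_campaigns, train_fn)
```

The DDL string mirrors the `StructType` defined earlier: two nullable double columns named `Predictions` and `campaign`.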

In [0]:
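import time
model_name = "<FILL IN>"
client = mlflow.tracking.MlflowClient()
# Raises an exception if the registered model does not exist yet
registered_model = client.get_registered_model(model_name)
exp_id = client.get_experiment_by_name(mlflow_exp).experiment_id
runs = mlflow.search_runs([exp_id])
# search_runs returns a pandas DataFrame; take the first run it lists
first_run = runs.iloc[0]
model_version = client.create_model_version(model_name, f"{first_run['artifact_uri']}/model", first_run["run_id"])
# Give the registry a moment to finish creating the version
time.sleep(5)
client.update_model_version(model_name, model_version.version, description="My next prod version")
client.transition_model_version_stage(model_name, model_version.version, stage="Production")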
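
The cell above registers whichever run appears first in the search results. Since one run is logged per campaign, it may be preferable to choose a run by its logged metric instead. The helper below is a minimal sketch of that selection on plain dictionaries; `best_run` is a hypothetical name, and it assumes the run records use the `metrics.<name>` column naming of the DataFrame returned by `mlflow.search_runs` (for example via `runs.to_dict("records")`).

```python
def best_run(run_records, metric="metrics.ROC AUC"):
    """Return the record with the highest value of `metric`.

    Records where the metric is missing (None or NaN) are skipped.
    """
    scored = []
    for record in run_records:
        value = record.get(metric)
        # value == value is False only for NaN
        if value is not None and value == value:
            scored.append(record)
    if not scored:
        raise ValueError(f"no run has metric {metric!r}")
    return max(scored, key=lambda record: record[metric])


# Toy records (made up for illustration)
example_runs = [
    {"run_id": "a", "metrics.ROC AUC": 0.61},
    {"run_id": "b", "metrics.ROC AUC": 0.74},
    {"run_id": "c", "metrics.ROC AUC": float("nan")},
]
print(best_run(example_runs)["run_id"])
```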