In [1]:
import azureml.core

from azureml.core import Experiment, Workspace, Dataset, Datastore
from azureml.train.automl import AutoMLConfig
from notebookutils import mssparkutils
from azureml.data.dataset_factory import TabularDatasetFactory

In [2]:
# Read the customer-churn CSV from the data lake into a Spark DataFrame.
# The first row supplies the column names and Spark infers the column types.
churn_csv_path = 'abfss://files@datalakexxxxxxx.dfs.core.windows.net/data/ml/customer_churn.csv'
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(churn_csv_path)
)

# Preview the first 10 rows.
display(df.limit(10))

In [3]:
# Print the schema (column names and types) of the loaded DataFrame.
df.printSchema()

In [4]:
# Columns removed before training; "ChurnValue" stays in the frame and is the
# label column handed to AutoML further down.
# NOTE(review): presumably these four fields are derived from the churn
# outcome and would leak the label - confirm against the dataset description.
columns = [
    "ChurnLabel",
    "ChurnScore",
    "ChurnCategory",
    "ChurnReason",
]

df = df.drop(*columns)

# Print the schema again to verify the columns are gone.
df.printSchema()

In [6]:
# Name of the linked service used to look up the Azure ML workspace, and the
# name of the experiment the AutoML run is recorded under.
linkedService_name = "AzureMLService1"
experiment_name = "customer-churn-prediction"

# Resolve the workspace through the linked service, then bind the experiment to it.
ws = mssparkutils.azureML.getWorkspace(linkedService_name)
experiment = Experiment(workspace=ws, name=experiment_name)

In [7]:
# Register the Spark DataFrame as a tabular dataset on the workspace's default
# datastore; the dataset is named after the experiment.
datastore = Datastore.get_default(ws)
dataset = TabularDatasetFactory.register_spark_dataframe(
    df,
    datastore,
    name="{}-dataset".format(experiment_name),
)

In [8]:
# AutoML settings, collected in one mapping so they are easy to scan and tweak.
automl_settings = {
    "spark_context": sc,
    "task": "classification",
    "training_data": dataset,
    "label_column_name": "ChurnValue",
    "primary_metric": "accuracy",
    "experiment_timeout_hours": 0.25,  # a quarter of an hour
    "max_concurrent_iterations": 2,
    "enable_onnx_compatible_models": True,  # the best model is later fetched as ONNX
}

automl_config = AutoMLConfig(**automl_settings)

In [9]:
# Submit the AutoML configuration to the experiment. `run` is the handle the
# following cells use for the portal link, for waiting, and for fetching the
# best model.
run = experiment.submit(automl_config)

In [10]:
# Render a clickable link to this run in the Azure Machine Learning portal.
# The href value is quoted: the URL may contain characters (for example "=" in
# a query string) that are not valid inside an unquoted HTML attribute value.
displayHTML("<a href='{}' target='_blank'>Your experiment in Azure Machine Learning portal: {}</a>".format(run.get_portal_url(), run.id))

In [11]:
# Wait for the AutoML run to complete before its results are read below.
run.wait_for_completion()

In [12]:
import onnxruntime
import mlflow
import mlflow.onnx

from mlflow.models.signature import ModelSignature
from mlflow.types import DataType
from mlflow.types.schema import ColSpec, Schema

# Get the best run from the AutoML run, together with its model in ONNX form
# (return_onnx_model=True).
best_run, onnx_model = run.get_output(return_onnx_model=True)

# Define utility functions to infer the schema of ONNX model
def _infer_schema(data):
    """Build an MLflow ``Schema`` from ONNX Runtime input/output descriptors.

    Args:
        data: Iterable of descriptors exposing ``.name`` and ``.type``, where
            ``.type`` is a string such as ``"tensor(float)"`` (the items
            returned by ``InferenceSession.get_inputs()`` / ``get_outputs()``).

    Returns:
        A ``Schema`` holding one ``ColSpec`` per descriptor, in iteration order.

    Raises:
        Exception: If a descriptor's element type has no mapping below.
    """
    # ONNX tensor element type -> MLflow column type. Built inside the function
    # so ``DataType`` is only looked up when the function is actually called.
    onnx_to_mlflow = {
        "bool": DataType.boolean,
        "int8": DataType.integer,
        "uint8": DataType.integer,
        "int16": DataType.integer,
        "uint16": DataType.integer,
        "int32": DataType.integer,
        # uint32 can exceed the signed 32-bit range, so it is widened to long.
        "uint32": DataType.long,
        "int64": DataType.long,
        "float16": DataType.float,
        "bfloat16": DataType.float,
        "float": DataType.float,
        "double": DataType.double,
        "string": DataType.string,
    }

    res = []
    for col in data:
        # Strip the tensor wrapper: "tensor(int64)" -> "int64".
        t = col.type.replace("tensor(", "").replace(")", "")
        if t not in onnx_to_mlflow:
            raise Exception("Unsupported type: " + t)
        res.append(ColSpec(type=onnx_to_mlflow[t], name=col.name))
    return Schema(res)

def _infer_signature(onnx_model):
    """Derive the MLflow model signature of an ONNX model.

    The model is serialized and loaded into an ONNX Runtime inference session;
    the session's declared inputs and outputs are each converted to a schema
    with ``_infer_schema`` and combined into a ``ModelSignature``.
    """
    session = onnxruntime.InferenceSession(onnx_model.SerializeToString())
    input_schema = _infer_schema(session.get_inputs())
    output_schema = _infer_schema(session.get_outputs())
    return ModelSignature(inputs=input_schema, outputs=output_schema)

# Infer signature of ONNX model
signature = _infer_signature(onnx_model)

artifact_path = experiment_name + "_artifact"
registered_model_name = "customer-churn-prediction-model"

# Point MLflow at the workspace's tracking URI and select the experiment.
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
mlflow.set_experiment(experiment_name)

# The MLflow run gets its own name: binding it to `run` would overwrite the
# AutoML run handle that `run.get_output(...)` at the top of this cell relies
# on, and the cell would then fail when executed a second time.
with mlflow.start_run() as mlflow_run:
    # Log the ONNX model, with its signature, under the MLflow run
    mlflow.onnx.log_model(onnx_model, artifact_path, signature=signature)

    # Register the model to AML model registry
    mlflow.register_model(
        "runs:/{}/{}".format(mlflow_run.info.run_id, artifact_path),
        registered_model_name,
    )