# Setup

In [None]:
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient

# Authenticate with DefaultAzureCredential so the same code stays portable
# across interactive and automated workloads.
credential = DefaultAzureCredential()
# Build the workspace client from the configuration of the current workspace
# (per the original note, this only works on personal compute).
ml_client = MLClient.from_config(credential=credential)

In [None]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

# Describe the raw Ames Housing CSV as a file-typed data asset.
dataset = Data(
    type=AssetTypes.URI_FILE,
    path="AmesHousing.csv",  # Resolved relative to the notebook's location
    name="ames-housing-raw",
    description="Raw CSV of Ames Housing dataset",
)

# Register (or update) the asset in the workspace. Kept as the final bare
# expression of the cell, exactly like the original.
ml_client.data.create_or_update(dataset)

# Pipeline components

## 1. Split

In [None]:
from azure.ai.ml import command, Input, Output, dsl

# Curated environment URIs, defined once because several components reuse them.
SKLEARN_ENV = "azureml://registries/azureml/environments/sklearn-1.5/labels/latest"         # Curated environment that includes scikit-learn, used for the ML logic
ML_SDK_ENV = "azureml://registries/azureml/environments/python-sdk-v2/labels/latest"        # Curated environment that includes the Azure ML SDK, used for API access

# Command component that runs split.py: one raw dataset in, four files out
# (X_train, y_train, X_test, y_test).
split_component = command(
    display_name="split",
    environment=SKLEARN_ENV,  # Per the original note: split.py uses scikit-learn functionality
    code="./stages",          # Directory hosting the component's code base; becomes the working directory
    # A command component has to invoke the Python script explicitly.
    command=(
        "python split.py "
        "--input_data ${{inputs.input_data}} "
        "--test_size ${{inputs.test_size}} "
        "--random_state ${{inputs.random_state}} "
        "--X_train ${{outputs.X_train}} "
        "--y_train ${{outputs.y_train}} "
        "--X_test ${{outputs.X_test}} "
        "--y_test ${{outputs.y_test}}"
    ),
    # Input slots
    inputs={
        "input_data": Input(type="uri_file"),
        # Numeric settings are declared as strings; the original author noted
        # that float/int input types were not supported by the SDK.
        # NOTE(review): the SDK documents `number` and `integer` input types --
        # confirm whether this workaround is still needed.
        "test_size": Input(type="string"),
        "random_state": Input(type="string"),
    },
    # Output slots
    outputs={
        "X_train": Output(type="uri_file"),
        "y_train": Output(type="uri_file"),
        "X_test": Output(type="uri_file"),
        "y_test": Output(type="uri_file"),
    },
)

## 2. Prep

In [None]:
# Command component that runs prep.py on one raw data file. It is reusable for
# both the training and the test split because the input is parameterized.
prep_component = command(
    display_name="prep",
    environment=SKLEARN_ENV,
    code="./stages",  # Per the original note: this also uploads the preprocessing.py module
    # $[[ ... ]] marks an optional parameter: when the input is not supplied,
    # the whole flag is omitted from the command line.
    command=(
        "python prep.py "
        "--raw_data ${{inputs.raw_data}} "
        "$[[--xform_params_in ${{inputs.xform_params_in}}]] "
        "--prepped_data ${{outputs.prepped_data}} "
        "--xform_params_out ${{outputs.xform_params_out}}"
    ),
    inputs={
        # Parameterized instead of hard-coded, for reusability.
        "raw_data": Input(type="uri_file"),
        # Preprocessing is not stateless, so parameters derived during
        # training may have to be passed in.
        "xform_params_in": Input(type="uri_file", optional=True),
    },
    outputs={
        "prepped_data": Output(type="uri_file"),
        "xform_params_out": Output(type="uri_file"),
    },
)

## 3. Train

In [None]:
# Single source of truth for the model asset name: the training output below is
# registered under it, and the same variable is handed to the pipeline when it
# is submitted.
model_asset_name = "minimal-model"

# Command component that runs train.py on the prepared features (X), the
# targets (y) and the transformation parameters, and emits the trained model.
train_component = command(
    code="./stages",
    command=(
        "python train.py "
        "--X ${{inputs.X}} "
        "--y ${{inputs.y}} "
        "--xform_params ${{inputs.xform_params}} "
        "--model_path ${{outputs.model_path}}"
    ),
    environment=SKLEARN_ENV,
    display_name="train-model",
    inputs={
        "X": Input(type="uri_file"),
        "y": Input(type="uri_file"),
        "xform_params": Input(type="uri_file"),
    },
    outputs={
        # Reuse model_asset_name instead of repeating the literal, so the
        # registered name cannot drift from the name other steps look up.
        # Per the original note, a named custom_model output is automatically
        # registered as a model (as a directory).
        "model_path": Output(type="custom_model", mode="upload", name=model_asset_name)
    }
)

## 4. Deploy

In [None]:
# Command component that runs deploy.py for a named model and endpoint and
# writes an example payload file.
deploy_component = command(
    display_name="deploy-model",
    environment=ML_SDK_ENV,
    code="./stages",  # Per the original note: also ships score.py, making it available to the deployment script
    command=(
        "python deploy.py "
        "--model_name ${{inputs.model_name}} "
        "--endpoint_name ${{inputs.endpoint_name}} "
        "--example_payload ${{outputs.example_payload}}"
    ),
    inputs={
        # Never passed to the script; declared only so this step waits for
        # the model registration.
        "model_path": Input(type="custom_model"),
        "model_name": Input(type="string"),
        "endpoint_name": Input(type="string"),
    },
    outputs={
        # Creates a data dependency for the downstream step (added for
        # pedagogical reasons).
        "example_payload": Output(type="uri_file"),
    },
)

## 5. Test

In [None]:
# Command component that runs test.py against a named endpoint using an
# example payload file.
test_component = command(
    display_name="test-endpoint",
    # NOTE(review): unlike the other components, this one pins a specific
    # environment version and names its compute target explicitly -- confirm
    # whether that is intentional.
    environment="azureml://registries/azureml/environments/python-sdk-v2/versions/31",
    compute="mlops-cluster",
    code="./stages",
    command=(
        "python test.py "
        "--endpoint_name ${{inputs.endpoint_name}} "
        "--example_payload ${{inputs.example_payload}}"
    ),
    inputs={
        "endpoint_name": Input(type="string"),
        # Creates a data dependency on the step that produces the payload
        # (added for pedagogical reasons).
        "example_payload": Input(type="uri_file"),
    },
)

## 6. Evaluate

In [None]:
# Command component that runs evaluate.py on a model plus features (X) and
# targets (y), and writes a metrics file.
evaluate_component = command(
    display_name="evaluate model",
    environment=SKLEARN_ENV,
    code="./stages",
    command=(
        "python evaluate.py "
        "--model_path ${{inputs.model_path}} "
        "--X ${{inputs.X}} "
        "--y ${{inputs.y}} "
        "--metrics ${{outputs.metrics}}"
    ),
    inputs={
        "model_path": Input(type="custom_model"),
        "X": Input(type="uri_file"),
        "y": Input(type="uri_file"),
    },
    outputs={
        "metrics": Output(type="uri_file"),
    },
)

## 7. Tag

In [None]:
# Command component that runs tag.py with a model name and a metrics file.
# It declares no outputs.
tag_component = command(
    display_name="tag model",
    environment=ML_SDK_ENV,
    code="./stages",
    command=(
        "python tag.py "
        "--model_name ${{inputs.model_name}} "
        "--metrics ${{inputs.metrics}}"
    ),
    inputs={
        "model_name": Input(type="string"),
        "metrics": Input(type="uri_file"),
    },
)

# Assembling the pipeline

In [None]:
# Pipeline wiring: split -> prep (train and test) -> train -> evaluate -> tag,
# with deploy -> test branching off the train step.
@dsl.pipeline()         # The job name defaults to the function name; the experiment name defaults to the name of the directory.
def train_and_deploy(dataset_uri, model_name, endpoint_name):
    # Parameters:
    #   dataset_uri   -- raw dataset fed to the split step
    #   model_name    -- model asset name passed to the tag and deploy steps
    #   endpoint_name -- endpoint name passed to the deploy and test steps
    # NOTE(review): the DSL appears to introspect this function (including the
    # local variable names used for the steps) -- keep those names stable and
    # confirm before adding a docstring or annotations.
    split = split_component(
        input_data=dataset_uri,
        test_size="0.2",              # Withhold 20% of the dataset as the test set
        random_state="42"             # A fixed random_state makes the split deterministic: ideal for reproducibility during development, but unsuitable for production
    )
    # Prep of the training features takes no incoming transformation
    # parameters; it produces them as an output.
    prep_train = prep_component(
        raw_data=split.outputs["X_train"]
    )
    # The test features are prepared with the parameters produced by the
    # training prep step.
    prep_test = prep_component(
        raw_data=split.outputs["X_test"],
        xform_params_in=prep_train.outputs["xform_params_out"]
    )
    train = train_component(
        X=prep_train.outputs["prepped_data"],
        y=split.outputs["y_train"],
        xform_params=prep_train.outputs["xform_params_out"],
    )
    # Evaluation uses the prepared test split.
    evaluate = evaluate_component(
        model_path=train.outputs["model_path"],
        X=prep_test.outputs["prepped_data"],
        y=split.outputs["y_test"],
    )
    # The tag step receives the model name together with the evaluation metrics.
    tag = tag_component(
        model_name=model_name,
        metrics=evaluate.outputs["metrics"]
    )
    deploy = deploy_component(  # For demonstration purposes only. In production-grade environments, deployments are often managed in separate pipelines (or CI/CD).
        model_path=train.outputs["model_path"],  # Only enforces the dependency on the train step; not actually used by the script
        model_name=model_name,
        endpoint_name=endpoint_name
    )
    # Consuming the example payload makes the test step depend on the deploy step.
    test = test_component(
        endpoint_name=endpoint_name,
        example_payload=deploy.outputs["example_payload"],
    )

# Submitting the pipeline

In [None]:
# model_asset_name is reused from the Train cell above.
endpoint_name = "sklearn-endpoint"
# Points at version 1 of the registered "ames-housing-raw" data asset.
ames_housing_data_asset = Input(path="azureml:ames-housing-raw:1", type="uri_file")

# Instantiate the pipeline job with its three parameters.
train_and_deploy_job = train_and_deploy(
    dataset_uri=ames_housing_data_asset,
    model_name=model_asset_name,
    endpoint_name=endpoint_name,
)
# Default cluster for every component (can be overridden per component).
train_and_deploy_job.settings.default_compute = "mlops-cluster"

# Submit the job. Kept as the final bare expression of the cell, exactly like
# the original.
ml_client.jobs.create_or_update(train_and_deploy_job)