# SageMaker Training with MLflow

<div class="alert alert-block alert-info">
⚠️ The latest SageMaker Distribution image version known to work with this notebook is <code>3.1.0</code>. If you encounter problems with other versions, downgrade to version <code>3.1.0</code>. <b>To do so, stop your JupyterLab app, change the SageMaker Distribution image of the space to <code>3.1.0</code>, and restart the JupyterLab app for the change to take effect</b>.</div>

<div class="alert alert-warning"> This notebook expects a running SageMaker Managed MLflow tracking server.</div>

In this lab, we show how to use SageMaker Managed MLflow for experiment tracking.
We walk through a few scenarios that mimic the workflow of a data scientist who first develops training functions or training scripts locally to test algorithms, and then uses the SageMaker managed infrastructure to run remote jobs.

## Setup environment

Install the necessary libraries. We use `mlflow` version `2.22.1`.
To log data to MLflow, we need to install the [`sagemaker-mlflow`](https://github.com/aws/sagemaker-mlflow) plugin.
This plugin adds AWS Signature Version 4 (SigV4) headers to each outgoing request to Amazon SageMaker with MLflow, resolves the URL of the tracking server to connect to, and registers models in the SageMaker Model Registry.
The SigV4 signature lets the service authenticate and authorize each request using AWS IAM.

With the plugin installed, you can use the `mlflow` client SDK transparently, without further modifications, to securely log your metadata and artifacts to SageMaker Managed MLflow.
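
To make the SigV4 step more concrete, here is a minimal standard-library sketch of how a SigV4 signing key is derived from a secret key by chaining HMAC-SHA256 over the date, region, and service, and then used to sign a string. The plugin handles the full flow for you, including the canonical request and string-to-sign that are omitted here; the secret, date, region, and the service name `sagemaker` below are illustrative assumptions, not the values the plugin actually uses.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of msg under key."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def sigv4_signing_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
    """Derive a SigV4 signing key: HMAC chain over date, region, service, 'aws4_request'."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def sigv4_signature(signing_key: bytes, string_to_sign: str) -> str:
    """Hex-encoded HMAC-SHA256 signature, as placed in the Authorization header."""
    return hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()


# Illustrative values only: a fake secret and an assumed service name.
key = sigv4_signing_key("EXAMPLE-SECRET", "20240101", "us-east-1", "sagemaker")
print(sigv4_signature(key, "example-string-to-sign"))
```

You never need to call anything like this in the notebook; it only shows why requests signed this way can be verified through AWS IAM without the secret key itself ever being sent.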

In [None]:
!pip install -q sagemaker[local] mlflow==2.22.1 sagemaker-mlflow

Import the necessary libraries and initialize the client SDKs.

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn

import boto3
import numpy as np
import pandas as pd
import os
import json

from IPython.display import Javascript, HTML

# Define session, role, and region so we can
# perform any SageMaker tasks we need
boto_session = boto3.Session()
sagemaker_session = sagemaker.Session()
role = get_execution_role()
region = sagemaker_session.boto_region_name
sm_client = boto_session.client("sagemaker")

sagemaker.__version__

We now extract information about the SageMaker domain and space we are working in.
This information will be useful later for logging which `user-profile` created specific MLflow runs.

In [None]:
NOTEBOOK_METADATA_FILE = "/opt/ml/metadata/resource-metadata.json"
domain_id = None
space_name = None  # initialize so the checks below never hit a NameError

# Metadata file available inside SageMaker Studio apps
if os.path.exists(NOTEBOOK_METADATA_FILE):
    with open(NOTEBOOK_METADATA_FILE, "rb") as f:
        metadata = json.loads(f.read())
        domain_id = metadata.get('DomainId')
        space_name = metadata.get('SpaceName')

if not domain_id:
    raise Exception("Cannot find the current domain. Make sure you run this notebook in a JupyterLab space in SageMaker AI Studio")
else:
    print(f"SageMaker domain id: {domain_id}")

if not space_name:
    raise Exception("Cannot find the current space name. Make sure you run this notebook in a JupyterLab space in SageMaker AI Studio")
else:
    print(f"Space name: {space_name}")
    
r = sm_client.describe_space(DomainId=domain_id, SpaceName=space_name)
user_profile_name = r['OwnershipSettings']['OwnerUserProfileName']

print(f"User profile name: {user_profile_name}")
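
If you want the metadata lookup in a reusable, testable form, a minimal sketch follows. `read_studio_metadata` is a hypothetical helper; it assumes, as the cell above does, that the file is JSON with `DomainId` and `SpaceName` keys, and it returns `(None, None)` when the file is missing so the caller can decide how to fail.

```python
import json
import os
import tempfile
from typing import Optional, Tuple


def read_studio_metadata(
    path: str = "/opt/ml/metadata/resource-metadata.json",
) -> Tuple[Optional[str], Optional[str]]:
    """Return (domain_id, space_name) from the Studio metadata file, or (None, None)."""
    if not os.path.exists(path):
        return None, None
    with open(path, "r", encoding="utf-8") as f:
        metadata = json.load(f)
    return metadata.get("DomainId"), metadata.get("SpaceName")


# Usage with a throwaway file holding made-up values
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "resource-metadata.json")
    with open(sample_path, "w", encoding="utf-8") as f:
        json.dump({"DomainId": "d-example", "SpaceName": "my-space"}, f)
    print(read_studio_metadata(sample_path))  # ('d-example', 'my-space')
```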

For local development, it is useful to know which SageMaker Distribution image we are using.
You can find this information as described below:

In [None]:
r = sm_client.describe_space(DomainId=domain_id, SpaceName=space_name)
resource_spec = r['SpaceSettings']['JupyterLabAppSettings']['DefaultResourceSpec']
sm_image = resource_spec.get('SageMakerImageArn', 'not defined')
sm_image_version = resource_spec.get('SageMakerImageVersionAlias', 'not defined')
print(f"""
SageMaker image: \033[1m{sm_image}\033[0m
SageMaker image version: \033[1m{sm_image_version}\033[0m
""")


Full details of the space we are operating in can be found as follows:

In [None]:
import pprint

pp = pprint.PrettyPrinter(indent=2)
pp.pprint(r)

## MLflow tracking server

If you are running this notebook at an AWS-led event, the MLflow tracking server has already been provisioned for you.
Otherwise, make sure you have deployed the CloudFormation template that creates the necessary infrastructure, including the MLflow tracking server.

In [None]:
# Find an active MLflow server in the account
tracking_servers = [s['TrackingServerArn'] for s 
                    in sm_client.list_mlflow_tracking_servers()['TrackingServerSummaries']
                    if s['IsActive'] == 'Active']

if len(tracking_servers) < 1:
    print("You don't have any active MLflow servers. Trying to find a server in the status 'Creating'...")

    r = sm_client.list_mlflow_tracking_servers(
        TrackingServerStatus='Creating',
    )['TrackingServerSummaries']

    if len(r) < 1:
        print("You don't have any MLflow server in the status 'Creating'. Create an MLflow tracking server before continuing.")
        mlflow_server_arn = None
        mlflow_name = None
    else:
        mlflow_server_arn = r[0]['TrackingServerArn']
        mlflow_name = r[0]['TrackingServerName']
        print(f"You have an MLflow server {mlflow_server_arn} in the status 'Creating', going to use this one")
else:
    mlflow_server_arn = tracking_servers[0]
    mlflow_name = tracking_servers[0].split('/')[1]
    print(f"You have {len(tracking_servers)} running MLflow server(s). Using the first server ARN: {mlflow_server_arn}")

mlflow_experiment_name = "sm-immersion-day-experiment"
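
The cell above derives `mlflow_name` with `split('/')[1]`, which relies on the ARN's resource part having the form `mlflow-tracking-server/<name>`. Here is a slightly more defensive sketch that validates this assumption before extracting the name. `tracking_server_name_from_arn` is a hypothetical helper, and the ARN in the usage line is made up.

```python
def tracking_server_name_from_arn(arn: str) -> str:
    """Extract the server name from an ARN assumed to look like
    arn:aws:sagemaker:<region>:<account>:mlflow-tracking-server/<name>."""
    # Split into at most 6 fields: arn, partition, service, region, account, resource
    parts = arn.split(":", 5)
    if len(parts) != 6 or not parts[5].startswith("mlflow-tracking-server/"):
        raise ValueError(f"Not an MLflow tracking server ARN: {arn}")
    return parts[5].split("/", 1)[1]


print(tracking_server_name_from_arn(
    "arn:aws:sagemaker:us-east-1:123456789012:mlflow-tracking-server/my-server"
))  # my-server
```

Alternatively, read `TrackingServerName` directly from the server summary, as the `'Creating'` branch above already does.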

## Data Preparation

Let's download the Iris dataset and save it in the `./data` folder.

In [None]:
os.makedirs("./data", exist_ok=True)

s3_client = boto3.client("s3")
s3_client.download_file(
    f"sagemaker-example-files-prod-{region}", "datasets/tabular/iris/iris.data", "./data/iris.csv"
)

df_iris = pd.read_csv("./data/iris.csv", header=None)
df_iris[4] = df_iris[4].map({"Iris-setosa": 0, "Iris-versicolor": 1, "Iris-virginica": 2})
iris = df_iris[[4, 0, 1, 2, 3]].to_numpy()
np.savetxt("./data/iris.csv", iris, delimiter=",", fmt="%1.1f, %1.3f, %1.3f, %1.3f, %1.3f")
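
The pandas/NumPy cell above moves the label to the first column, encodes it as an integer, and writes the rows with a fixed numeric format. To see exactly what ends up in `./data/iris.csv`, here is a standard-library sketch of the same transformation. `reformat_iris_rows` is a hypothetical helper, and it assumes each raw row holds four numeric features followed by the species name, as the mapping above implies.

```python
import csv
import io

# Same label encoding as the cell above
LABELS = {"Iris-setosa": 0, "Iris-versicolor": 1, "Iris-virginica": 2}


def reformat_iris_rows(raw_csv: str) -> str:
    """Put the integer label first and format each row like
    np.savetxt(fmt="%1.1f, %1.3f, %1.3f, %1.3f, %1.3f")."""
    out = []
    for row in csv.reader(io.StringIO(raw_csv)):
        if not row:  # skip blank lines, if any
            continue
        *features, label = row
        values = [float(LABELS[label])] + [float(x) for x in features]
        out.append("%1.1f, %1.3f, %1.3f, %1.3f, %1.3f" % tuple(values))
    return "\n".join(out) + "\n"


sample = "5.1,3.5,1.4,0.2,Iris-setosa\n7.0,3.2,4.7,1.4,Iris-versicolor\n"
print(reformat_iris_rows(sample), end="")
# 0.0, 5.100, 3.500, 1.400, 0.200
# 1.0, 7.000, 3.200, 4.700, 1.400
```

Because of the `", "` separators in the format string, each value after the first carries a leading space, which is worth remembering when you read the file back with a different parser.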


In [None]:
# S3 prefix for the training dataset to be uploaded to
prefix = "DEMO-scikit-iris"

WORK_DIRECTORY = "data"

train_input = sagemaker_session.upload_data(
    WORK_DIRECTORY, key_prefix="{}/{}".format(prefix, WORK_DIRECTORY)
)

Prepare the folder for the training code

In [None]:
!mkdir -p training_code

Load the training data into a pandas DataFrame.

In [None]:
train_data = pd.read_csv('./data/iris.csv', header=None, engine="python")

## Remote function execution

First, execute the training function locally. Note that the `@remote` decorator above the function definition is commented out: defined this way, `train` is just a regular Python function that runs in your local runtime environment.
We set the `MLFLOW_TRACKING_URI` environment variable to `mlflow_server_arn`, so the client logs to the remote MLflow tracking server, and `MLFLOW_EXPERIMENT_NAME`, so the runs are grouped under our experiment.
Setting the `LOGNAME` environment variable makes it easier to identify the user who logged a run.

In [None]:
os.environ['MLFLOW_TRACKING_URI'] = mlflow_server_arn
os.environ["LOGNAME"] = user_profile_name
os.environ["MLFLOW_EXPERIMENT_NAME"] = mlflow_experiment_name

# define a local function
# @remote
def train(train_data, max_leaf_nodes, run_name='Training-local-function-execution'):
    import mlflow
    from mlflow.models import infer_signature
    from sklearn import tree
    import pandas as pd

    # Enable autologging in MLflow for SKlearn
    mlflow.sklearn.autolog()

    with mlflow.start_run(run_name=run_name) as run:
        # labels are in the first column
        train_y = train_data.iloc[:, 0]
        train_X = train_data.iloc[:, 1:]

        # Here we support a single hyperparameter, 'max_leaf_nodes'. You can add as many
        # as your training requires as extra function arguments.

        # Now use scikit-learn's decision tree classifier to train the model.
        clf = tree.DecisionTreeClassifier(max_leaf_nodes=max_leaf_nodes)
        clf = clf.fit(train_X, train_y)

        predictions = clf.predict(train_X)
        signature = infer_signature(train_X, predictions)

        mlflow.set_tags(
            {
                'mlflow.source.name': "def train(...)",
                'mlflow.source.type': 'LOCAL',
            }
        )

        mlflow.sklearn.log_model(clf, "model", signature=signature)

In [None]:
train(train_data, 5)
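
The local run picks up `MLFLOW_TRACKING_URI`, `LOGNAME`, and `MLFLOW_EXPERIMENT_NAME` because the MLflow client runs inside the notebook kernel, whose environment we just modified. The standard-library sketch below illustrates the underlying rule: a process sees environment variables only if they are in the environment it was started with. A SageMaker job runs on separate infrastructure rather than as a child of the kernel, which is why the later cells pass these variables explicitly. The `LOGNAME` value here is a made-up placeholder.

```python
import os
import subprocess
import sys

# Hypothetical value; in the notebook this is user_profile_name
os.environ["LOGNAME"] = "example-user"
probe = "import os; print(os.environ.get('LOGNAME'))"

# A child process inherits the parent's environment by default...
child = subprocess.run(
    [sys.executable, "-c", probe], capture_output=True, text=True, check=True
)
print(child.stdout.strip())  # example-user

# ...while a process started with an environment lacking the variable sees nothing
env_without_logname = {k: v for k, v in os.environ.items() if k != "LOGNAME"}
isolated = subprocess.run(
    [sys.executable, "-c", probe],
    capture_output=True, text=True, check=True, env=env_without_logname,
)
print(isolated.stdout.strip())  # None
```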

Let's now verify the details of the run logged to SageMaker AI Managed MLflow.

In [None]:
import mlflow

experiment_id = mlflow.get_experiment_by_name(mlflow_experiment_name).experiment_id
# get the last run in MLflow
last_run_id = mlflow.search_runs(
    experiment_ids=[experiment_id], 
    max_results=1, 
    order_by=["attributes.start_time DESC"]
)['run_id'][0]

# get the presigned url to open the MLflow UI
presigned_url = sm_client.create_presigned_mlflow_tracking_server_url(
    TrackingServerName=mlflow_name,
    ExpiresInSeconds=60,
    SessionExpirationDurationInSeconds=1800
)['AuthorizedUrl']

mlflow_run_link = f"{presigned_url.split('/auth')[0]}/#/experiments/{experiment_id}/runs/{last_run_id}"
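
The run link is rebuilt in several later cells. A small helper keeps that logic in one place; it mirrors the `split('/auth')[0]` approach used here. The sketch also shows a slightly safer way to build the JavaScript passed to `window.open`: `json.dumps` produces a properly quoted and escaped string literal. Both function names are hypothetical, and the presigned URL in the example is made up.

```python
import json


def mlflow_run_url(presigned_url: str, experiment_id: str, run_id: str) -> str:
    """Link to a run in the MLflow UI, derived from a presigned tracking server URL."""
    base = presigned_url.split("/auth", 1)[0]  # drop the /auth?... part
    return f"{base}/#/experiments/{experiment_id}/runs/{run_id}"


def window_open_js(url: str) -> str:
    """JavaScript source that opens url in a new window, with safe string quoting."""
    return f"window.open({json.dumps(url)});"


fake_presigned = "https://t-example.us-east-1.experiments.sagemaker.aws/auth?authToken=abc"
link = mlflow_run_url(fake_presigned, "1", "r123")
print(link)
print(window_open_js(link))
```

In the notebook you would then call `display(Javascript(window_open_js(mlflow_run_url(presigned_url, experiment_id, last_run_id))))`.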

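We first need to open the presigned URL, which authenticates the browser session with the tracking server. The URL is valid for only 60 seconds (`ExpiresInSeconds=60`), so run the next cell promptly.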

In [None]:
# first open the MLflow UI to authenticate the session; you can close the newly opened window afterwards
display(Javascript('window.open("{}");'.format(presigned_url)))

And then we can open the details of the last logged run

In [None]:
display(Javascript('window.open("{}");'.format(mlflow_run_link)))

Now let's get ready to execute this function as a SageMaker training job on the managed infrastructure.
We first define the dependencies in a `requirements.txt` file.

In [None]:
%%writefile training_code/requirements.txt
mlflow==2.22.1
sagemaker-mlflow==0.1.0

We then prepare a `config.yaml` file that holds the configuration we want for the training job.
Full details about which options can be configured for the `@remote` decorator can be found in the [official documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html).
In this case, note how we also pass the `MLFLOW_TRACKING_URI` and `MLFLOW_EXPERIMENT_NAME` environment variables, so we don't have to set the tracking server URI and experiment inside the function, and `user_profile_name` as `LOGNAME` to keep track of who created which run.

In [None]:
config_yaml = f"""
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      TelemetryOptOut: true
      RemoteFunction:
        # role arn is not required if in SageMaker Notebook instance or SageMaker Studio
        # Uncomment the following line and replace with the right execution role if in a local IDE
        # RoleArn: <replace the role arn here>
        InstanceType: ml.m5.xlarge
        EnvironmentVariables: {{'MLFLOW_TRACKING_URI': '{mlflow_server_arn}', 'LOGNAME': '{user_profile_name}', 'MLFLOW_EXPERIMENT_NAME': '{mlflow_experiment_name}'}}
        Dependencies: ./training_code/requirements.txt
        IncludeLocalWorkDir: false
        CustomFileFilter:
          IgnoreNamePatterns:
          - "data/*"
          - "models/*"
          - "*.ipynb"
          - "__pycache__"

"""

with open('config.yaml', 'w') as f:
    print(config_yaml, file=f)
print(config_yaml)
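
Interpolating values by hand into the YAML flow mapping works as long as the values contain no characters YAML treats specially. A sketch of a more robust way to produce the `EnvironmentVariables` line: `json.dumps` double-quotes every key and value, and a JSON object is, in practice, also a valid YAML flow mapping. The values below are made-up placeholders for `mlflow_server_arn`, `user_profile_name`, and `mlflow_experiment_name`.

```python
import json

# Placeholder values; in the notebook these come from earlier cells
env_vars = {
    "MLFLOW_TRACKING_URI": "arn:aws:sagemaker:us-east-1:123456789012:mlflow-tracking-server/my-server",
    "LOGNAME": "example-user",
    "MLFLOW_EXPERIMENT_NAME": "sm-immersion-day-experiment",
}

# Keep the 8-space indentation expected under RemoteFunction in config.yaml
env_line = "        EnvironmentVariables: " + json.dumps(env_vars)
print(env_line)
```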

In [None]:
import os

# Use the current working directory as the location for SageMaker Python SDK config file
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()

In [None]:
from sagemaker.remote_function import remote

# define a function that runs remotely as a SageMaker training job
@remote
def train(train_data, max_leaf_nodes, run_name='Training-remote-function-execution'):
    import mlflow
    from mlflow.models import infer_signature
    from sklearn import tree
    import pandas as pd

    # Enable autologging in MLflow for SKlearn
    mlflow.sklearn.autolog()

    with mlflow.start_run(run_name=run_name) as run:
        # labels are in the first column
        train_y = train_data.iloc[:, 0]
        train_X = train_data.iloc[:, 1:]

        # Now use scikit-learn's decision tree classifier to train the model.
        clf = tree.DecisionTreeClassifier(max_leaf_nodes=max_leaf_nodes)
        clf = clf.fit(train_X, train_y)

        predictions = clf.predict(train_X)
        signature = infer_signature(train_X, predictions)

        mlflow.set_tags(
            {
                'mlflow.source.name': "@remote\ndef train(...)",
                'mlflow.source.type': 'JOB',
            }
        )

        mlflow.sklearn.log_model(clf, "model", signature=signature)

In [None]:
train_data = pd.read_csv('./data/iris.csv', header=None, engine="python")

Now the `train` function runs in the cloud, and the SageMaker SDK takes care of serializing/deserializing and marshalling/unmarshalling the data and variables.
All relevant files are packaged and made available to the training job in the locations where SageMaker expects to find them.

In [None]:
train(train_data, 2)
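
To build intuition for what serializing and deserializing the call means, here is a toy, in-process decorator that only mimics the data flow of remote execution: arguments are pickled, "shipped", unpickled, the function runs, and the result travels back the same way. `toy_remote` is hypothetical and is not how the SageMaker SDK is implemented; the real decorator, among other things, stores the serialized payload in Amazon S3 and runs the function inside a training job.

```python
import functools
import pickle


def toy_remote(func):
    """Toy stand-in for a remote-execution decorator; everything runs in-process."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        payload = pickle.dumps((args, kwargs))               # client side: serialize inputs
        remote_args, remote_kwargs = pickle.loads(payload)   # "job" side: deserialize
        result = func(*remote_args, **remote_kwargs)         # run the user function
        return pickle.loads(pickle.dumps(result))            # ship the result back
    return wrapper


@toy_remote
def scale(values, factor=2):
    return [v * factor for v in values]


print(scale([1, 2, 3], factor=10))  # [10, 20, 30]
```

One practical consequence the toy makes visible: everything you pass must be serializable. An open file handle, for example, fails at the `pickle.dumps` step, so pass data (like the `train_data` DataFrame) rather than live resources.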

In [None]:
# get the last run in MLflow
last_run_id = mlflow.search_runs(
    experiment_ids=[experiment_id], 
    max_results=1, 
    order_by=["attributes.start_time DESC"]
)['run_id'][0]

mlflow_run_link = f"{presigned_url.split('/auth')[0]}/#/experiments/{experiment_id}/runs/{last_run_id}"

In [None]:
display(Javascript('window.open("{}");'.format(mlflow_run_link)))

## Running a SageMaker training job in local mode

SageMaker also supports script mode, where you use the SageMaker-managed framework containers and just pass your training script.
Before running it, let's make sure all dependencies are in place, starting with a check that Docker access is enabled in the SageMaker AI Studio domain.

In [None]:
# check that docker enabled in the SageMaker domain
docker_settings = sm_client.describe_domain(DomainId=domain_id)['DomainSettings'].get('DockerSettings')
docker_enabled = False

if docker_settings:
    if docker_settings.get('EnableDockerAccess') in ['ENABLED']:
        print(f"The docker access is ENABLED in the domain {domain_id}")
        docker_enabled = True

if not docker_enabled:
    raise Exception(f"You must enable docker access in the domain to use Studio local mode")
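
The same check can be written as a pure function over the `describe_domain` response, which makes it testable without calling AWS and tolerant of missing keys. `docker_access_enabled` is a hypothetical helper; it assumes the response shape used in the cell above.

```python
def docker_access_enabled(describe_domain_response: dict) -> bool:
    """True if DomainSettings.DockerSettings.EnableDockerAccess == 'ENABLED'."""
    settings = describe_domain_response.get("DomainSettings", {}).get("DockerSettings") or {}
    return settings.get("EnableDockerAccess") == "ENABLED"


print(docker_access_enabled(
    {"DomainSettings": {"DockerSettings": {"EnableDockerAccess": "ENABLED"}}}
))  # True
```

In the notebook you would call it as `docker_access_enabled(sm_client.describe_domain(DomainId=domain_id))`.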

Then install the Docker CLI and verify that it can reach the Docker daemon.

In [None]:
%%bash

# see https://docs.docker.com/engine/install/ubuntu/#install-using-the-repository
sudo apt-get update
sudo apt-get install -y ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc

# Add the repository to Apt sources:
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update

## Currently only Docker version 20.10.X is supported in Studio: see https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-local.html
# pick the latest patch from:
# apt-cache madison docker-ce | awk '{ print $3 }' | grep -i 20.10
VERSION_STRING=5:20.10.24~3-0~ubuntu-jammy
sudo apt-get install docker-ce-cli=$VERSION_STRING docker-compose-plugin -y

# validate the Docker Client is able to access Docker Server at [unix:///docker/proxy.sock]
docker version
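
The comment in the cell above asks you to pick the latest 20.10 patch by hand. If you automate that, compare versions numerically: as plain strings, `20.10.9` sorts after `20.10.24`. A minimal sketch, assuming version strings follow the `5:20.10.<patch>~<build>-0~ubuntu-<codename>` pattern of `VERSION_STRING`; `latest_docker_2010` is a hypothetical helper, and the sample list is made up (in practice you would feed it the output of `apt-cache madison docker-ce | awk '{ print $3 }'`).

```python
import re

# Matches versions like "5:20.10.24~3-0~ubuntu-jammy"; captures patch and build numbers
_VERSION_RE = re.compile(r"^\d+:20\.10\.(\d+)~(\d+)-")


def latest_docker_2010(versions):
    """Return the 20.10.x version string with the highest (patch, build)."""
    candidates = []
    for v in versions:
        m = _VERSION_RE.match(v.strip())
        if m:
            candidates.append(((int(m.group(1)), int(m.group(2))), v.strip()))
    if not candidates:
        raise ValueError("no 20.10.x version found")
    return max(candidates)[1]


sample = [
    "5:24.0.7-1~ubuntu.22.04~jammy",
    "5:20.10.23~3-0~ubuntu-jammy",
    "5:20.10.24~3-0~ubuntu-jammy",
    "5:20.10.9~3-0~ubuntu-jammy",
]
print(latest_docker_2010(sample))  # 5:20.10.24~3-0~ubuntu-jammy
```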


Let's now write a `train.py` script that trains the model on the training data.

In [None]:
%%writefile training_code/train.py

from __future__ import print_function

import argparse
import os
import pandas as pd

from sklearn import tree

import mlflow
from mlflow.models import infer_signature

mode = os.environ.get("MODE")
if mode is None:
    run_name = "Training"
else:
    run_name = "Local-Training"

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here. In this simple example we are just including one hyperparameter.
    # None is scikit-learn's value for "no limit on leaves"; recent versions reject -1.
    parser.add_argument('--max_leaf_nodes', type=int, default=None)

    # SageMaker-specific arguments. Defaults are read from environment variables set in the container.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    # Take the set of files and read them all into a single pandas dataframe
    input_files = [ os.path.join(args.train, file) for file in os.listdir(args.train) if os.path.isfile(os.path.join(args.train, file))]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))
    raw_data = [ pd.read_csv(file, header=None, engine="python") for file in input_files ]
    train_data = pd.concat(raw_data)
    
    # Enable autologging in MLflow for SKlearn
    mlflow.sklearn.autolog()

    with mlflow.start_run(run_name=run_name) as run:
        # labels are in the first column
        train_y = train_data.iloc[:, 0]
        train_X = train_data.iloc[:, 1:]
    
        # Here we support a single hyperparameter, 'max_leaf_nodes'. Note that you can add as many
        # as your training may require in the ArgumentParser above.
        max_leaf_nodes = args.max_leaf_nodes
    
        # Now use scikit-learn's decision tree classifier to train the model.
        clf = tree.DecisionTreeClassifier(max_leaf_nodes=max_leaf_nodes)
        clf = clf.fit(train_X, train_y)
    
        predictions = clf.predict(train_X)
        signature = infer_signature(train_X, predictions)

        mlflow.set_tags(
            {
                'mlflow.source.name': "training_code/train.py",
                'mlflow.source.type': 'JOB',
            }
        )
    
        mlflow.sklearn.log_model(clf, "model", signature=signature)
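
`train.py` receives hyperparameters such as `max_leaf_nodes` as command-line flags and reads its directories from `SM_*` environment variables that exist only inside the training container, so indexing `os.environ` directly raises `KeyError` when you run it elsewhere. A sketch of a testable variant of the same argument parsing: `parse_training_args` is hypothetical, it takes the environment as a parameter, and the `/opt/ml/...` paths below are typical SageMaker values used as placeholders.

```python
import argparse
import os


def parse_training_args(argv, env=None):
    """Parse train.py-style arguments, taking SM_* defaults from env (default: os.environ)."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser()
    parser.add_argument("--max_leaf_nodes", type=int, default=None)
    parser.add_argument("--output-data-dir", type=str, default=env.get("SM_OUTPUT_DATA_DIR"))
    parser.add_argument("--model-dir", type=str, default=env.get("SM_MODEL_DIR"))
    parser.add_argument("--train", type=str, default=env.get("SM_CHANNEL_TRAIN"))
    return parser.parse_args(argv)


fake_env = {
    "SM_OUTPUT_DATA_DIR": "/opt/ml/output/data",
    "SM_MODEL_DIR": "/opt/ml/model",
    "SM_CHANNEL_TRAIN": "/opt/ml/input/data/train",
}
args = parse_training_args(["--max_leaf_nodes", "30"], env=fake_env)
print(args.max_leaf_nodes, args.train)  # 30 /opt/ml/input/data/train
```

Note how argparse turns `--output-data-dir` into the attribute `output_data_dir`, which is how `train.py` would access it.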


## SageMaker Local Mode

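Run the training script in local mode and log to the MLflow tracking server. The `requirements.txt` file in `training_code` is installed in the container before `train.py` runs.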

In [None]:
from sagemaker.local import LocalSession

LOCAL_SESSION = LocalSession()
LOCAL_SESSION.config = {'local': {'local_code': True}}  # Ensure full code locality, see: https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode


sklearn_local = SKLearn(
    entry_point="train.py",
    source_dir="training_code",
    framework_version="1.2-1",
    instance_type="ml.c5.xlarge",
    role=role,
    sagemaker_session=LOCAL_SESSION,
    hyperparameters={"max_leaf_nodes": 30},
    keep_alive_period_in_seconds=3600,
    environment={
        "MLFLOW_TRACKING_URI": mlflow_server_arn,
        "MODE": "local-mode",
        "LOGNAME": user_profile_name,
        "MLFLOW_EXPERIMENT_NAME": mlflow_experiment_name
    },
)

sklearn_local.fit({"train": train_input})

In [None]:
experiment_id = mlflow.get_experiment_by_name(mlflow_experiment_name).experiment_id
# get the last run in MLflow
last_run_id = mlflow.search_runs(
    experiment_ids=[experiment_id], 
    max_results=1, 
    order_by=["attributes.start_time DESC"]
)['run_id'][0]


mlflow_run_link = f"{presigned_url.split('/auth')[0]}/#/experiments/{experiment_id}/runs/{last_run_id}"

In [None]:
display(Javascript('window.open("{}");'.format(mlflow_run_link)))

Now run the same script as a training job on the SageMaker managed infrastructure and log to the MLflow tracking server.

In [None]:
sklearn = SKLearn(
    entry_point="train.py",
    source_dir="training_code",
    framework_version="1.2-1",
    instance_type="ml.c5.xlarge",
    role=role,
    sagemaker_session=sagemaker_session,
    hyperparameters={"max_leaf_nodes": 30},
    keep_alive_period_in_seconds=3600,
    environment={
        "MLFLOW_TRACKING_URI": mlflow_server_arn,
        "LOGNAME": user_profile_name,
        "MLFLOW_EXPERIMENT_NAME": mlflow_experiment_name
    },
)

sklearn.fit({"train": train_input})

In [None]:
# get the last run in MLflow
last_run_id = mlflow.search_runs(
    experiment_ids=[experiment_id], 
    max_results=1, 
    order_by=["attributes.start_time DESC"]
)['run_id'][0]

mlflow_run_link = f"{presigned_url.split('/auth')[0]}/#/experiments/{experiment_id}/runs/{last_run_id}"

In [None]:
display(Javascript('window.open("{}");'.format(mlflow_run_link)))

## Registering an MLflow model

In [None]:
registered_model_name = "sm-immersion-day-model"

# construct the model URI
model_uri = f"runs:/{last_run_id}/model"

# register the model
registered_model_version = mlflow.register_model(model_uri, registered_model_name)

In [None]:
# get SageMaker model registry data for this model version
model_package_group_name = sm_client.list_model_package_groups(NameContains=registered_model_name)['ModelPackageGroupSummaryList'][0]['ModelPackageGroupName']
sm_model_package = sm_client.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        SortBy="CreationTime",
        SortOrder="Descending",
    )['ModelPackageSummaryList'][0]
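
The cell above takes the first element of each list. Two details are worth making explicit: `NameContains` is a substring match, so a group such as `sm-immersion-day-model-v2` would also match, and "latest version" means the most recent `CreationTime`. A sketch of both selections as pure functions over boto3-style summaries; `exact_group_name` and `latest_model_package` are hypothetical helpers, and the summaries below are made up.

```python
from datetime import datetime, timezone


def exact_group_name(group_summaries, name):
    """Return the group whose name matches exactly, instead of the first substring match."""
    for g in group_summaries:
        if g["ModelPackageGroupName"] == name:
            return g["ModelPackageGroupName"]
    raise LookupError(f"No model package group named {name!r}")


def latest_model_package(summaries):
    """Summary with the most recent CreationTime, or None if the list is empty."""
    return max(summaries, key=lambda s: s["CreationTime"], default=None)


groups = [{"ModelPackageGroupName": "sm-immersion-day-model-v2"},
          {"ModelPackageGroupName": "sm-immersion-day-model"}]
packages = [
    {"ModelPackageArn": "arn:aws:sagemaker:us-east-1:123456789012:model-package/sm-immersion-day-model/1",
     "CreationTime": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"ModelPackageArn": "arn:aws:sagemaker:us-east-1:123456789012:model-package/sm-immersion-day-model/2",
     "CreationTime": datetime(2024, 2, 1, tzinfo=timezone.utc)},
]
print(exact_group_name(groups, "sm-immersion-day-model"))
print(latest_model_package(packages)["ModelPackageArn"])
```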



In [None]:
sm_model_package

In [None]:
model_approval_status = 'PendingManualApproval'

# update SageMaker model version with mlflow cross-reference
sm_client.update_model_package(
        ModelPackageArn=sm_model_package['ModelPackageArn'],
        ModelApprovalStatus=model_approval_status,
        ApprovalDescription="created a new model version",
        CustomerMetadataProperties={
            "mlflow_model_name": registered_model_version.name,
            "mlflow_model_uri": model_uri,
            "mlflow_experiment_name": mlflow_experiment_name,
        },
)


In [None]:


# Show the model registry link
display(
    HTML('<b>See <a target="top" href="https://studio-{}.studio.{}.sagemaker.aws/models/registered-models/{}/versions">the model package group</a> in the Studio UI</b>'.format(
            domain_id, region, model_package_group_name))
)

