# 03 - MLOps

As you move from running individual AI/ML projects to using AI/ML to transform your business at scale, the discipline of ML Operations (MLOps) can help. MLOps is a methodology that applies DevOps practices to machine learning workloads. It focuses on the intersection of data science and data engineering, combined with existing DevOps practices, to streamline model delivery across the machine learning development lifecycle. Building an ML model requires many training iterations as you tune the algorithm, model architecture, and parameters to achieve high prediction accuracy.

[Amazon SageMaker AI with MLflow](https://docs.aws.amazon.com/sagemaker/latest/dg/mlflow.html) helps you track, organize, view, analyze, and compare iterative ML experimentation to gain comparative insights and register and deploy your best performing models. You can compare model performance, parameters, and metrics across experiments in the MLflow UI, keep track of your best models in the MLflow Model Registry, automatically register them as a SageMaker AI model, and deploy registered models to SageMaker AI endpoints.

This notebook demonstrates how you can use SageMaker AI with MLflow to perform the following:
1. Use an MLflow Tracking server to run experiments
2. Process data on a SageMaker managed cluster using SageMaker Training jobs
3. Train an XGBoost model in a SageMaker managed cluster using SageMaker Training jobs
4. Save the model to the SageMaker model registry
5. Deploy your model for inference

## 1. Set up environment

Restore variables from the `00_setup` notebook.

In [None]:
%store -r train_data_path test_data_path
%store -r bucket_name model_prefix model_artifact
%store -r role

Import the necessary libraries and set up our environment:

In [None]:
import boto3
import mlflow
import pandas as pd
import os
import sagemaker
from time import gmtime, strftime
from sagemaker import get_execution_role
import json

# Create one SageMaker session and reuse it throughout the notebook
sess = sagemaker.Session()
bucket_name = sess.default_bucket()
prefix = "mlflow-credit-risk"
s3_root_folder = f"s3://{bucket_name}/{prefix}"

sagemaker_client = boto3.client("sagemaker")

role = get_execution_role(sess)
print(f"Your Amazon SageMaker Execution role is: {role}")

## 2. Locate MLflow Tracking server

Amazon SageMaker AI with MLflow is a capability of Amazon SageMaker AI that lets you create, manage, analyze, and compare your machine learning experiments. We need to set up an [MLflow tracking server](https://docs.aws.amazon.com/sagemaker/latest/dg/mlflow-create-tracking-server.html) to track our machine learning (ML) experiments with this capability. An MLflow Tracking Server is a stand-alone HTTP server that serves multiple REST API endpoints for tracking runs and experiments. 

To set up and manage an MLflow tracking server, as well as work with managed MLflow experiments, we need to add MLflow permissions to the SageMaker execution role. 

### Add MLflow permissions to SageMaker Execution Role


<div class="alert alert-block alert-warning">
<b>Important:</b> Skip this step if you are at an AWS event and using temporary AWS accounts provided by the event organisers.</div>

- In the AWS Console, navigate to IAM and choose Roles in the left-hand menu.
- Search for AmazonSageMaker-ExecutionRole and select the role that matches the SageMaker execution role printed in the previous step.
- Choose "Add permissions", then "Create inline policy".
- Switch to the JSON editor and paste the following JSON.
- Enter a policy name, such as "MLFlow Permissions", and follow the remaining steps to create the policy.

```json
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "VisualEditor0",
			"Effect": "Allow",
			"Action": [
				"sagemaker:DeleteMlflowTrackingServer",
				"sagemaker:StartMlflowTrackingServer",
				"sagemaker:CreatePresignedMlflowTrackingServerUrl",
				"sagemaker:UpdateMlflowTrackingServer",
				"sagemaker:CreateMlflowTrackingServer",
				"sagemaker:StopMlflowTrackingServer"
			],
			"Resource": "*"
		},
		{
			"Sid": "VisualEditor1",
			"Effect": "Allow",
			"Action": [
				"sagemaker-mlflow:*"
			],
			"Resource": "*"
		}
	]
}
```
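If you are unsure whether a policy already covers a given action, the wildcard matching that IAM applies to `Action` entries (case-insensitive, with `*` and `?`) can be approximated in a few lines. The helper `missing_actions` is hypothetical and deliberately simplified: it only considers `Allow` statements and their `Action` lists, ignoring `Deny`, `NotAction`, `Resource`, and `Condition`, so treat it as a quick sanity check rather than an IAM policy evaluator.

```python
import fnmatch
import json


def missing_actions(policy_json: str, required_actions: list[str]) -> list[str]:
    """Return the required actions that no Allow statement in the policy matches.

    Hypothetical helper: only Effect/Action are inspected; Deny statements, NotAction,
    Resource and Condition are ignored, so this is a sanity check, not an IAM evaluator.
    """
    statements = json.loads(policy_json)["Statement"]
    if isinstance(statements, dict):  # a single statement may be given as an object
        statements = [statements]

    allowed_patterns = []
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        # IAM action names are case-insensitive, so compare everything in lower case
        allowed_patterns.extend(action.lower() for action in actions)

    # fnmatchcase approximates IAM's * and ? wildcards on the lower-cased names
    return [
        action
        for action in required_actions
        if not any(fnmatch.fnmatchcase(action.lower(), p) for p in allowed_patterns)
    ]


example_policy = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["sagemaker:CreateMlflowTrackingServer"], "Resource": "*"},
        {"Effect": "Allow", "Action": ["sagemaker-mlflow:*"], "Resource": "*"},
    ],
})
print(missing_actions(example_policy, ["sagemaker-mlflow:CreateExperiment", "sagemaker:DescribeMlflowTrackingServer"]))
```

In this made-up policy, `sagemaker-mlflow:CreateExperiment` is covered by the `sagemaker-mlflow:*` wildcard, while `sagemaker:DescribeMlflowTrackingServer` is reported as missing.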

The following code reads the SageMaker Studio resource metadata file to identify the domain the notebook is running in. The metadata file at `/opt/ml/metadata/resource-metadata.json` contains information about the Studio environment, including the domain identifier, which we use to name the MLflow tracking server.

In [None]:
NOTEBOOK_METADATA_FILE = "/opt/ml/metadata/resource-metadata.json"
domain_id = "default"  # fallback when not running inside SageMaker Studio
if os.path.exists(NOTEBOOK_METADATA_FILE):
    with open(NOTEBOOK_METADATA_FILE) as f:
        metadata = json.load(f)
    # Keep the fallback if the metadata file has no DomainId entry
    domain_id = metadata.get("DomainId", domain_id)
    space_name = metadata.get("SpaceName")
print(f"SageMaker domain id: {domain_id}")

Next, we set up an MLflow tracking server to track our experiments. We first look for existing servers in the `Created` or `Creating` state, checking `Created` servers first and preferring the most recently created one. If no suitable server is found, we provision a new MLflow tracking server.

In [None]:
def get_running_mlflow_server(sagemaker_client, status_filter=("Created", "Creating")):
    """
    Returns the ARN and name of the most recently created MLflow server in one of the
    given states (checked in order), or (None, None) if no server is found.
    """
    for status in status_filter:
        servers = sagemaker_client.list_mlflow_tracking_servers(
            TrackingServerStatus=status, SortBy="CreationTime", SortOrder="Descending"
        )["TrackingServerSummaries"]
        if servers:
            server = servers[0]  # results are sorted newest first
            print(f"Found an MLflow server {server['TrackingServerArn']} in the status '{status}'.")
            return server["TrackingServerArn"], server["TrackingServerName"]
    print("No MLflow servers found.")
    return None, None

def create_mlflow_server(sagemaker_client, bucket_name, sm_role, domain_id):
    """
    Creates a new MLflow server and returns its ARN and name.
    """
    timestamp = strftime('%d-%H-%M-%S', gmtime())
    mlflow_name = f"mlflow-{domain_id}-{timestamp}"
    response = sagemaker_client.create_mlflow_tracking_server(
        TrackingServerName=mlflow_name,
        ArtifactStoreUri=f"s3://{bucket_name}/mlflow/{timestamp}",
        RoleArn=sm_role,
        AutomaticModelRegistration=True,
    )

    mlflow_arn = response['TrackingServerArn']
    print(f"Server creation request succeeded. The server {mlflow_arn} is being created.")
    return mlflow_arn, mlflow_name


In [None]:

# Get a running MLflow server 
mlflow_arn, mlflow_name = get_running_mlflow_server(sagemaker_client)
print(f"Using server {mlflow_name}")

If you are running this workshop at an AWS event, an MLflow tracking server should already be available. If no tracking server was found in the last step, the following cell will create one for you. Note that this can take several minutes.
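Server creation is asynchronous: `create_mlflow_tracking_server` returns while the server is still in the `Creating` state. A minimal polling sketch, assuming a boto3 SageMaker client whose `describe_mlflow_tracking_server` response carries a `TrackingServerStatus` field. The helper name `wait_for_tracking_server`, the poll interval, and the timeout are illustrative choices, not documented limits.

```python
import time


def wait_for_tracking_server(client, server_name, poll_seconds=30, timeout_seconds=1800):
    """Poll until the MLflow tracking server leaves the 'Creating' state.

    Hypothetical helper: `client` is expected to behave like boto3.client("sagemaker").
    Returns the final status (for example 'Created' or 'CreateFailed').
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.describe_mlflow_tracking_server(
            TrackingServerName=server_name
        )["TrackingServerStatus"]
        if status != "Creating":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{server_name} is still 'Creating' after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

After creating a server, you could call `wait_for_tracking_server(sagemaker_client, mlflow_name)` and check that it returns `Created` before logging experiments.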

In [None]:
# Create a new one if none exists

if not mlflow_arn:
    mlflow_arn, mlflow_name = create_mlflow_server(sagemaker_client, bucket_name, role, domain_id)
print(f"Using server {mlflow_name}")

## 3. Prepare the data

This code is adapted from the [amazon-sagemaker-credit-risk-prediction-explainability-bias-detection](https://github.com/aws-samples/amazon-sagemaker-credit-risk-prediction-explainability-bias-detection/tree/main) repository.

In this step, we load the training data from `data/SouthGermanCredit.asc`, a space-separated file that was created during the set-up phase. It contains the features and the target variable (`credit_risk`) that we use to train our XGBoost model for credit risk prediction. We then perform some basic preprocessing and upload the training and test files to Amazon S3.

In [None]:
credit_columns = [
    "status",
    "duration",
    "credit_history",
    "purpose",
    "amount",
    "savings",
    "employment_duration",
    "installment_rate",
    "personal_status_sex",
    "other_debtors",
    "present_residence",
    "property",
    "age",
    "other_installment_plans",
    "housing",
    "number_credits",
    "job",
    "people_liable",
    "telephone",
    "foreign_worker",
    "credit_risk",
]

In [None]:
training_data = pd.read_csv(
    "data/SouthGermanCredit.asc",
    names=credit_columns,
    header=0,
    sep=r" ",
    engine="python",
    na_values="?",
).dropna()

In [None]:
# Sample 10% of the rows (without the label column) to use as test inputs
test_data = training_data.sample(frac=0.1, random_state=42)
test_data = test_data.drop("credit_risk", axis=1)
test_columns = [col for col in credit_columns if col != "credit_risk"]

training_data.to_csv("train.csv", index=False, header=True, columns=credit_columns)
test_data.to_csv("test.csv", index=False, header=True, columns=test_columns)

# Save the datasets in S3 for future use
train_s3_url = sess.upload_data(
    path="train.csv",
    bucket=bucket_name,
    key_prefix=f"{prefix}/input"
)
print(f"Uploaded the training dataset to {train_s3_url}")

test_s3_url = sess.upload_data(
    path="test.csv",
    bucket=bucket_name,
    key_prefix=f"{prefix}/input"
)
print(f"Uploaded the test dataset to {test_s3_url}")


## 4. Data Preprocessing with SageMaker Remote Functions and MLflow

Next, we define a `preprocess` function that prepares our credit risk data for machine learning. To get the data ready for XGBoost, it does the following:
- It performs one-hot encoding of categorical features and splits data into training and validation sets
- It logs parameters and artifacts to MLflow, and saves the preprocessed datasets
- The featurizer model is also serialized and stored as a model artifact, enabling consistent feature transformation during inference. 
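To make the one-hot step concrete, here is a standard-library sketch of what a fitted featurizer does. It is an illustration, not the scikit-learn implementation: the names `fit_one_hot` and `transform_one_hot` are hypothetical. It assumes the layout that `make_column_transformer(OneHotEncoder(...), remainder="passthrough")` produces (encoded categorical columns first, with categories sorted, then the remaining columns in their original order) and mirrors `OneHotEncoder`'s default behavior of rejecting categories that were not seen during fitting.

```python
def fit_one_hot(rows, categorical_cols):
    # "Fitting" learns the sorted set of categories observed in each categorical column
    return {col: sorted({row[col] for row in rows}) for col in categorical_cols}


def transform_one_hot(rows, categories, passthrough_cols):
    """Encode categorical columns as 0/1 indicators, then append passthrough columns unchanged."""
    encoded = []
    for row in rows:
        features = []
        for col, values in categories.items():
            if row[col] not in values:
                # Like OneHotEncoder's default handle_unknown="error", reject unseen categories
                raise ValueError(f"unseen category {row[col]!r} in column {col!r}")
            features.extend(1.0 if row[col] == value else 0.0 for value in values)
        features.extend(row[col] for col in passthrough_cols)
        encoded.append(features)
    return encoded


train_rows = [
    {"housing": 2, "job": 3, "age": 35},
    {"housing": 1, "job": 3, "age": 52},
]
categories = fit_one_hot(train_rows, ["housing", "job"])
print(transform_one_hot(train_rows, categories, ["age"]))
# [[0.0, 1.0, 1.0, 35], [1.0, 0.0, 1.0, 52]]
```

Storing the fitted `categories` is what makes inference consistent: new rows are encoded against the same columns in the same order, which is why the notebook saves the fitted featurizer as a model artifact.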

Note the use of the **@remote** decorator. This decorator starts a training job that allows the preprocessing function to execute on a separate SageMaker instance (ml.m5.large), offloading resource-intensive operations from the notebook environment. This approach combines the scalability of SageMaker with the experiment tracking capabilities of MLflow to create a reproducible and well-documented machine learning workflow.
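Because the decorated function runs in a separate job, the SageMaker Python SDK serializes the function, its arguments, and its return value (using cloudpickle) and stages them in S3 under `s3_root_uri`, so `df` and everything `preprocess` returns must be serializable. A hypothetical local stand-in, `local_round_trip`, can surface serialization problems before you launch a job. It runs the function in the current process and only mimics the process boundary with plain `pickle`, which is stricter than cloudpickle.

```python
import functools
import pickle


def local_round_trip(func):
    """Hypothetical local stand-in for @remote that runs `func` in this process.

    Arguments and the return value are pickled and unpickled to mimic crossing a
    process boundary. Plain pickle is stricter than cloudpickle (it rejects lambdas,
    for example), so passing here is a conservative check, not a guarantee.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        args, kwargs = pickle.loads(pickle.dumps((args, kwargs)))
        result = func(*args, **kwargs)
        return pickle.loads(pickle.dumps(result))

    return wrapper


@local_round_trip
def add_row_totals(rows):
    return [sum(row) for row in rows]


print(add_row_totals([[1, 2], [3, 4]]))  # [3, 7]
```

Note that `preprocess` writes to `/opt/ml/...` paths that exist inside the training job, so running it locally this way may also need writable stand-ins for those directories.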

In [None]:
from time import gmtime, strftime, sleep

experiment_suffix = strftime('%d-%H-%M-%S', gmtime())
registered_model_name = f"credit-risk-model-{experiment_suffix}"
experiment_name = f"credit-risk-model-experiment-{experiment_suffix}"
print(experiment_name)

In [None]:
mlflow_arn

In [None]:
import os
import tarfile
from time import gmtime, strftime

import joblib
import mlflow
import pandas as pd
from sagemaker.remote_function import remote
from sklearn.compose import make_column_transformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder


@remote(s3_root_uri=f"s3://{bucket_name}/{prefix}", dependencies="requirements.txt", instance_type="ml.m5.large")
def preprocess(df, experiment_name, mlflow_arn, bucket_name, prefix, run_id=None):
    """
    Preprocess the input data and split it into training and validation sets.

    Args:
        df (pandas.DataFrame): Input data, including the `credit_risk` label column.
        experiment_name (str): Name of the MLflow experiment. If empty, a timestamped name is generated.
        mlflow_arn (str): ARN of the MLflow tracking server, used as the tracking URI.
        bucket_name (str): S3 bucket name (not used inside the function).
        prefix (str): S3 key prefix (not used inside the function).
        run_id (str, optional): MLflow run ID. If not provided, a new run will be created.

    Returns:
        tuple: The training and validation features and labels, followed by the
        MLflow model info of the logged featurizer model.
    """
    try:
        mlflow.set_tracking_uri(mlflow_arn)
        suffix = strftime('%d-%H-%M-%S', gmtime())
        mlflow.set_experiment(experiment_name=experiment_name if experiment_name else f"credit-risk-model-experiment-{suffix}")
        run = mlflow.start_run(run_id=run_id) if run_id else mlflow.start_run(run_name=f"remote-processing-{suffix}", nested=True)

        output_path = "/opt/ml/output/data"
        os.makedirs(output_path, exist_ok=True)

        print("Reading input data")
        model_dataset = mlflow.data.from_pandas(df)
        mlflow.log_input(model_dataset, context="model_dataset")

        print("Performing one-hot encoding")
        categorical_cols = [
            "credit_history",
            "purpose",
            "personal_status_sex",
            "other_debtors",
            "property",
            "other_installment_plans",
            "housing",
            "job",
            "telephone",
            "foreign_worker",
        ]
        transformer = make_column_transformer(
            (OneHotEncoder(sparse_output=False), categorical_cols),
            remainder="passthrough",
        )

        print("Preparing features and labels")
        X = df.drop("credit_risk", axis=1)
        y = df["credit_risk"]

        print("Building scikit-learn transformer")
        featurizer_model = transformer.fit(X)
        features = featurizer_model.transform(X)
        labels = LabelEncoder().fit_transform(y)

        split_ratio = 0.3
        print(f"Splitting data into train and validation sets with ratio {split_ratio}")
        X_train, X_val, y_train, y_val = train_test_split(
            features, labels, test_size=split_ratio, random_state=0
        )

        print(f"Train features shape after preprocessing: {X_train.shape}")
        print(f"Validation features shape after preprocessing: {X_val.shape}")

        mlflow.log_params({"train_shape": X_train.shape, "val_shape": X_val.shape})

        train_features_path = os.path.join(output_path, "train_features.csv")
        print(f"Saving training features to {train_features_path}")
        pd.DataFrame(X_train).to_csv(train_features_path, header=False, index=False)

        train_labels_path = os.path.join(output_path, "train_labels.csv")
        print(f"Saving training labels to {train_labels_path}")
        pd.DataFrame(y_train).to_csv(train_labels_path, header=False, index=False)

        val_features_path = os.path.join(output_path, "val_features.csv")
        print(f"Saving validation features to {val_features_path}")
        pd.DataFrame(X_val).to_csv(val_features_path, header=False, index=False)

        val_labels_path = os.path.join(output_path, "val_labels.csv")
        print(f"Saving validation labels to {val_labels_path}")
        pd.DataFrame(y_val).to_csv(val_labels_path, header=False, index=False)

        model_dir = "/opt/ml/model"
        os.makedirs(model_dir, exist_ok=True)
        model_path = os.path.join(model_dir, "model.joblib")
        model_output_path = os.path.join(model_dir, "model.tar.gz")

        print(f"Saving featurizer model to {model_output_path}")
        joblib.dump(featurizer_model, model_path)
        with tarfile.open(model_output_path, "w:gz") as tar:
            tar.add(model_path, arcname="model.joblib")

        logged_model = mlflow.sklearn.log_model(
            sk_model=featurizer_model,
            artifact_path="processing/model",
            registered_model_name="sk-learn-model",
        )
        return X_train, X_val, y_train, y_val, logged_model

    except Exception as e:
        print(f"Exception in processing script: {e}")
        raise
    finally:
        mlflow.end_run()

In [None]:
df = pd.read_csv("input/train_data.csv", names=None, header=0, sep=",")
X_train, X_val, y_train, y_val, logged_featurizer_model = preprocess(df, experiment_name, mlflow_arn, bucket_name, prefix)
print(logged_featurizer_model)

In [None]:
logged_featurizer_model.model_uri

## 5. Model training with SageMaker training jobs

In this step, we define a `train` function that, again through the `@remote` decorator, runs as a SageMaker training job, trains an XGBoost model for credit risk prediction, saves it, and registers it in MLflow for version control.
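The hyperparameters below set `eval_metric` to `auc`, and the watchlist names turn this into `train-auc` and `validation-auc` metrics; `validation-auc` is used later to pick the best run. As a reminder of what that number means, here is a small standard-library sketch (a hypothetical helper, not XGBoost's implementation) that computes ROC AUC as the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, counting ties as one half.

```python
def roc_auc(labels, scores):
    """ROC AUC via pairwise comparison (Mann-Whitney form); O(P*N), fine for small examples."""
    positives = [s for label, s in zip(labels, scores) if label == 1]
    negatives = [s for label, s in zip(labels, scores) if label == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0  # positive ranked above negative
            elif p == n:
                wins += 0.5  # ties count as half
    return wins / (len(positives) * len(negatives))


print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

An AUC of 0.5 corresponds to random ranking and 1.0 to a perfect separation of good and bad credit risks, which is why higher `validation-auc` is treated as better.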

In [None]:
import os
import tarfile
from time import gmtime, strftime

import mlflow
import xgboost
from sagemaker.remote_function import remote


@remote(s3_root_uri=f"s3://{bucket_name}/{prefix}", dependencies="requirements.txt", instance_type="ml.m5.large")
def train(X, val_X, y, val_y, num_round, params, mlflow_arn, experiment_name, run_id=None):
    """Train an XGBoost model, package it, and log and register it in MLflow."""
    output_path = "/opt/ml/model"
    mlflow.set_tracking_uri(mlflow_arn)
    # Autologging records the training parameters and per-round evaluation metrics
    mlflow.autolog()

    suffix = strftime('%d-%H-%M-%S', gmtime())
    mlflow.set_experiment(experiment_name=experiment_name if experiment_name else f"credit-risk-model-experiment-{suffix}")
    if run_id:
        mlflow.start_run(run_id=run_id)
    else:
        mlflow.start_run(run_name=f"remote-training-{suffix}", nested=True)

    try:
        os.makedirs(output_path, exist_ok=True)

        dtrain = xgboost.DMatrix(X, label=y)
        dval = xgboost.DMatrix(val_X, label=val_y)
        # The watchlist names prefix the evaluation metrics, e.g. "validation-auc"
        watchlist = [(dtrain, "train"), (dval, "validation")]
        mlflow.log_params(params)

        print("Training the model")
        bst = xgboost.train(
            params=params, dtrain=dtrain, evals=watchlist, num_boost_round=num_round
        )
        model_file = os.path.join(output_path, "model.ubj")
        bst.save_model(model_file)

        # Compress the saved model.ubj file into model.tar.gz
        tar_filename = os.path.join(output_path, "model.tar.gz")
        with tarfile.open(tar_filename, "w:gz") as tar:
            tar.add(model_file, arcname="model.ubj")
        mlflow.log_artifact(local_path=tar_filename)

        logged_model = mlflow.xgboost.log_model(
            xgb_model=bst,
            artifact_path="model",
            registered_model_name="xgb-creditrisk-model",
        )
    finally:
        mlflow.end_run()
    return logged_model


In [None]:
# XGBoost booster parameters; the number of boosting rounds is passed separately as num_round.
# ("silent" was removed in recent XGBoost versions, and "num_round" is not a booster parameter.)
hyperparameters = {
    "max_depth": "5",
    "eta": "0.1",
    "gamma": "4",
    "min_child_weight": "6",
    "objective": "binary:logistic",
    "subsample": "0.8",
    "eval_metric": "auc",
}
num_round = 50

logged_creditrisk_model = train(X_train, X_val, y_train, y_val, num_round, hyperparameters, mlflow_arn, experiment_name)

In [None]:
logged_creditrisk_model.model_uri

Store the following variables for use in the optional challenge notebook.

In [None]:
%store mlflow_arn
%store experiment_name
%store logged_featurizer_model
%store logged_creditrisk_model

## 6. Register your candidate model directly with the SageMaker model registry (optional)

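In the previous sections, the trained featurizer and credit risk models were registered in the MLflow Model Registry. If automatic model registration is enabled on the tracking server (as `create_mlflow_server` configures with `AutomaticModelRegistration=True`), the models are also registered in the SageMaker Model Registry automatically.

In this section, you will see how to register models directly with the SageMaker Model Registry. You can use this option if you do not want to use MLflow, and it gives you direct control over model package parameters such as the inference container, supported content types, and approval status.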

In [None]:
mlflow.set_tracking_uri(mlflow_arn)

Next, we filter MLflow training runs to identify the best performing model using validation AUC score.
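The MLflow query in the next cell keeps finished runs whose name contains `training` and sorts them by `validation-auc`, highest first. The same selection logic is sketched here over plain dictionaries; `select_best_run` and the run records are hypothetical stand-ins, not MLflow's `Run` objects or real results. Like SQL `LIKE` in MLflow filters, the name match is case-sensitive.

```python
def select_best_run(runs, name_substring="training", metric="validation-auc"):
    """Return the finished run with the highest metric, or None if no run qualifies.

    Hypothetical sketch: `runs` is a list of dicts with 'run_name', 'status' and
    'metrics' keys. Runs that did not log the metric are skipped.
    """
    candidates = [
        run for run in runs
        if name_substring in run["run_name"]
        and run["status"] == "FINISHED"
        and metric in run["metrics"]
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda run: run["metrics"][metric])


# Made-up run records, for illustration only
example_runs = [
    {"run_name": "remote-training-01", "status": "FINISHED", "metrics": {"validation-auc": 0.78}},
    {"run_name": "remote-training-02", "status": "FAILED", "metrics": {"validation-auc": 0.91}},
    {"run_name": "remote-processing-01", "status": "FINISHED", "metrics": {}},
    {"run_name": "remote-training-03", "status": "FINISHED", "metrics": {"validation-auc": 0.81}},
]
print(select_best_run(example_runs)["run_name"])  # remote-training-03
```

The failed run is excluded even though its metric is highest, which is the reason the real query also filters on `attributes.status`.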

In [None]:
from mlflow.entities import ViewType

# MLflow filter clauses must be joined with AND
run_filter = "attributes.run_name LIKE '%training%' AND attributes.status = 'FINISHED'"

runs_with_filter = mlflow.search_runs(
    experiment_names=[experiment_name],
    run_view_type=ViewType.ACTIVE_ONLY,
    filter_string=run_filter,
    order_by=["metrics.`validation-auc` DESC"],
)
best_run = runs_with_filter[:1]

In [None]:
artifact_uri = best_run['artifact_uri'][0]
print(artifact_uri)

[SageMaker Model Registry](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-models.html) is organized into Model (Package) Groups, each containing model packages. A model package is a trained model registered in the Model Registry as a versioned entity. Each time you register a retrained model, the Model Registry assigns it a new version within its Model Group.
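The group/version relationship can be pictured with a toy in-memory data structure. `ModelPackageGroup` here is a hypothetical illustration, not the SageMaker API: the real service assigns `ModelPackageVersion` numbers itself and stores much more metadata. The approval values match the ones SageMaker uses (`PendingManualApproval`, `Approved`, `Rejected`).

```python
from dataclasses import dataclass, field


@dataclass
class ModelPackageGroup:
    """Toy model of a registry group: each registration becomes the next version."""
    name: str
    packages: list = field(default_factory=list)

    def register(self, model_data_url, approval_status="PendingManualApproval"):
        # Versions are assigned sequentially within the group, starting at 1
        package = {
            "version": len(self.packages) + 1,
            "model_data_url": model_data_url,
            "approval_status": approval_status,
        }
        self.packages.append(package)
        return package

    def latest_approved(self):
        # A deployment pipeline would typically pick the newest approved version
        approved = [p for p in self.packages if p["approval_status"] == "Approved"]
        return approved[-1] if approved else None


group = ModelPackageGroup("credit-risk")
group.register("s3://example-bucket/run-1/model.tar.gz")
v2 = group.register("s3://example-bucket/run-2/model.tar.gz", approval_status="Approved")
print(v2["version"])  # 2
```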

In the next steps, we create a new model package and assign it to a group.

In [None]:
response = sagemaker_client.list_model_package_groups()
model_package_group_arn = response['ModelPackageGroupSummaryList'][0]['ModelPackageGroupArn']
print(model_package_group_arn)

In [None]:
# train() logs model.tar.gz at the root of the run's artifact location
print(f"{artifact_uri}/model.tar.gz")

In [None]:
# Model package definition: the container image, the model artifact in S3, and the supported content types.
# ModelPackageGroupName accepts either the group name or its ARN.
create_model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_arn,
    "ModelPackageDescription": "Model to detect credit risk",
    "ModelApprovalStatus": "PendingManualApproval",
    "InferenceSpecification": {
        "Containers": [
            {
                "Image": "public.ecr.aws/sagemaker/sagemaker-distribution:3.0.0-cpu",
                "ModelDataUrl": f"{artifact_uri}/model.tar.gz",
            }
        ],
        "SupportedContentTypes": ["text/csv"],
        "SupportedResponseMIMETypes": ["text/csv"],
    },
}

create_model_package_response = sagemaker_client.create_model_package(**create_model_package_input_dict)
model_package_arn = create_model_package_response["ModelPackageArn"]
print(f"ModelPackage Version ARN: {model_package_arn}")

### Conclusion and Next Steps
In this notebook, you:
- Set up an MLflow tracking server
- Preprocessed data using SageMaker Remote Functions and MLflow
- Trained an XGBoost model for credit risk prediction
- Saved a model package to a Model Group in SageMaker Model Registry

If you're up for a challenge, move on to the `04_challenge` notebook!