# Use Case and Model Lifecycle Governance with SageMaker Model Registry Resource Sharing

## MLflow Experimentation with Shared Model Group

### 0. Pre-requisites: Access Model Package Groups in Shared Services account

In [None]:
# Install or upgrade the packages listed in requirements.txt
%pip install -U -r requirements.txt

# Restart the kernel after the installation
# (presumably so the upgraded packages are the ones imported by later cells)
import IPython
IPython.Application.instance().kernel.do_shutdown(True) #automatically restarts kernel

Before you get started, check whether there are any pending invitations from the shared services account 
and accept them. 
This will allow you to discover shared model package groups and register your model versions against them.

In [None]:
# Parameters used by the cells below -- replace every placeholder before running.
hub_account_id = 'AWS_ACCOUNT_ID'  # Hub / Shared Account ID
hub_region = 'AWS_REGION'  # Region used in the Hub account deployments
hub_kms_key_id = 'KMS_KEY_ID'  # KMS Key id deployed in the Hub account
# S3 bucket name created in the Hub account (the template HubS3BucketName parameter value)
hub_s3_bucket_name = 'HubS3BucketName parameter value used in Cloudformation'

# KMS key ARN assembled from the region, account and key id above
hub_kms_key_arn = f'arn:aws:kms:{hub_region}:{hub_account_id}:key/{hub_kms_key_id}'

# Full name of the S3 bucket in the Hub account
hub_s3_bucket = f'sagemaker-{hub_region}-{hub_account_id}-{hub_s3_bucket_name}'

In [None]:
import boto3

ram_client = boto3.client('ram')

# Review all pending invitations. The listing is paginated, so walk every page
# rather than looking only at the first response.
pending_invitations = []
invitation_pages = ram_client.get_paginator('get_resource_share_invitations').paginate()
for page in invitation_pages:
    pending_invitations.extend(
        invitation
        for invitation in page['resourceShareInvitations']
        if invitation['status'] == "PENDING"
    )

if pending_invitations:
    # Unpack the list so that sep actually puts one invitation per line
    print(*pending_invitations, sep='\n')
else:
    print("No pending resource share invitations found.")

In [None]:
# Accept the pending resource share invitation(s) sent by the shared services account.
# Filtering on the sender avoids accepting an unrelated invitation that merely
# happens to be first in the list.
hub_invitations = [
    invitation
    for invitation in pending_invitations
    if invitation.get('senderAccountId') == hub_account_id
]
if pending_invitations and not hub_invitations:
    print(f"No pending invitation was sent by account {hub_account_id}; nothing was accepted.")

for invitation in hub_invitations:
    response = ram_client.accept_resource_share_invitation(
        resourceShareInvitationArn=invitation['resourceShareInvitationArn']
    )
    print(response)

In [None]:
# Name of the shared model bucket in the shared services account
shared_model_bucket_name = hub_s3_bucket

from botocore.exceptions import ClientError
import boto3

# HeadBucket succeeds only when the bucket exists and the caller may access it
s3_client = boto3.client('s3')
try:
    s3_client.head_bucket(Bucket=shared_model_bucket_name)
except ClientError:
    print(f"The bucket {shared_model_bucket_name} does not exist or you have no access.")
    raise
else:
    print(f"The bucket {shared_model_bucket_name} exists and you have access.")

In [None]:
# ARN of the KMS key from the Shared Services Account
kms_key_id = hub_kms_key_arn # 'your-kms-key-id' 

# Imported here as well so this cell does not depend on the S3 check cell
from botocore.exceptions import ClientError

kms_client = boto3.client('kms')
try:
    response = kms_client.describe_key(
        # An identifier for the KMS key. You can use the key ID, key ARN, alias name, alias ARN of the KMS key.
        KeyId=kms_key_id,
    )
except ClientError:
    print(f"The KMS key {kms_key_id} does not exist or you have no access.")
    raise
else:
    print(f"The KMS key {kms_key_id} exists and you have access.")

### 1. Set-up

In [None]:
import boto3
import mlflow
import pandas as pd
import sagemaker
from sagemaker import get_execution_role

def get_domain_id():
    """Return the ``DomainId`` of the first domain returned by ``list_domains``.

    Returns:
        str | None: The domain ID, or ``None`` when no domain is listed or when
        any exception occurs (the error is printed, not raised).
    """
    try:
        listed_domains = boto3.client('sagemaker').list_domains()['Domains']
        if not listed_domains:
            print("No SageMaker domains found.")
            return None
        return listed_domains[0]['DomainId']
    except Exception as e:
        print(f"Error retrieving domain ID: {str(e)}")
        return None

# SageMaker session, its default bucket, and the S3 prefix used by this notebook
sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()
prefix = "mlflow-credit-risk"
s3_root_folder = f"s3://{bucket_name}/{prefix}"

sagemaker_client = boto3.client("sagemaker")

# Get domain ID programmatically; the notebook continues with a warning if there is none
domain_id = get_domain_id()
if domain_id is None:
    print("Warning: Unable to retrieve domain ID. Some functionality may be limited.")

role = get_execution_role(sagemaker_session)

# Print configuration for verification
for label, value in (
    ("Using S3 bucket", bucket_name),
    ("Shared model bucket", shared_model_bucket_name),
    ("MLflow prefix", prefix),
    ("SageMaker Domain ID", domain_id),
    ("S3 root folder for MLflow", s3_root_folder),
    ("IAM role", role),
):
    print(f"{label}: {value}")


In [None]:
def get_running_mlflow_server(sagemaker_client, status_filter=('Created', 'Creating', 'Started')):
    """
    Find an existing MLflow tracking server in one of the given statuses.

    Statuses are queried in the order given; the first server returned for the
    first status that has any server wins.

    Args:
        sagemaker_client: Client exposing ``list_mlflow_tracking_servers``.
        status_filter: Iterable of tracking server statuses to look for. The
            default is an immutable tuple so it cannot be altered between calls.

    Returns:
        tuple: ``(tracking_server_arn, tracking_server_name)``, or
        ``(None, None)`` when no server is found.
    """
    for status in status_filter:
        servers = sagemaker_client.list_mlflow_tracking_servers(TrackingServerStatus=status)['TrackingServerSummaries']
        if servers:
            server = servers[0]
            print(f"Found an MLflow server {server['TrackingServerArn']} in the status '{status}'.")
            return server['TrackingServerArn'], server['TrackingServerName']
    print("No MLflow servers found.")
    return None, None

def create_mlflow_server(sagemaker_client, bucket_name, sm_role, domain_id):
    """
    Request a new MLflow tracking server and return its ARN and name.

    The server name is ``mlflow-<domain_id>-<timestamp>`` and its artifact store
    is ``s3://<bucket_name>/mlflow/<timestamp>``; the timestamp is the current
    UTC time formatted as day-hour-minute-second.
    """
    import time

    stamp = time.strftime('%d-%H-%M-%S', time.gmtime())
    server_name = f"mlflow-{domain_id}-{stamp}"
    request = dict(
        TrackingServerName=server_name,
        ArtifactStoreUri=f"s3://{bucket_name}/mlflow/{stamp}",
        RoleArn=sm_role,
        AutomaticModelRegistration=True,
    )
    server_arn = sagemaker_client.create_mlflow_tracking_server(**request)['TrackingServerArn']

    print(f"Server creation request succeeded. The server {server_arn} is being created.")
    return server_arn, server_name

# Get a running MLflow server or create a new one if none exists.
# get_running_mlflow_server returns (None, None) when nothing is found, which
# triggers the creation branch below.
mlflow_arn, mlflow_name = get_running_mlflow_server(sagemaker_client)
if not mlflow_arn:
    mlflow_arn, mlflow_name = create_mlflow_server(sagemaker_client, bucket_name, role, domain_id)

### 2. Prepare the data

The code was adapted from this repository https://github.com/aws-samples/amazon-sagemaker-credit-risk-prediction-explainability-bias-detection/tree/main

#### UCI Machine Learning Repository Data Usage Disclaimer

Before proceeding with any code execution or data download, please read and acknowledge the following:

#### Disclaimer

The following code and any datasets it may download or use adhere to the UCI Machine Learning Repository citation policy. This includes properly citing both the UCI Machine Learning Repository itself and any relevant papers associated with specific datasets. No modification or distribution of UCI datasets is permitted without proper authorization.

#### Confirmation

Please confirm that you have read and agree to these terms before proceeding.

If you agree, copy the URL https://archive.ics.uci.edu/static/public/573/south+german+credit+update.zip and use it to replace the `#replace-url-here` placeholder in the cell below.

In [None]:
# download and extract the dataset
!mkdir -p data
!rm -rf data/*
!wget -N --no-check-certificate #replace-url-here
!unzip south+german+credit+update.zip -d data

In [None]:
# Column names of the credit dataset, in file order; credit_risk (last) is the label
credit_columns = """
    status
    duration
    credit_history
    purpose
    amount
    savings
    employment_duration
    installment_rate
    personal_status_sex
    other_debtors
    present_residence
    property
    age
    other_installment_plans
    housing
    number_credits
    job
    people_liable
    telephone
    foreign_worker
    credit_risk
""".split()

In [None]:
# Read the space-separated data file, replacing its header row with
# credit_columns and treating "?" as missing
raw_credit_data = pd.read_csv(
    "data/SouthGermanCredit.asc",
    sep=r" ",
    header=0,
    names=credit_columns,
    na_values="?",
    engine="python",
)
# Keep only the rows without missing values
training_data = raw_credit_data.dropna()

In [None]:
# Take a reproducible 10% sample and drop the label to obtain an unlabeled test file.
# NOTE(review): the sampled rows are not removed from training_data, so they are
# also written to train.csv -- confirm this overlap is intended.
test_data = training_data.sample(frac=0.1, random_state=42)
test_data = test_data.drop("credit_risk", axis=1)

# Feature columns are derived from credit_columns (everything except the label)
# so the two lists cannot drift apart.
test_columns = [column for column in credit_columns if column != "credit_risk"]

training_data.to_csv("train.csv", index=False, header=True, columns=credit_columns)
test_data.to_csv("test.csv", index=False, header=True, columns=test_columns)

# save the datasets in S3 for future use (one session serves both uploads)
upload_session = sagemaker.Session()
input_prefix = f"{prefix}/input"

train_s3_url = upload_session.upload_data(
    path="train.csv",
    bucket=bucket_name,
    key_prefix=input_prefix
)
print(f"Upload the dataset to {train_s3_url}")

test_s3_url = upload_session.upload_data(
    path="test.csv",
    bucket=bucket_name,
    key_prefix=input_prefix
)
print(f"Upload the dataset to {test_s3_url}")


### 3. Process the data with Amazon SageMaker

In [None]:
from time import gmtime, strftime, sleep

# One UTC timestamp (day-hour-minute-second) is shared by the model and experiment names
experiment_suffix = strftime('%d-%H-%M-%S', gmtime())
registered_model_name, experiment_name = (
    f"credit-risk-model-{experiment_suffix}",
    f"credit-risk-model-experiment-{experiment_suffix}",
)
print(experiment_name)

In [None]:
import warnings
import pandas as pd
import numpy as np
import tarfile
import sklearn
import joblib
import mlflow
from sagemaker.s3 import S3Uploader
import os

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.compose import make_column_transformer

from sklearn.exceptions import DataConversionWarning
from sagemaker.remote_function import remote


# use spot instances
# @remote(s3_root_uri=f"s3://{bucket_name}/{prefix}", dependencies=f"requirements.txt", instance_type="ml.m5.large", use_spot_instances=True, max_wait_time_in_seconds=3600, max_runtime_in_seconds=3600)
# use ondemand instances
@remote(s3_root_uri=f"s3://{bucket_name}/{prefix}", dependencies=f"requirements.txt", instance_type="ml.m5.large")
def preprocess(df, experiment_name, mlflow_arn, bucket_name, prefix, run_id=None):
    """
    Preprocess the input data and split it into training and validation sets.

    The categorical columns are one-hot encoded (all other columns pass through
    unchanged), the ``credit_risk`` label is label-encoded, and the result is
    split 70/30. The splits are written as CSV files under
    ``/opt/ml/output/data``, the fitted transformer is saved under
    ``/opt/ml/model`` and logged to MLflow under the registered model name
    ``sk-learn-model``. The MLflow run is always ended on exit.

    Args:
        df (pandas.DataFrame): Input data; must contain a ``credit_risk`` column.
        experiment_name (str): Name of the MLflow experiment. A timestamped
            default name is used when this is falsy.
        mlflow_arn (str): MLflow tracking URI.
        bucket_name (str): Not referenced in the function body.
        prefix (str): Not referenced in the function body.
        run_id (str, optional): MLflow run ID to resume. If not provided, a new
            nested run named ``remote-processing-<timestamp>`` is started.

    Returns:
        tuple: ``(X_train, X_val, y_train, y_val)`` -- the training and
        validation features and labels.

    Raises:
        Exception: Any exception raised during processing is printed and re-raised.
    """
    try:
        mlflow.set_tracking_uri(mlflow_arn)
        suffix = strftime('%d-%H-%M-%S', gmtime())
        mlflow.set_experiment(experiment_name=experiment_name if experiment_name else f"credit-risk-model-experiment-{suffix}")
        run = mlflow.start_run(run_id=run_id) if run_id else mlflow.start_run(run_name=f"remote-processing-{suffix}", nested=True)

        output_path = "/opt/ml/output/data"
        os.makedirs(output_path, exist_ok=True)

        # Record the raw input frame as a dataset on the MLflow run
        print("Reading input data")
        model_dataset = mlflow.data.from_pandas(df)
        mlflow.log_input(model_dataset, context="model_dataset")

        print("Performing one-hot encoding")
        categorical_cols = [
            "credit_history",
            "purpose",
            "personal_status_sex",
            "other_debtors",
            "property",
            "other_installment_plans",
            "housing",
            "job",
            "telephone",
            "foreign_worker",
        ]
        # Columns not listed above are passed through unchanged
        transformer = make_column_transformer(
            (OneHotEncoder(sparse_output=False), categorical_cols),
            remainder="passthrough",
        )

        print("Preparing features and labels")
        X = df.drop("credit_risk", axis=1)
        y = df["credit_risk"]

        # The transformer is fitted on the full feature set, before the split
        print("Building scikit-learn transformer")
        featurizer_model = transformer.fit(X)
        features = featurizer_model.transform(X)
        labels = LabelEncoder().fit_transform(y)

        split_ratio = 0.3
        print(f"Splitting data into train and validation sets with ratio {split_ratio}")
        X_train, X_val, y_train, y_val = train_test_split(
            features, labels, test_size=split_ratio, random_state=0
        )

        print(f"Train features shape after preprocessing: {X_train.shape}")
        print(f"Validation features shape after preprocessing: {X_val.shape}")

        mlflow.log_params({"train_shape": X_train.shape, "val_shape": X_val.shape})

        # Persist the four splits as header-less CSV files
        train_features_path = os.path.join(output_path, "train_features.csv")
        print(f"Saving training features to {train_features_path}")
        pd.DataFrame(X_train).to_csv(train_features_path, header=False, index=False)

        train_labels_path = os.path.join(output_path, "train_labels.csv")
        print(f"Saving training labels to {train_labels_path}")
        pd.DataFrame(y_train).to_csv(train_labels_path, header=False, index=False)

        val_features_path = os.path.join(output_path, "val_features.csv")
        print(f"Saving validation features to {val_features_path}")
        pd.DataFrame(X_val).to_csv(val_features_path, header=False, index=False)

        val_labels_path = os.path.join(output_path, "val_labels.csv")
        print(f"Saving validation labels to {val_labels_path}")
        pd.DataFrame(y_val).to_csv(val_labels_path, header=False, index=False)

        # Save the fitted transformer with joblib and wrap it in a tar.gz archive
        model_dir = "/opt/ml/model"
        os.makedirs(model_dir, exist_ok=True)
        model_path = os.path.join(model_dir, "model.joblib")
        model_output_path = os.path.join(model_dir, "model.tar.gz")

        print(f"Saving featurizer model to {model_output_path}")
        joblib.dump(featurizer_model, model_path)
        with tarfile.open(model_output_path, "w:gz") as tar:
            tar.add(model_path, arcname="model.joblib")

        # Log the transformer to MLflow and register it as "sk-learn-model"
        mlflow.sklearn.log_model(
            sk_model=featurizer_model,
            artifact_path="processing/model",
            registered_model_name="sk-learn-model",
        )  
        return X_train, X_val, y_train, y_val
        
    except Exception as e:
        print(f"Exception in processing script: {e}")
        raise e
    finally:
        # End the MLflow run whether processing succeeded or failed
        mlflow.end_run()

In [None]:
df = pd.read_csv("train.csv", names=None, header=0, sep=",")
X_train, X_val, y_train, y_val = preprocess(df, experiment_name, mlflow_arn, bucket_name, prefix)

### 4. Model training with SageMaker training jobs

In [None]:
import xgboost
from sklearn.metrics import roc_auc_score
import pickle as pkl
import os
import mlflow
import tarfile

# use spot instances
# @remote(s3_root_uri=f"s3://{bucket_name}/{prefix}", dependencies=f"requirements.txt", instance_type="ml.m5.large", use_spot_instances=True, max_wait_time_in_seconds=3600, max_runtime_in_seconds=3600)
# use ondemand instances
@remote(s3_root_uri=f"s3://{bucket_name}/{prefix}", dependencies=f"requirements.txt", instance_type="ml.m5.large")
def train(X, val_X, y, val_y, num_round, params, mlflow_arn, experiment_name,run_id=None):
    """
    Train an XGBoost model, upload the packaged artifact to the shared bucket and log it to MLflow.

    Besides its arguments, the function reads the notebook-level names
    ``prefix``, ``shared_model_bucket_name`` and ``kms_key_id``.

    Args:
        X: Training features.
        val_X: Validation features.
        y: Training labels.
        val_y: Validation labels.
        num_round (int): Number of boosting rounds.
        params (dict): Booster parameters passed to ``xgboost.train`` and logged to MLflow.
        mlflow_arn (str): MLflow tracking URI.
        experiment_name (str): MLflow experiment name; a timestamped default is
            used when this is falsy.
        run_id (str, optional): MLflow run ID to resume. If not provided, a new
            nested run named ``remote-training-<timestamp>`` is started.

    Returns:
        None

    Raises:
        Exception: Any failure is re-raised after the MLflow run has been
            ended with status ``FAILED``.
    """
    output_path = "/opt/ml/model"
    mlflow.set_tracking_uri(mlflow_arn)
    mlflow.autolog()

    suffix = strftime('%d-%H-%M-%S', gmtime())
    mlflow.set_experiment(experiment_name=experiment_name if experiment_name else f"credit-risk-model-experiment-{suffix}")
    run = mlflow.start_run(run_id=run_id) if run_id else mlflow.start_run(run_name=f"remote-training-{suffix}", nested=True)

    try:
        try:
            os.makedirs(output_path, exist_ok=True)
        except OSError as e:
            # Nothing below can succeed without the output directory, so report and stop
            print(f"Error creating directory '{output_path}': {e}")
            raise
        print(f"Directory '{output_path}' created successfully.")

        dtrain = xgboost.DMatrix(X, label=y)
        dval = xgboost.DMatrix(val_X, label=val_y)

        watchlist = [(dtrain, "train"), (dval, "validation")]
        mlflow.log_params(params)

        print("Training the model")
        bst = xgboost.train(
            params=params, dtrain=dtrain, evals=watchlist, num_boost_round=num_round
        )

        # Close the file before it is archived so the pickle is fully flushed
        model_filename = f"{output_path}/model.bin"
        with open(model_filename, "wb") as model_file:
            pkl.dump(bst, model_file)

        # Compress the model.bin artifact to a tar file
        tar_filename = f"{output_path}/model.tar.gz"
        with tarfile.open(tar_filename, "w:gz") as tar:
            tar.add(model_filename, arcname="model.bin")

        # Upload the compressed model to the shared bucket, encrypted with the
        # shared KMS key; the object key embeds the MLflow run ID
        s3_client = boto3.client("s3")
        s3_key = f"{prefix}/model_{run.info.run_id}.tar.gz"

        s3_client.upload_file(tar_filename,
            shared_model_bucket_name,
            s3_key,
            ExtraArgs={
                'ServerSideEncryption': 'aws:kms',
                'SSEKMSKeyId': kms_key_id
            }
        )

        mlflow.log_artifact(local_path=tar_filename)
    except Exception:
        # Mark the run as failed so it is not picked up as a finished training run
        mlflow.end_run(status="FAILED")
        raise
    else:
        mlflow.end_run()


In [None]:
# Parameters passed as `params` to train(), which hands them to xgboost.train()
# and logs them to MLflow.
hyperparameters = {
    "max_depth": "5",
    "eta": "0.1",
    "gamma": "4",
    "min_child_weight": "6",
    # NOTE(review): "silent" appears to be deprecated in favour of "verbosity" in
    # recent XGBoost releases -- confirm against the version in requirements.txt
    "silent": "1",
    "objective": "binary:logistic",
    # NOTE(review): the number of boosting rounds actually used is the num_round
    # variable below (50), which train() passes as num_boost_round -- not this entry
    "num_round": "100",
    "subsample": "0.8",
    "eval_metric": "auc"
}
num_round = 50

train(X_train, X_val, y_train, y_val,num_round, hyperparameters, mlflow_arn, experiment_name)

### 5. Register the candidate model in the model registry in the shared services account

Now register the trained model in the MLflow model registry. The model is also automatically registered in the SageMaker model registry.

In [None]:
# Point the local MLflow client at the tracking server used for the runs above
mlflow.set_tracking_uri(mlflow_arn)

In [None]:
# List the experiments on the tracking server. The commented-out filter_string
# is an example of narrowing the search by tag.
check_experiments = mlflow.search_experiments(
    # filter_string="tags.`project_name` = 'credit-risk-model-experiment-22-06-27-27'"
)
check_experiments

In [None]:
from mlflow.entities import ViewType

# Only finished training runs are candidates. MLflow's search syntax requires
# the individual conditions to be joined with AND.
run_filter = (
    "attributes.run_name LIKE '%training%' "
    "AND attributes.status = 'FINISHED'"
)
runs_with_filter = mlflow.search_runs(
    experiment_names=[experiment_name],
    run_view_type=ViewType.ACTIVE_ONLY,
    filter_string=run_filter,
    order_by=["metrics.`validation-auc` DESC"],
)
# Highest validation AUC first, so the first row is the best run
# (an empty DataFrame if nothing matched)
best_run = runs_with_filter[:1]

display(best_run)

In [None]:
import mlflow


def _print_items(mapping):
    """Print every key/value pair of ``mapping`` on its own line."""
    for key, value in mapping.items():
        print(f"{key}: {value}")


# best_run is assumed to be a pandas DataFrame holding at most one row
if best_run.empty:
    display("No best run found.")
else:
    # Run ID of the first (only) row, then the full run record from MLflow
    run_id = best_run.iloc[0]['run_id']
    full_run = mlflow.get_run(run_id)

    run_data = full_run.data
    params = run_data.params
    metrics = run_data.metrics

    display("Parameters:")
    _print_items(params)

    display("\nMetrics:")
    _print_items(metrics)

    # Run info: every public, non-callable attribute of the info object
    display("\nRun Info:")
    run_info = full_run.info
    for attr in dir(run_info):
        if attr.startswith("_"):
            continue
        value = getattr(run_info, attr)
        if callable(value):
            continue
        print(f"{attr}: {value}")

    # Tags are shown only when the run has any
    if full_run.data.tags:
        display("\nTags:")
        _print_items(full_run.data.tags)


In [None]:
# Select the first row by position; a label lookup with [0] only works while
# the DataFrame happens to have a row labelled 0.
artifact_uri = best_run['artifact_uri'].iloc[0]
print(artifact_uri)

In [None]:
# Model package groups shared with this account from other accounts
response = sagemaker_client.list_model_package_groups(CrossAccountFilterOption="CrossAccount")
shared_groups = response['ModelPackageGroupSummaryList']
if not shared_groups:
    raise RuntimeError(
        "No cross-account model package group is visible from this account. "
        "Accept the resource share invitation from the shared services account first."
    )

# The first shared group in the response is the one used for registration
shared_group = shared_groups[0]
model_package_group_name = shared_group['ModelPackageGroupName']
print("model_package_group_name:",model_package_group_name)
model_package_group_arn = shared_group['ModelPackageGroupArn']
print("model_package_group_arn:",model_package_group_arn)

In [None]:
# NOTE(review): train() logs model.tar.gz with mlflow.log_artifact() without an
# artifact_path, so the archive may sit directly under artifact_uri rather than
# under "model/" -- confirm this printed location.
print(f"{artifact_uri}/model/model.tar.gz")

In [None]:
import mlflow
import json

# Select the finished training run with the highest validation AUC. Only
# training runs upload a model archive to the shared bucket, so picking merely
# the most recent run of the experiment could yield an S3 key that does not exist.
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment is None:
    raise RuntimeError(f"MLflow experiment '{experiment_name}' was not found.")

runs = mlflow.search_runs(
    experiment_ids=[experiment.experiment_id],
    filter_string="attributes.run_name LIKE '%training%' AND attributes.status = 'FINISHED'",
    order_by=["metrics.`validation-auc` DESC"],
    max_results=1,
)
if runs.empty:
    raise RuntimeError(f"No finished training run found in experiment '{experiment_name}'.")
best_run = runs.iloc[0]

# Print the run ID
print(f"Best run ID: {best_run.run_id}")

# Same key layout that train() uses when it uploads the archive
s3_key = f"{prefix}/model_{best_run.run_id}.tar.gz"
print(f"s3://{shared_model_bucket_name}/{s3_key}")

# Create the model card content for this run.
# The information entered here is key to the model governance tracking table.
model_card_content = {
    "model_overview": {
        "model_creator": "XGBoost Training Team",
        # Location of the model archive in the shared model bucket
        "model_artifact": [f"s3://{shared_model_bucket_name}/{s3_key}"]
    },
    "intended_uses": {
        "purpose_of_model": "Credit risk assessment",
        "intended_uses": "Evaluate creditworthiness of loan applicants",
        "factors_affecting_model_efficiency": "Data quality, economic conditions, regulatory changes",
        "risk_rating": "Medium",
        "explanations_for_risk_rating": "Model deals with sensitive financial data and decisions"
    },
    "business_details": {
        "business_problem": "Improve accuracy and efficiency of credit risk assessment",
        "business_stakeholders": "Credit department, Risk management team, Compliance officers",
        "line_of_business": "Consumer Lending"
    },
    "training_details": {
        "objective_function": {
            "function": "Maximize",
            "notes": "XGBoost objective function used for training"
        },
        # NOTE(review): train() does not pass early_stopping_rounds to
        # xgboost.train -- confirm this statement before publishing the card
        "training_observations": "Model trained using XGBoost with early stopping"
    },
    "additional_information": {
        "ethical_considerations": "Ensure fair lending practices, avoid discriminatory outcomes",
        "caveats_and_recommendations": "Regular monitoring for model drift, periodic retraining with updated data",
        "custom_details": {
            "UseCaseId": "CREDIT-001",
            "UseCaseName": "Credit Risk Assessment",
            "UseCaseStage": "Development"
        }
    }

}

# Save the model card
with open('model_card.json', 'w') as f:
    json.dump(model_card_content, f, indent=2)
print(model_card_content)
print("Model card has been created and saved.")


In [None]:
# Look up the XGBoost (version 1.0-1, py3) image URI for the session's region
# with sagemaker.image_uris.retrieve; it is used as the inference container
# image of the model package.
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve(
    framework='xgboost',
    region=region,
    version='1.0-1',
    py_version='py3',
    instance_type='ml.m5.xlarge'
)
container

In [None]:
# Build the request for a new model package version in the shared model package group.
# The group is addressed by ARN (model_package_group_arn, taken from the
# cross-account listing), with the model archive in the shared bucket as model data.

model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_arn,
    "ModelPackageDescription": "Credit Risk - Initial version",
    "ModelApprovalStatus": "PendingManualApproval",
    "InferenceSpecification": {
        "Containers": [
            {
                "Image": container,
                "ModelDataUrl": f"s3://{shared_model_bucket_name}/{s3_key}"
            }
        ],
        "SupportedContentTypes": ["text/csv"],
        "SupportedResponseMIMETypes": ["text/csv"]
    },
    # Optional model quality metrics (disabled; the sample path below is a
    # placeholder and would need to point at this model's metrics file):
    # "ModelMetrics": {
    #     "ModelQuality": {
    #         "Statistics": {
    #             "S3Uri": f's3://{bucket}/{prefix}/metrics/iris_model_metrics.json',
    #             "ContentType": 'application/json'
    #         }
    #     }
    # },
    # The model card is attached as a JSON string in Draft status
    "ModelCard": {
        "ModelCardContent": json.dumps(model_card_content),
        "ModelCardStatus": "Draft"
    }
}

In [None]:
# Base request fields; the two dictionaries merged in below override them
_base_model_package_request = {
    "ModelPackageGroupName" : model_package_group_arn,
    "ModelPackageDescription" : "Model to detect credit risk",
    "ModelApprovalStatus" : "PendingManualApproval"
}

modelpackage_package_specification = {
    **_base_model_package_request,
    "ModelCard": {
        "ModelCardContent": json.dumps(model_card_content),
        "ModelCardStatus": "Draft"
    }
}

# Later entries win: model_package_input_dict has the final say on every key it defines
create_model_package_input_dict = {
    **_base_model_package_request,
    **modelpackage_package_specification,
    **model_package_input_dict,
}

print(create_model_package_input_dict)
create_model_package_response = sagemaker_client.create_model_package(**create_model_package_input_dict)
# The ARN of the new model package version is read from the response in the next cell

In [None]:
#create_model_package_response = sagemaker_client.create_model_package(**create_model_package_input_dict)
# ARN of the model package version created by the previous cell
model_package_arn = create_model_package_response["ModelPackageArn"]
print(f'ModelPackage Version ARN : {model_package_arn}')

In [None]:
# Update Model Lifecycle
import boto3
import json

print("Boto3 version:", boto3. __version__)

def update_model_lifecycle(model_package_info, model_package_update_input_dict):
    """
    Update a model package with the given fields (e.g. its ``ModelLifeCycle``).

    Args:
        model_package_info (dict): Must contain ``ModelPackageArn``, the model
            package version to update.
        model_package_update_input_dict (dict): Keyword arguments for
            ``update_model_package``. The dictionary is not modified; the ARN
            is added to a copy.

    Returns:
        dict | None: The ``update_model_package`` response, or ``None`` when the
        ARN is missing or the update fails (the error is printed, not raised).
    """
    sagemaker_client = boto3.client('sagemaker')
    try:
        # Extract the ARN from the model_package_info
        model_package_arn = model_package_info.get('ModelPackageArn')

        if not model_package_arn:
            raise ValueError("ModelPackageArn not found in the provided information")

        # Build the request on a copy so the caller's dictionary stays untouched
        update_request = {
            **model_package_update_input_dict,
            'ModelPackageArn': model_package_arn,
        }
        response = sagemaker_client.update_model_package(**update_request)

        print(f"Model lifecycle updated successfully for {model_package_arn}")
        return response
    except Exception as e:
        print(f"Error updating model lifecycle: {str(e)}")
        return None

# Identify the model package version whose lifecycle is updated

model_package_info = {
    'ModelPackageGroupName': model_package_group_arn,
    'ModelPackageArn': model_package_arn,
}

# Update the staging values as needed for your projects
# cfr https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-staging-construct-set-up.html

model_package_update_input_dict = {
    'ModelLifeCycle': {
        'Stage': 'Development',
        'StageDescription': 'Model trained and evaluated in development environment',
        'StageStatus': 'Approved' # PendingApproval/Approved/Rejected
    },
}

# update_model_lifecycle returns the service response on success and None on failure
result = update_model_lifecycle(model_package_info, model_package_update_input_dict)

if result:
    print("Update successful")
    print(json.dumps(result, indent=2))
else:
    print("Update failed")