In [None]:
# All notebook-wide imports live in this cell: stdlib, then third-party, then local.
import json
import os

import boto3
import numpy as np
import pandas as pd
import sagemaker
import sagemaker.session
from autogluon.tabular import TabularDataset, TabularPredictor
from sagemaker import utils
from sagemaker.serializers import CSVSerializer
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline_context import PipelineSession
from tqdm.autonotebook import tqdm as notebook_tqdm

# Local helper module shipped next to this notebook.
from ag_model import (
    AutoGluonTraining,
    AutoGluonInferenceModel,
    AutoGluonTabularPredictor,
)

In [None]:
## Define the IAM role ARN for data access, training and hosting.
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()
# Use the public property rather than the private `_region_name` attribute.
region = sagemaker_session.boto_region_name

# Every artifact of this run is stored under a timestamped prefix in the
# session's default bucket.
default_bucket = sagemaker_session.default_bucket()
s3_prefix = f"autogluon_sm/{utils.sagemaker_timestamp()}"
output_path = f"s3://{default_bucket}/{s3_prefix}/output/"

model_package_group_name = "Mo***"
# PipelineSession defers job creation until the pipeline actually executes.
pipeline_session = PipelineSession()

In [None]:
# Step 1: Upload the dataset to S3
# The local CSV is copied under `base_uri`; the resulting object URI becomes
# the default value of the pipeline's InputData parameter.
local_path = "./***.csv"
s3 = boto3.resource("s3")
base_uri = f"s3://{default_bucket}/fm_pipeline"

input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(f'Input data URI: {input_data_uri}')

In [None]:
# Step 2: Define Pipeline Parameters
# These values can be overridden per execution without redefining the pipeline.
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Number of instances used by the preprocessing job.
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
# Approval status assigned to the model package at registration.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved",
)
# S3 URI of the raw input dataset.
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)

In [None]:
# Step 3: Define a Processing Step for Feature Engineering
%%writefile preprocessing.py

from sklearn.model_selection import train_test_split
import pandas as pd

if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/fm_dataset.csv"
    )
    train, val_n_test = train_test_split(
        df, testi_szie = 0.3, random_state=42
    )
    val,test = train_test_split(
        val_n_test,test_size =0.3,random_state=42
    )

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv",index=False)
    pd.DataFrame(val).to_csv(f"{base_dir}/validation/validation.csv",index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv",index=False)


In [None]:
# SKLearnProcessor handles Amazon SageMaker processing tasks
from sagemaker.sklearn.processing import SKLearnProcessor

# Version of the scikit-learn processing container to run preprocessing.py in.
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    # Pipeline parameter, so the instance count can be overridden per execution.
    instance_count=processing_instance_count,
    base_job_name="sklearn-fm-process",
    role=role,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Preprocessing step: runs preprocessing.py on the input dataset and exposes
# the three splits as named outputs for downstream steps.
step_process = ProcessingStep(
    name="fmProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        # Sources are absolute container paths; they must match the
        # directories preprocessing.py writes to.
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="preprocessing.py",
)

In [None]:
# Step 4: Define a training step
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# AutoGluon estimator (from the local ag_model module) running tabular_train.py.
ag = AutoGluonTraining(
    role=role,
    entry_point="tabular_train.py",
    region=region,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
    framework_version="0.4",
    py_version="py38",
    base_job_name="autogluon-tabular-train",
)

# Upload the training configuration so it can be passed as its own channel.
config_input = ag.sagemaker_session.upload_data(
    path=os.path.join("config.yaml"), key_prefix=s3_prefix
)

# The train/validation channels reference the preprocessing step's outputs,
# which also makes this step depend on step_process.
step_train = TrainingStep(
    name="fmTrain",
    estimator=ag,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "config": TrainingInput(
            s3_data=config_input
        ),
    },
)



In [None]:
# Step 5: Define a processing step for model evaluation
%%writefile evaluation.py
import json
import pathlib
import pickle
import tarfile
import joblib
import numpy as np
import pandas as pd
import autogluon
from autogluon.tabular import TabularDataset, TabularPredictor
from sklearn.metrics import roc_auc_score

if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"

    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

        predictor = pickle.load(open("predictor.pkl","rb"))

        test_path = "/opt/ml/processing/test/test.csv"
        df = pd.read_csv(test_path)

        data = df.drop(columns="TargerClass")[:].values

        y_pred_proba = 0.9

        report_dict ={
            "y_pred_proba": y_pred_proba
        }
        output_dir = "/opt/ml/processing/evaluation"
        pathlib.Path(output_dir).mkdir(parents=True,exist_ok=True)

        evaluation_path = f"{output_dir}/evaluation.json"
        with open(evaluation_path,"w") as f:
            f.write(json.dumps(report_dict))

In [None]:
# Retrieve a training docker image
from sagemaker import image_uris

# Model id, version ("*" = latest) and scope used to look up the image.
train_model_id, train_model_version, train_scope = (
    "autogluon-classification-ensemble",
    "*",
    "training",
)
training_instance_type = "ml.m5.2xlarge"

# The image is resolved from the model id, so region and framework are
# passed as None.
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=train_model_id,
    model_version=train_model_version,
    image_scope=train_scope,
    instance_type=training_instance_type,
)
from sagemaker.processing import ScriptProcessor

# Run the evaluation script in the same image used for training, so that
# AutoGluon is available inside the container.
script_eval = ScriptProcessor(
    image_uri=train_image_uri,
    command=["python3"],
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    base_job_name="script-fm-eval",
    role=role,
)

from sagemaker.workflow.properties import PropertyFile

# Declares evaluation.json (inside the "evaluation" processing output) as a
# property file, so later steps can query its contents with JsonGet.
evaluation_report = PropertyFile(
    path="evaluation.json",
    output_name="evaluation",
    name="EvaluationReport",
)
# Evaluation step: feeds the trained model artifact and the test split to
# evaluation.py and publishes the resulting report as a property file.
step_eval = ProcessingStep(
    name="fmEval",
    processor=script_eval,
    inputs=[
        # Model artifact produced by the training step.
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        # Test split produced by the preprocessing step.
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="evaluation.py",
    property_files=[evaluation_report],
)

In [None]:
# Step 6: Define a RegisterModel Step to Create a Model Package
from sagemaker import PipelineModel
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel

# URI built from the first output of the preprocessing step.
# NOTE(review): this value is not used anywhere else in the visible notebook.
ag_model_s3 = f"{step_process.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}/model.tar.gz"

train_model_id = "autogluon-classification-ensemble"
train_model_version = "*"
train_scope = "training"
training_instance_type = "ml.m5.2xlarge"

# AutoGluon inference container used to serve the registered model.
ag_model_image_uri = sagemaker.image_uris.retrieve(
    "autogluon",
    region=region,
    version="0.5.2",
    py_version="py38",
    instance_type=training_instance_type,
    image_scope="inference",
)

# Model backed by the artifact the training step produces; bound to the
# pipeline session so it is resolved when the pipeline runs.
ag_model = Model(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=ag_model_image_uri,
    role=role,
    sagemaker_session=pipeline_session,
)



In [None]:
# Define a PipelineModel to register the ag_model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

pipeline_model = PipelineModel(
    models=[ag_model],
    sagemaker_session=pipeline_session,
    role=role,
)

# Location of the evaluation report inside the evaluation step's first output.
evaluation_s3_uri = f"{step_eval.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}/evaluation.json"

# Attach the evaluation report to the model package as model statistics.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=evaluation_s3_uri,
    )
)

# With a PipelineSession, register() returns step arguments rather than
# registering immediately.
register_args = pipeline_model.register(
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
)

step_register = ModelStep(name="fmPipelineModel", step_args=register_args)

In [None]:
# Step 7: Define a Condition Step to Verify Model Accuracy
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

# Passes when the reported y_pred_proba is >= 0.1.
# NOTE(review): despite its name, `cond_lte` is a greater-than-or-equal check.
cond_lte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        json_path="y_pred_proba",
        property_file=evaluation_report,
        step_name=step_eval.name,
    ),
    right=0.1,
)

# Register the model only when the condition holds; otherwise do nothing.
step_cond = ConditionStep(
    name="fmCond",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)
step_process, step_train, step_eval, step_cond

In [None]:
# Step 8: Create a pipeline
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "fmPipeline"

# The register step is not listed here: it runs as the `if` branch of step_cond.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[processing_instance_count, model_approval_status, input_data],
    steps=[
        step_process,
        step_train,
        step_eval,
        step_cond,
    ],
)

In [None]:
# Step 9: Run a Pipeline
import json

# Render the pipeline definition as a dict to inspect it before submitting.
definition = json.loads(pipeline.definition())
definition
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-732995967575/fm_pipeline/fm_dataset.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'fmProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/preprocessing.py']},
    'RoleArn': 'arn:aws:iam::732995967575:role/StackSet-gdQuests-25d24950-20-DefaultSagemakerRole-Y7HJ0E89VLJU',
    'ProcessingInputs': [{'InputName': 'input-1',
      'AppManaged': False,
      'S3Input': {'S3Uri': {'Get': 'Parameters.InputData'},
       'LocalPath': '/opt/ml/processing/input',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}},
     {'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-732995967575/fmProcess-91a918b0aa642e395efcab501eaccdca/input/code/preprocessing.py',
       'LocalPath': '/opt/ml/processing/input/code',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}}],
    'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'train',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-732995967575/fmProcess-91a918b0aa642e395efcab501eaccdca/output/train',
        'LocalPath': '/opt/ml/processing/train',
        'S3UploadMode': 'EndOfJob'}},
      {'OutputName': 'validation',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-732995967575/fmProcess-91a918b0aa642e395efcab501eaccdca/output/validation',
        'LocalPath': '/opt/ml/processing/validation',
        'S3UploadMode': 'EndOfJob'}},
      {'OutputName': 'test',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-732995967575/fmProcess-91a918b0aa642e395efcab501eaccdca/output/test',
        'LocalPath': '/opt/ml/processing/test',
        'S3UploadMode': 'EndOfJob'}}]}}},
  {'Name': 'fmTrain',
   'Type': 'Training',
   'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File',
     'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/autogluon-training:0.4-cpu-py38'},
    'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-732995967575/'},
    'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
    'ResourceConfig': {'VolumeSizeInGB': 30,
     'InstanceCount': 1,
     'InstanceType': 'ml.m5.2xlarge'},
    'RoleArn': 'arn:aws:iam::732995967575:role/StackSet-gdQuests-25d24950-20-DefaultSagemakerRole-Y7HJ0E89VLJU',
    'InputDataConfig': [{'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
        'S3Uri': {'Get': "Steps.fmProcess.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri"},
        'S3DataDistributionType': 'FullyReplicated'}},
      'ContentType': 'text/csv',
      'ChannelName': 'train'},
     {'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
        'S3Uri': {'Get': "Steps.fmProcess.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri"},
        'S3DataDistributionType': 'FullyReplicated'}},
      'ContentType': 'text/csv',
      'ChannelName': 'validation'},
     {'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
        'S3Uri': 's3://sagemaker-us-east-1-732995967575/autogluon_sm/2023-04-04-02-23-20-714/config.yaml',
        'S3DataDistributionType': 'FullyReplicated'}},
      'ChannelName': 'config'}],
    'HyperParameters': {'sagemaker_submit_directory': '"s3://sagemaker-us-east-1-732995967575/fmTrain-94602cc26a604d1c35f099835a1520b6/source/sourcedir.tar.gz"',
     'sagemaker_program': '"tabular_train.py"',
     'sagemaker_container_log_level': '20',
     'sagemaker_region': '"us-east-1"'},
    'DebugHookConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-732995967575/',
     'CollectionConfigurations': []},
    'ProfilerConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-732995967575/',
     'DisableProfiler': False}}},
  {'Name': 'fmEval',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.2xlarge',
      'InstanceCount': 1,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '763104351884.dkr.ecr.us-east-1.amazonaws.com/autogluon-training:0.4.3-cpu-py38',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/evaluation.py']},
    'RoleArn': 'arn:aws:iam::732995967575:role/StackSet-gdQuests-25d24950-20-DefaultSagemakerRole-Y7HJ0E89VLJU',
    'ProcessingInputs': [{'InputName': 'input-1',
      'AppManaged': False,
      'S3Input': {'S3Uri': {'Get': 'Steps.fmTrain.ModelArtifacts.S3ModelArtifacts'},
       'LocalPath': '/opt/ml/processing/model',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}},
     {'InputName': 'input-2',
      'AppManaged': False,
      'S3Input': {'S3Uri': {'Get': "Steps.fmProcess.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri"},
       'LocalPath': '/opt/ml/processing/test',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}},
     {'InputName': 'code',
      'AppManaged': False,
      'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-732995967575/fmEval-fcf375e1b17d753d0ed448a2fdec1e5b/input/code/evaluation.py',
       'LocalPath': '/opt/ml/processing/input/code',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File',
       'S3DataDistributionType': 'FullyReplicated',
       'S3CompressionType': 'None'}}],
    'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'evaluation',
       'AppManaged': False,
       'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-732995967575/fmEval-fcf375e1b17d753d0ed448a2fdec1e5b/output/evaluation',
        'LocalPath': '/opt/ml/processing/evaluation',
        'S3UploadMode': 'EndOfJob'}}]}},
   'PropertyFiles': [{'PropertyFileName': 'EvaluationReport',
     'OutputName': 'evaluation',
     'FilePath': 'evaluation.json'}]},
  {'Name': 'fmCond',
   'Type': 'Condition',
   'Arguments': {'Conditions': [{'Type': 'GreaterThanOrEqualTo',
      'LeftValue': {'Std:JsonGet': {'PropertyFile': {'Get': 'Steps.fmEval.PropertyFiles.EvaluationReport'},
        'Path': 'y_pred_proba'}},
      'RightValue': 0.1}],
    'IfSteps': [{'Name': 'fmPipelineModel-RegisterModel',
      'Type': 'RegisterModel',
      'Arguments': {'ModelPackageGroupName': 'MonocerosModelPackageGroupName',
       'ModelMetrics': {'ModelQuality': {'Statistics': {'ContentType': 'application/json',
          'S3Uri': 's3://sagemaker-us-east-1-732995967575/fmEval-fcf375e1b17d753d0ed448a2fdec1e5b/output/evaluation/evaluation.json'}},
        'Bias': {},
        'Explainability': {}},
       'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-east-1.amazonaws.com/autogluon-inference:0.5.2-cpu-py38',
          'Environment': {},
          'ModelDataUrl': {'Get': 'Steps.fmTrain.ModelArtifacts.S3ModelArtifacts'}}],
        'SupportedContentTypes': ['text/csv'],
        'SupportedResponseMIMETypes': ['text/csv'],
        'SupportedRealtimeInferenceInstanceTypes': ['ml.m5.large',
         'ml.m5.xlarge']},
       'ModelApprovalStatus': {'Get': 'Parameters.ModelApprovalStatus'}}}],
    'ElseSteps': []}}]}
# Create the pipeline in SageMaker, or update it if it already exists,
# using the notebook's execution role.
pipeline.upsert(role_arn=role)
{'PipelineArn': 'arn:aws:sagemaker:us-east-1:732995967575:pipeline/fmpipeline',
 'ResponseMetadata': {'RequestId': 'f44c68dc-75c6-442d-87c3-6a0b2b803572',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f44c68dc-75c6-442d-87c3-6a0b2b803572',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '78',
   'date': 'Tue, 04 Apr 2023 03:02:53 GMT'},
  'RetryAttempts': 0}}
# Start a pipeline execution with the default parameter values, then block
# until the execution finishes.
execution = pipeline.start()
execution.wait()

Wait until all steps run successfully before proceeding to the next step. You can monitor the pipeline progress in SageMaker Studio console.


In [None]:
# Step 10: Deploy a Model
!aws s3 cp s3://sagemaker-us-east-1-732995967575/pipelines-37bdqm85upf9-fmTrain-Bp20YU8cTd/output/model.tar.gz .
!ls -alF model.tar.gz
!ls -alF model.tar.gz
endpoint_name = sagemaker.utils.unique_name_from_base("fm-autogluon-serving-trained-model")
model_data = sagemaker_session.upload_data(
    path=os.path.join(".", "model.tar.gz"), key_prefix=f"{endpoint_name}/models"
)
instance_type = "ml.m5.2xlarge"
model = AutoGluonInferenceModel(
    model_data=model_data,
    role=role,
    region=region,
    framework_version="0.4",
    py_version="py38",
    instance_type=instance_type,
    entry_point="tabular_serve.py",
)
predictor = model.deploy(
    initial_instance_count=1, serializer=CSVSerializer(), instance_type=instance_type
)

In [None]:
# Score the local test set against the deployed endpoint and line the
# predictions up next to the true labels.
df = pd.read_csv("data/test.csv")
data = df.drop(columns="TargetClass").to_numpy()
preds = predictor.predict(data)
p = pd.DataFrame(
    {
        "preds": pd.DataFrame(preds)[0],
        # Truncate the labels to the number of predictions returned.
        "actual": df["TargetClass"][: len(preds)],
    }
)
p.head()

In [None]:
# Count the rows where the prediction equals the true label.
print(f"{int((p['preds'] == p['actual']).sum())}/{len(p)} are correct")


In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

# Same truncation as the predictions table: labels limited to len(preds).
y_true = df["TargetClass"][: len(preds)]
y_pred = pd.DataFrame(preds)[0]

conf_matrix = confusion_matrix(y_true, y_pred)

fig, ax = plt.subplots(figsize=(3, 3))
ax.matshow(conf_matrix, cmap=plt.cm.Blues, alpha=0.3)
# Write each cell's count at its (column, row) position.
for (i, j), cell_count in np.ndenumerate(conf_matrix):
    ax.text(x=j, y=i, s=cell_count, va="center", ha="center", size="xx-large")

plt.xlabel("Predictions", fontsize=12)
plt.ylabel("Actuals", fontsize=12)
plt.title("Confusion Matrix", fontsize=12)
plt.show()

Cleanup

In [None]:
# Delete the endpoint created by model.deploy() in the deployment cell.
predictor.delete_endpoint()