In [114]:
import os

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.step_collections import RegisterModel


In [115]:
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()

role = sagemaker.get_execution_role()

default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"BeerModelPackageGroupName"

base_job_prefix="Beer"

model_package_group_name="BeerPackageGroup",
pipeline_name="BeerRegressorPipeline"

## Parametros do Pipeline

In [116]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
project_name = ParameterString(
    name="ProjectName",
    default_value="beer",
)

bucket_cleaned = ParameterString(
    name="BucketCleaned",
    default_value="beer-transformed",
)

bucket_dataset = ParameterString(
    name="BucketDataset",
    default_value="beer-dataset",
)


## Pipeline de Preprocessamento

In [117]:
# processing step for feature engineering
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-beer-preprocess",
    sagemaker_session=sagemaker_session,
    role=role,
)
step_process = ProcessingStep(
    name="PreprocessBeerData",
    processor=sklearn_processor,
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="preprocessing.py",
    job_arguments=["--project_name", project_name,
                  "--bucket_cleaned", bucket_cleaned,
                  "--bucket_dataset", bucket_dataset],
)


## Treinamento

In [118]:
model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/BeerTrain"

In [119]:
# training step for generating model artifacts

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    base_job_name=f"{base_job_prefix}/beer-train",
    sagemaker_session=sagemaker_session,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

In [120]:
step_train = TrainingStep(
    name="TrainBeerModel",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
)

## Avaliação do Modelo

In [121]:
# processing step for evaluation
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name=f"{base_job_prefix}/script-beer-eval",
    sagemaker_session=sagemaker_session,
    role=role,
)
evaluation_report = PropertyFile(
    name="BeerEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)
step_eval = ProcessingStep(
    name="EvaluateBeerModel",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="evaluate.py",
    property_files=[evaluation_report],
)


## Registro do Modelo

In [122]:
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)
step_register = RegisterModel(
    name="RegisterBeerModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)



In [141]:
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0,
)
step_cond = ConditionStep(
    name="CheckMSEBeerEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)

In [144]:
step_register

RegisterModel(steps=[_RegisterModelStep(name='RegisterBeerModel', step_type=<StepTypeEnum.REGISTER_MODEL: 'RegisterModel'>, depends_on=None)])

## Criação do Pipeline

In [145]:
# pipeline instance
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        project_name,
        bucket_cleaned,
        bucket_dataset
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sagemaker_session,
)

In [146]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


ClientError: An error occurred (ValidationException) when calling the CreatePipeline operation: Unable to parse pipeline definition. Property 'null' with value 'null' is of unexpected type. Types must be booleans, strings, numbers or SageMaker Pipelines functions

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()