In [1]:
import sagemaker

# Static configuration shared by the cells below.
bucket = "pipeline-cross-validation"  # S3 bucket used in every data/model/output URI
prefix = "data"                       # key prefix under the bucket
num_class = 8                         # forwarded to XGBoost as the `num_class` hyperparameter

# SageMaker handles: the IAM execution role and the SDK session.
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()

In [2]:
from sagemaker.workflow.parameters import ParameterString

# Pipeline parameters: each carries a default value and is registered on the
# Pipeline object so it can be referenced by the steps.

# Instance type used by the training step.
training_instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.p3.2xlarge"
)

# S3 location of the feature files fed to the batch-transform step.
batch_data = ParameterString(
    name="BatchData", default_value=f"s3://{bucket}/{prefix}/test/features/"
)

In [3]:
# XGBoost hyperparameters handed to the Estimator in create_steps().
hyperparameters = {
    # Problem definition
    "num_class": num_class,
    # Tree shape / learning rate
    "max_depth": "5",
    "eta": "0.2",
    "min_child_weight": "1",
    # Objective and evaluation
    "objective": "multi:softmax",
    "eval_metric": "mlogloss",
    # Boosting schedule
    "num_round": "500",
    "tree_method": "gpu_hist",
    "early_stopping_rounds": "10",
    # Disabled option, kept for reference:
    # "csv_weights": "1"
}

In [4]:
def create_steps(iteration):
    """Build the train -> create-model -> batch-transform steps for one fold.

    Args:
        iteration: Fold identifier. It is interpolated into the step names
            and into the S3 paths for the fold's training/validation data,
            model artifacts and transform output.

    Returns:
        list: ``[TrainingStep, CreateModelStep, TransformStep]`` for this fold.
    """
    # Imports are kept function-local (as in the original cell) so this
    # definition does not depend on any other cell having imported them.
    from sagemaker.estimator import Estimator
    from sagemaker.inputs import CreateModelInput, TrainingInput, TransformInput
    from sagemaker.model import Model
    from sagemaker.transformer import Transformer
    from sagemaker.workflow.steps import CreateModelStep, TrainingStep, TransformStep

    # Per-fold names and S3 locations.
    fold_data_path = f"s3://{bucket}/{prefix}/iter{iteration}"
    model_path = f"s3://{bucket}/{prefix}/model_train/iter{iteration}"
    transform_path = f"s3://{bucket}/{prefix}/output/iter{iteration}"
    train_name = f"train_iter{iteration}"
    model_name = train_name.replace("_", "-")
    transform_name = f"transform_iter{iteration}"

    # --- Training step ------------------------------------------------------
    # The image URI is resolved now, while the pipeline is being defined, so it
    # needs a concrete instance type rather than the pipeline parameter itself.
    # NOTE(review): newer SDK releases are understood to raise ValueError when
    # a pipeline variable is passed to image_uris.retrieve — confirm against
    # the installed SDK version.
    image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=sagemaker_session.boto_region_name,
        version="1.2-2",
        py_version="py3",
        instance_type=training_instance_type.default_value,
    )

    xgb_train = Estimator(
        image_uri=image_uri,
        hyperparameters=hyperparameters,
        instance_type=training_instance_type,
        instance_count=1,
        output_path=model_path,
        role=role,
    )

    step_train = TrainingStep(
        name=train_name,
        estimator=xgb_train,
        inputs={
            "train": TrainingInput(
                s3_data=f"{fold_data_path}/train/",
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=f"{fold_data_path}/validation/",
                content_type="text/csv",
            ),
        },
    )

    # --- Model step ---------------------------------------------------------
    # The model consumes the artifact produced by the training step above.
    model = Model(
        image_uri=image_uri,
        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        sagemaker_session=sagemaker_session,
        role=role,
    )

    # No accelerator_type is set on the model input.
    step_create_model = CreateModelStep(
        name=model_name,
        model=model,
        inputs=CreateModelInput(instance_type="ml.m5.large"),
    )

    # --- Transform step -----------------------------------------------------
    transformer = Transformer(
        model_name=step_create_model.properties.ModelName,
        instance_type="ml.m5.xlarge",
        instance_count=1,
        assemble_with="Line",
        output_path=transform_path,
    )

    step_transform = TransformStep(
        name=transform_name,
        transformer=transformer,
        inputs=TransformInput(
            data=batch_data,
            # MIME type, matching the "text/csv" used for the training
            # channels (the original passed the bare string "csv").
            content_type="text/csv",
            split_type="Line",
        ),
    )

    return [step_train, step_create_model, step_transform]

In [5]:
# Collect the steps of every fold into one flat list for the pipeline.
# The original loop ended with an unconditional `break`, so only fold 0 was
# ever added and `iterations` had no effect; the break has been removed so
# that all folds are included.
iterations = 4
whole_steps = []
for iteration in range(iterations):
    steps = create_steps(iteration)
    whole_steps.extend(steps)

In [6]:
from sagemaker.workflow.pipeline import Pipeline

# Name given to the pipeline object.
pipeline_name = "220709Pipeline"

# Assemble the pipeline from the collected steps and the two parameters
# (training instance type and batch-transform input location).
pipeline = Pipeline(
    name=pipeline_name,
    steps=whole_steps,
    parameters=[training_instance_type, batch_data],
)


In [7]:
# Register the pipeline definition under the execution role
# (upsert: presumably creates it, or updates an existing one — confirm).
pipeline.upsert(role_arn=role)
# Start a run with no arguments passed; keep the returned handle in `execution`.
execution = pipeline.start()