Merge pull request #76 from brightsparc/sagemaker-v2
fix: Upgrade to SageMaker v2 SDK
vaib-amz committed Sep 23, 2020
2 parents 976bdbe + c5cabdc commit dbcf358
Showing 14 changed files with 97 additions and 86 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.1.2
2.0.0-rc1
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,3 +1,3 @@
sagemaker>=1.71.0,<2.0.0
boto3>=1.9.213
sagemaker>=2.1.0
boto3>=1.14.38
pyyaml
4 changes: 2 additions & 2 deletions setup.py
@@ -30,8 +30,8 @@ def read_version():

# Declare minimal set for installation
required_packages = [
"sagemaker>=1.71.0,<2.0.0",
"boto3>=1.9.213",
"sagemaker>=2.1.0",
"boto3>=1.14.38",
"pyyaml"
]

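requirements.txt and setup.py now floor the SDK at 2.1.0, which is what provides the renamed v2 APIs used throughout the rest of this diff. A minimal sketch, not part of this commit, of guarding a v2-only code path against an older installed SDK (assumes the packaging library is available):

    import sagemaker
    from packaging import version

    # Fail fast if an incompatible v1 SDK is still installed.
    if version.parse(sagemaker.__version__) < version.parse("2.1.0"):
        raise RuntimeError("This release requires sagemaker>=2.1.0")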
22 changes: 11 additions & 11 deletions src/stepfunctions/steps/sagemaker.py
@@ -36,12 +36,12 @@ def __init__(self, state_id, estimator, job_name, data=None, hyperparameters=Non
data: Information about the training data. Please refer to the ``fit()`` method of the associated estimator, as this can take any of the following forms:
* (str) - The S3 location where training data is saved.
* (dict[str, str] or dict[str, sagemaker.session.s3_input]) - If using multiple
* (dict[str, str] or dict[str, sagemaker.inputs.TrainingInput]) - If using multiple
channels for training data, you can specify a dict mapping channel names to
strings or :func:`~sagemaker.session.s3_input` objects.
* (sagemaker.session.s3_input) - Channel configuration for S3 data sources that can
strings or :func:`~sagemaker.inputs.TrainingInput` objects.
* (sagemaker.inputs.TrainingInput) - Channel configuration for S3 data sources that can
provide additional information about the training dataset. See
:func:`sagemaker.session.s3_input` for full details.
:func:`sagemaker.inputs.TrainingInput` for full details.
* (sagemaker.amazon.amazon_estimator.RecordSet) - A collection of
Amazon :class:`Record` objects serialized and stored in S3.
For use with an estimator for an Amazon algorithm.
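The hunk above only renames the documented input type. As a usage note, a minimal sketch of a TrainingStep built with the v2 input class; the estimator, bucket, and job name are assumed/hypothetical:

    from sagemaker.inputs import TrainingInput
    from stepfunctions.steps import TrainingStep

    training_step = TrainingStep(
        "Train",
        estimator=estimator,  # any estimator built with the v2 constructor arguments
        job_name="example-training-job",
        data={"train": TrainingInput("s3://my-bucket/train", content_type="text/csv")},
    )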
@@ -198,11 +198,11 @@ def __init__(self, state_id, model, model_name=None, instance_type=None, tags=No
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
model (sagemaker.model.Model): The SageMaker model to use in the ModelStep. If :py:class:`TrainingStep` was used to train the model and saving the model is the next step in the workflow, the output of :py:func:`TrainingStep.get_expected_model()` can be passed here.
model_name (str or Placeholder, optional): Specify a model name, this is required for creating the model. We recommend to use :py:class:`~stepfunctions.inputs.ExecutionInput` placeholder collection to pass the value dynamically in each execution.
instance_type (str, optional): The EC2 instance type to deploy this Model to. For example, 'ml.p2.xlarge'. This parameter is typically required when the estimator used is not an `Amazon built-in algorithm <https://docs.aws.amazon.com/sagemaker/latest/dg/algos.html>`_.
instance_type (str, optional): The EC2 instance type to deploy this Model to. For example, 'ml.p2.xlarge'.
tags (list[dict], optional): `List to tags <https://docs.aws.amazon.com/sagemaker/latest/dg/API_Tag.html>`_ to associate with the resource.
"""
if isinstance(model, FrameworkModel):
parameters = model_config(model=model, instance_type=instance_type, role=model.role, image=model.image)
parameters = model_config(model=model, instance_type=instance_type, role=model.role, image_uri=model.image_uri)
if model_name:
parameters['ModelName'] = model_name
elif isinstance(model, Model):
@@ -211,7 +211,7 @@ def __init__(self, state_id, model, model_name=None, instance_type=None, tags=No
'ModelName': model_name or model.name,
'PrimaryContainer': {
'Environment': {},
'Image': model.image,
'Image': model.image_uri,
'ModelDataUrl': model.model_data
}
}
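Both branches now read the container image from model.image_uri, matching the v2 Model attribute. A minimal sketch of the step this parameterizes; training_step and the names are assumed/hypothetical:

    from stepfunctions.steps import ModelStep

    model_step = ModelStep(
        "Save Model",
        model=training_step.get_expected_model(),
        model_name="example-model",
        instance_type="ml.m5.large",
    )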
@@ -322,12 +322,12 @@ def __init__(self, state_id, tuner, job_name, data, wait_for_completion=True, ta
data: Information about the training data. Please refer to the ``fit()`` method of the associated estimator in the tuner, as this can take any of the following forms:
* (str) - The S3 location where training data is saved.
* (dict[str, str] or dict[str, sagemaker.session.s3_input]) - If using multiple
* (dict[str, str] or dict[str, sagemaker.inputs.TrainingInput]) - If using multiple
channels for training data, you can specify a dict mapping channel names to
strings or :func:`~sagemaker.session.s3_input` objects.
* (sagemaker.session.s3_input) - Channel configuration for S3 data sources that can
strings or :func:`~sagemaker.inputs.TrainingInput` objects.
* (sagemaker.inputs.TrainingInput) - Channel configuration for S3 data sources that can
provide additional information about the training dataset. See
:func:`sagemaker.session.s3_input` for full details.
:func:`sagemaker.inputs.TrainingInput` for full details.
* (sagemaker.amazon.amazon_estimator.RecordSet) - A collection of
Amazon :class:`Record` objects serialized and stored in S3.
For use with an estimator for an Amazon algorithm.
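Tuning channels use the same v2 input class. A minimal sketch, with a hypothetical estimator, objective metric, and S3 location:

    from sagemaker.inputs import TrainingInput
    from sagemaker.tuner import ContinuousParameter, HyperparameterTuner
    from stepfunctions.steps import TuningStep

    tuner = HyperparameterTuner(
        estimator=estimator,
        objective_metric_name="validation:accuracy",
        hyperparameter_ranges={"learning_rate": ContinuousParameter(0.01, 0.2)},
        max_jobs=4,
        max_parallel_jobs=2,
    )
    tuning_step = TuningStep(
        "Tune",
        tuner=tuner,
        job_name="example-tuning-job",
        data={"train": TrainingInput("s3://my-bucket/train", content_type="text/csv")},
    )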
26 changes: 13 additions & 13 deletions src/stepfunctions/template/pipeline/inference.py
@@ -48,8 +48,8 @@ def __init__(self, preprocessor, estimator, inputs, s3_bucket, role, client=None
inputs: Information about the training data. Please refer to the `fit()` method of the associated estimator, as this can take any of the following forms:
* (str) - The S3 location where training data is saved.
* (dict[str, str] or dict[str, `sagemaker.session.s3_input`]) - If using multiple channels for training data, you can specify a dict mapping channel names to strings or `sagemaker.session.s3_input` objects.
* (`sagemaker.session.s3_input`) - Channel configuration for S3 data sources that can provide additional information about the training dataset. See `sagemaker.session.s3_input` for full details.
* (dict[str, str] or dict[str, `sagemaker.inputs.TrainingInput`]) - If using multiple channels for training data, you can specify a dict mapping channel names to strings or `sagemaker.inputs.TrainingInput` objects.
* (`sagemaker.inputs.TrainingInput`) - Channel configuration for S3 data sources that can provide additional information about the training dataset. See `sagemaker.inputs.TrainingInput` for full details.
* (`sagemaker.amazon.amazon_estimator.RecordSet`) - A collection of Amazon `Record` objects serialized and stored in S3. For use with an estimator for an Amazon algorithm.
* (list[`sagemaker.amazon.amazon_estimator.RecordSet`]) - A list of `sagemaker.amazon.amazon_estimator.RecordSet` objects, where each instance is a different channel of training data.
s3_bucket (str): S3 bucket under which the output artifacts from the training job will be stored. The parent path used is built using the format: ``s3://{s3_bucket}/{pipeline_name}/models/{job_name}/``. In this format, `pipeline_name` refers to the keyword argument provided for TrainingPipeline. If a `pipeline_name` argument was not provided, one is auto-generated by the pipeline as `training-pipeline-<timestamp>`. Also, in the format, `job_name` refers to the job name provided when calling the :meth:`TrainingPipeline.run()` method.
@@ -87,8 +87,8 @@ def build_workflow_definition(self):
"""
default_name = self.pipeline_name

train_instance_type = self.preprocessor.train_instance_type
train_instance_count = self.preprocessor.train_instance_count
instance_type = self.preprocessor.instance_type
instance_count = self.preprocessor.instance_count

# Preprocessor for feature transformation
preprocessor_train_step = TrainingStep(
@@ -100,13 +100,13 @@ def build_workflow_definition(self):
preprocessor_model = self.preprocessor.create_model()
preprocessor_model_step = ModelStep(
StepId.CreatePreprocessorModel.value,
instance_type=train_instance_type,
instance_type=instance_type,
model=preprocessor_model,
model_name=default_name
)
preprocessor_transform_step = TransformStep(
StepId.TransformInput.value,
transformer=self.preprocessor.transformer(instance_count=train_instance_count, instance_type=train_instance_type, max_payload=20),
transformer=self.preprocessor.transformer(instance_count=instance_count, instance_type=instance_type, max_payload=20),
job_name=default_name,
model_name=default_name,
data=self.inputs['train'],
@@ -115,8 +115,8 @@ def build_workflow_definition(self):
)

# Training
train_instance_type = self.estimator.train_instance_type
train_instance_count = self.estimator.train_instance_count
instance_type = self.estimator.instance_type
instance_count = self.estimator.instance_count

training_step = TrainingStep(
StepId.Train.value,
@@ -135,21 +135,21 @@ def build_workflow_definition(self):
)
pipeline_model_step = ModelStep(
StepId.CreatePipelineModel.value,
instance_type=train_instance_type,
instance_type=instance_type,
model=preprocessor_model,
model_name=default_name
)
pipeline_model_step.parameters = self.pipeline_model_config(train_instance_type, pipeline_model)
pipeline_model_step.parameters = self.pipeline_model_config(instance_type, pipeline_model)

deployable_model = Model(model_data='', image='')
deployable_model = Model(model_data='', image_uri='')

# Deployment
endpoint_config_step = EndpointConfigStep(
StepId.ConfigureEndpoint.value,
endpoint_config_name=default_name,
model_name=default_name,
initial_instance_count=train_instance_count,
instance_type=train_instance_type
initial_instance_count=instance_count,
instance_type=instance_type
)

deploy_step = EndpointStep(
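Because build_workflow_definition now reads instance_type and instance_count from the supplied estimators, the pipeline templates must be given estimators built with the v2 constructor arguments, and framework estimators also need an explicit framework_version and py_version. A minimal sketch; the scripts, role ARNs, and bucket are hypothetical:

    from sagemaker.sklearn.estimator import SKLearn
    from stepfunctions.template.pipeline import InferencePipeline

    preprocessor = SKLearn(
        entry_point="sklearn_preprocessor.py",
        framework_version="0.20.0",
        py_version="py3",
        role=sagemaker_role_arn,
        instance_count=1,
        instance_type="ml.m5.large",
    )
    estimator = SKLearn(
        entry_point="sklearn_estimator.py",
        framework_version="0.20.0",
        py_version="py3",
        role=sagemaker_role_arn,
        instance_count=1,
        instance_type="ml.m5.large",
    )
    pipeline = InferencePipeline(
        preprocessor=preprocessor,
        estimator=estimator,
        inputs={"train": "s3://my-bucket/train", "test": "s3://my-bucket/test"},
        s3_bucket="my-bucket",
        role=workflow_role_arn,
    )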
14 changes: 7 additions & 7 deletions src/stepfunctions/template/pipeline/train.py
@@ -43,8 +43,8 @@ def __init__(self, estimator, role, inputs, s3_bucket, client=None, **kwargs):
inputs: Information about the training data. Please refer to the `fit()` method of the associated estimator, as this can take any of the following forms:
* (str) - The S3 location where training data is saved.
* (dict[str, str] or dict[str, `sagemaker.session.s3_input`]) - If using multiple channels for training data, you can specify a dict mapping channel names to strings or `sagemaker.session.s3_input` objects.
* (`sagemaker.session.s3_input`) - Channel configuration for S3 data sources that can provide additional information about the training dataset. See `sagemaker.session.s3_input` for full details.
* (dict[str, str] or dict[str, `sagemaker.inputs.TrainingInput`]) - If using multiple channels for training data, you can specify a dict mapping channel names to strings or `sagemaker.inputs.TrainingInput` objects.
* (`sagemaker.inputs.TrainingInput`) - Channel configuration for S3 data sources that can provide additional information about the training dataset. See `sagemaker.inputs.TrainingInput` for full details.
* (`sagemaker.amazon.amazon_estimator.RecordSet`) - A collection of Amazon `Record` objects serialized and stored in S3. For use with an estimator for an Amazon algorithm.
* (list[`sagemaker.amazon.amazon_estimator.RecordSet`]) - A list of `sagemaker.amazon.amazon_estimator.RecordSet` objects, where each instance is a different channel of training data.
s3_bucket (str): S3 bucket under which the output artifacts from the training job will be stored. The parent path used is built using the format: ``s3://{s3_bucket}/{pipeline_name}/models/{job_name}/``. In this format, `pipeline_name` refers to the keyword argument provided for TrainingPipeline. If a `pipeline_name` argument was not provided, one is auto-generated by the pipeline as `training-pipeline-<timestamp>`. Also, in the format, `job_name` refers to the job name provided when calling the :meth:`TrainingPipeline.run()` method.
@@ -79,8 +79,8 @@ def build_workflow_definition(self):
"""
default_name = self.pipeline_name

train_instance_type = self.estimator.train_instance_type
train_instance_count = self.estimator.train_instance_count
instance_type = self.estimator.instance_type
instance_count = self.estimator.instance_count

training_step = TrainingStep(
StepId.Train.value,
@@ -92,7 +92,7 @@ def build_workflow_definition(self):
model = self.estimator.create_model()
model_step = ModelStep(
StepId.CreateModel.value,
instance_type=train_instance_type,
instance_type=instance_type,
model=model,
model_name=default_name
)
endpoint_config_step = EndpointConfigStep(
StepId.ConfigureEndpoint.value,
endpoint_config_name=default_name,
model_name=default_name,
initial_instance_count=train_instance_count,
instance_type=train_instance_type
initial_instance_count=instance_count,
instance_type=instance_type
)
deploy_step = EndpointStep(
StepId.Deploy.value,
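The same applies to TrainingPipeline: instance settings come from the estimator's v2 attributes. A minimal sketch of creating and running the pipeline; the estimator, role ARN, bucket, and job name are assumed/hypothetical:

    from stepfunctions.template.pipeline import TrainingPipeline

    pipeline = TrainingPipeline(
        estimator=estimator,
        role=workflow_role_arn,
        inputs="s3://my-bucket/train",
        s3_bucket="my-bucket",
    )
    pipeline.create()  # registers the state machine
    execution = pipeline.execute(job_name="example-training-pipeline")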
4 changes: 2 additions & 2 deletions tests/integ/conftest.py
@@ -53,8 +53,8 @@ def sagemaker_role_arn(aws_account_id):
def pca_estimator_fixture(sagemaker_role_arn):
estimator = pca.PCA(
role=sagemaker_role_arn,
train_instance_count=1,
train_instance_type="ml.m5.large",
instance_count=1,
instance_type="ml.m5.large",
num_components=48
)
return estimator
8 changes: 6 additions & 2 deletions tests/integ/test_inference_pipeline.py
@@ -43,9 +43,11 @@ def sklearn_preprocessor(sagemaker_role_arn, sagemaker_session):
'one_p_mnist',
'sklearn_mnist_preprocessor.py')
sklearn_preprocessor = SKLearn(
framework_version='0.20.0',
py_version='py3',
entry_point=script_path,
role=sagemaker_role_arn,
train_instance_type="ml.m5.large",
instance_type="ml.m5.large",
sagemaker_session=sagemaker_session,
hyperparameters={"epochs": 1},
)
@@ -58,9 +60,11 @@ def sklearn_estimator(sagemaker_role_arn, sagemaker_session):
'one_p_mnist',
'sklearn_mnist_estimator.py')
sklearn_estimator = SKLearn(
framework_version='0.20.0',
py_version='py3',
entry_point=script_path,
role=sagemaker_role_arn,
train_instance_type="ml.m5.large",
instance_type="ml.m5.large",
sagemaker_session=sagemaker_session,
hyperparameters={"epochs": 1},
input_mode='File'
4 changes: 2 additions & 2 deletions tests/integ/test_sagemaker_steps.py
@@ -254,8 +254,8 @@ def test_tuning_step(sfn_client, record_set_for_hyperparameter_tuning, sagemaker

kmeans = KMeans(
role=sagemaker_role_arn,
train_instance_count=1,
train_instance_type=INSTANCE_TYPE,
instance_count=1,
instance_type=INSTANCE_TYPE,
k=10
)

4 changes: 2 additions & 2 deletions tests/integ/test_state_machine_definition.py
@@ -16,7 +16,7 @@
import json

from sagemaker.utils import unique_name_from_base
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.image_uris import retrieve
from stepfunctions import steps
from stepfunctions.workflow import Workflow
from tests.integ.utils import state_machine_delete_wait
@@ -25,7 +25,7 @@
def training_job_parameters(sagemaker_session, sagemaker_role_arn, record_set_fixture):
parameters = {
"AlgorithmSpecification": {
"TrainingImage": get_image_uri(sagemaker_session.boto_session.region_name, 'pca'),
"TrainingImage": retrieve(region=sagemaker_session.boto_session.region_name, framework='pca'),
"TrainingInputMode": "File"
},
"OutputDataConfig": {
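get_image_uri was removed in v2; built-in algorithm images are resolved through sagemaker.image_uris.retrieve, as the test now does. A minimal standalone sketch (the region is hypothetical):

    from sagemaker import image_uris

    pca_image = image_uris.retrieve(framework="pca", region="us-east-1")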
10 changes: 6 additions & 4 deletions tests/integ/test_training_pipeline_estimators.py
@@ -25,7 +25,7 @@

# import Sagemaker
from sagemaker.amazon.pca import PCA
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.image_uris import retrieve

# import StepFunctions
from stepfunctions.template.pipeline import TrainingPipeline
@@ -50,8 +50,8 @@ def pca_estimator(sagemaker_role_arn):
pca_estimator = PCA(
role=sagemaker_role_arn,
num_components=1,
train_instance_count=1,
train_instance_type='ml.m5.large',
instance_count=1,
instance_type='ml.m5.large',
)

pca_estimator.feature_dim=500
@@ -105,7 +105,7 @@ def test_pca_estimator(sfn_client, sagemaker_session, sagemaker_role_arn, sfn_ro
job_name = workflow_execution_info['name']
s3_manifest_uri = inputs.s3_data
status = 'SUCCEEDED'
estimator_image_uri = get_image_uri(sagemaker_session.boto_region_name, 'pca')
estimator_image_uri = retrieve(region=sagemaker_session.boto_region_name, framework='pca')

execution_info = sfn_client.describe_execution(executionArn=execution_arn)
execution_info['input'] = json.loads(execution_info['input'])
@@ -115,7 +115,9 @@
s3_output_path = 's3://{bucket_name}/{workflow_name}/models'.format(bucket_name=bucket_name, workflow_name=unique_name)
expected_execution_info = {'executionArn': execution_arn,
'stateMachineArn': state_machine_arn,
'inputDetails': {'included': True},
'name': job_name,
'outputDetails': {'included': True},
'status': status,
'startDate': execution_info['startDate'],
'stopDate': execution_info['stopDate'],
12 changes: 7 additions & 5 deletions tests/integ/test_training_pipeline_framework_estimator.py
@@ -33,11 +33,12 @@
def torch_estimator(sagemaker_role_arn):
script_path = os.path.join(DATA_DIR, "pytorch_mnist", "mnist.py")
return PyTorch(
py_version='py3',
entry_point=script_path,
role=sagemaker_role_arn,
framework_version='1.2.0',
train_instance_count=1,
train_instance_type='ml.m5.large',
instance_count=1,
instance_type='ml.m5.large',
hyperparameters={
'epochs': 6,
'backend': 'gloo'
@@ -48,11 +49,12 @@ def torch_estimator(sagemaker_role_arn):
def sklearn_estimator(sagemaker_role_arn):
script_path = os.path.join(DATA_DIR, "sklearn_mnist", "mnist.py")
return SKLearn(
framework_version='0.20.0',
py_version='py3',
entry_point=script_path,
role=sagemaker_role_arn,
train_instance_count=1,
train_instance_type='ml.m5.large',
framework_version='0.20.0',
instance_count=1,
instance_type='ml.m5.large',
hyperparameters={
"epochs": 1
}
