From 23201d9a6333b581dfd5c0ed8cd339ff3bceda57 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Wed, 31 Mar 2021 16:51:33 -0700 Subject: [PATCH 1/4] fix RegisterModel step with model repacking request dict conversion error + add unit and integ tests for it --- src/sagemaker/workflow/step_collections.py | 5 + tests/integ/test_workflow.py | 100 ++++++++++++++++ .../workflow/test_step_collections.py | 110 ++++++++++++++++++ 3 files changed, 215 insertions(+) diff --git a/src/sagemaker/workflow/step_collections.py b/src/sagemaker/workflow/step_collections.py index d599e874aa..6df5a5524c 100644 --- a/src/sagemaker/workflow/step_collections.py +++ b/src/sagemaker/workflow/step_collections.py @@ -109,6 +109,11 @@ def __init__( steps.append(repack_model_step) model_data = repack_model_step.properties.ModelArtifacts.S3ModelArtifacts + # remove kwargs consumed by model repacking step + kwargs.pop("entry_point", None) + kwargs.pop("source_dir", None) + kwargs.pop("dependencies", None) + register_model_step = _RegisterModelStep( name=name, estimator=estimator, diff --git a/tests/integ/test_workflow.py b/tests/integ/test_workflow.py index cc1e25955f..40c1d967bb 100644 --- a/tests/integ/test_workflow.py +++ b/tests/integ/test_workflow.py @@ -776,6 +776,106 @@ def test_conditional_pytorch_training_model_registration( pass +def test_model_registration_with_model_repack( + sagemaker_session, + role, + pipeline_name, + region_name, +): + base_dir = os.path.join(DATA_DIR, "pytorch_mnist") + entry_point = os.path.join(base_dir, "mnist.py") + input_path = sagemaker_session.upload_data( + path=os.path.join(base_dir, "training"), + key_prefix="integ-test-data/pytorch_mnist/training", + ) + inputs = TrainingInput(s3_data=input_path) + + instance_count = ParameterInteger(name="InstanceCount", default_value=1) + instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") + good_enough_input = ParameterInteger(name="GoodEnoughInput", default_value=1) + + 
pytorch_estimator = PyTorch( + entry_point=entry_point, + role=role, + framework_version="1.5.0", + py_version="py3", + instance_count=instance_count, + instance_type=instance_type, + sagemaker_session=sagemaker_session, + ) + step_train = TrainingStep( + name="pytorch-train", + estimator=pytorch_estimator, + inputs=inputs, + ) + + step_register = RegisterModel( + name="pytorch-register-model", + estimator=pytorch_estimator, + model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, + content_types=["*"], + response_types=["*"], + inference_instances=["*"], + transform_instances=["*"], + description="test-description", + entry_point=entry_point, + ) + + model = Model( + image_uri=pytorch_estimator.training_image_uri(), + model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, + sagemaker_session=sagemaker_session, + role=role, + ) + model_inputs = CreateModelInput( + instance_type="ml.m5.large", + accelerator_type="ml.eia1.medium", + ) + step_model = CreateModelStep( + name="pytorch-model", + model=model, + inputs=model_inputs, + ) + + step_cond = ConditionStep( + name="cond-good-enough", + conditions=[ConditionGreaterThanOrEqualTo(left=good_enough_input, right=1)], + if_steps=[step_train, step_register], + else_steps=[step_model], + ) + + pipeline = Pipeline( + name=pipeline_name, + parameters=[good_enough_input, instance_count, instance_type], + steps=[step_cond], + sagemaker_session=sagemaker_session, + ) + + try: + response = pipeline.create(role) + create_arn = response["PipelineArn"] + assert re.match( + fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", create_arn + ) + + execution = pipeline.start(parameters={}) + assert re.match( + fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/", + execution.arn, + ) + + execution = pipeline.start(parameters={"GoodEnoughInput": 0}) + assert re.match( + fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/", + execution.arn, + 
) + finally: + try: + pipeline.delete() + except Exception: + pass + + def test_training_job_with_debugger_and_profiler( sagemaker_session, pipeline_name, diff --git a/tests/unit/sagemaker/workflow/test_step_collections.py b/tests/unit/sagemaker/workflow/test_step_collections.py index 0e2ff68ce3..664b5e51e3 100644 --- a/tests/unit/sagemaker/workflow/test_step_collections.py +++ b/tests/unit/sagemaker/workflow/test_step_collections.py @@ -14,6 +14,7 @@ from __future__ import absolute_import import pytest +from tests.unit import DATA_DIR import sagemaker @@ -38,6 +39,7 @@ StepCollection, RegisterModel, ) +from sagemaker.workflow.pipeline import Pipeline from tests.unit.sagemaker.workflow.helpers import ordered REGION = "us-west-2" @@ -45,6 +47,9 @@ IMAGE_URI = "fakeimage" ROLE = "DummyRole" MODEL_NAME = "gisele" +MODEL_REPACKING_IMAGE_URI = ( + "246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3" +) class CustomStep(Step): @@ -177,6 +182,111 @@ def test_register_model(estimator, model_metrics): ) +def test_register_model_with_model_repack(estimator, model_metrics): + model_data = f"s3://{BUCKET}/model.tar.gz" + register_model = RegisterModel( + name="RegisterModelStep", + estimator=estimator, + model_data=model_data, + content_types=["content_type"], + response_types=["response_type"], + inference_instances=["inference_instance"], + transform_instances=["transform_instance"], + model_package_group_name="mpg", + model_metrics=model_metrics, + approval_status="Approved", + description="description", + entry_point=f"{DATA_DIR}/dummy_script.py", + ) + + request_dicts = register_model.request_dicts() + assert len(request_dicts) == 2 + print(request_dicts) + for request_dict in request_dicts: + if request_dict["Type"] == "Training": + assert request_dict["Name"] == "RegisterModelStepRepackModel" + arguments = request_dict["Arguments"] + repacker_job_name = arguments["HyperParameters"]["sagemaker_job_name"] + assert ordered(arguments) == 
ordered( + { + "AlgorithmSpecification": { + "TrainingImage": MODEL_REPACKING_IMAGE_URI, + "TrainingInputMode": "File", + }, + "DebugHookConfig": { + "CollectionConfigurations": [], + "S3OutputPath": f"s3://{BUCKET}/", + }, + "HyperParameters": { + "inference_script": '"dummy_script.py"', + "model_archive": '"model.tar.gz"', + "sagemaker_submit_directory": '"s3://{}/{}/source/sourcedir.tar.gz"'.format( + BUCKET, repacker_job_name.replace('"', "") + ), + "sagemaker_program": '"_repack_model.py"', + "sagemaker_container_log_level": "20", + "sagemaker_job_name": repacker_job_name, + "sagemaker_region": f'"{REGION}"', + }, + "InputDataConfig": [ + { + "ChannelName": "training", + "DataSource": { + "S3DataSource": { + "S3DataDistributionType": "FullyReplicated", + "S3DataType": "S3Prefix", + "S3Uri": f"s3://{BUCKET}", + } + }, + } + ], + "OutputDataConfig": {"S3OutputPath": f"s3://{BUCKET}/"}, + "ResourceConfig": { + "InstanceCount": 1, + "InstanceType": "ml.m5.large", + "VolumeSizeInGB": 30, + }, + "RoleArn": ROLE, + "StoppingCondition": {"MaxRuntimeInSeconds": 86400}, + } + ) + elif request_dict["Type"] == "RegisterModel": + assert request_dict["Name"] == "RegisterModelStep" + arguments = request_dict["Arguments"] + assert len(arguments["InferenceSpecification"]["Containers"]) == 1 + assert ( + arguments["InferenceSpecification"]["Containers"][0]["Image"] + == estimator.training_image_uri() + ) + assert isinstance( + arguments["InferenceSpecification"]["Containers"][0]["ModelDataUrl"], Properties + ) + del arguments["InferenceSpecification"]["Containers"] + assert ordered(arguments) == ordered( + { + "InferenceSpecification": { + "SupportedContentTypes": ["content_type"], + "SupportedRealtimeInferenceInstanceTypes": ["inference_instance"], + "SupportedResponseMIMETypes": ["response_type"], + "SupportedTransformInstanceTypes": ["transform_instance"], + }, + "ModelApprovalStatus": "Approved", + "ModelMetrics": { + "ModelQuality": { + "Statistics": { + "ContentType": 
"text/csv", + "S3Uri": f"s3://{BUCKET}/metrics.csv", + }, + }, + }, + "ModelPackageDescription": "description", + "ModelPackageGroupName": "mpg", + } + ) + else: + raise Exception("A step exists in the collection of an invalid type.") + + def test_estimator_transformer(estimator): model_data = f"s3://{BUCKET}/model.tar.gz" model_inputs = CreateModelInput( From 97b4d7e59b8d712936b08cd9efe1853df849cfef Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Thu, 1 Apr 2021 01:58:01 -0700 Subject: [PATCH 2/4] support step DependsOn --- src/sagemaker/workflow/_utils.py | 9 +- src/sagemaker/workflow/condition_step.py | 3 +- src/sagemaker/workflow/step_collections.py | 14 +++ src/sagemaker/workflow/steps.py | 34 ++++-- tests/integ/test_workflow.py | 109 +++++++++++++++++- .../sagemaker/workflow/test_condition_step.py | 8 +- .../workflow/test_step_collections.py | 30 ++++- tests/unit/sagemaker/workflow/test_steps.py | 22 +++- tests/unit/sagemaker/workflow/test_utils.py | 2 + 9 files changed, 214 insertions(+), 17 deletions(-) diff --git a/src/sagemaker/workflow/_utils.py b/src/sagemaker/workflow/_utils.py index e9324c482f..358b0a147d 100644 --- a/src/sagemaker/workflow/_utils.py +++ b/src/sagemaker/workflow/_utils.py @@ -59,6 +59,7 @@ def __init__( entry_point: str, source_dir: str = None, dependencies: List = None, + depends_on: List[str] = None, ): """Constructs a TrainingStep, given an `EstimatorBase` instance. @@ -102,7 +103,9 @@ def __init__( inputs = TrainingInput(self._model_prefix) # super! - super(_RepackModelStep, self).__init__(name=name, estimator=repacker, inputs=inputs) + super(_RepackModelStep, self).__init__( + name=name, depends_on=depends_on, estimator=repacker, inputs=inputs + ) def _prepare_for_repacking(self): """Prepares the source for the estimator.""" @@ -221,6 +224,7 @@ def __init__( image_uri=None, compile_model_family=None, description=None, + depends_on: List[str] = None, **kwargs, ): """Constructor of a register model step. 
@@ -248,9 +252,10 @@ def __init__( compile_model_family (str): Instance family for compiled model, if specified, a compiled model will be used (default: None). description (str): Model Package description (default: None). + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep` depends on **kwargs: additional arguments to `create_model`. """ - super(_RegisterModelStep, self).__init__(name, StepTypeEnum.REGISTER_MODEL) + super(_RegisterModelStep, self).__init__(name, StepTypeEnum.REGISTER_MODEL, depends_on) self.estimator = estimator self.model_data = model_data self.content_types = content_types diff --git a/src/sagemaker/workflow/condition_step.py b/src/sagemaker/workflow/condition_step.py index 5c4259f98b..13ab405805 100644 --- a/src/sagemaker/workflow/condition_step.py +++ b/src/sagemaker/workflow/condition_step.py @@ -40,6 +40,7 @@ class ConditionStep(Step): def __init__( self, name: str, + depends_on: List[str] = None, conditions: List[Condition] = None, if_steps: List[Union[Step, StepCollection]] = None, else_steps: List[Union[Step, StepCollection]] = None, @@ -60,7 +61,7 @@ def __init__( and `sagemaker.workflow.step_collections.StepCollection` instances that are marked as ready for execution if the list of conditions evaluates to False. 
""" - super(ConditionStep, self).__init__(name, StepTypeEnum.CONDITION) + super(ConditionStep, self).__init__(name, StepTypeEnum.CONDITION, depends_on) self.conditions = conditions or [] self.if_steps = if_steps or [] self.else_steps = else_steps or [] diff --git a/src/sagemaker/workflow/step_collections.py b/src/sagemaker/workflow/step_collections.py index 6df5a5524c..4ad11b49d3 100644 --- a/src/sagemaker/workflow/step_collections.py +++ b/src/sagemaker/workflow/step_collections.py @@ -60,6 +60,7 @@ def __init__( response_types, inference_instances, transform_instances, + depends_on: List[str] = None, model_package_group_name=None, model_metrics=None, approval_status=None, @@ -80,6 +81,7 @@ def __init__( generate inferences in real-time (default: None). transform_instances (list): A list of the instance types on which a transformation job can be run or on which an endpoint can be deployed (default: None). + depends_on (List[str]): The list of step names the first step in the collection depends on model_package_group_name (str): The Model Package Group name, exclusive to `model_package_name`, using `model_package_group_name` makes the Model Package versioned (default: None). @@ -94,12 +96,15 @@ def __init__( **kwargs: additional arguments to `create_model`. 
""" steps: List[Step] = [] + repack_model = False if "entry_point" in kwargs: + repack_model = True entry_point = kwargs["entry_point"] source_dir = kwargs.get("source_dir") dependencies = kwargs.get("dependencies") repack_model_step = _RepackModelStep( name=f"{name}RepackModel", + depends_on=depends_on, estimator=estimator, model_data=model_data, entry_point=entry_point, @@ -130,6 +135,9 @@ def __init__( description=description, **kwargs, ) + if not repack_model: + register_model_step.add_depends_on(depends_on) + steps.append(register_model_step) self.steps = steps @@ -160,6 +168,7 @@ def __init__( max_payload=None, tags=None, volume_kms_key=None, + depends_on: List[str] = None, **kwargs, ): """Construct steps required for a Transformer step collection: @@ -196,6 +205,7 @@ def __init__( it will be the format of the batch transform output. env (dict): The Environment variables to be set for use during the transform job (default: None). + depends_on (List[str]): The list of step names the first step in the collection depends on """ steps = [] if "entry_point" in kwargs: @@ -204,6 +214,7 @@ def __init__( dependencies = kwargs.get("dependencies") repack_model_step = _RepackModelStep( name=f"{name}RepackModel", + depends_on=depends_on, estimator=estimator, model_data=model_data, entry_point=entry_point, @@ -232,6 +243,9 @@ def predict_wrapper(endpoint, session): model=model, inputs=model_inputs, ) + if "entry_point" not in kwargs and depends_on: + # if the CreateModelStep is the first step in the collection + model_step.add_depends_on(depends_on) steps.append(model_step) transformer = Transformer( diff --git a/src/sagemaker/workflow/steps.py b/src/sagemaker/workflow/steps.py index aab64f2f4f..d36c161b2b 100644 --- a/src/sagemaker/workflow/steps.py +++ b/src/sagemaker/workflow/steps.py @@ -63,10 +63,12 @@ class Step(Entity): Attributes: name (str): The name of the step. step_type (StepTypeEnum): The type of the step. 
+ depends_on (List[str]): The list of step names the current step depends on """ name: str = attr.ib(factory=str) step_type: StepTypeEnum = attr.ib(factory=StepTypeEnum.factory) + depends_on: List[str] = attr.ib(default=None) @property @abc.abstractmethod @@ -80,11 +82,21 @@ def properties(self): def to_request(self) -> RequestType: """Gets the request structure for workflow service calls.""" - return { + request_dict = { "Name": self.name, "Type": self.step_type.value, "Arguments": self.arguments, } + if self.depends_on: + request_dict["DependsOn"] = self.depends_on + return request_dict + + def add_depends_on(self, step_names: List[str]): + if not step_names: + return + if not self.depends_on: + self.depends_on = [] + self.depends_on.extend(step_names) @property def ref(self) -> Dict[str, str]: @@ -133,6 +145,7 @@ def __init__( estimator: EstimatorBase, inputs: TrainingInput = None, cache_config: CacheConfig = None, + depends_on: List[str] = None, ): """Construct a TrainingStep, given an `EstimatorBase` instance. @@ -144,8 +157,9 @@ def __init__( estimator (EstimatorBase): A `sagemaker.estimator.EstimatorBase` instance. inputs (TrainingInput): A `sagemaker.inputs.TrainingInput` instance. Defaults to `None`. cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep` depends on """ - super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING) + super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING, depends_on) self.estimator = estimator self.inputs = inputs self._properties = Properties( @@ -188,10 +202,7 @@ class CreateModelStep(Step): """CreateModel step for workflow.""" def __init__( - self, - name: str, - model: Model, - inputs: CreateModelInput, + self, name: str, model: Model, inputs: CreateModelInput, depends_on: List[str] = None ): """Construct a CreateModelStep, given an `sagemaker.model.Model` instance. 
@@ -203,8 +214,9 @@ def __init__( model (Model): A `sagemaker.model.Model` instance. inputs (CreateModelInput): A `sagemaker.inputs.CreateModelInput` instance. Defaults to `None`. + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.CreateModelStep` depends on """ - super(CreateModelStep, self).__init__(name, StepTypeEnum.CREATE_MODEL) + super(CreateModelStep, self).__init__(name, StepTypeEnum.CREATE_MODEL, depends_on) self.model = model self.inputs = inputs or CreateModelInput() @@ -247,6 +259,7 @@ def __init__( transformer: Transformer, inputs: TransformInput, cache_config: CacheConfig = None, + depends_on: List[str] = None, ): """Constructs a TransformStep, given an `Transformer` instance. @@ -258,8 +271,9 @@ def __init__( transformer (Transformer): A `sagemaker.transformer.Transformer` instance. inputs (TransformInput): A `sagemaker.inputs.TransformInput` instance. cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TransformStep` depends on """ - super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM) + super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM, depends_on) self.transformer = transformer self.inputs = inputs self.cache_config = cache_config @@ -320,6 +334,7 @@ def __init__( code: str = None, property_files: List[PropertyFile] = None, cache_config: CacheConfig = None, + depends_on: List[str] = None, ): """Construct a ProcessingStep, given a `Processor` instance. @@ -340,8 +355,9 @@ def __init__( property_files (List[PropertyFile]): A list of property files that workflow looks for and resolves from the configured processing output list. cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. 
+ depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep` depends on """ - super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING) + super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING, depends_on) self.processor = processor self.inputs = inputs self.outputs = outputs diff --git a/tests/integ/test_workflow.py b/tests/integ/test_workflow.py index 40c1d967bb..cf02f80b1a 100644 --- a/tests/integ/test_workflow.py +++ b/tests/integ/test_workflow.py @@ -28,7 +28,7 @@ rule_configs, ) from datetime import datetime -from sagemaker.inputs import CreateModelInput, TrainingInput +from sagemaker.inputs import CreateModelInput, TrainingInput, TransformInput from sagemaker.model import Model from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.pytorch.estimator import PyTorch @@ -969,3 +969,110 @@ def test_training_job_with_debugger_and_profiler( pipeline.delete() except Exception: pass + + +def test_two_processing_job_depends_on( + sagemaker_session, + role, + pipeline_name, + region_name, + cpu_instance_type, +): + instance_count = ParameterInteger(name="InstanceCount", default_value=2) + script_path = os.path.join(DATA_DIR, "dummy_script.py") + + pyspark_processor = PySparkProcessor( + base_job_name="sm-spark", + framework_version="2.4", + role=role, + instance_count=instance_count, + instance_type=cpu_instance_type, + max_runtime_in_seconds=1200, + sagemaker_session=sagemaker_session, + ) + + spark_run_args = pyspark_processor.get_run_args( + submit_app=script_path, + arguments=[ + "--s3_input_bucket", + sagemaker_session.default_bucket(), + "--s3_input_key_prefix", + "spark-input", + "--s3_output_bucket", + sagemaker_session.default_bucket(), + "--s3_output_key_prefix", + "spark-output", + ], + ) + + step_pyspark_1 = ProcessingStep( + name="pyspark-process-1", + processor=pyspark_processor, + inputs=spark_run_args.inputs, + outputs=spark_run_args.outputs, + 
job_arguments=spark_run_args.arguments, + code=spark_run_args.code, + ) + + step_pyspark_2 = ProcessingStep( + name="pyspark-process-2", + depends_on=[step_pyspark_1.name], + processor=pyspark_processor, + inputs=spark_run_args.inputs, + outputs=spark_run_args.outputs, + job_arguments=spark_run_args.arguments, + code=spark_run_args.code, + ) + + pipeline = Pipeline( + name=pipeline_name, + parameters=[instance_count], + steps=[step_pyspark_1, step_pyspark_2], + sagemaker_session=sagemaker_session, + ) + + try: + response = pipeline.create(role) + create_arn = response["PipelineArn"] + assert re.match( + fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", + create_arn, + ) + + pipeline.parameters = [ParameterInteger(name="InstanceCount", default_value=1)] + response = pipeline.update(role) + update_arn = response["PipelineArn"] + assert re.match( + fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}", + update_arn, + ) + + execution = pipeline.start(parameters={}) + assert re.match( + fr"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}/execution/", + execution.arn, + ) + + response = execution.describe() + assert response["PipelineArn"] == create_arn + + try: + execution.wait(delay=60) + except WaiterError: + pass + + execution_steps = execution.list_steps() + assert len(execution_steps) == 2 + time_stamp = {} + for execution_step in execution_steps: + name = execution_step["StepName"] + if name == "pyspark-process-1": + time_stamp[name] = execution_step["EndTime"] + else: + time_stamp[name] = execution_step["StartTime"] + assert time_stamp["pyspark-process-1"] < time_stamp["pyspark-process-2"] + finally: + try: + pipeline.delete() + except Exception: + pass diff --git a/tests/unit/sagemaker/workflow/test_condition_step.py b/tests/unit/sagemaker/workflow/test_condition_step.py index a9c8bf6903..9eeeeecb87 100644 --- a/tests/unit/sagemaker/workflow/test_condition_step.py +++ 
b/tests/unit/sagemaker/workflow/test_condition_step.py @@ -43,11 +43,17 @@ def test_condition_step(): step1 = CustomStep("MyStep1") step2 = CustomStep("MyStep2") cond_step = ConditionStep( - name="MyConditionStep", conditions=[cond], if_steps=[step1], else_steps=[step2] + name="MyConditionStep", + depends_on=["TestStep"], + conditions=[cond], + if_steps=[step1], + else_steps=[step2], ) + cond_step.add_depends_on(["SecondTestStep"]) assert cond_step.to_request() == { "Name": "MyConditionStep", "Type": "Condition", + "DependsOn": ["TestStep", "SecondTestStep"], "Arguments": { "Conditions": [ { diff --git a/tests/unit/sagemaker/workflow/test_step_collections.py b/tests/unit/sagemaker/workflow/test_step_collections.py index 664b5e51e3..b37d4721e8 100644 --- a/tests/unit/sagemaker/workflow/test_step_collections.py +++ b/tests/unit/sagemaker/workflow/test_step_collections.py @@ -13,7 +13,11 @@ # language governing permissions and limitations under the License. from __future__ import absolute_import +import os +import tempfile +import shutil import pytest + from tests.unit import DATA_DIR import sagemaker @@ -127,6 +131,21 @@ def model_metrics(): ) +@pytest.fixture +def source_dir(request): + wf = os.path.join(DATA_DIR, "workflow") + tmp = tempfile.mkdtemp() + shutil.copy2(os.path.join(wf, "inference.py"), os.path.join(tmp, "inference.py")) + shutil.copy2(os.path.join(wf, "foo"), os.path.join(tmp, "foo")) + + def fin(): + shutil.rmtree(tmp) + + request.addfinalizer(fin) + + return tmp + + def test_step_collection(): step_collection = StepCollection(steps=[CustomStep("MyStep1"), CustomStep("MyStep2")]) assert step_collection.request_dicts() == [ @@ -149,12 +168,14 @@ def test_register_model(estimator, model_metrics): model_metrics=model_metrics, approval_status="Approved", description="description", + depends_on=["TestStep"], ) assert ordered(register_model.request_dicts()) == ordered( [ { "Name": "RegisterModelStep", "Type": "RegisterModel", + "DependsOn": ["TestStep"], 
"Arguments": { "InferenceSpecification": { "Containers": [ @@ -197,14 +218,17 @@ def test_register_model_with_model_repack(estimator, model_metrics): approval_status="Approved", description="description", entry_point=f"{DATA_DIR}/dummy_script.py", + depends_on=["TestStep"], ) request_dicts = register_model.request_dicts() assert len(request_dicts) == 2 - print(request_dicts) + for request_dict in request_dicts: if request_dict["Type"] == "Training": assert request_dict["Name"] == "RegisterModelStepRepackModel" + assert len(request_dict["DependsOn"]) == 1 + assert request_dict["DependsOn"][0] == "TestStep" arguments = request_dict["Arguments"] repacker_job_name = arguments["HyperParameters"]["sagemaker_job_name"] assert ordered(arguments) == ordered( @@ -252,6 +276,7 @@ def test_register_model_with_model_repack(estimator, model_metrics): ) elif request_dict["Type"] == "RegisterModel": assert request_dict["Name"] == "RegisterModelStep" + assert "DependsOn" not in request_dict arguments = request_dict["Arguments"] assert len(arguments["InferenceSpecification"]["Containers"]) == 1 assert ( @@ -302,6 +327,7 @@ def test_estimator_transformer(estimator): instance_count=1, instance_type="ml.c4.4xlarge", transform_inputs=transform_inputs, + depends_on=["TestStep"], ) request_dicts = estimator_transformer.request_dicts() assert len(request_dicts) == 2 @@ -310,6 +336,7 @@ def test_estimator_transformer(estimator): assert request_dict == { "Name": "EstimatorTransformerStepCreateModelStep", "Type": "Model", + "DependsOn": ["TestStep"], "Arguments": { "ExecutionRoleArn": "DummyRole", "PrimaryContainer": { @@ -324,6 +351,7 @@ def test_estimator_transformer(estimator): arguments = request_dict["Arguments"] assert isinstance(arguments["ModelName"], Properties) arguments.pop("ModelName") + assert "DependsOn" not in request_dict assert arguments == { "TransformInput": { "DataSource": { diff --git a/tests/unit/sagemaker/workflow/test_steps.py 
b/tests/unit/sagemaker/workflow/test_steps.py index 02bb1545e6..894d17160b 100644 --- a/tests/unit/sagemaker/workflow/test_steps.py +++ b/tests/unit/sagemaker/workflow/test_steps.py @@ -124,11 +124,17 @@ def test_training_step(sagemaker_session): inputs = TrainingInput(f"s3://{BUCKET}/train_manifest") cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") step = TrainingStep( - name="MyTrainingStep", estimator=estimator, inputs=inputs, cache_config=cache_config + name="MyTrainingStep", + depends_on=["TestStep"], + estimator=estimator, + inputs=inputs, + cache_config=cache_config, ) + step.add_depends_on(["AnotherTestStep"]) assert step.to_request() == { "Name": "MyTrainingStep", "Type": "Training", + "DependsOn": ["TestStep", "AnotherTestStep"], "Arguments": { "AlgorithmSpecification": {"TrainingImage": IMAGE_URI, "TrainingInputMode": "File"}, "InputDataConfig": [ @@ -178,14 +184,17 @@ def test_processing_step(sagemaker_session): cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") step = ProcessingStep( name="MyProcessingStep", + depends_on=["TestStep", "SecondTestStep"], processor=processor, inputs=inputs, outputs=[], cache_config=cache_config, ) + step.add_depends_on(["ThirdTestStep"]) assert step.to_request() == { "Name": "MyProcessingStep", "Type": "Processing", + "DependsOn": ["TestStep", "SecondTestStep", "ThirdTestStep"], "Arguments": { "AppSpecification": {"ImageUri": "fakeimage"}, "ProcessingInputs": [ @@ -285,13 +294,16 @@ def test_create_model_step(sagemaker_session): ) step = CreateModelStep( name="MyCreateModelStep", + depends_on=["TestStep"], model=model, inputs=inputs, ) + step.add_depends_on(["SecondTestStep"]) assert step.to_request() == { "Name": "MyCreateModelStep", "Type": "Model", + "DependsOn": ["TestStep", "SecondTestStep"], "Arguments": { "ExecutionRoleArn": "DummyRole", "PrimaryContainer": {"Environment": {}, "Image": "fakeimage"}, @@ -310,11 +322,17 @@ def test_transform_step(sagemaker_session): inputs = 
TransformInput(data=f"s3://{BUCKET}/transform_manifest") cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") step = TransformStep( - name="MyTransformStep", transformer=transformer, inputs=inputs, cache_config=cache_config + name="MyTransformStep", + depends_on=["TestStep"], + transformer=transformer, + inputs=inputs, + cache_config=cache_config, ) + step.add_depends_on(["SecondTestStep"]) assert step.to_request() == { "Name": "MyTransformStep", "Type": "Transform", + "DependsOn": ["TestStep", "SecondTestStep"], "Arguments": { "ModelName": "gisele", "TransformInput": { diff --git a/tests/unit/sagemaker/workflow/test_utils.py b/tests/unit/sagemaker/workflow/test_utils.py index 7df0dfcdfd..8825b32d0f 100644 --- a/tests/unit/sagemaker/workflow/test_utils.py +++ b/tests/unit/sagemaker/workflow/test_utils.py @@ -108,6 +108,7 @@ def test_repack_model_step(estimator): estimator=estimator, model_data=model_data, entry_point=entry_point, + depends_on=["TestStep"], ) request_dict = step.to_request() @@ -121,6 +122,7 @@ def test_repack_model_step(estimator): assert request_dict == { "Name": "MyRepackModelStep", "Type": "Training", + "DependsOn": ["TestStep"], "Arguments": { "AlgorithmSpecification": {"TrainingInputMode": "File"}, "DebugHookConfig": {"CollectionConfigurations": [], "S3OutputPath": "s3://my-bucket/"}, From db68c511ffa0ca2acec45fe585d3e3eb81b01875 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Thu, 1 Apr 2021 09:41:51 -0700 Subject: [PATCH 3/4] add docstring for workflow.step.add_depends_on --- src/sagemaker/workflow/steps.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sagemaker/workflow/steps.py b/src/sagemaker/workflow/steps.py index d36c161b2b..0aa0eb0eb7 100644 --- a/src/sagemaker/workflow/steps.py +++ b/src/sagemaker/workflow/steps.py @@ -92,6 +92,7 @@ def to_request(self) -> RequestType: return request_dict def add_depends_on(self, step_names: List[str]): + """Add step names to the current step depends on list""" if not 
step_names: return if not self.depends_on: From d2a8160e09cb1668e0b0081883ceb12e8aa3abf3 Mon Sep 17 00:00:00 2001 From: Jerry Peng Date: Thu, 1 Apr 2021 11:00:21 -0700 Subject: [PATCH 4/4] fix formatting --- src/sagemaker/workflow/_utils.py | 3 ++- src/sagemaker/workflow/step_collections.py | 6 ++++-- src/sagemaker/workflow/steps.py | 12 ++++++++---- tests/integ/test_workflow.py | 2 +- .../unit/sagemaker/workflow/test_step_collections.py | 1 - 5 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/sagemaker/workflow/_utils.py b/src/sagemaker/workflow/_utils.py index 358b0a147d..02fc94e1e7 100644 --- a/src/sagemaker/workflow/_utils.py +++ b/src/sagemaker/workflow/_utils.py @@ -252,7 +252,8 @@ def __init__( compile_model_family (str): Instance family for compiled model, if specified, a compiled model will be used (default: None). description (str): Model Package description (default: None). - depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep` depends on + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep` + depends on **kwargs: additional arguments to `create_model`. """ super(_RegisterModelStep, self).__init__(name, StepTypeEnum.REGISTER_MODEL, depends_on) diff --git a/src/sagemaker/workflow/step_collections.py b/src/sagemaker/workflow/step_collections.py index 4ad11b49d3..dd8f32b7fc 100644 --- a/src/sagemaker/workflow/step_collections.py +++ b/src/sagemaker/workflow/step_collections.py @@ -81,7 +81,8 @@ def __init__( generate inferences in real-time (default: None). transform_instances (list): A list of the instance types on which a transformation job can be run or on which an endpoint can be deployed (default: None). 
- depends_on (List[str]): The list of step names the first step in the collection depends on + depends_on (List[str]): The list of step names the first step in the collection + depends on model_package_group_name (str): The Model Package Group name, exclusive to `model_package_name`, using `model_package_group_name` makes the Model Package versioned (default: None). @@ -205,7 +206,8 @@ def __init__( it will be the format of the batch transform output. env (dict): The Environment variables to be set for use during the transform job (default: None). - depends_on (List[str]): The list of step names the first step in the collection depends on + depends_on (List[str]): The list of step names the first step in + the collection depends on """ steps = [] if "entry_point" in kwargs: diff --git a/src/sagemaker/workflow/steps.py b/src/sagemaker/workflow/steps.py index 0aa0eb0eb7..07098b4a62 100644 --- a/src/sagemaker/workflow/steps.py +++ b/src/sagemaker/workflow/steps.py @@ -158,7 +158,8 @@ def __init__( estimator (EstimatorBase): A `sagemaker.estimator.EstimatorBase` instance. inputs (TrainingInput): A `sagemaker.inputs.TrainingInput` instance. Defaults to `None`. cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. - depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep` depends on + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep` + depends on """ super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING, depends_on) self.estimator = estimator @@ -215,7 +216,8 @@ def __init__( model (Model): A `sagemaker.model.Model` instance. inputs (CreateModelInput): A `sagemaker.inputs.CreateModelInput` instance. Defaults to `None`. 
- depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.CreateModelStep` depends on + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.CreateModelStep` + depends on """ super(CreateModelStep, self).__init__(name, StepTypeEnum.CREATE_MODEL, depends_on) self.model = model @@ -272,7 +274,8 @@ def __init__( transformer (Transformer): A `sagemaker.transformer.Transformer` instance. inputs (TransformInput): A `sagemaker.inputs.TransformInput` instance. cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. - depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TransformStep` depends on + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TransformStep` + depends on """ super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM, depends_on) self.transformer = transformer @@ -356,7 +359,8 @@ def __init__( property_files (List[PropertyFile]): A list of property files that workflow looks for and resolves from the configured processing output list. cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance. 
- depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep` depends on + depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep` + depends on """ super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING, depends_on) self.processor = processor diff --git a/tests/integ/test_workflow.py b/tests/integ/test_workflow.py index cf02f80b1a..f0f4a92497 100644 --- a/tests/integ/test_workflow.py +++ b/tests/integ/test_workflow.py @@ -28,7 +28,7 @@ rule_configs, ) from datetime import datetime -from sagemaker.inputs import CreateModelInput, TrainingInput, TransformInput +from sagemaker.inputs import CreateModelInput, TrainingInput from sagemaker.model import Model from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.pytorch.estimator import PyTorch diff --git a/tests/unit/sagemaker/workflow/test_step_collections.py b/tests/unit/sagemaker/workflow/test_step_collections.py index b37d4721e8..80d30fe19c 100644 --- a/tests/unit/sagemaker/workflow/test_step_collections.py +++ b/tests/unit/sagemaker/workflow/test_step_collections.py @@ -43,7 +43,6 @@ StepCollection, RegisterModel, ) -from sagemaker.workflow.pipeline import Pipeline from tests.unit.sagemaker.workflow.helpers import ordered REGION = "us-west-2"