Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/apache-airflow-providers-amazon/operators/sagemaker.rst
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Start an Amazon SageMaker pipeline execution
To trigger an execution run for an already-defined Amazon Sagemaker pipeline, you can use
:class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerStartPipelineOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker_pipeline.py
:language: python
:dedent: 4
:start-after: [START howto_operator_sagemaker_start_pipeline]
Expand All @@ -168,7 +168,7 @@ Stop an Amazon SageMaker pipeline execution
To stop an Amazon Sagemaker pipeline execution that is currently running, you can use
:class:`~airflow.providers.amazon.aws.operators.sagemaker.SageMakerStopPipelineOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker_pipeline.py
:language: python
:dedent: 4
:start-after: [START howto_operator_sagemaker_stop_pipeline]
Expand Down Expand Up @@ -289,7 +289,7 @@ Wait on an Amazon SageMaker pipeline execution state
To check the state of an Amazon Sagemaker pipeline execution until it reaches a terminal state
you can use :class:`~airflow.providers.amazon.aws.sensors.sagemaker.SageMakerPipelineSensor`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_sagemaker_pipeline.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_sagemaker_pipeline]
Expand Down
53 changes: 0 additions & 53 deletions tests/system/providers/amazon/aws/example_sagemaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,12 @@
SageMakerModelOperator,
SageMakerProcessingOperator,
SageMakerRegisterModelVersionOperator,
SageMakerStartPipelineOperator,
SageMakerStopPipelineOperator,
SageMakerTrainingOperator,
SageMakerTransformOperator,
SageMakerTuningOperator,
)
from airflow.providers.amazon.aws.sensors.sagemaker import (
SageMakerAutoMLSensor,
SageMakerPipelineSensor,
SageMakerTrainingSensor,
SageMakerTransformSensor,
SageMakerTuningSensor,
Expand Down Expand Up @@ -199,7 +196,6 @@ def set_up(env_id, role_arn):
transform_job_name = f"{env_id}-transform"
tuning_job_name = f"{env_id}-tune"
model_package_group_name = f"{env_id}-group"
pipeline_name = f"{env_id}-pipe"
auto_ml_job_name = f"{env_id}-automl"
experiment_name = f"{env_id}-experiment"

Expand All @@ -221,16 +217,6 @@ def set_up(env_id, role_arn):
f"the directions at the top of the system testfile "
)

# Json definition for a dummy pipeline of 30 chained "conditional step" checking that 3 < 6
# Each step takes roughly 1 second to execute, so the pipeline runtimes is ~30 seconds, which should be
# enough to test stopping and awaiting without race conditions.
# Built using sagemaker sdk, and using json.loads(pipeline.definition())
pipeline_json_definition = """{"Version": "2020-12-01", "Metadata": {}, "Parameters": [], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "DummyCond29", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond28", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond27", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond26", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond25", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond24", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond23", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond22", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond21", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond20", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond19", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond18", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond17", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond16", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond15", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond14", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond13", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond12", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond11", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond10", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond9", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond8", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond7", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond6", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond5", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond4", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond3", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond2", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond1", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond0", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [{"Name": "DummyCond", "Type": "Condition", "Arguments": {"Conditions": [{"Type": "LessThanOrEqualTo", "LeftValue": 3.0, "RightValue": 6.0}], "IfSteps": [], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}], "ElseSteps": []}}]}""" # noqa: E501
sgmk_client = boto3.client("sagemaker")
sgmk_client.create_pipeline(
PipelineName=pipeline_name, PipelineDefinition=pipeline_json_definition, RoleArn=role_arn
)

resource_config = {
"InstanceCount": 1,
"InstanceType": "ml.m5.large",
Expand Down Expand Up @@ -410,7 +396,6 @@ def set_up(env_id, role_arn):
ti.xcom_push(key="training_config", value=training_config)
ti.xcom_push(key="training_job_name", value=training_job_name)
ti.xcom_push(key="model_package_group_name", value=model_package_group_name)
ti.xcom_push(key="pipeline_name", value=pipeline_name)
ti.xcom_push(key="auto_ml_job_name", value=auto_ml_job_name)
ti.xcom_push(key="experiment_name", value=experiment_name)
ti.xcom_push(key="model_config", value=model_config)
Expand Down Expand Up @@ -444,12 +429,6 @@ def delete_model_group(group_name, model_version_arn):
sgmk_client.delete_model_package_group(ModelPackageGroupName=group_name)


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_pipeline(pipeline_name):
sgmk_client = boto3.client("sagemaker")
sgmk_client.delete_pipeline(PipelineName=pipeline_name)


@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_experiment(name):
sgmk_client = boto3.client("sagemaker")
Expand Down Expand Up @@ -528,33 +507,6 @@ def delete_docker_image(image_name):
# [END howto_sensor_sagemaker_auto_ml]
await_automl.poke_interval = 10

# [START howto_operator_sagemaker_start_pipeline]
start_pipeline1 = SageMakerStartPipelineOperator(
task_id="start_pipeline1",
pipeline_name=test_setup["pipeline_name"],
)
# [END howto_operator_sagemaker_start_pipeline]

# [START howto_operator_sagemaker_stop_pipeline]
stop_pipeline1 = SageMakerStopPipelineOperator(
task_id="stop_pipeline1",
pipeline_exec_arn=start_pipeline1.output,
)
# [END howto_operator_sagemaker_stop_pipeline]

start_pipeline2 = SageMakerStartPipelineOperator(
task_id="start_pipeline2",
pipeline_name=test_setup["pipeline_name"],
)

# [START howto_sensor_sagemaker_pipeline]
await_pipeline2 = SageMakerPipelineSensor(
task_id="await_pipeline2",
pipeline_exec_arn=start_pipeline2.output,
)
# [END howto_sensor_sagemaker_pipeline]
await_pipeline2.poke_interval = 10

# [START howto_operator_sagemaker_experiment]
create_experiment = SageMakerCreateExperimentOperator(
task_id="create_experiment", name=test_setup["experiment_name"]
Expand Down Expand Up @@ -668,10 +620,6 @@ def delete_docker_image(image_name):
# TEST BODY
automl,
await_automl,
start_pipeline1,
start_pipeline2,
stop_pipeline1,
await_pipeline2,
create_experiment,
preprocess_raw_data,
train_model,
Expand All @@ -688,7 +636,6 @@ def delete_docker_image(image_name):
delete_model,
delete_bucket,
delete_experiment(test_setup["experiment_name"]),
delete_pipeline(test_setup["pipeline_name"]),
delete_docker_image(test_setup["docker_image"]),
log_cleanup,
)
Expand Down
Loading