Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/stepfunctions/steps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@
from stepfunctions.steps.states import Graph, FrozenGraph
from stepfunctions.steps.sagemaker import TrainingStep, TransformStep, ModelStep, EndpointConfigStep, EndpointStep
from stepfunctions.steps.compute import LambdaStep, BatchSubmitJobStep, GlueStartJobRunStep, EcsRunTaskStep
from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep
149 changes: 149 additions & 0 deletions src/stepfunctions/steps/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import

from stepfunctions.steps.states import Task
from stepfunctions.steps.fields import Field


class DynamoDBGetItemStep(Task):
"""
Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
"""

def __init__(self, state_id, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:getItem'
super(DynamoDBGetItemStep, self).__init__(state_id, **kwargs)


class DynamoDBPutItemStep(Task):

"""
Creates a Task state to put an item to DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
"""

def __init__(self, state_id, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:putItem'
super(DynamoDBPutItemStep, self).__init__(state_id, **kwargs)


class DynamoDBDeleteItemStep(Task):

"""
Creates a Task state to delete an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
"""

def __init__(self, state_id, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:deleteItem'
super(DynamoDBDeleteItemStep, self).__init__(state_id, **kwargs)


class DynamoDBUpdateItemStep(Task):

"""
Creates a Task state to update an item from DynamoDB. See `Call DynamoDB APIs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ddb.html>`_ for more details.
"""

def __init__(self, state_id, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
kwargs[Field.Resource.value] = 'arn:aws:states:::dynamodb:updateItem'
super(DynamoDBUpdateItemStep, self).__init__(state_id, **kwargs)


class SnsPublishStep(Task):

"""
Creates a Task state to publish a message to SNS topic. See `Call Amazon SNS with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-sns.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_callback=False, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_callback:
kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish.waitForTaskToken'
else:
kwargs[Field.Resource.value] = 'arn:aws:states:::sns:publish'

super(SnsPublishStep, self).__init__(state_id, **kwargs)


class SqsSendMessageStep(Task):

"""
Creates a Task state to send a message to SQS queue. See `Call Amazon SQS with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-sqs.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_callback=False, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_callback:
kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage.waitForTaskToken'
else:
kwargs[Field.Resource.value] = 'arn:aws:states:::sqs:sendMessage'

super(SqsSendMessageStep, self).__init__(state_id, **kwargs)
202 changes: 202 additions & 0 deletions tests/unit/test_service_steps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import

import pytest

from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep
from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep


def test_sns_publish_step_creation():
step = SnsPublishStep('Publish to SNS', parameters={
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
'Message': 'message',
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::sns:publish',
'Parameters': {
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
'Message': 'message',
},
'End': True
}

step = SnsPublishStep('Publish to SNS', wait_for_callback=True, parameters={
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
'Message': {
'Input.$': '$',
'TaskToken.$': '$$.Task.Token'
}
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::sns:publish.waitForTaskToken',
'Parameters': {
'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic',
'Message': {
'Input.$': '$',
'TaskToken.$': '$$.Task.Token'
}
},
'End': True
}


def test_sqs_send_message_step_creation():
step = SqsSendMessageStep('Send to SQS', parameters={
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
'MessageBody': 'Hello'
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::sqs:sendMessage',
'Parameters': {
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
'MessageBody': 'Hello'
},
'End': True
}

step = SqsSendMessageStep('Send to SQS', wait_for_callback=True, parameters={
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
'MessageBody': {
'Input.$': '$',
'TaskToken.$': '$$.Task.Token'
}
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::sqs:sendMessage.waitForTaskToken',
'Parameters': {
'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue',
'MessageBody': {
'Input.$': '$',
'TaskToken.$': '$$.Task.Token'
}
},
'End': True
}


def test_dynamodb_get_item_step_creation():
step = DynamoDBGetItemStep('Read Message From DynamoDB', parameters={
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Key': {
'MessageId': {
'S.$': '$.List[0]'
}
}
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::dynamodb:getItem',
'Parameters': {
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Key': {
'MessageId': {
'S.$': '$.List[0]'
}
}
},
'End': True
}


def test_dynamodb_put_item_step_creation():
step = DynamoDBPutItemStep('Add Message From DynamoDB', parameters={
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Item': {
'MessageId': {
'S': '123456789'
}
}
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::dynamodb:putItem',
'Parameters': {
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Item': {
'MessageId': {
'S': '123456789'
}
}
},
'End': True
}


def test_dynamodb_delete_item_step_creation():
step = DynamoDBDeleteItemStep('Delete Message From DynamoDB', parameters={
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Key': {
'MessageId': {
'S': 'MyMessage'
}
}
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::dynamodb:deleteItem',
'Parameters': {
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Key': {
'MessageId': {
'S': 'MyMessage'
}
}
},
'End': True
}


def test_dynamodb_update_item_step_creation():
step = DynamoDBUpdateItemStep('Update Message From DynamoDB', parameters={
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Key': {
'RecordId': {
'S': 'RecordId'
}
},
'UpdateExpression': 'set Revision = :val1',
'ExpressionAttributeValues': {
':val1': { 'S': '2' }
}
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::dynamodb:updateItem',
'Parameters': {
'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT',
'Key': {
'RecordId': {
'S': 'RecordId'
}
},
'UpdateExpression': 'set Revision = :val1',
'ExpressionAttributeValues': {
':val1': { 'S': '2' }
}
},
'End': True
}