-
Notifications
You must be signed in to change notification settings - Fork 115
/
Copy pathapp.py
48 lines (42 loc) · 1.78 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import time
import random
import boto3
import os
from botocore.config import Config
# Retry settings passed to the Step Functions client in lambda_handler:
# botocore "standard" retry mode with max_attempts set to 10.
config = Config(
    retries={
        'max_attempts': 10,
        'mode': 'standard',
    }
)

# Load the concurrency limit from SSM Parameter Store once, at import time,
# so lambda_handler can compare the running-execution count against it.
try:
    ssm = boto3.client('ssm')
    concurrency_limit_threshold = int(
        ssm.get_parameter(Name='concurrencyLimit', WithDecryption=True)['Parameter']['Value']
    )
except Exception as error:
    # Catch Exception rather than using a bare except so KeyboardInterrupt and
    # SystemExit still propagate. Log the root cause and re-raise: swallowing
    # the error would leave concurrency_limit_threshold undefined, and the
    # handler would then fail with an unrelated NameError on every invocation.
    print('Failed to get failure threshold from SSM: ' + repr(error))
    raise
def lambda_handler(event, context):
    """Start one Step Functions execution per record while under the limit.

    For each entry in ``event['Records']`` the handler sleeps a random
    0.1-1.0 seconds, counts the RUNNING executions of the
    ``CC-WorkStateMachine`` state machine in the invoking function's own
    region and account, and starts a new execution only when that count is
    below the module-level ``concurrency_limit_threshold``.

    Args:
        event: Invocation payload; must contain a ``'Records'`` list whose
            items each have a ``'body'`` key (presumably an SQS batch --
            confirm against the event source).
        context: Lambda context object; only ``invoked_function_arn`` is read.

    Raises:
        Exception: When the number of running executions has reached
            ``concurrency_limit_threshold``.
    """
    # invoked_function_arn looks like
    # arn:<partition>:lambda:<region>:<account-id>:function:<name>
    # Reuse its partition instead of hard-coding "aws" so the state machine
    # ARN is also valid outside the standard partition.
    arn_parts = context.invoked_function_arn.split(":")
    partition = arn_parts[1]
    region = arn_parts[3]
    account_id = arn_parts[4]
    arn = ('arn:' + str(partition) + ':states:' + str(region) + ':'
           + str(account_id) + ':stateMachine:CC-WorkStateMachine')
    print('stepfunction arn:' + str(arn))

    stepfunctions = boto3.client('stepfunctions', config=config)
    # list_executions returns results one page at a time; counting only the
    # first page would cap the observed count at the page size, so a larger
    # threshold could never be reached.
    paginator = stepfunctions.get_paginator('list_executions')

    for record in event['Records']:
        # Wait a random amount of time before invoking the step function.
        time.sleep(random.randint(1, 10) * 0.1)

        execution_count = 0
        for page in paginator.paginate(stateMachineArn=arn, statusFilter='RUNNING'):
            execution_count += len(page['executions'])
            if execution_count >= concurrency_limit_threshold:
                # Limit already reached; further pages cannot change the outcome.
                break

        print('current execution count:' + str(execution_count))
        print('concurrency limit threshold:' + str(concurrency_limit_threshold))

        # Raise if the number of running executions has reached the threshold.
        # NOTE(review): raising here fails the whole invocation, including
        # records of this batch that already started an execution -- confirm
        # the event source's retry behaviour makes that acceptable.
        if execution_count >= concurrency_limit_threshold:
            raise Exception('Concurrent workflow reaching limit!')

        print('Processing ' + str(record["body"]))
        # NOTE(review): the record body is logged but not passed as execution
        # input -- confirm the state machine is meant to start without input.
        stepfunctions.start_execution(stateMachineArn=arn)