## 1. 安装和加载使用Step Functions Data Science SDK必需的模块

In [None]:
# Install/upgrade the `stepfunctions` package using the same Python interpreter
# that runs this kernel (sys.executable), so the package lands in the kernel's
# own environment rather than in whichever `pip` happens to be on PATH.
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

## 2. 在console创建一个Role附加到Step Functions

In [20]:
import stepfunctions
import logging

from stepfunctions import steps
from stepfunctions.steps import *
from stepfunctions.workflow import Workflow

# Enable the SDK's stream logger at INFO level so its status messages show up
# in the cell outputs.
stepfunctions.set_stream_logger(level=logging.INFO)

# ARN of the IAM role passed to the workflow as its execution role.
# NOTE(review): the account id is hard-coded; every user must substitute their own ARN.
workflow_execution_role = "arn:aws-cn:iam::685095924131:role/StepFunctionsWorkflowExecutionRole"  # find the StepFunctionsWorkflowExecutionRole ARN in IAM and paste it here

### 2.1 指定Execute Crawler的Lambda名字

In [21]:
# Lambda state named "Dataset Crawler"; per the section heading this is the
# function that executes the crawler.
crawler_step = steps.LambdaStep(
    state_id="Dataset Crawler",
    parameters={
        # Replace with the name of your own Lambda function.
        "FunctionName": "clawler",
        # Payload passed to the Lambda function.
        "Payload": {"input": "'StatusCode': 200"},
    },
)

### 2.2 指定Check Crawler Status的Lambda名字

In [22]:
# Lambda state named "Crawler status"; per the section heading this is the
# function that checks the crawler's status.
crawler_status = steps.LambdaStep(
    state_id="Crawler status",
    parameters={
        # Replace with the name of your own Lambda function.
        "FunctionName": "crawler_status",
        # Replace the "input" value with the name of your own crawler.
        "Payload": {"input": "glue-demo-crawler-0222"},
    },
)

# Retry the status check when the Lambda task reports States.TaskFailed.
# Replace the interval and attempt count with values that suit your crawler.
# NOTE(review): if each retry wait is the previous one multiplied by
# backoff_rate, then 30 s * 4.0**9 makes the 10th wait roughly 91 days --
# confirm that this schedule is intended, or lower backoff_rate / max_attempts.
crawler_status.add_retry(
    Retry(
        error_equals=["States.TaskFailed"],
        interval_seconds=30,
        max_attempts=10,
        backoff_rate=4.0,
    )
)

# A caught States.TaskFailed error is routed to a Fail state named
# "LambdaTaskFailed".
crawler_status.add_catch(
    Catch(
        error_equals=["States.TaskFailed"],
        next_step=Fail("LambdaTaskFailed"),
    )
)

### 2.3 指定Execute Glue Job的名字

In [23]:
# Glue job state named "Extract, Transform, Load".
etl_step = steps.GlueStartJobRunStep(
    state_id="Extract, Transform, Load",
    # Replace JobName with the name of your own Glue job.
    parameters={"JobName": "glue-demo-job-0223"},
)

In [24]:
from stepfunctions.steps.fields import Field

In [25]:
# Chain the three states in order: run the crawler, check its status, then
# start the Glue job.
workflow_definition = steps.Chain([crawler_step, crawler_status, etl_step])

## 3. 运行上述workflow，指定workflow name

In [140]:
# Combine the chained definition and the execution role into a named workflow.
workflow = Workflow(
    name="My-ETL-workflow01",  # workflow name; change it to suit your project
    definition=workflow_definition,
    role=workflow_execution_role,
)

In [141]:
# Render the workflow's graph in the notebook for a visual check before creating it.
workflow.render_graph()

In [142]:
# Create the workflow on AWS Step Functions; in the recorded run the cell's
# result is the state machine ARN.
workflow.create()

[INFO] Workflow created successfully on AWS Step Functions.


'arn:aws-cn:states:cn-north-1:685095924131:stateMachine:My-ETL-workflow01'

In [143]:
# Start an execution of the workflow and keep the returned object so its
# progress can be rendered in a later cell.
execution = workflow.execute()

[INFO] Workflow execution started successfully on AWS Step Functions.


In [None]:
# Render the progress of the execution started above; re-run this cell to refresh the view.
execution.render_progress()

In [147]:
# List this workflow's executions as an HTML table; the recorded output shows
# the columns Name, Status, Started and End Time.
workflow.list_executions(html=True)

Name,Status,Started,End Time
f9dffe5e-b6f7-4758-a9d4-cf814396a0ba,SUCCEEDED,"Jun 04, 2021 03:54:17.208 PM","Jun 04, 2021 03:57:51.225 PM"
