# Workflow Management Service

### Set up a customer in the TPS Interface before using this notebook

#### -----------------------

## Global Configs

In [14]:
import json
import re
import boto3
import os
from dotenv import load_dotenv
load_dotenv()

# Names of the deployed Lambda functions, taken from environment variables
# (load_dotenv() above merges a local .env file into the environment).
# Indexing os.environ directly means a missing variable raises KeyError here
# instead of failing later inside a request cell.
invoke_workflow_lambda_name = os.environ["INVOKE_WORKFLOW_SM_NAME"]
invoke_workflow_executions_lambda_name = os.environ["INVOKE_WORKFLOW_EXECUTION_SM_NAME"]
create_workflow_schedule_lambda_name = os.environ["CREATE_WORKFLOW_SCHEDULE_NAME"]
delete_workflow_schedule_lambda_name = os.environ["DELETE_WORKFLOW_SCHEDULE_NAME"]

# One boto3 session and one Lambda client, shared by every request cell below.
boto3_session = boto3.Session()
lambda_client = boto3_session.client(service_name="lambda")

In [None]:
# Customer ID you want to use for WFM.
# It is sent as "customerId" in every request built in the cells below.
CUSTOMER_ID = "democustomer"

#### -----------------------

# Workflow Requests

### Create Workflow

In [None]:
# Create Workflow: builds a "createWorkflow" request for WORKFLOW_ID and sends
# it to the workflow Lambda, then prints a console link to the resulting
# Step Functions execution.
WORKFLOW_ID = "test-wfm-1"
#############################################
create_workflow_request = {
    "customerId": CUSTOMER_ID,
    "workflowRequest": {
        "customerId": CUSTOMER_ID,
        "requestType": "createWorkflow",
        "workflowDefinition": {
            "workflowId": WORKFLOW_ID,
            "filteredMetricsDiscriminatorColumn": "filtered",
            # Declares the custom parameter that the query below reads through
            # CUSTOM_PARAMETER('report_date').
            "inputParameters": [
                {
                    "columnType": "DIMENSION",
                    "dataType": "STRING",
                    "defaultValue": "None",
                    "description": "The date and time that the report was run",
                    "name": "report_date"
                }
            ],
            "sqlQuery": """
                SELECT
                    CUSTOM_PARAMETER('report_date'),
                    advertiser,
                    campaign,
                    (CASE 
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 60 THEN '1 | < 1 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt,conversion_event_dt) <= 600 THEN '2 | 1 - 10 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 1800 THEN '3 | 10 - 30 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 3600 THEN '4 | 30 - 60 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 7200 THEN '5 | 1 - 2 HRS'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 43200 THEN '6 | 2 - 12 HRS'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 86400 THEN '7 | 12 - 24 HRS'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 604800 THEN '8 | 1 - 7 DAYS'
                        ELSE '9 | 7+ DAYS'
                    END) AS time_to_conversion,
                    SUM(purchases) AS purchases,
                    SUM(total_purchases) AS total_brand_purchases
            FROM
                amazon_attributed_events_by_conversion_time
            GROUP BY
                1,
                2,
                3,
                4
            """
        }
    }
}

#############################################
# Execute request (synchronous invoke, so the payload is available right away)
response = lambda_client.invoke(
    FunctionName=invoke_workflow_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(create_workflow_request).encode('UTF-8')
)

# Print output
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so both are
# checked before the payload is trusted.
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
payload_stream = response.get('Payload')
payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
if response_code not in range(200, 204):
    print(f"Lambda invocation failed with HTTP status {response_code}: {payload_text}")
elif response.get('FunctionError'):
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")
else:
    payload_json = json.loads(payload_text)
    # The success payload is handled as a JSON string that itself holds JSON;
    # decode the second layer only when the first decode produced a string.
    if isinstance(payload_json, str):
        payload_json = json.loads(payload_json)
    executionArn = (payload_json.get('executionArn') or '') if isinstance(payload_json, dict) else ''
    arn_match = re.match(
        "arn:aws:states:([^:]*-[^:]*-[0-9]+):([0-9]{12}):execution:([^:]*):([^:]*)", executionArn)
    if arn_match is None:
        # Without a well-formed execution ARN there is no console link to build.
        print(f"No Step Functions execution ARN found in response payload: {payload_json}")
    else:
        sm_region, sm_acct_number, sm_name, sm_execution_id = arn_match.groups()
        execution_URL = f"https://{sm_region}.console.aws.amazon.com/states/home?region={sm_region}#/executions/details/{executionArn}"
        print(f"Execution URL {execution_URL}")

### Update Workflow

In [None]:
# Update Workflow: builds an "updateWorkflow" request that replaces the
# definition of WORKFLOW_ID and sends it to the workflow Lambda, then prints a
# console link to the resulting Step Functions execution.
WORKFLOW_ID = "test-wfm-1"
#############################################
update_workflow_request = {
    "customerId": CUSTOMER_ID,
    "workflowRequest": {
        "customerId": CUSTOMER_ID,
        "requestType": "updateWorkflow",
        "workflowDefinition": {
            "workflowId": WORKFLOW_ID,
            "filteredMetricsDiscriminatorColumn": "filtered",
            "sqlQuery": """
                SELECT
                    advertiser,
                    campaign,
                    (CASE 
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 60 THEN '1 | < 1 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt,conversion_event_dt) <= 600 THEN '2 | 1 - 10 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 1800 THEN '3 | 10 - 30 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 3600 THEN '4 | 30 - 60 MIN'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 7200 THEN '5 | 1 - 2 HRS'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 43200 THEN '6 | 2 - 12 HRS'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 86400 THEN '7 | 12 - 24 HRS'
                        WHEN SECONDS_BETWEEN (impression_dt, conversion_event_dt) <= 604800 THEN '8 | 1 - 7 DAYS'
                        ELSE '9 | 7+ DAYS'
                    END) AS time_to_conversion,
                    SUM(purchases) AS purchases,
                    SUM(total_purchases) AS total_brand_purchases
            FROM
                amazon_attributed_events_by_conversion_time
            GROUP BY
                1,
                2,
                3
            """
        }
    }
}

#############################################
# Execute request (synchronous invoke, so the payload is available right away)
response = lambda_client.invoke(
    FunctionName=invoke_workflow_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(update_workflow_request).encode('UTF-8'),
)

# Print output
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so both are
# checked before the payload is trusted.
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
payload_stream = response.get('Payload')
payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
if response_code not in range(200, 204):
    print(f"Lambda invocation failed with HTTP status {response_code}: {payload_text}")
elif response.get('FunctionError'):
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")
else:
    payload_json = json.loads(payload_text)
    # The success payload is handled as a JSON string that itself holds JSON;
    # decode the second layer only when the first decode produced a string.
    if isinstance(payload_json, str):
        payload_json = json.loads(payload_json)
    executionArn = (payload_json.get('executionArn') or '') if isinstance(payload_json, dict) else ''
    arn_match = re.match(
        "arn:aws:states:([^:]*-[^:]*-[0-9]+):([0-9]{12}):execution:([^:]*):([^:]*)", executionArn)
    if arn_match is None:
        # Without a well-formed execution ARN there is no console link to build.
        print(f"No Step Functions execution ARN found in response payload: {payload_json}")
    else:
        sm_region, sm_acct_number, sm_name, sm_execution_id = arn_match.groups()
        execution_URL = f"https://{sm_region}.console.aws.amazon.com/states/home?region={sm_region}#/executions/details/{executionArn}"
        print(f"Execution URL {execution_URL}")

### Get Workflow

In [None]:
# Get Workflow: sends a "getWorkflow" request for WORKFLOW_ID to the workflow
# Lambda and prints a console link to the resulting Step Functions execution.
WORKFLOW_ID = "test-wfm-1"
#############################################
get_workflow_request = {
    "customerId": CUSTOMER_ID,
    "workflowRequest": {
        "customerId": CUSTOMER_ID,
        "requestType": "getWorkflow",
        "workflowId": WORKFLOW_ID
    }
}

#############################################
# Execute request (synchronous invoke, so the payload is available right away)
response = lambda_client.invoke(
    FunctionName=invoke_workflow_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(get_workflow_request).encode('UTF-8')
)

# Print output
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so both are
# checked before the payload is trusted.
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
payload_stream = response.get('Payload')
payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
if response_code not in range(200, 204):
    print(f"Lambda invocation failed with HTTP status {response_code}: {payload_text}")
elif response.get('FunctionError'):
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")
else:
    payload_json = json.loads(payload_text)
    # The success payload is handled as a JSON string that itself holds JSON;
    # decode the second layer only when the first decode produced a string.
    if isinstance(payload_json, str):
        payload_json = json.loads(payload_json)
    executionArn = (payload_json.get('executionArn') or '') if isinstance(payload_json, dict) else ''
    arn_match = re.match(
        "arn:aws:states:([^:]*-[^:]*-[0-9]+):([0-9]{12}):execution:([^:]*):([^:]*)", executionArn)
    if arn_match is None:
        # Without a well-formed execution ARN there is no console link to build.
        print(f"No Step Functions execution ARN found in response payload: {payload_json}")
    else:
        sm_region, sm_acct_number, sm_name, sm_execution_id = arn_match.groups()
        execution_URL = f"https://{sm_region}.console.aws.amazon.com/states/home?region={sm_region}#/executions/details/{executionArn}"
        print(f"Execution URL {execution_URL}")


### Delete Workflow

In [None]:
# Delete Workflow: sends a "deleteWorkflow" request for WORKFLOW_ID to the
# workflow Lambda and prints a console link to the resulting Step Functions
# execution.
WORKFLOW_ID = "test-wfm-1"
#############################################
delete_workflow_request = {
    "customerId": CUSTOMER_ID,
    "workflowRequest": {
        "customerId": CUSTOMER_ID,
        "requestType": "deleteWorkflow",
        "workflowId": WORKFLOW_ID
    }
}

#############################################
# Execute request (synchronous invoke, so the payload is available right away)
response = lambda_client.invoke(
    FunctionName=invoke_workflow_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(delete_workflow_request).encode('UTF-8')
)

# Print output
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so both are
# checked before the payload is trusted.
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
payload_stream = response.get('Payload')
payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
if response_code not in range(200, 204):
    print(f"Lambda invocation failed with HTTP status {response_code}: {payload_text}")
elif response.get('FunctionError'):
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")
else:
    payload_json = json.loads(payload_text)
    # The success payload is handled as a JSON string that itself holds JSON;
    # decode the second layer only when the first decode produced a string.
    if isinstance(payload_json, str):
        payload_json = json.loads(payload_json)
    executionArn = (payload_json.get('executionArn') or '') if isinstance(payload_json, dict) else ''
    arn_match = re.match(
        "arn:aws:states:([^:]*-[^:]*-[0-9]+):([0-9]{12}):execution:([^:]*):([^:]*)", executionArn)
    if arn_match is None:
        # Without a well-formed execution ARN there is no console link to build.
        print(f"No Step Functions execution ARN found in response payload: {payload_json}")
    else:
        sm_region, sm_acct_number, sm_name, sm_execution_id = arn_match.groups()
        execution_URL = f"https://{sm_region}.console.aws.amazon.com/states/home?region={sm_region}#/executions/details/{executionArn}"
        print(f"Execution URL {execution_URL}")

#### -----------------------

# Workflow Execution Requests

### Invoke Workflow Execution

In [None]:
# Invoke Workflow Execution: sends a "createExecution" request for WORKFLOW_ID
# to the workflow-executions Lambda and prints a console link to the resulting
# Step Functions execution.
WORKFLOW_ID = "test-wfm-1"
#############################################
create_execution_request = {
    "customerId": CUSTOMER_ID,
    "requestType": "createExecution",
    "createExecutionRequest": {
        "timeWindowStart": "FIRSTDAYOFOFFSETMONTH(-2)",
        "timeWindowEnd": "FIRSTDAYOFOFFSETMONTH(-1)",
        "timeWindowType": "EXPLICIT",
        "workflow_executed_date": "now()",
        "timeWindowTimeZone": "America/New_York",
        "workflowId": WORKFLOW_ID,
        "ignoreDataGaps": "True",
        "workflowExecutionTimeoutSeconds": "86400",
        # Value for the workflow's "report_date" custom parameter.
        "parameterValues": {
            "report_date": "now()"
        }
    }
}

#############################################
# Execute request (synchronous invoke, so the payload is available right away)
response = lambda_client.invoke(
    FunctionName=invoke_workflow_executions_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(create_execution_request).encode('UTF-8')
)

# Print output
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so both are
# checked before the payload is trusted.
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
payload_stream = response.get('Payload')
payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
if response_code not in range(200, 204):
    print(f"Lambda invocation failed with HTTP status {response_code}: {payload_text}")
elif response.get('FunctionError'):
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")
else:
    payload_json = json.loads(payload_text)
    # The success payload is handled as a JSON string that itself holds JSON;
    # decode the second layer only when the first decode produced a string.
    if isinstance(payload_json, str):
        payload_json = json.loads(payload_json)
    executionArn = (payload_json.get('executionArn') or '') if isinstance(payload_json, dict) else ''
    arn_match = re.match(
        "arn:aws:states:([^:]*-[^:]*-[0-9]+):([0-9]{12}):execution:([^:]*):([^:]*)", executionArn)
    if arn_match is None:
        # Without a well-formed execution ARN there is no console link to build.
        print(f"No Step Functions execution ARN found in response payload: {payload_json}")
    else:
        sm_region, sm_acct_number, sm_name, sm_execution_id = arn_match.groups()
        execution_URL = f"https://{sm_region}.console.aws.amazon.com/states/home?region={sm_region}#/executions/details/{executionArn}"
        print(f"Execution URL {execution_URL}")

### Cancel Workflow Execution

In [None]:
# Cancel Workflow Execution: sends a "cancelExecution" request to the
# workflow-executions Lambda and prints a console link to the resulting
# Step Functions execution.
# workflowExecutionId can be found in the Workflow Executions table
cancel_execution_request = {
    "customerId": CUSTOMER_ID,
    "requestType": "cancelExecution",
    "workflowExecutionId": "7d29a430-6a8a-4dbc-9148-04bfcb58c99c"
}

#############################################
# Execute request (synchronous invoke, so the payload is available right away)
response = lambda_client.invoke(
    FunctionName=invoke_workflow_executions_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(cancel_execution_request).encode('UTF-8')
)

# Print output
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so both are
# checked before the payload is trusted.
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
payload_stream = response.get('Payload')
payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
if response_code not in range(200, 204):
    print(f"Lambda invocation failed with HTTP status {response_code}: {payload_text}")
elif response.get('FunctionError'):
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")
else:
    payload_json = json.loads(payload_text)
    # The success payload is handled as a JSON string that itself holds JSON;
    # decode the second layer only when the first decode produced a string.
    if isinstance(payload_json, str):
        payload_json = json.loads(payload_json)
    executionArn = (payload_json.get('executionArn') or '') if isinstance(payload_json, dict) else ''
    arn_match = re.match(
        "arn:aws:states:([^:]*-[^:]*-[0-9]+):([0-9]{12}):execution:([^:]*):([^:]*)", executionArn)
    if arn_match is None:
        # Without a well-formed execution ARN there is no console link to build.
        print(f"No Step Functions execution ARN found in response payload: {payload_json}")
    else:
        sm_region, sm_acct_number, sm_name, sm_execution_id = arn_match.groups()
        execution_URL = f"https://{sm_region}.console.aws.amazon.com/states/home?region={sm_region}#/executions/details/{executionArn}"
        print(f"Execution URL {execution_URL}")


#### -----------------------

# Workflow Execution Schedules

### Create Workflow Execution Schedule
Schedule expressions use the AWS cron format: https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html

In [None]:
# Create Workflow Execution Schedule: sends the schedule-creation Lambda a
# "createExecution" request for WORKFLOW_ID together with a cron expression and
# rule name, then prints a console link to the event rule.
WORKFLOW_ID = "test-wfm-1"
#############################################
schedule_expression = 'cron(0/15 * * * ? *)'
rule_name = 'testrule1'
rule_description = 'Testing the workflow schedule creation for WFM'

create_execution_schedule_request = {
    # Same shape as the request used to invoke a workflow execution directly.
    "execution_request": {
        "customerId": CUSTOMER_ID,
        "requestType": "createExecution",
        "createExecutionRequest": {
            "timeWindowStart": "FIRSTDAYOFOFFSETMONTH(-2)",
            "timeWindowEnd": "FIRSTDAYOFOFFSETMONTH(-1)",
            "timeWindowType": "EXPLICIT",
            "workflow_executed_date": "now()",
            "timeWindowTimeZone": "America/New_York",
            "workflowId": WORKFLOW_ID,
            "ignoreDataGaps": "True",
            "workflowExecutionTimeoutSeconds": "86400",
            "parameterValues": {
                "report_date": "now()"
            }
        },
    },
    "schedule_expression": schedule_expression,
    "rule_name": rule_name,
    "rule_description": rule_description
}

#############################################
# Execute request (synchronous invoke)
response = lambda_client.invoke(
    FunctionName=create_workflow_schedule_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(create_execution_schedule_request).encode('UTF-8')
)

# Print output
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so the rule
# link is printed only when neither signals a problem.
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
if response_code not in range(200, 204):
    print(f"Lambda invocation failed with HTTP status {response_code}")
elif response.get('FunctionError'):
    payload_stream = response.get('Payload')
    payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")
else:
    # The rule name in the console URL is "<RULE_PREFIX>-wfm-<rule_name>".
    region = os.environ['REGION']
    rule_URL = f"https://{region}.console.aws.amazon.com/events/home?region={region}#/eventbus/default/rules/{os.environ['RULE_PREFIX']}-wfm-{rule_name}"
    print(f"Event Rule URL {rule_URL}")

### Delete Workflow Execution Schedule

In [None]:
# Delete Workflow Execution Schedule: asks the schedule-deletion Lambda to
# remove the rule named in delete_request and prints the HTTP status.
# NOTE(review): presumably this is the same short rule name that was passed
# when the schedule was created - confirm against the Lambda.
delete_request = {
    "rule_name": 'testrule1'
}

#############################################
# Execute request (synchronous invoke)
# Send delete_request itself; the cell previously sent the schedule-creation
# request, which left delete_request unused and raised NameError unless the
# create cell had been run first.
response = lambda_client.invoke(
    FunctionName=delete_workflow_schedule_lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(delete_request).encode('UTF-8')
)

# Print output
response_code = response.get('ResponseMetadata', {}).get('HTTPStatusCode', 0)
print(response_code)
# A failure inside the Lambda function is reported through the 'FunctionError'
# key of the invoke response rather than through the HTTP status, so surface it.
if response.get('FunctionError'):
    payload_stream = response.get('Payload')
    payload_text = payload_stream.read().decode('UTF-8') if payload_stream is not None else ''
    print(f"Lambda function reported an error ({response['FunctionError']}): {payload_text}")

----
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0