# Magic Task

## What is Magic Task

The nature of the Amazon State Machine Definition is just a JSON DSL (Domain specific language). It uses special syntax like ``InputPath``, ``Parameters``, ``ResultSelector``, ``ResultPath``, ``OutputPath`` and ``ChoiceRule`` to provide basic capability to allow you to manipulate input / output data, make conditional choice. The research on user community shows that "Input/Output data handling" and "Conditional Choice" are difficult to learn and also not flexible to use.

**Magic Task** is a feature in ``aws-stepfunction`` library that allows developer to implement "Input/Output data handling" and "Conditional Choice" in pure python function, and automatically creates the backend lambda function and hook up your tasks. With **Magic Task**, you no longer need to write ``JSON notation`` and ``ChoiceRule`` at all, instead, you just write your python code.

**Example**

This is an example from [official doc](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-inputpath-params.html):

State Input:

```python
{
    "comment": "Example for InputPath.",
    "dataset1": {
        "val1": 1,
        "val2": 2,
        "val3": 3
    },
    "dataset2": {
        "val1": "a",
        "val2": "b",
        "val3": "c"
    }
}
```

``InputPath``:

```python
{
    "InputPath": "$.dataset2"
}
```

With the previous InputPath, the following is the JSON that is passed as the input.

```python
{
    "val1": "a",
    "val2": "b",
    "val3": "c"
}
```

**With Magic Task**

You just need to write:

```python
def lambda_handler(event, context):
    return event["dataset2"]
```

Of course, you can add data schema definition to improve readability:

```python
import dataclasses


@dataclasses.dataclass
class InputData:
    comment: str
    dataset1: dict
    dataset2: dict


@dataclasses.dataclass
class OutputData:
    val1: str
    val2: str
    val3: str


def lambda_handler(event, context):
    input_data = InputData(**event)
    output_data = OutputData(**input_data.dataset2)
    return dataclasses.asdict(output_data)
```


Reference:

- [Input Output Filtering](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-input-output-filtering.html)
- [Choice](https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-choice-state.html)

## Define Workflow, State and StateMachine

First, let's import required libraries

In [1]:
import os
import json

from pathlib_mate import Path
import aws_stepfunction as sfn
from aws_stepfunction.magic import LambdaTask

from boto_session_manager import BotoSesManager
from rich import print as rprint

dir_here = Path(os.getcwd()).absolute()

bsm = BotoSesManager(
    profile_name="aws_data_lab_sanhe_us_east_1",
    region_name="us-east-1",
)

## Define Magic Tasks

In [2]:
workflow = sfn.Workflow()

task1_get_order_detail = LambdaTask(
    id="Task1-Get-Order-Detail",
    lbd_func_name="aws_stepfunction_magic_task_demo-task1_get_order_detail",
    lbd_package="s1_get_order_detail.py",
    lbd_handler="s1_get_order_detail.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task2a_1_extract_items = LambdaTask(
    id="Task2a-1-Extract-Items",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2a_1_extract_items",
    lbd_package="s2a_1_extract_items.py",
    lbd_handler="s2a_1_extract_items.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task2a_2_get_item_cost = LambdaTask(
    id="Task2a-2-Get-Item-Cost",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2a_2_get_item_cost",
    lbd_package="s2a_2_get_item_cost.py",
    lbd_handler="s2a_2_get_item_cost.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task2b_1_extract_ship_address = LambdaTask(
    id="Task2b-1-Extract-Ship-Address",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2b_1_extract_ship_address",
    lbd_package="s2b_1_extract_ship_address.py",
    lbd_handler="s2b_1_extract_ship_address.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task2b_2_get_ship_cost = LambdaTask(
    id="Task2b-2-Get-Ship-Cost",
    lbd_func_name="aws_stepfunction_magic_task_demo-task2b_2_get_ship_cost",
    lbd_package="s2b_2_get_ship_cost.py",
    lbd_handler="s2b_2_get_ship_cost.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task3_find_balance = LambdaTask(
    id="Task3-Find-Balance",
    lbd_func_name="aws_stepfunction_magic_task_demo-task3_find_balance",
    lbd_package="s3_find_balance.py",
    lbd_handler="s3_find_balance.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

task4_process_payment = LambdaTask(
    id="Task4-Process-Payment",
    lbd_func_name="aws_stepfunction_magic_task_demo-task4_process_payment",
    lbd_package="s4_process_payment.py",
    lbd_handler="s4_process_payment.lambda_handler",
    lbd_aws_account_id=bsm.aws_account_id,
    lbd_aws_region=bsm.aws_region,
)

(
    workflow.start_from(task1_get_order_detail)
    .parallel([
        (
            workflow.subflow_from(task2a_1_extract_items)
            .next_then(task2a_2_get_item_cost)
            .end()
        ),
        (
            workflow.subflow_from(task2b_1_extract_ship_address)
            .next_then(task2b_2_get_ship_cost)
            .end()
        ),
    ])
    .next_then(task3_find_balance)
    .next_then(task4_process_payment)
    .end()
)

print("preview workflow definition")
rprint(workflow.serialize())

sfn_name = "aws_stepfunction_magic_task_demo"

state_machine = sfn.StateMachine(
    name=sfn_name,
    workflow=workflow,
    role_arn="arn:aws:iam::669508176277:role/sanhe-for-everything-admin",
)
state_machine.set_type_as_express()

deploy_result = state_machine.deploy(bsm, verbose=True)

preview workflow definition


detect whether the magic task is used ...
    yes
identify necessary S3 bucket and IAM role ...
    we need a default IAM role for lambda function
    done
deploy Lambda Functions ...
    upload lambda deployment artifacts ...
        upload from /Users/sanhehu/tmp/15598fc58f34fc1f96961b2ac2bfa8d4.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/fb09bdfd614b3527c7b3f83590ccfbaa.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task1_get_order_detail
        upload from /Users/sanhehu/tmp/e80c5b01037301fc9fc7505d74c5ff91.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/fbd03d389fb07478101080e0442a0269.zip
        declare Lambda Function aws_stepfunction_magic_task_demo-task2a_1_extract_items
        upload from /Users/sanhehu/tmp/fa760dbc90fabef2e87d6513173ce8d9.zip to s3://669508176277-us-east-1-aws-stepfunction-python-sdk/aws-stepfunction-python-sdk/5cc18d736eec63de8df2191b29a25981.z

In [7]:
execute_result = state_machine.execute(
    bsm,
    payload={"order_id": "order-1"},
    sync=True,
)

input = json.loads(execute_result["input"])
output = json.loads(execute_result["output"])
print(f"\ninput:\n")
print(json.dumps(input))
print(f"\noutput:\n")
print(json.dumps(output))

execute state machine 'arn:aws:states:us-east-1:669508176277:stateMachine:aws_stepfunction_magic_task_demo'
  preview at: https://us-east-1.console.aws.amazon.com/states/home?region=us-east-1#/express-executions/details/arn:aws:states:us-east-1:669508176277:express:aws_stepfunction_magic_task_demo:2d727198-828f-4b0b-a147-d99bc1ab231d:94f8d2e5-f51f-498d-99f0-2b2090384894?startDate=1668013293470



input:

{"order_id": "order-1"}

output:

{"status": "success"}


In [31]:
rprint(execute_result)

In [12]:
rprint(result)