Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytekit] Support attribute access on promises #1825

Merged
merged 22 commits into from
Nov 1, 2023

Conversation

ByronHsu
Copy link
Collaborator

@ByronHsu ByronHsu commented Sep 11, 2023

See flyteorg/flyteidl#439

Tracking Issue

flyteorg/flyte#3864

Testing

In the following workflow:

  • basic_workflow contains trivial examples to access output attributes
  • failed_workflow contains examples that causes exception (e.g. out of range)
  • advanced_workflow contains examples with more complex attribute access
from dataclasses import dataclass
from typing import Dict, List, NamedTuple

from dataclasses_json import dataclass_json

from flytekit import WorkflowFailurePolicy, task, workflow


@dataclass_json
@dataclass
class foo:
    a: str


@task
def t1() -> (List[str], Dict[str, str], foo):
    return ["a", "b"], {"a": "b"}, foo(a="b")


@task
def t2(a: str) -> str:
    print("a", a)
    return a


@task
def t3() -> (Dict[str, List[str]], List[Dict[str, str]], Dict[str, foo]):
    return {"a": ["b"]}, [{"a": "b"}], {"a": foo(a="b")}


@task
def t4(a: List[str]):
    print("a", a)


@task
def t5(a: Dict[str, str]):
    print("a", a)


@workflow
def basic_workflow():
    l, d, f = t1()
    t2(a=l[0])
    t2(a=d["a"])
    t2(a=f.a)


@workflow(
    # The workflow doesn't fail when one of the nodes fails but other nodes are still executable
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
)
def failed_workflow():
    # This workflow is supposed to fail due to exceptions
    l, d, f = t1()
    t2(a=l[100])
    t2(a=d["b"])
    t2(a=f.b)


@workflow
def advanced_workflow():
    dl, ld, dd = t3()
    t2(a=dl["a"][0])
    t2(a=ld[0]["a"])
    t2(a=dd["a"].a)

    t4(a=dl["a"])
    t5(a=ld[0])

Local execution

$ pyflyte run test_workflow.py basic_workflow   

a a
a b
a b

$ pyflyte run test_workflow.py failed_workflow

Failed with Exception Code: USER:PromiseAttributeResolveError
Underlying Exception: Failed to resolve attribute path [100] in promise Resolved(o0=<FlyteLiteral collection { literals { scalar { primitive { string_value: "a" } } } literals { scalar { primitive { string_value: "b" } } } }>), index 100 out of range 2
Encountered error while executing workflow 'test_workflow.failed_workflow':
  Error encountered while executing 'failed_workflow':
  Failed to resolve attribute path [100] in promise Resolved(o0=<FlyteLiteral collection { literals { scalar { primitive { string_value: "a" } } } literals { scalar { primitive { string_value: "b" } } } }>), index 100 out of range 2

$ pyflyte run test_workflow.py advanced_workflow
a b
a b
a b
a ['b']
a {'a': 'b'}

Remote Execution

basic

failed

advanced

@ByronHsu ByronHsu changed the title Byhsu/promise attr path [flytekit] Support attribute access on promises Sep 11, 2023
@codecov
Copy link

codecov bot commented Sep 11, 2023

Codecov Report

Attention: 66 lines in your changes are missing coverage. Please review.

Comparison is base (4d5e1b8) 54.71% compared to head (b884d32) 54.62%.
Report is 5 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1825      +/-   ##
==========================================
- Coverage   54.71%   54.62%   -0.10%     
==========================================
  Files         305      306       +1     
  Lines       22732    22890     +158     
  Branches     3453     3478      +25     
==========================================
+ Hits        12438    12503      +65     
- Misses      10122    10215      +93     
  Partials      172      172              
Files Coverage Δ
flytekit/exceptions/user.py 80.85% <100.00%> (+0.85%) ⬆️
flytekit/models/types.py 75.22% <50.00%> (-0.82%) ⬇️
flytekit/core/type_engine.py 27.21% <0.00%> (-0.19%) ⬇️
flytekit/core/promise.py 27.02% <21.53%> (-0.92%) ⬇️

... and 7 files with indirect coverage changes

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

flytekit/models/types.py Outdated Show resolved Hide resolved
flytekit/core/promise.py Outdated Show resolved Hide resolved
Copy link
Collaborator

@eapolinario eapolinario left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good. Will take another pass tomorrow morning.

Thanks for adding those great unit tests!

flytekit/core/promise.py Outdated Show resolved Hide resolved
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
Signed-off-by: byhsu <byhsu@linkedin.com>
@wild-endeavor wild-endeavor merged commit 2497af8 into flyteorg:master Nov 1, 2023
69 of 71 checks passed
ringohoffman pushed a commit to ringohoffman/flytekit that referenced this pull request Nov 24, 2023
Support attribute access on promises

In the following workflow:

- `basic_workflow` contains trivial examples to access output attributes
- `failed_workflow` contains examples that causes exception (e.g. out of range)
- `advanced_workflow` contains examples with more complex attribute access

```python
from dataclasses import dataclass
from typing import Dict, List, NamedTuple

from dataclasses_json import dataclass_json

from flytekit import WorkflowFailurePolicy, task, workflow


@dataclass_json
@DataClass
class foo:
    a: str


@task
def t1() -> (List[str], Dict[str, str], foo):
    return ["a", "b"], {"a": "b"}, foo(a="b")


@task
def t2(a: str) -> str:
    print("a", a)
    return a


@task
def t3() -> (Dict[str, List[str]], List[Dict[str, str]], Dict[str, foo]):
    return {"a": ["b"]}, [{"a": "b"}], {"a": foo(a="b")}


@task
def t4(a: List[str]):
    print("a", a)


@task
def t5(a: Dict[str, str]):
    print("a", a)


@workflow
def basic_workflow():
    l, d, f = t1()
    t2(a=l[0])
    t2(a=d["a"])
    t2(a=f.a)


@workflow(
    # The workflow doesn't fail when one of the nodes fails but other nodes are still executable
    failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
)
def failed_workflow():
    # This workflow is supposed to fail due to exceptions
    l, d, f = t1()
    t2(a=l[100])
    t2(a=d["b"])
    t2(a=f.b)


@workflow
def advanced_workflow():
    dl, ld, dd = t3()
    t2(a=dl["a"][0])
    t2(a=ld[0]["a"])
    t2(a=dd["a"].a)

    t4(a=dl["a"])
    t5(a=ld[0])
```

Signed-off-by: byhsu <byhsu@linkedin.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants