[BUG] Infinite loop and memory growth in dynamic task. #5164

Closed
dansola opened this issue Apr 2, 2024 · 2 comments

Labels
backlogged, bug, flytekit

Comments

@dansola
Contributor

dansola commented Apr 2, 2024

Describe the bug

Two unexpected behaviors occur when iterating over a task's output and calling sub-workflows inside a dynamic task.

  1. Iterating over task output in a dynamic task:
    Within a dynamic task, if we iterate over a list of FlyteFiles returned by another task, that upstream task (ff_list_task) never appears to execute, yet the dynamic task seemingly iterates over the FlyteFile list forever.
    For example:
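from typing import List

from flytekit import Resources, dynamic, task, workflow
from flytekit.types.file import FlyteFile

@task(requests=Resources(mem="5Gi"))
def ff_list_task() -> List[FlyteFile]:
    print('entering ff_list_task')
    return [FlyteFile(path="file_1"), FlyteFile(path="file_2")]

@dynamic(requests=Resources(mem="5Gi"))
def dynamic_task() -> List[FlyteFile]:
    batched_input_files = ff_list_task()
    # batched_input_files is a promise here, not a materialized list
    for file in batched_input_files:
        print('entering for loop')
    return batched_input_files

@workflow
def wf():
    dynamic_task()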

@workflow
def wf():
    dynamic_task()

Looking at the print statements when we run this example, we see that ff_list_task never gets executed, yet the for loop is repeatedly executed until we manually terminate the workflow.

  2. Calling a sub-workflow in a dynamic task:
    If we extend the example above to call a sub-workflow while iterating over the FlyteFile list, we see runaway memory growth.
    For example:
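from typing import List

from flytekit import Resources, dynamic, task, workflow
from flytekit.types.file import FlyteFile

@task(requests=Resources(mem="5Gi"))
def ff_list_task() -> List[FlyteFile]:
    return [FlyteFile(path="file_1"), FlyteFile(path="file_2")]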

@workflow
def sub_wf(input_file: FlyteFile) -> FlyteFile:
    return input_file

@dynamic(requests=Resources(mem="5Gi"))
def dynamic_task() -> List[FlyteFile]:
    batched_input_files = ff_list_task()
    result_files = []
    for file in batched_input_files:
        batch_result_file = sub_wf(input_file=file)
        result_files.append(batch_result_file)
    return result_files

@workflow
def wf():
    files = dynamic_task()

Even though sub_wf shouldn't allocate much memory and should only be called twice, simply adding the sub_wf call causes memory usage to increase until an OOM error occurs. In example #1, memory usage stayed flat.
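The difference between the two examples can be sketched in plain Python, assuming the loop over the promise never terminates on its own. FakePromise and retained_after are hypothetical names, not flytekit APIs, and islice is used only to cap the loop so the sketch finishes. The sketch models just the result_files list; flytekit may also keep per-call state for each sub_wf invocation, which is not modeled here.

```python
from itertools import islice


class FakePromise:
    """Hypothetical stand-in for a promise of a list (not flytekit's Promise).

    Indexing always succeeds, because the real length of the list is not
    known until the upstream task has run.
    """

    def __init__(self, name: str) -> None:
        self.name = name

    def __getitem__(self, index: int) -> "FakePromise":
        # Every index returns a fresh placeholder; IndexError is never raised.
        return FakePromise(f"{self.name}[{index}]")


def retained_after(iterations: int, accumulate: bool) -> int:
    """Run a capped version of the issue's loop and count the objects kept alive."""
    batched_input_files = FakePromise("ff_list_task")
    result_files = []
    # islice caps the loop; the bare for loop would never stop by itself.
    for file in islice(batched_input_files, iterations):
        if accumulate:
            # Like example #2: each iteration appends, so the list keeps growing.
            result_files.append(file)
        # Otherwise like example #1: nothing is kept between iterations.
    return len(result_files)


print(retained_after(1_000, accumulate=False))  # 0
print(retained_after(1_000, accumulate=True))  # 1000
```

Under this reading, example #1 discards each placeholder and stays flat, while example #2 retains one object per iteration, so an unbounded loop implies unbounded memory.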

Expected behavior

In example #1, we would expect ff_list_task to execute before the for loop. Alternatively, we would expect for file in batched_input_files to raise an error, since it iterates over a promise. And if the for loop does iterate, we would expect only two iterations (the list contains only two files) rather than an infinite number.

In example #2, we would expect all the same behavior as outlined above, but we would also expect memory to not increase until OOM.

Additional context to reproduce

These examples were run on Union Cloud, where resource usage was monitored. Flytekit version 1.11.0 was used.

Screenshots

Usage Screenshots: [image not reproduced]

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
dansola added the bug and untriaged labels Apr 2, 2024

welcome bot commented Apr 2, 2024

Thank you for opening your first issue here! 🛠

eapolinario added the backlogged label and removed the untriaged label Apr 4, 2024
wild-endeavor added the flytekit label Apr 4, 2024
wild-endeavor self-assigned this Apr 10, 2024
@wild-endeavor
Contributor

Unfortunately, the issue here is that the user code is incorrect, but flytekit fails to raise an error.
Specifically, since the work done in #3864, flytekit supports indexing into promises. This was initially done for dataclasses, but support was also added for indexing into lists and maps (out-of-range index errors would only show up at run time). Unfortunately, we didn't catch the case of a user writing for x in promise. Python calls the __iter__ function in this case, which then loops infinitely until the process runs out of memory. The fix is simply to disallow iterating over a promise.
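The mechanism and the proposed fix can be illustrated outside flytekit with a minimal sketch. IndexablePromise and GuardedPromise are hypothetical classes, not flytekit's actual Promise, and raising TypeError is an assumed choice of exception; the real fix may differ. The sketch relies on Python's fallback iteration protocol (calling __getitem__ with 0, 1, 2, ... until IndexError is raised). Whether a loop reaches an endlessly indexable object through that fallback or through an __iter__ method that keeps yielding, the effect is the same: the loop never ends.

```python
from itertools import islice


class IndexablePromise:
    """Hypothetical promise that supports [index] but has no __iter__."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __getitem__(self, index: int) -> "IndexablePromise":
        # Indexing always succeeds; an out-of-range index would only fail at run time.
        return IndexablePromise(f"{self.name}[{index}]")


class GuardedPromise(IndexablePromise):
    """Same indexing support, but iteration is explicitly disallowed."""

    def __iter__(self):
        raise TypeError(
            f"{self.name} is a promise and cannot be iterated; "
            "its length is unknown until the upstream task runs"
        )


# Without __iter__, Python falls back to __getitem__(0), __getitem__(1), ...
# and stops only on IndexError, which never comes. islice caps the demo at 3.
names = [p.name for p in islice(IndexablePromise("out"), 3)]
print(names)  # ['out[0]', 'out[1]', 'out[2]']

# Indexing still works on the guarded version.
print(GuardedPromise("out")[1].name)  # out[1]

# Iterating fails immediately instead of looping forever.
error_message = None
try:
    for _ in GuardedPromise("out"):
        pass
except TypeError as err:
    error_message = str(err)
print(error_message)
```

Defining __iter__ to raise keeps indexing intact while turning the silent infinite loop into an immediate, descriptive error.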
