[BUG] map_task downloads all pickled outputs from previous map_task #5180

Open
2 tasks done
tomtou-bspace opened this issue Apr 4, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@tomtou-bspace

Describe the bug

Subsequent map_tasks download all outputs of the entire previous map task, instead of only the output of the corresponding sub-task.
This causes the subsequent map task to run for an hour instead of 5 minutes.

Expected behavior

If map_task B's inputs are map_task A's outputs, then:
the i-th sub-task of map_task B should download only the output of the i-th sub-task of map_task A.
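
The expected one-to-one pairing can be modeled in plain Python. This is only a sketch: `map_stage`, `task_a`, and `task_b` are hypothetical stand-ins, not flytekit APIs. Sub-task i of B receives exactly element i of A's output list and nothing else.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def map_stage(fn: Callable[[T], U], inputs: List[T]) -> List[U]:
    """Hypothetical helper: run fn once per element; sub-task i sees only inputs[i]."""
    return [fn(x) for x in inputs]


def task_a(i: int) -> dict:
    # Stand-in for one sub-task of map_task A, producing one nested dict per index.
    return {"shard": {"values": [i, i * 10]}}


def task_b(result: dict) -> int:
    # Stand-in for one sub-task of map_task B: it should only ever receive ONE dict.
    assert isinstance(result, dict), "B's sub-task received more than one output"
    return sum(result["shard"]["values"])


a_outputs = map_stage(task_a, [0, 1, 2])
b_outputs = map_stage(task_b, a_outputs)
print(b_outputs)  # [0, 11, 22]
```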

Additional context to reproduce

When map_task A runs and each of its sub-tasks returns a value of type Dict[str, Dict[str, np.ndarray]], map_task B receives A's output, but each sub-task of B downloads the whole list of Dict[str, Dict[str, np.ndarray]] values instead of the single object it needs.
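
To make the cost concrete, here is a stdlib-only sketch comparing two storage layouts: all of A's outputs in one pickle versus one pickle per sub-task. The data is hypothetical, `bytes` objects stand in for np.ndarray, and local files stand in for blob storage. It assumes, without confirming flytekit internals, that the observed behaviour resembles the first layout, where the total bytes read by B grow roughly with n² instead of n.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical outputs of A: n sub-task results, each a nested dict of byte blobs
# (bytes stand in for np.ndarray so the sketch needs only the standard library).
n = 8
outputs = [{"part": {"arr": bytes(10_000)}} for _ in range(n)]

with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)

    # Layout 1 (observed pattern): the whole list is stored as a single pickle.
    batched = tmp_dir / "all_outputs.pkl"
    batched.write_bytes(pickle.dumps(outputs))

    # Layout 2 (expected pattern): one pickle per sub-task of A.
    per_task = []
    for i, out in enumerate(outputs):
        p = tmp_dir / f"output_{i}.pkl"
        p.write_bytes(pickle.dumps(out))
        per_task.append(p)

    observed_bytes = 0
    expected_bytes = 0
    for i in range(n):
        data = batched.read_bytes()       # sub-task i must read everything...
        observed_bytes += len(data)
        assert pickle.loads(data)[i] == outputs[i]  # ...to use one element

        data = per_task[i].read_bytes()   # sub-task i reads only its own element
        expected_bytes += len(data)
        assert pickle.loads(data) == outputs[i]

print(f"read ratio (batched / per-task): {observed_bytes / expected_bytes:.2f}")
```

With n = 8 the ratio comes out at roughly n, because every sub-task of B reads the whole batch to extract a single element.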

Screenshots

The map_task for the task "save_as_parquet_to_s3" takes more than an hour instead of 5 minutes.

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
@tomtou-bspace tomtou-bspace added bug Something isn't working untriaged This issues has not yet been looked at by the Maintainers labels Apr 4, 2024

welcome bot commented Apr 4, 2024

Thank you for opening your first issue here! 🛠


dosubot bot commented Apr 4, 2024

Hey there, @tomtou-bspace! Great to see you back diving into the intricacies of workflow orchestration. Hope you're ready for another round of troubleshooting and insights. Let's get to it!

Based on the information you've provided, it seems like the issue you're encountering is due to the way outputs are being passed between map_task A and map_task B. Here are some suggestions on how you might resolve this issue:

  1. Ensure Task Isolation: Make sure that each sub-task in map_task B is designed to only access its corresponding output from map_task A. This might involve adjusting how outputs are passed between tasks to ensure they are not bundled together in a way that requires downloading all outputs when only one is needed.

  2. Use ArrayNode for Enhanced Mapping Support: The Flyte documentation describes ArrayNode as an enhancement over regular map tasks, offering wider mapping support and improved subtask execution handling, including cache management and subtask failure handling. Using the ArrayNode-based map_task from the flytekit.experimental module might give you better control over how outputs are managed and accessed, potentially avoiding the unnecessary data downloads.

    Here is a sample code snippet:

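    ```python
    from flytekit import task, workflow
    from flytekit.experimental import map_task  # ArrayNode-based map task

    @task
    def your_task(a: int) -> int:
        # Placeholder body: replace with the real per-element work.
        return a * 2

    @workflow
    def your_workflow(xs: list[int]) -> list[int]:
        # Each element of xs is processed by its own sub-task.
        return map_task(your_task)(a=xs)
    ```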
  3. Review Task Output and Input Handling: Re-examine how outputs from map_task A are structured and how map_task B accesses these outputs. It might be necessary to refactor the tasks to ensure that each sub-task in map_task B explicitly accesses only its corresponding output from map_task A, possibly by indexing or other means of direct reference.

  4. Optimize Data Storage and Access: If the workflow involves storing outputs of map_task A in a shared location (e.g., cloud storage), consider structuring the storage so that each output is stored in a separate, directly accessible file or object. This way, sub-tasks in map_task B can directly access only the data they need without downloading everything.

  5. Leverage Task Metadata and Resource Overrides: Although not directly related to the data-download issue, optimizing map task execution with TaskMetadata (for caching) and with_overrides (for resource allocation) can improve overall workflow efficiency. This might indirectly reduce the extra runtime caused by unnecessary downloads, but it does not address the root cause.
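
Suggestions 3 and 4 amount to passing references instead of values between the two stages. Below is a stdlib-only sketch of that pattern; `task_a`, `task_b`, and the local `STORE` directory are hypothetical stand-ins for Flyte tasks and remote storage such as S3. In flytekit, the closest analogue would be returning a file reference (for example `FlyteFile`) from each sub-task of A; whether that avoids the full download in a given flytekit version is an assumption that would need to be verified.

```python
import pickle
import shutil
import tempfile
from pathlib import Path
from typing import Dict, List

# Hypothetical "object store" root; a real deployment would use remote storage.
STORE = Path(tempfile.mkdtemp())


def task_a(i: int) -> str:
    """Sub-task i of A: persist its own result and return only a reference."""
    result: Dict[str, Dict[str, List[float]]] = {"metrics": {"x": [float(i)] * 3}}
    path = STORE / f"a_output_{i}.pkl"
    path.write_bytes(pickle.dumps(result))
    return str(path)  # a short string is cheap to hand to the next stage


def task_b(ref: str) -> float:
    """Sub-task i of B: fetch exactly one object, the one it was handed."""
    result = pickle.loads(Path(ref).read_bytes())
    return sum(result["metrics"]["x"])


refs = [task_a(i) for i in range(4)]  # fan-out of A
totals = [task_b(r) for r in refs]    # fan-out of B, index-aligned with A
print(totals)  # [0.0, 3.0, 6.0, 9.0]

shutil.rmtree(STORE)  # clean up the temporary "object store"
```

The design choice is that only small path strings flow through the list that connects A and B, so each sub-task of B pays for one object regardless of how many sub-tasks A had.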

For more information, you can refer to the Flyte documentation on map tasks.

Regarding the structure and type of the output from map_task A and how it is passed to map_task B, the output from a task execution is encapsulated in the TaskExecutionClosure message. The TaskExecutionClosure contains a field named OutputResult, which can be one of three types: OutputUri (string), OutputData (*core.LiteralMap), or Error (*core.ExecutionError). The specific type that is used depends on the outcome of the task execution. If the task execution was successful and produced output data, the OutputData field would be used. If the task execution resulted in an error, the Error field would be used. The OutputUri field is deprecated and not used in new implementations. You can find more details in the FlyteIDL protobuf definitions.
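
The "exactly one of three" shape described above is a protobuf oneof. The following plain-Python model only mirrors that behaviour and is not the flyteidl class; the snake_case names are simply the protobuf spellings of the Go names above, and plain dict/str values stand in for core.LiteralMap and core.ExecutionError. Generated protobuf Python messages expose `WhichOneof("output_result")` for the same check.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

_FIELDS = ("output_uri", "output_data", "error")


@dataclass
class TaskExecutionClosureModel:
    """Hypothetical model of the output_result oneof: at most one field is set."""

    output_uri: Optional[str] = None              # deprecated, per the text above
    output_data: Optional[Dict[str, Any]] = None  # stands in for core.LiteralMap
    error: Optional[str] = None                   # stands in for core.ExecutionError

    def __post_init__(self) -> None:
        set_fields = [f for f in _FIELDS if getattr(self, f) is not None]
        if len(set_fields) > 1:
            raise ValueError(f"oneof violated, multiple fields set: {set_fields}")

    def which_output(self) -> Optional[str]:
        # Return the name of the populated field, or None if nothing is set.
        for name in _FIELDS:
            if getattr(self, name) is not None:
                return name
        return None


ok = TaskExecutionClosureModel(output_data={"o0": 42})
failed = TaskExecutionClosureModel(error="OOMKilled")
print(ok.which_output(), failed.which_output())  # output_data error
```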

I hope these suggestions help you resolve the issue. If you need further assistance, please provide more details about your implementation, and I'll be glad to help.


@eapolinario
Contributor

@tomtou-bspace , this is a known limitation of the current implementation of map tasks. Unfortunately, we're not going to revisit this decision in the short / medium term as the team is focused on other projects right now.

@pingsutw pingsutw removed the untriaged This issues has not yet been looked at by the Maintainers label Apr 10, 2024