task mapping on generators? #2023

Closed
mohamed--abdel-maksoud opened this issue Feb 13, 2020 · 8 comments

Comments

@mohamed--abdel-maksoud

Is there a reason why we can't pass a generator expression to task.map? Loading the whole list into memory is potentially inefficient.
If all that is needed is development time, I'd like to work on it.

@joshmeek

Hey @mohamed--abdel-maksoud, the core development team has a larger mapping refactor in the works to better support some use cases we have seen arise and to enable new features. This may end up solving your use case, but I can't say for certain. That being said, mapping over generators is not currently supported because Prefect requires tasks to finish and return pickleable results for their downstream tasks.
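To illustrate the pickling constraint mentioned above, here is a minimal standard-library sketch. It does not use Prefect at all; `is_picklable` and `request_pages` are hypothetical names made up for the example. It only shows that a generator object cannot be pickled, while a materialized list of the same items can.

```python
import pickle


def is_picklable(value):
    """Return True if `value` can be serialized with pickle."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


def request_pages():
    # Hypothetical stand-in for a function that lazily yields results.
    for page in range(3):
        yield page


print(is_picklable(request_pages()))        # False: generator objects cannot be pickled
print(is_picklable(list(request_pages())))  # True: a list of ints pickles fine
```

This is why a result that must be handed to downstream tasks through serialization generally has to be materialized first.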

@mohamed--abdel-maksoud
Author

Thanks @joshmeek for the quick answer, I'll wait for the refactoring then.
I understand that a task cannot return a generator since it's not picklable; in my case, though, the generator is returned by a vanilla function.

@earthastronaut

@joshmeek I also ran into this. I'm interested to see how you solve it. My use case: I have a task which performs unbounded work (looping over request pages) and generates results which a downstream task will process without blocking.

Looking at the code, it's fairly straightforward to modify prefect.engine.task_runner.TaskRunner.run_mapped_task to handle a generator via next(upstream_state.result) with StopIteration, in addition to upstream_state.result[i]. However, as you pointed out, it doesn't start the downstream task until the upstream finishes. And if the upstream generator is truly unbounded, it may never end, which could be a problem. It could queue up new downstream tasks with executor.submit, but that likely breaks some of the benefits of requiring upstream processes to finish (like checkpoints), which honestly I don't fully understand yet.
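A rough sketch of the idea described above, using only the standard library. This is not Prefect's actual run_mapped_task code; `iter_upstream_items` and `map_streaming` are hypothetical helpers, and a `ThreadPoolExecutor` stands in for whatever executor is in use. It assumes the upstream result is either an indexable sequence or a generator.

```python
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor


def iter_upstream_items(result):
    """Yield mapped inputs from an indexable result or from a generator."""
    if isinstance(result, Sequence):
        # Indexable case: result[i] for each position.
        for i in range(len(result)):
            yield result[i]
    else:
        # Generator case: call next() until StopIteration is raised.
        iterator = iter(result)
        while True:
            try:
                item = next(iterator)
            except StopIteration:
                break
            yield item


def map_streaming(fn, upstream, max_workers=4):
    """Submit fn(item) as soon as each item is produced by the upstream."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Each submit happens as soon as an item is available, so work on
        # early items can start before the upstream is exhausted.
        futures = [executor.submit(fn, item) for item in iter_upstream_items(upstream)]
        return [future.result() for future in futures]


doubled = map_streaming(lambda x: x * 2, (i for i in range(4)))
```

Note that this sketch still collects every future before returning, so a truly unbounded generator would never finish here either, and it does nothing about checkpointing.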

Anyway, I hope that adds something to the conversation. Thanks for working on this hard problem and making my development easier!

@michcio1234

@joshmeek is there any timeline for this?

@cicdw
Member

cicdw commented Mar 13, 2021

Chiming in for josh here -- there is currently no timeline for this although it is something we are increasingly interested in undertaking! The implementation will most likely be more general than mapping over generators, and allow for a "parent" task to asynchronously spawn new tasks that can immediately begin running (under the right executor). I will update here if and when this enters the planning phases.
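For readers wondering what a "parent" that spawns work asynchronously might look like in general terms, here is a small asyncio sketch. It is purely illustrative and says nothing about how Prefect would implement this; `pages`, `child`, and `parent` are hypothetical names.

```python
import asyncio


async def pages():
    # Hypothetical upstream that produces items one at a time.
    for page in range(3):
        await asyncio.sleep(0)
        yield page


async def child(page):
    # Hypothetical downstream work on a single item.
    await asyncio.sleep(0)
    return page * 10


async def parent():
    children = []
    async for page in pages():
        # Each child is scheduled as soon as its input exists, so it can
        # run while the parent is still producing later items.
        children.append(asyncio.create_task(child(page)))
    # gather returns results in the order the children were created.
    return await asyncio.gather(*children)


results = asyncio.run(parent())
print(results)  # [0, 10, 20]
```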

zanieb pushed a commit that referenced this issue Jun 22, 2022
Feature: using same feature flags check as nebula
@sveinugu

@joshmeek Seems this issue was closed by mistake (commit references pull request 2023, not issue 2023). Any general support for generators still on the table?

@zanieb
Contributor

zanieb commented Nov 28, 2022

@sveinugu that commit is indeed unrelated but this was closed long ago by the opener of the issue. I believe mapping over generators is supported in Prefect v2.

@sveinugu

> @sveinugu that commit is indeed unrelated but this was closed long ago by the opener of the issue. I believe mapping over generators is supported in Prefect v2.

Thanks for the feedback. I see I misread the issue a bit. #5708 is what I was looking for.
