Support continuous pipeline in run_pipeline_in_subprocess#1369
Merged
Conversation
When the pipeline config has `continuous=True` on its source, the internal `_Wrapper` used by `run_pipeline_in_subprocess` and `run_pipeline_in_subinterpreter` now builds the pipeline once and reuses it across `iter()` calls via `get_iterator()`. This avoids rebuilding the thread pool and event loop each epoch, keeping threads alive across epoch boundaries inside the subprocess. Previously, every `iter()` call on the wrapper rebuilt the pipeline from scratch, creating a new `ThreadPoolExecutor` and asyncio event loop. For continuous sources with epoch boundary markers (`_EPOCH_END`), this was wasteful — the pipeline should stay alive and simply yield items until the next epoch boundary. The `_has_continuous_source()` helper inspects the `PipelineConfig` tree (including `MergeConfig` sub-pipelines) to detect whether the source is continuous, and the check is performed once in `__init__`.
Contributor
|
@facebook-github-bot has imported this pull request. If you are a Meta employee, you can view this in D101687499. (Because this pull request was imported automatically, there will not be any future comments.) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
When the pipeline config has
continuous=Trueon its source, the internal_Wrapperused byrun_pipeline_in_subprocessandrun_pipeline_in_subinterpreternow builds the pipeline once and reuses it acrossiter()calls viaget_iterator(). This avoids rebuilding the thread pool and event loop each epoch, keeping threads alive across epoch boundaries inside the subprocess.Previously, every
iter()call on the wrapper rebuilt the pipeline from scratch, creating a newThreadPoolExecutorand asyncio event loop.For continuous sources with epoch boundary markers (
_EPOCH_END), this was wasteful — the pipeline should stay alive and simply yield items until the next epoch boundary.The
_has_continuous_source()helper inspects thePipelineConfigtree (includingMergeConfigsub-pipelines) to detect whether the source is continuous, and the check is performed once in__init__.