[BEAM-5650]: Modify BoundedToUnboundedSourceAdapter to configure its reader to read more than 1 bounded source #6952
@iemejia - Whenever you get a chance, please review. Thanks. |
|
Ok, some general comments:
In general I think the solution makes sense, but I have some doubt about whether this is the right place to fix it. I would like to preserve the current 1-1 relation of unbounded to bounded source if possible. (Note that if we can't, we will need to update the current documentation, because these changes clearly alter the semantics.) Thinking about it a bit, this is similar to the classic connection/thread pool problem. I don't know if we can create something similar where the limited 'tasks' are such readers, or if we can just delegate this to native Flink and let it take care of it. Since I am not a Flink expert, I asked @mxm to help us take a look; maybe he knows a better way to proceed.
mxm
left a comment
Thank you for working on this problem @jhalaria. We definitely need to address this issue. Some observations from reading the code:
- After the BoundedSource is split, it will be processed in bundles of READER_QUEUE_SIZE. This breaks the current guarantee that all readers will be processed at the same time. Some applications might actually assume this behavior.
- There is an unnecessary change to how the class is instantiated, i.e. with an ArrayDeque.
- The constructor assumes an implicit size of 1; otherwise the splitting / size estimation won't work correctly.
Rather than encapsulating this throttling behavior in UnboundedReadFromBoundedSource, wouldn't it make sense to delegate this to a BoundedSource? We could create a ThrottledBoundedSource wrapper which would handle the throttling you implemented. This would be completely transparent to UnboundedReadFromBoundedSource.
The question remains how we decide during translation when to apply the throttling. I'd say this could be a PipelineOption which you can set, as we have with HadoopFileSystemOptions.
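One way to picture the suggested wrapper is a decorator that caps how many readers are open at once. The sketch below is only an illustration under stated assumptions: `SimpleSource` and `ThrottledSourceSketch` are hypothetical stand-ins, not Beam's `BoundedSource` API, and the shared `Semaphore` plays the role of the connection limit.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Semaphore;

/** Hypothetical stand-in for a bounded source; this is NOT the Beam BoundedSource API. */
interface SimpleSource<T> {
  Iterator<T> open();
}

/** Hypothetical wrapper: a reader holds one permit from open() until it is drained. */
final class ThrottledSourceSketch<T> implements SimpleSource<T> {
  private final SimpleSource<T> delegate;
  private final Semaphore permits; // shared by all wrapped sources (assumed limit)

  ThrottledSourceSketch(SimpleSource<T> delegate, Semaphore permits) {
    this.delegate = delegate;
    this.permits = permits;
  }

  @Override
  public Iterator<T> open() {
    // Blocks while the maximum number of readers is already open.
    // Error handling (releasing the permit if open() fails) is omitted for brevity.
    permits.acquireUninterruptibly();
    final Iterator<T> inner = delegate.open();
    return new Iterator<T>() {
      private boolean released = false;

      @Override
      public boolean hasNext() {
        boolean more = inner.hasNext();
        if (!more && !released) {
          released = true;
          permits.release(); // reader is drained, so free the slot
        }
        return more;
      }

      @Override
      public T next() {
        return inner.next();
      }
    };
  }

  public static void main(String[] args) {
    Semaphore permits = new Semaphore(1);
    SimpleSource<Integer> wrapped =
        new ThrottledSourceSketch<Integer>(() -> Arrays.asList(1, 2).iterator(), permits);
    Iterator<Integer> reader = wrapped.open();
    while (reader.hasNext()) {
      System.out.println(reader.next());
    }
    System.out.println("free permits: " + permits.availablePermits());
  }
}
```

Because the throttling lives in the wrapper, the code that consumes the source would not need to change. Whether blocking in `open()` is acceptable for a given runner is an open question that this sketch does not answer.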
| ArrayDeque<BoundedSource<T>> unboundedSources = new ArrayDeque<>(); | ||
| unboundedSources.add(transform.getSource()); | ||
| UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter unboundedSource = | ||
| new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(unboundedSources); |
I would prefer no change here. It seems arbitrary to add a one-item ArrayDeque.
| public PCollection<T> expand(PBegin input) { | ||
| return input.getPipeline().apply(Read.from(new BoundedToUnboundedSourceAdapter<>(source))); | ||
| final ArrayDeque<BoundedSource<T>> dequeue = new ArrayDeque<>(Arrays.asList(source)); | ||
| return input.getPipeline().apply(Read.from(new BoundedToUnboundedSourceAdapter<>(dequeue))); |
Please revert; I wouldn't make any change here. The normal use case of wrapping a single source should still be supported.
|
|
||
| public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) { | ||
| this.boundedSource = boundedSource; | ||
| } |
I think we should keep this constructor. See above.
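A minimal sketch of how both entry points could coexist, assuming the multi-source variant is kept at all: the single-source constructor delegates to the deque-based one, so existing call sites stay untouched. `AdapterConstructorSketch` is a hypothetical class used only for illustration, not the actual adapter.

```java
import java.util.ArrayDeque;
import java.util.Collections;

/** Hypothetical illustration of constructor overloading; not the real adapter class. */
final class AdapterConstructorSketch<S> {
  private final ArrayDeque<S> sources;

  /** Existing use case: wrap exactly one source. */
  AdapterConstructorSketch(S source) {
    this(new ArrayDeque<S>(Collections.singletonList(source)));
  }

  /** Multi-source variant: the reader would work through the queue in order. */
  AdapterConstructorSketch(ArrayDeque<S> sources) {
    if (sources.isEmpty()) {
      throw new IllegalArgumentException("at least one source is required");
    }
    this.sources = new ArrayDeque<>(sources); // defensive copy
  }

  int sourceCount() {
    return sources.size();
  }

  public static void main(String[] args) {
    AdapterConstructorSketch<String> single = new AdapterConstructorSketch<String>("only");
    System.out.println(single.sourceCount());
  }
}
```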
| private BoundedSource<T> boundedSource; | ||
| //TODO: There must be a better way to figure out the maxOpenConnections available. Use that number here | ||
| //Mention this as part of PR to get feedback. | ||
| private static final int READER_QUEUE_SIZE = 10; |
Shouldn't this be part of the splitting behavior of BoundedSource?
| List<? extends BoundedSource<T>> splits = boundedSource.split(desiredBundleSize, options); | ||
| return splits | ||
| final List<? extends List<? extends BoundedSource<T>>> partition = | ||
| Lists.partition(splits, READER_QUEUE_SIZE); |
This re-partitioning seems arbitrary / optimized for your case only.
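To make the effect of the re-partitioning concrete, here is a plain-JDK sketch of fixed-size grouping, comparable in spirit to what `Lists.partition(splits, READER_QUEUE_SIZE)` does in the diff. `PartitionSketch` and the figure of 25 splits are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: groups items into consecutive chunks of at most `size` elements. */
final class PartitionSketch {
  static <T> List<List<T>> partition(List<T> items, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    List<List<T>> groups = new ArrayList<>();
    for (int start = 0; start < items.size(); start += size) {
      int end = Math.min(start + size, items.size());
      groups.add(new ArrayList<>(items.subList(start, end)));
    }
    return groups;
  }

  public static void main(String[] args) {
    List<Integer> splits = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      splits.add(i);
    }
    // With a queue size of 10, 25 splits collapse into 3 groups of 10, 10 and 5.
    for (List<Integer> group : partition(splits, 10)) {
      System.out.println(group.size());
    }
  }
}
```

In this example 25 independent splits become 3 reader queues, so the available parallelism is determined by the hard-coded queue size rather than by the desired number of splits.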
| */ | ||
| final BoundedSource<T> boundedSource = boundedSources.peek(); | ||
| long estimatedSize = boundedSource.getEstimatedSizeBytes(options); | ||
| long desiredBundleSize = estimatedSize / desiredNumSplits; |
This estimates the bundle size based only on the first BoundedSource, which doesn't seem right.
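One possible alternative, sketched under assumptions, is to sum the estimates of every queued source before dividing by the desired number of splits. `SizedSource` and `BundleSizeSketch` are hypothetical stand-ins; the real `getEstimatedSizeBytes` takes pipeline options, which are left out here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in exposing only a size estimate; NOT the Beam API. */
interface SizedSource {
  long estimatedSizeBytes();
}

final class BundleSizeSketch {
  /** Bundle size derived from the total of all queued sources, not just the head. */
  static long desiredBundleSize(Deque<? extends SizedSource> sources, int desiredNumSplits) {
    if (desiredNumSplits <= 0) {
      throw new IllegalArgumentException("desiredNumSplits must be positive");
    }
    long total = 0;
    for (SizedSource source : sources) {
      total += source.estimatedSizeBytes();
    }
    return Math.max(1L, total / desiredNumSplits);
  }

  public static void main(String[] args) {
    Deque<SizedSource> sources = new ArrayDeque<>();
    sources.add(() -> 100L);
    sources.add(() -> 200L);
    sources.add(() -> 300L);
    // Total is 600 bytes, so 3 splits give 200 bytes each; using only the head would give 33.
    System.out.println(desiredBundleSize(sources, 3));
  }
}
```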
| public BoundedToUnboundedSourceAdapter(ArrayDeque<BoundedSource<T>> boundedSources) { | ||
| this.boundedSources = boundedSources; | ||
| final BoundedSource<T> source = boundedSources.peek(); | ||
| this.checkpointCoder = new CheckpointCoder<>(source.getDefaultOutputCoder()); |
This will only work if the coders are the same for all sources.
| return helper(onlyFinishReadingCurrentSource); | ||
| } | ||
|
|
||
| private boolean helper(boolean onlyFinishReadingCurrentSource) throws IOException { |
Change this to a more descriptive name?
|
Any updates on this PR? |
|
Oops, it seems this needs a rebase too now, @jhalaria |
|
@robertwb - I was out on vacation. Haven't had a chance to resume work on it yet. Will let you know once I have something. |
|
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
|
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
@iemejia - Please review. Thank you.
- […] DEFAULT_MAX_CONNECTIONS) and there isn't a way to override it. [We should have the ability to override this anyway. Will create a separate PR for that.]
- […] BoundedToUnboundedSourceAdapter an ArrayDeque so that one reader can read more than 1 BoundedSource. Each reader finishes reading from a BoundedSource and then goes to the next one.
- […] BoundedSource, we create a checkpoint with the remaining elements of the BoundedSource and the elements remaining in the ArrayDeque.
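The reading and checkpointing behavior described above can be sketched roughly as follows. This is not the PR's code: `QueueReaderSketch` is a hypothetical class, and each `List<T>` stands in for one bounded source so the example stays self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical reader that drains a queue of sources one after another. */
final class QueueReaderSketch<T> {
  private final Deque<List<T>> pending; // sources not started yet
  private List<T> current = new ArrayList<>();
  private int nextIndex = 0;
  private T currentElement;

  QueueReaderSketch(Deque<List<T>> sources) {
    this.pending = new ArrayDeque<>(sources);
  }

  /** Moves to the next element, switching sources when one is exhausted. */
  boolean advance() {
    while (nextIndex >= current.size()) {
      if (pending.isEmpty()) {
        return false; // every source has been drained
      }
      current = pending.poll();
      nextIndex = 0;
    }
    currentElement = current.get(nextIndex++);
    return true;
  }

  T getCurrent() {
    return currentElement;
  }

  /** Snapshot of the unread elements of the current source plus the untouched sources. */
  Checkpoint<T> checkpoint() {
    List<T> residual = new ArrayList<>(current.subList(nextIndex, current.size()));
    return new Checkpoint<>(residual, new ArrayList<>(pending));
  }

  static final class Checkpoint<T> {
    final List<T> residualElements;
    final List<List<T>> remainingSources;

    Checkpoint(List<T> residualElements, List<List<T>> remainingSources) {
      this.residualElements = residualElements;
      this.remainingSources = remainingSources;
    }
  }

  public static void main(String[] args) {
    Deque<List<Integer>> sources = new ArrayDeque<>();
    sources.add(Arrays.asList(1, 2, 3));
    sources.add(Arrays.asList(4, 5));
    QueueReaderSketch<Integer> reader = new QueueReaderSketch<>(sources);
    reader.advance();
    reader.advance(); // current element is now 2
    Checkpoint<Integer> cp = reader.checkpoint();
    System.out.println(cp.residualElements + " " + cp.remainingSources);
  }
}
```

Restoring from such a checkpoint would mean emitting the residual elements first and then resuming with the remaining sources, which is why the review comments about a shared coder and size estimation across all sources matter.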