-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-3689] Fix unbounded reader leak in direct-runner. #4658
Conversation
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, the number 10 is explicitly arbitrary (living under ARBITRARY_MAX_ELEMENTS and whatnot) so there's no reason it can't be changed, though you'll probably want to make it configurable with a larger default so you don't have to add a billion elements to all of the tests.
} | ||
UnboundedSourceShard<OutputT, CheckpointMarkT> residual = UnboundedSourceShard.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the missing element (in the continuous input case) is that getReader is potentially creating a new reader which we don't populate in the shard, right?
The test we have demonstrates that with an input UnboundedSourceShard
with a reader we'll reuse it, but I don't know if we have a test that demonstrates that with no input reader we'll produce a shard with one, which I think is the best way to describe the bug. Can you add such a test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to the test I updated, right? Will do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test we have demonstrates that with an input UnboundedSourceShard with a reader we'll reuse it, but I don't know if we have a test that demonstrates that with no input reader we'll produce a shard with one, which I think is the best way to describe the bug.
Can you rephrase it? The test does start with a shard without a reader and ensures that it is reused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean it ensures that the reader is created only once. And similar to other tests in the file, it does not actually assert the output is correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically, we have two cases of input:
UnboundedSourceShard{source, deduplicator, reader.absent(), checkpoint}
UnboundedSourceShard{source, deduplicator, reader.present(), checkpoint}
and we have a test to demonstrate that if given UnboundedSourceShard{source, deduplicator, reader.present(), checkpoint}
, we produce UnboundedSourceShard{source, deduplicator, reader.present(), newCheckpoint}
but no test to demonstrate that if given UnboundedSourceShard{source, deduplicator, reader.absent(), checkpoint}
we produce UnboundedSourceShard{source, deduplicator, reader.present(), newCheckpoint}
, which is the base case that we don't handle properly right now, which means we almost never get to the reuse case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 291 [[1]] creates a UnboundedSourceShard{source, deduplicator, reader.absent(), checkpoint}
, and it is added to inputBundle right below that.
and we have a test to demonstrate that if given UnboundedSourceShard{source, deduplicator, reader.present(), checkpoint}, we produce UnboundedSourceShard{source, deduplicator, reader.present(), newCheckpoint}
Are you referring to evaluatorReusesReader()
updated here or is this another test?
Sorry for not multiple follow up questions, I think I am misinterpreting what you are saying.
[[1]]: https://github.com/apache/beam/pull/4658/files#diff-41695f8bd3f499370918186c7f369efdR291
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahah, yep, that's pretty busted;
Really I think the updates to evaluatorReusesReader
is mostly checking that the reader is populated in the second shard.
We should have a test around reusing a reader that is then discarded because there was no input and it was at the end of time (the new branch you've added)
You can probably remove withCheckpoint
from the UnboundedSourceShard
class as well; I think this is probably the only caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed withCheckpoint
.
Added an option to advance watermark to infinity when the reader reaches the end
Updated the test to:
- verify that number of elements produced matches input.
- reader is closed when end of stream is reached.
A smaller default is fine too. Shall I send a PR to make this and reuse-chance configurable? |
Also close the reader at end of reader. Updated the test for reader reuse.
5250f6a
to
faf5383
Compare
Squashed commits. |
Direct runner reads 10 records at a time from a reader. I think the intention is to reuse the reader, but it reuses only if the reader is idle initially, not when the source has messages available.
When I was testing KafkaIO with direct runner it kept opening new reader for every 10 records and soon ran out of file descriptors.
Btw, direct runner closes the reader every 20 input bundles on average. Since each bundle is at most 10 elements long.. we recreate a reader for every 200 records (not related to this fix). I know direct runner is not meant for production applications, but this seems really low even for toy applications.