Add BoundedReader APIs for expressing remaining and consumed parallelism#353
Add BoundedReader APIs for expressing remaining and consumed parallelism#353dhalperi wants to merge 15 commits intoapache:masterfrom
Conversation
And implement it for common sources
*) Make the start of a block match Avro's definition: the first byte after the previous sync marker. This enables detecting the last block in the file. *) This change enables us to unify currentOffset and currentBlockOffset, as all records are emitted at the start of the block that contains them. *) Simplify block header reading to have fewer object allocations and buffers using a direct reader and a (allocated once only) CountingInputStream to measure the size of that header. *) Add tests for consumed and remaining parallelism *) Let BlockBasedSource detect the end of the file in remaining parallelism. *) Verify in more places that the correct number of bytes is read from the input Avro file.
*) empty file *) non-empty compressed file *) non-empty not-compressed file
*) empty file *) non-empty file
This is not a very good offset because it is an upper bound, but it is likely better than not reporting any progress at all.
|
R: @bjchambers |
There was a problem hiding this comment.
This is helpful, but some of the parantheticals seem confusing.
"current (previous)" I think should be "current (about to be previous)" or just say "current" since you've already stated that the current block in the precondition is about to be previous.
Similarly, in Postcondition: "current (formerly next)" could be clarified to the "new current (formerly next)"
There was a problem hiding this comment.
If we know we're at a sync marker, it's unclear why we need advancePastNextSyncMarker -- it looks like that attempts to read up to the next sync marker. Further, that seems to be why we need push back. If we now know the next byte is the start of the sync marker, couldn't we just read it and continue? Wouldn't that also mean that we don't need the push back here?
There was a problem hiding this comment.
Alternatively, for methods that are "here is the splittable implementation and here is the non-splittable implementation" use an "if/else" block to make that clear.
|
Is there any way to update |
| @@ -46,8 +46,8 @@ | |||
| * <ul> | |||
| * <li>Progress estimation ({@link BoundedReader#getFractionConsumed}) | |||
| * <li>Tracking of parallelism, to determine with the current source can be split | |||
There was a problem hiding this comment.
to determine with the current source can be split reads oddly
|
LGTM |
These are useful for dynamic work rebalancing and autoscaling.