Add UnboundedReadFromBoundedSource, and use it in Dataflow runner #339
peihe wants to merge 11 commits into apache:master
Conversation
```java
/**
 * Returns a new {@link UnboundedReadFromBoundedSourceTest}.
```

```java
@Override
public boolean requiresDeduping() {
```
This is incomplete: you must provide an implementation of getCurrentRecordId() for requiresDeduping() to function; otherwise calls to getCurrentRecordId() will throw an exception. It is probably right to use (element number, shardId) here. As far as I know, deduping is best-effort (not guaranteed over the life of a pipeline), so this may still produce duplicate elements in some cases.
This also does not solve the problem of progress: the reader may be discarded before it returns false from start() or advance() (as is done in the InProcessPipelineRunner), which may cause the reader to produce the same subset of elements repeatedly and never complete the input. I believe reading the entire contents into an Iterable within start(), then outputting (and flattening) that Iterable, will provide completion and progress guarantees, except in the case where not all elements fit in memory. Potentially the produced Iterable can be implemented as a channel back to the underlying BoundedReader (and thus lazily produce elements as TimestampedValues, which can be written to some channel and cleared out of memory if the runner supports that).
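To illustrate the (element number, shardId) suggestion above, here is a minimal, self-contained sketch (not the PR's code, and not Beam's API): a stable record id is derived from the shard and element index, and a runner-side best-effort dedupe drops records whose id has already been seen. All class and method names here are invented for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: best-effort deduping keyed by (shardId, elementIndex),
// as suggested in the review. A real Beam UnboundedReader would return the
// byte[] id from getCurrentRecordId().
public class RecordIdSketch {
    // Encode (shardId, elementIndex) as a stable byte[] record id.
    static byte[] recordId(int shardId, long elementIndex) {
        return ByteBuffer.allocate(12).putInt(shardId).putLong(elementIndex).array();
    }

    public static void main(String[] args) {
        // Runner-side best-effort dedupe: drop records whose id was seen.
        // ByteBuffer compares by content, so it works as a HashSet key.
        Set<ByteBuffer> seen = new HashSet<>();
        List<String> output = new ArrayList<>();
        int[][] records = {{0, 0}, {0, 1}, {0, 0}, {1, 0}}; // (shard, index); (0,0) repeats
        for (int[] r : records) {
            ByteBuffer id = ByteBuffer.wrap(recordId(r[0], r[1]));
            if (seen.add(id)) {
                output.add("shard" + r[0] + ":" + r[1]);
            }
        }
        System.out.println(output); // duplicate (0,0) is dropped
    }
}
```

Note this dedupe is only as durable as the `seen` set; as the comment above says, Beam's deduping is best-effort, so it narrows but does not eliminate the duplicate-element window.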
Removed requiresDeduping and getCurrentRecordId, since a checkpoint was added.
Force-pushed from c35fdf3 to 14cc999.
PTAL. (Jenkins seems broken for unrelated reasons.)
Force-pushed from cb15794 to d9ec83a.
```java
class Reader extends UnboundedReader<T> {
  private final PipelineOptions options;

  private @Nullable final List<TimestampedValue<T>> residualElements;
```
Avoid null; this should just be empty if there are no residual elements.
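The "avoid null" suggestion can be sketched as follows. This is a hypothetical stand-in class (not the PR's Checkpoint), showing the convention of normalizing an absent residual-element list to an immutable empty list so downstream code never needs a null check.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the reviewer's suggestion: represent "no residual
// elements" as an empty list rather than null. Names are illustrative only.
public class CheckpointSketch<T> {
    private final List<T> residualElements;

    CheckpointSketch(List<T> residualElements) {
        // Normalize: an absent list becomes an immutable empty list.
        this.residualElements =
            residualElements == null ? Collections.<T>emptyList() : residualElements;
    }

    List<T> getResidualElements() {
        return residualElements;
    }

    public static void main(String[] args) {
        CheckpointSketch<String> done = new CheckpointSketch<>(null);
        // No null check needed downstream; iterating an empty list is a no-op.
        System.out.println("residual count: " + done.getResidualElements().size());
    }
}
```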
```java
  public void finalizeCheckpoint() {}
}

private static class CheckpointCoder<T> extends AtomicCoder<Checkpoint<T>> {
```
This should be a StandardCoder; since it is parameterized by elemCoder, it is not atomic.
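The point about the coder being "parameterized by elemCoder" can be illustrated without Beam. The sketch below (all names invented; `ElemCoder` is not a Beam interface) encodes a checkpoint's residual elements as a count followed by each element, delegating the element bytes to a component coder — exactly the structure that makes such a coder non-atomic: its encoding depends on a component coder argument.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch (not Beam's API): a checkpoint coder that delegates to a component
// element coder, showing why it is parameterized rather than atomic.
public class CheckpointCoderSketch {
    interface ElemCoder<T> {
        void encode(T value, DataOutputStream out) throws IOException;
        T decode(DataInputStream in) throws IOException;
    }

    // Count-prefixed encoding; element bytes come from the component coder.
    static <T> byte[] encode(List<T> residual, ElemCoder<T> elemCoder) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(residual.size());
        for (T elem : residual) {
            elemCoder.encode(elem, out);
        }
        return bytes.toByteArray();
    }

    static <T> List<T> decode(byte[] data, ElemCoder<T> elemCoder) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int size = in.readInt();
        List<T> result = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            result.add(elemCoder.decode(in));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        ElemCoder<String> utf8 = new ElemCoder<String>() {
            public void encode(String v, DataOutputStream out) throws IOException { out.writeUTF(v); }
            public String decode(DataInputStream in) throws IOException { return in.readUTF(); }
        };
        List<String> residual = Arrays.asList("a", "b", "c");
        List<String> roundTripped = decode(encode(residual, utf8), utf8);
        System.out.println(roundTripped); // round-trips through the component coder
    }
}
```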
PTAL.
```java
import javax.annotation.Nullable;

/**
 * {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
```
Never mind on this one; not sure what I was thinking.
Force-pushed from 1017ce9 to 8588f6d.
@dhalperi feedback?
```java
public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PCollection<T>> {
  private final BoundedSource<T> source;

  public UnboundedReadFromBoundedSource(BoundedSource<T> source) {
```
Ready to review.
```java
  }
}

<<<<<<< HEAD
```
This was from an outdated diff.
LGTM
Minor fixes; ping me when it's green to merge this and the other PR.
Addressed comments and rebased to resolve conflicts.
```java
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
```
@kennknowles @davorbonaci adding this here would make the Dataflow runner depend on runners-core. This violates our prior assumption that only the service half of the runner should depend on runners-core.
Opinions?
Doesn't build: checkstyle failures, but also, more fundamentally, the issue identified above. Let's discuss tomorrow.
```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-core-java</artifactId>
  <scope>runtime</scope>
</dependency>
```
Discussed offline. I believe we have good options, short term and long term, to avoid bringing back this dependency:
Longer term: put the functionality in runners-core but only invoke it in the Dataflow service. If this turns out to be easy, then we should do it right away.
Short term: put the functionality elsewhere, in one of:
- The Dataflow runner module. This is my preference. Right now it is really a matter of how the Dataflow runner works that this adapter is necessary. If another runner decides it wants to go the same route, then we might be in the longer-term scenario anyhow.
- The SDK. Mostly harmless, though we should move it out prior to a stable release.
- Some io-core module that is not quite the grab bag that runners-core is slated to be.

This highlights that there are really two needs for a utility library:
- Helping implement the Beam model on the backend. This will generally affect a service, which can be updated transparently and in an agile manner. It presupposes the service is aware of Beam constructs.
- Helping put together a translation from a Beam pipeline to an underlying backend. This will generally occur in SDK-adjacent client-side code, which cannot be updated easily.

We can conflate the two without much of a downside, as long as we shade on both sides, but it helps me to think of them separately. The main risk is that we end up with too thick a client that is hard to update. Or we could split them pretty easily. I propose we wait and see.
LGTM. Will merge when green.
This PR will make the Dataflow streaming runner work with BoundedSources, such as TextIO and AvroIO.
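The checkpoint-and-resume idea discussed in the review thread can be sketched in plain Java, without Beam. This is an illustrative model, not the PR's implementation: a "reader" over a bounded collection snapshots its unemitted elements as residual elements on checkpoint, and a new reader resumes from that checkpoint, so the input completes even if the original reader is discarded mid-read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch (not the PR's code) of the core idea behind an unbounded adapter
// over a bounded source: checkpoint the residual (not-yet-emitted) elements
// so a fresh reader can finish the input instead of restarting from scratch.
public class UnboundedAdapterSketch {
    static class Checkpoint<T> {
        final List<T> residualElements; // elements still to be emitted
        Checkpoint(List<T> residual) { this.residualElements = residual; }
    }

    static class Reader<T> {
        private final List<T> elements;
        private int position = -1;

        Reader(List<T> elements) { this.elements = elements; }

        boolean advance() { return ++position < elements.size(); }
        T getCurrent() { return elements.get(position); }

        // Snapshot everything not yet emitted so a new reader can resume.
        Checkpoint<T> getCheckpointMark() {
            return new Checkpoint<>(new ArrayList<>(
                elements.subList(position + 1, elements.size())));
        }
    }

    public static void main(String[] args) {
        Reader<String> reader = new Reader<>(Arrays.asList("a", "b", "c", "d"));
        List<String> out = new ArrayList<>();
        // Emit two elements, then pretend the runner discards the reader.
        for (int i = 0; i < 2 && reader.advance(); i++) out.add(reader.getCurrent());
        Checkpoint<String> mark = reader.getCheckpointMark();

        // Resume from the checkpoint's residual elements; the input completes.
        Reader<String> resumed = new Reader<>(mark.residualElements);
        while (resumed.advance()) out.add(resumed.getCurrent());
        System.out.println(out);
    }
}
```

This models why the thread moved from requiresDeduping/getCurrentRecordId to checkpoints: with residual elements carried in the checkpoint, resumption makes progress by construction rather than relying on best-effort deduplication.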