[BEAM-3499, BEAM-2607] Gives the runner access to positions of SDF claimed blocks #4483

Merged: 6 commits, Feb 6, 2018

jkff (Contributor) commented Jan 25, 2018

This addresses the following issues: BEAM-3499 and BEAM-2607.

The former is the primary motivation for this PR. This PR changes the SDF checkpointing timer countdown to start from the first claimed block, rather than from the beginning of @ProcessElement. This requires giving the runner visibility into claimed blocks. Such visibility enables fixing BEAM-2607 as well. It is also a required part of implementing SDF splitting over Fn API (tracked separately).

This PR also, of course, changes the Watch transform to the new API; and, while we're at it, does some related improvements:

  • Compresses Watch.GrowthState using Snappy. E.g. with 100k files, the encoded state is about 3MB instead of 8MB. Compressing it much more is difficult because the state includes incompressible hashes. To address this, one must shard the filepattern, or implement the improvements suggested in https://issues.apache.org/jira/browse/BEAM-2680.
  • Makes the direct runner clone state cells. I did this mainly because I noticed that GrowthStateCoder was never called on the Watch state, which risks missing coder bugs when testing with the direct runner.

This PR is update-incompatible for users of the Watch transform, e.g. FileIO.match().continuously(). This is an experimental and very recent transform, so I'm going to ignore the incompatibility. It also requires a traditional Dataflow worker dance to get the worker container in sync with these runners-core changes - I'll perform that when the rest of the PR is approved.

R: @tgroh @chamikaramj
CC: @kennknowles @reuvenlax

jkff (Contributor Author) commented Jan 25, 2018

retest this please

tgroh (Member) left a comment

There may be a couple of duplicate comments, because I went through the commits one-by-one first

this.claimObserver = claimObserver;
}

public final boolean tryClaim(PositionT position) {
Member

I would strongly consider inverting these names in some way (so the author implements tryClaim, and this is executeTryClaim or something that signals that it is using the tryClaim method)

Contributor Author

I'm coming from the assumption that new RestrictionTrackers are written much more rarely than new SDFs using existing trackers, and I'd like the caller to use tryClaim. I guess I could rename tryClaimImpl to executeTryClaim but it seems about equally descriptive. (side note: I considered a number of other alternatives for this design, e.g. passing a claim callback as a context parameter to @ProcessElement; allowing a RestrictionTracker to simply refuse a checkpoint etc. to address just the checkpointing issue, but they all were much worse in various ways)
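
The naming discussion above concerns a template-method arrangement: SDF authors call a final tryClaim, tracker authors implement tryClaimImpl, and the runner observes each claim in between. A minimal sketch of that pattern follows. It is not the code in this PR: the class names, setClaimObserver, and the observer methods onClaimed and onClaimFailed are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the tracker base class under review. */
abstract class SketchTracker<PositionT> {
  /** Lets a runner observe every claim attempt (method names are assumed). */
  interface ClaimObserver<PositionT> {
    void onClaimed(PositionT position);

    void onClaimFailed(PositionT position);
  }

  private ClaimObserver<PositionT> claimObserver;

  final void setClaimObserver(ClaimObserver<PositionT> claimObserver) {
    this.claimObserver = claimObserver;
  }

  /** Called by SDF authors. Final, so the observer cannot be bypassed. */
  public final boolean tryClaim(PositionT position) {
    boolean claimed = tryClaimImpl(position);
    if (claimObserver != null) {
      if (claimed) {
        claimObserver.onClaimed(position);
      } else {
        claimObserver.onClaimFailed(position);
      }
    }
    return claimed;
  }

  /** Implemented by tracker authors. */
  protected abstract boolean tryClaimImpl(PositionT position);
}

/** Toy tracker over the half-open offset range [from, to). */
class SketchOffsetTracker extends SketchTracker<Long> {
  private final long to;
  private long lastClaimed;

  SketchOffsetTracker(long from, long to) {
    this.to = to;
    this.lastClaimed = from - 1;
  }

  @Override
  protected boolean tryClaimImpl(Long position) {
    if (position <= lastClaimed) {
      throw new IllegalArgumentException("Claims must be strictly increasing");
    }
    if (position >= to) {
      return false; // Outside the restriction: the claim fails.
    }
    lastClaimed = position;
    return true;
  }
}

/** Records observed positions, standing in for runner-side logic. */
class RecordingObserver implements SketchTracker.ClaimObserver<Long> {
  final List<Long> claimed = new ArrayList<>();
  final List<Long> failed = new ArrayList<>();

  @Override
  public void onClaimed(Long position) {
    claimed.add(position);
  }

  @Override
  public void onClaimFailed(Long position) {
    failed.add(position);
  }
}
```

With this shape, existing SDF code keeps calling tryClaim unchanged, which matches the stated assumption that SDFs are written far more often than trackers.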

@@ -888,7 +886,10 @@ public String toString(Growth.TerminationCondition<?, TerminationStateT> termina
this.isOutputComplete = state.isOutputComplete;
this.pollWatermark = state.pollWatermark;
this.terminationState = state.terminationState;
- this.pending = Lists.newLinkedList(state.pending);
+ this.pending = Maps.newLinkedHashMapWithExpectedSize(state.pending.size());
+ for (Map.Entry<HashCode, TimestampedValue<OutputT>> entry : state.pending.entrySet()) {
Member

this.pending.putAll(state.pending)? or this.pending = new LinkedHashMap<>(state.pending)
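
To illustrate the suggestion, here is a small sketch comparing an entry-by-entry copy with the LinkedHashMap copy constructor. It assumes the loop in the diff only re-inserts each entry unchanged; the loop body is not shown above, so that is a guess. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PendingCopySketch {
  /** Copies entries one by one (assumed to be what the loop in the diff does). */
  static <K, V> Map<K, V> copyWithLoop(Map<K, V> source) {
    Map<K, V> copy = new LinkedHashMap<>();
    for (Map.Entry<K, V> entry : source.entrySet()) {
      copy.put(entry.getKey(), entry.getValue());
    }
    return copy;
  }

  /** Copy-constructor form suggested in the review; keeps iteration order. */
  static <K, V> Map<K, V> copyWithConstructor(Map<K, V> source) {
    return new LinkedHashMap<>(source);
  }
}
```

Both forms produce an independent map with the same iteration order as the source, so if the loop does nothing else, the one-line form is equivalent.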

},
maxDuration.getMillis(),
TimeUnit.MILLISECONDS);
void checkClaimHasNotFailed() {
Member

Can this just be inlined?

Contributor Author

Done.

@@ -23,12 +23,39 @@
* Manages concurrent access to the restriction and keeps track of its claimed part for a <a
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
*/
- public interface RestrictionTracker<RestrictionT> {
+ public abstract class RestrictionTracker<RestrictionT, PositionT> {
+ interface ClaimObserver<PositionT> {
Member

Do we expect the ClaimObserver to ever interact with the PositionT?

that.contents.putAll(this.contents);
return that;
}
}

private static <T> T unsafeClone(Coder<T> coder, T value) {
Member

s/unsafe/unchecked/

Contributor Author

Done.

@@ -378,7 +388,7 @@ public boolean isCleared() {
@Override
public InMemoryCombiningState<InputT, AccumT, OutputT> copy() {
InMemoryCombiningState<InputT, AccumT, OutputT> that =
- new InMemoryCombiningState<>(combineFn);
+ new InMemoryCombiningState<>(combineFn, accumCoder);
if (!this.isCleared) {
that.isCleared = this.isCleared;
that.addAccum(accum);
Member

Should this be cloned?

Contributor Author

Yup, thanks for the catch
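
For context, cloning a state value through its coder can be sketched as an encode/decode round trip. The SketchCoder interface below is a hypothetical minimal stand-in, not Beam's Coder class, and the choice to rethrow IOException as an unchecked exception is only an assumed reading of the helper's name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal coder interface, used only for this sketch. */
interface SketchCoder<T> {
  void encode(T value, OutputStream out) throws IOException;

  T decode(InputStream in) throws IOException;
}

/** Encodes a list of longs as an element count followed by the values. */
class LongListCoder implements SketchCoder<List<Long>> {
  @Override
  public void encode(List<Long> value, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(value.size());
    for (long element : value) {
      data.writeLong(element);
    }
    data.flush();
  }

  @Override
  public List<Long> decode(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    int size = data.readInt();
    List<Long> result = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      result.add(data.readLong());
    }
    return result;
  }
}

class CloneSketch {
  /** Round-trips the value through the coder and rethrows IOException unchecked. */
  static <T> T uncheckedClone(SketchCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      coder.encode(value, out);
      return coder.decode(new ByteArrayInputStream(out.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A copy made this way shares no mutable structure with the original, and it exercises the coder on every copy, which is the testing benefit mentioned in the PR description.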

private @Nullable Instant lastReportedWatermark;

public ProcessContext(WindowedValue<InputT> element, TrackerT tracker) {
fn.super();
this.element = element;
this.tracker = tracker;
}

void checkClaimHasNotFailed() {
Member

Inline?

public abstract class RestrictionTracker<RestrictionT, PositionT> {
/** Internal interface allowing a runner to observe the calls to {@link #tryClaim}. */
@Internal
public interface ClaimObserver<PositionT> {
Member

Do we expect this to ever do anything notable with the position? I can't think of a case where the invoker would be concerned with the actual position, which is an implementation detail of the DoFn.

If you've got an idea of when it might, I'd love an example; otherwise I'd remove the parameters from this interface

Contributor Author

Yes, the observer will eventually need to store the positions and pass them back to new methods of SDF for splitting, as part of implementation of splitting/checkpointing over Fn API.

jkff (Contributor Author) left a comment

Thanks!

jkff (Contributor Author) commented Jan 25, 2018

Apologies, forgot to actually push the changes.

chamikaramj (Contributor) left a comment

Thanks.

}

private void noteOutput() {
checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()");
checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()");
Contributor

Nice :)
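
The two checks above enforce that output happens only between a successful claim and the first failed one. A self-contained sketch of that rule follows. The field names numClaimedBlocks and hasClaimFailed and the messages come from the diff; the class itself, its callback names, and the use of plain IllegalStateException instead of checkState are assumptions.

```java
/** Hypothetical guard mirroring the two checkState calls in the diff. */
class OutputGuardSketch {
  private int numClaimedBlocks = 0;
  private boolean hasClaimFailed = false;
  private int numOutputs = 0;

  /** Called when a tryClaim() succeeds. */
  void onClaimed() {
    numClaimedBlocks++;
  }

  /** Called when a tryClaim() fails. */
  void onClaimFailed() {
    hasClaimFailed = true;
  }

  /** Rejects output after a failed claim or before any successful claim. */
  void output() {
    if (hasClaimFailed) {
      throw new IllegalStateException("Output is not allowed after a failed tryClaim()");
    }
    if (numClaimedBlocks == 0) {
      throw new IllegalStateException("Output is not allowed before tryClaim()");
    }
    numOutputs++;
  }

  int numOutputs() {
    return numOutputs;
  }
}
```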

@@ -279,9 +320,14 @@ public PaneInfo pane() {

@Override
public synchronized void updateWatermark(Instant watermark) {
// Updating the watermark without any claimed blocks is allowed.
Contributor

Why? Should we at least warn?

Contributor Author

Clarified in a comment.

@@ -898,27 +909,31 @@ public String toString(Growth.TerminationCondition<?, TerminationStateT> termina

@Override
public synchronized GrowthState<OutputT, KeyT, TerminationStateT> checkpoint() {
checkState(
Contributor

Should we be rejecting the checkpoint request instead of failing here?

Contributor Author

Rejecting the checkpoint is not allowed. Allowing it is one of the alternatives I considered, but since the runner needs access to positions anyway, I preferred to just provide that access.

@@ -753,14 +761,17 @@ public ProcessContinuation process(
// No more pending outputs - future output will come from more polling,
// unless output is complete or termination condition is reached.
if (tracker.shouldPollMore()) {
LOG.info(
"{} - emitted all known results so far; will resume polling in {} ms",
Contributor

Mention numEmitted (total) here?

Contributor Author

Done.

}
LOG.debug("{} - emitted {} new results.", c.element(), numEmitted);
Contributor

This log might be a bit confusing. It says "new results", but numEmitted is not reset after this log.

Contributor Author

Made this log more comprehensive and added some clarifying variables.

// of any work to be done at the moment, but more might emerge later. In this case,
// we must simply reschedule the original restriction - checkpointing a tracker that
// hasn't claimed any work is not allowed.
residual = KV.of(tracker.currentRestriction(), processContext.getLastReportedWatermark());
Contributor

Why not just fail? This might result in an infinite scheduling loop due to a bug, no?

Contributor Author

Clarified in a comment.

that.contents.putAll(this.contents);
return that;
}
}

private static <T> T uncheckedClone(Coder<T> coder, T value) {
Contributor

Why "unchecked"? Add a comment?

Contributor Author

Added a comment.

@@ -1074,6 +1106,40 @@ public HashCode decode(InputStream is) throws IOException {
}
}

private static class SnappyCoder<T> extends StructuredCoder<T> {
Contributor

Should we make this public (and move it to its own Java file)? This might be useful for other transforms.

Contributor Author

Done.
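
As a rough illustration of what a compressing wrapper around encoded state does, the sketch below compresses and restores a byte array. It deliberately swaps in the JDK's Deflate (java.util.zip) for Snappy so that it runs without extra dependencies, and it is not the SnappyCoder from this PR. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Hypothetical helper; uses Deflate as a stand-in for Snappy. */
class CompressingCodecSketch {
  /** Compresses bytes that some inner coder has already produced. */
  static byte[] compress(byte[] encoded) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DeflaterOutputStream out = new DeflaterOutputStream(buffer)) {
      out.write(encoded);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  /** Restores the original bytes so the inner coder can decode them. */
  static byte[] decompress(byte[] compressed) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (InputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed))) {
      byte[] chunk = new byte[4096];
      int read;
      while ((read = in.read(chunk)) != -1) {
        buffer.write(chunk, 0, read);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }
}
```

Repetitive data shrinks well under either algorithm, while high-entropy content such as hashes does not, which is consistent with the limit on compressing the Watch state noted in the PR description.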

jkff (Contributor Author) left a comment

Thanks!

chamikaramj (Contributor) left a comment

Thanks.

LGTM other than one comment.

// the original restriction, i.e. pointless.
this.scheduledCheckpoint =
executor.schedule(
this::takeCheckpointNow, maxDuration.getMillis(), TimeUnit.MILLISECONDS);
Contributor

This should be max((maxDuration - "time up to now"), 0), no?

Contributor Author

I'm not sure. I think both "10 seconds of claimed work" and "10 seconds of total work" are valid options, but I'm slightly in favor of the former because it's less likely to lead to pathological behavior. E.g. imagine that opening a connection to Kafka consistently takes 10+ seconds due to network issues: then the "total work" behavior will lead to repeatedly reading just 1 record from Kafka (compared to 0 before this PR...), but the "claimed work" behavior will provide 10 seconds of useful work.
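
The two options in this exchange can be written as two ways of computing the delay passed to the scheduler when the first block is claimed. This is a sketch with hypothetical names, not code from the PR.

```java
import java.time.Duration;

/** Hypothetical helper contrasting the two checkpoint-delay policies. */
class CheckpointDelaySketch {
  /** "Claimed work": the full budget starts counting at the first claim. */
  static Duration claimedWorkDelay(Duration maxDuration, Duration elapsedBeforeFirstClaim) {
    return maxDuration;
  }

  /** "Total work": time spent before the first claim is subtracted, floored at zero. */
  static Duration totalWorkDelay(Duration maxDuration, Duration elapsedBeforeFirstClaim) {
    Duration remaining = maxDuration.minus(elapsedBeforeFirstClaim);
    return remaining.isNegative() ? Duration.ZERO : remaining;
  }
}
```

With a 10-second budget and 12 seconds spent connecting, the "total work" policy yields a zero delay, so the checkpoint fires right after the first claim, while the "claimed work" policy still allows 10 seconds of processing.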

Contributor

Sounds good.

jkff (Contributor Author) commented Jan 26, 2018

Would like @tgroh 's LGTM as well before proceeding with Dataflow worker changes.

chamikaramj (Contributor) commented

LGTM

jkff (Contributor Author) commented Jan 26, 2018

Test failures are unrelated.

tgroh (Member) left a comment

My naming thing is because I really dislike the use of Impl as a signifier of the implementation, and generally want to give users the nicer name if possible, but it's purely a style thing.

jkff (Contributor Author) commented Feb 1, 2018

Rebased but still waiting for some Dataflow worker side Google-internal stuff before I can merge.

jkff (Contributor Author) commented Feb 6, 2018

Run Dataflow ValidatesRunner

jkff (Contributor Author) commented Feb 6, 2018

Dataflow runner tests failed somewhere towards the end due to unrelated issues - I confirmed by looking at Jenkins output that SDF tests passed. Merging.

jkff merged commit 2826362 into apache:master on Feb 6, 2018
jkff deleted the sdf-claim-callback branch on February 6, 2018 18:07
kennknowles (Member) commented

Noting here, too, that InMemoryStateInternals is not part of the direct runner, but a general utility. The cloning changes caused perf regressions in multiple other contexts and need to be reverted and reinstated only in the direct runner.
