Skip to content

Commit

Permalink
Merge pull request apache#1 from apache/master
Browse files Browse the repository at this point in the history
sync up latest code
  • Loading branch information
xumingmin committed Mar 11, 2017
2 parents b51ce8d + d167153 commit 700ab00
Show file tree
Hide file tree
Showing 80 changed files with 8,405 additions and 426 deletions.
5 changes: 3 additions & 2 deletions .jenkins/common_job_properties.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class common_job_properties {

// Sets common top-level job properties for main repository jobs.
static void setTopLevelMainJobProperties(context,
String defaultBranch = 'master') {
setTopLevelJobProperties(context, 'beam', defaultBranch, 100)
String defaultBranch = 'master',
int defaultTimeout = 100) {
setTopLevelJobProperties(context, 'beam', defaultBranch, defaultTimeout)
}

// Sets common top-level job properties. Accessed through one of the above
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ mavenJob('beam_PostCommit_Java_RunnableOnService_Dataflow') {
previousNames('beam_PostCommit_RunnableOnService_GoogleCloudDataflow')

// Set common parameters.
common_job_properties.setTopLevelMainJobProperties(delegate)
common_job_properties.setTopLevelMainJobProperties(delegate, 'master', 120)

// Set maven parameters.
common_job_properties.setMavenConfig(delegate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
import org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner;
import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -135,24 +132,18 @@ DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
DoFnRunner<InputT, OutputT> doFnRunner,
StepContext stepContext,
AggregatorFactory aggregatorFactory,
WindowingStrategy<?, ?> windowingStrategy) {
WindowingStrategy<?, ?> windowingStrategy,
CleanupTimer cleanupTimer,
StateCleaner<W> stateCleaner) {
Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn(
fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
Sum.ofLongs());

CleanupTimer cleanupTimer =
new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy);

Coder<W> windowCoder = (Coder<W>) windowingStrategy.getWindowFn().windowCoder();
StateCleaner<W> stateCleaner =
new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), windowCoder);

return new StatefulDoFnRunner<>(
doFnRunner,
windowingStrategy,
cleanupTimer,
stateCleaner,
droppedDueToLateness);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@
*/
package org.apache.beam.runners.core;

import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateSpec;
import org.joda.time.Instant;

/**
Expand All @@ -45,7 +39,6 @@
public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<InputT, OutputT> {

public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";

private final DoFnRunner<InputT, OutputT> doFnRunner;
Expand Down Expand Up @@ -83,45 +76,50 @@ public void startBundle() {
}

@Override
public void processElement(WindowedValue<InputT> compressedElem) {
public void processElement(WindowedValue<InputT> input) {

// StatefulDoFnRunner always observes windows, so we need to explode
for (WindowedValue<InputT> value : compressedElem.explodeWindows()) {
for (WindowedValue<InputT> value : input.explodeWindows()) {

BoundedWindow window = value.getWindows().iterator().next();

if (!dropLateData(window)) {
if (isLate(window)) {
// The element is too late for this window.
droppedDueToLateness.addValue(1L);
WindowTracing.debug(
"StatefulDoFnRunner.processElement: Dropping element at {}; window:{} "
+ "since too far behind inputWatermark:{}",
input.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime());
} else {
cleanupTimer.setForWindow(window);
doFnRunner.processElement(value);
}
}
}

private boolean dropLateData(BoundedWindow window) {
private boolean isLate(BoundedWindow window) {
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
Instant inputWM = cleanupTimer.currentInputWatermarkTime();
if (gcTime.isBefore(inputWM)) {
// The element is too late for this window.
droppedDueToLateness.addValue(1L);
WindowTracing.debug(
"StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} "
+ "since too far behind inputWatermark:{}", window, inputWM);
return true;
} else {
return false;
}
return gcTime.isBefore(inputWM);
}

@Override
public void onTimer(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
if (isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp)) {
if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
stateCleaner.clearForWindow(window);
// The DoFn's onWindowExpiration should be invoked here.
} else {
if (isEventTimer || !dropLateData(window)) {
// An event-time timer can never be late because we don't allow setting timers after GC time.
// It can happen that a processing-time timer fires for a late window; we need to ignore
// this.
if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) {
// don't increment the dropped counter, only do that for elements
WindowTracing.debug(
"StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} "
+ "since window is too far behind inputWatermark:{}",
timestamp, window, cleanupTimer.currentInputWatermarkTime());
} else {
doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
}
}
Expand Down Expand Up @@ -151,6 +149,16 @@ public interface CleanupTimer {
* Sets a timer that fires at the garbage-collection time of the window.
*/
void setForWindow(BoundedWindow window);

/**
* Checks whether the given timer is a cleanup timer for the window.
*/
boolean isForWindow(
String timerId,
BoundedWindow window,
Instant timestamp,
TimeDomain timeDomain);

}

/**
Expand All @@ -160,74 +168,4 @@ public interface StateCleaner<W extends BoundedWindow> {

void clearForWindow(W window);
}

/**
* A {@link CleanupTimer} implemented by TimerInternals.
*/
public static class TimeInternalsCleanupTimer implements CleanupTimer {

private final TimerInternals timerInternals;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Coder<BoundedWindow> windowCoder;

public TimeInternalsCleanupTimer(
TimerInternals timerInternals,
WindowingStrategy<?, ?> windowingStrategy) {
this.windowingStrategy = windowingStrategy;
WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
this.timerInternals = timerInternals;
}

@Override
public Instant currentInputWatermarkTime() {
return timerInternals.currentInputWatermarkTime();
}

@Override
public void setForWindow(BoundedWindow window) {
Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
}

}

/**
* A {@link StateCleaner} implemented by StateInternals.
*/
public static class StateInternalsStateCleaner<W extends BoundedWindow>
implements StateCleaner<W> {

private final DoFn<?, ?> fn;
private final DoFnSignature signature;
private final StateInternals<?> stateInternals;
private final Coder<W> windowCoder;

public StateInternalsStateCleaner(
DoFn<?, ?> fn,
StateInternals<?> stateInternals,
Coder<W> windowCoder) {
this.fn = fn;
this.signature = DoFnSignatures.getSignature(fn.getClass());
this.stateInternals = stateInternals;
this.windowCoder = windowCoder;
}

@Override
public void clearForWindow(W window) {
for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
signature.stateDeclarations().entrySet()) {
try {
StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn);
State state = stateInternals.state(StateNamespaces.window(windowCoder, window),
StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
state.clear();
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
}

}
Loading

0 comments on commit 700ab00

Please sign in to comment.