Skip to content

Commit

Permalink
This closes #1519
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Dec 6, 2016
2 parents 9d380de + 2b044f3 commit c72708c
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
Expand Down Expand Up @@ -72,9 +71,9 @@ private GroupAlsoByWindowViaWindowSetDoFn(

@Override
public void processElement(ProcessContext c) throws Exception {
KeyedWorkItem<K, InputT> element = c.element();
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();

K key = c.element().key();
K key = keyedWorkItem.key();
TimerInternals timerInternals = c.windowingInternals().timerInternals();
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);

Expand All @@ -92,10 +91,8 @@ public void processElement(ProcessContext c) throws Exception {
reduceFn,
c.getPipelineOptions());

reduceFnRunner.processElements(element.elementsIterable());
for (TimerData timer : element.timersIterable()) {
reduceFnRunner.onTimer(timer);
}
reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
reduceFnRunner.persist();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
package org.apache.beam.runners.core;

import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.TimerCallback;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -59,9 +60,8 @@ public void processElement(ProcessContext c) throws Exception {
// timer manager from the context because it doesn't exist. So we create one and emulate the
// watermark, knowing that we have all data and it is in timestamp order.
InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
timerInternals.advanceProcessingTime(TimerCallback.NO_OP, Instant.now());
timerInternals.advanceSynchronizedProcessingTime(
TimerCallback.NO_OP, BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);

ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
Expand All @@ -85,22 +85,50 @@ public void processElement(ProcessContext c) throws Exception {
reduceFnRunner.processElements(chunk);

// Then, since elements are sorted by their timestamp, advance the input watermark
// to the first element, and fire any timers that may have been scheduled.
timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
// to the first element.
timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
// Advance the processing times.
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());

// Fire any processing timers that need to fire
timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
// Fire all the eligible timers.
fireEligibleTimers(timerInternals, reduceFnRunner);

// Leave the output watermark undefined. Since there's no late data in batch mode
// there's really no need to track it as we do for streaming.
}

// Finish any pending windows by advancing the input watermark to infinity.
timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

// Finally, advance the processing time to infinity to fire any timers.
timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);

fireEligibleTimers(timerInternals, reduceFnRunner);

reduceFnRunner.persist();
}

/**
 * Drains timers from {@code timerInternals} and hands them to {@code reduceFnRunner} in batches.
 *
 * <p>Each pass pulls timers from the event-time, processing-time and synchronized
 * processing-time queues (in that order) until each {@code removeNext...} call returns
 * {@code null}, then delivers the collected batch through {@link ReduceFnRunner#onTimers}.
 * Passes repeat until one collects no timers at all — presumably because handling a batch can
 * make further timers available; confirm against {@code InMemoryTimerInternals}.
 *
 * @param timerInternals the in-memory timer store to drain
 * @param reduceFnRunner the runner that receives each batch of drained timers
 * @throws Exception declared by this method; propagated from whatever the calls above throw
 */
private void fireEligibleTimers(InMemoryTimerInternals timerInternals,
    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner) throws Exception {
  // A single buffer is reused across passes and emptied after each delivery.
  List<TimerInternals.TimerData> batch = new ArrayList<>();
  for (;;) {
    for (TimerInternals.TimerData next = timerInternals.removeNextEventTimer();
        next != null;
        next = timerInternals.removeNextEventTimer()) {
      batch.add(next);
    }
    for (TimerInternals.TimerData next = timerInternals.removeNextProcessingTimer();
        next != null;
        next = timerInternals.removeNextProcessingTimer()) {
      batch.add(next);
    }
    for (TimerInternals.TimerData next = timerInternals.removeNextSynchronizedProcessingTimer();
        next != null;
        next = timerInternals.removeNextSynchronizedProcessingTimer()) {
      batch.add(next);
    }

    // Nothing was drained in this pass, so there is nothing left to deliver.
    if (batch.isEmpty()) {
      return;
    }

    reduceFnRunner.onTimers(batch);
    batch.clear();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public void clear(StateAccessor<?> state) {
state.access(PANE_INFO_TAG).clear();
}

/**
 * Calls {@code readLater()} on the pane-info state cell reachable from {@code context}.
 *
 * <p>NOTE(review): judging by the method name this is a prefetch hint issued ahead of an actual
 * read of the pane info — confirm against the {@code ReadableState#readLater} contract.
 *
 * @param context the {@link ReduceFn} context whose state accessor is used for the lookup
 */
public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
  StateAccessor<?> state = context.state();
  state.access(PANE_INFO_TAG).readLater();
}

/**
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
* info includes the timing for the pane, whose calculation is quite subtle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.MergingStateAccessor;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateAccessor;
import org.apache.beam.sdk.util.state.StateContext;
Expand Down Expand Up @@ -117,7 +116,7 @@ public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
}

public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
}

Expand Down Expand Up @@ -389,11 +388,11 @@ public Timers timers() {

private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
private final StateAccessorImpl<K, W> state;
private final ReadableState<PaneInfo> pane;
private final PaneInfo pane;
private final OnTriggerCallbacks<OutputT> callbacks;
private final TimersImpl timers;

private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
OnTriggerCallbacks<OutputT> callbacks) {
reduceFn.super();
this.state = state;
Expand Down Expand Up @@ -424,7 +423,7 @@ public StateAccessor<K> state() {

@Override
public PaneInfo paneInfo() {
return pane.read();
return pane;
}

@Override
Expand Down

0 comments on commit c72708c

Please sign in to comment.