Skip to content

Commit

Permalink
Improvements to ReduceFnRunner prefetching:
Browse files Browse the repository at this point in the history
- add prefetch* methods for prefetching state matching existing methods
- replace onTimer with batched onTimers method to allow prefetching
  across timers
- prefetch triggers in processElements
Additionally remove deprecated TimerCallback usage
  • Loading branch information
scwhittle committed Dec 6, 2016
1 parent 1efda59 commit 5085c77
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
Expand Down Expand Up @@ -72,9 +71,9 @@ private GroupAlsoByWindowViaWindowSetDoFn(

@Override
public void processElement(ProcessContext c) throws Exception {
KeyedWorkItem<K, InputT> element = c.element();
KeyedWorkItem<K, InputT> keyedWorkItem = c.element();

K key = c.element().key();
K key = keyedWorkItem.key();
TimerInternals timerInternals = c.windowingInternals().timerInternals();
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);

Expand All @@ -92,10 +91,8 @@ public void processElement(ProcessContext c) throws Exception {
reduceFn,
c.getPipelineOptions());

reduceFnRunner.processElements(element.elementsIterable());
for (TimerData timer : element.timersIterable()) {
reduceFnRunner.onTimer(timer);
}
reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
reduceFnRunner.persist();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
package org.apache.beam.runners.core;

import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.TimerCallback;
import org.joda.time.Instant;

/**
Expand Down Expand Up @@ -59,9 +60,8 @@ public void processElement(ProcessContext c) throws Exception {
// timer manager from the context because it doesn't exist. So we create one and emulate the
// watermark, knowing that we have all data and it is in timestamp order.
InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
timerInternals.advanceProcessingTime(TimerCallback.NO_OP, Instant.now());
timerInternals.advanceSynchronizedProcessingTime(
TimerCallback.NO_OP, BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());
StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);

ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
Expand All @@ -85,22 +85,50 @@ public void processElement(ProcessContext c) throws Exception {
reduceFnRunner.processElements(chunk);

// Then, since elements are sorted by their timestamp, advance the input watermark
// to the first element, and fire any timers that may have been scheduled.
timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp());
// to the first element.
timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
// Advance the processing times.
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());

// Fire any processing timers that need to fire
timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now());
// Fire all the eligible timers.
fireEligibleTimers(timerInternals, reduceFnRunner);

// Leave the output watermark undefined. Since there's no late data in batch mode
// there's really no need to track it as we do for streaming.
}

// Finish any pending windows by advancing the input watermark to infinity.
timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);

// Finally, advance the processing time to infinity to fire any timers.
timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);

fireEligibleTimers(timerInternals, reduceFnRunner);

reduceFnRunner.persist();
}

private void fireEligibleTimers(InMemoryTimerInternals timerInternals,
ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner) throws Exception {
List<TimerInternals.TimerData> timers = new ArrayList<>();
while (true) {
TimerInternals.TimerData timer;
while ((timer = timerInternals.removeNextEventTimer()) != null) {
timers.add(timer);
}
while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
timers.add(timer);
}
while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
timers.add(timer);
}
if (timers.isEmpty()) {
break;
}
reduceFnRunner.onTimers(timers);
timers.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public void clear(StateAccessor<?> state) {
state.access(PANE_INFO_TAG).clear();
}

public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
}

/**
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
* info includes the timing for the pane, who's calculation is quite subtle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.MergingStateAccessor;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateAccessor;
import org.apache.beam.sdk.util.state.StateContext;
Expand Down Expand Up @@ -117,7 +116,7 @@ public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
}

public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
}

Expand Down Expand Up @@ -389,11 +388,11 @@ public Timers timers() {

private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
private final StateAccessorImpl<K, W> state;
private final ReadableState<PaneInfo> pane;
private final PaneInfo pane;
private final OnTriggerCallbacks<OutputT> callbacks;
private final TimersImpl timers;

private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
OnTriggerCallbacks<OutputT> callbacks) {
reduceFn.super();
this.state = state;
Expand Down Expand Up @@ -424,7 +423,7 @@ public StateAccessor<K> state() {

@Override
public PaneInfo paneInfo() {
return pane.read();
return pane;
}

@Override
Expand Down

0 comments on commit 5085c77

Please sign in to comment.