Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.joda.time.Instant;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
Expand Down Expand Up @@ -124,16 +125,19 @@ private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDom
PriorityQueue<TimerData> timers = queue(domain);
boolean shouldFire = false;

do {
TimerData timer = timers.peek();
// Timers fire if the new time is ahead of the timer
shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
if (shouldFire) {
// Remove before firing, so that if the trigger adds another identical
while (true) {
ArrayList<TimerData> firedTimers = new ArrayList();
while (!timers.isEmpty() && newTime.isAfter(timers.peek().getTimestamp())) {
// Remove before firing, so that if the callback adds another identical
// timer we don't remove it.
timers.remove();
runner.onTimer(timer);
TimerData timer = timers.remove();
existingTimers.remove(timer);
firedTimers.add(timer);
}
} while (shouldFire);
if (firedTimers.isEmpty()) {
break;
}
runner.onTimers(firedTimers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.DoFnRunner.ReduceFnExecutor;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.KV;

Expand Down Expand Up @@ -81,9 +80,7 @@ public void processElement(ProcessContext c) throws Exception {
c.getPipelineOptions());

reduceFnRunner.processElements(element.elementsIterable());
for (TimerData timer : element.timersIterable()) {
reduceFnRunner.onTimer(timer);
}
reduceFnRunner.onTimers(element.timersIterable());
reduceFnRunner.persist();
}

Expand All @@ -101,4 +98,3 @@ public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
return droppedDueToLateness;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public void clear(StateAccessor<?> state) {
state.access(PANE_INFO_TAG).clear();
}

public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
}

/**
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
* info includes the timing for the pane, who's calculation is quite subtle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
import com.google.cloud.dataflow.sdk.util.state.ReadableState;
import com.google.cloud.dataflow.sdk.util.state.State;
import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
import com.google.cloud.dataflow.sdk.util.state.StateContext;
Expand Down Expand Up @@ -99,7 +98,7 @@ public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
}

public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
}

Expand Down Expand Up @@ -371,11 +370,11 @@ public Timers timers() {

private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
private final StateAccessorImpl<K, W> state;
private final ReadableState<PaneInfo> pane;
private final PaneInfo pane;
private final OnTriggerCallbacks<OutputT> callbacks;
private final TimersImpl timers;

private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
OnTriggerCallbacks<OutputT> callbacks) {
reduceFn.super();
this.state = state;
Expand Down Expand Up @@ -406,7 +405,7 @@ public StateAccessor<K> state() {

@Override
public PaneInfo paneInfo() {
return pane.read();
return pane;
}

@Override
Expand Down
Loading