Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9308] Decorrelate state cleanup timers #10852

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -54,6 +54,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
Expand Down Expand Up @@ -326,9 +327,20 @@ public void processElement(Object untypedElem) throws Exception {
WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;

if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
int jitterMs = 0;
if (elem.getValue() instanceof KV<?, ?>) {
int keyHash = ((KV<?, ?>) elem.getValue()).getKey().hashCode();
// mix the hash in case the key has a "bad" hash function (eg Integer just returns itself).
int mixedHash = 0x1b873593 * Integer.rotateLeft(keyHash * 0xcc9e2d51, 15);
// spread the hash across 180 seconds, an arbitrarily chosen number.
// TODO: make this configurable?
jitterMs = Math.abs(mixedHash % 180) * 1000;
}

registerStateCleanup(
(WindowingStrategy<?, BoundedWindow>) getDoFnInfo().getWindowingStrategy(),
(Collection<BoundedWindow>) elem.getWindows());
(Collection<BoundedWindow>) elem.getWindows(),
jitterMs);
}

outputsPerElementTracker.onProcessElement();
Expand Down Expand Up @@ -477,18 +489,17 @@ private void processTimers(
}

private <W extends BoundedWindow> void registerStateCleanup(
WindowingStrategy<?, W> windowingStrategy, Collection<W> windowsToCleanup) {
WindowingStrategy<?, W> windowingStrategy, Collection<W> windowsToCleanup, int jitterMs) {
Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();

for (W window : windowsToCleanup) {
// The stepContext is the thing that know if it is batch or streaming, hence
// whether state needs to be cleaned up or will simply be discarded so the
// timer can be ignored
stepContext.setStateCleanupTimer(
CLEANUP_TIMER_ID,
window,
windowCoder,
earliestAllowableCleanupTime(window, windowingStrategy));
Instant jitteredCleanupTime =
earliestAllowableCleanupTime(window, windowingStrategy).plus(jitterMs);

stepContext.setStateCleanupTimer(CLEANUP_TIMER_ID, window, windowCoder, jitteredCleanupTime);
}
}

Expand Down