Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken watermark counters in unordered mapUsingServiceAsync [HZ-1928] #23271

Expand Up @@ -963,8 +963,8 @@ private void doCall(String methodName, boolean isCooperative, Runnable r) {
if (isCooperative) {
if (cooperativeTimeout > 0) {
assertTrue(String.format("call to %s() took %.1fms, it should be <%dms", methodName,
toMillis(elapsed), COOPERATIVE_TIME_LIMIT_MS_FAIL),
elapsed < MILLISECONDS.toNanos(COOPERATIVE_TIME_LIMIT_MS_FAIL));
toMillis(elapsed), cooperativeTimeout),
elapsed < MILLISECONDS.toNanos(cooperativeTimeout));
}
// print warning
if (elapsed > MILLISECONDS.toNanos(COOPERATIVE_TIME_LIMIT_MS_WARN)) {
Expand Down
Expand Up @@ -83,18 +83,44 @@ public final class AsyncTransformUsingServiceUnorderedP<C, S, T, K, R> extends A
In the resultQueue we also store the last received WM value for each key.

Separately, we track watermark counts for each key, which we increment for each WM key when an event is received,
and decrement when a response is processed. The count is the count of events received _before_ that WM, since
the previous WM. When the count gets to 0, we know we can emit the watermark, because all the responses
and decrement when a response is processed. The count is the count of events received _since_ that WM, _before_
the next WM. When the count gets to 0, we know we can emit the next watermark, because all the responses
for events received before it were already sent.

Snapshot contains in-flight elements at the time of taking the snapshot.
They are replayed when state is restored from the snapshot.
*/

private final BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> callAsyncFn;
private final Function<? super T, ? extends K> extractKeyFn;

private ManyToOneConcurrentArrayQueue<Tuple3<T, long[], Object>> resultQueue;
/**
* Each watermark count map contains:
* <ul>
* <li>key: watermark timestamp or {@link Long#MIN_VALUE} for items before first watermark</li>
* <li>value: number of items received _after_ this WM and _before_ next WM (if any)
* that are still being processed.</li>
* </ul>
*/
// TODO we can use more efficient structure: we only remove from the beginning and add to the end
@SuppressWarnings("unchecked")
private SortedMap<Long, Integer>[] watermarkCounts = new SortedMap[0];
/**
* Current in-flight items.
* <p>
* Invariants:
* <ol>
* <li>for each key, value > 0. Finished items are immediately removed
* <li>sum of all values in {@link #inFlightItems} is equal to
* {@link #asyncOpsCounter}, and to the sum of values in every map in the
* {@link #watermarkCounts} array.
* </ol>
* <p>
* This is an {@link IdentityHashMap}. Note that after restoring from a snapshot,
* objects that previously used a single shared instance (e.g. {@link String})
* may no longer be the same shared instance.
*/
private final Map<T, Integer> inFlightItems = new IdentityHashMap<>();
private Traverser<Object> currentTraverser = Traversers.empty();
@SuppressWarnings("rawtypes")
Expand All @@ -110,9 +136,23 @@ public final class AsyncTransformUsingServiceUnorderedP<C, S, T, K, R> extends A
* WM key.
*/
private byte[] wmKeysInv = {};
/**
* Last received watermark for given key index (wmIndex).
* Copy-on-write.
*/
private long[] lastReceivedWms = {};
private long[] lastEmittedWms = {};
private long[] minRestoredWms = {};
/**
* Number of submitted asynchronous operations that have not yet finished.
* <p>
* Invariants:
* <ol>
* <li>asyncOpsCounter >= 0</li>
* <li>asyncOpsCounter <= maxConcurrentOps</li>
* <li>see invariant in {@link #inFlightItems}</li>
* </ol>
*/
private int asyncOpsCounter;

/** Temporary collection for restored objects during snapshot restore. */
Expand Down Expand Up @@ -194,14 +234,15 @@ public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
return true;
}
if (allEmpty(watermarkCounts)) {
// Emit watermark eagerly if there are no pending items to wait for.
if (!tryEmit(watermark)) {
return false;
}
lastEmittedWms[wmIndex] = watermark.timestamp();
}
// We must not mutate lastReceivedWms, because we share the instance in the inFlightItems - we would
// mutate the instance they have. Instead, we copy and mutate it.
lastReceivedWms = Arrays.copyOf(lastReceivedWms, Math.max(wmIndex + 1, lastReceivedWms.length));
lastReceivedWms = lastReceivedWms.clone();
lastReceivedWms[wmIndex] = watermark.timestamp();
return true;
}
Expand Down Expand Up @@ -239,8 +280,12 @@ private int getWmIndex(byte wmKey0) {

watermarkCounts = Arrays.copyOf(watermarkCounts, newLength);
watermarkCounts[wmIndex] = new TreeMap<>();
if (inFlightItems.size() > 0) {
watermarkCounts[wmIndex].put(Long.MIN_VALUE, inFlightItems.size());
if (asyncOpsCounter > 0) {
// This is the first time we have seen this watermark key.
// Current in-flight items were received before any watermark for this key.
// Note that the same item can be processed multiple times
// if it appeared many times in the inbox.
watermarkCounts[wmIndex].put(Long.MIN_VALUE, asyncOpsCounter);
}

minRestoredWms = Arrays.copyOf(minRestoredWms, newLength);
Expand Down Expand Up @@ -289,7 +334,7 @@ public boolean saveToSnapshot() {
protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
if (key instanceof BroadcastKey) {
assert ((BroadcastKey) key).key().equals(Keys.LAST_RECEIVED_WMS) : "Unexpected key: " + key;
// we restart at the oldest WM any instance was at at the time of snapshot
// we restart at the oldest WM any instance was at the time of snapshot
for (Entry<Byte, Long> en : ((Map<Byte, Long>) value).entrySet()) {
int wmIndex = getWmIndex(en.getKey());
minRestoredWms[wmIndex] = Math.min(minRestoredWms[wmIndex], en.getValue());
Expand All @@ -300,6 +345,7 @@ protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
Tuple2<T, Integer> value1 = (Tuple2<T, Integer>) value;
// we can't apply backpressure here, we have to store the items and execute them later
assert value1.f0() != null && value1.f1() != null;
// replay each item appropriate number of times, order does not matter
for (int i = 0; i < value1.f1(); i++) {
restoredObjects.add(value1.f0());
LoggingUtil.logFinest(getLogger(), "Restored: %s", value1.f0());
Expand All @@ -321,8 +367,8 @@ public boolean finishSnapshotRestore() {
return true;
} else {
tryFlushQueue();
return false;
}
return false;
}

/**
Expand All @@ -344,10 +390,12 @@ private boolean tryFlushQueue() {
}
Tuple3<T, long[], Object> tuple = resultQueue.poll();
if (tuple == null) {
return allEmpty(watermarkCounts);
// done if there are no ready and no in-flight items
return asyncOpsCounter == 0;
}
assert asyncOpsCounter > 0;
asyncOpsCounter--;
Integer inFlightItemsCount = inFlightItems.merge(tuple.f0(), -1, (o, n) -> o == 1 ? null : o + n);
Integer inFlightItemsCount = inFlightItems.compute(tuple.f0(), (k, v) -> v == 1 ? null : v - 1);
assert inFlightItemsCount == null || inFlightItemsCount > 0 : "inFlightItemsCount=" + inFlightItemsCount;
// the result is either Throwable or Traverser<Object>
if (tuple.f2() instanceof Throwable) {
Expand All @@ -370,6 +418,9 @@ private boolean tryFlushQueue() {
continue;
}
long wmToEmit = Long.MIN_VALUE;
// The first watermark with non-zero counter is ready to be emitted:
// - all items before it have completed (and can be removed from watermarkCount map)
// - there are in-flight items received after it, so the next watermark is not ready
for (Iterator<Entry<Long, Integer>> it = watermarkCount.entrySet().iterator(); it.hasNext(); ) {
Entry<Long, Integer> entry = it.next();
if (entry.getValue() != 0) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
Expand All @@ -44,22 +45,28 @@
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.hazelcast.jet.Traversers.singleton;
import static com.hazelcast.jet.Traversers.traverseItems;
import static com.hazelcast.jet.impl.util.Util.exceptionallyCompletedFuture;
import static com.hazelcast.jet.pipeline.GeneralStage.DEFAULT_MAX_CONCURRENT_OPS;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(HazelcastParametrizedRunner.class)
@Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class AsyncTransformUsingServicePTest extends SimpleTestInClusterSupport {
public class AsyncTransformUsingServicePTest extends SimpleTestInClusterSupport {

@Rule
public ExpectedException exception = ExpectedException.none();
Expand Down Expand Up @@ -160,9 +167,6 @@ public void test_forwardWatermarksWithoutItems() {

@Test
public void test_forwardMultipleWatermarksWithoutItems() {
if (ordered) {
return;
}
TestSupport
.verifyProcessor(getSupplier((ctx, item) -> {
throw new UnsupportedOperationException();
Expand All @@ -172,6 +176,34 @@ public void test_forwardMultipleWatermarksWithoutItems() {
.expectOutput(asList(wm(10, (byte) 0), wm(5, (byte) 1), wm(0, (byte) 2)));
}

@Test
public void test_completedFutures_sameElement() {
    // Three equal items go in; each one is expected to yield its own "-1" result.
    List<String> repeatedInput = asList("a", "a", "a");
    List<String> expectedOutput = asList("a-1", "a-1", "a-1");

    TestSupport
            .verifyProcessor(getSupplier((ctx, item) -> completedFuture(singleton(item + "-1"))))
            .hazelcastInstance(instance())
            .input(repeatedInput)
            .disableProgressAssertion()
            .expectOutput(expectedOutput);
}

@Test
public void test_completedFutures_sameElementInterleavedWithWatermark() {
TestSupport
.verifyProcessor(getSupplier((ctx, item) -> completedFuture(singleton(item + "-1"))))
.hazelcastInstance(instance())
.input(asList("a", "a", wm(10), "a"))
.outputChecker((expected, actual) ->
actual.equals(asList("a-1", "a-1", wm(10), "a-1"))
// this is possible if restoring the processor after every snapshot
|| (!ordered && actual.equals(asList("a-1", "a-1", "a-1", wm(10))))
// this is possible if restoring the processor after every other snapshot. The
// WM isn't actually duplicated, but is emitted again after a restart
|| (!ordered && actual.equals(asList("a-1", "a-1", wm(10), "a-1", wm(10))))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viliam-durina I think that watermark duplication is allowed. Is that true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not allowed, they must be strictly monotonic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Watermark order with regards to items is preserved.
After a lot of debugging I confirmed that the WM duplication is partially caused by the fact that we do not store lastEmittedWms in the snapshot. So if, at the time the snapshot is taken, we have:

  1. WM that was already emitted and no new WM was received
  2. inflight item

then, after restore we get:

  1. that old, already emitted, WM in lastReceivedWms
  2. an in-flight item that is assumed to have been received before any watermark (Long.MIN_VALUE), even though, strictly speaking, it should be associated with that WM (the lastReceivedWms field is updated after the restored items are sent for processing)
  3. when the in-flight item completes, it triggers emission of the WM from lastReceivedWms (see the check watermarkCount.isEmpty() && lastReceivedWms[i] > lastEmittedWms[i]), which seems to make sense, at least for the other cases

I think it may actually be by design that a given WM can be repeated after a restore from a snapshot because, as the code comment says:

we restart at the oldest WM any instance was at the time of snapshot

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the duplication does not occur during a single execution; the duplicated WM is emitted after a restore from a snapshot. But because of "we restart at the oldest WM any instance was at the time of snapshot", it can probably go backwards.

Still, emitting WM after unrelated item seems at least counter-intuitive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debugged it too and yes, the observed output is legal. The duplicate WMs aren't emitted from a single instance of a processor (that wouldn't be legal), but from the processor after restore.

We save a simplified state to the snapshot. Before saving, we know exactly which item was received before which WM. But we can't save it exactly like this, because after restore, the WM can go back in at-least-once mode, and items can be re-partitioned, and we can't re-partition the watermarkCounts. I think we don't even have to save lastReceivedWm at all, we can just rely on the new WMs received after restore, but it's not too important to change, I can be wrong here.

)
.disableProgressAssertion()
.expectOutput(singletonList("<see code>"));
}

@Test
public void when_mapFnReturnsNullFuture_then_filteredOut() {
TestSupport
Expand Down Expand Up @@ -214,7 +246,7 @@ public void test_wmNotCountedToParallelOps() throws Exception {
assertTrue("wm rejected", processor.tryProcessWatermark(wm(0)));
inbox.add("bar");
processor.process(0, inbox);
assertTrue("2nd item rejected even though max parallel ops is 1", inbox.isEmpty());
assertTrue("2nd item rejected even though max parallel ops is 2", inbox.isEmpty());
}

@Test
Expand All @@ -223,12 +255,114 @@ public void test_watermarksConflated() throws Exception {
2, (ctx, item) -> new CompletableFuture<>()).get(1).iterator().next();
processor.init(new TestOutbox(128), new TestProcessorContext());
TestInbox inbox = new TestInbox();
// i have to add an item first because otherwise the WMs are forwarded right away
// I have to add an item first because otherwise the WMs are forwarded right away
inbox.add("foo");
processor.process(0, inbox);
assertTrue("inbox not empty", inbox.isEmpty());
assertTrue("wm rejected", processor.tryProcessWatermark(wm(0)));
assertTrue("wm rejected", processor.tryProcessWatermark(wm(1)));
assertTrue("wm rejected", processor.tryProcessWatermark(wm(2)));
}

@Test
public void test_allItemsProcessed_withoutWatermarks() throws Exception {
    // The async call stays pending until the test completes this future explicitly.
    CompletableFuture<Traverser<String>> pendingCall = new CompletableFuture<>();
    TestOutbox outbox = new TestOutbox(128);
    Processor processor = getSupplier(2, (ctx, item) -> pendingCall).get(1).iterator().next();
    processor.init(outbox, new TestProcessorContext());

    // Submit a single item; no watermark is ever sent in this test.
    processor.process(0, new TestInbox(singletonList("foo")));
    boolean completedWhilePending = processor.complete();
    assertFalse("Should not complete when items are being processed", completedWhilePending);

    // Finish the async call and wait for the processor to report completion.
    pendingCall.complete(Traversers.singleton("foo-processed"));
    assertTrueEventually(() -> assertTrue("Should complete after all items are processed",
            processor.complete()));
    assertThat(outbox.queue(0)).containsExactly("foo-processed");
}

@Test
public void test_firstWatermarkIsForwardedAfterPreviousItemsComplete() throws Exception {
    // Edge case: the very first watermark arrives while an earlier item is still in flight.
    CompletableFuture<Traverser<String>> pendingCall = new CompletableFuture<>();
    TestOutbox outbox = new TestOutbox(128);
    Processor processor = getSupplier(2, (ctx, item) -> pendingCall).get(1).iterator().next();
    processor.init(outbox, new TestProcessorContext());

    processor.process(0, new TestInbox(singletonList("foo")));
    boolean wmAccepted = processor.tryProcessWatermark(wm(10));
    assertTrue(wmAccepted);
    // The watermark was accepted, but nothing may reach the outbox while "foo" is unfinished.
    assertEquals("Should not forward watermark until previous elements are processed",
            Long.MIN_VALUE, outbox.lastForwardedWm((byte) 0));
    assertThat(outbox.queue(0)).isEmpty();

    pendingCall.complete(Traversers.singleton("foo-processed"));

    assertTrueEventually(() -> assertTrue("Should complete after all items are processed",
            processor.complete()));
    // The item result must precede the watermark in the output.
    assertThat(outbox.queue(0)).containsExactly("foo-processed", wm(10));
    assertEquals(10, outbox.lastForwardedWm((byte) 0));
}

@Test
public void test_firstWatermarkIsForwardedAfterPreviousItemsComplete_whenTheSameElementIsProcessed_inOrder() throws Exception {
// Runs the shared scenario, completing the futures at indices 0, 1, 2 in that order.
test_firstWatermarkIsForwardedAfterPreviousItemsComplete_whenTheSameElementIsProcessed(0, 1, 2);
}

@Test
public void test_firstWatermarkIsForwardedAfterPreviousItemsComplete_whenTheSameElementIsProcessed_inReverseOrder() throws Exception {
// Runs the shared scenario, completing the futures at indices 2, 1, 0 in that order,
// so the future created last (index 2) is completed first.
test_firstWatermarkIsForwardedAfterPreviousItemsComplete_whenTheSameElementIsProcessed(2, 1, 0);
}

private void test_firstWatermarkIsForwardedAfterPreviousItemsComplete_whenTheSameElementIsProcessed(int first,
                                                                                                    int second,
                                                                                                    int third)
        throws Exception {
    // One future is recorded per async call, in call order. The parameters are indices into
    // this list and give the order in which the futures are completed.
    List<CompletableFuture<String>> pendingCalls = new ArrayList<>(3);
    TestOutbox outbox = new TestOutbox(128);

    Processor processor = getSupplier(
            10,
            (ctx, item) -> {
                CompletableFuture<String> call = new CompletableFuture<>();
                pendingCalls.add(call);
                return call.thenApply(Traversers::singleton);
            }).get(1).iterator().next();
    processor.init(outbox, new TestProcessorContext());

    // Two equal items are submitted before the watermark ...
    processor.process(0, new TestInbox(asList("foo", "foo")));
    assertTrue(processor.tryProcessWatermark(wm(10)));
    assertEquals("Should not forward watermark until previous elements are processed",
            Long.MIN_VALUE, outbox.lastForwardedWm((byte) 0));
    assertThat(outbox.queue(0)).isEmpty();

    // ... and one more equal item after it.
    processor.process(0, new TestInbox(singletonList("foo")));
    assertEquals("Should not forward watermark until previous elements are processed",
            Long.MIN_VALUE, outbox.lastForwardedWm((byte) 0));
    assertThat(outbox.queue(0)).isEmpty();

    assertThat(pendingCalls).hasSize(3);
    // Complete the futures in the requested order; each result carries its own index.
    for (int index : new int[] {first, second, third}) {
        pendingCalls.get(index).complete("foo-" + index);
    }

    assertTrueEventually(null, () -> assertTrue("Should complete after all items are processed",
            processor.complete()), 10);
    if (!ordered) {
        assertThat(outbox.queue(0))
                .as("All items should be emitted")
                .containsExactlyInAnyOrder("foo-0", "foo-1", "foo-2", wm(10))
                // foo-2 is not constrained relative to the watermark here; only the two
                // items submitted before the watermark must precede it
                .as("Items should be emitted in correct order relative to watermark")
                .containsSubsequence("foo-0", wm(10))
                .containsSubsequence("foo-1", wm(10));
    } else {
        assertThat(outbox.queue(0))
                .as("Items should be emitted in submission order")
                .containsExactly("foo-0", "foo-1", wm(10), "foo-2");
    }
    assertEquals(10, outbox.lastForwardedWm((byte) 0));
}
}