Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unordered mapUsingServiceAsync loses items when there are no watermarks [HZ-1928] #23245

Closed
k-jamroz opened this issue Dec 22, 2022 · 3 comments · Fixed by #23271
Closed

Unordered mapUsingServiceAsync loses items when there are no watermarks [HZ-1928] #23245

k-jamroz opened this issue Dec 22, 2022 · 3 comments · Fixed by #23271
Assignees
Labels
Module: Jet Issues/PRs for Jet Source: Community PR or issue was opened by a community user to-jira Type: Defect
Milestone

Comments

@k-jamroz
Copy link
Contributor

Describe the bug

Unordered mapUsingServiceAsync loses last items when there are no watermarks.

To Reproduce

https://hazelcastcommunity.slack.com/archives/C013LM6JK7S/p1671652200501389

@k-jamroz k-jamroz added Type: Defect Source: Community PR or issue was opened by a community user Module: Jet Issues/PRs for Jet labels Dec 22, 2022
@k-jamroz
Copy link
Contributor Author

It is likely that the problem was introduced in 5.2.z with multiple watermarks support.

@github-actions github-actions bot changed the title Unordered mapUsingServiceAsync loses items when there are no watermarks Unordered mapUsingServiceAsync loses items when there are no watermarks [HZ-1928] Dec 22, 2022
@github-actions
Copy link
Contributor

Internal Jira issue: HZ-1928

@AyberkSorgun AyberkSorgun added this to the 5.3.0 milestone Dec 22, 2022
@k-jamroz
Copy link
Contributor Author

Another problem with unordered async processor is that when the same element is processed multiple times. In such case internal counters are corrupted and processor crashes on assertion (and hangs the entire job).

Reproducer (add in AsyncTransformUsingServicePTest):

    @Test
    public void test_completedFutures_sameElementInterleavedWithWatermark() {
        TestSupport
                .verifyProcessor(getSupplier((ctx, item) ->  completedFuture(singleton(item + "-1"))))
                .hazelcastInstance(instance())
                .input(asList("a", "a", wm(10), "a"))
                .disableProgressAssertion()
                .expectOutput(asList("a-1", "a-1", wm(10), "a-1"));
    }

causes:

java.lang.AssertionError: count=-1

	at com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceUnorderedP.tryFlushQueue(AsyncTransformUsingServiceUnorderedP.java:367)
	at com.hazelcast.jet.impl.processor.AsyncTransformUsingServiceUnorderedP.complete(AsyncTransformUsingServiceUnorderedP.java:261)
	at com.hazelcast.jet.core.test.TestSupport.lambda$runTest$8(TestSupport.java:768)
	at com.hazelcast.jet.core.test.TestSupport.doCall(TestSupport.java:960)
	at com.hazelcast.jet.core.test.TestSupport.runTest(TestSupport.java:768)
	at com.hazelcast.jet.core.test.TestSupport.run(TestSupport.java:632)
	at com.hazelcast.jet.core.test.TestSupport.assertOutput(TestSupport.java:423)
	at com.hazelcast.jet.core.test.TestSupport.expectOutputs(TestSupport.java:344)
	at com.hazelcast.jet.core.test.TestSupport.expectOutput(TestSupport.java:325)
	at com.hazelcast.jet.impl.processor.AsyncTransformUsingServicePTest.test_completedFutures_sameElementInterleavedWithWatermark(AsyncTransformUsingServicePTest.java:196)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at com.hazelcast.test.FailOnTimeoutStatement$CallableStatement.call(FailOnTimeoutStatement.java:115)
	at com.hazelcast.test.FailOnTimeoutStatement$CallableStatement.call(FailOnTimeoutStatement.java:107)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.lang.Thread.run(Thread.java:1589)

k-jamroz added a commit to k-jamroz/hazelcast that referenced this issue Dec 28, 2022
k-jamroz added a commit to k-jamroz/hazelcast that referenced this issue Dec 28, 2022
k-jamroz added a commit to k-jamroz/hazelcast that referenced this issue Dec 28, 2022
k-jamroz added a commit to k-jamroz/hazelcast that referenced this issue Dec 28, 2022
k-jamroz added a commit to k-jamroz/hazelcast that referenced this issue Dec 28, 2022
k-jamroz added a commit that referenced this issue Jan 12, 2023
] (#23271)

mapUsingServiceAsync in unordered mode used wrong condition to check if
the processing is complete and wrong number of currently processed items
to propagate watermarks. This could lead to lost items when the pipeline
was completed and to crashes during processing of streams containing the
same item multiple times.

Fixes #23245

Co-authored-by: Viliam Durina <viliam@hazelcast.com>
k-jamroz added a commit that referenced this issue Jan 12, 2023
] [5.2.z] (#23272)

mapUsingServiceAsync in unordered mode used wrong condition to check if
the processing is complete and wrong number of currently processed items
to propagate watermarks. This could lead to lost items when the pipeline
was completed and to crashes during processing of streams containing the
same item multiple times.

Fixes #23245
Backport of: #23271

Co-authored-by: Viliam Durina <viliam@hazelcast.com>
viliam-durina pushed a commit to viliam-durina/hazelcast that referenced this issue Jan 14, 2023
…928] (hazelcast#23271)

mapUsingServiceAsync in unordered mode used wrong condition to check if
the processing is complete and wrong number of currently processed items
to propagate watermarks. This could lead to lost items when the pipeline
was completed and to crashes during processing of streams containing the
same item multiple times.

Fixes hazelcast#23245

Co-authored-by: Viliam Durina <viliam@hazelcast.com>
(cherry picked from commit 2bec372)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Module: Jet Issues/PRs for Jet Source: Community PR or issue was opened by a community user to-jira Type: Defect
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants