Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken watermark counters in unordered mapUsingServiceAsync [HZ-1928] #23271

Conversation

k-jamroz
Copy link
Collaborator

@k-jamroz k-jamroz commented Dec 28, 2022

mapUsingServiceAsync in unordered mode used wrong condition to check if the processing is complete and wrong number of currently processed items to propagate watermarks. This could lead to lost items when the pipeline was completed and to crashes during processing of streams containing the same item multiple times.

Fixes #23245

Checklist:

  • Labels (Team:, Type:, Source:, Module:) and Milestone set
  • Label Add to Release Notes or Not Release Notes content set
  • Request reviewers if possible
  • Send backports/forwardports if fix needs to be applied to past/future releases

@k-jamroz k-jamroz added Type: Defect Source: Internal PR or issue was opened by an employee Team: SQL Add to Release Notes Module: Jet Issues/PRs for Jet labels Dec 28, 2022
@k-jamroz k-jamroz added this to the 5.3.0 milestone Dec 28, 2022
@k-jamroz k-jamroz marked this pull request as draft December 28, 2022 13:58
@k-jamroz k-jamroz force-pushed the mapUsingServiceAsync-broken-watermarks-and-counters branch from 7b1146a to cad71e7 Compare December 28, 2022 14:30
// but this is not required.
|| (!ordered && actual.equals(asList("a-1", "a-1", "a-1", wm(10))))
// after snapshot restore watermark may be duplicated
|| (!ordered && actual.equals(asList("a-1", "a-1", wm(10), "a-1", wm(10))))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@viliam-durina I think that watermark duplication is allowed. Is that true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not allowed, they must be strictly monotonic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Watermark order with regards to items is preserved.
After a lot of debugging I confirmed that WM duplication is partially caused by the fact that we do not store lastEmittedWms in the snapshot. So if at the time of getting snapshot we have:

  1. WM that was already emitted and no new WM was received
  2. inflight item

then, after restore we get:

  1. that old, already emitted, WM in lastReceivedWms
  2. inflight item that assumes that it was received before any snapshot (Long.MIN_VALUE) even though to be strict it should be related to the WM (lastReceivedWms field is updated after restored items are sent for processing)
  3. when inflight item completes, it triggers emission of WM from lastReceivedWms (check in watermarkCount.isEmpty() && lastReceivedWms[i] > lastEmittedWms[i]) which seems to make sense at least for other cases

I think this actually may be by design that given WM can be repeated after snapshot because, as the comment says:

we restart at the oldest WM any instance was at the time of snapshot

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also the duplication does not occur during single execution, duplicated WM is emitted after restore from snapshot. But because of "we restart at the oldest WM any instance was at the time of snapshot" it probably can go backwards.

Still, emitting WM after unrelated item seems at least counter-intuitive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debugged it too and yes, the observed output is legal. The duplicate WMs aren't emitted from a single instance of a processor (that wouldn't be legal), but from the processor after restore.

We save a simplified state to the snapshot. Before saving, we know exactly which item was received before which WM. But we can't save it exactly like this, because after restore, the WM can go back in at-least-once mode, and items can be re-partitioned, and we can't re-partition the watermarkCounts. I think we don't even have to save lastReceivedWm at all, we can just rely on the new WMs received after restore, but it's not too important to change, I can be wrong here.

for events received before it were already sent.

Snapshot contains in-flight elements at the time of taking the snapshot.
They are replayed when state is restored from the snapshot, so we get only at-least-once guarantee.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The guarantee is still exactly-once, but under the hood the processing is retried at least once, but that doesn't break the guarantee.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated comment

@sonarcloud
Copy link

sonarcloud bot commented Jan 2, 2023

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

0.0% 0.0% Coverage
0.0% 0.0% Duplication

@k-jamroz k-jamroz enabled auto-merge (squash) January 12, 2023 17:07
@k-jamroz k-jamroz merged commit 2bec372 into hazelcast:master Jan 12, 2023
k-jamroz added a commit that referenced this pull request Jan 12, 2023
] [5.2.z] (#23272)

mapUsingServiceAsync in unordered mode used wrong condition to check if
the processing is complete and wrong number of currently processed items
to propagate watermarks. This could lead to lost items when the pipeline
was completed and to crashes during processing of streams containing the
same item multiple times.

Fixes #23245
Backport of: #23271

Co-authored-by: Viliam Durina <viliam@hazelcast.com>
@hazelcast hazelcast deleted a comment from k-jamroz Jan 14, 2023
@hazelcast hazelcast deleted a comment from hz-devops-test Jan 14, 2023
@hazelcast hazelcast deleted a comment from k-jamroz Jan 14, 2023
viliam-durina pushed a commit to viliam-durina/hazelcast that referenced this pull request Jan 14, 2023
…928] (hazelcast#23271)

mapUsingServiceAsync in unordered mode used wrong condition to check if
the processing is complete and wrong number of currently processed items
to propagate watermarks. This could lead to lost items when the pipeline
was completed and to crashes during processing of streams containing the
same item multiple times.

Fixes hazelcast#23245

Co-authored-by: Viliam Durina <viliam@hazelcast.com>
(cherry picked from commit 2bec372)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Add to Release Notes Module: Jet Issues/PRs for Jet Source: Internal PR or issue was opened by an employee Team: SQL Type: Defect
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Unordered mapUsingServiceAsync loses items when there are no watermarks [HZ-1928]
4 participants