-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
destination-async-framework: use the value from stats counter for global state conversion #35111
destination-async-framework: use the value from stats counter for global state conversion #35111
Conversation
…bal state conversion
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm assuming I understood the behavior correctly
stateManager.decrement(stateId, 2); | ||
List<PartialStateWithDestinationStats> stateAfterAllRecordsAreFlushed = stateManager.flushStates(); | ||
assertEquals(1, stateAfterAllRecordsAreFlushed.size()); | ||
assertEquals(6.0, stateAfterAllRecordsAreFlushed.get(0).stats().getRecordCount()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checking my understanding: without this PR, we would get 0 instead of 6 here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, we would get 2
/publish-java-cdk
|
…bal state conversion (#35111)
…bal state conversion (airbytehq#35111)
…bal state conversion (airbytehq#35111)
…bal state conversion (airbytehq#35111)
…bal state conversion (airbytehq#35111)
…bal state conversion (#35111)
No issue for this. The GlobalAsyncStateManager assumes by default that the state message is by default PER-STREAM until an actual state message arrives. Meanwhile as records arrive, they are also being flushed and the record counter is decremented as records are flushed. Once a state message finally arrives and is identified as GLOBAL state, the previous counters need to be converted to GLOBAL type. We had a bug in conversion where we were using the decrementing counter instead of the stats counter.