Update aggregator batch processing to maintain in-memory pin state until OnFinalize #483
…til OnFinalize Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Codecov Report
@@            Coverage Diff            @@
##              main     #483    +/-   ##
=========================================
  Coverage   100.00%  100.00%
=========================================
  Files          266      267     +1
  Lines        15147    15239    +92
=========================================
+ Hits         15147    15239    +92
…roadcast and private Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
return err
}

_, err = s.updateTx(ctx, tx, query, nil /* no change events filter based update */)
So this type does reliably emit ChangeEventTypeCreated. It theoretically emits ChangeEventTypeDeleted, although I don't know of anywhere we call DeletePin. It now never emits ChangeEventTypeUpdated though.
I wonder if having change events at all is useful, or if it's confusing to emit them inconsistently.
This is common across the codebase for these update-multiple style actions.
We don't need the events in the places where we currently use these updates, and it's hard in SQL to find out which rows have been updated without detailed DB-specific coding. There's also a performance overhead.
}

// batchState are synchronous actions to be performed while processing system messages, but which must happen after reading the whole batch
type batchState struct {
The description here doesn't fully cover the expanded purpose of this type
👍 - I've added a bunch of extra text here - might be good for you to validate my understanding @awrichar
Looks very good, thanks 🤩
val2 := validateReceivedMessages(suite.testState, suite.testState.client2, fftypes.MessageTypeBroadcast, fftypes.TransactionTypeBatchPin, 1, 0)
assert.Equal(suite.T(), data.Value, val2.Value)
for i := 0; i < totalMessages; i++ {
	// Wait for all the message-confirmed events, from both participants
I feel like there's a slight step backward in going from waitForMessageConfirmed to simply counting websocket events - but it can be a topic for future enhancement if needed.
Not an intentional change - I'll fix that
test/e2e/tokens_test.go
Outdated
  assert.Equal(suite.T(), fftypes.TokenTransferTypeTransfer, transfers[0].Type)
  assert.Equal(suite.T(), int64(1), transfers[0].Amount.Int().Int64())
- data := GetDataForMessage(suite.T(), suite.testState.client1, suite.testState.startTime, transfers[0].MessageHash)
+ data := GetDataByMessageHash(suite.T(), suite.testState.client1, suite.testState.startTime, transfers[0].MessageHash)
NB: We could actually do GetDataForMessage here, as we do have the ID in transfers[0].Message. When the test was originally written, the hash was the only thing recorded, so we had to do the lookup by hash - but now looking up by ID is probably better.
awrichar left a comment
I like where this has landed. The pin logic is still pretty complex, but as far as I can tell it looks right - and I think it's slightly more efficient now? Thanks for adding E2E coverage as well.
No major notes, so marking approved and we can always circle back on anything that doesn't need to be addressed immediately.
Fix for #481
We were failing to process multiple private messages on the same topic within a single batch of pins read by the aggregator. This was a side-effect of the #462 changes, because the logic that evaluates the NextPin state on each message no longer had up-to-date information to read from the database.

The core fix was to move to in-memory processing for all state that can change while the aggregator processes a page of pins. The first time we access the state on a context (a context is a particular topic, scoped to a group if it's a private context), we read the data we need into memory, and from that point on we update it in memory until the Finalize phase is called at the end. At that point everything is flushed.

Fixing this turned into quite a big change to the internals of the aggregator, and in doing so I did find two other less serious issues:

dispatched=false when the message was confirmed. This means potential re-processing on rewind.

The fix for ^^^ was that we always mark all pins associated with a message as dispatched. That can include pins outside of the page that was just read, so we calculate the start+end index of the pins within the batch for that message, and do an update in the DB scoped to that start+end range (and the batch ID).
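For readers who want a concrete picture of the pattern described above, here is a minimal sketch. It is not the code in this PR. Every name in it (contextKey, pinRange, store, pageState, memStore) is hypothetical, and the next-pin state is simplified to an integer counter purely for illustration. It shows state being read from the database once per context, updated only in memory while a page is processed, and flushed in a single Finalize step together with the dispatched pin ranges.

```go
package main

import "fmt"

// contextKey identifies a context: a topic, scoped to a group when private.
// Hypothetical type for this sketch only.
type contextKey struct {
	Topic string
	Group string // empty for a broadcast context
}

// pinRange is an inclusive start+end range of pin indexes within one batch.
type pinRange struct {
	BatchID    string
	Start, End int64
}

// store stands in for the database layer (assumed interface).
type store interface {
	LoadNextPin(ctx contextKey) (int64, error)
	SaveNextPin(ctx contextKey, next int64) error
	MarkDispatched(r pinRange) error
}

// pageState holds everything that can change while one page of pins is processed.
type pageState struct {
	db         store
	nextPins   map[contextKey]int64
	dirty      map[contextKey]bool
	dispatched []pinRange
}

func newPageState(db store) *pageState {
	return &pageState{db: db, nextPins: map[contextKey]int64{}, dirty: map[contextKey]bool{}}
}

// NextPin hits the database on first access only; later calls see in-memory updates.
func (ps *pageState) NextPin(ctx contextKey) (int64, error) {
	if v, ok := ps.nextPins[ctx]; ok {
		return v, nil
	}
	v, err := ps.db.LoadNextPin(ctx)
	if err != nil {
		return 0, err
	}
	ps.nextPins[ctx] = v
	return v, nil
}

// Confirm advances the context in memory and remembers the message's full pin range.
func (ps *pageState) Confirm(ctx contextKey, r pinRange) error {
	v, err := ps.NextPin(ctx)
	if err != nil {
		return err
	}
	ps.nextPins[ctx] = v + 1
	ps.dirty[ctx] = true
	ps.dispatched = append(ps.dispatched, r)
	return nil
}

// Finalize flushes the changed contexts and the dispatched ranges to the database.
func (ps *pageState) Finalize() error {
	for ctx := range ps.dirty {
		if err := ps.db.SaveNextPin(ctx, ps.nextPins[ctx]); err != nil {
			return err
		}
	}
	for _, r := range ps.dispatched {
		if err := ps.db.MarkDispatched(r); err != nil {
			return err
		}
	}
	return nil
}

// memStore is an in-memory fake of the database, used only to exercise the sketch.
type memStore struct {
	next       map[contextKey]int64
	loads      int
	dispatched []pinRange
}

func (m *memStore) LoadNextPin(ctx contextKey) (int64, error) { m.loads++; return m.next[ctx], nil }
func (m *memStore) SaveNextPin(ctx contextKey, next int64) error {
	m.next[ctx] = next
	return nil
}
func (m *memStore) MarkDispatched(r pinRange) error {
	m.dispatched = append(m.dispatched, r)
	return nil
}

func main() {
	db := &memStore{next: map[contextKey]int64{}}
	ps := newPageState(db)
	ctx := contextKey{Topic: "topic1", Group: "group1"}

	// Two private messages on the same topic within one page of pins.
	for _, r := range []pinRange{{"batch1", 0, 0}, {"batch1", 1, 2}} {
		if err := ps.Confirm(ctx, r); err != nil {
			panic(err)
		}
	}
	fmt.Println("before Finalize:", db.next[ctx], "loads:", db.loads)
	if err := ps.Finalize(); err != nil {
		panic(err)
	}
	fmt.Println("after Finalize:", db.next[ctx], "ranges:", len(db.dispatched))
}
```

In this sketch the second message on the same context sees the state left by the first one without another database read, which is the property that was lost for private messages in #481. A real implementation would also need to run the flush inside the same database transaction as the rest of the page.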