Re-work batch logic for simplicity, efficiency, and restart recovery #501

Conversation
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Explanation seems sound. I'll wait for more of the code to take shape before reviewing though.
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
Codecov Report
    @@            Coverage Diff            @@
    ##              main      #501   +/-  ##
    =========================================
      Coverage   100.00%   100.00%
    =========================================
      Files          275       275
      Lines        15694     15731    +37
    =========================================
    + Hits         15694     15731    +37
Continue to review full report at Codecov.
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
pkg/fftypes/transaction.go
Outdated

    var (
        // TransactionTypeNone indicates no transaction should be used for this message/batch
        // TransactionTypeNone deprecreated - replaced by TransactionTypeUnpinned
typo: deprecated
Looking forward to the day we dep-recreate something
pkg/fftypes/transaction.go
Outdated
    // TransactionTypeNone indicates no transaction should be used for this message/batch
    // TransactionTypeNone deprecreated - replaced by TransactionTypeUnpinned
    TransactionTypeNone TransactionType = ffEnum("txtype", "none")
    // TransactionTypeUnpinned indicates no transaction should be used for this message/batch
I think the wording should indicate that the message will be sent without pinning any evidence to the blockchain.
There is a FireFly transaction (although there's no blockchain transaction) - so want to avoid confusion there.
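A possible wording along those lines, as a sketch (not necessarily the exact text that was merged):

```go
// TransactionTypeUnpinned indicates the message will be sent without pinning
// any evidence to the blockchain. A FireFly transaction still tracks the send,
// but no blockchain transaction is submitted for it.
TransactionTypeUnpinned TransactionType = ffEnum("txtype", "unpinned")
```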
Force-pushed from 23e7fc8 to dfdb167
… with increasing sequence Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
internal/batch/batch_manager.go
Outdated
    dispatcher.mux.Lock()
    key := fmt.Sprintf("%s:%s:%s[group=%v]", namespace, identity.Author, identity.Key, group)
    processor, ok := dispatcher.processors[key]
    name := fmt.Sprintf("%s|%s|%v", namespace, identity.Author, group)
Small thing, but maybe there should be a getProcessorKey next to getDispatcherKey.
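A sketch of that suggestion (getProcessorKey is hypothetical here, mirroring the existing getDispatcherKey; the parameter types are assumed from the diff context):

```go
func getProcessorKey(namespace string, identity *fftypes.Identity, group *fftypes.Bytes32) string {
	return fmt.Sprintf("%s|%s|%v", namespace, identity.Author, group)
}
```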
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
    func (bm *batchManager) WaitStop() {
    <-bm.sequencerClosed
    func (bm *batchManager) reapQuiescing() {
Award for the best function name here 🏅
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
internal/batch/batch_manager.go
Outdated
    // Do not block sending to the shoulderTap - as it can only contain one
    func (bm *batchManager) newMessageNotification(seq int64) {
    log.L(bm.ctx).Debugf("Notification of message %d", seq)
    if (seq - 1) < bm.readOffset {
Might bear a comment here on why there are - 1 calculations. Alternately, should we just add 1 to readOffset everywhere (initialize it to 0, make the database query GtOrEq, etc)?
> just add 1 to readOffset everywhere

Worried this is a big risky change. Going for the comment.
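The kind of comment that resolves this might read as follows (a sketch, assuming readOffset holds the sequence of the last message already read by the poll loop):

```go
// readOffset is the sequence of the last message already read by the poll
// loop, so a notification is only useful for messages beyond it, i.e. when
// (seq - 1) >= readOffset.
if (seq - 1) < bm.readOffset {
	return // already covered by a previous poll
}
```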
internal/batch/batch_manager.go
Outdated
    }

    func (bm *batchManager) waitForShoulderTapOrPollTimeout() {
    func (bm *batchManager) waitForShoulderTapOrPollTimeout() (done bool) {
Just a spelling thing - but "shoulder tap" has been replaced by "new message", right?
internal/batch/batch_processor.go
Outdated
    newQueue := make([]*batchWork, 0, len(bp.assemblyQueue)+1)
    added := false
    skip := false
    // Check it's not in the recently flushed lish
lish
lush 👍 🏴
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
        added = true
    }
    newQueue = append(newQueue, work)
    }
It also feels like much of this loop could be replaced by a single append of the new work and then a call to sort.Slice(). Would still need a check for duplicates ahead of that though, so unsure of the ultimate performance impact. Just a thought.
So I think this ends up as more code at the end of the day. We have to have all the branches that we have today, but also create a custom function to pass into sort.Slice() to do the comparison.
As for whether it's more performant in practice to do a single traversal and allocate memory for a new pointer array each time, vs only re-allocate the array when we add and do a sort-in-place, I'm not sure.
My gut feeling was that re-allocating the array each time and doing a single pass optimizes for the case where we're adding work - which is the most common case.
Yea, I think with the other optimizations added to abort early in the case of a duplicate, this now reads pretty clean and doesn't need any further tweaking.
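For reference, the sort.Slice() alternative that was weighed would look roughly like this (a sketch only; the PR kept the single-pass insert):

```go
// Append the new work, then re-sort the queue in place by message sequence.
// A duplicate check would still be needed before reaching this point.
bp.assemblyQueue = append(bp.assemblyQueue, newWork)
sort.Slice(bp.assemblyQueue, func(i, j int) bool {
	return bp.assemblyQueue[i].msg.Sequence < bp.assemblyQueue[j].msg.Sequence
})
```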
internal/batch/batch_processor.go
Outdated
    newQueue = append(newQueue, newWork)
    added = true
    }
    if added {
added will always be true here unless skip was set. Again just feels like a lot of the logic should be skipped early if skip gets flipped to true.
That's not actually true, if (as is the most common case) we're adding to the end of the list.
I will do a simplification experiment, to see if I can make the add logic based on if the item is > than the current one... which I think would always add
Yea, in the common case added = true will be hit up above on line 180, so it will be true by this point.
... that went badly - just leaving the skip part
ok - early return was the solution 👍 (overflow and full can be assumed to be false if we didn't add anything)
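A minimal sketch of that early-return shape (the return signature here is assumed for illustration, not copied from the PR):

```go
// Abort as soon as a recently-flushed duplicate is detected, before any of
// the queue rebuilding runs - if nothing was added, the assembly cannot
// have become full or overflowed.
if skip {
	return false, false
}
```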
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
…imming the oldest Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
    s.msg.Header.TxType = fftypes.TransactionTypeUnpinned
    default:
    // the only other valid option is "batch_pin"
    s.msg.Header.TxType = fftypes.TransactionTypeBatchPin
Should we yell at them for setting an invalid txtype? Or just quietly set to the default like this?
Also might consider adding similar pre-validation to broadcast/message.go - can't remember if/when that case actually fails due to an invalid txtype.
Going to go for the default approach, but do the same for Broadcast
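Reading the fragments together, the normalizing switch is roughly the following (a sketch; the "none" alias case is an assumption based on the deprecation discussion above):

```go
switch s.msg.Header.TxType {
case fftypes.TransactionTypeNone:
	// "none" is deprecated - map it to its replacement
	s.msg.Header.TxType = fftypes.TransactionTypeUnpinned
case fftypes.TransactionTypeUnpinned:
	// explicitly requested unpinned send - leave as-is
default:
	// the only other valid option is "batch_pin" - quietly set the default
	s.msg.Header.TxType = fftypes.TransactionTypeBatchPin
}
```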
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
internal/batch/batch_processor.go
Outdated
    newFlushedSequences := make([]int64, newFlushedSeqLen)
    for i := 0; i < len(flushAssembly); i++ {
    // Add in reverse order - so we can trim the end of it off later (in the next block) and keep the newest
    newFlushedSequences[len(flushAssembly)-i-1] = flushAssembly[i].msg.Sequence
So... we are adding to the front of a list, in reverse order, and then ultimately trimming the end of the list.
It feels like we could add to the end of the list in order, and then trim the front, and it might be more understandable. However, I do think the current logic works, so it's not mandatory to re-spin this again if we want to defer for now.
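The suggested alternative, sketched with the names visible in the diff (append in order, then trim the oldest off the front):

```go
// Append the newly flushed sequences in their natural order...
for _, work := range flushAssembly {
	bp.flushedSequences = append(bp.flushedSequences, work.msg.Sequence)
}
// ...then drop the oldest entries from the front if the cap is exceeded.
if over := len(bp.flushedSequences) - maxFlushedSeqLen; over > 0 {
	bp.flushedSequences = bp.flushedSequences[over:]
}
```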
awrichar left a comment:
Marking approved with one more optional thought for a possible simplification.
Thanks for this; feels like a huge step forward for the message batching.
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
    newLength = maxFlushedSeqLen
    }
    dropLength := combinedLen - newLength
    retainLength := len(bp.flushedSequences) - dropLength
Thanks, personally I feel like the extra vars and comments have made this MUCH easier to follow.
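Pieced together from the fragments above, the trimming math reads roughly like this (a sketch; combinedLen and maxFlushedSeqLen as shown in the diff):

```go
combinedLen := len(bp.flushedSequences) + len(flushAssembly)
newLength := combinedLen
if combinedLen > maxFlushedSeqLen {
	newLength = maxFlushedSeqLen
}
dropLength := combinedLen - newLength                 // entries over the cap
retainLength := len(bp.flushedSequences) - dropLength // old entries to keep
```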
This leads on from the investigation in #421 and then #499
At the moment this is how the batching logic works:
OLD
This design evolved from a similar design in the previous generation of Asset Trail technology.
Given this most recent issue, and some inefficiency and crash-recovery challenges that remained in the design after its port over to FireFly and subsequent evolution, I'm planning an updated design.
NEW
This PR updates it to simplify it (even beyond the previous proposal in #421):
- A `sent` state for messages that go into a batch

Note this means if things are appearing at once across a batch assembly window of 0.5s, it's very likely all messages will be sent in DB sequence order (even though the DB doesn't guarantee that's the order they'll become visible in).