-
Notifications
You must be signed in to change notification settings - Fork 242
Fix batch pin index calculation logic and improve logging #499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
| startupOffsetRetryAttempts: config.GetInt(config.OrchestratorStartupAttempts), | ||
| dispatchers: make(map[fftypes.MessageType]*dispatcher), | ||
| shoulderTap: make(chan bool, 1), | ||
| newMessages: make(chan int64, readPageSize), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seemed arbitrary that we had a page size here, as we have a dedicated goroutine reading from it
| // Make sure we only rewind backwards - as we might get multiple shoulder taps | ||
| // for different message sequences during a single poll cycle. | ||
| if bm.rewindTo < 0 || rewindTo < bm.rewindTo { | ||
| previousRewind := bm.rewindTo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just to allow logging
| return ls, err | ||
| } | ||
|
|
||
| func (ag *aggregator) extractBatchMessagePin(batch *fftypes.Batch, requiredIndex int64) (totalBatchPins int64, msg *fftypes.Message, msgBaseIndex int64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The re-write of this logic, is the main thing here.
We trigger from the pin - which references the batch and the index, but we need two other bits of information:
- Which message within the batch lines up with this index
- Noting that each message might have multiple topics
- What the first index is of this message in the batch
- This is to account for the new logic from Update aggregator batch processing to maintain in-memory pin state until OnFinalize #483 that ensures all pins associated with a message get marked
dispatched=truetogether
- This is to account for the new logic from Update aggregator batch processing to maintain in-memory pin state until OnFinalize #483 that ensures all pins associated with a message get marked
|
|
||
| // Extract the message from the batch - where the index is of a topic within a message | ||
| var msg *fftypes.Message | ||
| var i int64 = -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of pin.Index == 0 this logic was broken. msg and msgBaseIndex were incorrect.
Codecov Report
@@ Coverage Diff @@
## main #499 +/- ##
=========================================
Coverage 100.00% 100.00%
=========================================
Files 267 267
Lines 15257 15264 +7
=========================================
+ Hits 15257 15264 +7
Continue to review full report at Codecov.
|
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
|
This whole thing as I'm chipping away at it, feels like I'm just coming to the big re-work needed in #421 (comment) from the other end. I'm thinking I will just go full steam for that and deal with the consequences. |
|
Closing this out - in favor of the full rework. |
Next step on #493