feat: add document retries #99
Conversation
Force-pushed a083095 to bd54c70.
I've noticed that the tests have MaxRequests: 1, which is why the tests are passing. However, in a more realistic scenario with MaxRequests > 1, the documents stored in the local bulk_indexer won't be flushed to Elasticsearch until the bulk indexer is cycled through the channel.
Lines 470 to 479 in c8c4e54
indexer := active
active = nil
attrs := metric.WithAttributeSet(a.config.MetricAttributes)
a.errgroup.Go(func() error {
	var err error
	took := timeFunc(func() {
		err = a.flush(a.errgroupContext, indexer)
	})
	indexer.Reset()
	a.available <- indexer
We should prevent that from happening. I think we may need to introduce a new channel for bulk_indexers that have items from previous 429 failures, and treat it with higher priority than a.available. There are some edge cases we need to handle (like closing the appender), but it would be better than simply sending the bulk_indexer with cached items back to the channel like empty indexers.
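A minimal sketch of that priority scheme, assuming hypothetical retryReady and available channels (this is not the library's actual API): a non-blocking select drains the retry channel first, falling back to either channel only when no retry-laden indexer is waiting.

```go
package main

import "fmt"

// nextIndexer prefers indexers carrying retried documents over empty ones.
// The channel names (retryReady, available) are illustrative.
func nextIndexer(retryReady, available chan string) string {
	// Non-blocking check of the high-priority retry channel first.
	select {
	case bi := <-retryReady:
		return bi
	default:
	}
	// Otherwise block until either channel yields an indexer.
	select {
	case bi := <-retryReady:
		return bi
	case bi := <-available:
		return bi
	}
}

func main() {
	retryReady := make(chan string, 1)
	available := make(chan string, 1)
	retryReady <- "retry-indexer"
	available <- "empty-indexer"
	// Even though both channels are ready, the retry indexer is picked first.
	fmt.Println(nextIndexer(retryReady, available))
}
```

The "edge cases" mentioned above (e.g. closing the appender) would need the close path to drain retryReady as well, not just available.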
Another concern is that there's currently no limit on the number of times 429s will be retried. In theory, the entire buffer could be filled with documents that have not been indexed due to 429s, which would cause only one new event to be sent in each subsequent flush. Have you given any thought to how we could configure an upper limit on consecutive retries?
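One way such a cap could work, sketched below: track a per-item retry count (the PR's diff later shows a b.retryCounts map) and stop re-buffering once it reaches the configured maximum. MaxDocumentRetries is the config name the PR introduces; everything else here is hypothetical.

```go
package main

import "fmt"

// retryBuffer is an illustrative stand-in for the bulk indexer's buffer,
// tracking how many times each item position has already been retried.
type retryBuffer struct {
	maxDocumentRetries int         // cap on consecutive 429 retries per document
	retryCounts        map[int]int // item position -> retries so far
}

// shouldRetry reports whether the item at position pos may be re-buffered
// after a 429, bumping its counter when it may.
func (b *retryBuffer) shouldRetry(pos int) bool {
	if b.retryCounts[pos] >= b.maxDocumentRetries {
		return false // give up: drop the document instead of re-buffering it
	}
	b.retryCounts[pos]++
	return true
}

func main() {
	b := &retryBuffer{maxDocumentRetries: 2, retryCounts: map[int]int{}}
	for i := 0; i < 4; i++ {
		fmt.Println(b.shouldRetry(0))
	}
	// prints true, true, then false twice: the third 429 exceeds the cap
}
```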
appender_test.go
Outdated
}{
	"nocompression": {
		cfg: docappender.Config{
			MaxRequests: 1,
These tests work because MaxRequests is set to 1; however, if it's set to 2 or 3, the documents stored for subsequent retries won't be flushed in the next bulk request. Instead, the other 1 or 2 bulk indexers need to be flushed first, and only after that happens will the buffered "failed" docs be sent over to Elasticsearch.
To be honest, I think this is fine. We already changed quite a lot of things because of concerns around complexity. If ES is returning 429s there are a lot of events buffered/coming in. I don't think retried events are gonna stay "idle" for a long time.
Thanks for this! This should be fixed 👍
> To be honest, I think this is fine. We already changed quite a lot of things because of concerns around complexity. If ES is returning 429s there are a lot of events buffered/coming in. I don't think retried events are gonna stay "idle" for a long time.
I don't think it's acceptable to do all of this retry work and then return a potentially full bulkIndexer back to the available channel, where it may or may not be flushed in the future.
While it may be the case that 429s tend to happen at higher throughputs and the bulkIndexer may not remain in that channel for long, it won't be flushed on appender.Close(), leading to those buffered events being completely lost.
Could you please add a test that ensures that previously buffered 429s are flushed once more on indexer.Close()?
In the case where 429s are a considerable % of the total documents in a bulk request, say 30-40%, we'd leave a bunch of half-full (or almost entirely full, if the 429 % is even higher) bulkIndexers in that available channel.
I think we should get rid of the "active" channel entirely and let the runtime handle the channels, but that should go in a separate PR.
Could you elaborate on this? I'm not following your reasoning and how the runtime would be of any help to us in this case.
@@ -55,6 +55,9 @@ type Config struct {
	// If MaxRequests is less than or equal to zero, the default of 10 will be used.
	MaxRequests int

	// MaxDocumentRetries holds the maximum number of document retries
	MaxDocumentRetries int
Should this default to 3?
No, I'd prefer to keep it at 0 and have this behaviour opt-in for users of this library.
We can revisit this in the future
Co-authored-by: Marc Lopez Rubio <marc5.12@outlook.com>
The idea is that they will always be flushed in the future: either the next time data comes in or after the flush interval (currently not working because of the active channel approach we are using).
Mmh, this is a bug. Sorry about that! It should be fixed now!
I don't see this as an issue. This would decrease the throughput of the bulk indexers which is fine as the whole point of the 429s is to slow down.
start/end lnidx are not really indexes, so the name was misleading.
Changes mostly LGTM, I think we need to retry flushing even if the error group returns an error.
> Could you elaborate on this? I'm not following your reasoning and how the runtime would be of any help to us in this case.
Ideally, we would just read from the channel and let the runtime switch between them, removing the active channel completely. The flush interval would then cause a flush request for the bulk indexers that still have events in the buffer. The issue of bulk indexers remaining in a "limbo" would not apply anymore because they would all be active.
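A rough sketch of that pool-only design (all names hypothetical, not the library's code): a flush-interval loop drains a single channel, so any indexer still holding buffered items, including retried 429 documents, gets flushed on the next tick rather than sitting in limbo.

```go
package main

import (
	"fmt"
	"time"
)

// bulkIndexer is a minimal stand-in that only counts buffered items.
type bulkIndexer struct{ items int }

// flushLoop drains every indexer from a single pool on each tick, flushing
// any that carry buffered items. With no separate "active" channel, an
// indexer holding retried documents is picked up on the next interval.
func flushLoop(pool chan *bulkIndexer, ticks int) int {
	flushed := 0
	ticker := time.NewTicker(time.Millisecond) // stands in for FlushInterval
	defer ticker.Stop()
	for i := 0; i < ticks; i++ {
		<-ticker.C
		for n := len(pool); n > 0; n-- {
			bi := <-pool
			if bi.items > 0 {
				flushed += bi.items
				bi.items = 0 // simulate a successful flush
			}
			pool <- bi // every indexer goes straight back to the single pool
		}
	}
	return flushed
}

func main() {
	pool := make(chan *bulkIndexer, 2)
	pool <- &bulkIndexer{items: 5} // e.g. five buffered 429 retries
	pool <- &bulkIndexer{}
	fmt.Println(flushLoop(pool, 1)) // 5
}
```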
I still don't follow what you meant.
Also, have you run some benchmarks using the tilt environment and monitored how the metrics look? I think it's worth doing so before merging.
if err := a.errgroup.Wait(); err != nil {
	return err
}
return a.errgroup.Wait()
close(a.available)
for bi := range a.available {
	if err := a.flush(context.Background(), bi); err != nil {
		return fmt.Errorf("failed to flush events on close: %w", err)
	}
}
return nil
Returning early will cause the non-empty bulkIndexers not to be flushed. What do you think about the snippet below?
// Wait until all active indexers have been flushed.
err := a.errgroup.Wait()
close(a.available)
for bi := range a.available {
	a.errgroup.Go(func() error {
		if e := a.flush(context.Background(), bi); e != nil {
			return fmt.Errorf("failed to flush events on close: %w", e)
		}
		return nil
	})
}
return errors.Join(err, a.errgroup.Wait())
This is on purpose to be consistent with the current behaviour:
Lines 482 to 492 in 9748039
var err error
took := timeFunc(func() {
	err = a.flush(a.errgroupContext, indexer)
})
indexer.Reset()
a.available <- indexer
a.addUpDownCount(1, &a.availableBulkRequests, a.metrics.availableBulkRequests)
a.metrics.flushDuration.Record(context.Background(), took.Seconds(),
	attrs,
)
return err
Even with retries disabled, if a flush fails the errgroup returns an error, which will not block/wait for the others to finish.
Could you open a follow-up issue then to tackle this? By adding retry behavior we are moving more towards reliability vs. dropping events, so the ask to change the behavior for flushing non-empty bulk indexers seems totally fair.
That means that we won't be retrying one last time. If you could add a comment about that in the code, that'd be great to make more obvious to readers.
I only ran benchmarks using the appender benchmarks in the repo, but can you clarify so we are aligned? Are you asking to test the performance of the retry code, or the overhead on "normal operation" (requests succeed)?
I do not want to speak for Marc, but IMO we should have some numbers on how this impacts overall performance. I would expect
Testing this e2e under load in a dev environment is essential before starting the promotion process.
@simitt, that's right. @kruskall, I'll leave it up to you whether you want to test this as part of the dependency update or as part of the PR. If we go the dependency promotion path, we may approve the PR without any regression testing. I understand the retry behavior is opt-in rather than opt-out, but that's what regression testing is for.
LGTM. It'd be great to test this either as part of the dependency update or before we merge.
if err := a.errgroup.Wait(); err != nil {
	return err
}
return a.errgroup.Wait()
close(a.available)
for bi := range a.available {
	if err := a.flush(context.Background(), bi); err != nil {
		return fmt.Errorf("failed to flush events on close: %w", err)
	}
}
return nil
That means that we won't be retrying one last time. If you could add a comment about that in the code, that'd be great to make more obvious to readers.
If we don't need to export Indexnth then it would be better not to, to keep the public API tidy. The way the document indexes are found looks a bit inefficient, but we could improve it in a follow-up since it should not be on a hot path.
b.retryCounts[b.itemsAdded] = count

if b.gzipw != nil {
	gr, err := gzip.NewReader(bytes.NewReader(b.copyBuf))
It looks like we're decompressing and scanning through the request body for every failed item. Should we be decompressing once and scanning through incrementally? e.g. if you have failures at index 1 and 8, first skip to index 1, then skip from index 1 to index 8 without decompressing and reprocessing up to index 1.
I might be misunderstanding something, but decompressing the whole request body is not acceptable. It was explicitly put as a goal to implement retries without decompressing the request body.
I don't mean decompress the whole payload at once; I meant create the decompressor once and stream through the results. AFAICS there's no need to call gzip.NewReader more than once.
Instead you could lazily create it when seeing the first 429 result, wrap it with bufio.NewScanner, and stream through the lines, skipping non-429 result lines as needed.
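That streaming approach could be sketched roughly as follows. This is a self-contained illustration, not the PR's code: failedLines and gzipBytes are hypothetical names, and "failed item" is modeled as a line index into a newline-delimited body.

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
)

// failedLines streams through a gzip-compressed newline-delimited body with a
// single gzip.NewReader, collecting only the lines at the given sorted,
// zero-based indexes, instead of re-decompressing from the start for every
// failed item.
func failedLines(compressed []byte, wanted []int) ([]string, error) {
	gr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer gr.Close()
	var out []string
	sc := bufio.NewScanner(gr)
	for line := 0; len(wanted) > 0 && sc.Scan(); line++ {
		if line == wanted[0] {
			out = append(out, sc.Text())
			wanted = wanted[1:]
		}
	}
	return out, sc.Err()
}

// gzipBytes compresses s, building a sample request body for the demo below.
func gzipBytes(s string) []byte {
	var buf bytes.Buffer
	gw := gzip.NewWriter(&buf)
	gw.Write([]byte(s))
	gw.Close()
	return buf.Bytes()
}

func main() {
	body := gzipBytes("doc0\ndoc1\ndoc2\ndoc3\n")
	lines, err := failedLines(body, []int{1, 3})
	fmt.Println(lines, err) // [doc1 doc3] <nil>
}
```

The lazy-creation idea from the comment above would simply defer the gzip.NewReader call until the first 429 is seen, so fully successful responses pay no decompression cost at all.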
Blocked by #100 (to make testing easier)

Add document retries logic. Follow-up to #91 (important to understand how the compression logic changed).

### Compression off

In this case we simply iterate over the array and submit the failed events to the buffer. We can count them using newlines.

### Compression on

Retrieve the offset of the failed event and decompress the "gzip container". Loop over the array and submit the failed event to the buffer.

### Testing
This PR adds tests to cover as many edge cases as possible: one event per gzip container, multiple events per gzip container, and all events in one gzip container. Tests also verify the retry logic twice to ensure there are no conflicts between flushes.
Update:

The approach was simplified: iterate over the array and resubmit the failed events to the buffer. For gzip, we read in batches to avoid decompressing the whole request.
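The compression-off path described above, locating the n-th item purely by counting newlines, might look like this. The two-lines-per-item bulk layout and all names here are illustrative assumptions, not the PR's code.

```go
package main

import (
	"fmt"
	"strings"
)

// itemAt returns the action and source lines of the index-th item in an
// uncompressed Elasticsearch bulk body, located purely by counting newlines
// with no decoding. This sketch assumes every item occupies exactly two lines
// (action metadata plus document source).
func itemAt(body string, index int) (action, source string) {
	lines := strings.Split(strings.TrimSuffix(body, "\n"), "\n")
	return lines[index*2], lines[index*2+1]
}

func main() {
	body := "{\"create\":{}}\n{\"doc\":1}\n{\"create\":{}}\n{\"doc\":2}\n"
	action, source := itemAt(body, 1)
	fmt.Println(action, source) // {"create":{}} {"doc":2}
}
```

A failed item found this way can then be appended to the retry buffer verbatim, which is what lets the retry path avoid parsing or decompressing anything.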