Refactor internal buffer chain in the memory queue #37795
Conversation
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed branches.
❕ Build Aborted
Pinging @elastic/elastic-agent (Team:Elastic-Agent)
Great PR @faec!
I confess it was a bit hard to know where to start reviewing it because I'm not familiar with the queue. Looking at runloop.go first helped me.
One thing I'm missing is a test to ensure the new behaviour; you mention it in the proposed commit message:
Simplify the behavior of queue.mem.flush.min_events so it can't starve output workers when enough data is ready (previously it interacted in complicated ways with bulk_max_size, now it serves as a simple maximum on allowable event batch size).
But I didn't see a test ensuring it actually works, ideally a test that would fail with the old behaviour and pass with the new one.
It's hard to test the worst-case behavior without a live input like S3 that provides the pushback that caused the issue. Performance is expected to be quite similar in most cases, and without active pushback from an input you will mostly just see increased latency, as the input eventually reaches equilibrium and keeps up with ingestion. But here's a contrived way to test it:
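As a rough sketch, settings along these lines exercise the scenario; the specific values here are illustrative, chosen to match the numbers discussed in this thread, and a test file containing fewer than 1600 events is assumed:

queue.mem:
  events: 4096
  flush:
    timeout: 5m        # long enough that a stall is obvious
    min_events: 1600   # larger than the number of events in the test file

output.elasticsearch:
  bulk_max_size: 100
  workers: 2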
In base 8.16, this should stall for 5 minutes (because it is waiting to accumulate a buffer of 1600 events) and then ingest everything at once. With this PR applied, it should instead ingest almost everything immediately and only delay for the last <100 events if the remainder isn't exact.
The documentation for the queue at https://github.com/elastic/beats/blob/main/libbeat/docs/queueconfig.asciidoc looks like it needs to be updated.
We also need a changelog entry describing this in a way a user would understand.
What are the advantages of the implementation in this PR versus deprecating the user-facing min_events setting (making it no longer user configurable) and having the internal value mirror whatever value is used for max bulk size?
Mainly backwards compatibility: some users have used |
A couple of questions, but I don't think they are blockers for the PR.
chanSize := AdjustInputQueueSize(inputQueueSize, sz)
// Start the queue workers
b.wg.Add(2)
Not sure if this would ever happen, but since we added 2 here, is it possible for the run methods to block on each other in any way?
No, their only communication is from runLoop to ackLoop via consumedChan, and both sender and receiver are in select statements that include the queue's global context.
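To make that concrete, here is a minimal, self-contained sketch of the pattern; the names runLoop, ackLoop, and consumedChan come from this discussion, and everything else is illustrative rather than the actual queue code:

package main

import (
	"context"
	"sync"
)

type batch struct{ eventCount int }

// runLoop hands consumed batches to ackLoop. The send also selects on the
// queue's shutdown signal so it can never block indefinitely.
func runLoop(ctx context.Context, consumed chan<- batch) {
	for i := 0; i < 3; i++ {
		b := batch{eventCount: 100} // pretend a consumer just took this batch
		select {
		case consumed <- b: // hand the batch off for ACK tracking
		case <-ctx.Done(): // queue shutting down; don't block on the send
			return
		}
	}
}

// ackLoop receives those batches. The receive likewise selects on shutdown.
func ackLoop(ctx context.Context, consumed <-chan batch) {
	for {
		select {
		case b := <-consumed:
			_ = b // track the batch until its events are acknowledged
		case <-ctx.Done(): // queue shutting down; don't block on the receive
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	consumed := make(chan batch)

	var wg sync.WaitGroup
	wg.Add(2) // mirrors the b.wg.Add(2) in the diff above
	go func() { defer wg.Done(); runLoop(ctx, consumed) }()
	go func() { defer wg.Done(); ackLoop(ctx, consumed) }()

	cancel() // simulate shutdown: both loops exit via ctx.Done()
	wg.Wait()
}

Because each side always has the shutdown case available, neither goroutine can keep the other blocked once the queue's context is cancelled.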
// (Otherwise, it would make sense to leave FlushTimeout unchanged here.)
if settings.MaxGetRequest <= 1 {
	settings.FlushTimeout = 0
	settings.MaxGetRequest = (settings.Events + 1) / 2
Nit: Events and MaxGetRequest are ints, so they could be negative, which could lead to some interesting results. Do we have protections to make sure these are positive values, or should we maybe switch to uint?
Yes, Events has a min of 32 and MaxGetRequest has a min of 0 enforced by the config parser.
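For illustration, here is a rough sketch of the kind of bounds check involved; the field names mirror this discussion rather than the actual memqueue config code, and in practice the limits are enforced when the user configuration is parsed:

package main

import "fmt"

// settings is an illustrative stand-in for the parsed queue configuration.
type settings struct {
	Events        int // total queue capacity in events (min 32)
	MaxGetRequest int // derived from flush.min_events (min 0)
}

// validate rejects out-of-range values before the queue is constructed,
// so arithmetic like (Events + 1) / 2 never sees negative inputs.
func (s settings) validate() error {
	if s.Events < 32 {
		return fmt.Errorf("queue.mem.events must be at least 32 (got %d)", s.Events)
	}
	if s.MaxGetRequest < 0 {
		return fmt.Errorf("queue.mem.flush.min_events cannot be negative (got %d)", s.MaxGetRequest)
	}
	return nil
}

func main() {
	bad := settings{Events: -1, MaxGetRequest: 5}
	if err := bad.validate(); err != nil {
		fmt.Println("rejected:", err)
	}
}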
💚 Build Succeeded
💛 Build succeeded, but was flaky
💚 Build Succeeded
Thank you @faec! It was super easy to test with those values ❤️ For reference, in case someone else also wants to test, here is the filebeat.yml I used:
filebeat.inputs:
  - type: filestream
    id: my-filestream-id
    enabled: true
    paths:
      - /tmp/flog.log

queue.mem:
  events: 4096 # bigger than the number of events in the file
  flush:
    timeout: 30s
    min_events: 1600 # bigger than the number of events in the file

output.elasticsearch:
  enabled: true
  hosts: localhost:9200
  protocol: http
  username: admin
  password: testing
  bulk_max_size: 100
  workers: 2

logging:
  level: debug
  to_stderr: true
  selectors:
    - elasticsearch
    - eslegclient
Building from the base branch, once the queue flush timeout is reached we can see the output working.
Building from this PR, it publishes everything right away 🚀
Delete the proxy queue, a prototype written to reduce memory use in the old shipper project. Recent improvements to the memory queue (#37795, #38166) added support for the same early-free mechanisms as the proxy queue, so it is now redundant. The proxy queue was never used or exposed in a public release, so there are no compatibility concerns. (This is pre-cleanup for adding early-encoding support, to avoid implementing new functionality in a queue that is no longer used.)
Proposed commit message
Simplify the behavior of queue.mem.flush.min_events so it can't starve output workers when enough data is ready (previously it interacted in complicated ways with bulk_max_size, now it serves as a simple maximum on allowable event batch size).

Checklist
- I have made corresponding change to the default configuration files
- I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally
Test any ingestion while varying queue-related parameters, most importantly queue.mem.flush.min_events, queue.mem.flush.timeout, and output.elasticsearch.bulk_max_size.

Related issues
- bulk_max_size (#37757)