Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove max_number_of_messages cap in Filebeat s3 input #32278

Closed
kaiyan-sheng opened this issue Jul 8, 2022 · 7 comments
Closed

Remove max_number_of_messages cap in Filebeat s3 input #32278

kaiyan-sheng opened this issue Jul 8, 2022 · 7 comments
Labels
enhancement Team:Cloud-Monitoring Label for the Cloud Monitoring team

Comments

@kaiyan-sheng
Copy link
Contributor

In Filebeat aws-s3 input, when SQS notification is set up along with S3 bucket, the max_number_of_messages configuration parameter can be used to control the maximum number of SQS messages in flight at any time. Right now max_number_of_messages has a default value of 5 and the maximum accepted value is 10. The maximum value of 10 comes from the limitation of the AWS ReceiveMessage API call. ReceiveMessage API call retrieves one or more messages, up to 10 from the specified queue.

With this limitation, the only way to scale s3 input is to have multiple Filebeat running in parallel pointing to the same SQS queue. @aspacca and I talked about this, it would be good to remove the limitation (10) for the max_number_of_messages parameter and that way we can specify a larger value of max_number_of_messages, start max_number_of_messages / 10 go routines for the poller to scale.

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Jul 8, 2022
@kaiyan-sheng kaiyan-sheng added enhancement Team:Cloud-Monitoring Label for the Cloud Monitoring team labels Jul 8, 2022
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Jul 8, 2022
@nimarezainia
Copy link
Contributor

there's also a PR somewhat relevant to this: #31614

@andrewkroh
Copy link
Member

andrewkroh commented Jul 12, 2022

As I mentioned in #31614 (comment), I don't believe our input is limited to max_number_of_messages of 10. It will request at most 10 from the API, but you can configure the max_number_of_messages > 10 and it will continue to make SQS ReceiveMessage requests until that number is reached. Since 7.15 our documentation for max_number_of_messages has not listed an upper limit because there should not be one in the code.

Have you encountered some kind of error while using max_number_of_messages > 10?

You can confirm the number of inflight SQS messages by looking at the sqs_messages_inflight_gauge metric (see https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-aws-s3.html#_metrics).

@nimarezainia
Copy link
Contributor

@andrewkroh the team is trying to address the slower than expected ingestion of SQS logs as reported here: https://github.com/elastic/sdh-beats/issues/2238

Some notable observations:

  • agent seems to be throttled somehow with under utilized CPU
  • SQS queue has built up
  • Adding an additional integration seemed to have increased throughput.

@aspacca
Copy link
Contributor

aspacca commented Jul 13, 2022

I misinterpreted the cap (I thought that steps from 2. to 4. below were capped at 10 goroutines in total)

this is the behaviour I've found in the code:

  • first iteration of the cycle
  1. acquire 100 workers - blocking
  2. fetch 10 messagesn (min between workers at 1. and 10) - blocking
  3. release 100 - 10 workers - blocking
  4. process 10 messages - threads
  5. release a worker a time for every threads - threads
  • second iteration:
  1. acquire 100 workers (only between 90 and 100 available according to what's the status of the previous iteration threads at 5.) - blocking
  2. fetch from 10 (min between workers at 1. and 10) - blocking
  3. release 10 workers - blocking
  4. process 10 messages - threads
  5. release a worker a time for every threads - threads
  • third iteration:
  1. acquire 100 workers (only between 80 and 100 available according to what's the status of the previous iteration threads at 5.) - blocking
  2. fetch 10 messages (min between workers at 1. and 10) - blocking
  3. release 10 workers - blocking
  4. process 10 messages - threads
  5. release a worker a time for every threads - threads

etc etc

we should reach up to the 10th iteration where, in case all of the threads at 5. didn't finish yet but for a few, steps from 2 to 4 will become from 0 to 10 instead of 10

so yes, @andrewkroh , max_number_of_messages > 0 will produce having at max max_number_of_messages concurrent goroutines processing the polled sqs messages over multiple iterations

it's interesting anyway that according to @nimarezainia having multiple integrations increased the throughput
@nimarezainia was it made a test with a single integration with max_number_of_messages = X where X was the sum of max_number_of_messages = N? (as N the value set in the 3 different integrations?)

if not, it should produce similar increased throughput. if it doesn't I suspect the blocking nature of sqs.ReceiveMessage it's affecting throughput because of network I/O or similar

there still space for optimisation maybe (that was my initial thought): having at max max_number_of_messages concurrent goroutines since the first iteration, instead of waiting for them to be added up over iterations
if I got it right, if every goroutine is fast enough to end just after receiving the new messages we will have effectively only 10 concurrent goroutines most of the time

this could also be the case for the different throughput in the two secenarios

what do you think @andrewkroh ?

@aspacca
Copy link
Contributor

aspacca commented Jul 13, 2022

to summarise:
it seems to me there are two different throughputs to consider:

  1. the one for receive the messages
  2. the one for processing the received messages

both are made in batch

  1. is always capped at 10 for every single batch
  2. the same as consequence of capped batch at 1.

max_number_of_messages acts as total throttling limit over 2. across different batches (since 2. is not blocking)

every batch is blocking at 1., this means that on a performant enough compute environment the effective throughput for every batch at 2. will be so limited to 10

this seems to be validated by the increased throughput with the 3x integrations configuration @nimarezainia reported

beware anyway that such configuration produced instability in the agent that eventually died

@andrewkroh
Copy link
Member

andrewkroh commented Jul 13, 2022

I suspect the difference between using two input instances vs one is that each instance creates its own beat.Client (source ref). There is a lock that is held during the Publish call of the client.

This code could be changed to create one beat.Client per worker to remove any contention for the publish lock. This contention would be most noticeable when there are some local Beats processors involved because the lock is held while each event is processed.

@andrewkroh
Copy link
Member

andrewkroh commented Nov 21, 2022

Closing because max_number_of_messages is working correctly. And the issue that I suspected in the previous comment has proven true and is fixed in #33658 for SQS mode.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Team:Cloud-Monitoring Label for the Cloud Monitoring team
Projects
None yet
Development

No branches or pull requests

4 participants