-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Throttle Kafka consumption to the file writing speed #686
Throttle Kafka consumption to the file writing speed #686
Conversation
We need to adapt the consumption rate to the file writing rate to avoid out-of-memory issues due to WriteQueue buffering.
Code Coverage |
Code Coverage |
Code Coverage |
Code Coverage |
Code Coverage |
Code Coverage |
Code Coverage |
Code Coverage |
1 similar comment
Code Coverage |
Code Coverage |
Code Coverage |
We need to adapt the consumption rate to the file writing rate to avoid out-of-memory issues due to WriteQueue buffering.
Code Coverage |
Code Coverage |
Code Coverage |
/// Consumers are stopped when the write queue is larger than | ||
/// StreamerOptions.MaxQueuedWrites. This variable defines the ratio below | ||
/// which the consumers will be resumed. | ||
float const QueuedWritesResumeThreshold{0.8F}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
I think the names changes and names for new items are a big improvement 👍
Code Coverage |
Code Coverage |
1 similar comment
Code Coverage |
Code Coverage |
StreamersPaused atomic bool was set externally when throttling consumers, instead of within the pauseStreamers() method. It seems reasonable to ensure the mutex is set by making the setter part of the method.
…cope StreamersPaused atomic bool was set externally when throttling consumers, instead of within the pauseStreamers() method. It seems reasonable to ensure the mutex is set by making the setter part of the method.
Code Coverage |
1 similar comment
Code Coverage |
Issue
ECDC-3253: We currently read from Kakfa at max speed and buffer into memory, which can cause out-of-memory issues if Kafka is faster than the GPFS filesystem.
Description of work
We adapt the consumption rate to the file writing rate:
--max-queued-writes
Topic
andPartition
classes have new methods to supportpause
andresume
of the consumers.Partition
level.Nominate for Group Code Review
Reminder
Changes should be documented in
changes.md