Skip to content

[Merged by Bors] - feat(producer): added blocking if the batch queue is full#2562

Closed
galibey wants to merge 7 commits intofluvio-community:masterfrom
galibey:feat/2512-throughput-control-condvar
Closed

[Merged by Bors] - feat(producer): added blocking if the batch queue is full#2562
galibey wants to merge 7 commits intofluvio-community:masterfrom
galibey:feat/2512-throughput-control-condvar

Conversation

@galibey
Copy link
Contributor

@galibey galibey commented Aug 16, 2022

Added the batch queue size control using Condvar.
If the configured queue size is reached, we block pushing new records until there is enough space for a new batch. During the flush, we notify all waiting threads that they can progress.

Fixed #2512

@galibey galibey requested review from morenol, sehz and tjtelan August 16, 2022 13:01
Copy link

@sehz sehz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach seems to be reasonable approach with minimum complexity.

Couple of changes I would like to see:

  1. Replace CondVar with EventListener which we use lot in the code base. This should remove need to depend on entire async-std
  2. Unit Test queue which I believe should be achievable by abstracting out Mutex<VedDeque...


pub(crate) type BatchHandler = (
Arc<BatchEvents>,
Arc<(Mutex<VecDeque<ProducerBatch>>, Condvar)>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should abstract out:

Arc<(Mutex<VecDeque<ProducerBatch>>, Condvar)>

as NewType so can independently unit test and easy to reason about

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new internal type created for that

Copy link
Contributor

@tjtelan tjtelan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@morenol morenol requested a review from sehz August 19, 2022 02:43
@morenol
Copy link
Collaborator

morenol commented Aug 19, 2022

Feedback addressed, I also tried hourly tests with these changes to ensure that it is working as expected

Copy link

@sehz sehz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Nice work @galibey @morenol

@sehz
Copy link

sehz commented Aug 19, 2022

bors r+

bors bot pushed a commit that referenced this pull request Aug 19, 2022
Added the batch queue size control using `Condvar`.
If the configured queue size is reached, we block pushing new records until there is enough space for a new batch. During the flush, we notify all waiting threads that they can progress. 

Fixed #2512 

Co-authored-by: Luis Moreno <morenol@users.noreply.github.com>
@sehz
Copy link

sehz commented Aug 19, 2022

hopefully this will improve hourly test stability

@bors
Copy link

bors bot commented Aug 19, 2022

Pull request successfully merged into master.

Build succeeded:

@bors bors bot changed the title feat(producer): added blocking if the batch queue is full [Merged by Bors] - feat(producer): added blocking if the batch queue is full Aug 19, 2022
@bors bors bot closed this Aug 19, 2022
@galibey galibey deleted the feat/2512-throughput-control-condvar branch October 27, 2022 11:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Throughput Control

4 participants