Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added BatchedArrayBlockingQueue #3838

Merged
merged 7 commits into from Mar 6, 2023
Merged

Conversation

merlimat
Copy link
Contributor

@merlimat merlimat commented Mar 4, 2023

Motivation

One of the main contention points in bookies is the Journal queue. There are multiple threads posting entries to be added on the journal on the queue.

In order to reduce contention and increase overall throughput (when there are many small entries), we can adopt a strategy of "bulk" inserting to the queue.

BatchedArrayBlockingQueue is a fixed size blocking queue, very similar to Java ArrayBlockingQueue with 2 additional methods:

  1. putAll(T[] array, int offset, int len) --> Add all the items from the array in one shot, block if full
  2. takeAll(T[] dest) --> Take all the available items in the queue that fit into the destination array, block if empty

We use arrays here instead of List/Collection because it enables to do System.arrayCopy() which is way faster than a for loop and assignments.

This PR only introduces the queue, subsequent work will make use of it for the journal.

Benchmarks

When inserting items in bulk, this queue is 10x to 20x faster than ArrayBlockingQueue

// 4 producing threads

Benchmark                                                      Mode  Cnt    Score    Error   Units
MpScQueueBenchmark.arrayBlockingQueue                         thrpt    5   46.844 ±  3.445  ops/us
MpScQueueBenchmark.batchAwareArrayBlockingQueueBatch          thrpt    5  846.931 ± 36.330  ops/us
MpScQueueBenchmark.batchAwareArrayBlockingQueueSingleEnqueue  thrpt    5   51.522 ±  4.505  ops/us

// 8 producing threads

Benchmark                                                      Mode  Cnt    Score     Error   Units
MpScQueueBenchmark.arrayBlockingQueue                         thrpt    3   43.090 ±   7.548  ops/us
MpScQueueBenchmark.batchAwareArrayBlockingQueueBatch          thrpt    3  825.470 ± 293.480  ops/us
MpScQueueBenchmark.batchAwareArrayBlockingQueueSingleEnqueue  thrpt    3   47.250 ±  16.575  ops/us


// 16 producing threads

Benchmark                                                      Mode  Cnt    Score     Error   Units
MpScQueueBenchmark.arrayBlockingQueue                         thrpt    3   44.185 ±  12.999  ops/us
MpScQueueBenchmark.batchAwareArrayBlockingQueueBatch          thrpt    3  655.073 ± 565.844  ops/us
MpScQueueBenchmark.batchAwareArrayBlockingQueueSingleEnqueue  thrpt    3   47.575 ±  37.798  ops/us

@merlimat merlimat added this to the 4.16.0 milestone Mar 4, 2023
@merlimat merlimat self-assigned this Mar 4, 2023
@codecov-commenter
Copy link

codecov-commenter commented Mar 4, 2023

Codecov Report

Merging #3838 (85aacda) into master (b4112df) will increase coverage by 0.10%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master    #3838      +/-   ##
============================================
+ Coverage     68.21%   68.32%   +0.10%     
+ Complexity     6761     6757       -4     
============================================
  Files           473      473              
  Lines         40950    40950              
  Branches       5240     5240              
============================================
+ Hits          27935    27980      +45     
+ Misses        10762    10715      -47     
- Partials       2253     2255       +2     
Flag Coverage Δ
bookie 39.84% <ø> (+0.01%) ⬆️
client 44.21% <ø> (+0.01%) ⬆️
remaining 29.47% <ø> (-0.05%) ⬇️
replication 41.38% <ø> (+0.08%) ⬆️
tls 20.93% <ø> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...apache/bookkeeper/bookie/LedgerDescriptorImpl.java 68.42% <0.00%> (-3.51%) ⬇️
...g/apache/bookkeeper/meta/CleanupLedgerManager.java 71.00% <0.00%> (-2.00%) ⬇️
...he/bookkeeper/bookie/InterleavedLedgerStorage.java 77.44% <0.00%> (-1.88%) ⬇️
...keeper/bookie/storage/ldb/TransientLedgerInfo.java 85.24% <0.00%> (-1.64%) ⬇️
...ava/org/apache/bookkeeper/client/PendingAddOp.java 87.62% <0.00%> (-1.00%) ⬇️
...ache/bookkeeper/bookie/storage/ldb/WriteCache.java 89.51% <0.00%> (-0.81%) ⬇️
...rg/apache/bookkeeper/bookie/IndexInMemPageMgr.java 86.48% <0.00%> (-0.78%) ⬇️
...pache/bookkeeper/proto/PerChannelBookieClient.java 79.44% <0.00%> (-0.76%) ⬇️
...va/org/apache/bookkeeper/proto/BookieProtocol.java 84.28% <0.00%> (-0.72%) ⬇️
.../apache/bookkeeper/bookie/IndexPersistenceMgr.java 75.39% <0.00%> (-0.64%) ⬇️
... and 22 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

}

public int putAll(List<T> c) throws InterruptedException {
lock.lockInterruptibly();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check whether the items in the list is null?

@merlimat merlimat merged commit 73c5a0e into apache:master Mar 6, 2023
}

this.consumerIdx = consumerIdx;
if (size == capacity) {
Copy link
Member

@horizonzy horizonzy Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the maxElements is 0, we shouldn't trigger notFull.signalAll();. Shall we check the maxElements param?

}

@Override
public int remainingCapacity() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method size() is locked, and the method remainingCapacity should also be locked.

this.producerIdx = producerIdx;

if (size == 0) {
notEmpty.signalAll();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No needn't trigger notEmpty.signalAll(); if the target collection c is empty.


// First span
int firstSpan = Math.min(toInsert, capacity - producerIdx);
System.arraycopy(a, offset, data, producerIdx, firstSpan);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may throw ArrayIndexOutOfBoundsException when the param is illegal.

        Integer[] integers = new Integer[] {1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15,16,17,18,19,20};
        queue.putAll(integers, 10, 11);

}

@Override
public void clear() {
Copy link
Member

@horizonzy horizonzy Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the clear() may not reach the expectation, it may can't clear the queue.

        new Thread(()-> {
            Integer[] integers = new Integer[] {1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15,16,17,18,19,20};
            try {
                queue.putAll(integers, 0, 20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
        
        
        queue.clear();
        //After clear, the queue is not empty.
        System.out.println(queue.size());

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants