Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce contention in DocumentsWriterPerThreadPool. #12199

Merged
merged 9 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Optimizations

* GITHUB#12179: Better PostingsEnum reuse in MultiTermQueryConstantScoreBlendedWrapper. (Greg Miller)

* GITHUB#12198: Reduced contention when indexing with many threads. (Adrien Grand)
* GITHUB#12198, GITHUB#12199: Reduced contention when indexing with many threads. (Adrien Grand)

Bug Fixes
---------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/**
* Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for
* better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked
* independently.
*/
final class ConcurrentApproximatePriorityQueue<T> {

/**
 * Number of independently locked sub-queues. Keeping 8 queues should already help a lot compared
 * to a single one. This value must remain a power of two because {@link #MASK} is derived from it.
 */
static final int CONCURRENCY = 8;

/**
 * Bit mask that maps an arbitrary {@code int} (such as a thread hash code, which may be negative)
 * onto a valid sub-queue index. Derived from {@link #CONCURRENCY} so the two cannot drift apart.
 */
private static final int MASK = CONCURRENCY - 1;

/** Locks guarding the sub-queues: {@code locks[i]} is held while {@code queues[i]} is accessed. */
final Lock[] locks;

/** The sub-queues that entries are spread across; same length as {@link #locks}. */
final ApproximatePriorityQueue<T>[] queues;

/** Creates {@link #CONCURRENCY} empty sub-queues, each paired with its own lock. */
ConcurrentApproximatePriorityQueue() {
  // Generic array creation is not allowed, so build a raw array and narrow it locally.
  @SuppressWarnings({"rawtypes", "unchecked"})
  ApproximatePriorityQueue<T>[] subQueues = new ApproximatePriorityQueue[CONCURRENCY];
  final Lock[] subLocks = new Lock[CONCURRENCY];
  for (int slot = 0; slot < CONCURRENCY; ++slot) {
    subLocks[slot] = new ReentrantLock();
    subQueues[slot] = new ApproximatePriorityQueue<>();
  }
  this.locks = subLocks;
  this.queues = subQueues;
}

/**
 * Adds {@code entry} with the given {@code weight} to one of the sub-queues.
 *
 * <p>Sub-queues are probed starting from a slot derived from the current thread's hash code, and
 * the entry goes into the first one whose lock can be acquired without blocking. If every lock is
 * busy, the call blocks on the lock of the thread's first-choice sub-queue.
 *
 * @param entry the entry to add
 * @param weight the weight passed through to the sub-queue together with the entry
 */
void add(T entry, long weight) {
  // Seeding the probe order with the current thread helps distribute entries across queues and
  // gives a bit of thread affinity between entries and threads, which can't hurt.
  final int seed = Thread.currentThread().hashCode();

  // Opportunistic pass: use the first sub-queue whose lock is free right now.
  for (int offset = 0; offset < CONCURRENCY; ++offset) {
    final int slot = (seed + offset) & MASK;
    if (locks[slot].tryLock() == false) {
      continue;
    }
    try {
      queues[slot].add(entry, weight);
    } finally {
      locks[slot].unlock();
    }
    return;
  }

  // All locks were busy: wait for the thread's first-choice sub-queue.
  final int preferred = seed & MASK;
  locks[preferred].lock();
  try {
    queues[preferred].add(entry, weight);
  } finally {
    locks[preferred].unlock();
  }
}

/**
 * Polls an entry accepted by {@code predicate} from one of the sub-queues.
 *
 * <p>Two passes are made over the sub-queues, both starting from a slot derived from the current
 * thread's hash code. The first pass only visits sub-queues whose lock is immediately available;
 * the second pass blocks on each lock in turn so that no sub-queue is skipped.
 *
 * @param predicate forwarded to each sub-queue's {@code poll}
 * @return the first non-null entry returned by a sub-queue, or {@code null} if every sub-queue
 *     returned {@code null}
 */
T poll(Predicate<T> predicate) {
  final int seed = Thread.currentThread().hashCode();

  // Pass 1: non-blocking, skip any sub-queue that another thread is currently using.
  for (int offset = 0; offset < CONCURRENCY; ++offset) {
    final int slot = (seed + offset) & MASK;
    if (locks[slot].tryLock() == false) {
      continue;
    }
    final T candidate;
    try {
      candidate = queues[slot].poll(predicate);
    } finally {
      locks[slot].unlock();
    }
    if (candidate != null) {
      return candidate;
    }
  }

  // Pass 2: blocking, so that sub-queues that were busy during pass 1 get checked too.
  for (int offset = 0; offset < CONCURRENCY; ++offset) {
    final int slot = (seed + offset) & MASK;
    final T candidate;
    locks[slot].lock();
    try {
      candidate = queues[slot].poll(predicate);
    } finally {
      locks[slot].unlock();
    }
    if (candidate != null) {
      return candidate;
    }
  }

  return null;
}

// Only used for assertions
/**
 * Returns whether any sub-queue contains {@code o}. Each sub-queue is checked in index order while
 * holding its lock.
 *
 * @param o the object to look for
 * @return {@code true} as soon as one sub-queue reports that it contains {@code o}, {@code false}
 *     if none does
 * @throws AssertionError if called while assertions are disabled
 */
boolean contains(Object o) {
// The assignment inside the assert statement only executes when assertions are enabled, so the
// flag stays false otherwise.
boolean assertionsAreEnabled = false;
assert assertionsAreEnabled = true;
if (assertionsAreEnabled == false) {
throw new AssertionError("contains should only be used for assertions");
}

for (int i = 0; i < CONCURRENCY; ++i) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick can you add a check that assertions are enabled?

final Lock lock = locks[i];
final ApproximatePriorityQueue<T> queue = queues[i];
// Blocking acquire: every sub-queue is checked, even one that is currently in use.
lock.lock();
try {
if (queue.contains(o)) {
return true;
}
} finally {
lock.unlock();
}
}
return false;
}

/**
 * Removes {@code o} from the first sub-queue, in index order, that reports having removed it. Each
 * sub-queue is visited while holding its lock, acquired with a blocking {@code lock()}.
 *
 * @param o the object to remove
 * @return {@code true} if a sub-queue removed {@code o}, {@code false} if none did
 */
boolean remove(Object o) {
  for (int slot = 0; slot < CONCURRENCY; ++slot) {
    final boolean removed;
    locks[slot].lock();
    try {
      removed = queues[slot].remove(o);
    } finally {
      locks[slot].unlock();
    }
    if (removed) {
      // Stop at the first hit; later sub-queues are not visited.
      return true;
    }
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT

private final Set<DocumentsWriterPerThread> dwpts =
Collections.newSetFromMap(new IdentityHashMap<>());
private final ApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
new ApproximatePriorityQueue<>();
private final ConcurrentApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
new ConcurrentApproximatePriorityQueue<>();
private final Supplier<DocumentsWriterPerThread> dwptFactory;
private int takenWriterPermits = 0;
private boolean closed;
private volatile boolean closed;

DocumentsWriterPerThreadPool(Supplier<DocumentsWriterPerThread> dwptFactory) {
this.dwptFactory = dwptFactory;
Expand Down Expand Up @@ -113,15 +113,16 @@ private synchronized DocumentsWriterPerThread newWriter() {
* operation (add/updateDocument).
*/
DocumentsWriterPerThread getAndLock() {
synchronized (this) {
ensureOpen();
DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
if (dwpt == null) {
dwpt = newWriter();
}
// DWPT is already locked before return by this method:
ensureOpen();
DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
if (dwpt != null) {
return dwpt;
}
// newWriter() adds the DWPT to the `dwpts` set as a side-effect. However it is not added to
// `freeList` at this point, it will be added later on once DocumentsWriter has indexed a
// document into this DWPT and then gives it back to the pool by calling
// #marksAsFreeAndUnlock.
return newWriter();
}

private void ensureOpen() {
Expand All @@ -130,13 +131,15 @@ private void ensureOpen() {
}
}

private synchronized boolean contains(DocumentsWriterPerThread state) {
return dwpts.contains(state);
}

void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
final long ramBytesUsed = state.ramBytesUsed();
synchronized (this) {
assert dwpts.contains(state)
: "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
freeList.add(state, ramBytesUsed);
}
assert contains(state)
: "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";
freeList.add(state, ramBytesUsed);
state.unlock();
}

Expand Down Expand Up @@ -175,6 +178,9 @@ List<DocumentsWriterPerThread> filterAndLock(Predicate<DocumentsWriterPerThread>
* @return <code>true</code> iff the given DWPT has been removed. Otherwise <code>false</code>
*/
synchronized boolean checkout(DocumentsWriterPerThread perThread) {
// The DWPT must be held by the current thread. This guarantees that concurrent calls to
// #getAndLock cannot pull this DWPT out of the pool since #getAndLock does a DWPT#tryLock to
// check if the DWPT is available.
assert perThread.isHeldByCurrentThread();
if (dwpts.remove(perThread)) {
freeList.remove(perThread);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.util.concurrent.CountDownLatch;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.ThreadInterruptedException;

/** Tests for {@link ConcurrentApproximatePriorityQueue}. */
public class TestConcurrentApproximatePriorityQueue extends LuceneTestCase {

  /** Entries added and polled from the same thread come back in decreasing weight order. */
  public void testPollFromSameThread() {
    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
    pq.add(3, 3);
    pq.add(10, 10);
    pq.add(7, 7);
    assertEquals(Integer.valueOf(10), pq.poll(x -> true));
    assertEquals(Integer.valueOf(7), pq.poll(x -> true));
    assertEquals(Integer.valueOf(3), pq.poll(x -> true));
    assertNull(pq.poll(x -> true));
  }

  /** Entries added by one thread can be polled, in the same order, from a different thread. */
  public void testPollFromDifferentThread() throws Exception {
    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
    pq.add(3, 3);
    pq.add(10, 10);
    pq.add(7, 7);
    Thread t =
        new Thread() {
          @Override
          public void run() {
            assertEquals(Integer.valueOf(10), pq.poll(x -> true));
            assertEquals(Integer.valueOf(7), pq.poll(x -> true));
            assertEquals(Integer.valueOf(3), pq.poll(x -> true));
            assertNull(pq.poll(x -> true));
          }
        };
    t.start();
    t.join();
  }

  /**
   * While another thread holds the lock of the sub-queue that contains the only entry, a new entry
   * must land in a different sub-queue and be pollable; once the lock is released the original
   * entry becomes pollable again.
   */
  public void testCurrentLockIsBusy() throws Exception {
    ConcurrentApproximatePriorityQueue<Integer> pq = new ConcurrentApproximatePriorityQueue<>();
    pq.add(3, 3);
    CountDownLatch takeLock = new CountDownLatch(1);
    CountDownLatch releaseLock = new CountDownLatch(1);
    Thread t =
        new Thread() {
          @Override
          public void run() {
            // Find the sub-queue that received the entry added above.
            int queueIndex = -1;
            for (int i = 0; i < pq.queues.length; ++i) {
              if (pq.queues[i].isEmpty() == false) {
                queueIndex = i;
                break;
              }
            }
            // Hold that sub-queue's lock until the main thread says otherwise.
            assertTrue(pq.locks[queueIndex].tryLock());
            takeLock.countDown();
            try {
              releaseLock.await();
            } catch (InterruptedException e) {
              throw new ThreadInterruptedException(e);
            }
            pq.locks[queueIndex].unlock();
          }
        };
    t.start();
    takeLock.await();
    pq.add(1, 1); // The lock is taken so this needs to go to a different queue
    assertEquals(Integer.valueOf(1), pq.poll(x -> true));
    releaseLock.countDown();
    assertEquals(Integer.valueOf(3), pq.poll(x -> true));
    assertNull(pq.poll(x -> true));
    // Wait for the helper thread so it does not outlive the test.
    t.join();
  }
}