Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce contention in DocumentsWriterPerThreadPool. #12199

Merged
merged 9 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/**
* Concurrent version of {@link ApproximatePriorityQueue}, which trades a bit more of ordering for
* better concurrency by maintaining 8 sub {@link ApproximatePriorityQueue}s that are locked
* independently.
*/
final class ConcurrentApproximatePriorityQueue<T> {

/** Keeping 8 queues should already help a lot compared to a single one. */
private static final int CONCURRENCY = 8;

private static final int MASK = 0x07;

private final Lock[] locks;
private final ApproximatePriorityQueue<T>[] queues;

ConcurrentApproximatePriorityQueue() {
locks = new Lock[CONCURRENCY];
@SuppressWarnings({"rawtypes", "unchecked"})
ApproximatePriorityQueue<T>[] queues = new ApproximatePriorityQueue[CONCURRENCY];
this.queues = queues;
for (int i = 0; i < CONCURRENCY; ++i) {
locks[i] = new ReentrantLock();
queues[i] = new ApproximatePriorityQueue<>();
}
}

void add(T entry, long weight) {
// Seed the order in which to look at entries based on the current thread. This helps distribute
// entries across queues and gives a bit of thread affinity between entries and threads, which
// can't hurt.
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
try {
queue.add(entry, weight);
return;
} finally {
lock.unlock();
}
}
}
final int index = threadHash & MASK;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
try {
queue.add(entry, weight);
} finally {
lock.unlock();
}
}

T poll(Predicate<T> predicate) {
final int threadHash = Thread.currentThread().hashCode();
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
if (lock.tryLock()) {
try {
T entry = queue.poll(predicate);
if (entry != null) {
return entry;
}
} finally {
lock.unlock();
}
}
}
for (int i = 0; i < CONCURRENCY; ++i) {
final int index = (threadHash + i) & MASK;
final Lock lock = locks[index];
final ApproximatePriorityQueue<T> queue = queues[index];
lock.lock();
try {
T entry = queue.poll(predicate);
if (entry != null) {
return entry;
}
} finally {
lock.unlock();
}
}
return null;
}

// Only used for assertions
boolean contains(Object o) {
for (int i = 0; i < CONCURRENCY; ++i) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick can you add a check that assertions are enabled?

locks[i].lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really a nit pick but for methods that use a lock I'd prefer to assign the lock to a local var instead of dereferencing it again in the finally block.

try {
if (queues[i].contains(o)) {
return true;
}
} finally {
locks[i].unlock();
}
}
return false;
}

boolean remove(Object o) {
for (int i = 0; i < CONCURRENCY; ++i) {
locks[i].lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, maybe use a local var for the lock. It really looks cleaner

try {
if (queues[i].remove(o)) {
return true;
}
} finally {
locks[i].unlock();
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ final class DocumentsWriterPerThreadPool implements Iterable<DocumentsWriterPerT

private final Set<DocumentsWriterPerThread> dwpts =
Collections.newSetFromMap(new IdentityHashMap<>());
private final ApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
new ApproximatePriorityQueue<>();
private final ConcurrentApproximatePriorityQueue<DocumentsWriterPerThread> freeList =
new ConcurrentApproximatePriorityQueue<>();
private final Supplier<DocumentsWriterPerThread> dwptFactory;
private int takenWriterPermits = 0;
private boolean closed;
private volatile boolean closed;

DocumentsWriterPerThreadPool(Supplier<DocumentsWriterPerThread> dwptFactory) {
this.dwptFactory = dwptFactory;
Expand Down Expand Up @@ -113,15 +113,17 @@ private synchronized DocumentsWriterPerThread newWriter() {
* operation (add/updateDocument).
*/
DocumentsWriterPerThread getAndLock() {
synchronized (this) {
ensureOpen();
DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
if (dwpt == null) {
dwpt = newWriter();
}
// DWPT is already locked before return by this method:
return dwpt;
ensureOpen();
DocumentsWriterPerThread dwpt = freeList.poll(DocumentsWriterPerThread::tryLock);
if (dwpt == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason, double locking always makes me cringe. There are lengthy discussions about this idiom all around the web (visibility of partially constructed objects).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this is a case of double-checked locking. In general double-checked locking tries to reduce the overhead of acquiring a lock by adding a quick check before the lock. Here the logic is different, it would be legal to remove the call to poll under lock, I only added it because there is a chance that a thread had to wait on the lock, so we could check if another DWPT was added to the queue in the meantime in order to save creating a new DWPT. I don't think it's actually important, I could remove the second call to poll to make it look less like double-checked locking.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not about the poll. It's about whether dwpt = newWriter(); can be expanded and reordered by the compiler so that dwpt gets assigned a value before the constructor (or whatever initialization inside newWriter) takes place. Any thread checking dwpt == null outside synchronized could, at least in theory, see such a "partially constructed" object.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alex Shipilev had a nice writeup about it - found it here: https://shipilev.net/blog/2014/safe-public-construction/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sharing this blog post, I remember reading it in the past, it was a good re-read. I mentioned polls because I thought that they were what made you think that this code is a case of double-checked locking, as there is a first call before the lock and another one under the lock, like the null checks with double-checked locking with singletons. I need to go on weekend, I'll try to post a convincing explanation of why this is not a case of double-checked locking and why it is safe next week. By the way, thanks for looking!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to convince me, @jpountz - I just expressed my reluctance at it because, well, it requires convincing. :) Unless there's really a huge gain, I typically just go with what I understand works (making the field volatile, for example).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still have the feeling that you are making incorrect assumptions about what this piece of code is doing (this method itself doesn't publish the object to other threads), but I took this thread as a call to keep things simple, so I removed the retry and added a few more comments about the logic of this pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I certainly am! But anyone looking at this code without jumping deep will have the same doubts, I think. Unless there is a convincing (performance) argument that it's worth it, I like the updated version a lot better - it's clear and simple.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks code looks much better. I was also irritated by the long chain of "tries" with and without synchronized. To me actualy code is easier to read.

// newWriter() adds the DWPT to the `dwpts` set as a side-effect. However it is not added to
// `freeList` at this point, it will be added later on once DocumentsWriter has indexed a
// document into this DWPT and then gives it back to the pool by calling
// #marksAsFreeAndUnlock.
dwpt = newWriter();
uschindler marked this conversation as resolved.
Show resolved Hide resolved
}
// DWPT is already locked before return by this method:
return dwpt;
}

private void ensureOpen() {
Expand All @@ -130,13 +132,15 @@ private void ensureOpen() {
}
}

private synchronized boolean contains(DocumentsWriterPerThread state) {
return dwpts.contains(state);
}

void marksAsFreeAndUnlock(DocumentsWriterPerThread state) {
final long ramBytesUsed = state.ramBytesUsed();
synchronized (this) {
assert dwpts.contains(state)
: "we tried to add a DWPT back to the pool but the pool doesn't know aobut this DWPT";
freeList.add(state, ramBytesUsed);
}
assert contains(state)
: "we tried to add a DWPT back to the pool but the pool doesn't know about this DWPT";
freeList.add(state, ramBytesUsed);
state.unlock();
}

Expand Down Expand Up @@ -175,6 +179,9 @@ List<DocumentsWriterPerThread> filterAndLock(Predicate<DocumentsWriterPerThread>
* @return <code>true</code> iff the given DWPT has been removed. Otherwise <code>false</code>
*/
synchronized boolean checkout(DocumentsWriterPerThread perThread) {
// The DWPT must be held by the current thread. This guarantees that concurrent calls to
// #getAndLock cannot pull this DWPT out of the pool since #getAndLock does a DWPT#tryLock to
// check if the DWPT is available.
assert perThread.isHeldByCurrentThread();
if (dwpts.remove(perThread)) {
freeList.remove(perThread);
Expand Down