Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to configure BK for low latency busy-wait settings #1812

Merged
merged 11 commits into from
Jan 7, 2019
5 changes: 5 additions & 0 deletions bookkeeper-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
<artifactId>bookkeeper-stats-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
merlimat marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>cpu-affinity</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -44,6 +45,8 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
Expand All @@ -68,6 +71,7 @@
@Slf4j
public class OrderedExecutor implements ExecutorService {
public static final int NO_TASK_LIMIT = -1;
private static final int DEFAULT_MAX_ARRAY_QUEUE_SIZE = 10_000;
protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1);

final String name;
Expand All @@ -80,7 +84,7 @@ public class OrderedExecutor implements ExecutorService {
final boolean preserveMdcForTaskExecution;
final long warnTimeMicroSec;
final int maxTasksInQueue;

final boolean enableBusyWait;

public static Builder newBuilder() {
return new Builder();
Expand All @@ -98,7 +102,7 @@ public OrderedExecutor build() {
}
return new OrderedExecutor(name, numThreads, threadFactory, statsLogger,
traceTaskExecution, preserveMdcForTaskExecution,
warnTimeMicroSec, maxTasksInQueue);
warnTimeMicroSec, maxTasksInQueue, enableBusyWait);
}
}

Expand All @@ -114,6 +118,7 @@ public abstract static class AbstractBuilder<T extends OrderedExecutor> {
protected boolean preserveMdcForTaskExecution = false;
protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT;
protected int maxTasksInQueue = NO_TASK_LIMIT;
protected boolean enableBusyWait = false;

public AbstractBuilder<T> name(String name) {
this.name = name;
Expand Down Expand Up @@ -155,6 +160,11 @@ public AbstractBuilder<T> traceTaskWarnTimeMicroSec(long warnTimeMicroSec) {
return this;
}

public AbstractBuilder<T> enableBusyWait(boolean enableBusyWait) {
this.enableBusyWait = enableBusyWait;
return this;
}

@SuppressWarnings("unchecked")
public T build() {
if (null == threadFactory) {
Expand All @@ -168,7 +178,8 @@ public T build() {
traceTaskExecution,
preserveMdcForTaskExecution,
warnTimeMicroSec,
maxTasksInQueue);
maxTasksInQueue,
enableBusyWait);
}
}

Expand Down Expand Up @@ -277,7 +288,15 @@ public T call() throws Exception {
}

protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory);
BlockingQueue<Runnable> queue;
if (enableBusyWait) {
// Use queue with busy-wait polling strategy
queue = new BlockingMpscQueue<>(maxTasksInQueue > 0 ? maxTasksInQueue : DEFAULT_MAX_ARRAY_QUEUE_SIZE);
} else {
// By default, use regular JDK LinkedBlockingQueue
queue = new LinkedBlockingQueue<>();
}
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, factory);
}

protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
Expand Down Expand Up @@ -361,25 +380,40 @@ public <T> Future<T> submit(Runnable task, T result) {
*/
protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadFactory,
StatsLogger statsLogger, boolean traceTaskExecution,
boolean preserveMdcForTaskExecution, long warnTimeMicroSec, int maxTasksInQueue) {
boolean preserveMdcForTaskExecution, long warnTimeMicroSec, int maxTasksInQueue,
boolean enableBusyWait) {
checkArgument(numThreads > 0);
checkArgument(!StringUtils.isBlank(baseName));

this.maxTasksInQueue = maxTasksInQueue;
this.warnTimeMicroSec = warnTimeMicroSec;
this.enableBusyWait = enableBusyWait;
name = baseName;
threads = new ExecutorService[numThreads];
threadIds = new long[numThreads];
for (int i = 0; i < numThreads; i++) {
ThreadPoolExecutor thread = createSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
.setThreadFactory(threadFactory).build());

threads[i] = addExecutorDecorators(getBoundedExecutor(thread));

final int idx = i;
try {
threads[idx].submit(() -> {
threadIds[idx] = Thread.currentThread().getId();

if (enableBusyWait) {
// Try to acquire 1 CPU core to the executor thread. If it fails we
// are just logging the error and continuing, falling back to
// non-isolated CPUs.
try {
CpuAffinity.acquireCore();
} catch (Throwable t) {
log.warn("Failed to acquire CPU core for thread {}", Thread.currentThread().getName(),
merlimat marked this conversation as resolved.
Show resolved Hide resolved
t.getMessage(), t);
}
}
}).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ private OrderedScheduler(String baseName,
long warnTimeMicroSec,
int maxTasksInQueue) {
super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution,
preserveMdcForTaskExecution, warnTimeMicroSec, maxTasksInQueue);
preserveMdcForTaskExecution, warnTimeMicroSec, maxTasksInQueue, false /* enableBusyWait */);
}


@Override
protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
return new ScheduledThreadPoolExecutor(1, factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.bookie.stats.JournalStats;
import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
import org.apache.bookkeeper.common.collections.RecyclableArrayList;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
Expand All @@ -56,7 +59,6 @@
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -472,6 +474,15 @@ public ForceWriteThread(Thread threadToNotifyOnEx, boolean enableGroupForceWrite
@Override
public void run() {
LOG.info("ForceWrite Thread started");

if (conf.isBusyWaitEnabled()) {
try {
CpuAffinity.acquireCore();
} catch (Exception e) {
LOG.warn("Unable to acquire CPU core for Journal ForceWrite thread: {}", e.getMessage(), e);
}
}

boolean shouldForceWrite = true;
int numReqInLastForceWrite = 0;
while (running) {
Expand Down Expand Up @@ -611,8 +622,8 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
private final ExecutorService cbThreadPool;

// journal entry queue to commit
final BlockingQueue<QueueEntry> queue = new GrowableArrayBlockingQueue<QueueEntry>();
final BlockingQueue<ForceWriteRequest> forceWriteRequests = new GrowableArrayBlockingQueue<ForceWriteRequest>();
final BlockingQueue<QueueEntry> queue;
final BlockingQueue<ForceWriteRequest> forceWriteRequests;

volatile boolean running = true;
private final LedgerDirsManager ledgerDirsManager;
Expand All @@ -628,6 +639,16 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf,
LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
super("BookieJournal-" + conf.getBookiePort());

if (conf.isBusyWaitEnabled()) {
// To achieve lower latency, use busy-wait blocking queue implementation
queue = new BlockingMpscQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new BlockingMpscQueue<>(conf.getJournalQueueSize());
} else {
queue = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
forceWriteRequests = new ArrayBlockingQueue<>(conf.getJournalQueueSize());
}

this.ledgerDirsManager = ledgerDirsManager;
this.conf = conf;
this.journalDirectory = journalDirectory;
Expand Down Expand Up @@ -906,6 +927,14 @@ public int getJournalQueueLength() {
public void run() {
LOG.info("Starting journal on {}", journalDirectory);

if (conf.isBusyWaitEnabled()) {
try {
CpuAffinity.acquireCore();
} catch (Exception e) {
LOG.warn("Unable to acquire CPU core for Journal thread: {}", e.getMessage(), e);
}
}

RecyclableArrayList<QueueEntry> toFlush = entryListRecycler.newInstance();
int numEntriesToFlush = 0;
ByteBuf lenBuff = Unpooled.buffer(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
Expand All @@ -39,9 +39,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
Expand Down Expand Up @@ -78,9 +78,9 @@
import org.apache.bookkeeper.proto.DataFormats;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.EventLoopUtil;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang.SystemUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
Expand Down Expand Up @@ -404,6 +404,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
.traceTaskExecution(conf.getEnableTaskExecutionStats())
.preserveMdcForTaskExecution(conf.getPreserveMdcForTaskExecution())
.traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros())
.enableBusyWait(conf.isBusyWaitEnabled())
.build();

// initialize stats logger
Expand Down Expand Up @@ -434,7 +435,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo

// initialize event loop group
if (null == eventLoopGroup) {
this.eventLoopGroup = getDefaultEventLoopGroup(conf);
this.eventLoopGroup = EventLoopUtil.getClientEventLoopGroup(conf,
new DefaultThreadFactory("bookkeeper-io"));
this.ownEventLoopGroup = true;
} else {
this.eventLoopGroup = eventLoopGroup;
Expand Down Expand Up @@ -601,8 +603,7 @@ BookieWatcher getBookieWatcher() {
return bookieWatcher;
}

@VisibleForTesting
OrderedExecutor getMainWorkerPool() {
public OrderedExecutor getMainWorkerPool() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why @VisibleForTesting annotation is removed?

return mainWorkerPool;
}

Expand Down Expand Up @@ -1282,7 +1283,6 @@ public void asyncDeleteLedger(final long lId, final DeleteCallback cb, final Obj
* @throws InterruptedException
* @throws BKException
*/
@SuppressWarnings("unchecked")
public void deleteLedger(long lId) throws InterruptedException, BKException {
CompletableFuture<Void> future = new CompletableFuture<>();
SyncDeleteCallback result = new SyncDeleteCallback(future);
Expand Down Expand Up @@ -1406,22 +1406,6 @@ public void close() throws BKException, InterruptedException {
this.metadataDriver.close();
}

static EventLoopGroup getDefaultEventLoopGroup(ClientConfiguration conf) {
ThreadFactory threadFactory = new DefaultThreadFactory("bookkeeper-io");
final int numThreads = conf.getNumIOThreads();

if (SystemUtils.IS_OS_LINUX) {
try {
return new EpollEventLoopGroup(numThreads, threadFactory);
} catch (Throwable t) {
LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", t.getMessage());
return new NioEventLoopGroup(numThreads, threadFactory);
}
} else {
return new NioEventLoopGroup(numThreads, threadFactory);
}
}

@Override
public CreateBuilder newCreateLedgerOp() {
return new LedgerCreateOp.CreateBuilderImpl(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
"storeSystemTimeAsLedgerUnderreplicatedMarkTime";
protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = "storeSystemTimeAsLedgerCreationTime";

protected static final String ENABLE_BUSY_WAIT = "enableBusyWait";

// Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS is MSLedgerManagerFactory
protected static final String METASTORE_IMPL_CLASS = "metastoreImplClass";
protected static final String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
Expand Down Expand Up @@ -879,6 +881,42 @@ public T setPreserveMdcForTaskExecution(boolean enabled) {
return getThis();
}

/**
* Return whether the busy-wait is enabled for BookKeeper and Netty IO threads.
*
* <p>Default is false
*
* @return the value of the option
*/
public boolean isBusyWaitEnabled() {
return getBoolean(ENABLE_BUSY_WAIT, false);
}

/**
* Option to enable busy-wait settings.
*
* <p>Default is false.
*
* <p>WARNING: This option will enable spin-waiting on executors and IO threads
* in order to reduce latency during context switches. The spinning will
* consume 100% CPU even when bookie is not doing any work. It is
* recommended to reduce the number of threads in the main workers pool
* ({@link ClientConfiguration#setNumWorkerThreads(int)}) and Netty event
* loop {@link ClientConfiguration#setNumIOThreads(int)} to only have few
* CPU cores busy.
* </p>
*
* @param busyWaitEanbled
* if enabled, use spin-waiting strategy to reduce latency in
* context switches
*
* @see #isBusyWaitEnabled()
*/
public T setBusyWaitEnabled(boolean busyWaitEanbled) {
setProperty(ENABLE_BUSY_WAIT, busyWaitEanbled);
return getThis();
}

/**
* Return the flag indicating whether to limit stats logging.
*
Expand Down
Loading