Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SingleThreadExecutor for OrderedExecutor and drainTo() tasks into local array #3546

Merged
merged 10 commits into from
Oct 20, 2022
5 changes: 5 additions & 0 deletions bookkeeper-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,16 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.collections.BlockingMpscQueue;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
Expand Down Expand Up @@ -294,20 +291,17 @@ public T call() throws Exception {
}
}

protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
BlockingQueue<Runnable> queue;
if (enableBusyWait) {
// Use queue with busy-wait polling strategy
queue = new BlockingMpscQueue<>(maxTasksInQueue > 0 ? maxTasksInQueue : DEFAULT_MAX_ARRAY_QUEUE_SIZE);
protected ExecutorService createSingleThreadExecutor(ThreadFactory factory) {
if (maxTasksInQueue > 0) {
return new SingleThreadExecutor(factory, maxTasksInQueue, true);
} else {
// By default, use regular JDK LinkedBlockingQueue
queue = new LinkedBlockingQueue<>();
return new SingleThreadExecutor(factory);
}
return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, factory);
}

protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
return new BoundedExecutorService(executor, this.maxTasksInQueue);
protected ExecutorService getBoundedExecutor(ExecutorService executor) {
checkArgument(executor instanceof ThreadPoolExecutor);
return new BoundedExecutorService((ThreadPoolExecutor) executor, this.maxTasksInQueue);
}

protected ExecutorService addExecutorDecorators(ExecutorService executor) {
Expand Down Expand Up @@ -400,11 +394,14 @@ protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadF
threads = new ExecutorService[numThreads];
threadIds = new long[numThreads];
for (int i = 0; i < numThreads; i++) {
ThreadPoolExecutor thread = createSingleThreadExecutor(
ExecutorService thread = createSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
.setThreadFactory(threadFactory).build());

threads[i] = addExecutorDecorators(getBoundedExecutor(thread));
if (traceTaskExecution || preserveMdcForTaskExecution) {
thread = addExecutorDecorators(thread);
}
threads[i] = thread;

final int idx = i;
try {
Expand Down Expand Up @@ -434,43 +431,49 @@ protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadF
throw new RuntimeException("Couldn't start thread " + i, e);
}

// Register gauges
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-queue", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return thread.getQueue().size();
}
});
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-completed-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return thread.getCompletedTaskCount();
}
});
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-total-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return thread.getTaskCount();
}
});
if (thread instanceof SingleThreadExecutor) {
SingleThreadExecutor ste = (SingleThreadExecutor) thread;
ste.registerMetrics(statsLogger);
} else if (thread instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) thread;
// Register gauges
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-queue", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return threadPoolExecutor.getQueue().size();
}
});
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-completed-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return threadPoolExecutor.getCompletedTaskCount();
}
});
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-total-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return threadPoolExecutor.getTaskCount();
}
});
}
}

statsLogger.registerGauge(String.format("%s-threads", name), new Gauge<Number>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.stats.StatsLogger;
Expand Down Expand Up @@ -118,17 +117,20 @@ private OrderedScheduler(String baseName,
}

@Override
protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) {
return new ScheduledThreadPoolExecutor(1, factory);
protected ExecutorService createSingleThreadExecutor(ThreadFactory factory) {
return new BoundedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, factory), this.maxTasksInQueue);
}

@Override
protected ListeningScheduledExecutorService getBoundedExecutor(ThreadPoolExecutor executor) {
protected ListeningScheduledExecutorService getBoundedExecutor(ExecutorService executor) {
return new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, this.maxTasksInQueue);
}

@Override
protected ListeningScheduledExecutorService addExecutorDecorators(ExecutorService executor) {
if (!(executor instanceof ListeningScheduledExecutorService)) {
executor = new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, 0);
}
return new OrderedSchedulerDecoratedThread((ListeningScheduledExecutorService) executor);
}

Expand Down
Loading