Skip to content

Commit

Permalink
[SCB-1753]accessor problem fix: LinkedBlockingQueueEx queue features
Browse files Browse the repository at this point in the history
  • Loading branch information
liubao68 committed Aug 3, 2020
1 parent 70756fe commit 7b85cf8
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 113 deletions.
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -49,6 +50,8 @@ public class GroupExecutor implements Executor, Closeable {

public static final String KEY_MAX_QUEUE_SIZE = "servicecomb.executor.default.maxQueueSize-per-group";

public static final String KEY_ALLOW_CORE_THREAD_TIMEOUT = "servicecomb.executor.default.allowCoreThreadTimeOut";

private static final AtomicBoolean LOG_PRINTED = new AtomicBoolean();

protected String groupName;
Expand All @@ -63,6 +66,8 @@ public class GroupExecutor implements Executor, Closeable {

protected int maxQueueSize;

protected boolean allowCoreThreadTimeOut;

// to avoid multiple network thread conflicted when put tasks to executor queue
private List<ExecutorService> executorList = new ArrayList<>();

Expand All @@ -83,12 +88,26 @@ public GroupExecutor init(String groupName) {
for (int groupIdx = 0; groupIdx < groupCount; groupIdx++) {
GroupThreadFactory factory = new GroupThreadFactory(groupName + groupIdx);

ThreadPoolExecutorEx executor = new ThreadPoolExecutorEx(coreThreads,
maxThreads,
maxIdleInSecond,
TimeUnit.SECONDS,
new LinkedBlockingQueueEx<>(maxQueueSize),
factory);
ThreadPoolExecutorEx executor;
if (allowCoreThreadTimeOut) {
// coreThreads = maxThreads
executor = new ThreadPoolExecutorEx(maxThreads,
maxThreads,
maxIdleInSecond,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(maxQueueSize),
factory);
} else {
executor = new ThreadPoolExecutorEx(coreThreads,
maxThreads,
maxIdleInSecond,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(maxQueueSize),
factory);
}

executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut);

executorList.add(executor);
}

Expand Down Expand Up @@ -123,10 +142,12 @@ public void initConfig() {

maxIdleInSecond = DynamicPropertyFactory.getInstance().getIntProperty(KEY_MAX_IDLE_SECOND, 60).get();
maxQueueSize = DynamicPropertyFactory.getInstance().getIntProperty(KEY_MAX_QUEUE_SIZE, Integer.MAX_VALUE).get();

allowCoreThreadTimeOut = DynamicPropertyFactory.getInstance()
.getBooleanProperty(KEY_ALLOW_CORE_THREAD_TIMEOUT, true).get();
LOGGER.info(
"executor name={}, group={}. per group settings, coreThreads={}, maxThreads={}, maxIdleInSecond={}, maxQueueSize={}.",
groupName, groupCount, coreThreads, maxThreads, maxIdleInSecond, maxQueueSize);
"executor name={}, group={}. per group settings, coreThreads={}, "
+ "maxThreads={}, maxIdleInSecond={}, maxQueueSize={}, allowCoreThreadTimeOut={}.",
groupName, groupCount, coreThreads, maxThreads, maxIdleInSecond, maxQueueSize, allowCoreThreadTimeOut);
}

public List<ExecutorService> getExecutorList() {
Expand Down

This file was deleted.

Expand Up @@ -33,9 +33,6 @@ public class ThreadPoolExecutorEx extends ThreadPoolExecutor {
public ThreadPoolExecutorEx(int coreThreads, int maxThreads, int maxIdleInSecond, TimeUnit timeUnit,
BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
super(coreThreads, maxThreads, maxIdleInSecond, timeUnit, queue, threadFactory);
if (queue instanceof LinkedBlockingQueueEx) {
((LinkedBlockingQueueEx) queue).setOwner(this);
}
setRejectedExecutionHandler(this::rejectedExecution);
}

Expand Down
Expand Up @@ -20,11 +20,13 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -52,8 +54,14 @@ public void run() {
}
}

ThreadPoolExecutorEx executorEx = new ThreadPoolExecutorEx(2, 4, 60, TimeUnit.SECONDS,
new LinkedBlockingQueueEx<>(2), Executors.defaultThreadFactory());
ThreadPoolExecutorEx executorEx;

@Before
public void setUp() {
executorEx = new ThreadPoolExecutorEx(4, 4, 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), Executors.defaultThreadFactory());
executorEx.allowCoreThreadTimeOut(true);
}

public TestTask submitTask() {
TestTask task = new TestTask();
Expand Down Expand Up @@ -139,19 +147,27 @@ public void schedule() throws ExecutionException, InterruptedException {
t4.quit();
t5.quit();
t6.quit();
waitForResult(0, executorEx::getPoolSize);
executorEx.shutdown();
}

private void waitForResult(int expect, IntSupplier supplier) {
long max = 30000;
long waited = 0;

for (; ; ) {
if (waited > max) {
throw new IllegalStateException("timed out waiting.");
}
int actual = supplier.getAsInt();
if (expect == actual) {
return;
}

LOGGER.info("waiting for thread result, expect:{}, actual: {}.", expect, actual);
try {
TimeUnit.MILLISECONDS.sleep(100);
TimeUnit.MILLISECONDS.sleep(200);
waited += 200;
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
Expand Down
Expand Up @@ -17,8 +17,6 @@

package org.apache.servicecomb.it.edge.handler;

import java.util.concurrent.TimeoutException;

import javax.ws.rs.core.Response.Status;

import org.apache.servicecomb.core.Handler;
Expand Down

0 comments on commit 7b85cf8

Please sign in to comment.