Skip to content

Commit

Permalink
Make light and heavy thread pool configurable for s3 proxy v2
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Add properties to config s3 proxy v2 thread pool.
for light pool:
1. `alluxio.proxy.s3.v2.async.light.pool.core.thread.number` ;
2. `alluxio.proxy.s3.v2.async.light.pool.maximum.thread.number`;
3. `alluxio.proxy.s3.v2.async.light.pool.queue.size`.


for heavy pool:
1. `alluxio.proxy.s3.v2.async.heavy.pool.core.thread.number` ;
2. `alluxio.proxy.s3.v2.async.heavy.pool.maximum.thread.number`;
3. `alluxio.proxy.s3.v2.async.heavy.pool.queue.size`.

### Why are the changes needed?

We config `alluxio.web.threads` as 1000, because our OPS has reached
2000. The default value of thread number is not big enough.

pr-link: #17082
change-id: cid-a89b016822a511619d23971118336a34cdf39328
  • Loading branch information
humengyu2012 committed Mar 21, 2023
1 parent 9532ce0 commit 65ed1bb
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 9 deletions.
54 changes: 54 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Expand Up @@ -5338,6 +5338,48 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER =
intBuilder(Name.PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER)
.setDefaultValue(8)
.setDescription("Core thread number for async light thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey PROXY_S3_V2_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER =
intBuilder(Name.PROXY_S3_V2_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER)
.setDefaultValue(64)
.setDescription("Maximum thread number for async light thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey PROXY_S3_V2_ASYNC_LIGHT_POOL_QUEUE_SIZE =
intBuilder(Name.PROXY_S3_V2_ASYNC_LIGHT_POOL_QUEUE_SIZE)
.setDefaultValue(64 * 1024)
.setDescription("Queue size for async light thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey PROXY_S3_V2_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER =
intBuilder(Name.PROXY_S3_V2_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER)
.setDefaultValue(8)
.setDescription("Core thread number for async heavy thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey PROXY_S3_V2_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER =
intBuilder(Name.PROXY_S3_V2_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER)
.setDefaultValue(64)
.setDescription("Maximum thread number for async heavy thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey PROXY_S3_V2_ASYNC_HEAVY_POOL_QUEUE_SIZE =
intBuilder(Name.PROXY_S3_V2_ASYNC_HEAVY_POOL_QUEUE_SIZE)
.setDefaultValue(64 * 1024)
.setDescription("Queue size for async heavy thread pool.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.SERVER)
.build();
public static final PropertyKey PROXY_STREAM_CACHE_TIMEOUT_MS =
durationBuilder(Name.PROXY_STREAM_CACHE_TIMEOUT_MS)
.setAlias("alluxio.proxy.stream.cache.timeout.ms")
Expand Down Expand Up @@ -8538,6 +8580,18 @@ public static final class Name {
"alluxio.proxy.s3.v2.version.enabled";
public static final String PROXY_S3_V2_ASYNC_PROCESSING_ENABLED =
"alluxio.proxy.s3.v2.async.processing.enabled";
public static final String PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER =
"alluxio.proxy.s3.v2.async.light.pool.core.thread.number";
public static final String PROXY_S3_V2_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER =
"alluxio.proxy.s3.v2.async.light.pool.maximum.thread.number";
public static final String PROXY_S3_V2_ASYNC_LIGHT_POOL_QUEUE_SIZE =
"alluxio.proxy.s3.v2.async.light.pool.queue.size";
public static final String PROXY_S3_V2_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER =
"alluxio.proxy.s3.v2.async.heavy.pool.core.thread.number";
public static final String PROXY_S3_V2_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER =
"alluxio.proxy.s3.v2.async.heavy.pool.maximum.thread.number";
public static final String PROXY_S3_V2_ASYNC_HEAVY_POOL_QUEUE_SIZE =
"alluxio.proxy.s3.v2.async.heavy.pool.queue.size";
public static final String S3_UPLOADS_ID_XATTR_KEY = "s3_uploads_mulitpartupload_id";
public static final String PROXY_S3_BUCKETPATHCACHE_TIMEOUT_MS =
"alluxio.proxy.s3.bucketpathcache.timeout";
Expand Down
56 changes: 47 additions & 9 deletions core/server/proxy/src/main/java/alluxio/web/ProxyWebServer.java
Expand Up @@ -29,6 +29,7 @@
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.PathUtils;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import org.eclipse.jetty.server.HttpChannel;
Expand Down Expand Up @@ -165,15 +166,8 @@ public void init() throws ServletException {
new StreamCache(Configuration.getMs(PropertyKey.PROXY_STREAM_CACHE_TIMEOUT_MS)));
getServletContext().setAttribute(ALLUXIO_PROXY_AUDIT_LOG_WRITER_KEY,
mAsyncAuditLogWriter);

getServletContext().setAttribute(PROXY_S3_V2_LIGHT_POOL,
new ThreadPoolExecutor(8, 64, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(64 * 1024),
ThreadFactoryUtils.build("S3-LIGHTPOOL-%d", false)));
getServletContext().setAttribute(PROXY_S3_V2_HEAVY_POOL,
new ThreadPoolExecutor(8, 64, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(64 * 1024),
ThreadFactoryUtils.build("S3-HEAVYPOOL-%d", false)));
getServletContext().setAttribute(PROXY_S3_V2_LIGHT_POOL, createLightThreadPool());
getServletContext().setAttribute(PROXY_S3_V2_HEAVY_POOL, createHeavyThreadPool());
getServletContext().setAttribute(PROXY_S3_HANDLER_MAP, mS3HandlerMap);
}
});
Expand All @@ -187,6 +181,50 @@ public void init() throws ServletException {
.addServlet(rsServletHolder, PathUtils.concatPath(Constants.REST_API_PREFIX, "*"));
}

private ThreadPoolExecutor createLightThreadPool() {
int lightCorePoolSize = Configuration.getInt(
PropertyKey.PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER);
Preconditions.checkArgument(lightCorePoolSize > 0,
PropertyKey.PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER.getName()
+ " must be a positive integer.");
int lightMaximumPoolSize = Configuration.getInt(
PropertyKey.PROXY_S3_V2_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER);
Preconditions.checkArgument(lightMaximumPoolSize >= lightCorePoolSize,
PropertyKey.PROXY_S3_V2_ASYNC_LIGHT_POOL_MAXIMUM_THREAD_NUMBER.getName()
+ " must be greater than or equal to the value of "
+ PropertyKey.PROXY_S3_V2_ASYNC_LIGHT_POOL_CORE_THREAD_NUMBER.getName());
int lightPoolQueueSize = Configuration.getInt(
PropertyKey.PROXY_S3_V2_ASYNC_LIGHT_POOL_QUEUE_SIZE);
Preconditions.checkArgument(lightPoolQueueSize > 0,
PropertyKey.PROXY_S3_V2_ASYNC_LIGHT_POOL_QUEUE_SIZE.getName()
+ " must be a positive integer.");
return new ThreadPoolExecutor(lightCorePoolSize, lightMaximumPoolSize, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(lightPoolQueueSize),
ThreadFactoryUtils.build("S3-LIGHTPOOL-%d", false));
}

private ThreadPoolExecutor createHeavyThreadPool() {
int heavyCorePoolSize = Configuration.getInt(
PropertyKey.PROXY_S3_V2_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER);
Preconditions.checkArgument(heavyCorePoolSize > 0,
PropertyKey.PROXY_S3_V2_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER.getName()
+ " must be a positive integer.");
int heavyMaximumPoolSize = Configuration.getInt(
PropertyKey.PROXY_S3_V2_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER);
Preconditions.checkArgument(heavyMaximumPoolSize >= heavyCorePoolSize,
PropertyKey.PROXY_S3_V2_ASYNC_HEAVY_POOL_MAXIMUM_THREAD_NUMBER.getName()
+ " must be greater than or equal to the value of "
+ PropertyKey.PROXY_S3_V2_ASYNC_HEAVY_POOL_CORE_THREAD_NUMBER.getName());
int heavyPoolQueueSize = Configuration.getInt(
PropertyKey.PROXY_S3_V2_ASYNC_HEAVY_POOL_QUEUE_SIZE);
Preconditions.checkArgument(heavyPoolQueueSize > 0,
PropertyKey.PROXY_S3_V2_ASYNC_HEAVY_POOL_QUEUE_SIZE.getName()
+ " must be a positive integer.");
return new ThreadPoolExecutor(heavyCorePoolSize, heavyMaximumPoolSize, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(heavyPoolQueueSize),
ThreadFactoryUtils.build("S3-HEAVYPOOL-%d", false));
}

@Override
public void stop() throws Exception {
if (mAsyncAuditLogWriter != null) {
Expand Down

0 comments on commit 65ed1bb

Please sign in to comment.