Skip to content

Commit

Permalink
✨ Setps Undertow to use custom ThreadAwareByteBufferPool that delegat…
Browse files Browse the repository at this point in the history
…es to specific implementations based on the thread type to optimize performance given the unique characteristics of virtual threads

#474
  • Loading branch information
ujibang committed May 7, 2024
1 parent 6eae3d0 commit 37d0b99
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 242 deletions.
Expand Up @@ -34,6 +34,7 @@ public record CoreModule(String name,
int ioThreads,
int workersSchedulerParallelism,
int workersSchedulerMaxPoolSize,
boolean buffersPooling,
int bufferSize,
boolean directBuffers,
boolean forceGzipEncoding,
Expand All @@ -48,12 +49,13 @@ public record CoreModule(String name,
public static final String IO_THREADS_KEY = "io-threads";
public static final String WORKERS_SCHEDULER_PARALLELISM_KEY ="workers-scheduler-parallelism";
public static final String WORKERS_SCHEDULER_MAX_POOL_SIZE_KEY = "workers-scheduler-max-pool-size";
public static final String BUFFERS_POOLING_KEY = "buffer-pooling";
public static final String BUFFER_SIZE_KEY = "buffer-size";
public static final String DIRECT_BUFFERS_KEY = "direct-buffers";
public static final String FORCE_GZIP_ENCODING_KEY = "force-gzip-encoding";
public static final String ALLOW_UNESCAPED_CHARS_IN_ULR_KEY = "allow-unescaped-characters-in-url";

private static final CoreModule DEFAULT_CORE_MODULE = new CoreModule("default", "plugins", new ArrayList<>(), false, null, 0, 0, 256, 16364, true, false, true);
private static final CoreModule DEFAULT_CORE_MODULE = new CoreModule("default", "plugins", new ArrayList<>(), false, null, 0, 0, 256, true, 16364, true, false, true);

public CoreModule(Map<String, Object> conf, boolean silent) {
this(getOrDefault(conf, INSTANCE_NAME_KEY, DEFAULT_CORE_MODULE.name(), silent),
Expand All @@ -65,6 +67,7 @@ public CoreModule(Map<String, Object> conf, boolean silent) {
getOrDefault(conf, IO_THREADS_KEY, DEFAULT_CORE_MODULE.ioThreads(), silent),
getOrDefault(conf, WORKERS_SCHEDULER_PARALLELISM_KEY, DEFAULT_CORE_MODULE.workersSchedulerParallelism(), silent),
getOrDefault(conf, WORKERS_SCHEDULER_MAX_POOL_SIZE_KEY, DEFAULT_CORE_MODULE.workersSchedulerMaxPoolSize(), silent),
getOrDefault(conf, BUFFERS_POOLING_KEY, DEFAULT_CORE_MODULE.buffersPooling(), silent),
getOrDefault(conf, BUFFER_SIZE_KEY, DEFAULT_CORE_MODULE.bufferSize(), silent),
getOrDefault(conf, DIRECT_BUFFERS_KEY, DEFAULT_CORE_MODULE.directBuffers(), silent),
// following is optional, so get it always in silent mode
Expand Down
17 changes: 1 addition & 16 deletions commons/src/main/java/org/restheart/exchange/GraphQLRequest.java
Expand Up @@ -78,22 +78,7 @@ public JsonElement parseContent() throws IOException, BadRequestException {
}

private JsonElement _parseContentJson(String rawBody) throws JsonSyntaxException, BadRequestException {
var json = JsonParser.parseString(rawBody);

// json must contain the query field and is must be a string
// if (json.isJsonObject() && json.getAsJsonObject().has(QUERY_FIELD)) {
// if (json.getAsJsonObject().get(QUERY_FIELD).isJsonPrimitive()) {
// if (!json.getAsJsonObject().get(QUERY_FIELD).getAsJsonPrimitive().isString()) {
// throw new BadRequestException("query field must be a string", HttpStatus.SC_BAD_REQUEST);
// }
// } else {
// throw new BadRequestException("query field must be a string", HttpStatus.SC_BAD_REQUEST);
// }
// } else {
// throw new BadRequestException("missing query field", HttpStatus.SC_BAD_REQUEST);
// }

return json;
return JsonParser.parseString(rawBody);
}

private JsonElement _parseContentGraphQL(String rawBody) {
Expand Down
Expand Up @@ -1107,7 +1107,7 @@ public BsonValue parseContent() throws BadRequestException, IOException {
// for instance, by an Interceptor at interceptPoint=BEFORE_EXCHANGE_INIT
var attacheBsonContent = MongoServiceAttachments.attachedBsonContent(wrapped);
return attacheBsonContent == null
? MongoRequestContentInjector.inject(wrapped) // TODO throw BadRequestException if error parsing
? MongoRequestContentInjector.inject(wrapped)
: attacheBsonContent;
}

Expand Down
Expand Up @@ -297,16 +297,13 @@ private static BsonValue injectBson(HttpServerExchange exchange) throws BadReque
}

private static FormDataParser parser(HttpServerExchange exchange) {
if (!exchange.isBlocking()) {
exchange.startBlocking();
}
// form data requires exchange.startBlocking(); called by WorkingThreadsPoolDispatcher

return FORM_PARSER.createParser(exchange);
}

private static BsonValue injectMultipart(HttpServerExchange exchange, MongoRequest request, MongoResponse response) throws BadRequestException, IOException {
// form data requires
exchange.startBlocking();
// form data requires exchange.startBlocking(); called by WorkingThreadsPoolDispatcher

if (request.isWriteDocument() && (request.isFile() || request.isFilesBucket())) {
return injectMultiparForFiles(exchange, request, response);
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/java/org/restheart/Bootstrapper.java
Expand Up @@ -55,7 +55,7 @@
import static org.fusesource.jansi.Ansi.Color.RED;
import static org.fusesource.jansi.Ansi.ansi;
import org.fusesource.jansi.AnsiConsole;
import org.restheart.buffers.FastByteBufferPool;
import org.restheart.buffers.ThreadAwareByteBufferPool;
import org.restheart.configuration.Configuration;
import org.restheart.configuration.ConfigurationException;
import org.restheart.configuration.ProxiedResource;
Expand Down Expand Up @@ -530,10 +530,12 @@ private static void startCoreSystem() {

var builder = Undertow.builder();

// TODO use undertow default buffer pool for io-treads and FastByteBufferPool for virtual threads
builder.setByteBufferPool(new FastByteBufferPool(
// set the bytee buffer pool
// since the undertow default byte buffer is not good for virtual threads
builder.setByteBufferPool(new ThreadAwareByteBufferPool(
configuration.coreModule().directBuffers(),
configuration.coreModule().bufferSize()));
configuration.coreModule().bufferSize(),
configuration.coreModule().buffersPooling()));

var httpsListener = configuration.httpsListener();
if (httpsListener.enabled()) {
Expand Down
Expand Up @@ -24,19 +24,23 @@
import io.undertow.connector.PooledByteBuffer;

/**
* A fast byte buffer pool that is fast just because it is not a pool and avoids all multithreading overhead
* A byte buffer pool implementation that does not actually pool resources.
* <p>
* This is intended for use with Virtual Threads, where resource pooling
* is unnecessary and may even be disadvantageous due to potential overhead.
* </p>
*
* @author Andrea Di Cesare
*/
public class FastByteBufferPool implements ByteBufferPool {
public class NotPoolingByteBufferPool implements ByteBufferPool {
private final boolean direct;
private final int bufferSize;

/**
* @param direct If this implementation should use direct buffers
* @param bufferSize The buffer size to use
*/
public FastByteBufferPool(boolean direct, int bufferSize) {
public NotPoolingByteBufferPool(boolean direct, int bufferSize) {
this.direct = direct;
this.bufferSize = bufferSize;
}
Expand All @@ -57,7 +61,7 @@ public PooledByteBuffer allocate() {
}

@Override
public FastByteBufferPool getArrayBackedPool() {
public NotPoolingByteBufferPool getArrayBackedPool() {
return this;
}

Expand Down
@@ -0,0 +1,85 @@
/*-
* ========================LICENSE_START=================================
* restheart-core
* %%
* Copyright (C) 2014 - 2024 SoftInstigate
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* =========================LICENSE_END==================================
*/
package org.restheart.buffers;

import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.DefaultByteBufferPool;

/**
* A byte buffer pool that delegates to specific implementations based on the thread type.
* <p>
* For IO threads, it utilizes the {@code undertow.DefaultByteBufferPool} to pool resources efficiently.
* For virtual worker threads, it uses the {@code NotPoolingByteBufferPool}, which doesn't pool resources, to optimize performance
* given the unique characteristics of virtual threads.
* </p>
*
* @author Andrea Di Cesare
*/

public class ThreadAwareByteBufferPool implements ByteBufferPool {
private final DefaultByteBufferPool undertowDefaultByteBufferPool;
private final NotPoolingByteBufferPool notPoolingByteBufferPool;

private final boolean enablePooling;

/**
* @param direct If this implementation should use direct buffers
* @param bufferSize The buffer size to use
* @param enablePooling true to enable pooling for platform threads
*/
public ThreadAwareByteBufferPool(boolean direct, int bufferSize, boolean enablePooling) {
this.undertowDefaultByteBufferPool = enablePooling ? new DefaultByteBufferPool(direct, bufferSize, -1, 4) : null;
this.notPoolingByteBufferPool = new NotPoolingByteBufferPool(direct, bufferSize);
this.enablePooling = enablePooling;
}

@Override
public int getBufferSize() {
return this.notPoolingByteBufferPool.getBufferSize();
}

@Override
public boolean isDirect() {
return this.notPoolingByteBufferPool.isDirect();
}

@Override
public PooledByteBuffer allocate() {
return !enablePooling || Thread.currentThread().isVirtual()
? this.notPoolingByteBufferPool.allocate()
: this.undertowDefaultByteBufferPool.allocate();
}

@Override
public ByteBufferPool getArrayBackedPool() {
return !enablePooling || Thread.currentThread().isVirtual()
? this.notPoolingByteBufferPool.getArrayBackedPool()
: this.undertowDefaultByteBufferPool.getArrayBackedPool();
}

@Override
public void close() {
if (enablePooling && !Thread.currentThread().isVirtual()) {
this.undertowDefaultByteBufferPool.close();
}
}
}

0 comments on commit 37d0b99

Please sign in to comment.