Skip to content

Commit

Permalink
♻️ ChannelReader uses exchange.getRequestReceiver()
Browse files Browse the repository at this point in the history
  • Loading branch information
ujibang committed Apr 26, 2024
1 parent 6f83acf commit 5d2bfbe
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 31 deletions.
Expand Up @@ -47,6 +47,7 @@

import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.form.FormData;
import io.undertow.server.handlers.form.FormDataParser;
import io.undertow.server.handlers.form.FormParserFactory;
import io.undertow.util.HeaderValues;
import io.undertow.util.Headers;
Expand Down Expand Up @@ -312,12 +313,23 @@ private static BsonValue injectBson(HttpServerExchange exchange) {
return content;
}

private static FormDataParser parser(HttpServerExchange exchange) {
if (!exchange.isBlocking()) {
exchange.startBlocking();
}

return FORM_PARSER.createParser(exchange);
}

private static BsonValue injectMultipart(HttpServerExchange exchange, MongoRequest request, MongoResponse response) {
// form data requires
exchange.startBlocking();

if (request.isWriteDocument() && (request.isFile() || request.isFilesBucket())) {
return injectMultiparForFiles(exchange, request, response);
}

var parser = FORM_PARSER.createParser(exchange);
var parser = parser(exchange);

if (parser == null) {
response.setInError(HttpStatus.SC_NOT_ACCEPTABLE, "There is no form parser registered for the request content type");
Expand Down Expand Up @@ -369,7 +381,7 @@ private static BsonValue injectMultipart(HttpServerExchange exchange, MongoReque
private static BsonValue injectMultiparForFiles(HttpServerExchange exchange, MongoRequest request, MongoResponse response) {
BsonValue content;

var parser = FORM_PARSER.createParser(exchange);
var parser = parser(exchange);

if (parser == null) {
response.setInError(HttpStatus.SC_NOT_ACCEPTABLE, "There is no form parser registered for the request content type");
Expand Down
40 changes: 14 additions & 26 deletions commons/src/main/java/org/restheart/utils/ChannelReader.java
Expand Up @@ -20,13 +20,7 @@

package org.restheart.utils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.xnio.channels.Channels;

import io.undertow.server.HttpServerExchange;

Expand All @@ -35,18 +29,21 @@
* @author Andrea Di Cesare {@literal <andrea@softinstigate.com>}
*/
public class ChannelReader {

final static Charset CHARSET = StandardCharsets.UTF_8;
final static int CAPACITY = 1024;

/**
*
* @param exchange
* @return
* @throws IOException
*/
public static String readString(HttpServerExchange exchange) throws IOException {
return new String(readBytes(exchange), CHARSET);
final var receiver = exchange.getRequestReceiver();
final var ret = new String[1];

receiver.receiveFullString(
(_exchange, data) -> ret[0] = data,
(_exchange, ioe) -> LambdaUtils.throwsSneakyException(ioe));

return ret[0];
}

/**
Expand All @@ -56,22 +53,13 @@ public static String readString(HttpServerExchange exchange) throws IOException
* @throws IOException
*/
public static byte[] readBytes(HttpServerExchange exchange) throws IOException {
var channel = exchange.getRequestChannel();

if (channel == null) {
return null;
}

try (var os = new ByteArrayOutputStream(CAPACITY)) {
var buffer = ByteBuffer.allocate(CAPACITY);
final var receiver = exchange.getRequestReceiver();
final var ret = new byte[1][];

while (Channels.readBlocking(channel, buffer) != -1) {
buffer.flip();
os.write(buffer.array(), 0, buffer.remaining());
buffer.clear();
}
receiver.receiveFullBytes(
(_exchange, data) -> ret[0] = data,
(_exchange, ioe) -> LambdaUtils.throwsSneakyException(ioe));

return os.toByteArray();
}
return ret[0];
}
}
Expand Up @@ -25,7 +25,6 @@
import org.restheart.utils.ThreadsUtils;

import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.BlockingHandler;

/**
*
Expand All @@ -42,7 +41,6 @@
*
*/
public class WorkingThreadsPoolDispatcher extends PipelinedHandler {
private final BlockingHandler blockingHandler = new BlockingHandler(this);
private static final Executor virtualThreadsExecutor = ThreadsUtils.virtualThreadsExecutor();

/**
Expand Down Expand Up @@ -70,7 +68,7 @@ public WorkingThreadsPoolDispatcher(PipelinedHandler next) {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
if (exchange.isInIoThread()) {
exchange.dispatch(virtualThreadsExecutor, blockingHandler);
exchange.dispatch(virtualThreadsExecutor, getNext());
} else {
next(exchange);
}
Expand Down

0 comments on commit 5d2bfbe

Please sign in to comment.