Skip to content

Commit

Permalink
platform-http: handle requests using a thread from the worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 30, 2020
1 parent 2d6a816 commit 00cd740
Showing 1 changed file with 153 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
Expand Down Expand Up @@ -71,10 +72,10 @@ public class QuarkusPlatformHttpConsumer extends DefaultConsumer {

private final Router router;
private final List<Handler<RoutingContext>> handlers;
private Route route;
private final String fileNameExtWhitelist;
private final UploadAttacher uploadAttacher;
private final Pattern PATH_PARAMETER_PATTERN = Pattern.compile("\\{([^/}]+)\\}");
private Route route;

public QuarkusPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor processor, Router router,
List<Handler<RoutingContext>> handlers, UploadAttacher uploadAttacher) {
Expand All @@ -86,81 +87,6 @@ public QuarkusPlatformHttpConsumer(PlatformHttpEndpoint endpoint, Processor proc
this.uploadAttacher = uploadAttacher;
}

@Override
public PlatformHttpEndpoint getEndpoint() {
return (PlatformHttpEndpoint) super.getEndpoint();
}

@Override
protected void doStart() throws Exception {
super.doStart();

final PlatformHttpEndpoint endpoint = getEndpoint();
final String path = endpoint.getPath();
/* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */
final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
final Route newRoute = router.route(vertxPathParamPath);

final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict());
if (!methods.equals(Method.getAll())) {
methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name())));
}
if (endpoint.getConsumes() != null) {
newRoute.consumes(endpoint.getConsumes());
}
if (endpoint.getProduces() != null) {
newRoute.produces(endpoint.getProduces());
}

handlers.forEach(newRoute::handler);

newRoute.handler(
ctx -> {
Exchange exchg = null;
try {
final Exchange exchange = exchg = toExchange(ctx);
createUoW(exchange);
getAsyncProcessor().process(
exchange,
doneSync -> writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy()));
} catch (Exception e) {
ctx.fail(e);
getExceptionHandler().handleException("Failed handling platform-http endpoint " + path, exchg, e);
} finally {
if (exchg != null) {
doneUoW(exchg);
}
}
});

this.route = newRoute;
}

@Override
protected void doStop() throws Exception {
if (route != null) {
route.remove();
route = null;
}
super.doStop();
}

@Override
protected void doSuspend() throws Exception {
if (route != null) {
route.disable();
}
super.doSuspend();
}

@Override
protected void doResume() throws Exception {
if (route != null) {
route.enable();
}
super.doResume();
}

static Object toHttpResponse(HttpServerResponse response, Message message, HeaderFilterStrategy headerFilterStrategy) {
final Exchange exchange = message.getExchange();

Expand Down Expand Up @@ -307,20 +233,6 @@ static void writeResponse(RoutingContext ctx, Exchange camelExchange, HeaderFilt

}

Exchange toExchange(RoutingContext ctx) {
final Exchange exchange = getEndpoint().createExchange();
Message in = toCamelMessage(ctx, exchange);

final String charset = ctx.parsedHeaders().contentType().parameter("charset");
if (charset != null) {
exchange.setProperty(Exchange.CHARSET_NAME, charset);
in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
}

exchange.setIn(in);
return exchange;
}

static void populateCamelHeaders(
RoutingContext ctx,
Map<String, Object> headersMap,
Expand Down Expand Up @@ -377,6 +289,157 @@ static void populateCamelHeaders(
headersMap.put(Exchange.HTTP_RAW_QUERY, request.query());
}

@SuppressWarnings("unchecked")
static void appendHeader(Map<String, Object> headers, String key, Object value) {
if (headers.containsKey(key)) {
Object existing = headers.get(key);
List<Object> list;
if (existing instanceof List) {
list = (List<Object>) existing;
} else {
list = new ArrayList<>();
list.add(existing);
}
list.add(value);
value = list;
}

headers.put(key, value);
}

@Override
public PlatformHttpEndpoint getEndpoint() {
return (PlatformHttpEndpoint) super.getEndpoint();
}

@Override
protected void doStart() throws Exception {
super.doStart();

final PlatformHttpEndpoint endpoint = getEndpoint();
final String path = endpoint.getPath();
/* Transform from the Camel path param syntax /path/{key} to vert.x web's /path/:key */
final String vertxPathParamPath = PATH_PARAMETER_PATTERN.matcher(path).replaceAll(":$1");
final Route newRoute = router.route(vertxPathParamPath);

final Set<Method> methods = Method.parseList(endpoint.getHttpMethodRestrict());
if (!methods.equals(Method.getAll())) {
methods.stream().forEach(m -> newRoute.method(HttpMethod.valueOf(m.name())));
}
if (endpoint.getConsumes() != null) {
newRoute.consumes(endpoint.getConsumes());
}
if (endpoint.getProduces() != null) {
newRoute.produces(endpoint.getProduces());
}

handlers.forEach(newRoute::handler);

newRoute.handler(this::handleRequest);

this.route = newRoute;
}

@Override
protected void doStop() throws Exception {
if (route != null) {
route.remove();
route = null;
}
super.doStop();
}

@Override
protected void doSuspend() throws Exception {
if (route != null) {
route.disable();
}
super.doSuspend();
}

@Override
protected void doResume() throws Exception {
if (route != null) {
route.enable();
}
super.doResume();
}

private void handleRequest(RoutingContext ctx) {
final Vertx vertx = ctx.vertx();
final Exchange exchange = toExchange(ctx);

//
// We do not know if any of the processing logic of the route is synchronous or not so we
// need to process the request on a thread on the Vert.x worker pool.
//
// As example, assuming the platform-http component is configured as the transport provider
// for the rest dsl, then the following code may result in a blocking operation that could
// block Vert.x event-loop for too long if the target service takes long to respond, as
// example in case the service is a knative service scaled to zero that could take some time
// to be come available:
//
// rest("/results")
// .get("/{id}")
// .route()
// .removeHeaders("*", "CamelHttpPath")
// .to("rest:get:?bridgeEndpoint=true");
//
vertx.executeBlocking(
promise -> {
try {
createUoW(exchange);
} catch (Exception e) {
promise.fail(e);
return;
}

getAsyncProcessor().process(exchange, c -> {
if (!exchange.isFailed()) {
promise.complete();
} else {
promise.fail(exchange.getException());
}
});
},
false,
result -> {
try {
if (result.succeeded()) {
try {
writeResponse(ctx, exchange, getEndpoint().getHeaderFilterStrategy());
} catch (Exception e) {
getExceptionHandler().handleException(
"Failed handling platform-http endpoint " + getEndpoint().getPath(),
e);
}
} else {
getExceptionHandler().handleException(
"Failed handling platform-http endpoint " + getEndpoint().getPath(),
result.cause());

ctx.fail(result.cause());
}
} finally {
doneUoW(exchange);
}
});
}

Exchange toExchange(RoutingContext ctx) {
final Exchange exchange = getEndpoint().createExchange();
Message in = toCamelMessage(ctx, exchange);

final String charset = ctx.parsedHeaders().contentType().parameter("charset");
if (charset != null) {
exchange.setProperty(Exchange.CHARSET_NAME, charset);
in.setHeader(Exchange.HTTP_CHARACTER_ENCODING, charset);
}

exchange.setIn(in);
return exchange;
}

Message toCamelMessage(RoutingContext ctx, Exchange exchange) {
final Message result = new DefaultMessage(exchange);

Expand Down Expand Up @@ -414,24 +477,6 @@ Message toCamelMessage(RoutingContext ctx, Exchange exchange) {
return result;
}

@SuppressWarnings("unchecked")
static void appendHeader(Map<String, Object> headers, String key, Object value) {
if (headers.containsKey(key)) {
Object existing = headers.get(key);
List<Object> list;
if (existing instanceof List) {
list = (List<Object>) existing;
} else {
list = new ArrayList<>();
list.add(existing);
}
list.add(value);
value = list;
}

headers.put(key, value);
}

void populateAttachments(Set<FileUpload> uploads, Message message) {
for (FileUpload upload : uploads) {
final String name = upload.name();
Expand Down

0 comments on commit 00cd740

Please sign in to comment.