Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaner stack when running HTTP requests. #4768

Merged
merged 2 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/http/src/main/java/io/helidon/common/http/Http.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ public int length() {
return length;
}

@Override
public String toString() {
return text();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,46 +47,36 @@ public static Filters create(List<Filter> filters) {
/**
* Filter request.
*
* @param request request
* @param response response
* @param routingHandler this handler is called after all filters finish processing
* (unless a filter does not invoke the chain)
* @param request request
* @param response response
* @param routingExecutor this handler is called after all filters finish processing
* (unless a filter does not invoke the chain)
*/
public void filter(RoutingRequest request, RoutingResponse response, Handler routingHandler) {
public void filter(RoutingRequest request, RoutingResponse response, Runnable routingExecutor) {
if (noFilters) {
routingHandler.handle(request, response);
routingExecutor.run();
return;
}

FilterChain chain = new FilterChainImpl(filters, request, response, routingHandler);
FilterChain chain = new FilterChainImpl(filters, request, response, routingExecutor);
request.path(new FilterRoutedPath(request.prologue().uriPath()));

try {
chain.proceed();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw HttpException.builder()
.message("Failed to process request filters or routing")
.type(SimpleHandler.EventType.INTERNAL_ERROR)
.request(HttpSimpleRequest.create(request.prologue(),
request.headers()))
.cause(e)
.build();
}
chain.proceed();
}

private static final class FilterChainImpl implements FilterChain {
private final Iterator<Filter> filters;
private final Runnable routingExecutor;
private RoutingRequest request;
private RoutingResponse response;
private final Handler routingHandler;

private FilterChainImpl(List<Filter> filters, RoutingRequest request, RoutingResponse response, Handler routingHandler) {
private FilterChainImpl(List<Filter> filters,
RoutingRequest request,
RoutingResponse response,
Runnable routingExecutor) {
this.filters = filters.iterator();
this.request = request;
this.response = response;
this.routingHandler = routingHandler;
this.routingExecutor = routingExecutor;
}

@Override
Expand All @@ -97,7 +87,7 @@ public void proceed() {
if (filters.hasNext()) {
filters.next().filter(this, request, response);
} else {
routingHandler.handle(request, response);
routingExecutor.run();
if (!response.isSent()) {
throw HttpException.builder()
.request(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public static HttpRouting empty() {
* @param response routing response
*/
public void route(ConnectionContext ctx, RoutingRequest request, RoutingResponse response) {
RoutingHandler rh = new RoutingHandler(rootRoute, ctx, request, response);
filters.filter(request, response, rh);
RoutingExecutor routingExecutor = new RoutingExecutor(rootRoute, ctx, request, response);
filters.filter(request, response, routingExecutor);
}

@Override
Expand Down Expand Up @@ -278,60 +278,54 @@ public Builder any(String pattern, Handler handler) {
}
}

private static class RoutingHandler implements Handler {
private static final class RoutingExecutor implements Runnable {
private final ConnectionContext ctx;
private final RoutingRequest request;
private final RoutingResponse response;
private final ServiceRoute rootRoute;

private RoutingHandler(ServiceRoute rootRoute, ConnectionContext ctx, RoutingRequest request, RoutingResponse response) {
private RoutingExecutor(ServiceRoute rootRoute, ConnectionContext ctx, RoutingRequest request, RoutingResponse response) {
this.rootRoute = rootRoute;
this.ctx = ctx;
this.request = request;
this.response = response;
}

@Override
public void handle(ServerRequest req, ServerResponse res) {
public void run() {
try {
doHandle();
} catch (HttpException e) {
ctx.simpleHandlers().handle(e, response);
}
}
HttpPrologue prologue = request.prologue();
response.resetRouting();

private void doHandle() {
// ignore parameters, as we need instances of routing request (and we do not want to do an explicit cast)
HttpPrologue prologue = request.prologue();
response.resetRouting();

RoutingResult result = RoutingResult.ROUTE;
int counter = 0;
while (result == RoutingResult.ROUTE) {
counter++;
if (counter == 10) {
LOGGER.log(System.Logger.Level.ERROR, "Rerouted more than 10 times. Will not attempt further routing");

throw HttpException.builder()
.request(HttpSimpleRequest.create(prologue, request.headers()))
.type(SimpleHandler.EventType.INTERNAL_ERROR)
.build();
}
RoutingResult result = RoutingResult.ROUTE;
int counter = 0;
while (result == RoutingResult.ROUTE) {
counter++;
if (counter == 10) {
tomas-langer marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.log(System.Logger.Level.ERROR, "Rerouted more than 10 times. Will not attempt further routing");

result = doRoute(ctx, request, response);
}
throw HttpException.builder()
.request(HttpSimpleRequest.create(prologue, request.headers()))
.type(SimpleHandler.EventType.INTERNAL_ERROR)
.build();
}

// finished and done
if (result == RoutingResult.FINISH) {
return;
}
result = doRoute(ctx, request, response);
}

throw HttpException.builder()
.request(request)
.response(response)
.type(SimpleHandler.EventType.NOT_FOUND)
.message("Endpoint not found")
.build();
// finished and done
if (result == RoutingResult.FINISH) {
return;
}
ctx.simpleHandlers().handle(HttpException.builder()
.request(request)
.response(response)
.type(SimpleHandler.EventType.NOT_FOUND)
.message("Endpoint not found")
.build(), response);
} catch (HttpException e) {
ctx.simpleHandlers().handle(e, response);
}
}

// todo we may have a lru cache (or most commonly used - weighted by number of usages) to
Expand All @@ -349,7 +343,7 @@ private RoutingResult doRoute(ConnectionContext ctx, RoutingRequest request, Rou
if (response.shouldReroute()) {
if (response.isSent()) {
LOGGER.log(System.Logger.Level.WARNING, "Request to " + request.prologue()
+ " in inconsistent state. Request for re-router, but response was already sent. Ignoring "
+ " in inconsistent state. Request to re-route, but response was already sent. Ignoring "
+ "reroute.");
return RoutingResult.FINISH;
}
Expand All @@ -361,17 +355,19 @@ private RoutingResult doRoute(ConnectionContext ctx, RoutingRequest request, Rou
if (response.isNexted()) {
if (response.isSent()) {
LOGGER.log(System.Logger.Level.WARNING, "Request to " + request.prologue()
+ " in inconsistent state. Request was nexted, but response was already sent. "
+ " in inconsistent state. Request to next, but response was already sent. "
+ "Ignoring next().");
return RoutingResult.FINISH;
}
continue;
}
if (!response.hasEntity()) {
// not nexted, not rerouted - just send it!
response.send();
if (response.isSent()) {
return RoutingResult.FINISH;
}

// not nexted, not rerouted - just send it!
response.send();

return RoutingResult.FINISH;
} catch (CloseConnectionException | HttpException e) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,82 +107,46 @@ public Http1Connection(ConnectionContext ctx,
@Override
public void handle() throws InterruptedException {
try {
try {
doHandle();
} catch (CloseConnectionException | HttpException | UncheckedIOException e) {
throw e;
} catch (Throwable e) {
throw HttpException.builder()
.message("Internal error")
.type(EventType.INTERNAL_ERROR)
.cause(e)
.build();
}
} catch (HttpException e) {
if (e.fullResponse().isPresent()) {
ctx.simpleHandlers().handle(e, e.fullResponse().get());
return;
}

SimpleHandler handler = ctx.simpleHandlers().handler(e.eventType());
SimpleHandler.SimpleResponse response = handler.handle(e.request(),
e.eventType(),
e.status(),
e.responseHeaders(),
e);

BufferData buffer = BufferData.growing(128);
HeadersServerResponse headers = response.headers();
if (!e.keepAlive()) {
headers.set(HeaderValues.CONNECTION_CLOSE);
}
byte[] message = response.message().orElse(BufferData.EMPTY_BYTES);
if (message.length != 0) {
headers.set(Http.HeaderValue.create(Http.Header.CONTENT_LENGTH, String.valueOf(message.length)));
}
Http1ServerResponse.nonEntityBytes(headers, response.status(), buffer, response.keepAlive());
if (message.length != 0) {
buffer.write(message);
}

writer.write(buffer);

if (response.status() == Http.Status.INTERNAL_SERVER_ERROR_500) {
LOGGER.log(WARNING, "Internal server error", e);
}
}
}
// handle connection until an exception (or explicit connection close)
while (true) {
// prologue (first line of request)
HttpPrologue prologue = http1prologue.readPrologue();
recvListener.prologue(ctx, prologue);
currentEntitySize = 0;
currentEntitySizeRead = 0;

private void doHandle() throws InterruptedException {
// handle connection until an exception (or explicit connection close)
while (true) {
// prologue (first line of request)
HttpPrologue prologue = http1prologue.readPrologue();
recvListener.prologue(ctx, prologue);
currentEntitySize = 0;
currentEntitySizeRead = 0;
HeadersWritable<?> headers = http1headers.readHeaders(prologue);
recvListener.headers(ctx, headers);

HeadersWritable<?> headers = http1headers.readHeaders(prologue);
recvListener.headers(ctx, headers);

if (canUpgrade) {
if (headers.contains(Http.Header.UPGRADE)) {
Http1UpgradeProvider upgrader = upgradeProviderMap.get(headers.get(Http.Header.UPGRADE).value());
if (upgrader != null) {
ServerConnection upgradeConnection = upgrader.upgrade(ctx, prologue, headers);
// upgrader may decide not to upgrade this connection
if (upgradeConnection != null) {
if (LOGGER.isLoggable(TRACE)) {
LOGGER.log(TRACE, "Connection upgrade using " + upgradeConnection);
if (canUpgrade) {
if (headers.contains(Http.Header.UPGRADE)) {
Http1UpgradeProvider upgrader = upgradeProviderMap.get(headers.get(Http.Header.UPGRADE).value());
if (upgrader != null) {
ServerConnection upgradeConnection = upgrader.upgrade(ctx, prologue, headers);
// upgrader may decide not to upgrade this connection
if (upgradeConnection != null) {
if (LOGGER.isLoggable(TRACE)) {
LOGGER.log(TRACE, "Connection upgrade using " + upgradeConnection);
}
// this will block until the connection terminates
upgradeConnection.handle();
return;
}
// this will block until the connection terminates
upgradeConnection.handle();
return;
}
}
}
route(prologue, headers);
}
route(prologue, headers);
} catch (CloseConnectionException | UncheckedIOException e) {
throw e;
} catch (HttpException e) {
handleHttpException(e);
} catch (Throwable e) {
handleHttpException(HttpException.builder()
.message("Internal error")
.type(EventType.INTERNAL_ERROR)
.cause(e)
.build());
}
}

Expand Down Expand Up @@ -370,4 +334,38 @@ private void consumeEntity(Http1ServerRequest request, Http1ServerResponse respo
throw new CloseConnectionException("Failed to consume request entity, must close", e);
}
}

private void handleHttpException(HttpException e) {
if (e.fullResponse().isPresent()) {
ctx.simpleHandlers().handle(e, e.fullResponse().get());
return;
}

SimpleHandler handler = ctx.simpleHandlers().handler(e.eventType());
SimpleHandler.SimpleResponse response = handler.handle(e.request(),
e.eventType(),
e.status(),
e.responseHeaders(),
e);

BufferData buffer = BufferData.growing(128);
HeadersServerResponse headers = response.headers();
if (!e.keepAlive()) {
headers.set(HeaderValues.CONNECTION_CLOSE);
}
byte[] message = response.message().orElse(BufferData.EMPTY_BYTES);
if (message.length != 0) {
headers.set(Http.HeaderValue.create(Http.Header.CONTENT_LENGTH, String.valueOf(message.length)));
}
Http1ServerResponse.nonEntityBytes(headers, response.status(), buffer, response.keepAlive());
if (message.length != 0) {
buffer.write(message);
}

writer.write(buffer);

if (response.status() == Http.Status.INTERNAL_SERVER_ERROR_500) {
LOGGER.log(WARNING, "Internal server error", e);
}
}
}