diff --git a/src/main/java/org/rakam/server/http/HaProxyBackendServerHandler.java b/src/main/java/org/rakam/server/http/HaProxyBackendServerHandler.java index 77ecf00..7b6236f 100644 --- a/src/main/java/org/rakam/server/http/HaProxyBackendServerHandler.java +++ b/src/main/java/org/rakam/server/http/HaProxyBackendServerHandler.java @@ -5,12 +5,14 @@ import io.netty.util.AttributeKey; import io.netty.util.internal.ConcurrentSet; +import java.util.List; + public class HaProxyBackendServerHandler extends HttpServerHandler { AttributeKey CLIENT_IP = AttributeKey.valueOf("ip"); - public HaProxyBackendServerHandler(ConcurrentSet activeChannels, HttpServer httpServer) { - super(activeChannels, httpServer); + public HaProxyBackendServerHandler(ConcurrentSet activeChannels, HttpServer httpServer, List postProcessors) { + super(activeChannels, httpServer, postProcessors); } @Override diff --git a/src/main/java/org/rakam/server/http/HttpServer.java b/src/main/java/org/rakam/server/http/HttpServer.java index e48b851..68efcda 100644 --- a/src/main/java/org/rakam/server/http/HttpServer.java +++ b/src/main/java/org/rakam/server/http/HttpServer.java @@ -298,12 +298,9 @@ else if (isRawRequestMethod(method)) { private HttpRequestHandler getJsonRequestHandler(Method method, HttpService service) { final List preprocessorRequest = getPreprocessorRequest(method); - List postProcessors = getPostPreprocessorsRequest(method); if (method.getParameterCount() == 1 && method.getParameters()[0].isAnnotationPresent(BodyParam.class)) { - return new JsonBeanRequestHandler(this, mapper, method, - getPreprocessorRequest(method), postProcessors, - service); + return new JsonBeanRequestHandler(this, mapper, method, getPreprocessorRequest(method), service); } ArrayList bodyParams = new ArrayList<>(); @@ -333,11 +330,11 @@ private HttpRequestHandler getJsonRequestHandler(Method method, HttpService serv invoke = lambda.apply(service); } catch (Throwable e) { - requestError(e.getCause(), request, postProcessors); + requestError(e.getCause(), request); return; } - handleRequest(mapper, isAsync, invoke, request, postProcessors); + handleRequest(mapper, isAsync, invoke, request); }; } else { @@ -351,7 +348,7 @@ private HttpRequestHandler getJsonRequestHandler(Method method, HttpService serv } return new JsonParametrizedRequestHandler(this, mapper, bodyParams, - methodHandle, postProcessors, service, + methodHandle, service, preprocessorForJsonRequest, isAsync, !method.getReturnType().equals(void.class)); } } @@ -364,17 +361,16 @@ private List getPreprocessorRequest(Method method) private List getPostPreprocessorsRequest(Method method) { - return postProcessors.stream() - .filter(p -> p.test(method)).map(PostProcessorEntry::getProcessor).collect(Collectors.toList()); + return postProcessors.stream().map(PostProcessorEntry::getProcessor).collect(Collectors.toList()); } - void handleRequest(ObjectMapper mapper, boolean isAsync, Object invoke, RakamHttpRequest request, List postProcessors) + void handleRequest(ObjectMapper mapper, boolean isAsync, Object invoke, RakamHttpRequest request) { if (isAsync) { - handleAsyncJsonRequest(mapper, request, (CompletionStage) invoke, postProcessors); + handleAsyncJsonRequest(mapper, request, (CompletionStage) invoke); } else { - returnJsonResponse(mapper, request, OK, invoke, postProcessors); + returnJsonResponse(mapper, request, OK, invoke); } } @@ -388,7 +384,6 @@ private boolean isRawRequestMethod(Method method) private HttpRequestHandler generateRawRequestHandler(HttpService service, Method method) { List requestPreprocessors = getPreprocessorRequest(method); - List postProcessors = getPostPreprocessorsRequest(method); // we don't need to pass service object is the method is static. // it's also better for performance since there will be only one object to send the stack. @@ -404,7 +399,7 @@ private HttpRequestHandler generateRawRequestHandler(HttpService service, Method lambda.accept(request); } catch (Exception e) { - requestError(e, request, postProcessors); + requestError(e, request); } }; } @@ -416,7 +411,7 @@ private HttpRequestHandler generateRawRequestHandler(HttpService service, Method lambda.accept(service, request); } catch (Exception e) { - requestError(e, request, postProcessors); + requestError(e, request); } }; } @@ -427,7 +422,6 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m boolean isAsync = CompletionStage.class.isAssignableFrom(method.getReturnType()); final List preprocessors = getPreprocessorRequest(method); - List postProcessors = getPostPreprocessorsRequest(method); if (method.getParameterCount() == 0) { Function function = produceLambdaForFunction(method); @@ -438,7 +432,7 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m } } catch (Throwable e) { - requestError(e, request, postProcessors); + requestError(e, request); return; } @@ -448,13 +442,13 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m apply = (CompletionStage) function.apply(service); } catch (Exception e) { - requestError(e.getCause(), request, postProcessors); + requestError(e.getCause(), request); return; } - handleAsyncJsonRequest(mapper, request, apply, postProcessors); + handleAsyncJsonRequest(mapper, request, apply); } else { - handleJsonRequest(mapper, service, request, function, postProcessors); + handleJsonRequest(mapper, service, request, function); } }; } @@ -479,7 +473,7 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m } } catch (Throwable e) { - requestError(e, request, postProcessors); + requestError(e, request); return; } @@ -493,7 +487,7 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m } } catch (Exception e) { - requestError(e, request, postProcessors); + requestError(e, request); return; } @@ -503,13 +497,13 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m apply = (CompletionStage) methodHandle.invokeWithArguments(objects); } catch (Throwable e) { - requestError(e.getCause(), request, postProcessors); + requestError(e.getCause(), request); return; } - handleAsyncJsonRequest(mapper, request, apply, postProcessors); + handleAsyncJsonRequest(mapper, request, apply); } else { - handleJsonRequest(mapper, request, methodHandle, objects, postProcessors); + handleJsonRequest(mapper, request, methodHandle, objects); } }; } @@ -622,41 +616,40 @@ else if (parameter.isAnnotationPresent(QueryParam.class)) { } } - void handleJsonRequest(ObjectMapper mapper, RakamHttpRequest request, MethodHandle methodHandle, Object[] arguments, List postProcessors) + void handleJsonRequest(ObjectMapper mapper, RakamHttpRequest request, MethodHandle methodHandle, Object[] arguments) { try { Object apply = methodHandle.invokeWithArguments(arguments); - returnJsonResponse(mapper, request, OK, apply, postProcessors); + returnJsonResponse(mapper, request, OK, apply); } catch (HttpRequestException e) { uncaughtExceptionHandler.handle(request, e); HttpResponseStatus statusCode = e.getStatusCode(); - returnJsonResponse(mapper, request, statusCode, new ErrorMessage(e.getErrors(), e.getMeta()), postProcessors); + returnJsonResponse(mapper, request, statusCode, new ErrorMessage(e.getErrors(), e.getMeta())); } catch (Throwable e) { uncaughtExceptionHandler.handle(request, e); LOGGER.error(e, "An uncaught exception raised while processing request."); - returnJsonResponse(mapper, request, BAD_REQUEST, getErrorMessage(e), postProcessors); + returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(e)); } } - void handleJsonRequest(ObjectMapper mapper, HttpService serviceInstance, RakamHttpRequest request, Function function, List postProcessors) + void handleJsonRequest(ObjectMapper mapper, HttpService serviceInstance, RakamHttpRequest request, Function function) { try { Object apply = function.apply(serviceInstance); - returnJsonResponse(mapper, request, OK, apply, postProcessors); + returnJsonResponse(mapper, request, OK, apply); } catch (HttpRequestException ex) { uncaughtExceptionHandler.handle(request, ex); HttpResponseStatus statusCode = ex.getStatusCode(); - returnJsonResponse(mapper, request, statusCode, new ErrorMessage(ex.getErrors(), ex.getMeta()), postProcessors); + returnJsonResponse(mapper, request, statusCode, new ErrorMessage(ex.getErrors(), ex.getMeta())); } catch (Throwable e) { uncaughtExceptionHandler.handle(request, e); LOGGER.error(e, "An uncaught exception raised while processing request."); - returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, - getErrorMessage(e), postProcessors); + returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(e)); } } @@ -674,7 +667,7 @@ private ErrorMessage getErrorMessage(Throwable e) } } - private static void returnJsonResponse(ObjectMapper mapper, RakamHttpRequest request, HttpResponseStatus status, Object apply, List postProcessors) + private static void returnJsonResponse(ObjectMapper mapper, RakamHttpRequest request, HttpResponseStatus status, Object apply) { FullHttpResponse response; try { @@ -698,23 +691,10 @@ private static void returnJsonResponse(ObjectMapper mapper, RakamHttpRequest req throw new RuntimeException("couldn't serialize object", e); } response.headers().set(CONTENT_TYPE, "application/json; charset=utf-8"); - - if (postProcessors != null) { - applyPostProcessors(response, postProcessors); - } request.response(response).end(); } - static void applyPostProcessors(FullHttpResponse httpResponse, List postProcessors) - { - if (!postProcessors.isEmpty()) { - for (ResponsePostProcessor postProcessor : postProcessors) { - postProcessor.handle(httpResponse); - } - } - } - - void handleAsyncJsonRequest(ObjectMapper mapper, RakamHttpRequest request, CompletionStage apply, List postProcessors) + void handleAsyncJsonRequest(ObjectMapper mapper, RakamHttpRequest request, CompletionStage apply) { if (apply == null) { NullPointerException e = new NullPointerException(); @@ -729,10 +709,10 @@ void handleAsyncJsonRequest(ObjectMapper mapper, RakamHttpRequest request, Compl } uncaughtExceptionHandler.handle(request, ex); - requestError(ex, request, postProcessors); + requestError(ex, request); } else { - returnJsonResponse(mapper, request, OK, result, postProcessors); + returnJsonResponse(mapper, request, OK, result); } }); } @@ -780,10 +760,10 @@ protected void initChannel(SocketChannel ch) HttpServerHandler handler; if (proxyProtocol) { p.addLast(new HAProxyMessageDecoder()); - handler = new HaProxyBackendServerHandler(activeChannels, HttpServer.this); + handler = new HaProxyBackendServerHandler(activeChannels, HttpServer.this, postProcessors); } else { - handler = new HttpServerHandler(activeChannels, HttpServer.this); + handler = new HttpServerHandler(activeChannels, HttpServer.this, postProcessors); } // make it configurable @@ -887,16 +867,16 @@ public static JsonAPIError codeTitleDetail(String code, String title, String det } } - void requestError(Throwable ex, RakamHttpRequest request, List postProcessors) + void requestError(Throwable ex, RakamHttpRequest request) { uncaughtExceptionHandler.handle(request, ex); if (ex instanceof HttpRequestException) { HttpRequestException httpEx = (HttpRequestException) ex; HttpResponseStatus statusCode = httpEx.getStatusCode(); - returnJsonResponse(mapper, request, statusCode, new ErrorMessage(httpEx.getErrors(), httpEx.getMeta()), postProcessors); + returnJsonResponse(mapper, request, statusCode, new ErrorMessage(httpEx.getErrors(), httpEx.getMeta())); } else { - returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(ex), postProcessors); + returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(ex)); LOGGER.error(ex, "Error while processing request"); } } diff --git a/src/main/java/org/rakam/server/http/HttpServerBuilder.java b/src/main/java/org/rakam/server/http/HttpServerBuilder.java index 7343618..d4f236a 100644 --- a/src/main/java/org/rakam/server/http/HttpServerBuilder.java +++ b/src/main/java/org/rakam/server/http/HttpServerBuilder.java @@ -56,9 +56,9 @@ public HttpServerBuilder addJsonPreprocessor(RequestPreprocessor preprocessor, P return this; } - public HttpServerBuilder addPostProcessor(ResponsePostProcessor processor, Predicate predicate) + public HttpServerBuilder addPostProcessor(ResponsePostProcessor processor) { - postProcessorEntryBuilder.add(new PostProcessorEntry(processor, predicate)); + postProcessorEntryBuilder.add(new PostProcessorEntry(processor)); return this; } diff --git a/src/main/java/org/rakam/server/http/HttpServerHandler.java b/src/main/java/org/rakam/server/http/HttpServerHandler.java index a08d3d4..6420e74 100644 --- a/src/main/java/org/rakam/server/http/HttpServerHandler.java +++ b/src/main/java/org/rakam/server/http/HttpServerHandler.java @@ -32,30 +32,30 @@ public class HttpServerHandler private final HttpServer server; private final ConcurrentSet activeChannels; + private final List postProcessors; protected RakamHttpRequest request; private List body; - public HttpServerHandler(ConcurrentSet activeChannels, HttpServer server) + public HttpServerHandler(ConcurrentSet activeChannels, HttpServer server, List postProcessors) { this.server = server; this.activeChannels = activeChannels; + this.postProcessors = postProcessors; } RakamHttpRequest createRequest(ChannelHandlerContext ctx) { - return new RakamHttpRequest(ctx); + return new RakamHttpRequest(ctx, postProcessors); } @Override public void channelActive(ChannelHandlerContext ctx) - throws Exception { activeChannels.add(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) - throws Exception { activeChannels.remove(ctx); } @@ -134,6 +134,8 @@ else if (msg instanceof HttpContent) { } } if (value > server.maximumBodySize) { + String errorMessage = "Body is too large"; + server.uncaughtExceptionHandler.handle(request, new HttpRequestException(errorMessage, REQUEST_ENTITY_TOO_LARGE)); HttpServer.returnError(request, "Body is too large.", REQUEST_ENTITY_TOO_LARGE); ctx.close(); } diff --git a/src/main/java/org/rakam/server/http/IRequestParameter.java b/src/main/java/org/rakam/server/http/IRequestParameter.java index 7919fbe..53428f4 100644 --- a/src/main/java/org/rakam/server/http/IRequestParameter.java +++ b/src/main/java/org/rakam/server/http/IRequestParameter.java @@ -40,7 +40,11 @@ public T extract(ObjectNode node, RakamHttpRequest request) throw new HttpRequestException("'" + name + "' header parameter is required.", BAD_REQUEST); } - return mapper.apply(value); + try { + return mapper.apply(value); + } catch (Exception e) { + throw new HttpRequestException(String.format("Unable to parse header parameter %s", name), BAD_REQUEST); + } } } @@ -101,7 +105,11 @@ public T extract(ObjectNode node, RakamHttpRequest request) } } - return mapper.apply(strings.get(0)); + try { + return mapper.apply(strings.get(0)); + } catch (Exception e) { + throw new HttpRequestException(String.format("Unable to parse query parameter %s", name), BAD_REQUEST); + } } } diff --git a/src/main/java/org/rakam/server/http/JsonBeanRequestHandler.java b/src/main/java/org/rakam/server/http/JsonBeanRequestHandler.java index 83ce06f..cea423b 100644 --- a/src/main/java/org/rakam/server/http/JsonBeanRequestHandler.java +++ b/src/main/java/org/rakam/server/http/JsonBeanRequestHandler.java @@ -25,18 +25,15 @@ public class JsonBeanRequestHandler implements HttpRequestHandler { private final boolean isAsync; private final HttpService service; private final List requestPreprocessors; - private final List postProcessors; private final HttpServer httpServer; private final BiFunction function; public JsonBeanRequestHandler(HttpServer httpServer, ObjectMapper mapper, Method method, List requestPreprocessors, - List postProcessors, HttpService service) { this.httpServer = httpServer; this.mapper = mapper; this.service = service; - this.postProcessors = postProcessors; function = Lambda.produceLambdaForBiFunction(method); @@ -79,7 +76,7 @@ public void handle(RakamHttpRequest request) { } } } catch (Throwable e) { - httpServer.requestError(e, request, postProcessors); + httpServer.requestError(e, request); return; } @@ -88,12 +85,12 @@ public void handle(RakamHttpRequest request) { try { apply = (CompletionStage) function.apply(service, json); } catch (Throwable e) { - httpServer.requestError(e, request, postProcessors); + httpServer.requestError(e, request); return; } - httpServer.handleAsyncJsonRequest(mapper, request, apply, postProcessors); + httpServer.handleAsyncJsonRequest(mapper, request, apply); } else { - httpServer.handleJsonRequest(mapper, service, request, (service) -> function.apply(service, json), postProcessors); + httpServer.handleJsonRequest(mapper, service, request, (service) -> function.apply(service, json)); } }); } diff --git a/src/main/java/org/rakam/server/http/JsonParametrizedRequestHandler.java b/src/main/java/org/rakam/server/http/JsonParametrizedRequestHandler.java index 0cd45ca..aa6bf8a 100644 --- a/src/main/java/org/rakam/server/http/JsonParametrizedRequestHandler.java +++ b/src/main/java/org/rakam/server/http/JsonParametrizedRequestHandler.java @@ -39,14 +39,12 @@ public class JsonParametrizedRequestHandler implements HttpRequestHandler { emptyObject = DEFAULT_MAPPER.createObjectNode(); } - private final List postProcessors; private final HttpServer httpServer; private final boolean isJson; public JsonParametrizedRequestHandler(HttpServer httpServer, ObjectMapper mapper, List bodyParams, MethodHandle methodHandle, - List postProcessors, HttpService service, List jsonPreprocessors, boolean isAsync, boolean isJson) { @@ -55,7 +53,6 @@ public JsonParametrizedRequestHandler(HttpServer httpServer, ObjectMapper mapper this.bodyParams = bodyParams; this.methodHandle = methodHandle; this.service = service; - this.postProcessors = postProcessors; this.isAsync = isAsync; this.jsonPreprocessors = jsonPreprocessors; this.isJson = isJson; @@ -109,7 +106,7 @@ private void handleInternal(RakamHttpRequest request, ObjectNode node) { } } } catch (Throwable e) { - httpServer.requestError(e, request, postProcessors); + httpServer.requestError(e, request); return; } @@ -121,7 +118,7 @@ private void handleInternal(RakamHttpRequest request, ObjectNode node) { try { value = param.extract(node, request); } catch (Exception e) { - httpServer.requestError(e, request, postProcessors); + httpServer.requestError(e, request); return; } @@ -132,12 +129,12 @@ private void handleInternal(RakamHttpRequest request, ObjectNode node) { try { invoke = methodHandle.invokeWithArguments(values); } catch (Throwable e) { - httpServer.requestError(e, request, postProcessors); + httpServer.requestError(e, request); return; } if (isJson) { - httpServer.handleRequest(mapper, isAsync, invoke, request, postProcessors); + httpServer.handleRequest(mapper, isAsync, invoke, request); } } } diff --git a/src/main/java/org/rakam/server/http/PostProcessorEntry.java b/src/main/java/org/rakam/server/http/PostProcessorEntry.java index 8a9f9f8..2af3d25 100644 --- a/src/main/java/org/rakam/server/http/PostProcessorEntry.java +++ b/src/main/java/org/rakam/server/http/PostProcessorEntry.java @@ -5,16 +5,10 @@ public class PostProcessorEntry { - private final Predicate predicate; private final ResponsePostProcessor processor; - public PostProcessorEntry(ResponsePostProcessor processor, Predicate predicate) { + public PostProcessorEntry(ResponsePostProcessor processor) { this.processor = processor; - this.predicate = predicate; - } - - public boolean test(Method method) { - return predicate.test(method); } public ResponsePostProcessor getProcessor() { diff --git a/src/main/java/org/rakam/server/http/RakamHttpRequest.java b/src/main/java/org/rakam/server/http/RakamHttpRequest.java index a69cf32..c7519dd 100644 --- a/src/main/java/org/rakam/server/http/RakamHttpRequest.java +++ b/src/main/java/org/rakam/server/http/RakamHttpRequest.java @@ -31,6 +31,7 @@ public class RakamHttpRequest private final static InputStream REQUEST_DONE_STREAM = new InvalidInputStream(); private final ChannelHandlerContext ctx; + private final List postProcessors; private HttpRequest request; private FullHttpResponse response; private Map responseHeaders; @@ -47,8 +48,9 @@ public boolean equals(Object o) { private QueryStringDecoder qs; private String remoteAddress; - public RakamHttpRequest(ChannelHandlerContext ctx) { + public RakamHttpRequest(ChannelHandlerContext ctx, List postProcessors) { this.ctx = ctx; + this.postProcessors = postProcessors; } void setRequest(HttpRequest request) { @@ -220,6 +222,11 @@ public void end() { } } + for (PostProcessorEntry postProcessor : postProcessors) { + postProcessor.getProcessor().handle(response); + } + + if (keepAlive) { response.headers().set(CONTENT_LENGTH, response.content().readableBytes()); response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); @@ -323,8 +330,7 @@ private static class InvalidInputStream extends InputStream { @Override - public int read() - throws IOException { + public int read() { throw new UnsupportedOperationException(); } }