Skip to content

Commit

Permalink
Move postProcessors to http request handler
Browse files Browse the repository at this point in the history
  • Loading branch information
buremba committed Oct 24, 2018
1 parent b5fc8dc commit f14a9a3
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 90 deletions.
Expand Up @@ -5,12 +5,14 @@
import io.netty.util.AttributeKey;
import io.netty.util.internal.ConcurrentSet;

import java.util.List;

public class HaProxyBackendServerHandler extends HttpServerHandler {
AttributeKey<String> CLIENT_IP = AttributeKey.valueOf("ip");


public HaProxyBackendServerHandler(ConcurrentSet activeChannels, HttpServer httpServer) {
super(activeChannels, httpServer);
public HaProxyBackendServerHandler(ConcurrentSet activeChannels, HttpServer httpServer, List<PostProcessorEntry> postProcessors) {
super(activeChannels, httpServer, postProcessors);
}

@Override
Expand Down
92 changes: 36 additions & 56 deletions src/main/java/org/rakam/server/http/HttpServer.java
Expand Up @@ -298,12 +298,9 @@ else if (isRawRequestMethod(method)) {
private HttpRequestHandler getJsonRequestHandler(Method method, HttpService service)
{
final List<RequestPreprocessor> preprocessorRequest = getPreprocessorRequest(method);
List<ResponsePostProcessor> postProcessors = getPostPreprocessorsRequest(method);

if (method.getParameterCount() == 1 && method.getParameters()[0].isAnnotationPresent(BodyParam.class)) {
return new JsonBeanRequestHandler(this, mapper, method,
getPreprocessorRequest(method), postProcessors,
service);
return new JsonBeanRequestHandler(this, mapper, method, getPreprocessorRequest(method), service);
}

ArrayList<IRequestParameter> bodyParams = new ArrayList<>();
Expand Down Expand Up @@ -333,11 +330,11 @@ private HttpRequestHandler getJsonRequestHandler(Method method, HttpService serv
invoke = lambda.apply(service);
}
catch (Throwable e) {
requestError(e.getCause(), request, postProcessors);
requestError(e.getCause(), request);
return;
}

handleRequest(mapper, isAsync, invoke, request, postProcessors);
handleRequest(mapper, isAsync, invoke, request);
};
}
else {
Expand All @@ -351,7 +348,7 @@ private HttpRequestHandler getJsonRequestHandler(Method method, HttpService serv
}

return new JsonParametrizedRequestHandler(this, mapper, bodyParams,
methodHandle, postProcessors, service,
methodHandle, service,
preprocessorForJsonRequest, isAsync, !method.getReturnType().equals(void.class));
}
}
Expand All @@ -364,17 +361,16 @@ private List<RequestPreprocessor> getPreprocessorRequest(Method method)

private List<ResponsePostProcessor> getPostPreprocessorsRequest(Method method)
{
return postProcessors.stream()
.filter(p -> p.test(method)).map(PostProcessorEntry::getProcessor).collect(Collectors.toList());
return postProcessors.stream().map(PostProcessorEntry::getProcessor).collect(Collectors.toList());
}

void handleRequest(ObjectMapper mapper, boolean isAsync, Object invoke, RakamHttpRequest request, List<ResponsePostProcessor> postProcessors)
void handleRequest(ObjectMapper mapper, boolean isAsync, Object invoke, RakamHttpRequest request)
{
if (isAsync) {
handleAsyncJsonRequest(mapper, request, (CompletionStage) invoke, postProcessors);
handleAsyncJsonRequest(mapper, request, (CompletionStage) invoke);
}
else {
returnJsonResponse(mapper, request, OK, invoke, postProcessors);
returnJsonResponse(mapper, request, OK, invoke);
}
}

Expand All @@ -388,7 +384,6 @@ private boolean isRawRequestMethod(Method method)
private HttpRequestHandler generateRawRequestHandler(HttpService service, Method method)
{
List<RequestPreprocessor> requestPreprocessors = getPreprocessorRequest(method);
List<ResponsePostProcessor> postProcessors = getPostPreprocessorsRequest(method);

// we don't need to pass service object is the method is static.
// it's also better for performance since there will be only one object to send the stack.
Expand All @@ -404,7 +399,7 @@ private HttpRequestHandler generateRawRequestHandler(HttpService service, Method
lambda.accept(request);
}
catch (Exception e) {
requestError(e, request, postProcessors);
requestError(e, request);
}
};
}
Expand All @@ -416,7 +411,7 @@ private HttpRequestHandler generateRawRequestHandler(HttpService service, Method
lambda.accept(service, request);
}
catch (Exception e) {
requestError(e, request, postProcessors);
requestError(e, request);
}
};
}
Expand All @@ -427,7 +422,6 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m
boolean isAsync = CompletionStage.class.isAssignableFrom(method.getReturnType());

final List<RequestPreprocessor> preprocessors = getPreprocessorRequest(method);
List<ResponsePostProcessor> postProcessors = getPostPreprocessorsRequest(method);

if (method.getParameterCount() == 0) {
Function<HttpService, Object> function = produceLambdaForFunction(method);
Expand All @@ -438,7 +432,7 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m
}
}
catch (Throwable e) {
requestError(e, request, postProcessors);
requestError(e, request);
return;
}

Expand All @@ -448,13 +442,13 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m
apply = (CompletionStage) function.apply(service);
}
catch (Exception e) {
requestError(e.getCause(), request, postProcessors);
requestError(e.getCause(), request);
return;
}
handleAsyncJsonRequest(mapper, request, apply, postProcessors);
handleAsyncJsonRequest(mapper, request, apply);
}
else {
handleJsonRequest(mapper, service, request, function, postProcessors);
handleJsonRequest(mapper, service, request, function);
}
};
}
Expand All @@ -479,7 +473,7 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m
}
}
catch (Throwable e) {
requestError(e, request, postProcessors);
requestError(e, request);
return;
}

Expand All @@ -493,7 +487,7 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m
}
}
catch (Exception e) {
requestError(e, request, postProcessors);
requestError(e, request);
return;
}

Expand All @@ -503,13 +497,13 @@ private HttpRequestHandler createGetRequestHandler(HttpService service, Method m
apply = (CompletionStage) methodHandle.invokeWithArguments(objects);
}
catch (Throwable e) {
requestError(e.getCause(), request, postProcessors);
requestError(e.getCause(), request);
return;
}
handleAsyncJsonRequest(mapper, request, apply, postProcessors);
handleAsyncJsonRequest(mapper, request, apply);
}
else {
handleJsonRequest(mapper, request, methodHandle, objects, postProcessors);
handleJsonRequest(mapper, request, methodHandle, objects);
}
};
}
Expand Down Expand Up @@ -622,41 +616,40 @@ else if (parameter.isAnnotationPresent(QueryParam.class)) {
}
}

void handleJsonRequest(ObjectMapper mapper, RakamHttpRequest request, MethodHandle methodHandle, Object[] arguments, List<ResponsePostProcessor> postProcessors)
void handleJsonRequest(ObjectMapper mapper, RakamHttpRequest request, MethodHandle methodHandle, Object[] arguments)
{
try {
Object apply = methodHandle.invokeWithArguments(arguments);
returnJsonResponse(mapper, request, OK, apply, postProcessors);
returnJsonResponse(mapper, request, OK, apply);
}
catch (HttpRequestException e) {
uncaughtExceptionHandler.handle(request, e);
HttpResponseStatus statusCode = e.getStatusCode();
returnJsonResponse(mapper, request, statusCode, new ErrorMessage(e.getErrors(), e.getMeta()), postProcessors);
returnJsonResponse(mapper, request, statusCode, new ErrorMessage(e.getErrors(), e.getMeta()));
}
catch (Throwable e) {
uncaughtExceptionHandler.handle(request, e);
LOGGER.error(e, "An uncaught exception raised while processing request.");

returnJsonResponse(mapper, request, BAD_REQUEST, getErrorMessage(e), postProcessors);
returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(e));
}
}

void handleJsonRequest(ObjectMapper mapper, HttpService serviceInstance, RakamHttpRequest request, Function<HttpService, Object> function, List<ResponsePostProcessor> postProcessors)
void handleJsonRequest(ObjectMapper mapper, HttpService serviceInstance, RakamHttpRequest request, Function<HttpService, Object> function)
{
try {
Object apply = function.apply(serviceInstance);
returnJsonResponse(mapper, request, OK, apply, postProcessors);
returnJsonResponse(mapper, request, OK, apply);
}
catch (HttpRequestException ex) {
uncaughtExceptionHandler.handle(request, ex);
HttpResponseStatus statusCode = ex.getStatusCode();
returnJsonResponse(mapper, request, statusCode, new ErrorMessage(ex.getErrors(), ex.getMeta()), postProcessors);
returnJsonResponse(mapper, request, statusCode, new ErrorMessage(ex.getErrors(), ex.getMeta()));
}
catch (Throwable e) {
uncaughtExceptionHandler.handle(request, e);
LOGGER.error(e, "An uncaught exception raised while processing request.");
returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR,
getErrorMessage(e), postProcessors);
returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(e));
}
}

Expand All @@ -674,7 +667,7 @@ private ErrorMessage getErrorMessage(Throwable e)
}
}

private static void returnJsonResponse(ObjectMapper mapper, RakamHttpRequest request, HttpResponseStatus status, Object apply, List<ResponsePostProcessor> postProcessors)
private static void returnJsonResponse(ObjectMapper mapper, RakamHttpRequest request, HttpResponseStatus status, Object apply)
{
FullHttpResponse response;
try {
Expand All @@ -698,23 +691,10 @@ private static void returnJsonResponse(ObjectMapper mapper, RakamHttpRequest req
throw new RuntimeException("couldn't serialize object", e);
}
response.headers().set(CONTENT_TYPE, "application/json; charset=utf-8");

if (postProcessors != null) {
applyPostProcessors(response, postProcessors);
}
request.response(response).end();
}

static void applyPostProcessors(FullHttpResponse httpResponse, List<ResponsePostProcessor> postProcessors)
{
if (!postProcessors.isEmpty()) {
for (ResponsePostProcessor postProcessor : postProcessors) {
postProcessor.handle(httpResponse);
}
}
}

void handleAsyncJsonRequest(ObjectMapper mapper, RakamHttpRequest request, CompletionStage apply, List<ResponsePostProcessor> postProcessors)
void handleAsyncJsonRequest(ObjectMapper mapper, RakamHttpRequest request, CompletionStage apply)
{
if (apply == null) {
NullPointerException e = new NullPointerException();
Expand All @@ -729,10 +709,10 @@ void handleAsyncJsonRequest(ObjectMapper mapper, RakamHttpRequest request, Compl
}
uncaughtExceptionHandler.handle(request, ex);

requestError(ex, request, postProcessors);
requestError(ex, request);
}
else {
returnJsonResponse(mapper, request, OK, result, postProcessors);
returnJsonResponse(mapper, request, OK, result);
}
});
}
Expand Down Expand Up @@ -780,10 +760,10 @@ protected void initChannel(SocketChannel ch)
HttpServerHandler handler;
if (proxyProtocol) {
p.addLast(new HAProxyMessageDecoder());
handler = new HaProxyBackendServerHandler(activeChannels, HttpServer.this);
handler = new HaProxyBackendServerHandler(activeChannels, HttpServer.this, postProcessors);
}
else {
handler = new HttpServerHandler(activeChannels, HttpServer.this);
handler = new HttpServerHandler(activeChannels, HttpServer.this, postProcessors);
}

// make it configurable
Expand Down Expand Up @@ -887,16 +867,16 @@ public static JsonAPIError codeTitleDetail(String code, String title, String det
}
}

void requestError(Throwable ex, RakamHttpRequest request, List<ResponsePostProcessor> postProcessors)
void requestError(Throwable ex, RakamHttpRequest request)
{
uncaughtExceptionHandler.handle(request, ex);
if (ex instanceof HttpRequestException) {
HttpRequestException httpEx = (HttpRequestException) ex;
HttpResponseStatus statusCode = httpEx.getStatusCode();
returnJsonResponse(mapper, request, statusCode, new ErrorMessage(httpEx.getErrors(), httpEx.getMeta()), postProcessors);
returnJsonResponse(mapper, request, statusCode, new ErrorMessage(httpEx.getErrors(), httpEx.getMeta()));
}
else {
returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(ex), postProcessors);
returnJsonResponse(mapper, request, INTERNAL_SERVER_ERROR, getErrorMessage(ex));
LOGGER.error(ex, "Error while processing request");
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/rakam/server/http/HttpServerBuilder.java
Expand Up @@ -56,9 +56,9 @@ public HttpServerBuilder addJsonPreprocessor(RequestPreprocessor preprocessor, P
return this;
}

public HttpServerBuilder addPostProcessor(ResponsePostProcessor processor, Predicate<Method> predicate)
public HttpServerBuilder addPostProcessor(ResponsePostProcessor processor)
{
postProcessorEntryBuilder.add(new PostProcessorEntry(processor, predicate));
postProcessorEntryBuilder.add(new PostProcessorEntry(processor));
return this;
}

Expand Down
10 changes: 6 additions & 4 deletions src/main/java/org/rakam/server/http/HttpServerHandler.java
Expand Up @@ -32,30 +32,30 @@ public class HttpServerHandler

private final HttpServer server;
private final ConcurrentSet activeChannels;
private final List<PostProcessorEntry> postProcessors;
protected RakamHttpRequest request;
private List<ByteBuf> body;

public HttpServerHandler(ConcurrentSet activeChannels, HttpServer server)
public HttpServerHandler(ConcurrentSet activeChannels, HttpServer server, List<PostProcessorEntry> postProcessors)
{
this.server = server;
this.activeChannels = activeChannels;
this.postProcessors = postProcessors;
}

RakamHttpRequest createRequest(ChannelHandlerContext ctx)
{
return new RakamHttpRequest(ctx);
return new RakamHttpRequest(ctx, postProcessors);
}

@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception
{
activeChannels.add(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception
{
activeChannels.remove(ctx);
}
Expand Down Expand Up @@ -134,6 +134,8 @@ else if (msg instanceof HttpContent) {
}
}
if (value > server.maximumBodySize) {
String errorMessage = "Body is too large";
server.uncaughtExceptionHandler.handle(request, new HttpRequestException(errorMessage, REQUEST_ENTITY_TOO_LARGE));
HttpServer.returnError(request, "Body is too large.", REQUEST_ENTITY_TOO_LARGE);
ctx.close();
}
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/org/rakam/server/http/IRequestParameter.java
Expand Up @@ -40,7 +40,11 @@ public T extract(ObjectNode node, RakamHttpRequest request)
throw new HttpRequestException("'" + name + "' header parameter is required.", BAD_REQUEST);
}

return mapper.apply(value);
try {
return mapper.apply(value);
} catch (Exception e) {
throw new HttpRequestException(String.format("Unable to parse header parameter %s", name), BAD_REQUEST);
}
}
}

Expand Down Expand Up @@ -101,7 +105,11 @@ public T extract(ObjectNode node, RakamHttpRequest request)
}
}

return mapper.apply(strings.get(0));
try {
return mapper.apply(strings.get(0));
} catch (Exception e) {
throw new HttpRequestException(String.format("Unable to parse query parameter %s", name), BAD_REQUEST);
}
}
}

Expand Down

0 comments on commit f14a9a3

Please sign in to comment.