Skip to content
Permalink
Browse files
IGNITE-16495 Make RestModule handlers asynchronous
  • Loading branch information
sashapolo authored and SammyVimes committed Feb 8, 2022
1 parent 9aaaf48 commit 2cf7b4b75be02562132ec19293b1d77ee841038f
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 148 deletions.
@@ -90,6 +90,12 @@
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
@@ -18,6 +18,10 @@
package org.apache.ignite.cli;

import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -135,6 +139,43 @@ public void setAndGetWithManualHost() {
assertEquals("localhost1", document.read("$.node.metastorageNodes[0]"));
}

@Test
public void setWithWrongData() {
int exitCode = cmd(ctx).execute(
"config",
"set",
"--node-endpoint",
"localhost:" + restPort,
"--type", "node", //TODO: Fix in https://issues.apache.org/jira/browse/IGNITE-15306
"node.metastorgeNodes=[\"localhost1\"]"
);

assertEquals(1, exitCode);
assertThat(
err.toString(),
both(startsWith("org.apache.ignite.cli.IgniteCliException: Failed to set configuration"))
.and(containsString("'node' configuration doesn't have the 'metastorgeNodes' sub-configuration"))
);

resetStreams();

exitCode = cmd(ctx).execute(
"config",
"set",
"--node-endpoint",
"localhost:" + restPort,
"--type", "node", //TODO: Fix in https://issues.apache.org/jira/browse/IGNITE-15306
"node.metastorageNodes=abc"
);

assertEquals(1, exitCode);
assertThat(
err.toString(),
both(startsWith("org.apache.ignite.cli.IgniteCliException: Failed to set configuration"))
.and(containsString("'String[]' is expected as a type for the 'node.metastorageNodes' configuration value"))
);
}

@Test
public void partialGet() {
int exitCode = cmd(ctx).execute(
@@ -163,7 +204,7 @@ public void partialGet() {
* @throws IOException if can't allocate port to open socket.
*/
// TODO: Must be removed after IGNITE-15131.
private int getAvailablePort() throws IOException {
private static int getAvailablePort() throws IOException {
ServerSocket s = new ServerSocket(0);
s.close();
return s.getLocalPort();
@@ -80,6 +80,12 @@
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
@@ -99,6 +105,13 @@
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- Logging in tests -->
<dependency>
<groupId>org.slf4j</groupId>
@@ -19,15 +19,17 @@

import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static java.nio.charset.StandardCharsets.UTF_8;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.net.BindException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
import org.apache.ignite.configuration.schemas.rest.RestView;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
@@ -112,11 +114,19 @@ public void start() {
router
.get(
NODE_CFG_URL,
(req, resp) -> resp.json(nodeCfgPresentation.represent())
(req, resp) -> {
resp.json(nodeCfgPresentation.represent());

return CompletableFuture.completedFuture(resp);
}
)
.get(
CLUSTER_CFG_URL,
(req, resp) -> resp.json(clusterCfgPresentation.represent())
(req, resp) -> {
resp.json(clusterCfgPresentation.represent());

return CompletableFuture.completedFuture(resp);
}
)
.get(
NODE_CFG_URL + ":" + PATH_PARAM,
@@ -208,7 +218,7 @@ public void stop() throws Exception {
* @param res Rest response.
* @param presentation Configuration presentation.
*/
private void handleRepresentByPath(
private static CompletableFuture<RestApiHttpResponse> handleRepresentByPath(
RestApiHttpRequest req,
RestApiHttpResponse res,
ConfigurationPresentation<String> presentation
@@ -223,6 +233,8 @@ private void handleRepresentByPath(
res.status(BAD_REQUEST);
res.json(Map.of("error", errRes));
}

return CompletableFuture.completedFuture(res);
}

/**
@@ -232,34 +244,36 @@ private void handleRepresentByPath(
* @param res Rest response.
* @param presentation Configuration presentation.
*/
private void handleUpdate(
private static CompletableFuture<RestApiHttpResponse> handleUpdate(
RestApiHttpRequest req,
RestApiHttpResponse res,
ConfigurationPresentation<String> presentation
) {
try {
String updateReq = req
.request()
.content()
.readCharSequence(req.request().content().readableBytes(), UTF_8)
.toString();

presentation.update(updateReq);
} catch (IllegalArgumentException e) {
ErrorResult errRes = new ErrorResult("INVALID_CONFIG_FORMAT", e.getMessage());

res.status(BAD_REQUEST);
res.json(Map.of("error", errRes));
} catch (ConfigurationValidationException e) {
ErrorResult errRes = new ErrorResult("VALIDATION_EXCEPTION", e.getMessage());

res.status(BAD_REQUEST);
res.json(Map.of("error", errRes));
} catch (IgniteException e) {
ErrorResult errRes = new ErrorResult("APPLICATION_EXCEPTION", e.getMessage());

res.status(BAD_REQUEST);
res.json(Map.of("error", errRes));
}
String updateReq = req.request().content().toString(StandardCharsets.UTF_8);

return presentation.update(updateReq)
.thenApply(v -> res)
.exceptionally(e -> {
if (e instanceof CompletionException) {
e = e.getCause();
}

ErrorResult errRes;

if (e instanceof IllegalArgumentException) {
errRes = new ErrorResult("INVALID_CONFIG_FORMAT", e.getMessage());
} else if (e instanceof ConfigurationValidationException) {
errRes = new ErrorResult("VALIDATION_EXCEPTION", e.getMessage());
} else if (e instanceof IgniteException) {
errRes = new ErrorResult("APPLICATION_EXCEPTION", e.getMessage());
} else {
throw new CompletionException(e);
}

res.status(BAD_REQUEST);
res.json(Map.of("error", errRes));

return res;
});
}
}
@@ -23,6 +23,7 @@
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -34,18 +35,17 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.rest.routes.Router;
import org.apache.ignite.lang.IgniteLogger;

/**
* Main handler of REST HTTP chain. It receives http request, process it by {@link Router} and produce http response.
*/
public class RestApiHandler extends SimpleChannelInboundHandler<HttpObject> {
public class RestApiHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/** Ignite logger. */
private final IgniteLogger log = IgniteLogger.forClass(getClass());

@@ -63,55 +63,63 @@ public RestApiHandler(Router router) {

/** {@inheritDoc} */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
CompletableFuture<DefaultFullHttpResponse> responseFuture = router.route(request)
.map(route -> {
var response = new RestApiHttpResponse(new DefaultHttpResponse(HttpVersion.HTTP_1_1, OK));

/** {@inheritDoc} */
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest req = (FullHttpRequest) msg;
FullHttpResponse res;

var maybeRoute = router.route(req);
if (maybeRoute.isPresent()) {
var resp = new RestApiHttpResponse(new DefaultHttpResponse(HttpVersion.HTTP_1_1, OK));
maybeRoute.get().handle(req, resp);
var content = resp.content() != null
? Unpooled.wrappedBuffer(resp.content()) : new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT);
res = new DefaultFullHttpResponse(resp.protocolVersion(), resp.status(),
content, resp.headers(), EmptyHttpHeaders.INSTANCE);
} else {
res = new DefaultFullHttpResponse(req.protocolVersion(), HttpResponseStatus.NOT_FOUND);
}

res.headers()
.setInt(CONTENT_LENGTH, res.content().readableBytes());

boolean keepAlive = HttpUtil.isKeepAlive(req);
if (keepAlive) {
if (!req.protocolVersion().isKeepAliveDefault()) {
res.headers().set(CONNECTION, KEEP_ALIVE);
}
} else {
res.headers().set(CONNECTION, CLOSE);
}

ChannelFuture f = ctx.write(res);

if (!keepAlive) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
return route.handle(request, response)
.thenApply(resp -> {
ByteBuf content = resp.content() != null
? Unpooled.wrappedBuffer(resp.content())
: new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT);

return new DefaultFullHttpResponse(
resp.protocolVersion(),
resp.status(),
content,
resp.headers(),
EmptyHttpHeaders.INSTANCE
);
});
})
.orElseGet(() -> CompletableFuture.completedFuture(
new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.NOT_FOUND)
));

responseFuture
.whenCompleteAsync((response, e) -> {
if (e != null) {
exceptionCaught(ctx, e);

return;
}

response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());

boolean keepAlive = HttpUtil.isKeepAlive(request);

if (keepAlive) {
if (!request.protocolVersion().isKeepAliveDefault()) {
response.headers().set(CONNECTION, KEEP_ALIVE);
}
} else {
response.headers().set(CONNECTION, CLOSE);
}

ChannelFuture f = ctx.writeAndFlush(response);

if (!keepAlive) {
f.addListener(ChannelFutureListener.CLOSE);
}
}, ctx.executor());
}

/** {@inheritDoc} */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("Failed to process http request:", cause);
var res = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
ctx.write(res).addListener(ChannelFutureListener.CLOSE);
ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
}
}
@@ -22,7 +22,6 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerExpectContinueHandler;
import org.apache.ignite.internal.rest.routes.Router;

/**
@@ -46,7 +45,6 @@ public RestApiInitializer(Router router) {
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec());
p.addLast(new HttpServerExpectContinueHandler());
p.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
p.addLast(new RestApiHandler(router));
}
@@ -17,6 +17,7 @@

package org.apache.ignite.internal.rest.presentation;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
@@ -49,9 +50,10 @@
* Converts and applies configuration update request to system configuration.
*
* @param cfgUpdate Configuration update request in representation form.
* @return A future that resolves when the update operation either finishes or fails.
* @throws IllegalArgumentException If the configuration format is invalid.
* @throws ConfigurationValidationException If configuration validation failed.
* @throws IgniteException If an error happens.
*/
void update(R cfgUpdate);
CompletableFuture<Void> update(R cfgUpdate);
}

0 comments on commit 2cf7b4b

Please sign in to comment.