From db7d27bdda9c88b39eed94570fb661be6a54511d Mon Sep 17 00:00:00 2001 From: Esteban Serrano Date: Fri, 8 Jun 2018 12:11:27 -0400 Subject: [PATCH 1/5] [FLINK-9499] Allow POST parameters to be used for submitting run job in REST API --- .../webmonitor/HttpRequestHandler.java | 18 ++ .../webmonitor/HttpRequestHandlerITCase.java | 217 ++++++++++++++++++ .../webmonitor/testutils/HttpTestClient.java | 30 +++ 3 files changed, 265 insertions(+) create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java index a0fda9d1b75e7..a324ff3e2f928 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java @@ -44,6 +44,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; @@ -55,6 +56,8 @@ import java.io.File; import java.io.IOException; import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; import java.util.UUID; /** @@ -146,6 +149,21 @@ else if (currentDecoder != null && msg instanceof HttpContent) { currentRequest.setUri(encoder.toString()); } } + else if (data.getHttpDataType() == HttpDataType.Attribute) { + Attribute attribute = (Attribute) data; + QueryStringDecoder decoder = new QueryStringDecoder(currentRequest.getUri()); + Map> currentParams = decoder.parameters(); + + QueryStringEncoder encoder = new QueryStringEncoder(currentRequestPath); + for (String currentParam : currentParams.keySet()) { + encoder.addParam(currentParam, currentParams.get(currentParam).get(0)); + } + encoder.addParam(attribute.getName(), attribute.getValue()); + currentRequest.setUri(encoder.toString()); + } + else { + throw new IOException("Unsupported HTTP data type: " + data.getHttpDataType().name()); + } data.release(); } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java new file mode 100644 index 0000000000000..b79d615fa678b --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java @@ -0,0 +1,217 @@ +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; + +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LOCATION; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link HttpRequestHandler} class. + */ +public class HttpRequestHandlerITCase { + + private static final String JAR_RUN_URI = "jars/jarID/run"; + + private static final FiniteDuration TestTimeout = new FiniteDuration(1, TimeUnit.MINUTES); + + private static String runJarUriWithQueryParams; + + @Before + public void setup() { + StringBuffer sb = new StringBuffer(JAR_RUN_URI) + .append("?") + .append("program-args=args") + .append("&") + .append("entry-class=EntryClassName.class") + .append("&") + .append("parallelism=4") + .append("&") + .append("allowNonRestoredState=true"); + runJarUriWithQueryParams = sb.toString(); + + NioEventLoopGroup parentGroup = new NioEventLoopGroup(1); + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + ChannelInitializer init = new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast(new HttpServerCodec()) + .addLast(new ChunkedWriteHandler()) + .addLast(new HttpRequestHandler(null)) + .addLast(new TestRequestHandler()); + } + }; + ServerBootstrap server = new ServerBootstrap(); + server.group(parentGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(init); + server.bind(new InetSocketAddress(10000)); + } + + @Test + public void testPostRequestForRunJarOperationWithRequestParameters() throws Exception { + + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, JAR_RUN_URI); + HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); + HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true); + bodyRequestEncoder.addBodyAttribute("program-args", "args"); + bodyRequestEncoder.addBodyAttribute("entry-class", "EntryClassName.class"); + bodyRequestEncoder.addBodyAttribute("parallelism", "4"); + bodyRequestEncoder.addBodyAttribute("allowNonRestoredState", "true"); + request = bodyRequestEncoder.finalizeRequest(); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request, bodyRequestEncoder); + assertResponse(response, runJarUriWithQueryParams); + } + + @Test + public void testPostRequestForRunJarOperationWithMixedRequestAndQueryParameters() throws Exception { + + String requestUri = JAR_RUN_URI + "?program-args=args&entry-class=EntryClassName.class"; + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, requestUri); + HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); + HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true); + bodyRequestEncoder.addBodyAttribute("parallelism", "4"); + bodyRequestEncoder.addBodyAttribute("allowNonRestoredState", "true"); + request = bodyRequestEncoder.finalizeRequest(); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request, bodyRequestEncoder); + assertResponse(response, runJarUriWithQueryParams); + } + + @Test + public void testPostRequestForRunJarOperationWithQueryParams() { + + ByteBuf byteBuf = Unpooled.copiedBuffer("ping", Charset.defaultCharset()); + HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, runJarUriWithQueryParams, byteBuf); + request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, byteBuf.readableBytes()); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request); + assertResponse(response, runJarUriWithQueryParams); + } + + @Test + public void testPostRequestForRunJarOperationWithLargeQueryString() { + + ByteBuf byteBuf = Unpooled.copiedBuffer("ping", Charset.defaultCharset()); + char[] chars = new char[4096]; + Arrays.fill(chars, "x".charAt(0)); + String runJarUrlWith4096Chars = new StringBuffer() + .append(JAR_RUN_URI).append("?").append("program-args=").append(chars).toString(); + HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, runJarUrlWith4096Chars, byteBuf); + request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, byteBuf.readableBytes()); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request); + assertResponse(response, "/bad-request"); + } + + @Test + public void testPostRequestForRunJarOperationWithLargeParamSize() throws Exception { + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, JAR_RUN_URI); + HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); + HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true); + char[] chars = new char[4096]; + Arrays.fill(chars, "x".charAt(0)); + bodyRequestEncoder.addBodyAttribute("program-args", new String(chars)); + request = bodyRequestEncoder.finalizeRequest(); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request, bodyRequestEncoder); + assertResponse(response, new StringBuffer(JAR_RUN_URI).append("?program-args=").append(chars).toString()); + } + + private HttpTestClient.SimpleHttpResponse sendPostRequest(HttpRequest request) { + final Deadline deadline = TestTimeout.fromNow(); + + try (HttpTestClient client = new HttpTestClient("127.0.0.1", 10000)) { + client.sendRequest(request, deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + return response; + } + catch (Exception e) { + fail("Post request failed:" + e.getMessage()); + throw new RuntimeException(e); + } + } + + private HttpTestClient.SimpleHttpResponse sendPostRequest(HttpRequest request, HttpPostRequestEncoder encoder) { + final Deadline deadline = TestTimeout.fromNow(); + + try (HttpTestClient client = new HttpTestClient("127.0.0.1", 10000)) { + client.sendPostRequest(request, encoder, deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + return response; + } + catch (Exception e) { + fail("Post request failed: " + e.getMessage()); + throw new RuntimeException(e); + } + } + + private void assertResponse(HttpTestClient.SimpleHttpResponse response, String expectedUrl) { + assertNotNull(response); + assertEquals(OK, response.getStatus()); + assertEquals("text/plain; charset=UTF-8", response.getType()); + String content = response.getContent(); + assertNotNull(content); + assertTrue(content.contains("pong")); + assertEquals(expectedUrl, response.getLocation()); + } + + private static class TestRequestHandler extends SimpleChannelInboundHandler { + + @Override + public void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception { + + //Processing request + ByteBuf byteBuf = Unpooled.copiedBuffer("pong", Charset.defaultCharset()); + + //Writing response, wait till it is completely written and close channel after that + HttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, byteBuf); + response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().set(CONTENT_LENGTH, byteBuf.readableBytes()); + response.headers().set(LOCATION, request.getUri()); + ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java index d94f7a265e5d6..6c595b98ce5d5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java @@ -42,6 +42,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil; import org.slf4j.Logger; @@ -112,6 +114,7 @@ protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new HttpClientCodec()); p.addLast(new HttpContentDecompressor()); + p.addLast(new ChunkedWriteHandler()); p.addLast(new ClientHandler(responses)); } }); @@ -204,6 +207,33 @@ public void sendPatchRequest(String path, FiniteDuration timeout) throws Timeout sendRequest(getRequest, timeout); } + /** + * Sends a POST request to the server. + * + * @param request The POST {@link HttpRequest} to send to the server + * @param encoder The {@link HttpPostRequestEncoder} used to construct the request + */ + public void sendPostRequest(HttpRequest request, HttpPostRequestEncoder encoder, FiniteDuration timeout) throws InterruptedException, TimeoutException { + LOG.debug("Writing post request {}.", request); + + // Make the connection attempt. + ChannelFuture connect = bootstrap.connect(host, port); + + Channel channel; + if (connect.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + channel = connect.channel(); + } + else { + throw new TimeoutException("Connection failed"); + } + + channel.write(request); + if (encoder.isChunked()) { + channel.write(encoder); + } + channel.flush(); + } + /** * Returns the next available HTTP response. A call to this method blocks until a response * becomes available. From 04d8393a168de516c9f9c0472c02a16ccbdeab8b Mon Sep 17 00:00:00 2001 From: Esteban Serrano Date: Fri, 8 Jun 2018 13:57:34 -0400 Subject: [PATCH 2/5] [FLINK-9499] Allow POST parameters to be used for submitting run job in REST API --- .../webmonitor/HttpRequestHandlerITCase.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java index b79d615fa678b..e49bf8219c4d0 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.runtime.webmonitor; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; From 286f42478647117f62f4db94e513aaee31564c0b Mon Sep 17 00:00:00 2001 From: Esteban Serrano Date: Mon, 18 Jun 2018 10:15:07 -0400 Subject: [PATCH 3/5] [FLINK-9499] Allow POST parameters to be used for submitting run job in REST API --- .../flink/runtime/rest/FileUploadHandler.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index 8854a1f433d28..1bba6dc41bd9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -27,6 +27,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; @@ -39,6 +42,8 @@ import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; +import java.util.Map; import java.util.UUID; import static java.util.Objects.requireNonNull; @@ -100,6 +105,16 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms fileUpload.renameTo(dest.toFile()); ctx.channel().attr(UPLOADED_FILE).set(dest); } + else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + Attribute attribute = (Attribute) data; + QueryStringDecoder decoder = new QueryStringDecoder(currentHttpRequest.getUri()); + Map> currentParams = decoder.parameters(); + + QueryStringEncoder encoder = new QueryStringEncoder(decoder.path()); + currentParams.entrySet().forEach(param -> encoder.addParam(param.getKey(), param.getValue().get(0))); + encoder.addParam(attribute.getName(), attribute.getValue()); + currentHttpRequest.setUri(encoder.toString()); + } data.release(); } From f5eeff04dd77d7f31afeccc51f265ea970aceb98 Mon Sep 17 00:00:00 2001 From: Esteban Serrano Date: Fri, 8 Jun 2018 12:11:27 -0400 Subject: [PATCH 4/5] [FLINK-9499] Allow POST parameters to be used for submitting run job in REST API --- .../webmonitor/HttpRequestHandler.java | 18 ++ .../webmonitor/HttpRequestHandlerITCase.java | 235 ++++++++++++++++++ .../webmonitor/testutils/HttpTestClient.java | 30 +++ .../flink/runtime/rest/FileUploadHandler.java | 15 ++ 4 files changed, 298 insertions(+) create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java index a0fda9d1b75e7..a324ff3e2f928 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java @@ -44,6 +44,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; @@ -55,6 +56,8 @@ import java.io.File; import java.io.IOException; import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; import java.util.UUID; /** @@ -146,6 +149,21 @@ else if (currentDecoder != null && msg instanceof HttpContent) { currentRequest.setUri(encoder.toString()); } } + else if (data.getHttpDataType() == HttpDataType.Attribute) { + Attribute attribute = (Attribute) data; + QueryStringDecoder decoder = new QueryStringDecoder(currentRequest.getUri()); + Map> currentParams = decoder.parameters(); + + QueryStringEncoder encoder = new QueryStringEncoder(currentRequestPath); + for (String currentParam : currentParams.keySet()) { + encoder.addParam(currentParam, currentParams.get(currentParam).get(0)); + } + encoder.addParam(attribute.getName(), attribute.getValue()); + currentRequest.setUri(encoder.toString()); + } + else { + throw new IOException("Unsupported HTTP data type: " + data.getHttpDataType().name()); + } data.release(); } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java new file mode 100644 index 0000000000000..e49bf8219c4d0 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/HttpRequestHandlerITCase.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; + +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LOCATION; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link HttpRequestHandler} class. + */ +public class HttpRequestHandlerITCase { + + private static final String JAR_RUN_URI = "jars/jarID/run"; + + private static final FiniteDuration TestTimeout = new FiniteDuration(1, TimeUnit.MINUTES); + + private static String runJarUriWithQueryParams; + + @Before + public void setup() { + StringBuffer sb = new StringBuffer(JAR_RUN_URI) + .append("?") + .append("program-args=args") + .append("&") + .append("entry-class=EntryClassName.class") + .append("&") + .append("parallelism=4") + .append("&") + .append("allowNonRestoredState=true"); + runJarUriWithQueryParams = sb.toString(); + + NioEventLoopGroup parentGroup = new NioEventLoopGroup(1); + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + ChannelInitializer init = new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast(new HttpServerCodec()) + .addLast(new ChunkedWriteHandler()) + .addLast(new HttpRequestHandler(null)) + .addLast(new TestRequestHandler()); + } + }; + ServerBootstrap server = new ServerBootstrap(); + server.group(parentGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(init); + server.bind(new InetSocketAddress(10000)); + } + + @Test + public void testPostRequestForRunJarOperationWithRequestParameters() throws Exception { + + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, JAR_RUN_URI); + HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); + HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true); + bodyRequestEncoder.addBodyAttribute("program-args", "args"); + bodyRequestEncoder.addBodyAttribute("entry-class", "EntryClassName.class"); + bodyRequestEncoder.addBodyAttribute("parallelism", "4"); + bodyRequestEncoder.addBodyAttribute("allowNonRestoredState", "true"); + request = bodyRequestEncoder.finalizeRequest(); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request, bodyRequestEncoder); + assertResponse(response, runJarUriWithQueryParams); + } + + @Test + public void testPostRequestForRunJarOperationWithMixedRequestAndQueryParameters() throws Exception { + + String requestUri = JAR_RUN_URI + "?program-args=args&entry-class=EntryClassName.class"; + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, requestUri); + HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); + HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true); + bodyRequestEncoder.addBodyAttribute("parallelism", "4"); + bodyRequestEncoder.addBodyAttribute("allowNonRestoredState", "true"); + request = bodyRequestEncoder.finalizeRequest(); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request, bodyRequestEncoder); + assertResponse(response, runJarUriWithQueryParams); + } + + @Test + public void testPostRequestForRunJarOperationWithQueryParams() { + + ByteBuf byteBuf = Unpooled.copiedBuffer("ping", Charset.defaultCharset()); + HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, runJarUriWithQueryParams, byteBuf); + request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, byteBuf.readableBytes()); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request); + assertResponse(response, runJarUriWithQueryParams); + } + + @Test + public void testPostRequestForRunJarOperationWithLargeQueryString() { + + ByteBuf byteBuf = Unpooled.copiedBuffer("ping", Charset.defaultCharset()); + char[] chars = new char[4096]; + Arrays.fill(chars, "x".charAt(0)); + String runJarUrlWith4096Chars = new StringBuffer() + .append(JAR_RUN_URI).append("?").append("program-args=").append(chars).toString(); + HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, runJarUrlWith4096Chars, byteBuf); + request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, byteBuf.readableBytes()); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request); + assertResponse(response, "/bad-request"); + } + + @Test + public void testPostRequestForRunJarOperationWithLargeParamSize() throws Exception { + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, JAR_RUN_URI); + HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); + HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true); + char[] chars = new char[4096]; + Arrays.fill(chars, "x".charAt(0)); + bodyRequestEncoder.addBodyAttribute("program-args", new String(chars)); + request = bodyRequestEncoder.finalizeRequest(); + + HttpTestClient.SimpleHttpResponse response = sendPostRequest(request, bodyRequestEncoder); + assertResponse(response, new StringBuffer(JAR_RUN_URI).append("?program-args=").append(chars).toString()); + } + + private HttpTestClient.SimpleHttpResponse sendPostRequest(HttpRequest request) { + final Deadline deadline = TestTimeout.fromNow(); + + try (HttpTestClient client = new HttpTestClient("127.0.0.1", 10000)) { + client.sendRequest(request, deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + return response; + } + catch (Exception e) { + fail("Post request failed:" + e.getMessage()); + throw new RuntimeException(e); + } + } + + private HttpTestClient.SimpleHttpResponse sendPostRequest(HttpRequest request, HttpPostRequestEncoder encoder) { + final Deadline deadline = TestTimeout.fromNow(); + + try (HttpTestClient client = new HttpTestClient("127.0.0.1", 10000)) { + client.sendPostRequest(request, encoder, deadline.timeLeft()); + HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); + return response; + } + catch (Exception e) { + fail("Post request failed: " + e.getMessage()); + throw new RuntimeException(e); + } + } + + private void assertResponse(HttpTestClient.SimpleHttpResponse response, String expectedUrl) { + assertNotNull(response); + assertEquals(OK, response.getStatus()); + assertEquals("text/plain; charset=UTF-8", response.getType()); + String content = response.getContent(); + assertNotNull(content); + assertTrue(content.contains("pong")); + assertEquals(expectedUrl, response.getLocation()); + } + + private static class TestRequestHandler extends SimpleChannelInboundHandler { + + @Override + public void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception { + + //Processing request + ByteBuf byteBuf = Unpooled.copiedBuffer("pong", Charset.defaultCharset()); + + //Writing response, wait till it is completely written and close channel after that + HttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, byteBuf); + response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().set(CONTENT_LENGTH, byteBuf.readableBytes()); + response.headers().set(LOCATION, request.getUri()); + ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java index d94f7a265e5d6..6c595b98ce5d5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java @@ -42,6 +42,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil; import org.slf4j.Logger; @@ -112,6 +114,7 @@ protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new HttpClientCodec()); p.addLast(new HttpContentDecompressor()); + p.addLast(new ChunkedWriteHandler()); p.addLast(new ClientHandler(responses)); } }); @@ -204,6 +207,33 @@ public void sendPatchRequest(String path, FiniteDuration timeout) throws Timeout sendRequest(getRequest, timeout); } + /** + * Sends a POST request to the server. + * + * @param request The POST {@link HttpRequest} to send to the server + * @param encoder The {@link HttpPostRequestEncoder} used to construct the request + */ + public void sendPostRequest(HttpRequest request, HttpPostRequestEncoder encoder, FiniteDuration timeout) throws InterruptedException, TimeoutException { + LOG.debug("Writing post request {}.", request); + + // Make the connection attempt. + ChannelFuture connect = bootstrap.connect(host, port); + + Channel channel; + if (connect.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + channel = connect.channel(); + } + else { + throw new TimeoutException("Connection failed"); + } + + channel.write(request); + if (encoder.isChunked()) { + channel.write(encoder); + } + channel.flush(); + } + /** * Returns the next available HTTP response. A call to this method blocks until a response * becomes available. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index 8854a1f433d28..1bba6dc41bd9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -27,6 +27,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringEncoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Attribute; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; @@ -39,6 +42,8 @@ import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; +import java.util.Map; import java.util.UUID; import static java.util.Objects.requireNonNull; @@ -100,6 +105,16 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms fileUpload.renameTo(dest.toFile()); ctx.channel().attr(UPLOADED_FILE).set(dest); } + else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + Attribute attribute = (Attribute) data; + QueryStringDecoder decoder = new QueryStringDecoder(currentHttpRequest.getUri()); + Map> currentParams = decoder.parameters(); + + QueryStringEncoder encoder = new QueryStringEncoder(decoder.path()); + currentParams.entrySet().forEach(param -> encoder.addParam(param.getKey(), param.getValue().get(0))); + encoder.addParam(attribute.getName(), attribute.getValue()); + currentHttpRequest.setUri(encoder.toString()); + } data.release(); } From dca036dda4ad6dd13a38ec1b6655d678ec6fd65b Mon Sep 17 00:00:00 2001 From: Esteban Serrano Date: Mon, 16 Jul 2018 13:59:19 -0400 Subject: [PATCH 5/5] [FLINK-9499] fix import order --- .../java/org/apache/flink/runtime/rest/FileUploadHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index 7f948dfe5e519..6be831ebb779d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -52,9 +52,9 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Collections; import java.util.Optional; import java.util.UUID;