From edf336fd852486611fbb0366cef3c92b82ca71a5 Mon Sep 17 00:00:00 2001 From: gyao Date: Mon, 19 Feb 2018 18:35:09 +0100 Subject: [PATCH 1/4] [FLINK-7712][flip6] Implement JarDeleteHandler --- .../webmonitor/WebSubmissionExtension.java | 12 ++ .../webmonitor/handlers/JarDeleteHandler.java | 93 +++++++++++ .../webmonitor/handlers/JarDeleteHeaders.java | 69 ++++++++ .../handlers/JarDeleteMessageParameters.java | 44 +++++ .../handlers/JarIdPathParameter.java | 7 + .../handlers/JarDeleteHandlerTest.java | 151 ++++++++++++++++++ .../handlers/JarDeleteHeadersTest.java | 42 +++++ .../handlers/JarIdPathParameterTest.java | 53 ++++++ 8 files changed, 471 insertions(+) create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeadersTest.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameterTest.java diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java index bbb5f98119712..2388360c72834 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java @@ -23,6 +23,8 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler; +import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarListHandler; import org.apache.flink.runtime.webmonitor.handlers.JarListHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler; @@ -91,9 +93,19 @@ public WebSubmissionExtension( executor, restClusterClient); + final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + JarDeleteHeaders.getInstance(), + jarDir, + executor); + webSubmissionHandlers.add(Tuple2.of(JarUploadMessageHeaders.getInstance(), jarUploadHandler)); webSubmissionHandlers.add(Tuple2.of(JarListHeaders.getInstance(), jarListHandler)); webSubmissionHandlers.add(Tuple2.of(JarRunHeaders.getInstance(), jarRunHandler)); + webSubmissionHandlers.add(Tuple2.of(JarDeleteHeaders.getInstance(), jarDeleteHandler)); } @Override diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java new file mode 100644 index 0000000000000..db7743ada5af5 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; + +/** + * Handles requests for deletion of jars. + */ +public class JarDeleteHandler + extends AbstractRestHandler { + + private final Path jarDir; + + private final Executor executor; + + public JarDeleteHandler( + final CompletableFuture localRestAddress, + final GatewayRetriever leaderRetriever, + final Time timeout, + final Map responseHeaders, + final MessageHeaders messageHeaders, + final Path jarDir, + final Executor executor) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + this.jarDir = requireNonNull(jarDir); + this.executor = requireNonNull(executor); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull final HandlerRequest request, + @Nonnull final RestfulGateway gateway) throws RestHandlerException { + + final String jarId = request.getPathParameter(JarIdPathParameter.class); + return CompletableFuture.supplyAsync(() -> { + final Path jarToDelete = jarDir.resolve(jarId); + if (!Files.exists(jarToDelete)) { + throw new CompletionException(new RestHandlerException( + String.format("File %s does not exist in %s.", jarId, jarDir), + HttpResponseStatus.BAD_REQUEST)); + } else { + try { + Files.delete(jarToDelete); + return EmptyResponseBody.getInstance(); + } catch (final IOException e) { + throw new CompletionException(new RestHandlerException( + String.format("Failed to delete jar %s.", jarToDelete), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + e)); + } + } + }, executor); + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java new file mode 100644 index 0000000000000..bd3cd0a519933 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeaders.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for {@link JarDeleteHandler}. + */ +public class JarDeleteHeaders implements MessageHeaders { + + private static final JarDeleteHeaders INSTANCE = new JarDeleteHeaders(); + + @Override + public Class getResponseClass() { + return EmptyResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public JarDeleteMessageParameters getUnresolvedMessageParameters() { + return new JarDeleteMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.DELETE; + } + + @Override + public String getTargetRestEndpointURL() { + return "/jars/:" + JarIdPathParameter.KEY; + } + + public static JarDeleteHeaders getInstance() { + return INSTANCE; + } + +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java new file mode 100644 index 0000000000000..908040948de38 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteMessageParameters.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Collection; +import java.util.Collections; + +/** + * Message parameters for {@link JarDeleteHandler}. + */ +public class JarDeleteMessageParameters extends MessageParameters { + + private JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + + @Override + public Collection> getPathParameters() { + return Collections.singletonList(jarIdPathParameter); + } + + @Override + public Collection> getQueryParameters() { + return Collections.emptyList(); + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java index d4bd602f485c9..4120ed2999ca7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java @@ -21,6 +21,9 @@ import org.apache.flink.runtime.rest.messages.ConversionException; import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import java.nio.file.Path; +import java.nio.file.Paths; + /** * Path parameter to identify uploaded jar files. */ @@ -34,6 +37,10 @@ protected JarIdPathParameter() { @Override protected String convertFromString(final String value) throws ConversionException { + final Path path = Paths.get(value); + if (path.getParent() != null) { + throw new ConversionException(String.format("%s must be a filename only (%s)", KEY, path)); + } return value; } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java new file mode 100644 index 0000000000000..0bbbb0941144c --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermission; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link JarDeleteHandler}. + */ +public class JarDeleteHandlerTest { + + private static final String TEST_JAR_NAME = "test.jar"; + + private JarDeleteHandler jarDeleteHandler; + + private RestfulGateway restfulGateway; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path jarDir; + + @Before + public void setUp() throws Exception { + jarDir = temporaryFolder.newFolder().toPath(); + restfulGateway = TestingRestfulGateway.newBuilder().build(); + jarDeleteHandler = new JarDeleteHandler( + CompletableFuture.completedFuture("localhost:12345"), + () -> CompletableFuture.completedFuture(restfulGateway), + Time.seconds(10), + Collections.emptyMap(), + new JarDeleteHeaders(), + jarDir, + Executors.directExecutor() + ); + + Files.createFile(jarDir.resolve(TEST_JAR_NAME)); + } + + @Test + public void testDeleteJarById() throws Exception { + assertThat(Files.exists(jarDir.resolve(TEST_JAR_NAME)), equalTo(true)); + + final HandlerRequest request = createRequest(TEST_JAR_NAME); + jarDeleteHandler.handleRequest(request, restfulGateway).get(); + + assertThat(Files.exists(jarDir.resolve(TEST_JAR_NAME)), equalTo(false)); + } + + @Test + public void testDeleteUnknownJar() throws Exception { + final HandlerRequest request = createRequest("doesnotexist.jar"); + try { + jarDeleteHandler.handleRequest(request, restfulGateway).get(); + } catch (final ExecutionException e) { + final Throwable throwable = ExceptionUtils.stripCompletionException(e.getCause()); + assertThat(throwable, instanceOf(RestHandlerException.class)); + + final RestHandlerException restHandlerException = (RestHandlerException) throwable; + assertThat(restHandlerException.getMessage(), containsString("File doesnotexist.jar does not exist in")); + assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.BAD_REQUEST)); + } + } + + @Test + public void testFailedDelete() throws Exception { + makeJarDirReadOnly(); + + final HandlerRequest request = createRequest(TEST_JAR_NAME); + try { + jarDeleteHandler.handleRequest(request, restfulGateway).get(); + } catch (final ExecutionException e) { + final Throwable throwable = ExceptionUtils.stripCompletionException(e.getCause()); + assertThat(throwable, instanceOf(RestHandlerException.class)); + + final RestHandlerException restHandlerException = (RestHandlerException) throwable; + assertThat(restHandlerException.getMessage(), containsString("Failed to delete jar")); + assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } + } + + private static HandlerRequest createRequest( + final String jarFileName) throws HandlerRequestException { + return new HandlerRequest<>( + EmptyRequestBody.getInstance(), + new JarDeleteMessageParameters(), + Collections.singletonMap(JarIdPathParameter.KEY, jarFileName), + Collections.emptyMap()); + } + + private void makeJarDirReadOnly() { + try { + Files.setPosixFilePermissions(jarDir, new HashSet<>(Arrays.asList( + PosixFilePermission.OTHERS_READ, + PosixFilePermission.GROUP_READ, + PosixFilePermission.OWNER_READ, + PosixFilePermission.OTHERS_EXECUTE, + PosixFilePermission.GROUP_EXECUTE, + PosixFilePermission.OWNER_EXECUTE))); + } catch (final Exception e) { + Assume.assumeNoException(e); + } + } + +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeadersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeadersTest.java new file mode 100644 index 0000000000000..6247accad10fe --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHeadersTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; + +import org.junit.Test; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link JarDeleteHeaders}. + */ +public class JarDeleteHeadersTest { + + @Test + public void testUrl() { + assertThat(JarDeleteHeaders.getInstance().getTargetRestEndpointURL(), equalTo("/jars/:" + JarIdPathParameter.KEY)); + } + + @Test + public void testHttpMethod() { + assertThat(JarDeleteHeaders.getInstance().getHttpMethod(), equalTo(HttpMethodWrapper.DELETE)); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameterTest.java new file mode 100644 index 0000000000000..69459e4e55e1c --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameterTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.ConversionException; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link JarIdPathParameter}. + */ +public class JarIdPathParameterTest { + + private JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + + @Test(expected = ConversionException.class) + public void testJarIdWithParentDir() throws Exception { + jarIdPathParameter.convertFromString("../../test.jar"); + } + + @Test + public void testConvertFromString() throws Exception { + final String expectedJarId = "test.jar"; + final String jarId = jarIdPathParameter.convertFromString(expectedJarId); + assertEquals(expectedJarId, jarId); + } + + @Test + public void testConvertToString() throws Exception { + final String expected = "test.jar"; + final String toString = jarIdPathParameter.convertToString(expected); + assertEquals(expected, toString); + } + +} From 780f0591f7d7794eb2f9581b0ae472b533eb3ffd Mon Sep 17 00:00:00 2001 From: gyao Date: Mon, 19 Feb 2018 18:43:20 +0100 Subject: [PATCH 2/4] [hotfix] Rename JarUploadMessageHeaders to JarUploadHeaders --- .../flink/runtime/webmonitor/WebSubmissionExtension.java | 6 +++--- ...JarUploadMessageHeaders.java => JarUploadHeaders.java} | 8 ++++---- .../runtime/webmonitor/handlers/JarUploadHandlerTest.java | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) rename flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/{JarUploadMessageHeaders.java => JarUploadHeaders.java} (85%) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java index 2388360c72834..e75432510db16 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler; import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; -import org.apache.flink.runtime.webmonitor.handlers.JarUploadMessageHeaders; +import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; @@ -69,7 +69,7 @@ public WebSubmissionExtension( leaderRetriever, timeout, responseHeaders, - JarUploadMessageHeaders.getInstance(), + JarUploadHeaders.getInstance(), jarDir, executor); @@ -102,7 +102,7 @@ public WebSubmissionExtension( jarDir, executor); - webSubmissionHandlers.add(Tuple2.of(JarUploadMessageHeaders.getInstance(), jarUploadHandler)); + webSubmissionHandlers.add(Tuple2.of(JarUploadHeaders.getInstance(), jarUploadHandler)); webSubmissionHandlers.add(Tuple2.of(JarListHeaders.getInstance(), jarListHandler)); webSubmissionHandlers.add(Tuple2.of(JarRunHeaders.getInstance(), jarRunHandler)); webSubmissionHandlers.add(Tuple2.of(JarDeleteHeaders.getInstance(), jarDeleteHandler)); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java similarity index 85% rename from flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java rename to flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java index a0ec06bfc59f3..ce6be27638b86 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHeaders.java @@ -28,11 +28,11 @@ /** * {@link MessageHeaders} for uploading jars. */ -public final class JarUploadMessageHeaders implements MessageHeaders { +public final class JarUploadHeaders implements MessageHeaders { - private static final JarUploadMessageHeaders INSTANCE = new JarUploadMessageHeaders(); + private static final JarUploadHeaders INSTANCE = new JarUploadHeaders(); - private JarUploadMessageHeaders() {} + private JarUploadHeaders() {} @Override public Class getResponseClass() { @@ -64,7 +64,7 @@ public String getTargetRestEndpointURL() { return "/jars/upload"; } - public static JarUploadMessageHeaders getInstance() { + public static JarUploadHeaders getInstance() { return INSTANCE; } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java index 1eee275a48e88..2ce75e74a8398 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java @@ -75,7 +75,7 @@ public void setUp() throws Exception { () -> CompletableFuture.completedFuture(mockDispatcherGateway), Time.seconds(10), Collections.emptyMap(), - JarUploadMessageHeaders.getInstance(), + JarUploadHeaders.getInstance(), jarDir, Executors.directExecutor()); } From 23ad73ea44391107013cd9015f47b430b911dc6c Mon Sep 17 00:00:00 2001 From: gyao Date: Mon, 19 Feb 2018 18:31:40 +0100 Subject: [PATCH 3/4] [hotfix] Move JarListInfoTest to correct package --- .../runtime/webmonitor/handlers/{ng => }/JarListInfoTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) rename flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/{ng => }/JarListInfoTest.java (94%) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListInfoTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfoTest.java similarity index 94% rename from flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListInfoTest.java rename to flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfoTest.java index a32610f5f9d51..7f5f50aeb262c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarListInfoTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListInfoTest.java @@ -16,10 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.webmonitor.handlers.ng; +package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; -import org.apache.flink.runtime.webmonitor.handlers.JarListInfo; import java.util.ArrayList; import java.util.List; From 12cafa072fc3b40a8748aae7213a56813d82186b Mon Sep 17 00:00:00 2001 From: gyao Date: Tue, 20 Feb 2018 11:58:54 +0100 Subject: [PATCH 4/4] [FLINK-7714][flip6] Implement JarPlanHandler --- .../webmonitor/WebSubmissionExtension.java | 14 +++ .../webmonitor/handlers/JarPlanHandler.java | 109 ++++++++++++++++++ .../webmonitor/handlers/JarPlanHeaders.java | 68 +++++++++++ .../handlers/JarPlanMessageParameters.java | 54 +++++++++ .../webmonitor/handlers/JarRunHandler.java | 71 +----------- .../handlers/utils/JarHandlerUtils.java | 65 +++++++++++ .../handlers/utils/JarHandlerUtilsTest.java | 55 +++++++++ .../scripts/modules/submit/submit.ctrl.coffee | 21 +++- .../scripts/modules/submit/submit.svc.coffee | 2 + .../web-dashboard/web/js/hs/index.js | 2 +- .../web-dashboard/web/js/index.js | 2 +- .../handler/util/HandlerRequestUtils.java | 66 +++++++++++ .../handler/util/HandlerRequestUtilsTest.java | 89 ++++++++------ 13 files changed, 507 insertions(+), 111 deletions(-) create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java create mode 100644 flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java create mode 100644 flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java rename flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java => flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java (51%) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java index e75432510db16..e0ac2500e9ab5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarListHandler; import org.apache.flink.runtime.webmonitor.handlers.JarListHeaders; +import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler; +import org.apache.flink.runtime.webmonitor.handlers.JarPlanHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler; import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; @@ -102,10 +104,22 @@ public WebSubmissionExtension( jarDir, executor); + final JarPlanHandler jarPlanHandler = new JarPlanHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + JarPlanHeaders.getInstance(), + jarDir, + configuration, + executor + ); + webSubmissionHandlers.add(Tuple2.of(JarUploadHeaders.getInstance(), jarUploadHandler)); webSubmissionHandlers.add(Tuple2.of(JarListHeaders.getInstance(), jarListHandler)); webSubmissionHandlers.add(Tuple2.of(JarRunHeaders.getInstance(), jarRunHandler)); webSubmissionHandlers.add(Tuple2.of(JarDeleteHeaders.getInstance(), jarDeleteHandler)); + webSubmissionHandlers.add(Tuple2.of(JarPlanHeaders.getInstance(), jarPlanHandler)); } @Override diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java new file mode 100644 index 0000000000000..2f0631610fcc0 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.tokenizeArguments; +import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull; + +/** + * This handler handles requests to fetch the plan for a jar. + */ +public class JarPlanHandler + extends AbstractRestHandler { + + private final Path jarDir; + + private final Configuration configuration; + + private final Executor executor; + + public JarPlanHandler( + final CompletableFuture localRestAddress, + final GatewayRetriever leaderRetriever, + final Time timeout, + final Map responseHeaders, + final MessageHeaders messageHeaders, + final Path jarDir, + final Configuration configuration, + final Executor executor) { + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); + this.jarDir = requireNonNull(jarDir); + this.configuration = requireNonNull(configuration); + this.executor = requireNonNull(executor); + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull final HandlerRequest request, + @Nonnull final RestfulGateway gateway) throws RestHandlerException { + + final String jarId = request.getPathParameter(JarIdPathParameter.class); + final String entryClass = emptyToNull(HandlerRequestUtils.getQueryParameter(request, EntryClassQueryParameter.class)); + final Integer parallelism = HandlerRequestUtils.getQueryParameter(request, ParallelismQueryParameter.class, ExecutionConfig.PARALLELISM_DEFAULT); + final List programArgs = tokenizeArguments(HandlerRequestUtils.getQueryParameter(request, ProgramArgsQueryParameter.class)); + final Path jarFile = jarDir.resolve(jarId); + + return CompletableFuture.supplyAsync(() -> { + final JobGraph jobGraph; + try { + final PackagedProgram packagedProgram = new PackagedProgram( + jarFile.toFile(), + entryClass, + programArgs.toArray(new String[programArgs.size()])); + jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism); + } catch (final ProgramInvocationException e) { + throw new CompletionException(new RestHandlerException( + e.getMessage(), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + e)); + } + return new JobPlanInfo(JsonPlanGenerator.generatePlan(jobGraph)); + }, executor); + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java new file mode 100644 index 0000000000000..1a4544091fd72 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobPlanInfo; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for {@link JarPlanHandler}. + */ +public class JarPlanHeaders implements MessageHeaders { + + private static final JarPlanHeaders INSTANCE = new JarPlanHeaders(); + + @Override + public Class getResponseClass() { + return JobPlanInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public JarPlanMessageParameters getUnresolvedMessageParameters() { + return new JarPlanMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/jars/:" + JarIdPathParameter.KEY + "/plan"; + } + + public static JarPlanHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java new file mode 100644 index 0000000000000..7dd9950722374 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * Message parameters for {@link JarPlanHandler}. + */ +public class JarPlanMessageParameters extends MessageParameters { + + private final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + + private final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter(); + + private final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter(); + + private final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter(); + + @Override + public Collection> getPathParameters() { + return Collections.singletonList(jarIdPathParameter); + } + + @Override + public Collection> getQueryParameters() { + return Collections.unmodifiableCollection(Arrays.asList( + entryClassQueryParameter, + parallelismQueryParameter, + programArgsQueryParameter)); + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index bf2286f5bed95..09b7a8b3769b7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; -import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; @@ -32,7 +32,6 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; -import org.apache.flink.runtime.rest.messages.MessageQueryParameter; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -43,17 +42,15 @@ import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static java.util.Objects.requireNonNull; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter; +import static org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.tokenizeArguments; import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull; /** @@ -62,8 +59,6 @@ public class JarRunHandler extends AbstractRestHandler { - private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*"); - private final Path jarDir; private final Configuration configuration; @@ -100,7 +95,7 @@ protected CompletableFuture handleRequest( final String entryClass = emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class)); final List programArgs = tokenizeArguments(getQueryParameter(request, ProgramArgsQueryParameter.class)); - final int parallelism = getQueryParameter(request, ParallelismQueryParameter.class, -1); + final int parallelism = getQueryParameter(request, ParallelismQueryParameter.class, ExecutionConfig.PARALLELISM_DEFAULT); final SavepointRestoreSettings savepointRestoreSettings = getSavepointRestoreSettings(request); final CompletableFuture jobGraphFuture = getJobGraphAsync( @@ -165,62 +160,4 @@ private CompletableFuture getJobGraphAsync( return jobGraph; }, executor); } - - /** - * Takes program arguments as a single string, and splits them into a list of string. - * - *
-	 * tokenizeArguments("--foo bar")            = ["--foo" "bar"]
-	 * tokenizeArguments("--foo \"bar baz\"")    = ["--foo" "bar baz"]
-	 * tokenizeArguments("--foo 'bar baz'")      = ["--foo" "bar baz"]
-	 * 
- * - * WARNING: This method does not respect escaped quotes. - */ - @VisibleForTesting - static List tokenizeArguments(@Nullable final String args) { - if (args == null) { - return Collections.emptyList(); - } - final Matcher matcher = ARGUMENTS_TOKENIZE_PATTERN.matcher(args); - final List tokens = new ArrayList<>(); - while (matcher.find()) { - tokens.add(matcher.group() - .trim() - .replace("\"", "") - .replace("\'", "")); - } - return tokens; - } - - /** - * Returns the value of a query parameter, or {@code null} if the query parameter is not set. - * @throws RestHandlerException If the query parameter is repeated. - */ - @VisibleForTesting - static > X getQueryParameter( - final HandlerRequest request, - final Class

queryParameterClass) throws RestHandlerException { - return getQueryParameter(request, queryParameterClass, null); - } - - @VisibleForTesting - static > X getQueryParameter( - final HandlerRequest request, - final Class

queryParameterClass, - final X defaultValue) throws RestHandlerException { - - final List values = request.getQueryParameter(queryParameterClass); - final X value; - if (values.size() > 1) { - throw new RestHandlerException( - String.format("Expected only one value %s.", values), - HttpResponseStatus.BAD_REQUEST); - } else if (values.size() == 1) { - value = values.get(0); - } else { - value = defaultValue; - } - return value; - } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java new file mode 100644 index 0000000000000..3b1b744930373 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.utils; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Utils for jar handlers. + * + * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler + * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler + */ +public class JarHandlerUtils { + + private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*"); + + /** + * Takes program arguments as a single string, and splits them into a list of string. + * + *

+	 * tokenizeArguments("--foo bar")            = ["--foo" "bar"]
+	 * tokenizeArguments("--foo \"bar baz\"")    = ["--foo" "bar baz"]
+	 * tokenizeArguments("--foo 'bar baz'")      = ["--foo" "bar baz"]
+	 * tokenizeArguments(null)                   = []
+	 * 
+ * + * WARNING: This method does not respect escaped quotes. + */ + public static List tokenizeArguments(@Nullable final String args) { + if (args == null) { + return Collections.emptyList(); + } + final Matcher matcher = ARGUMENTS_TOKENIZE_PATTERN.matcher(args); + final List tokens = new ArrayList<>(); + while (matcher.find()) { + tokens.add(matcher.group() + .trim() + .replace("\"", "") + .replace("\'", "")); + } + return tokens; + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java new file mode 100644 index 0000000000000..b87e1c6898e13 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtilsTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers.utils; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link JarHandlerUtils}. + */ +public class JarHandlerUtilsTest extends TestLogger { + + @Test + public void testTokenizeNonQuoted() { + final List arguments = JarHandlerUtils.tokenizeArguments("--foo bar"); + assertThat(arguments.get(0), equalTo("--foo")); + assertThat(arguments.get(1), equalTo("bar")); + } + + @Test + public void testTokenizeSingleQuoted() { + final List arguments = JarHandlerUtils.tokenizeArguments("--foo 'bar baz '"); + assertThat(arguments.get(0), equalTo("--foo")); + assertThat(arguments.get(1), equalTo("bar baz ")); + } + + @Test + public void testTokenizeDoubleQuoted() { + final List arguments = JarHandlerUtils.tokenizeArguments("--name \"K. Bote \""); + assertThat(arguments.get(0), equalTo("--name")); + assertThat(arguments.get(1), equalTo("K. Bote ")); + } +} diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee index 4e14a6508f586..bb1b1ff07770d 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee @@ -79,17 +79,28 @@ angular.module('flinkApp') $scope.state['plan-button'] = "Getting Plan" $scope.error = null $scope.plan = null + + queryParameters = {} + + if $scope.state['entry-class'] + queryParameters['entry-class'] = $scope.state['entry-class'] + + if $scope.state.parallelism + queryParameters['parallelism'] = $scope.state['parallelism'] + + if $scope.state['program-args'] + queryParameters['program-args'] = $scope.state['program-args'] + JobSubmitService.getPlan( - $scope.state.selected, { - 'entry-class': $scope.state['entry-class'], - parallelism: $scope.state.parallelism, - 'program-args': $scope.state['program-args'] - } + $scope.state.selected, queryParameters ).then (data) -> if action == $scope.state['action-time'] $scope.state['plan-button'] = "Show Plan" $scope.error = data.error $scope.plan = data.plan + .catch (err) -> + $scope.state['plan-button'] = "Show Plan" + $scope.error = err $scope.runJob = () -> if $scope.state['submit-button'] == "Submit" diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee index d4a22a432c77c..26dd7efd09edb 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.svc.coffee @@ -44,6 +44,8 @@ angular.module('flinkApp') $http.get(flinkConfig.jobServer + "jars/" + encodeURIComponent(id) + "/plan", {params: args}) .success (data, status, headers, config) -> deferred.resolve(data) + .error (err) -> + deferred.reject(err) deferred.promise diff --git a/flink-runtime-web/web-dashboard/web/js/hs/index.js b/flink-runtime-web/web-dashboard/web/js/hs/index.js index 7260097f772a7..2e17c1866012c 100644 --- a/flink-runtime-web/web-dashboard/web/js/hs/index.js +++ b/flink-runtime-web/web-dashboard/web/js/hs/index.js @@ -1,2 +1,2 @@ angular.module("flinkApp",["ui.router","angularMoment","dndLists"]).run(["$rootScope",function(e){return e.sidebarVisible=!1,e.showSidebar=function(){return e.sidebarVisible=!e.sidebarVisible,e.sidebarClass="force-show"}}]).value("flinkConfig",{jobServer:"","refresh-interval":1e4}).value("watermarksConfig",{noWatermark:-0x8000000000000000}).run(["JobsService","MainService","flinkConfig","$interval",function(e,t,r,n){return t.loadConfig().then(function(t){return angular.extend(r,t),e.listJobs(),n(function(){return e.listJobs()},r["refresh-interval"])})}]).config(["$uiViewScrollProvider",function(e){return e.useAnchorScroll()}]).run(["$rootScope","$state",function(e,t){return e.$on("$stateChangeStart",function(e,r,n,i){if(r.redirectTo)return e.preventDefault(),t.go(r.redirectTo,n)})}]).config(["$stateProvider","$urlRouterProvider",function(e,t){return e.state("completed-jobs",{url:"/completed-jobs",views:{main:{templateUrl:"partials/jobs/completed-jobs.html",controller:"CompletedJobsController"}}}).state("single-job",{url:"/jobs/{jobid}","abstract":!0,views:{main:{templateUrl:"partials/jobs/job.html",controller:"SingleJobController"}}}).state("single-job.plan",{url:"",redirectTo:"single-job.plan.subtasks",views:{details:{templateUrl:"partials/jobs/job.plan.html",controller:"JobPlanController"}}}).state("single-job.plan.subtasks",{url:"",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.subtasks.html",controller:"JobPlanSubtasksController"}}}).state("single-job.plan.metrics",{url:"/metrics",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.metrics.html",controller:"JobPlanMetricsController"}}}).state("single-job.plan.watermarks",{url:"/watermarks",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.watermarks.html"}}}).state("single-job.plan.taskmanagers",{url:"/taskmanagers",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.taskmanagers.html",controller:"JobPlanTaskManagersController"}}}).state("single-job.plan.accumulators",{url:"/accumulators",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.accumulators.html",controller:"JobPlanAccumulatorsController"}}}).state("single-job.plan.checkpoints",{url:"/checkpoints",redirectTo:"single-job.plan.checkpoints.overview",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.checkpoints.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.overview",{url:"/overview",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.overview.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.summary",{url:"/summary",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.summary.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.history",{url:"/history",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.history.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.config",{url:"/config",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.config.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.details",{url:"/details/{checkpointId}",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.details.html",controller:"JobPlanCheckpointDetailsController"}}}).state("single-job.plan.backpressure",{url:"/backpressure",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.backpressure.html",controller:"JobPlanBackPressureController"}}}).state("single-job.timeline",{url:"/timeline",views:{details:{templateUrl:"partials/jobs/job.timeline.html"}}}).state("single-job.timeline.vertex",{url:"/{vertexId}",views:{vertex:{templateUrl:"partials/jobs/job.timeline.vertex.html",controller:"JobTimelineVertexController"}}}).state("single-job.exceptions",{url:"/exceptions",views:{details:{templateUrl:"partials/jobs/job.exceptions.html",controller:"JobExceptionsController"}}}).state("single-job.config",{url:"/config",views:{details:{templateUrl:"partials/jobs/job.config.html"}}}),t.otherwise("/completed-jobs")}]),angular.module("flinkApp").directive("bsLabel",["JobsService",function(e){return{transclude:!0,replace:!0,scope:{getLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getLabelClass=function(){return"label label-"+e.translateLabelState(n.status)}}}}]).directive("bpLabel",["JobsService",function(e){return{transclude:!0,replace:!0,scope:{getBackPressureLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getBackPressureLabelClass=function(){return"label label-"+e.translateBackPressureLabelState(n.status)}}}}]).directive("indicatorPrimary",["JobsService",function(e){return{replace:!0,scope:{getLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getLabelClass=function(){return"fa fa-circle indicator indicator-"+e.translateLabelState(n.status)}}}}]).directive("tableProperty",function(){return{replace:!0,scope:{value:"="},template:"{{value || 'None'}}"}}),angular.module("flinkApp").filter("amDurationFormatExtended",["angularMomentConfig",function(e){var t;return t=function(e,t,r){return"undefined"==typeof e||null===e?"":moment.duration(e,t).format(r,{trim:!1})},t.$stateful=e.statefulFilters,t}]).filter("humanizeDuration",function(){return function(e,t){var r,n,i,o,s,a;return"undefined"==typeof e||null===e?"":(o=e%1e3,a=Math.floor(e/1e3),s=a%60,a=Math.floor(a/60),i=a%60,a=Math.floor(a/60),n=a%24,a=Math.floor(a/24),r=a,0===r?0===n?0===i?0===s?o+"ms":s+"s ":i+"m "+s+"s":t?n+"h "+i+"m":n+"h "+i+"m "+s+"s":t?r+"d "+n+"h":r+"d "+n+"h "+i+"m "+s+"s")}}).filter("limit",function(){return function(e){return e.length>73&&(e=e.substring(0,35)+"..."+e.substring(e.length-35,e.length)),e}}).filter("humanizeText",function(){return function(e){return e?e.replace(/>/g,">").replace(//g,""):""}}).filter("humanizeBytes",function(){return function(e){var t,r;return r=["B","KB","MB","GB","TB","PB","EB"],t=function(e,n){var i;return i=Math.pow(1024,n),e=r;n=0<=r?++e:--e)i.push(n+".currentLowWatermark");return i}(),i.getMetrics(o,t.id,s).then(function(e){var t,n,i,o,s,a,l;i=NaN,l={},o=e.values;for(t in o)a=o[t],s=t.replace(".currentLowWatermark",""),l[s]=a,(isNaN(i)||au.noWatermark?i:NaN,r.resolve({lowWatermark:n,watermarks:l})}),r.promise}}(this),r=l.defer(),s={},n=t.length,angular.forEach(t,function(e){return function(e,t){var i;return i=e.id,o(e).then(function(e){if(s[i]=e,t>=n-1)return r.resolve(s)})}}(this)),r.promise},e.hasWatermark=function(t){return e.watermarks[t]&&!isNaN(e.watermarks[t].lowWatermark)},e.$watch("plan",function(t){if(t)return c(t.nodes).then(function(t){return e.watermarks=t})}),e.$on("reload",function(){if(e.plan)return c(e.plan.nodes).then(function(t){return e.watermarks=t})})}]).controller("JobPlanController",["$scope","$state","$stateParams","$window","JobsService",function(e,t,r,n,i){return e.nodeid=null,e.nodeUnfolded=!1,e.stateList=i.stateList(),e.changeNode=function(t){return t!==e.nodeid?(e.nodeid=t,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null,e.$broadcast("reload"),e.$broadcast("node:change",e.nodeid)):(e.nodeid=null,e.nodeUnfolded=!1,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null)},e.deactivateNode=function(){return e.nodeid=null,e.nodeUnfolded=!1,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null},e.toggleFold=function(){return e.nodeUnfolded=!e.nodeUnfolded}}]).controller("JobPlanSubtasksController",["$scope","JobsService",function(e,t){var r;return e.aggregate=!1,r=function(){return e.aggregate?t.getTaskManagers(e.nodeid).then(function(t){return e.taskmanagers=t}):t.getSubtasks(e.nodeid).then(function(t){return e.subtasks=t})},!e.nodeid||e.vertex&&e.vertex.st||r(),e.$on("reload",function(t){if(e.nodeid)return r()})}]).controller("JobPlanAccumulatorsController",["$scope","JobsService",function(e,t){var r;return r=function(){return t.getAccumulators(e.nodeid).then(function(t){return e.accumulators=t.main,e.subtaskAccumulators=t.subtasks})},!e.nodeid||e.vertex&&e.vertex.accumulators||r(),e.$on("reload",function(t){if(e.nodeid)return r()})}]).controller("JobPlanCheckpointsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var i;return e.checkpointDetails={},e.checkpointDetails.id=-1,n.getCheckpointConfig().then(function(t){return e.checkpointConfig=t}),i=function(){return n.getCheckpointStats().then(function(t){if(null!==t)return e.checkpointStats=t})},i(),e.$on("reload",function(e){return i()})}]).controller("JobPlanCheckpointDetailsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var i,o;return e.subtaskDetails={},e.checkpointDetails.id=r.checkpointId,i=function(t){return n.getCheckpointDetails(t).then(function(t){return null!==t?e.checkpoint=t:e.unknown_checkpoint=!0})},o=function(t,r){return n.getCheckpointSubtaskDetails(t,r).then(function(t){if(null!==t)return e.subtaskDetails[r]=t})},i(r.checkpointId),e.nodeid&&o(r.checkpointId,e.nodeid),e.$on("reload",function(t){if(i(r.checkpointId),e.nodeid)return o(r.checkpointId,e.nodeid)}),e.$on("$destroy",function(){return e.checkpointDetails.id=-1})}]).controller("JobPlanBackPressureController",["$scope","JobsService",function(e,t){var r;return r=function(){if(e.now=Date.now(),e.nodeid)return t.getOperatorBackPressure(e.nodeid).then(function(t){return e.backPressureOperatorStats[e.nodeid]=t})},r(),e.$on("reload",function(e){return r()})}]).controller("JobTimelineVertexController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var i;return i=function(){return n.getVertex(r.vertexId).then(function(t){return e.vertex=t})},i(),e.$on("reload",function(e){return i()})}]).controller("JobExceptionsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){return n.loadExceptions().then(function(t){return e.exceptions=t})}]).controller("JobPropertiesController",["$scope","JobsService",function(e,t){return e.changeNode=function(r){return r!==e.nodeid?(e.nodeid=r,t.getNode(r).then(function(t){return e.node=t})):(e.nodeid=null,e.node=null)}}]).controller("JobPlanMetricsController",["$scope","JobsService","MetricsService",function(e,t,r){var n,i;if(e.dragging=!1,e.window=r.getWindow(),e.availableMetrics=null,e.$on("$destroy",function(){return r.unRegisterObserver()}),i=function(){return t.getVertex(e.nodeid).then(function(t){return e.vertex=t}),r.getAvailableMetrics(e.jobid,e.nodeid).then(function(t){return e.availableMetrics=t.sort(n),e.metrics=r.getMetricsSetup(e.jobid,e.nodeid).names,r.registerObserver(e.jobid,e.nodeid,function(t){return e.$broadcast("metrics:data:update",t.timestamp,t.values)})})},n=function(e,t){var r,n;return r=e.id.toLowerCase(),n=t.id.toLowerCase(),rn?1:0},e.dropped=function(t,n,o,s,a){return r.orderMetrics(e.jobid,e.nodeid,o,n),e.$broadcast("metrics:refresh",o),i(),!1},e.dragStart=function(){return e.dragging=!0},e.dragEnd=function(){return e.dragging=!1},e.addMetric=function(t){return r.addMetric(e.jobid,e.nodeid,t.id),i()},e.removeMetric=function(t){return r.removeMetric(e.jobid,e.nodeid,t),i()},e.setMetricSize=function(t,n){return r.setMetricSize(e.jobid,e.nodeid,t,n),i()},e.setMetricView=function(t,n){return r.setMetricView(e.jobid,e.nodeid,t,n),i()},e.getValues=function(t){return r.getValues(e.jobid,e.nodeid,t)},e.$on("node:change",function(t,r){if(!e.dragging)return i()}),e.nodeid)return i()}]),angular.module("flinkApp").directive("vertex",["$state",function(e){return{template:"",scope:{data:"="},link:function(e,t,r){var n,i,o;o=t.children()[0],i=t.width(),angular.element(o).attr("width",i),(n=function(e){var t,r,n;return d3.select(o).selectAll("*").remove(),n=[],angular.forEach(e.subtasks,function(e,t){var r;return r=[{label:"Scheduled",color:"#666",borderColor:"#555",starting_time:e.timestamps.SCHEDULED,ending_time:e.timestamps.DEPLOYING,type:"regular"},{label:"Deploying",color:"#aaa",borderColor:"#555",starting_time:e.timestamps.DEPLOYING,ending_time:e.timestamps.RUNNING,type:"regular"}],e.timestamps.FINISHED>0&&r.push({label:"Running",color:"#ddd",borderColor:"#555",starting_time:e.timestamps.RUNNING,ending_time:e.timestamps.FINISHED,type:"regular"}),n.push({label:"("+e.subtask+") "+e.host,times:r})}),t=d3.timeline().stack().tickFormat({format:d3.time.format("%L"),tickSize:1}).prefix("single").labelFormat(function(e){return e}).margin({left:100,right:0,top:0,bottom:0}).itemHeight(30).relativeTime(),r=d3.select(o).datum(n).call(t)})(e.data)}}}]).directive("timeline",["$state",function(e){return{template:"",scope:{vertices:"=",jobid:"="},link:function(t,r,n){var i,o,s,a;s=r.children()[0],o=r.width(),angular.element(s).attr("width",o),a=function(e){return e.replace(">",">")},i=function(r){var n,i,o;return d3.select(s).selectAll("*").remove(),o=[],angular.forEach(r,function(e){if(e["start-time"]>-1)return"scheduled"===e.type?o.push({times:[{label:a(e.name),color:"#cccccc",borderColor:"#555555",starting_time:e["start-time"],ending_time:e["end-time"],type:e.type}]}):o.push({times:[{label:a(e.name),color:"#d9f1f7",borderColor:"#62cdea",starting_time:e["start-time"],ending_time:e["end-time"],link:e.id,type:e.type}]})}),n=d3.timeline().stack().click(function(r,n,i){if(r.link)return e.go("single-job.timeline.vertex",{jobid:t.jobid,vertexId:r.link})}).tickFormat({format:d3.time.format("%L"),tickSize:1}).prefix("main").margin({left:0,right:0,top:0,bottom:0}).itemHeight(30).showBorderLine().showHourTimeline(),i=d3.select(s).datum(o).call(n)},t.$watch(n.vertices,function(e){if(e)return i(e)})}}}]).directive("split",function(){return{compile:function(e,t){return Split(e.children(),{sizes:[50,50],direction:"vertical"})}}}).directive("jobPlan",["$timeout",function(e){return{template:"
",scope:{plan:"=",watermarks:"=",setNode:"&"},link:function(e,t,r){var n,i,o,s,a,l,u,c,d,f,p,m,h,g,b,v,k,j,S,w,C,$,y,J,M;p=null,C=d3.behavior.zoom(),M=[],g=r.jobid,S=t.children()[0],j=t.children().children()[0],w=t.children()[1],l=d3.select(S),u=d3.select(j),c=d3.select(w),n=t.width(),angular.element(t.children()[0]).width(n),v=0,b=0,e.zoomIn=function(){var e,t,r;if(C.scale()<2.99)return e=C.translate(),t=e[0]*(C.scale()+.1/C.scale()),r=e[1]*(C.scale()+.1/C.scale()),C.scale(C.scale()+.1),C.translate([t,r]),u.attr("transform","translate("+t+","+r+") scale("+C.scale()+")"),v=C.scale(),b=C.translate()},e.zoomOut=function(){var e,t,r;if(C.scale()>.31)return C.scale(C.scale()-.1),e=C.translate(),t=e[0]*(C.scale()-.1/C.scale()),r=e[1]*(C.scale()-.1/C.scale()),C.translate([t,r]),u.attr("transform","translate("+t+","+r+") scale("+C.scale()+")"),v=C.scale(),b=C.translate()},o=function(e){var t;return t="",null==e.ship_strategy&&null==e.local_strategy||(t+="
",null!=e.ship_strategy&&(t+=e.ship_strategy),void 0!==e.temp_mode&&(t+=" ("+e.temp_mode+")"),void 0!==e.local_strategy&&(t+=",
"+e.local_strategy),t+="
"),t},h=function(e){return"partialSolution"===e||"nextPartialSolution"===e||"workset"===e||"nextWorkset"===e||"solutionSet"===e||"solutionDelta"===e},m=function(e,t){return"mirror"===t?"node-mirror":h(t)?"node-iteration":"node-normal"},s=function(e,t,r,n){var i,o;return i="
",i+="mirror"===t?"

Mirror of "+e.operator+"

":"

"+e.operator+"

",""===e.description?i+="":(o=e.description,o=J(o),i+="

"+o+"

"),null!=e.step_function?i+=f(e.id,r,n):(h(t)&&(i+="
"+t+" Node
"),""!==e.parallelism&&(i+="
Parallelism: "+e.parallelism+"
"),void 0!==e.lowWatermark&&(i+="
Low Watermark: "+e.lowWatermark+"
"),void 0!==e.operator&&e.operator_strategy&&(i+="
Operation: "+J(e.operator_strategy)+"
")),i+="
"},f=function(e,t,r){var n,i;return i="svg-"+e,n=""},J=function(e){var t;for("<"===e.charAt(0)&&(e=e.replace("<","<"),e=e.replace(">",">")),t="";e.length>30;)t=t+e.substring(0,30)+"
",e=e.substring(30,e.length);return t+=e},a=function(e,t,r,n,i,o){return null==n&&(n=!1),r.id===t.partial_solution?e.setNode(r.id,{label:s(r,"partialSolution",i,o),labelType:"html","class":m(r,"partialSolution")}):r.id===t.next_partial_solution?e.setNode(r.id,{label:s(r,"nextPartialSolution",i,o),labelType:"html","class":m(r,"nextPartialSolution")}):r.id===t.workset?e.setNode(r.id,{label:s(r,"workset",i,o),labelType:"html","class":m(r,"workset")}):r.id===t.next_workset?e.setNode(r.id,{label:s(r,"nextWorkset",i,o),labelType:"html","class":m(r,"nextWorkset")}):r.id===t.solution_set?e.setNode(r.id,{label:s(r,"solutionSet",i,o),labelType:"html","class":m(r,"solutionSet")}):r.id===t.solution_delta?e.setNode(r.id,{label:s(r,"solutionDelta",i,o),labelType:"html","class":m(r,"solutionDelta")}):e.setNode(r.id,{label:s(r,"",i,o),labelType:"html","class":m(r,"")})},i=function(e,t,r,n,i){return e.setEdge(i.id,r.id,{label:o(i),labelType:"html",arrowhead:"normal"})},k=function(e,t){var r,n,o,s,l,u,d,f,p,m,h,g,b,v;for(n=[],null!=t.nodes?v=t.nodes:(v=t.step_function,o=!0),s=0,u=v.length;s-1))return e["end-time"]=e["start-time"]+e.duration})},this.processVertices=function(e){return angular.forEach(e.vertices,function(e,t){return e.type="regular"}),e.vertices.unshift({name:"Scheduled","start-time":e.timestamps.CREATED,"end-time":e.timestamps.CREATED+1,type:"scheduled"})},this.listJobs=function(){var r;return r=i.defer(),e.get(t.jobServer+"jobs/overview").success(function(e){return function(t,n,i,o){return c.finished=[],c.running=[],_(t.jobs).groupBy(function(e){switch(e.state.toLowerCase()){case"finished":return"finished";case"failed":return"finished";case"canceled":return"finished";default:return"running"}}).forEach(function(t,r){switch(r){case"finished":return c.finished=e.setEndTimes(t);case"running":return c.running=e.setEndTimes(t)}}).value(),r.resolve(c),d()}}(this)),r.promise},this.getJobs=function(e){return c[e]},this.getAllJobs=function(){return c},this.loadJob=function(r){return s=null,l.job=i.defer(),e.get(t.jobServer+"jobs/"+r).success(function(n){return function(i,o,a,u){return n.setEndTimes(i.vertices),n.processVertices(i),e.get(t.jobServer+"jobs/"+r+"/config").success(function(e){return i=angular.extend(i,e),s=i,l.job.resolve(s)})}}(this)),l.job.promise},this.getNode=function(e){var t,r;return r=function(e,t){var n,i,o,s;for(n=0,i=t.length;n
{{metric.id}}
{{value | humanizeChartNumeric:metric}}
',replace:!0,scope:{metric:"=",window:"=",removeMetric:"&",setMetricSize:"=",setMetricView:"=",getValues:"&"},link:function(e,t,r){return e.btnClasses=["btn","btn-default","btn-xs"],e.value=null,e.data=[{values:e.getValues() -}],e.options={x:function(e,t){return e.x},y:function(e,t){return e.y},xTickFormat:function(e){return d3.time.format("%H:%M:%S")(new Date(e))},yTickFormat:function(e){var t,r,n,i;for(r=!1,n=0,i=1,t=Math.abs(e);!r&&n<50;)Math.pow(10,n)<=t&&t6?e/Math.pow(10,n)+"E"+n:""+e}},e.showChart=function(){return d3.select(t.find("svg")[0]).datum(e.data).transition().duration(250).call(e.chart)},e.chart=nv.models.lineChart().options(e.options).showLegend(!1).margin({top:15,left:60,bottom:30,right:30}),e.chart.yAxis.showMaxMin(!1),e.chart.tooltip.hideDelay(0),e.chart.tooltip.contentGenerator(function(e){return"

"+d3.time.format("%H:%M:%S")(new Date(e.point.x))+" | "+e.point.y+"

"}),nv.utils.windowResize(e.chart.update),e.setSize=function(t){return e.setMetricSize(e.metric,t)},e.setView=function(t){if(e.setMetricView(e.metric,t),"chart"===t)return e.showChart()},"chart"===e.metric.view&&e.showChart(),e.$on("metrics:data:update",function(t,r,n){return e.value=parseFloat(n[e.metric.id]),e.data[0].values.push({x:r,y:e.value}),e.data[0].values.length>e.window&&e.data[0].values.shift(),"chart"===e.metric.view&&e.showChart(),"chart"===e.metric.view&&e.chart.clearHighlights(),e.chart.tooltip.hidden(!0)}),t.find(".metric-title").qtip({content:{text:e.metric.id},position:{my:"bottom left",at:"top left"},style:{classes:"qtip-light qtip-timeline-bar"}})}}}),angular.module("flinkApp").service("MetricsService",["$http","$q","flinkConfig","$interval",function(e,t,r,n){return this.metrics={},this.values={},this.watched={},this.observer={jobid:null,nodeid:null,callback:null},this.refresh=n(function(e){return function(){return angular.forEach(e.metrics,function(t,r){return angular.forEach(t,function(t,n){var i;if(i=[],angular.forEach(t,function(e,t){return i.push(e.id)}),i.length>0)return e.getMetrics(r,n,i).then(function(t){if(r===e.observer.jobid&&n===e.observer.nodeid&&e.observer.callback)return e.observer.callback(t)})})})}}(this),r["refresh-interval"]),this.registerObserver=function(e,t,r){return this.observer.jobid=e,this.observer.nodeid=t,this.observer.callback=r},this.unRegisterObserver=function(){return this.observer={jobid:null,nodeid:null,callback:null}},this.setupMetrics=function(e,t){return this.setupLS(),this.watched[e]=[],angular.forEach(t,function(t){return function(r,n){if(r.id)return t.watched[e].push(r.id)}}(this))},this.getWindow=function(){return 100},this.setupLS=function(){return null==sessionStorage.flinkMetrics&&this.saveSetup(),this.metrics=JSON.parse(sessionStorage.flinkMetrics)},this.saveSetup=function(){return sessionStorage.flinkMetrics=JSON.stringify(this.metrics)},this.saveValue=function(e,t,r){if(null==this.values[e]&&(this.values[e]={}),null==this.values[e][t]&&(this.values[e][t]=[]),this.values[e][t].push(r),this.values[e][t].length>this.getWindow())return this.values[e][t].shift()},this.getValues=function(e,t,r){var n;return null==this.values[e]?[]:null==this.values[e][t]?[]:(n=[],angular.forEach(this.values[e][t],function(e){return function(e,t){if(null!=e.values[r])return n.push({x:e.timestamp,y:e.values[r]})}}(this)),n)},this.setupLSFor=function(e,t){if(null==this.metrics[e]&&(this.metrics[e]={}),null==this.metrics[e][t])return this.metrics[e][t]=[]},this.addMetric=function(e,t,r){return this.setupLSFor(e,t),this.metrics[e][t].push({id:r,size:"small",view:"chart"}),this.saveSetup()},this.removeMetric=function(e){return function(t,r,n){var i;if(null!=e.metrics[t][r])return i=e.metrics[t][r].indexOf(n),i===-1&&(i=_.findIndex(e.metrics[t][r],{id:n})),i!==-1&&e.metrics[t][r].splice(i,1),e.saveSetup()}}(this),this.setMetricSize=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:i,view:n.view}),e.saveSetup()}}(this),this.setMetricView=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:n.size,view:i}),e.saveSetup()}}(this),this.orderMetrics=function(e,t,r,n){return this.setupLSFor(e,t),angular.forEach(this.metrics[e][t],function(i){return function(o,s){if(o.id===r.id&&(i.metrics[e][t].splice(s,1),s6?e/Math.pow(10,n)+"E"+n:""+e}},e.showChart=function(){return d3.select(t.find("svg")[0]).datum(e.data).transition().duration(250).call(e.chart)},e.chart=nv.models.lineChart().options(e.options).showLegend(!1).margin({top:15,left:60,bottom:30,right:30}),e.chart.yAxis.showMaxMin(!1),e.chart.tooltip.hideDelay(0),e.chart.tooltip.contentGenerator(function(e){return"

"+d3.time.format("%H:%M:%S")(new Date(e.point.x))+" | "+e.point.y+"

"}),nv.utils.windowResize(e.chart.update),e.setSize=function(t){return e.setMetricSize(e.metric,t)},e.setView=function(t){if(e.setMetricView(e.metric,t),"chart"===t)return e.showChart()},"chart"===e.metric.view&&e.showChart(),e.$on("metrics:data:update",function(t,r,n){return e.value=parseFloat(n[e.metric.id]),e.data[0].values.push({x:r,y:e.value}),e.data[0].values.length>e.window&&e.data[0].values.shift(),"chart"===e.metric.view&&e.showChart(),"chart"===e.metric.view&&e.chart.clearHighlights(),e.chart.tooltip.hidden(!0)}),t.find(".metric-title").qtip({content:{text:e.metric.id},position:{my:"bottom left",at:"top left"},style:{classes:"qtip-light qtip-timeline-bar"}})}}}),angular.module("flinkApp").service("MetricsService",["$http","$q","flinkConfig","$interval",function(e,t,r,n){return this.metrics={},this.values={},this.watched={},this.observer={jobid:null,nodeid:null,callback:null},this.refresh=n(function(e){return function(){return angular.forEach(e.metrics,function(t,r){return angular.forEach(t,function(t,n){var i;if(i=[],angular.forEach(t,function(e,t){return i.push(e.id)}),i.length>0)return e.getMetrics(r,n,i).then(function(t){if(r===e.observer.jobid&&n===e.observer.nodeid&&e.observer.callback)return e.observer.callback(t)})})})}}(this),r["refresh-interval"]),this.registerObserver=function(e,t,r){return this.observer.jobid=e,this.observer.nodeid=t,this.observer.callback=r},this.unRegisterObserver=function(){return this.observer={jobid:null,nodeid:null,callback:null}},this.setupMetrics=function(e,t){return this.setupLS(),this.watched[e]=[],angular.forEach(t,function(t){return function(r,n){if(r.id)return t.watched[e].push(r.id)}}(this))},this.getWindow=function(){return 100},this.setupLS=function(){return null==sessionStorage.flinkMetrics&&this.saveSetup(),this.metrics=JSON.parse(sessionStorage.flinkMetrics)},this.saveSetup=function(){return sessionStorage.flinkMetrics=JSON.stringify(this.metrics)},this.saveValue=function(e,t,r){if(null==this.values[e]&&(this.values[e]={}),null==this.values[e][t]&&(this.values[e][t]=[]),this.values[e][t].push(r),this.values[e][t].length>this.getWindow())return this.values[e][t].shift()},this.getValues=function(e,t,r){var n;return null==this.values[e]?[]:null==this.values[e][t]?[]:(n=[],angular.forEach(this.values[e][t],function(e){return function(e,t){if(null!=e.values[r])return n.push({x:e.timestamp,y:e.values[r]})}}(this)),n)},this.setupLSFor=function(e,t){if(null==this.metrics[e]&&(this.metrics[e]={}),null==this.metrics[e][t])return this.metrics[e][t]=[]},this.addMetric=function(e,t,r){return this.setupLSFor(e,t),this.metrics[e][t].push({id:r,size:"small",view:"chart"}),this.saveSetup()},this.removeMetric=function(e){return function(t,r,n){var i;if(null!=e.metrics[t][r])return i=e.metrics[t][r].indexOf(n),i===-1&&(i=_.findIndex(e.metrics[t][r],{id:n})),i!==-1&&e.metrics[t][r].splice(i,1),e.saveSetup()}}(this),this.setMetricSize=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:i,view:n.view}),e.saveSetup()}}(this),this.setMetricView=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:n.size,view:i}),e.saveSetup()}}(this),this.orderMetrics=function(e,t,r,n){return this.setupLSFor(e,t),angular.forEach(this.metrics[e][t],function(i){return function(o,s){if(o.id===r.id&&(i.metrics[e][t].splice(s,1),s",link:function(t,r,n){return t.getLabelClass=function(){return"label label-"+e.translateLabelState(n.status)}}}}]).directive("bpLabel",["JobsService",function(e){return{transclude:!0,replace:!0,scope:{getBackPressureLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getBackPressureLabelClass=function(){return"label label-"+e.translateBackPressureLabelState(n.status)}}}}]).directive("indicatorPrimary",["JobsService",function(e){return{replace:!0,scope:{getLabelClass:"&",status:"@"},template:"",link:function(t,r,n){return t.getLabelClass=function(){return"fa fa-circle indicator indicator-"+e.translateLabelState(n.status)}}}}]).directive("tableProperty",function(){return{replace:!0,scope:{value:"="},template:"{{value || 'None'}}"}}),angular.module("flinkApp").filter("amDurationFormatExtended",["angularMomentConfig",function(e){var t;return t=function(e,t,r){return"undefined"==typeof e||null===e?"":moment.duration(e,t).format(r,{trim:!1})},t.$stateful=e.statefulFilters,t}]).filter("humanizeDuration",function(){return function(e,t){var r,n,i,o,a,s;return"undefined"==typeof e||null===e?"":(o=e%1e3,s=Math.floor(e/1e3),a=s%60,s=Math.floor(s/60),i=s%60,s=Math.floor(s/60),n=s%24,s=Math.floor(s/24),r=s,0===r?0===n?0===i?0===a?o+"ms":a+"s ":i+"m "+a+"s":t?n+"h "+i+"m":n+"h "+i+"m "+a+"s":t?r+"d "+n+"h":r+"d "+n+"h "+i+"m "+a+"s")}}).filter("limit",function(){return function(e){return e.length>73&&(e=e.substring(0,35)+"..."+e.substring(e.length-35,e.length)),e}}).filter("humanizeText",function(){return function(e){return e?e.replace(/>/g,">").replace(//g,""):""}}).filter("humanizeBytes",function(){return function(e){var t,r;return r=["B","KB","MB","GB","TB","PB","EB"],t=function(e,n){var i;return i=Math.pow(1024,n),e=r;n=0<=r?++e:--e)i.push(n+".currentLowWatermark");return i}(),i.getMetrics(o,t.id,a).then(function(e){var t,n,i,o,a,s,l;i=NaN,l={},o=e.values;for(t in o)s=o[t],a=t.replace(".currentLowWatermark",""),l[a]=s,(isNaN(i)||su.noWatermark?i:NaN,r.resolve({lowWatermark:n,watermarks:l})}),r.promise}}(this),r=l.defer(),a={},n=t.length,angular.forEach(t,function(e){return function(e,t){var i;return i=e.id,o(e).then(function(e){if(a[i]=e,t>=n-1)return r.resolve(a)})}}(this)),r.promise},e.hasWatermark=function(t){return e.watermarks[t]&&!isNaN(e.watermarks[t].lowWatermark)},e.$watch("plan",function(t){if(t)return c(t.nodes).then(function(t){return e.watermarks=t})}),e.$on("reload",function(){if(e.plan)return c(e.plan.nodes).then(function(t){return e.watermarks=t})})}]).controller("JobPlanController",["$scope","$state","$stateParams","$window","JobsService",function(e,t,r,n,i){return e.nodeid=null,e.nodeUnfolded=!1,e.stateList=i.stateList(),e.changeNode=function(t){return t!==e.nodeid?(e.nodeid=t,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null,e.$broadcast("reload"),e.$broadcast("node:change",e.nodeid)):(e.nodeid=null,e.nodeUnfolded=!1,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null)},e.deactivateNode=function(){return e.nodeid=null,e.nodeUnfolded=!1,e.vertex=null,e.subtasks=null,e.accumulators=null,e.operatorCheckpointStats=null},e.toggleFold=function(){return e.nodeUnfolded=!e.nodeUnfolded}}]).controller("JobPlanSubtasksController",["$scope","JobsService",function(e,t){var r;return e.aggregate=!1,r=function(){return e.aggregate?t.getTaskManagers(e.nodeid).then(function(t){return e.taskmanagers=t}):t.getSubtasks(e.nodeid).then(function(t){return e.subtasks=t})},!e.nodeid||e.vertex&&e.vertex.st||r(),e.$on("reload",function(t){if(e.nodeid)return r()})}]).controller("JobPlanAccumulatorsController",["$scope","JobsService",function(e,t){var r;return r=function(){return t.getAccumulators(e.nodeid).then(function(t){return e.accumulators=t.main,e.subtaskAccumulators=t.subtasks})},!e.nodeid||e.vertex&&e.vertex.accumulators||r(),e.$on("reload",function(t){if(e.nodeid)return r()})}]).controller("JobPlanCheckpointsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var i;return e.checkpointDetails={},e.checkpointDetails.id=-1,n.getCheckpointConfig().then(function(t){return e.checkpointConfig=t}),i=function(){return n.getCheckpointStats().then(function(t){if(null!==t)return e.checkpointStats=t})},i(),e.$on("reload",function(e){return i()})}]).controller("JobPlanCheckpointDetailsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var i,o;return e.subtaskDetails={},e.checkpointDetails.id=r.checkpointId,i=function(t){return n.getCheckpointDetails(t).then(function(t){return null!==t?e.checkpoint=t:e.unknown_checkpoint=!0})},o=function(t,r){return n.getCheckpointSubtaskDetails(t,r).then(function(t){if(null!==t)return e.subtaskDetails[r]=t})},i(r.checkpointId),e.nodeid&&o(r.checkpointId,e.nodeid),e.$on("reload",function(t){if(i(r.checkpointId),e.nodeid)return o(r.checkpointId,e.nodeid)}),e.$on("$destroy",function(){return e.checkpointDetails.id=-1})}]).controller("JobPlanBackPressureController",["$scope","JobsService",function(e,t){var r;return r=function(){if(e.now=Date.now(),e.nodeid)return t.getOperatorBackPressure(e.nodeid).then(function(t){return e.backPressureOperatorStats[e.nodeid]=t})},r(),e.$on("reload",function(e){return r()})}]).controller("JobTimelineVertexController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){var i;return i=function(){return n.getVertex(r.vertexId).then(function(t){return e.vertex=t})},i(),e.$on("reload",function(e){return i()})}]).controller("JobExceptionsController",["$scope","$state","$stateParams","JobsService",function(e,t,r,n){return n.loadExceptions().then(function(t){return e.exceptions=t})}]).controller("JobPropertiesController",["$scope","JobsService",function(e,t){return e.changeNode=function(r){return r!==e.nodeid?(e.nodeid=r,t.getNode(r).then(function(t){return e.node=t})):(e.nodeid=null,e.node=null)}}]).controller("JobPlanMetricsController",["$scope","JobsService","MetricsService",function(e,t,r){var n,i;if(e.dragging=!1,e.window=r.getWindow(),e.availableMetrics=null,e.$on("$destroy",function(){return r.unRegisterObserver()}),i=function(){return t.getVertex(e.nodeid).then(function(t){return e.vertex=t}),r.getAvailableMetrics(e.jobid,e.nodeid).then(function(t){return e.availableMetrics=t.sort(n),e.metrics=r.getMetricsSetup(e.jobid,e.nodeid).names,r.registerObserver(e.jobid,e.nodeid,function(t){return e.$broadcast("metrics:data:update",t.timestamp,t.values)})})},n=function(e,t){var r,n;return r=e.id.toLowerCase(),n=t.id.toLowerCase(),rn?1:0},e.dropped=function(t,n,o,a,s){return r.orderMetrics(e.jobid,e.nodeid,o,n),e.$broadcast("metrics:refresh",o),i(),!1},e.dragStart=function(){return e.dragging=!0},e.dragEnd=function(){return e.dragging=!1},e.addMetric=function(t){return r.addMetric(e.jobid,e.nodeid,t.id),i()},e.removeMetric=function(t){return r.removeMetric(e.jobid,e.nodeid,t),i()},e.setMetricSize=function(t,n){return r.setMetricSize(e.jobid,e.nodeid,t,n),i()},e.setMetricView=function(t,n){return r.setMetricView(e.jobid,e.nodeid,t,n),i()},e.getValues=function(t){return r.getValues(e.jobid,e.nodeid,t)},e.$on("node:change",function(t,r){if(!e.dragging)return i()}),e.nodeid)return i()}]),angular.module("flinkApp").directive("vertex",["$state",function(e){return{template:"",scope:{data:"="},link:function(e,t,r){var n,i,o;o=t.children()[0],i=t.width(),angular.element(o).attr("width",i),(n=function(e){var t,r,n;return d3.select(o).selectAll("*").remove(),n=[],angular.forEach(e.subtasks,function(e,t){var r;return r=[{label:"Scheduled",color:"#666",borderColor:"#555",starting_time:e.timestamps.SCHEDULED,ending_time:e.timestamps.DEPLOYING,type:"regular"},{label:"Deploying",color:"#aaa",borderColor:"#555",starting_time:e.timestamps.DEPLOYING,ending_time:e.timestamps.RUNNING,type:"regular"}],e.timestamps.FINISHED>0&&r.push({label:"Running",color:"#ddd",borderColor:"#555",starting_time:e.timestamps.RUNNING,ending_time:e.timestamps.FINISHED,type:"regular"}),n.push({label:"("+e.subtask+") "+e.host,times:r})}),t=d3.timeline().stack().tickFormat({format:d3.time.format("%L"),tickSize:1}).prefix("single").labelFormat(function(e){return e}).margin({left:100,right:0,top:0,bottom:0}).itemHeight(30).relativeTime(),r=d3.select(o).datum(n).call(t)})(e.data)}}}]).directive("timeline",["$state",function(e){return{template:"",scope:{vertices:"=",jobid:"="},link:function(t,r,n){var i,o,a,s;a=r.children()[0],o=r.width(),angular.element(a).attr("width",o),s=function(e){return e.replace(">",">")},i=function(r){var n,i,o;return d3.select(a).selectAll("*").remove(),o=[],angular.forEach(r,function(e){if(e["start-time"]>-1)return"scheduled"===e.type?o.push({times:[{label:s(e.name),color:"#cccccc",borderColor:"#555555",starting_time:e["start-time"],ending_time:e["end-time"],type:e.type}]}):o.push({times:[{label:s(e.name),color:"#d9f1f7",borderColor:"#62cdea",starting_time:e["start-time"],ending_time:e["end-time"],link:e.id,type:e.type}]})}),n=d3.timeline().stack().click(function(r,n,i){if(r.link)return e.go("single-job.timeline.vertex",{jobid:t.jobid,vertexId:r.link})}).tickFormat({format:d3.time.format("%L"),tickSize:1}).prefix("main").margin({left:0,right:0,top:0,bottom:0}).itemHeight(30).showBorderLine().showHourTimeline(),i=d3.select(a).datum(o).call(n)},t.$watch(n.vertices,function(e){if(e)return i(e)})}}}]).directive("split",function(){return{compile:function(e,t){return Split(e.children(),{sizes:[50,50],direction:"vertical"})}}}).directive("jobPlan",["$timeout",function(e){return{template:"
",scope:{plan:"=",watermarks:"=",setNode:"&"},link:function(e,t,r){var n,i,o,a,s,l,u,c,d,f,p,m,g,h,b,v,k,j,S,w,C,$,y,M,J;p=null,C=d3.behavior.zoom(),J=[],h=r.jobid,S=t.children()[0],j=t.children().children()[0],w=t.children()[1],l=d3.select(S),u=d3.select(j),c=d3.select(w),n=t.width(),angular.element(t.children()[0]).width(n),v=0,b=0,e.zoomIn=function(){var e,t,r;if(C.scale()<2.99)return e=C.translate(),t=e[0]*(C.scale()+.1/C.scale()),r=e[1]*(C.scale()+.1/C.scale()),C.scale(C.scale()+.1),C.translate([t,r]),u.attr("transform","translate("+t+","+r+") scale("+C.scale()+")"),v=C.scale(),b=C.translate()},e.zoomOut=function(){var e,t,r;if(C.scale()>.31)return C.scale(C.scale()-.1),e=C.translate(),t=e[0]*(C.scale()-.1/C.scale()),r=e[1]*(C.scale()-.1/C.scale()),C.translate([t,r]),u.attr("transform","translate("+t+","+r+") scale("+C.scale()+")"),v=C.scale(),b=C.translate()},o=function(e){var t;return t="",null==e.ship_strategy&&null==e.local_strategy||(t+="
",null!=e.ship_strategy&&(t+=e.ship_strategy),void 0!==e.temp_mode&&(t+=" ("+e.temp_mode+")"),void 0!==e.local_strategy&&(t+=",
"+e.local_strategy),t+="
"),t},g=function(e){return"partialSolution"===e||"nextPartialSolution"===e||"workset"===e||"nextWorkset"===e||"solutionSet"===e||"solutionDelta"===e},m=function(e,t){return"mirror"===t?"node-mirror":g(t)?"node-iteration":"node-normal"},a=function(e,t,r,n){var i,o;return i="
",i+="mirror"===t?"

Mirror of "+e.operator+"

":"

"+e.operator+"

",""===e.description?i+="":(o=e.description,o=M(o),i+="

"+o+"

"),null!=e.step_function?i+=f(e.id,r,n):(g(t)&&(i+="
"+t+" Node
"),""!==e.parallelism&&(i+="
Parallelism: "+e.parallelism+"
"),void 0!==e.lowWatermark&&(i+="
Low Watermark: "+e.lowWatermark+"
"),void 0!==e.operator&&e.operator_strategy&&(i+="
Operation: "+M(e.operator_strategy)+"
")),i+="
"},f=function(e,t,r){var n,i;return i="svg-"+e,n=""},M=function(e){var t;for("<"===e.charAt(0)&&(e=e.replace("<","<"),e=e.replace(">",">")),t="";e.length>30;)t=t+e.substring(0,30)+"
",e=e.substring(30,e.length);return t+=e},s=function(e,t,r,n,i,o){return null==n&&(n=!1),r.id===t.partial_solution?e.setNode(r.id,{label:a(r,"partialSolution",i,o),labelType:"html","class":m(r,"partialSolution")}):r.id===t.next_partial_solution?e.setNode(r.id,{label:a(r,"nextPartialSolution",i,o),labelType:"html","class":m(r,"nextPartialSolution")}):r.id===t.workset?e.setNode(r.id,{label:a(r,"workset",i,o),labelType:"html","class":m(r,"workset")}):r.id===t.next_workset?e.setNode(r.id,{label:a(r,"nextWorkset",i,o),labelType:"html","class":m(r,"nextWorkset")}):r.id===t.solution_set?e.setNode(r.id,{label:a(r,"solutionSet",i,o),labelType:"html","class":m(r,"solutionSet")}):r.id===t.solution_delta?e.setNode(r.id,{label:a(r,"solutionDelta",i,o),labelType:"html","class":m(r,"solutionDelta")}):e.setNode(r.id,{label:a(r,"",i,o),labelType:"html","class":m(r,"")})},i=function(e,t,r,n,i){return e.setEdge(i.id,r.id,{label:o(i),labelType:"html",arrowhead:"normal"})},k=function(e,t){var r,n,o,a,l,u,d,f,p,m,g,h,b,v;for(n=[],null!=t.nodes?v=t.nodes:(v=t.step_function,o=!0),a=0,u=v.length;a-1))return e["end-time"]=e["start-time"]+e.duration})},this.processVertices=function(e){return angular.forEach(e.vertices,function(e,t){return e.type="regular"}),e.vertices.unshift({name:"Scheduled","start-time":e.timestamps.CREATED,"end-time":e.timestamps.CREATED+1,type:"scheduled"})},this.listJobs=function(){var r;return r=i.defer(),e.get(t.jobServer+"jobs/overview").success(function(e){return function(t,n,i,o){return c.finished=[],c.running=[],_(t.jobs).groupBy(function(e){switch(e.state.toLowerCase()){case"finished":return"finished";case"failed":return"finished";case"canceled":return"finished";default:return"running"}}).forEach(function(t,r){switch(r){case"finished":return c.finished=e.setEndTimes(t);case"running":return c.running=e.setEndTimes(t)}}).value(),r.resolve(c),d()}}(this)),r.promise},this.getJobs=function(e){return c[e]},this.getAllJobs=function(){return c},this.loadJob=function(r){return a=null,l.job=i.defer(),e.get(t.jobServer+"jobs/"+r).success(function(n){return function(i,o,s,u){return n.setEndTimes(i.vertices),n.processVertices(i),e.get(t.jobServer+"jobs/"+r+"/config").success(function(e){return i=angular.extend(i,e),a=i,l.job.resolve(a)})}}(this)),l.job.promise},this.getNode=function(e){var t,r;return r=function(e,t){var n,i,o,a;for(n=0,i=t.length;n
{{metric.id}}
{{value | humanizeChartNumeric:metric}}
',replace:!0,scope:{metric:"=",window:"=",removeMetric:"&",setMetricSize:"=",setMetricView:"=",getValues:"&"},link:function(e,t,r){return e.btnClasses=["btn","btn-default","btn-xs"],e.value=null,e.data=[{values:e.getValues()}],e.options={x:function(e,t){return e.x},y:function(e,t){return e.y},xTickFormat:function(e){return d3.time.format("%H:%M:%S")(new Date(e))},yTickFormat:function(e){var t,r,n,i;for(r=!1,n=0,i=1,t=Math.abs(e);!r&&n<50;)Math.pow(10,n)<=t&&t6?e/Math.pow(10,n)+"E"+n:""+e}},e.showChart=function(){return d3.select(t.find("svg")[0]).datum(e.data).transition().duration(250).call(e.chart)},e.chart=nv.models.lineChart().options(e.options).showLegend(!1).margin({top:15,left:60,bottom:30,right:30}),e.chart.yAxis.showMaxMin(!1),e.chart.tooltip.hideDelay(0),e.chart.tooltip.contentGenerator(function(e){return"

"+d3.time.format("%H:%M:%S")(new Date(e.point.x))+" | "+e.point.y+"

"}),nv.utils.windowResize(e.chart.update),e.setSize=function(t){return e.setMetricSize(e.metric,t)},e.setView=function(t){if(e.setMetricView(e.metric,t),"chart"===t)return e.showChart()},"chart"===e.metric.view&&e.showChart(),e.$on("metrics:data:update",function(t,r,n){return e.value=parseFloat(n[e.metric.id]),e.data[0].values.push({x:r,y:e.value}),e.data[0].values.length>e.window&&e.data[0].values.shift(),"chart"===e.metric.view&&e.showChart(),"chart"===e.metric.view&&e.chart.clearHighlights(),e.chart.tooltip.hidden(!0)}),t.find(".metric-title").qtip({content:{text:e.metric.id},position:{my:"bottom left",at:"top left"},style:{classes:"qtip-light qtip-timeline-bar"}})}}}),angular.module("flinkApp").service("MetricsService",["$http","$q","flinkConfig","$interval",function(e,t,r,n){return this.metrics={},this.values={},this.watched={},this.observer={jobid:null,nodeid:null,callback:null},this.refresh=n(function(e){return function(){return angular.forEach(e.metrics,function(t,r){return angular.forEach(t,function(t,n){var i;if(i=[],angular.forEach(t,function(e,t){return i.push(e.id)}),i.length>0)return e.getMetrics(r,n,i).then(function(t){if(r===e.observer.jobid&&n===e.observer.nodeid&&e.observer.callback)return e.observer.callback(t)})})})}}(this),r["refresh-interval"]),this.registerObserver=function(e,t,r){return this.observer.jobid=e,this.observer.nodeid=t,this.observer.callback=r},this.unRegisterObserver=function(){return this.observer={jobid:null,nodeid:null,callback:null}},this.setupMetrics=function(e,t){return this.setupLS(),this.watched[e]=[],angular.forEach(t,function(t){return function(r,n){if(r.id)return t.watched[e].push(r.id)}}(this))},this.getWindow=function(){return 100},this.setupLS=function(){return null==sessionStorage.flinkMetrics&&this.saveSetup(),this.metrics=JSON.parse(sessionStorage.flinkMetrics)},this.saveSetup=function(){return sessionStorage.flinkMetrics=JSON.stringify(this.metrics)},this.saveValue=function(e,t,r){if(null==this.values[e]&&(this.values[e]={}),null==this.values[e][t]&&(this.values[e][t]=[]),this.values[e][t].push(r),this.values[e][t].length>this.getWindow())return this.values[e][t].shift()},this.getValues=function(e,t,r){var n;return null==this.values[e]?[]:null==this.values[e][t]?[]:(n=[],angular.forEach(this.values[e][t],function(e){return function(e,t){if(null!=e.values[r])return n.push({x:e.timestamp,y:e.values[r]})}}(this)),n)},this.setupLSFor=function(e,t){if(null==this.metrics[e]&&(this.metrics[e]={}),null==this.metrics[e][t])return this.metrics[e][t]=[]},this.addMetric=function(e,t,r){return this.setupLSFor(e,t),this.metrics[e][t].push({id:r,size:"small",view:"chart"}),this.saveSetup()},this.removeMetric=function(e){return function(t,r,n){var i;if(null!=e.metrics[t][r])return i=e.metrics[t][r].indexOf(n),i===-1&&(i=_.findIndex(e.metrics[t][r],{id:n})),i!==-1&&e.metrics[t][r].splice(i,1),e.saveSetup()}}(this),this.setMetricSize=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:i,view:n.view}),e.saveSetup()}}(this),this.setMetricView=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:n.size,view:i}),e.saveSetup()}}(this),this.orderMetrics=function(e,t,r,n){return this.setupLSFor(e,t),angular.forEach(this.metrics[e][t],function(i){return function(o,a){if(o.id===r.id&&(i.metrics[e][t].splice(a,1),a
{{metric.id}}
{{value | humanizeChartNumeric:metric}}
',replace:!0,scope:{metric:"=",window:"=",removeMetric:"&",setMetricSize:"=",setMetricView:"=",getValues:"&"},link:function(e,t,r){return e.btnClasses=["btn","btn-default","btn-xs"],e.value=null,e.data=[{values:e.getValues()}],e.options={x:function(e,t){return e.x},y:function(e,t){return e.y},xTickFormat:function(e){return d3.time.format("%H:%M:%S")(new Date(e))},yTickFormat:function(e){var t,r,n,i;for(r=!1,n=0,i=1,t=Math.abs(e);!r&&n<50;)Math.pow(10,n)<=t&&t6?e/Math.pow(10,n)+"E"+n:""+e}},e.showChart=function(){return d3.select(t.find("svg")[0]).datum(e.data).transition().duration(250).call(e.chart)},e.chart=nv.models.lineChart().options(e.options).showLegend(!1).margin({top:15,left:60,bottom:30,right:30}),e.chart.yAxis.showMaxMin(!1),e.chart.tooltip.hideDelay(0),e.chart.tooltip.contentGenerator(function(e){return"

"+d3.time.format("%H:%M:%S")(new Date(e.point.x))+" | "+e.point.y+"

"}),nv.utils.windowResize(e.chart.update),e.setSize=function(t){return e.setMetricSize(e.metric,t)},e.setView=function(t){if(e.setMetricView(e.metric,t),"chart"===t)return e.showChart()},"chart"===e.metric.view&&e.showChart(),e.$on("metrics:data:update",function(t,r,n){return e.value=parseFloat(n[e.metric.id]),e.data[0].values.push({x:r,y:e.value}),e.data[0].values.length>e.window&&e.data[0].values.shift(),"chart"===e.metric.view&&e.showChart(),"chart"===e.metric.view&&e.chart.clearHighlights(),e.chart.tooltip.hidden(!0)}),t.find(".metric-title").qtip({content:{text:e.metric.id},position:{my:"bottom left",at:"top left"},style:{classes:"qtip-light qtip-timeline-bar"}})}}}),angular.module("flinkApp").service("MetricsService",["$http","$q","flinkConfig","$interval",function(e,t,r,n){return this.metrics={},this.values={},this.watched={},this.observer={jobid:null,nodeid:null,callback:null},this.refresh=n(function(e){return function(){return angular.forEach(e.metrics,function(t,r){return angular.forEach(t,function(t,n){var i;if(i=[],angular.forEach(t,function(e,t){return i.push(e.id)}),i.length>0)return e.getMetrics(r,n,i).then(function(t){if(r===e.observer.jobid&&n===e.observer.nodeid&&e.observer.callback)return e.observer.callback(t)})})})}}(this),r["refresh-interval"]),this.registerObserver=function(e,t,r){return this.observer.jobid=e,this.observer.nodeid=t,this.observer.callback=r},this.unRegisterObserver=function(){return this.observer={jobid:null,nodeid:null,callback:null}},this.setupMetrics=function(e,t){return this.setupLS(),this.watched[e]=[],angular.forEach(t,function(t){return function(r,n){if(r.id)return t.watched[e].push(r.id)}}(this))},this.getWindow=function(){return 100},this.setupLS=function(){return null==sessionStorage.flinkMetrics&&this.saveSetup(),this.metrics=JSON.parse(sessionStorage.flinkMetrics)},this.saveSetup=function(){return sessionStorage.flinkMetrics=JSON.stringify(this.metrics)},this.saveValue=function(e,t,r){if(null==this.values[e]&&(this.values[e]={}),null==this.values[e][t]&&(this.values[e][t]=[]),this.values[e][t].push(r),this.values[e][t].length>this.getWindow())return this.values[e][t].shift()},this.getValues=function(e,t,r){var n;return null==this.values[e]?[]:null==this.values[e][t]?[]:(n=[],angular.forEach(this.values[e][t],function(e){return function(e,t){if(null!=e.values[r])return n.push({x:e.timestamp,y:e.values[r]})}}(this)),n)},this.setupLSFor=function(e,t){if(null==this.metrics[e]&&(this.metrics[e]={}),null==this.metrics[e][t])return this.metrics[e][t]=[]},this.addMetric=function(e,t,r){return this.setupLSFor(e,t),this.metrics[e][t].push({id:r,size:"small",view:"chart"}),this.saveSetup()},this.removeMetric=function(e){return function(t,r,n){var i;if(null!=e.metrics[t][r])return i=e.metrics[t][r].indexOf(n),i===-1&&(i=_.findIndex(e.metrics[t][r],{id:n})),i!==-1&&e.metrics[t][r].splice(i,1),e.saveSetup()}}(this),this.setMetricSize=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:i,view:n.view}),e.saveSetup()}}(this),this.setMetricView=function(e){return function(t,r,n,i){var o;if(null!=e.metrics[t][r])return o=e.metrics[t][r].indexOf(n.id),o===-1&&(o=_.findIndex(e.metrics[t][r],{id:n.id})),o!==-1&&(e.metrics[t][r][o]={id:n.id,size:n.size,view:i}),e.saveSetup()}}(this),this.orderMetrics=function(e,t,r,n){return this.setupLSFor(e,t),angular.forEach(this.metrics[e][t],function(i){return function(o,a){if(o.id===r.id&&(i.metrics[e][t].splice(a,1),a, R extends RequestBody, M extends MessageParameters> X getQueryParameter( + final HandlerRequest request, + final Class

queryParameterClass) throws RestHandlerException { + + return getQueryParameter(request, queryParameterClass, null); + } + + public static , R extends RequestBody, M extends MessageParameters> X getQueryParameter( + final HandlerRequest request, + final Class

queryParameterClass, + final X defaultValue) throws RestHandlerException { + + final List values = request.getQueryParameter(queryParameterClass); + final X value; + if (values.size() > 1) { + throw new RestHandlerException( + String.format("Expected only one value %s.", values), + HttpResponseStatus.BAD_REQUEST); + } else if (values.size() == 1) { + value = values.get(0); + } else { + value = defaultValue; + } + return value; + } + +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java similarity index 51% rename from flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java index 47feebc5bb343..09433f53021ca 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtilsTest.java @@ -16,71 +16,53 @@ * limitations under the License. */ -package org.apache.flink.runtime.webmonitor.handlers; +package org.apache.flink.runtime.rest.handler.util; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; import org.apache.flink.util.TestLogger; import org.junit.Test; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; -import java.util.List; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; /** - * Tests for {@link JarRunHandler}. + * Tests for {@link HandlerRequestUtils}. */ -public class JarRunHandlerTest extends TestLogger { - - @Test - public void testTokenizeNonQuoted() { - final List arguments = JarRunHandler.tokenizeArguments("--foo bar"); - assertThat(arguments.get(0), equalTo("--foo")); - assertThat(arguments.get(1), equalTo("bar")); - } - - @Test - public void testTokenizeSingleQuoted() { - final List arguments = JarRunHandler.tokenizeArguments("--foo 'bar baz '"); - assertThat(arguments.get(0), equalTo("--foo")); - assertThat(arguments.get(1), equalTo("bar baz ")); - } - - @Test - public void testTokenizeDoubleQuoted() { - final List arguments = JarRunHandler.tokenizeArguments("--name \"K. Bote \""); - assertThat(arguments.get(0), equalTo("--name")); - assertThat(arguments.get(1), equalTo("K. Bote ")); - } +public class HandlerRequestUtilsTest extends TestLogger { @Test public void testGetQueryParameter() throws Exception { - final Boolean queryParameter = JarRunHandler.getQueryParameter( + final Boolean queryParameter = HandlerRequestUtils.getQueryParameter( new HandlerRequest<>( EmptyRequestBody.getInstance(), - new JarRunMessageParameters(), + new TestMessageParameters(), Collections.emptyMap(), - Collections.singletonMap("allowNonRestoredState", Collections.singletonList("true"))), - AllowNonRestoredStateQueryParameter.class); + Collections.singletonMap("key", Collections.singletonList("true"))), + TestBooleanQueryParameter.class); assertThat(queryParameter, equalTo(true)); } @Test public void testGetQueryParameterRepeated() throws Exception { try { - JarRunHandler.getQueryParameter( + HandlerRequestUtils.getQueryParameter( new HandlerRequest<>( EmptyRequestBody.getInstance(), - new JarRunMessageParameters(), + new TestMessageParameters(), Collections.emptyMap(), - Collections.singletonMap("allowNonRestoredState", Arrays.asList("true", "false"))), - AllowNonRestoredStateQueryParameter.class); + Collections.singletonMap("key", Arrays.asList("true", "false"))), + TestBooleanQueryParameter.class); } catch (final RestHandlerException e) { assertThat(e.getMessage(), containsString("Expected only one value")); } @@ -88,13 +70,46 @@ public void testGetQueryParameterRepeated() throws Exception { @Test public void testGetQueryParameterDefaultValue() throws Exception { - final Boolean allowNonRestoredState = JarRunHandler.getQueryParameter( + final Boolean allowNonRestoredState = HandlerRequestUtils.getQueryParameter( new HandlerRequest<>( EmptyRequestBody.getInstance(), - new JarRunMessageParameters(), + new TestMessageParameters(), Collections.emptyMap(), - Collections.singletonMap("allowNonRestoredState", Collections.emptyList())), - AllowNonRestoredStateQueryParameter.class, true); + Collections.singletonMap("key", Collections.emptyList())), + TestBooleanQueryParameter.class, true); assertThat(allowNonRestoredState, equalTo(true)); } + + private static class TestMessageParameters extends MessageParameters { + + private TestBooleanQueryParameter testBooleanQueryParameter; + + @Override + public Collection> getPathParameters() { + return Collections.emptyList(); + } + + @Override + public Collection> getQueryParameters() { + testBooleanQueryParameter = new TestBooleanQueryParameter(); + return Collections.singletonList(testBooleanQueryParameter); + } + } + + private static class TestBooleanQueryParameter extends MessageQueryParameter { + + private TestBooleanQueryParameter() { + super("key", MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public Boolean convertValueFromString(final String value) { + return Boolean.parseBoolean(value); + } + + @Override + public String convertStringToValue(final Boolean value) { + return value.toString(); + } + } }