From 64eb0562adc58e86277b2e03bf0f3eecc99224e1 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 20 Sep 2017 14:58:42 +0200 Subject: [PATCH 01/16] [hotfix] [REST] Add utility HandlerRequest constructor --- .../org/apache/flink/runtime/rest/handler/HandlerRequest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java index c0de3db08e375..aacf0a22a1661 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java @@ -42,6 +42,10 @@ public class HandlerRequest private final Map>, MessagePathParameter> pathParameters = new HashMap<>(2); private final Map>, MessageQueryParameter> queryParameters = new HashMap<>(2); + public HandlerRequest(R requestBody, M messageParameters) throws HandlerRequestException { + this(requestBody, messageParameters, Collections.emptyMap(), Collections.emptyMap()); + } + public HandlerRequest(R requestBody, M messageParameters, Map receivedPathParameters, Map> receivedQueryParameters) throws HandlerRequestException { this.requestBody = Preconditions.checkNotNull(requestBody); Preconditions.checkNotNull(messageParameters); From 0f4c40e72a2c101b08d1dc67fa99a8f717ce7c44 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 20 Sep 2017 14:55:46 +0200 Subject: [PATCH 02/16] [FLINK-7072] [REST] Define protocol for job submit/cancel/stop --- .../rest/messages/BlobServerPortHeaders.java | 69 ++++++++++++++++ .../messages/BlobServerPortResponseBody.java | 57 +++++++++++++ .../JobTerminationMessageParameters.java | 4 +- .../rest/messages/job/JobSubmitHeaders.java | 71 ++++++++++++++++ .../messages/job/JobSubmitRequestBody.java | 80 +++++++++++++++++++ .../messages/job/JobSubmitResponseBody.java | 61 ++++++++++++++ .../messages/BlobServerPortResponseTest.java | 37 +++++++++ .../messages/JobSubmitRequestBodyTest.java | 41 ++++++++++ .../messages/JobSubmitResponseBodyTest.java | 38 +++++++++ 9 files changed, 456 insertions(+), 2 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java new file mode 100644 index 0000000000000..8edec160c4c5d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * These headers define the protocol for querying the port of the blob server. + */ +public class BlobServerPortHeaders implements MessageHeaders { + + private static final String URL = "/blobserver/port"; + private static final BlobServerPortHeaders INSTANCE = new BlobServerPortHeaders(); + + private BlobServerPortHeaders() { + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Class getResponseClass() { + return BlobServerPortResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + public static BlobServerPortHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java new file mode 100644 index 0000000000000..846475f4b751f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Response containing the blob server port. + */ +public final class BlobServerPortResponseBody implements ResponseBody { + + static final String FIELD_NAME_PORT = "port"; + + /** + * The port of the blob server. + */ + @JsonProperty(FIELD_NAME_PORT) + public final int port; + + @JsonCreator + public BlobServerPortResponseBody( + @JsonProperty(FIELD_NAME_PORT) int port) { + + this.port = port; + } + + @Override + public int hashCode() { + return 67 * port; + } + + @Override + public boolean equals(Object object) { + if (object instanceof BlobServerPortResponseBody) { + BlobServerPortResponseBody other = (BlobServerPortResponseBody) object; + return this.port == other.port; + } + return false; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java index fd8731630a033..a59dc83ab38ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java @@ -28,8 +28,8 @@ */ public class JobTerminationMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); + public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); @Override public Collection> getPathParameters() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java new file mode 100644 index 0000000000000..6235214f80fc6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * These headers define the protocol for submitting a job to a flink cluster. + */ +public class JobSubmitHeaders implements MessageHeaders { + + private static final String URL = "/jobs"; + private static final JobSubmitHeaders INSTANCE = new JobSubmitHeaders(); + + private JobSubmitHeaders() { + } + + @Override + public Class getRequestClass() { + return JobSubmitRequestBody.class; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Class getResponseClass() { + return JobSubmitResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.ACCEPTED; + } + + @Override + public EmptyMessageParameters getUnresolvedMessageParameters() { + return EmptyMessageParameters.getInstance(); + } + + public static JobSubmitHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java new file mode 100644 index 0000000000000..756e60c743ffe --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.messages.RequestBody; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Arrays; + +/** + * Request for submitting a job. + * + *

We currently require the job-jars to be uploaded through the blob-server. + */ +public final class JobSubmitRequestBody implements RequestBody { + + private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + + /** + * The serialized job graph. + */ + @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) + public final byte[] serializedJobGraph; + + public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { + this(serializeJobGraph(jobGraph)); + } + + @JsonCreator + public JobSubmitRequestBody( + @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) { + + this.serializedJobGraph = serializedJobGraph; + } + + @Override + public int hashCode() { + return 71 * Arrays.hashCode(this.serializedJobGraph); + } + + @Override + public boolean equals(Object object) { + if (object instanceof JobSubmitRequestBody) { + JobSubmitRequestBody other = (JobSubmitRequestBody) object; + return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph); + } + return false; + } + + private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + ObjectOutputStream out = new ObjectOutputStream(baos); + + out.writeObject(jobGraph); + + return baos.toByteArray(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java new file mode 100644 index 0000000000000..fefd435079512 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Response to the submission of a job, containing a URL from which the status of the job can be retrieved from. + */ +public final class JobSubmitResponseBody implements ResponseBody { + + public static final String FIELD_NAME_JOB_URL = "jobUrl"; + + /** + * The URL under which the job status can monitored. + */ + @JsonProperty(FIELD_NAME_JOB_URL) + public final String jobUrl; + + @JsonCreator + public JobSubmitResponseBody( + @JsonProperty(FIELD_NAME_JOB_URL) String jobUrl) { + + this.jobUrl = jobUrl; + } + + @Override + public int hashCode() { + return 73 * jobUrl.hashCode(); + } + + @Override + public boolean equals(Object object) { + if (object instanceof JobSubmitResponseBody) { + JobSubmitResponseBody other = (JobSubmitResponseBody) object; + return Objects.equals(this.jobUrl, other.jobUrl); + } + return false; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java new file mode 100644 index 0000000000000..add4e3beed4f8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase; + +/** + * Tests for {@link BlobServerPortResponseBody}. + */ +public class BlobServerPortResponseTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return BlobServerPortResponseBody.class; + } + + @Override + protected BlobServerPortResponseBody getTestResponseInstance() throws Exception { + return new BlobServerPortResponseBody(64); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java new file mode 100644 index 0000000000000..e69913c808673 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.handler.legacy.messages.RestRequestMarshallingTestBase; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; + +import java.io.IOException; + +/** + * Tests for the {@link JobSubmitRequestBody}. + */ +public class JobSubmitRequestBodyTest extends RestRequestMarshallingTestBase { + + @Override + protected Class getTestRequestClass() { + return JobSubmitRequestBody.class; + } + + @Override + protected JobSubmitRequestBody getTestRequestInstance() throws IOException { + return new JobSubmitRequestBody(new JobGraph("job")); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java new file mode 100644 index 0000000000000..9dc832aa3666a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +/** + * Tests for {@link JobSubmitResponseBody}. + */ +public class JobSubmitResponseBodyTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return JobSubmitResponseBody.class; + } + + @Override + protected JobSubmitResponseBody getTestResponseInstance() throws Exception { + return new JobSubmitResponseBody("/url"); + } +} From f8798ad38f76bf01957116d4ebf4fda5c01c0d35 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 28 Sep 2017 10:49:39 +0200 Subject: [PATCH 03/16] [FLINK-7072] [REST] Extend Dispatcher --- .../org/apache/flink/runtime/dispatcher/Dispatcher.java | 5 +++++ .../flink/runtime/dispatcher/DispatcherGateway.java | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 4d89dc8781e58..93ac02a93b907 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -311,6 +311,11 @@ public CompletableFuture requestJob(JobID jobId, Time time return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout); } } + + @Override + public CompletableFuture getBlobServerPort(Time timeout) { + return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); + } /** * Cleans up the job related data from the dispatcher. If cleanupHA is true, then diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java index fe7b91ec7c5e0..12cbbfbd498f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java @@ -71,4 +71,12 @@ CompletableFuture> listJobs( * @return A future acknowledge if the stopping succeeded */ CompletableFuture stopJob(JobID jobId, @RpcTimeout Time timeout); + + /** + * Returns the port of the blob server. + * + * @param timeout of the operation + * @return A future integer of the blob server port + */ + CompletableFuture getBlobServerPort(@RpcTimeout Time timeout); } From 51e49b1d2e1e88b7e438e40408b937cbef503395 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 20 Sep 2017 14:59:42 +0200 Subject: [PATCH 04/16] [FLINK-7072] [REST] Add handlers for job submit/cancel/stop --- .../dispatcher/DispatcherRestEndpoint.java | 8 ++ .../handler/job/BlobServerPortHandler.java | 63 +++++++++++++ .../rest/handler/job/JobSubmitHandler.java | 66 ++++++++++++++ .../job/BlobServerPortHandlerTest.java | 65 ++++++++++++++ .../handler/job/JobSubmitHandlerTest.java | 90 +++++++++++++++++++ 5 files changed, 292 insertions(+) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 84710781eb8c0..d64e6493ac78a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -27,7 +27,9 @@ import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler; import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; +import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler; @@ -192,6 +194,12 @@ protected Collection> in handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler)); handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler)); + BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); + handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); + + JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(restAddressFuture, leaderRetriever, timeout); + handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler)); + // This handler MUST be added last, as it otherwise masks all subsequent GET handlers optWebContent.ifPresent( webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent))); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java new file mode 100644 index 0000000000000..8314f8ef992a9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; + +/** + * This handler can be used to retrieve the port that the blob server runs on. + */ +public final class BlobServerPortHandler extends AbstractRestHandler { + + public BlobServerPortHandler(CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout) { + super(localRestAddress, leaderRetriever, timeout, BlobServerPortHeaders.getInstance()); + } + + @Override + protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + return gateway + .getBlobServerPort(Time.seconds(5)) + .handleAsync((Integer port, Throwable error) -> { + if (error != null) { + log.error("Failed to retrieve blob server port.", ExceptionUtils.stripCompletionException(error)); + return FutureUtils.completedExceptionally(new RestHandlerException("Failed to retrieve blob server port.", HttpResponseStatus.INTERNAL_SERVER_ERROR)); + } else { + return CompletableFuture.completedFuture(new BlobServerPortResponseBody(port)); + } + }) + .thenCompose(future -> future); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java new file mode 100644 index 0000000000000..ef8a1c19c2b66 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.util.concurrent.CompletableFuture; + +/** + * This handler can be used to submit jobs to a Flink cluster. + */ +public final class JobSubmitHandler extends AbstractRestHandler { + + public JobSubmitHandler(CompletableFuture localRestAddress, GatewayRetriever leaderRetriever, Time timeout) { + super(localRestAddress, leaderRetriever, timeout, JobSubmitHeaders.getInstance()); + } + + @Override + protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + JobGraph jobGraph; + try { + ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + log.error("Failed to deserialize JobGraph.", e); + return FutureUtils.completedExceptionally(new RestHandlerException("Failed to deserialize JobGraph.", HttpResponseStatus.BAD_REQUEST)); + } + + gateway.submitJob(jobGraph, Time.seconds(5)); + + return CompletableFuture.completedFuture(new JobSubmitResponseBody("Submitted job with ID " + jobGraph.getJobID() + ".")); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java new file mode 100644 index 0000000000000..85ad1dc473eb8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link BlobServerPortHandler}. + */ +public class BlobServerPortHandlerTest { + private static final int PORT = 64; + + @Test + public void testPortRetrieval() throws Exception { + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(PORT)); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + BlobServerPortHandler handler = new BlobServerPortHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + BlobServerPortResponseBody portResponse = handler.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway).get(); + + Assert.assertEquals(PORT, portResponse.port); + } + + @Test + public void testPortRetrievalFailureHandling() throws Exception { + + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java new file mode 100644 index 0000000000000..981ac9c29bda2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link JobSubmitHandler}. + */ +public class JobSubmitHandlerTest { + + @Test + public void testSerializationFailureHandling() throws Exception { + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]); + + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway); + + try { + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get(); + Assert.fail(); + } catch (ExecutionException ee) { + RestHandlerException rhe = (RestHandlerException) ee.getCause(); + + Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rhe.getHttpResponseStatus()); + } + } + + @Test + public void testSuccessfulJobSubmission() throws Exception { + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + JobGraph job = new JobGraph("testjob"); + JobSubmitRequestBody request = new JobSubmitRequestBody(job); + + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get(); + } +} From 1ffc5b4c2a631a035ab30767b87139ee04b4467d Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 28 Sep 2017 11:17:54 +0200 Subject: [PATCH 05/16] [FLINK-7072] [REST] CLI integration --- .../org/apache/flink/client/CliFrontend.java | 33 ++- .../flink/client/cli/Flip6DefaultCLI.java | 98 +++++++++ .../Flip6StandaloneClusterDescriptor.java | 62 ++++++ .../flink/client/program/ClusterClient.java | 2 +- .../program/rest/RestClusterClient.java | 206 ++++++++++++++++++ .../rest/RestClusterClientConfiguration.java | 77 +++++++ .../client/CliFrontendListCancelTest.java | 11 + .../flink/client/CliFrontendRunTest.java | 9 + .../flink/client/CliFrontendStopTest.java | 11 + .../program/rest/RestClusterClientTest.java | 191 ++++++++++++++++ 10 files changed, 698 insertions(+), 2 deletions(-) create mode 100644 flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java create mode 100644 flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java create mode 100644 flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java create mode 100644 flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java create mode 100644 flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 9d1f52eadeb0b..4b719d969ebdb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -30,6 +30,7 @@ import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.client.cli.Flip6DefaultCLI; import org.apache.flink.client.cli.InfoOptions; import org.apache.flink.client.cli.ListOptions; import org.apache.flink.client.cli.ProgramOptions; @@ -146,6 +147,8 @@ public class CliFrontend { } catch (Exception e) { LOG.warn("Could not load CLI class {}.", flinkYarnCLI, e); } + + customCommandLines.add(new Flip6DefaultCLI()); customCommandLines.add(new DefaultCLI()); } @@ -554,6 +557,18 @@ protected int stop(String[] args) { return handleArgException(new CliArgsException("Missing JobID")); } + // FLIP-6 specific branch + try { + CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); + if (activeCommandLine instanceof Flip6DefaultCLI) { + ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + client.stop(jobId); + return 0; + } + } catch (Throwable t) { + return handleError(t); + } + try { ActorGateway jobManager = getJobManagerGateway(options); Future response = jobManager.ask(new StopJob(jobId), clientTimeout); @@ -635,6 +650,18 @@ protected int cancel(String[] args) { return 1; } + // FLIP-6 specific branch + try { + CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); + if (activeCommandLine instanceof Flip6DefaultCLI) { + ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + client.cancel(jobId); + return 0; + } + } catch (Throwable t) { + return handleError(t); + } + try { ActorGateway jobManager = getJobManagerGateway(options); @@ -978,7 +1005,11 @@ protected ClusterClient createClient( // Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program. final InetSocketAddress jobManagerAddress = client.getJobManagerAddress(); logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager."); - logAndSysout("JobManager web interface address " + client.getWebInterfaceURL()); + try { + logAndSysout("JobManager web interface address " + client.getWebInterfaceURL()); + } catch (UnsupportedOperationException uoe) { + logAndSysout("JobManager web interface not active."); + } return client; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java new file mode 100644 index 0000000000000..5fb9dfce7ff08 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.flink.client.ClientUtils; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.List; + +import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig; + +/** + * The default CLI which is used for interaction with standalone clusters. + */ +public class Flip6DefaultCLI implements CustomCommandLine { + + public static final Option FLIP_6 = new Option("flip6", "Switches the client to Flip-6 mode."); + + static { + FLIP_6.setRequired(false); + } + + @Override + public boolean isActive(CommandLine commandLine, Configuration configuration) { + return commandLine.hasOption(FLIP_6.getOpt()); + } + + @Override + public String getId() { + return "flip6"; + } + + @Override + public void addRunOptions(Options baseOptions) { + } + + @Override + public void addGeneralOptions(Options baseOptions) { + baseOptions.addOption(FLIP_6); + } + + @Override + public RestClusterClient retrieveCluster(CommandLine commandLine, Configuration config, String configurationDirectory) { + if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) { + String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt()); + InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort); + setJobManagerAddressInConfig(config, jobManagerAddress); + } + + if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) { + String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt()); + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); + } + + Flip6StandaloneClusterDescriptor descriptor = new Flip6StandaloneClusterDescriptor(config); + return descriptor.retrieve(null); + } + + @Override + public RestClusterClient createCluster( + String applicationName, + CommandLine commandLine, + Configuration config, + String configurationDirectory, + List userJarFiles) throws UnsupportedOperationException { + + Flip6StandaloneClusterDescriptor descriptor = new Flip6StandaloneClusterDescriptor(config); + ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config); + + return descriptor.deploySessionCluster(clusterSpecification); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java new file mode 100644 index 0000000000000..0ba776c858049 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment; + +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; + +/** + * A deployment descriptor for an existing cluster. + */ +public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor { + + private final Configuration config; + + public Flip6StandaloneClusterDescriptor(Configuration config) { + this.config = config; + } + + @Override + public String getClusterDescription() { + String host = config.getString(JobManagerOptions.ADDRESS, ""); + int port = config.getInteger(JobManagerOptions.PORT, -1); + return "FLIP-6 Standalone cluster at " + host + ":" + port; + } + + @Override + public RestClusterClient retrieve(String applicationID) { + try { + return new RestClusterClient(config); + } catch (Exception e) { + throw new RuntimeException("Couldn't retrieve standalone cluster", e); + } + } + + @Override + public RestClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException { + throw new UnsupportedOperationException("Can't deploy a standalone cluster."); + } + + @Override + public RestClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) { + throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster."); + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index c8a236e52dea6..5e58f5d78fcad 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -82,7 +82,7 @@ */ public abstract class ClusterClient { - private final Logger log = LoggerFactory.getLogger(getClass()); + protected final Logger log = LoggerFactory.getLogger(getClass()); /** The optimizer used in the optimization of batch programs. */ final Optimizer compiler; diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java new file mode 100644 index 0000000000000..f7cf0edbb85c2 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +/** + * A {@link ClusterClient} implementation that communicates via HTTP REST requests. + */ +public class RestClusterClient extends ClusterClient { + + private RestClusterClientConfiguration configuration; + private final RestClient restEndpoint; + + public RestClusterClient(Configuration config) throws Exception { + this(config, RestClusterClientConfiguration.fromConfiguration(config)); + } + + public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { + super(config); + this.configuration = configuration; + this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4)); + } + + @Override + public void shutdown() { + this.restEndpoint.shutdown(Time.seconds(5)); + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + log.info("Submitting job."); + try { + // temporary hack for FLIP-6 + jobGraph.setAllowQueuedScheduling(true); + submitJob(jobGraph); + } catch (JobSubmissionException e) { + throw new RuntimeException(e); + } + // don't return just a JobSubmissionResult here, the signature is lying + // The CliFrontend expects this to be a JobExecutionResult + + // TOOD: do not exit this method until job is finished + return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap()); + } + + private void submitJob(JobGraph jobGraph) throws JobSubmissionException { + log.info("Requesting blob server port."); + int blobServerPort; + try { + CompletableFuture portFuture = restEndpoint.sendRequest( + configuration.getRestServerAddress(), + configuration.getRestServerPort(), + BlobServerPortHeaders.getInstance()); + blobServerPort = portFuture.get().port; + } catch (Exception e) { + throw new JobSubmissionException(jobGraph.getJobID(), "Failed to retrieve blob server port.", e); + } + + log.info("Uploading jar files."); + try { + InetSocketAddress address = new InetSocketAddress(configuration.getBlobServerAddress(), blobServerPort); + List keys = BlobClient.uploadJarFiles(address, new Configuration(), jobGraph.getJobID(), jobGraph.getUserJars()); + for (BlobKey key : keys) { + jobGraph.addBlob(key); + } + } catch (Exception e) { + throw new JobSubmissionException(jobGraph.getJobID(), "Failed to upload user jars to blob server.", e); + } + + log.info("Submitting job graph."); + try { + CompletableFuture responseFuture = restEndpoint.sendRequest( + configuration.getRestServerAddress(), + configuration.getRestServerPort(), + JobSubmitHeaders.getInstance(), + new JobSubmitRequestBody(jobGraph)); + JobSubmitResponseBody response = responseFuture.get(); + System.out.println(response.jobUrl); + } catch (Exception e) { + throw new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", e); + } + } + + @Override + public void stop(JobID jobID) throws Exception { + JobTerminationMessageParameters param = new JobTerminationMessageParameters(); + param.jobPathParameter.resolve(jobID); + param.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP)); + CompletableFuture responseFuture = restEndpoint.sendRequest( + configuration.getRestServerAddress(), + configuration.getRestServerPort(), + JobTerminationHeaders.getInstance(), + param + ); + responseFuture.get(); + System.out.println("Job stopping initiated."); + } + + @Override + public void cancel(JobID jobID) throws Exception { + JobTerminationMessageParameters param = new JobTerminationMessageParameters(); + param.jobPathParameter.resolve(jobID); + param.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); + CompletableFuture responseFuture = restEndpoint.sendRequest( + configuration.getRestServerAddress(), + configuration.getRestServerPort(), + JobTerminationHeaders.getInstance(), + param + ); + responseFuture.get(); + System.out.println("Job canceling initiated."); + } + + // ====================================== + // Legacy stuff we actually implement + // ====================================== + + @Override + public String getClusterIdentifier() { + return "Flip-6 Standalone cluster with dispatcher at " + configuration.getRestServerAddress(); + } + + @Override + public boolean hasUserJarsInClassPath(List userJarFiles) { + return false; + } + + // ====================================== + // Legacy stuff we ignore + // ====================================== + + @Override + public void waitForClusterToBeReady() { + throw new UnsupportedOperationException(); + } + + @Override + public String getWebInterfaceURL() { + throw new UnsupportedOperationException(); + } + + @Override + public GetClusterStatusResponse getClusterStatus() { + throw new UnsupportedOperationException(); + } + + @Override + protected List getNewMessages() { + throw new UnsupportedOperationException(); + } + + @Override + protected void finalizeCluster() { + throw new UnsupportedOperationException(); + } + + @Override + public int getMaxSlots() { + return 0; + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java new file mode 100644 index 0000000000000..a6850d54f69f4 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.RestClientConfiguration; +import org.apache.flink.util.ConfigurationException; + +/** + * A configuration object for {@link RestClusterClient}s. + */ +public final class RestClusterClientConfiguration { + + private final String blobServerAddress; + + private final RestClientConfiguration restEndpointConfiguration; + + private final String restServerAddress; + + private final int restServerPort; + + private RestClusterClientConfiguration( + String blobServerAddress, + RestClientConfiguration endpointConfiguration, + String restServerAddress, + int restServerPort) { + this.blobServerAddress = blobServerAddress; + this.restEndpointConfiguration = endpointConfiguration; + this.restServerAddress = restServerAddress; + this.restServerPort = restServerPort; + } + + public String getBlobServerAddress() { + return blobServerAddress; + } + + public String getRestServerAddress() { + return restServerAddress; + } + + public int getRestServerPort() { + return restServerPort; + } + + public RestClientConfiguration getRestEndpointConfiguration() { + return restEndpointConfiguration; + } + + public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException { + String address = config.getString(JobManagerOptions.ADDRESS); + + String serverAddress = config.getString(RestOptions.REST_ADDRESS); + int serverPort = config.getInteger(RestOptions.REST_PORT); + + RestClientConfiguration endpointConfiguration = RestClientConfiguration.fromConfiguration(config); + + return new RestClusterClientConfiguration(address, endpointConfiguration, serverAddress, serverPort); + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java index 725d95ae55ddf..1bb9960bb519f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java @@ -19,7 +19,10 @@ package org.apache.flink.client; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.cli.CancelOptions; +import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.Flip6DefaultCLI; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; @@ -130,6 +133,14 @@ public void testCancel() { assertTrue(testFrontend.cancel(parameters) != 0); } + + // test flip6 switch + { + String[] parameters = + {"-flip6", String.valueOf(new JobID())}; + CancelOptions options = CliFrontendParser.parseCancelCommand(parameters); + assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); + } } catch (Exception e) { e.printStackTrace(); diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java index e453d37a70733..0edc44483e4ec 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java @@ -19,6 +19,7 @@ package org.apache.flink.client; import org.apache.flink.client.cli.CliFrontendParser; +import org.apache.flink.client.cli.Flip6DefaultCLI; import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; @@ -128,6 +129,14 @@ public void testRun() { assertEquals("--arg2", options.getProgramArgs()[3]); assertEquals("value2", options.getProgramArgs()[4]); } + + // test flip6 switch + { + String[] parameters = + {"-flip6", getTestJarPath()}; + RunOptions options = CliFrontendParser.parseRunCommand(parameters); + assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); + } } catch (Exception e) { e.printStackTrace(); diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java index fef4880858af9..cf80e7a529d39 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java @@ -19,7 +19,10 @@ package org.apache.flink.client; import org.apache.flink.api.common.JobID; +import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.Flip6DefaultCLI; +import org.apache.flink.client.cli.StopOptions; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; @@ -109,6 +112,14 @@ public void testStop() throws Exception { assertTrue(testFrontend.stop(parameters) != 0); } + + // test flip6 switch + { + String[] parameters = + {"-flip6", String.valueOf(new JobID())}; + StopOptions options = CliFrontendParser.parseStopCommand(parameters); + assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt())); + } } private static final class StopTestCliFrontend extends CliFrontend { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java new file mode 100644 index 0000000000000..1da1a139b730d --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); + TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler(); + + RestServerEndpoint rse = new RestServerEndpoint(rsec) { + @Override + protected Collection> initializeHandlers(CompletableFuture restAddressFuture) { + + Collection> handlers = new ArrayList<>(); + handlers.add(Tuple2.of(portHandler.getMessageHeaders(), portHandler)); + handlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler)); + handlers.add(Tuple2.of(terminationHandler.getMessageHeaders(), terminationHandler)); + return handlers; + } + }; + + RestClusterClient rcc = new RestClusterClient(config); + try { + rse.start(); + + JobGraph job = new JobGraph("testjob"); + JobID id = job.getJobID(); + + Assert.assertFalse(portHandler.portRetrieved); + Assert.assertFalse(submitHandler.jobSubmitted); + rcc.submitJob(job, ClassLoader.getSystemClassLoader()); + Assert.assertTrue(portHandler.portRetrieved); + Assert.assertTrue(submitHandler.jobSubmitted); + + Assert.assertFalse(terminationHandler.jobCanceled); + rcc.cancel(id); + Assert.assertTrue(terminationHandler.jobCanceled); + + Assert.assertFalse(terminationHandler.jobStopped); + rcc.stop(id); + Assert.assertTrue(terminationHandler.jobStopped); + + } finally { + rcc.shutdown(); + rse.shutdown(Time.seconds(5)); + } + } + + private static class TestBlobServerPortHandler extends AbstractRestHandler { + private boolean portRetrieved = false; + + private TestBlobServerPortHandler() { + super( + CompletableFuture.completedFuture(restAddress), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT, + BlobServerPortHeaders.getInstance()); + } + + @Override + protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + portRetrieved = true; + return CompletableFuture.completedFuture(new BlobServerPortResponseBody(12000)); + } + } + + private static class TestJobSubmitHandler extends AbstractRestHandler { + private boolean jobSubmitted = false; + + private TestJobSubmitHandler() { + super( + CompletableFuture.completedFuture(restAddress), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT, + JobSubmitHeaders.getInstance()); + } + + @Override + protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + jobSubmitted = true; + return CompletableFuture.completedFuture(new JobSubmitResponseBody("/url")); + } + } + + private static class TestJobTerminationHandler extends AbstractRestHandler { + private boolean jobCanceled = false; + private boolean jobStopped = false; + + private TestJobTerminationHandler() { + super( + CompletableFuture.completedFuture(restAddress), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT, + JobTerminationHeaders.getInstance()); + } + + @Override + protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + switch (request.getQueryParameter(TerminationModeQueryParameter.class).get(0)) { + case CANCEL: + jobCanceled = true; + break; + case STOP: + jobStopped = true; + break; + } + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + } + } +} From 4fa822bea5e48c91c5d233be4c8c2a7d96721d5c Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 4 Oct 2017 11:45:42 +0200 Subject: [PATCH 06/16] Address PR comments --- .../org/apache/flink/client/CliFrontend.java | 68 +++------------- .../Flip6StandaloneClusterDescriptor.java | 10 ++- .../flink/client/program/ClusterClient.java | 73 ++++++++++------- .../program/rest/RestClusterClient.java | 81 +++++++++++-------- .../rest/RestClusterClientConfiguration.java | 13 +-- .../program/rest/RestClusterClientTest.java | 7 +- .../handler/job/BlobServerPortHandler.java | 19 +++-- .../rest/handler/job/JobSubmitHandler.java | 12 +-- .../messages/job/JobSubmitRequestBody.java | 12 +-- .../job/BlobServerPortHandlerTest.java | 38 ++++++++- .../handler/job/JobSubmitHandlerTest.java | 6 +- 11 files changed, 181 insertions(+), 158 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 4b719d969ebdb..9fa667bc024dc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -557,29 +557,14 @@ protected int stop(String[] args) { return handleArgException(new CliArgsException("Missing JobID")); } - // FLIP-6 specific branch try { CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (activeCommandLine instanceof Flip6DefaultCLI) { - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - client.stop(jobId); - return 0; - } - } catch (Throwable t) { - return handleError(t); - } - - try { - ActorGateway jobManager = getJobManagerGateway(options); - Future response = jobManager.ask(new StopJob(jobId), clientTimeout); - - final Object rc = Await.result(response, clientTimeout); - - if (rc instanceof StoppingFailure) { - throw new Exception("Stopping the job with ID " + jobId + " failed.", - ((StoppingFailure) rc).cause()); - } + ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + logAndSysout("Stopping job " + jobId + '.'); + client.stop(jobId); + logAndSysout("Stopped job " + jobId + '.'); + return 0; } catch (Throwable t) { @@ -650,50 +635,21 @@ protected int cancel(String[] args) { return 1; } - // FLIP-6 specific branch try { CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); - if (activeCommandLine instanceof Flip6DefaultCLI) { - ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - client.cancel(jobId); - return 0; - } - } catch (Throwable t) { - return handleError(t); - } - - try { - ActorGateway jobManager = getJobManagerGateway(options); - - Object cancelMsg; + ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); if (withSavepoint) { if (targetDirectory == null) { logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); } else { - logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + "."); + logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); } - cancelMsg = new CancelJobWithSavepoint(jobId, targetDirectory); + String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); + logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); } else { - logAndSysout("Cancelling job " + jobId + "."); - cancelMsg = new CancelJob(jobId); - } - - Future response = jobManager.ask(cancelMsg, clientTimeout); - final Object rc = Await.result(response, clientTimeout); - - if (rc instanceof CancellationSuccess) { - if (withSavepoint) { - CancellationSuccess success = (CancellationSuccess) rc; - String savepointPath = success.savepointPath(); - logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + "."); - } else { - logAndSysout("Cancelled job " + jobId + "."); - } - } else if (rc instanceof CancellationFailure) { - throw new Exception("Canceling the job with ID " + jobId + " failed.", - ((CancellationFailure) rc).cause()); - } else { - throw new IllegalStateException("Unexpected response: " + rc); + logAndSysout("Cancelling job " + jobId + '.'); + client.cancel(jobId); + logAndSysout("Cancelled job " + jobId + '.'); } return 0; diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java index 0ba776c858049..1c905876173bf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java @@ -22,6 +22,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; /** * A deployment descriptor for an existing cluster. @@ -31,7 +33,7 @@ public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor response; - try { - response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout); - } catch (final Exception e) { - throw new ProgramInvocationException("Failed to query the job manager gateway.", e); + Object cancelMsg = new JobManagerMessages.CancelJob(jobId); + + Future response = jobManager.ask(cancelMsg, timeout); + final Object rc = Await.result(response, timeout); + + if (rc instanceof JobManagerMessages.CancellationSuccess) { + // no further action required + } else if (rc instanceof JobManagerMessages.CancellationFailure) { + throw new Exception("Canceling the job with ID " + jobId + " failed.", + ((JobManagerMessages.CancellationFailure) rc).cause()); + } else { + throw new IllegalStateException("Unexpected response: " + rc); } + } + + /** + * Cancels a job identified by the job id and triggers a savepoint. + * @param jobId the job id + * @param savepointDirectory directory the savepoint should be written to + * @return path where the savepoint is located + * @throws Exception In case an error cocurred. + */ + public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception { + final ActorGateway jobManager = getJobManagerGateway(); + + Object cancelMsg = new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointDirectory); - final Object result = Await.result(response, timeout); + Future response = jobManager.ask(cancelMsg, timeout); + final Object rc = Await.result(response, timeout); - if (result instanceof JobManagerMessages.CancellationSuccess) { - logAndSysout("Job cancellation with ID " + jobId + " succeeded."); - } else if (result instanceof JobManagerMessages.CancellationFailure) { - final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause(); - logAndSysout("Job cancellation with ID " + jobId + " failed because of " + t.getMessage()); - throw new Exception("Failed to cancel the job with id " + jobId, t); + if (rc instanceof JobManagerMessages.CancellationSuccess) { + JobManagerMessages.CancellationSuccess success = (JobManagerMessages.CancellationSuccess) rc; + return success.savepointPath(); + } else if (rc instanceof JobManagerMessages.CancellationFailure) { + throw new Exception("Cancel & savepoint for the job with ID " + jobId + " failed.", + ((JobManagerMessages.CancellationFailure) rc).cause()); } else { - throw new Exception("Unknown message received while cancelling: " + result.getClass().getName()); + throw new IllegalStateException("Unexpected response: " + rc); } } @@ -610,25 +633,15 @@ public void cancel(JobID jobId) throws Exception { * failed. That might be due to an I/O problem, ie, the job-manager is unreachable. */ public void stop(final JobID jobId) throws Exception { - final ActorGateway jobManagerGateway = getJobManagerGateway(); + final ActorGateway jobManager = getJobManagerGateway(); - final Future response; - try { - response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout); - } catch (final Exception e) { - throw new ProgramInvocationException("Failed to query the job manager gateway.", e); - } + Future response = jobManager.ask(new JobManagerMessages.StopJob(jobId), timeout); - final Object result = Await.result(response, timeout); + final Object rc = Await.result(response, timeout); - if (result instanceof JobManagerMessages.StoppingSuccess) { - log.info("Job stopping with ID " + jobId + " succeeded."); - } else if (result instanceof JobManagerMessages.StoppingFailure) { - final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause(); - log.info("Job stopping with ID " + jobId + " failed.", t); - throw new Exception("Failed to stop the job because of \n" + t.getMessage()); - } else { - throw new Exception("Unknown message received while stopping: " + result.getClass().getName()); + if (rc instanceof JobManagerMessages.StoppingFailure) { + throw new Exception("Stopping the job with ID " + jobId + " failed.", + ((JobManagerMessages.StoppingFailure) rc).cause()); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index f7cf0edbb85c2..e253ac6a45489 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -41,11 +41,14 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import javax.annotation.Nullable; + import java.net.InetSocketAddress; import java.net.URL; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** @@ -53,8 +56,9 @@ */ public class RestClusterClient extends ClusterClient { - private RestClusterClientConfiguration configuration; - private final RestClient restEndpoint; + private final RestClusterClientConfiguration restClusterClientConfiguration; + private final RestClient restClient; + private final ExecutorService executorService = Executors.newFixedThreadPool(4); public RestClusterClient(Configuration config) throws Exception { this(config, RestClusterClientConfiguration.fromConfiguration(config)); @@ -62,24 +66,31 @@ public RestClusterClient(Configuration config) throws Exception { public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { super(config); - this.configuration = configuration; - this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4)); + this.restClusterClientConfiguration = configuration; + this.restClient = new RestClient(configuration.getRestEndpointConfiguration(), executorService); } @Override public void shutdown() { - this.restEndpoint.shutdown(Time.seconds(5)); + try { + // we only call this for legacy reasons to shutdown components that are started in the ClusterClient constructor + super.shutdown(); + } catch (Exception e) { + log.error("An error occurred during the client shutdown.", e); + } + this.restClient.shutdown(Time.seconds(5)); + this.executorService.shutdown(); } @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { log.info("Submitting job."); try { - // temporary hack for FLIP-6 + // temporary hack for FLIP-6 since slot-sharing isn't implemented yet jobGraph.setAllowQueuedScheduling(true); submitJob(jobGraph); } catch (JobSubmissionException e) { - throw new RuntimeException(e); + throw new ProgramInvocationException(e); } // don't return just a JobSubmissionResult here, the signature is lying // The CliFrontend expects this to be a JobExecutionResult @@ -92,9 +103,9 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { log.info("Requesting blob server port."); int blobServerPort; try { - CompletableFuture portFuture = restEndpoint.sendRequest( - configuration.getRestServerAddress(), - configuration.getRestServerPort(), + CompletableFuture portFuture = restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), BlobServerPortHeaders.getInstance()); blobServerPort = portFuture.get().port; } catch (Exception e) { @@ -103,8 +114,8 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { log.info("Uploading jar files."); try { - InetSocketAddress address = new InetSocketAddress(configuration.getBlobServerAddress(), blobServerPort); - List keys = BlobClient.uploadJarFiles(address, new Configuration(), jobGraph.getJobID(), jobGraph.getUserJars()); + InetSocketAddress address = new InetSocketAddress(restClusterClientConfiguration.getBlobServerAddress(), blobServerPort); + List keys = BlobClient.uploadJarFiles(address, this.flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); for (BlobKey key : keys) { jobGraph.addBlob(key); } @@ -114,13 +125,12 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { log.info("Submitting job graph."); try { - CompletableFuture responseFuture = restEndpoint.sendRequest( - configuration.getRestServerAddress(), - configuration.getRestServerPort(), + CompletableFuture responseFuture = restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), JobSubmitHeaders.getInstance(), new JobSubmitRequestBody(jobGraph)); - JobSubmitResponseBody response = responseFuture.get(); - System.out.println(response.jobUrl); + responseFuture.get(); } catch (Exception e) { throw new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", e); } @@ -128,32 +138,35 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { @Override public void stop(JobID jobID) throws Exception { - JobTerminationMessageParameters param = new JobTerminationMessageParameters(); - param.jobPathParameter.resolve(jobID); - param.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP)); - CompletableFuture responseFuture = restEndpoint.sendRequest( - configuration.getRestServerAddress(), - configuration.getRestServerPort(), + JobTerminationMessageParameters params = new JobTerminationMessageParameters(); + params.jobPathParameter.resolve(jobID); + params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP)); + CompletableFuture responseFuture = restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), JobTerminationHeaders.getInstance(), - param + params ); responseFuture.get(); - System.out.println("Job stopping initiated."); } @Override public void cancel(JobID jobID) throws Exception { - JobTerminationMessageParameters param = new JobTerminationMessageParameters(); - param.jobPathParameter.resolve(jobID); - param.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); - CompletableFuture responseFuture = restEndpoint.sendRequest( - configuration.getRestServerAddress(), - configuration.getRestServerPort(), + JobTerminationMessageParameters params = new JobTerminationMessageParameters(); + params.jobPathParameter.resolve(jobID); + params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); + CompletableFuture responseFuture = restClient.sendRequest( + restClusterClientConfiguration.getRestServerAddress(), + restClusterClientConfiguration.getRestServerPort(), JobTerminationHeaders.getInstance(), - param + params ); responseFuture.get(); - System.out.println("Job canceling initiated."); + } + + @Override + public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception { + throw new UnsupportedOperationException(); } // ====================================== @@ -162,7 +175,7 @@ public void cancel(JobID jobID) throws Exception { @Override public String getClusterIdentifier() { - return "Flip-6 Standalone cluster with dispatcher at " + configuration.getRestServerAddress(); + return "Flip-6 Standalone cluster with dispatcher at " + restClusterClientConfiguration.getRestServerAddress() + '.'; } @Override diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java index a6850d54f69f4..5327ad6462328 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.rest.RestClientConfiguration; import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.Preconditions; /** * A configuration object for {@link RestClusterClient}s. @@ -42,9 +43,9 @@ private RestClusterClientConfiguration( RestClientConfiguration endpointConfiguration, String restServerAddress, int restServerPort) { - this.blobServerAddress = blobServerAddress; - this.restEndpointConfiguration = endpointConfiguration; - this.restServerAddress = restServerAddress; + this.blobServerAddress = Preconditions.checkNotNull(blobServerAddress); + this.restEndpointConfiguration = Preconditions.checkNotNull(endpointConfiguration); + this.restServerAddress = Preconditions.checkNotNull(restServerAddress); this.restServerPort = restServerPort; } @@ -65,13 +66,13 @@ public RestClientConfiguration getRestEndpointConfiguration() { } public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException { - String address = config.getString(JobManagerOptions.ADDRESS); + String blobServerAddress = config.getString(JobManagerOptions.ADDRESS); String serverAddress = config.getString(RestOptions.REST_ADDRESS); int serverPort = config.getInteger(RestOptions.REST_PORT); - RestClientConfiguration endpointConfiguration = RestClientConfiguration.fromConfiguration(config); + RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config); - return new RestClusterClientConfiguration(address, endpointConfiguration, serverAddress, serverPort); + return new RestClusterClientConfiguration(blobServerAddress, restClientConfiguration, serverAddress, serverPort); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 1da1a139b730d..55feab740c3ac 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; @@ -65,7 +66,7 @@ /** * Tests for the {@link RestClusterClient}. */ -public class RestClusterClientTest { +public class RestClusterClientTest extends TestLogger { private static final String restAddress = "http://localhost:1234"; private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); @@ -164,8 +165,8 @@ protected CompletableFuture handleRequest(@Nonnull Handle } private static class TestJobTerminationHandler extends AbstractRestHandler { - private boolean jobCanceled = false; - private boolean jobStopped = false; + private volatile boolean jobCanceled = false; + private volatile boolean jobStopped = false; private TestJobTerminationHandler() { super( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java index 8314f8ef992a9..0ab9ec036ac85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java @@ -36,6 +36,7 @@ import javax.annotation.Nonnull; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; /** * This handler can be used to retrieve the port that the blob server runs on. @@ -49,15 +50,13 @@ public BlobServerPortHandler(CompletableFuture localRestAddress, Gateway @Override protected CompletableFuture handleRequest(@Nonnull HandlerRequest request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { return gateway - .getBlobServerPort(Time.seconds(5)) - .handleAsync((Integer port, Throwable error) -> { - if (error != null) { - log.error("Failed to retrieve blob server port.", ExceptionUtils.stripCompletionException(error)); - return FutureUtils.completedExceptionally(new RestHandlerException("Failed to retrieve blob server port.", HttpResponseStatus.INTERNAL_SERVER_ERROR)); - } else { - return CompletableFuture.completedFuture(new BlobServerPortResponseBody(port)); - } - }) - .thenCompose(future -> future); + .getBlobServerPort(timeout) + .thenApply(BlobServerPortResponseBody::new) + .exceptionally(error -> { + throw new CompletionException(new RestHandlerException( + "Failed to retrieve blob server port.", + HttpResponseStatus.INTERNAL_SERVER_ERROR, + ExceptionUtils.stripCompletionException(error))); + }); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java index ef8a1c19c2b66..53cc6225234b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; @@ -55,12 +56,13 @@ protected CompletableFuture handleRequest(@Nonnull Handle ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); jobGraph = (JobGraph) objectIn.readObject(); } catch (Exception e) { - log.error("Failed to deserialize JobGraph.", e); - return FutureUtils.completedExceptionally(new RestHandlerException("Failed to deserialize JobGraph.", HttpResponseStatus.BAD_REQUEST)); + return FutureUtils.completedExceptionally(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); } - gateway.submitJob(jobGraph, Time.seconds(5)); - - return CompletableFuture.completedFuture(new JobSubmitResponseBody("Submitted job with ID " + jobGraph.getJobID() + ".")); + return gateway.submitJob(jobGraph, timeout) + .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java index 756e60c743ffe..0cdcb1417c7c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java @@ -20,6 +20,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.util.Preconditions; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -52,7 +53,7 @@ public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { public JobSubmitRequestBody( @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) { - this.serializedJobGraph = serializedJobGraph; + this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph); } @Override @@ -70,11 +71,12 @@ public boolean equals(Object object) { } private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - ObjectOutputStream out = new ObjectOutputStream(baos); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) { + ObjectOutputStream out = new ObjectOutputStream(baos); - out.writeObject(jobGraph); + out.writeObject(jobGraph); - return baos.toByteArray(); + return baos.toByteArray(); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java index 85ad1dc473eb8..0ea18dbbe6719 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java @@ -19,18 +19,24 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -39,13 +45,14 @@ /** * Tests for the {@link BlobServerPortHandler}. */ -public class BlobServerPortHandlerTest { +public class BlobServerPortHandlerTest extends TestLogger { private static final int PORT = 64; @Test public void testPortRetrieval() throws Exception { DispatcherGateway mockGateway = mock(DispatcherGateway.class); - when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(PORT)); + when(mockGateway.getBlobServerPort(any(Time.class))) + .thenReturn(CompletableFuture.completedFuture(PORT)); GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); BlobServerPortHandler handler = new BlobServerPortHandler( @@ -53,13 +60,38 @@ public void testPortRetrieval() throws Exception { mockGatewayRetriever, RpcUtils.INF_TIMEOUT); - BlobServerPortResponseBody portResponse = handler.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway).get(); + BlobServerPortResponseBody portResponse = handler + .handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway) + .get(); Assert.assertEquals(PORT, portResponse.port); } @Test public void testPortRetrievalFailureHandling() throws Exception { + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getBlobServerPort(any(Time.class))) + .thenReturn(FutureUtils.completedExceptionally(new TestException())); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + BlobServerPortHandler handler = new BlobServerPortHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + try { + handler + .handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway) + .get(); + Assert.fail(); + } catch (ExecutionException ee) { + RestHandlerException rhe = (RestHandlerException) ee.getCause(); + + Assert.assertEquals(TestException.class, rhe.getCause().getClass()); + Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, rhe.getHttpResponseStatus()); + } + } + private static class TestException extends Exception { } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java index 981ac9c29bda2..0afc1ceb9ffe9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; @@ -44,7 +45,7 @@ /** * Tests for the {@link JobSubmitHandler}. */ -public class JobSubmitHandlerTest { +public class JobSubmitHandlerTest extends TestLogger { @Test public void testSerializationFailureHandling() throws Exception { @@ -85,6 +86,7 @@ public void testSuccessfulJobSubmission() throws Exception { JobGraph job = new JobGraph("testjob"); JobSubmitRequestBody request = new JobSubmitRequestBody(job); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get(); + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway) + .get(); } } From 89ed39c8522f2f13cd509dd82304e91efaff4b46 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 5 Oct 2017 16:32:02 +0200 Subject: [PATCH 07/16] address more PR comments --- .../flink/client/program/ClusterClient.java | 6 ++- .../program/rest/RestClusterClient.java | 38 +++++++++++++++---- .../rest/RestClusterClientConfiguration.java | 8 ++-- .../program/rest/RestClusterClientTest.java | 6 +-- .../flink/runtime/dispatcher/Dispatcher.java | 2 +- .../runtime/rest/RestServerEndpoint.java | 4 +- .../handler/job/BlobServerPortHandler.java | 1 - .../rest/handler/job/JobSubmitHandler.java | 5 +-- .../messages/job/JobSubmitRequestBody.java | 6 +++ 9 files changed, 55 insertions(+), 21 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index a0ef4d9cf9aa5..78455c108f780 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -639,9 +639,13 @@ public void stop(final JobID jobId) throws Exception { final Object rc = Await.result(response, timeout); - if (rc instanceof JobManagerMessages.StoppingFailure) { + if (rc instanceof JobManagerMessages.StoppingSuccess) { + // no further action required + } else if (rc instanceof JobManagerMessages.StoppingFailure) { throw new Exception("Stopping the job with ID " + jobId + " failed.", ((JobManagerMessages.StoppingFailure) rc).cause()); + } else { + throw new IllegalStateException("Unexpected response: " + rc); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index e253ac6a45489..2efebd5a4f6e8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -50,6 +50,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * A {@link ClusterClient} implementation that communicates via HTTP REST requests. @@ -58,7 +61,7 @@ public class RestClusterClient extends ClusterClient { private final RestClusterClientConfiguration restClusterClientConfiguration; private final RestClient restClient; - private final ExecutorService executorService = Executors.newFixedThreadPool(4); + private final ExecutorService executorService = Executors.newFixedThreadPool(4, new RestClusterClientThreadFactory()); public RestClusterClient(Configuration config) throws Exception { this(config, RestClusterClientConfiguration.fromConfiguration(config)); @@ -67,7 +70,7 @@ public RestClusterClient(Configuration config) throws Exception { public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { super(config); this.restClusterClientConfiguration = configuration; - this.restClient = new RestClient(configuration.getRestEndpointConfiguration(), executorService); + this.restClient = new RestClient(configuration.getRestClientConfiguration(), executorService); } @Override @@ -79,7 +82,7 @@ public void shutdown() { log.error("An error occurred during the client shutdown.", e); } this.restClient.shutdown(Time.seconds(5)); - this.executorService.shutdown(); + org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService); } @Override @@ -107,7 +110,7 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { restClusterClientConfiguration.getRestServerAddress(), restClusterClientConfiguration.getRestServerPort(), BlobServerPortHeaders.getInstance()); - blobServerPort = portFuture.get().port; + blobServerPort = portFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS).port; } catch (Exception e) { throw new JobSubmissionException(jobGraph.getJobID(), "Failed to retrieve blob server port.", e); } @@ -130,7 +133,7 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { restClusterClientConfiguration.getRestServerPort(), JobSubmitHeaders.getInstance(), new JobSubmitRequestBody(jobGraph)); - responseFuture.get(); + responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (Exception e) { throw new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", e); } @@ -147,7 +150,7 @@ public void stop(JobID jobID) throws Exception { JobTerminationHeaders.getInstance(), params ); - responseFuture.get(); + responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -161,7 +164,7 @@ public void cancel(JobID jobID) throws Exception { JobTerminationHeaders.getInstance(), params ); - responseFuture.get(); + responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); } @Override @@ -216,4 +219,25 @@ protected void finalizeCluster() { public int getMaxSlots() { return 0; } + + private static final class RestClusterClientThreadFactory implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + + RestClusterClientThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, "Flink-RestClusterClient-IOThread-" + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java index 5327ad6462328..788eba91dfdd1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java @@ -32,7 +32,7 @@ public final class RestClusterClientConfiguration { private final String blobServerAddress; - private final RestClientConfiguration restEndpointConfiguration; + private final RestClientConfiguration restClientConfiguration; private final String restServerAddress; @@ -44,7 +44,7 @@ private RestClusterClientConfiguration( String restServerAddress, int restServerPort) { this.blobServerAddress = Preconditions.checkNotNull(blobServerAddress); - this.restEndpointConfiguration = Preconditions.checkNotNull(endpointConfiguration); + this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration); this.restServerAddress = Preconditions.checkNotNull(restServerAddress); this.restServerPort = restServerPort; } @@ -61,8 +61,8 @@ public int getRestServerPort() { return restServerPort; } - public RestClientConfiguration getRestEndpointConfiguration() { - return restEndpointConfiguration; + public RestClientConfiguration getRestClientConfiguration() { + return restClientConfiguration; } public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 55feab740c3ac..617dd38a7017c 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -78,7 +78,7 @@ public class RestClusterClientTest extends TestLogger { } @Test - public void testABC() throws Exception { + public void testJobSubmitCancelStop() throws Exception { Configuration config = new Configuration(); config.setString(JobManagerOptions.ADDRESS, "localhost"); @@ -129,7 +129,7 @@ protected Collection> in } private static class TestBlobServerPortHandler extends AbstractRestHandler { - private boolean portRetrieved = false; + private volatile boolean portRetrieved = false; private TestBlobServerPortHandler() { super( @@ -147,7 +147,7 @@ protected CompletableFuture handleRequest(@Nonnull H } private static class TestJobSubmitHandler extends AbstractRestHandler { - private boolean jobSubmitted = false; + private volatile boolean jobSubmitted = false; private TestJobSubmitHandler() { super( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 93ac02a93b907..e9d0e85f54b21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -311,7 +311,7 @@ public CompletableFuture requestJob(JobID jobId, Time time return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout); } } - + @Override public CompletableFuture getBlobServerPort(Time timeout) { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index d09aad972f73a..6da237eeff421 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -54,6 +54,8 @@ * An abstract class for netty-based REST server endpoints. */ public abstract class RestServerEndpoint { + + public static final long MAX_REQUEST_SIZE_BYTES = 1024 * 1024 * 10; protected final Logger log = LoggerFactory.getLogger(getClass()); private final Object lock = new Object(); @@ -120,7 +122,7 @@ protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new HttpServerCodec()) - .addLast(new HttpObjectAggregator(1024 * 1024 * 10)) + .addLast(new HttpObjectAggregator(MAX_REQUEST_SIZE_BYTES)) .addLast(handler.name(), handler) .addLast(new PipelineErrorHandler(log)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java index 0ab9ec036ac85..cdf562f55f2f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java index 53cc6225234b2..799a605c67c96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; @@ -56,10 +55,10 @@ protected CompletableFuture handleRequest(@Nonnull Handle ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); jobGraph = (JobGraph) objectIn.readObject(); } catch (Exception e) { - return FutureUtils.completedExceptionally(new RestHandlerException( + throw new RestHandlerException( "Failed to deserialize JobGraph.", HttpResponseStatus.BAD_REQUEST, - e)); + e); } return gateway.submitJob(jobGraph, timeout) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java index 0cdcb1417c7c3..257b241fc184b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rest.messages.job; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.util.Preconditions; @@ -53,6 +54,11 @@ public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { public JobSubmitRequestBody( @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) { + // check that job graph can be read completely by the HttpObjectAggregator on the server + // we subtract 1024 bytes to account for http headers and such. + if (serializedJobGraph.length > RestServerEndpoint.MAX_REQUEST_SIZE_BYTES - 1024) { + throw new IllegalArgumentException("Serialized job graph exceeded max request size."); + } this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph); } From a554cd238a35e925efb474f56fdade160a86ecf0 Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 5 Oct 2017 17:04:57 +0200 Subject: [PATCH 08/16] checkstyle --- .../java/org/apache/flink/runtime/dispatcher/Dispatcher.java | 2 +- .../java/org/apache/flink/runtime/rest/RestServerEndpoint.java | 2 +- .../apache/flink/runtime/rest/handler/job/JobSubmitHandler.java | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e9d0e85f54b21..bce2bed7670c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -311,7 +311,7 @@ public CompletableFuture requestJob(JobID jobId, Time time return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout); } } - + @Override public CompletableFuture getBlobServerPort(Time timeout) { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index 6da237eeff421..4e40a1106b883 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -54,7 +54,7 @@ * An abstract class for netty-based REST server endpoints. */ public abstract class RestServerEndpoint { - + public static final long MAX_REQUEST_SIZE_BYTES = 1024 * 1024 * 10; protected final Logger log = LoggerFactory.getLogger(getClass()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java index 799a605c67c96..f810b5ad2f293 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; From 704b48a87da38a960fe5f2ec47a96b3d85c66874 Mon Sep 17 00:00:00 2001 From: Chesnay Date: Thu, 5 Oct 2017 23:07:44 +0200 Subject: [PATCH 09/16] make max_size an int --- .../java/org/apache/flink/runtime/rest/RestServerEndpoint.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index 4e40a1106b883..18766c0046dd7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -55,7 +55,7 @@ */ public abstract class RestServerEndpoint { - public static final long MAX_REQUEST_SIZE_BYTES = 1024 * 1024 * 10; + public static final int MAX_REQUEST_SIZE_BYTES = 1024 * 1024 * 10; protected final Logger log = LoggerFactory.getLogger(getClass()); private final Object lock = new Object(); From 93eff9ed51d36cf2ba2b99d05455f2d3f6212df1 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 6 Oct 2017 12:38:22 +0200 Subject: [PATCH 10/16] use ExecutorThradFactory + rebase(blobKey fix) --- .../program/rest/RestClusterClient.java | 30 ++++--------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 2efebd5a4f6e8..35409c9cdcead 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -26,7 +26,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobClient; -import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -40,6 +40,7 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import javax.annotation.Nullable; @@ -61,7 +62,7 @@ public class RestClusterClient extends ClusterClient { private final RestClusterClientConfiguration restClusterClientConfiguration; private final RestClient restClient; - private final ExecutorService executorService = Executors.newFixedThreadPool(4, new RestClusterClientThreadFactory()); + private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("RestClusterClient-IO")); public RestClusterClient(Configuration config) throws Exception { this(config, RestClusterClientConfiguration.fromConfiguration(config)); @@ -118,8 +119,8 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { log.info("Uploading jar files."); try { InetSocketAddress address = new InetSocketAddress(restClusterClientConfiguration.getBlobServerAddress(), blobServerPort); - List keys = BlobClient.uploadJarFiles(address, this.flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); - for (BlobKey key : keys) { + List keys = BlobClient.uploadJarFiles(address, this.flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars()); + for (PermanentBlobKey key : keys) { jobGraph.addBlob(key); } } catch (Exception e) { @@ -219,25 +220,4 @@ protected void finalizeCluster() { public int getMaxSlots() { return 0; } - - private static final class RestClusterClientThreadFactory implements ThreadFactory { - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); - - RestClusterClientThreadFactory() { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - } - - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, "Flink-RestClusterClient-IOThread-" + threadNumber.getAndIncrement(), 0); - if (t.isDaemon()) { - t.setDaemon(false); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - } } From 07963bc1caba9a0987cc408515a5fd6e6e4f9f34 Mon Sep 17 00:00:00 2001 From: Chesnay Date: Fri, 6 Oct 2017 13:05:59 +0200 Subject: [PATCH 11/16] add "Flink" prefix to RestCC threads --- .../org/apache/flink/client/program/rest/RestClusterClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 35409c9cdcead..e6c7709f359ef 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -62,7 +62,7 @@ public class RestClusterClient extends ClusterClient { private final RestClusterClientConfiguration restClusterClientConfiguration; private final RestClient restClient; - private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("RestClusterClient-IO")); + private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO")); public RestClusterClient(Configuration config) throws Exception { this(config, RestClusterClientConfiguration.fromConfiguration(config)); From abfba094bea5b1f53a739baf889d72bfa2e0bba0 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 6 Oct 2017 13:08:08 +0200 Subject: [PATCH 12/16] checkstyle --- .../main/java/org/apache/flink/client/CliFrontend.java | 8 +------- .../flink/client/program/rest/RestClusterClient.java | 2 -- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 9fa667bc024dc..ff5a8a27e9b02 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -60,13 +60,7 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; -import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; -import org.apache.flink.runtime.messages.JobManagerMessages.StopJob; -import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import org.apache.flink.runtime.security.SecurityConfiguration; @@ -564,7 +558,7 @@ protected int stop(String[] args) { logAndSysout("Stopping job " + jobId + '.'); client.stop(jobId); logAndSysout("Stopped job " + jobId + '.'); - + return 0; } catch (Throwable t) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index e6c7709f359ef..a37ee632f0c86 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -51,9 +51,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * A {@link ClusterClient} implementation that communicates via HTTP REST requests. From 650273fb6c61f9c274a00b57db0e4f95ad2f5e3a Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 6 Oct 2017 13:09:00 +0200 Subject: [PATCH 13/16] fix JobSubmitHandlerTest --- .../runtime/rest/handler/job/JobSubmitHandlerTest.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java index 0afc1ceb9ffe9..1196d4059876d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java @@ -36,7 +36,6 @@ import org.junit.Test; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -60,14 +59,10 @@ public void testSerializationFailureHandling() throws Exception { JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]); - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway); - try { - handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get(); + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway); Assert.fail(); - } catch (ExecutionException ee) { - RestHandlerException rhe = (RestHandlerException) ee.getCause(); - + } catch (RestHandlerException rhe) { Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rhe.getHttpResponseStatus()); } } From 477c698cadbe5152ddfac3ac7b7a115dec837e73 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 6 Oct 2017 13:45:32 +0200 Subject: [PATCH 14/16] chcekstyle................. --- .../client/deployment/Flip6StandaloneClusterDescriptor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java index 1c905876173bf..9d88f597a4565 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; /** From cd24f68ec4df7b06fd5fd25eb1f1a322fd37028b Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 6 Oct 2017 15:19:19 +0200 Subject: [PATCH 15/16] shutdown client for cancel/shutdown --- .../org/apache/flink/client/CliFrontend.java | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index ff5a8a27e9b02..9be82953e8dc6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -554,12 +554,16 @@ protected int stop(String[] args) { try { CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); + try { + logAndSysout("Stopping job " + jobId + '.'); + client.stop(jobId); + logAndSysout("Stopped job " + jobId + '.'); - logAndSysout("Stopping job " + jobId + '.'); - client.stop(jobId); - logAndSysout("Stopped job " + jobId + '.'); + return 0; + } finally { + client.shutdown(); + } - return 0; } catch (Throwable t) { return handleError(t); @@ -632,21 +636,25 @@ protected int cancel(String[] args) { try { CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine()); ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory); - if (withSavepoint) { - if (targetDirectory == null) { - logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); + try { + if (withSavepoint) { + if (targetDirectory == null) { + logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory."); + } else { + logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); + } + String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); + logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); } else { - logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.'); + logAndSysout("Cancelling job " + jobId + '.'); + client.cancel(jobId); + logAndSysout("Cancelled job " + jobId + '.'); } - String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory); - logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.'); - } else { - logAndSysout("Cancelling job " + jobId + '.'); - client.cancel(jobId); - logAndSysout("Cancelled job " + jobId + '.'); - } - return 0; + return 0; + } finally { + client.shutdown(); + } } catch (Throwable t) { return handleError(t); From 537c413de00dd09cd5b0f9ec5dc1172447a46669 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 6 Oct 2017 16:55:23 +0200 Subject: [PATCH 16/16] Rework CliFrontEnd Stop/Cancel tests These tests verified that the CLI was sending the correct messages and parameters to the JM actor. This is now handled by the ClusterClient, so the tests were adjusted to verify that the correct methods on the ClusterClient are being called. Additional tests were added to the ClusterClientTest class to verify that the correct messages and parameters are being sent. --- .../client/CliFrontendListCancelTest.java | 114 +++++++------- .../flink/client/CliFrontendStopTest.java | 109 ++++--------- .../client/program/ClusterClientTest.java | 143 ++++++++++++++++++ 3 files changed, 229 insertions(+), 137 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java index 1bb9960bb519f..e52dde1216639 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java @@ -22,7 +22,10 @@ import org.apache.flink.client.cli.CancelOptions; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.Flip6DefaultCLI; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; @@ -33,9 +36,11 @@ import akka.actor.Props; import akka.actor.Status; import akka.testkit.JavaTestKit; +import org.apache.commons.cli.CommandLine; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import java.util.UUID; @@ -44,6 +49,14 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.isNull; +import static org.mockito.Matchers.notNull; +import static org.mockito.Mockito.times; +import static org.powermock.api.mockito.PowerMockito.doThrow; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; /** * Tests for the CANCEL and LIST commands. @@ -91,47 +104,27 @@ public void testCancel() { // test cancel properly { JobID jid = new JobID(); - String jidString = jid.toString(); - final UUID leaderSessionID = UUID.randomUUID(); - - final ActorRef jm = actorSystem.actorOf(Props.create( - CliJobManager.class, - jid, - leaderSessionID - ) - ); - - final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); - - String[] parameters = { jidString }; - InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + String[] parameters = { jid.toString() }; + CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false); int retCode = testFrontend.cancel(parameters); assertTrue(retCode == 0); + + Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class)); } // test cancel properly { - JobID jid1 = new JobID(); - JobID jid2 = new JobID(); - - final UUID leaderSessionID = UUID.randomUUID(); - - final ActorRef jm = actorSystem.actorOf( - Props.create( - CliJobManager.class, - jid1, - leaderSessionID - ) - ); + JobID jid = new JobID(); - final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + String[] parameters = { jid.toString() }; + CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(true); - String[] parameters = { jid2.toString() }; - InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + int retCode = testFrontend.cancel(parameters); + assertTrue(retCode != 0); - assertTrue(testFrontend.cancel(parameters) != 0); + Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class)); } // test flip6 switch @@ -156,56 +149,38 @@ public void testCancelWithSavepoint() throws Exception { { // Cancel with savepoint (no target directory) JobID jid = new JobID(); - UUID leaderSessionID = UUID.randomUUID(); - - Props props = Props.create(CliJobManager.class, jid, leaderSessionID); - ActorRef jm = actorSystem.actorOf(props); - ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); String[] parameters = { "-s", jid.toString() }; - InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false); assertEquals(0, testFrontend.cancel(parameters)); + + Mockito.verify(testFrontend.client, times(1)) + .cancelWithSavepoint(any(JobID.class), isNull(String.class)); } { // Cancel with savepoint (with target directory) JobID jid = new JobID(); - UUID leaderSessionID = UUID.randomUUID(); - - Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory"); - ActorRef jm = actorSystem.actorOf(props); - ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); String[] parameters = { "-s", "targetDirectory", jid.toString() }; - InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false); assertEquals(0, testFrontend.cancel(parameters)); + + Mockito.verify(testFrontend.client, times(1)) + .cancelWithSavepoint(any(JobID.class), notNull(String.class)); } { // Cancel with savepoint (with target directory), but no job ID - JobID jid = new JobID(); - UUID leaderSessionID = UUID.randomUUID(); - - Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory"); - ActorRef jm = actorSystem.actorOf(props); - ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); - String[] parameters = { "-s", "targetDirectory" }; - InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); assertNotEquals(0, testFrontend.cancel(parameters)); } { // Cancel with savepoint (no target directory) and no job ID - JobID jid = new JobID(); - UUID leaderSessionID = UUID.randomUUID(); - - Props props = Props.create(CliJobManager.class, jid, leaderSessionID); - ActorRef jm = actorSystem.actorOf(props); - ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); - String[] parameters = { "-s" }; - InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway); + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); assertNotEquals(0, testFrontend.cancel(parameters)); } } @@ -245,11 +220,32 @@ public void testList() { } } + private static final class CancelTestCliFrontend extends CliFrontend { + private final ClusterClient client; + + CancelTestCliFrontend(boolean reject) throws Exception { + super(CliFrontendTestUtils.getConfigDir()); + this.client = mock(ClusterClient.class); + if (reject) { + doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class)); + doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString()); + } + } + + @Override + public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) { + CustomCommandLine ccl = mock(CustomCommandLine.class); + when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString())) + .thenReturn(client); + return ccl; + } + } + private static final class InfoListTestCliFrontend extends CliFrontend { private ActorGateway jobManagerGateway; - public InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws Exception { + InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws Exception { super(CliFrontendTestUtils.getConfigDir()); this.jobManagerGateway = jobManagerGateway; } diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java index cf80e7a529d39..ab817134face6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java @@ -20,46 +20,36 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.client.cli.CliFrontendParser; -import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.Flip6DefaultCLI; import org.apache.flink.client.cli.StopOptions; -import org.apache.flink.runtime.akka.FlinkUntypedActor; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.TestLogger; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.Status; -import akka.testkit.JavaTestKit; -import org.junit.AfterClass; +import org.apache.commons.cli.CommandLine; import org.junit.BeforeClass; import org.junit.Test; - -import java.util.UUID; +import org.mockito.Mockito; import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.powermock.api.mockito.PowerMockito.doThrow; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; /** * Tests for the STOP command. */ public class CliFrontendStopTest extends TestLogger { - private static ActorSystem actorSystem; - @BeforeClass public static void setup() { pipeSystemOutToNull(); - actorSystem = ActorSystem.create("TestingActorSystem"); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(actorSystem); - actorSystem = null; } @Test @@ -85,32 +75,25 @@ public void testStop() throws Exception { JobID jid = new JobID(); String jidString = jid.toString(); - final UUID leaderSessionID = UUID.randomUUID(); - final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid, leaderSessionID)); - - final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); - String[] parameters = { jidString }; - StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway); + StopTestCliFrontend testFrontend = new StopTestCliFrontend(false); int retCode = testFrontend.stop(parameters); - assertTrue(retCode == 0); + assertEquals(0, retCode); + + Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class)); } // test unknown job Id { - JobID jid1 = new JobID(); - JobID jid2 = new JobID(); - - final UUID leaderSessionID = UUID.randomUUID(); - final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1, leaderSessionID)); - - final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + JobID jid = new JobID(); - String[] parameters = { jid2.toString() }; - StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway); + String[] parameters = { jid.toString() }; + StopTestCliFrontend testFrontend = new StopTestCliFrontend(true); assertTrue(testFrontend.stop(parameters) != 0); + + Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class)); } // test flip6 switch @@ -123,52 +106,22 @@ public void testStop() throws Exception { } private static final class StopTestCliFrontend extends CliFrontend { + private final ClusterClient client; - private ActorGateway jobManagerGateway; - - public StopTestCliFrontend(ActorGateway jobManagerGateway) throws Exception { + StopTestCliFrontend(boolean reject) throws Exception { super(CliFrontendTestUtils.getConfigDir()); - this.jobManagerGateway = jobManagerGateway; - } - - @Override - public ActorGateway getJobManagerGateway(CommandLineOptions options) { - return jobManagerGateway; - } - } - - private static final class CliJobManager extends FlinkUntypedActor { - private final JobID jobID; - private final UUID leaderSessionID; - - public CliJobManager(final JobID jobID, final UUID leaderSessionID) { - this.jobID = jobID; - this.leaderSessionID = leaderSessionID; - } - - @Override - public void handleMessage(Object message) { - if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) { - getSender().tell(decorateMessage(1), getSelf()); - } else if (message instanceof JobManagerMessages.StopJob) { - JobManagerMessages.StopJob stopJob = (JobManagerMessages.StopJob) message; - - if (jobID != null && jobID.equals(stopJob.jobID())) { - getSender().tell(decorateMessage(new Status.Success(new Object())), getSelf()); - } else { - getSender() - .tell(decorateMessage(new Status.Failure(new Exception( - "Wrong or no JobID"))), getSelf()); - } - } else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) { - getSender().tell(decorateMessage(new JobManagerMessages.RunningJobsStatus()), - getSelf()); + this.client = mock(ClusterClient.class); + if (reject) { + doThrow(new IllegalArgumentException("Test exception")).when(client).stop(any(JobID.class)); } } @Override - protected UUID getLeaderSessionID() { - return leaderSessionID; + public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) { + CustomCommandLine ccl = mock(CustomCommandLine.class); + when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString())) + .thenReturn(client); + return ccl; } } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java index 97a881c6dc759..98c7d26384a3a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java @@ -18,12 +18,22 @@ package org.apache.flink.client.program; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.DummyActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; +import scala.concurrent.Future; +import scala.concurrent.Future$; +import scala.concurrent.duration.FiniteDuration; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -51,4 +61,137 @@ public void testClusterClientShutdown() throws Exception { verify(highAvailabilityServices, never()).closeAndCleanupAllData(); verify(highAvailabilityServices).close(); } + + @Test + public void testClusterClientStop() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + JobID jobID = new JobID(); + TestStopActorGateway gateway = new TestStopActorGateway(jobID); + ClusterClient clusterClient = new TestClusterClient(config, gateway); + try { + clusterClient.stop(jobID); + Assert.assertTrue(gateway.messageArrived); + } finally { + clusterClient.shutdown(); + } + } + + @Test + public void testClusterClientCancel() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + JobID jobID = new JobID(); + TestCancelActorGateway gateway = new TestCancelActorGateway(jobID); + ClusterClient clusterClient = new TestClusterClient(config, gateway); + try { + clusterClient.cancel(jobID); + Assert.assertTrue(gateway.messageArrived); + } finally { + clusterClient.shutdown(); + } + } + + @Test + public void testClusterClientCancelWithSavepoint() throws Exception { + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + JobID jobID = new JobID(); + String savepointPath = "/test/path"; + TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointPath); + ClusterClient clusterClient = new TestClusterClient(config, gateway); + try { + clusterClient.cancelWithSavepoint(jobID, savepointPath); + Assert.assertTrue(gateway.messageArrived); + } finally { + clusterClient.shutdown(); + } + } + + private static class TestStopActorGateway extends DummyActorGateway { + + private final JobID expectedJobID; + private volatile boolean messageArrived = false; + + TestStopActorGateway(JobID expectedJobID) { + this.expectedJobID = expectedJobID; + } + + @Override + public Future ask(Object message, FiniteDuration timeout) { + messageArrived = true; + if (message instanceof JobManagerMessages.StopJob) { + JobManagerMessages.StopJob stopJob = (JobManagerMessages.StopJob) message; + Assert.assertEquals(expectedJobID, stopJob.jobID()); + return Future$.MODULE$.successful(new JobManagerMessages.StoppingSuccess(stopJob.jobID())); + } + Assert.fail("Expected StopJob message, got: " + message.getClass()); + return null; + } + } + + private static class TestCancelActorGateway extends DummyActorGateway { + + private final JobID expectedJobID; + private volatile boolean messageArrived = false; + + TestCancelActorGateway(JobID expectedJobID) { + this.expectedJobID = expectedJobID; + } + + @Override + public Future ask(Object message, FiniteDuration timeout) { + messageArrived = true; + if (message instanceof JobManagerMessages.CancelJob) { + JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message; + Assert.assertEquals(expectedJobID, cancelJob.jobID()); + return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null)); + } + Assert.fail("Expected CancelJob message, got: " + message.getClass()); + return null; + } + } + + private static class TestCancelWithSavepointActorGateway extends DummyActorGateway { + + private final JobID expectedJobID; + private final String expectedTargetDirectory; + private volatile boolean messageArrived = false; + + TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) { + this.expectedJobID = expectedJobID; + this.expectedTargetDirectory = expectedTargetDirectory; + } + + @Override + public Future ask(Object message, FiniteDuration timeout) { + messageArrived = true; + if (message instanceof JobManagerMessages.CancelJobWithSavepoint) { + JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message; + Assert.assertEquals(expectedJobID, cancelJob.jobID()); + Assert.assertEquals(expectedTargetDirectory, cancelJob.savepointDirectory()); + return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null)); + } + Assert.fail("Expected CancelJobWithSavepoint message, got: " + message.getClass()); + return null; + } + } + + private static class TestClusterClient extends StandaloneClusterClient { + + private final ActorGateway jobmanagerGateway; + + public TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception { + super(config); + this.jobmanagerGateway = jobmanagerGateway; + } + + @Override + public ActorGateway getJobManagerGateway() { + return jobmanagerGateway; + } + } }