From 687a313c2abaf7494bff0974695414d584fafd39 Mon Sep 17 00:00:00 2001 From: Sachin Goel Date: Fri, 24 Jul 2015 21:11:34 +0530 Subject: [PATCH] Added an allowed version range for Client to be able to submit job graphs --- .../flink/configuration/ConfigConstants.java | 7 ++ .../flink/configuration/Configuration.java | 3 + .../org/apache/flink/util/VersionUtils.java | 81 +++++++++++++++++++ .../flink/runtime/jobgraph/JobGraph.java | 2 + .../flink/runtime/jobmanager/JobManager.scala | 16 +++- .../runtime/jobmanager/JobSubmitTest.java | 75 +++++++++++++++++ 6 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 flink-core/src/main/java/org/apache/flink/util/VersionUtils.java diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 251ea9c4d95d9..0b9a64c852c84 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -28,6 +28,13 @@ public final class ConfigConstants { // Configuration Keys // ------------------------------------------------------------------------ + // ---------------------------- Version ------------------------------- + + /** + * Version of this flink implementation. + */ + public static final String FLINK_VERSION_KEY = "flink.version"; + // ---------------------------- Parallelism ------------------------------- /** diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index da42958d40603..0ab3d1775fb77 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -55,6 +55,9 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters /** Stores the concrete key/value pairs of this configuration object. */ private final HashMap confData; + + /** Stores the version of this implementation of flink */ + public static final String FLINK_VERSION = "0.10"; // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/VersionUtils.java b/flink-core/src/main/java/org/apache/flink/util/VersionUtils.java new file mode 100644 index 0000000000000..08a266d877946 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/VersionUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.configuration.Configuration; + +/** + * Version numbers to check compatibility between JobManager, TaskManager and JobClient. + */ +public class VersionUtils { + + public static final String FLINK_VERSION = Configuration.FLINK_VERSION; + /** + * Lower limit on client versions this job manager can work with. + */ + public static final String JOB_CLIENT_VERSION_LOWER = "0.9.0"; + + /** + * Gets the minimum supported Client version by this Job Manager. + * + * @return Minimum supported client version number. + */ + public static String getJobClientVersionLower() { + return JOB_CLIENT_VERSION_LOWER; + } + + /** + * Checks whether the given client version is compatible with this Job Manager + * + * @param clientVersion Version of the client + * @return Whether the given client is compatible with the Job Manager. + */ + public static boolean isClientCompatible(String clientVersion) { + return versionComparator(JOB_CLIENT_VERSION_LOWER, clientVersion) <= 0 && versionComparator(clientVersion, FLINK_VERSION) <= 0; + } + + /** + * Checks which of the two given version strings is higher. + * + * @param version1 Version 1 + * @param version2 Version 2 + * @return 1 if version1 > version2, -1 if version1 < version2 and 0 if version1 = version2 + *

+ * Code taken from Stack Overflow + */ + private static int versionComparator(String version1, String version2) { + String[] vals1 = version1.split("\\."); + String[] vals2 = version2.split("\\."); + int i = 0; + // set index to first non-equal ordinal or length of shortest version string + while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) { + i++; + } + // compare first non-equal ordinal number + if (i < vals1.length && i < vals2.length) { + int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i])); + return Integer.signum(diff); + } + // the strings are equal or one string is a substring of the other + // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" + else { + return Integer.signum(vals1.length - vals2.length); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 566e44f8c9cf7..93e3fe0dab6c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -33,6 +33,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; @@ -126,6 +127,7 @@ public JobGraph(String jobName) { public JobGraph(JobID jobId, String jobName) { this.jobID = jobId == null ? new JobID() : jobId; this.jobName = jobName == null ? "(unnamed job)" : jobName; + this.jobConfiguration.setString(ConfigConstants.FLINK_VERSION_KEY, Configuration.FLINK_VERSION); } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 8cbb13a7cb81b..7a8d9f8e367f1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -61,7 +61,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode} -import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils} +import org.apache.flink.util.{ExceptionUtils, InstantiationUtil, NetUtils, VersionUtils} import scala.collection.JavaConverters._ import scala.concurrent._ @@ -293,6 +293,7 @@ class JobManager( } else { try { + val instanceID = instanceManager.registerTaskManager( taskManager, connectionInfo, @@ -761,6 +762,19 @@ class JobManager( ) )) } + else if (!jobGraph.getJobConfiguration.containsKey(ConfigConstants.FLINK_VERSION_KEY) || + !VersionUtils.isClientCompatible( + jobGraph.getJobConfiguration.getString(ConfigConstants.FLINK_VERSION_KEY, null))) { + sender ! decorateMessage( + Failure( + new JobSubmissionException( + null, + "Version mismatch error. Client version must be at least " + + VersionUtils.JOB_CLIENT_VERSION_LOWER + ) + ) + ) + } else { val jobId = jobGraph.getJobID val jobName = jobGraph.getName diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 61f536c82732a..a74fd7d411592 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -196,4 +197,78 @@ public void initializeOnMaster(ClassLoader loader) throws Exception { fail(e.getMessage()); } } + + /** + * Verifies failure when the client version is lower than supported. + */ + @Test + public void testFailureClientNotSupportedLower() { + try { + Future submitFuture = dummyJobSubmitTest("0.1"); + + try { + Await.result(submitFuture, timeout); + } + catch (JobSubmissionException e) { + assertTrue(e.getMessage().startsWith("Version mismatch error")); + } + catch (Exception e) { + fail("Wrong exception type"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * Verifies failure when the client version is higher than the job manager version. + */ + @Test + public void testFailureClientNotSupportedUpper() { + try { + Future submitFuture = dummyJobSubmitTest(Configuration.FLINK_VERSION + ".0"); + + try { + Await.result(submitFuture, timeout); + } + catch (JobSubmissionException e) { + assertTrue(e.getMessage().startsWith("Version mismatch error")); + } + catch (Exception e) { + fail("Wrong exception type"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + /** + * Verifies success when the client version is supported properly. + */ + @Test + public void testSuccessClientSupported() { + try { + Future submitFuture = dummyJobSubmitTest(Configuration.FLINK_VERSION); + Await.result(submitFuture, timeout); + } + catch (Exception e) { + fail(e.getMessage()); + } + } + + private Future dummyJobSubmitTest(String version) { + JobVertex jobVertex = new JobVertex("Test Vertex"); + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + + // create a test job graph + JobGraph jg = new JobGraph("test job", jobVertex); + jg.getJobConfiguration().setString(ConfigConstants.FLINK_VERSION_KEY, version); + + // return the future. + return jmGateway.ask(new JobManagerMessages.SubmitJob(jg, ListeningBehaviour.EXECUTION_RESULT), timeout); + } }