Permalink
Browse files

[FLINK-5254] [yarn] Implement YARN High-Availability Services

  • Loading branch information...
1 parent e2922ad commit 2a7dbda79a00863a511fcf64b339770d1d00f805 @StephanEwen StephanEwen committed Dec 5, 2016
Showing with 2,537 additions and 78 deletions.
  1. +8 −4 flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
  2. +121 −0 flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
  3. +19 −1 flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
  4. +13 −2 flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
  5. +153 −0 ...untime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
  6. +61 −8 flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
  7. +16 −5 flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
  8. +40 −0 flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
  9. +16 −1 flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
  10. +384 −0 ...in/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
  11. +8 −21 ...-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
  12. +1 −1 ...-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
  13. +0 −1 flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
  14. +1 −0 ...time/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
  15. +1 −1 flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
  16. +1 −1 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
  17. +1 −2 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
  18. +70 −0 flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
  19. +85 −0 ...me/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
  20. +12 −2 ...time/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
  21. +226 −0 ...ava/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
  22. +21 −0 flink-yarn/pom.xml
  23. +18 −16 flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
  24. +1 −1 flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
  25. +13 −5 flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
  26. +7 −6 flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
  27. +49 −0 flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
  28. +105 −0 flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
  29. +343 −0 flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
  30. +188 −0 flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
  31. +172 −0 ...rn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
  32. +149 −0 ...k-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
  33. +234 −0 ...n/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -44,22 +44,22 @@
implements IOReadableWritable, java.io.Serializable, Cloneable {
private static final long serialVersionUID = 1L;
-
+
private static final byte TYPE_STRING = 0;
private static final byte TYPE_INT = 1;
private static final byte TYPE_LONG = 2;
private static final byte TYPE_BOOLEAN = 3;
private static final byte TYPE_FLOAT = 4;
private static final byte TYPE_DOUBLE = 5;
private static final byte TYPE_BYTES = 6;
-
+
/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
-
+
/** Stores the concrete key/value pairs of this configuration object. */
protected final HashMap<String, Object> confData;
-
+
// --------------------------------------------------------------------------------------------
/**
@@ -639,12 +639,16 @@ private Object getRawValueFromOption(ConfigOption<?> configOption) {
Object o = getRawValue(configOption.key());
if (o != null) {
+ // found a value for the current proper key
return o;
}
else if (configOption.hasDeprecatedKeys()) {
+ // try the deprecated keys
for (String deprecatedKey : configOption.deprecatedKeys()) {
Object oo = getRawValue(deprecatedKey);
if (oo != null) {
+ LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+ deprecatedKey, configOption.key());
return oo;
}
}
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+ private static MiniDFSCluster HDFS_CLUSTER;
+
+ private static Path HDFS_ROOT_PATH;
+
+ // ------------------------------------------------------------------------
+ // startup / shutdown
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void createHDFS() throws Exception {
+ final File tempDir = TEMP_DIR.newFolder();
+
+ Configuration hdConf = new Configuration();
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ HDFS_CLUSTER = builder.build();
+
+ HDFS_ROOT_PATH = new Path("hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
+ + HDFS_CLUSTER.getNameNodePort() + "/");
+ }
+
+ @AfterClass
+ public static void destroyHDFS() {
+ if (HDFS_CLUSTER != null) {
+ HDFS_CLUSTER.shutdown();
+ }
+ HDFS_CLUSTER = null;
+ HDFS_ROOT_PATH = null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCreateAndSetFinished() throws Exception {
+ final Path workDir = new Path(HDFS_ROOT_PATH, "test-work-dir");
+ final JobID jid = new JobID();
+
+ FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+ // initially, without any call, the job is considered running
+ assertTrue(registry.isJobRunning(jid));
+
+ // repeated setting should not affect the status
+ registry.setJobRunning(jid);
+ assertTrue(registry.isJobRunning(jid));
+
+ // set the job to finished and validate
+ registry.setJobFinished(jid);
+ assertFalse(registry.isJobRunning(jid));
+
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+ assertFalse(otherRegistry.isJobRunning(jid));
+ }
+
+ @Test
+ public void testSetFinishedAndRunning() throws Exception {
+ final Path workDir = new Path(HDFS_ROOT_PATH, "änother_wörk_directörü");
+ final JobID jid = new JobID();
+
+ FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir);
+
+ // set the job to finished and validate
+ registry.setJobFinished(jid);
+ assertFalse(registry.isJobRunning(jid));
+
+ // set the job to back to running and validate
+ registry.setJobRunning(jid);
+ assertTrue(registry.isJobRunning(jid));
+
+ // another registry should pick this up
+ FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir);
+ assertTrue(otherRegistry.isJobRunning(jid));
+ }
+}
@@ -35,6 +35,8 @@
import java.net.URI;
import java.net.UnknownHostException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
* class is a wrapper class which encapsulated the original Hadoop HDFS API.
@@ -60,7 +62,8 @@
/**
- * Creates a new DistributedFileSystem object to access HDFS
+ * Creates a new DistributedFileSystem object to access HDFS, based on a class name
+ * and picking up the configuration from the class path or the Flink configuration.
*
* @throws IOException
* throw if the required HDFS classes cannot be instantiated
@@ -76,6 +79,21 @@ public HadoopFileSystem(Class<? extends org.apache.hadoop.fs.FileSystem> fsClass
this.fs = instantiateFileSystem(fsClass);
}
+ /**
+ * Creates a new DistributedFileSystem that uses the given Hadoop
+ * {@link org.apache.hadoop.fs.FileSystem} under the hood.
+ *
+ * @param hadoopConfig The Hadoop configuration that the FileSystem is based on.
+ * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+ */
+ public HadoopFileSystem(
+ org.apache.hadoop.conf.Configuration hadoopConfig,
+ org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+
+ this.conf = checkNotNull(hadoopConfig, "hadoopConfig");
+ this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+ }
+
private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() throws IOException {
Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
@@ -43,6 +43,12 @@ public EmbeddedNonHaServices() {
// ------------------------------------------------------------------------
@Override
+ public String getResourceManagerEndpointName() {
+ // dynamic actor name
+ return null;
+ }
+
+ @Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return resourceManagerLeaderService.createLeaderRetrievalService();
}
@@ -55,11 +61,16 @@ public LeaderElectionService getResourceManagerLeaderElectionService() {
// ------------------------------------------------------------------------
@Override
- public void shutdown() throws Exception {
+ public void close() throws Exception {
try {
- super.shutdown();
+ super.close();
} finally {
resourceManagerLeaderService.shutdown();
}
}
+
+ @Override
+ public void closeAndCleanupAllData() throws Exception {
+ close();
+ }
}
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@link RunningJobsRegistry} tracks the status jobs via marker files,
+ * marking finished jobs via marker files.
+ *
+ * <p>The general contract is the following:
+ * <ul>
+ * <li>Initially, a marker file does not exist (no one created it, yet), which means
+ * the specific job is assumed to be running</li>
+ * <li>The JobManager that finishes calls this service to create the marker file,
+ * which marks the job as finished.</li>
+ * <li>If a JobManager gains leadership at some point when shutdown is in progress,
+ * it will see the marker file and realize that the job is finished.</li>
+ * <li>The application framework is expected to clean the file once the application
+ * is completely shut down. At that point, no JobManager will attempt to
+ * start the job, even if it gains leadership.</li>
+ * </ul>
+ *
+ * <p>It is especially tailored towards deployment modes like for example
+ * YARN, where HDFS is available as a persistent file system, and the YARN
+ * application's working directories on HDFS are automatically cleaned
+ * up after the application completed.
+ */
+public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
+
+ private static final String PREFIX = ".job_complete_";
+
+ private final FileSystem fileSystem;
+
+ private final Path basePath;
+
+ /**
+ * Creates a new registry that writes to the FileSystem and working directory
+ * denoted by the given path.
+ *
+ * <p>The initialization will attempt to write to the given working directory, in
+ * order to catch setup/configuration errors early.
+ *
+ * @param workingDirectory The working directory for files to track the job status.
+ *
+ * @throws IOException Thrown, if the specified directory cannot be accessed.
+ */
+ public FsNegativeRunningJobsRegistry(Path workingDirectory) throws IOException {
+ this(workingDirectory.getFileSystem(), workingDirectory);
+ }
+
+ /**
+ * Creates a new registry that writes its files to the given FileSystem at
+ * the given working directory path.
+ *
+ * <p>The initialization will attempt to write to the given working directory, in
+ * order to catch setup/configuration errors early.
+ *
+ * @param fileSystem The FileSystem to use for the marker files.
+ * @param workingDirectory The working directory for files to track the job status.
+ *
+ * @throws IOException Thrown, if the specified directory cannot be accessed.
+ */
+ public FsNegativeRunningJobsRegistry(FileSystem fileSystem, Path workingDirectory) throws IOException {
+ this.fileSystem = checkNotNull(fileSystem, "fileSystem");
+ this.basePath = checkNotNull(workingDirectory, "workingDirectory");
+
+ // to be safe, attempt to write to the working directory, to
+ // catch problems early
+ final Path testFile = new Path(workingDirectory, ".registry_test");
+ try (FSDataOutputStream out = fileSystem.create(testFile, false)) {
+ out.write(42);
+ }
+ catch (IOException e) {
+ throw new IOException("Unable to write to working directory: " + workingDirectory, e);
+ }
+ finally {
+ fileSystem.delete(testFile, false);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void setJobRunning(JobID jobID) throws IOException {
+ checkNotNull(jobID, "jobID");
+ final Path filePath = createMarkerFilePath(jobID);
+
+ // delete the marker file, if it exists
+ try {
+ fileSystem.delete(filePath, false);
+ }
+ catch (FileNotFoundException e) {
+ // apparently job was already considered running
+ }
+ }
+
+ @Override
+ public void setJobFinished(JobID jobID) throws IOException {
+ checkNotNull(jobID, "jobID");
+ final Path filePath = createMarkerFilePath(jobID);
+
+ // create the file
+ // to avoid an exception if the job already exists, set overwrite=true
+ try (FSDataOutputStream out = fileSystem.create(filePath, true)) {
+ out.write(42);
+ }
+ }
+
+ @Override
+ public boolean isJobRunning(JobID jobID) throws IOException {
+ checkNotNull(jobID, "jobID");
+
+ // check for the existence of the file
+ try {
+ fileSystem.getFileStatus(createMarkerFilePath(jobID));
+ // file was found --> job is terminated
+ return false;
+ }
+ catch (FileNotFoundException e) {
+ // file does not exist, job is still running
+ return true;
+ }
+ }
+
+ private Path createMarkerFilePath(JobID jobId) {
+ return new Path(basePath, PREFIX + jobId.toString());
+ }
+}
Oops, something went wrong.

0 comments on commit 2a7dbda

Please sign in to comment.