Skip to content

Commit

Permalink
[FLINK-2893] [runtime] Consistent naming of recovery config parameters
Browse files Browse the repository at this point in the history
Rename config key prefix from 'ha.zookeeper' to 'recovery.zookeeper'
Rename config key from 'state.backend.fs.dir.recovery' => 'state.backend.fs.recoverydir'
Move ZooKeeper file system state backend configuration keys

This closes #1286
  • Loading branch information
uce authored and fhueske committed Oct 23, 2015
1 parent 3c8a658 commit ab2895f
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 45 deletions.
18 changes: 9 additions & 9 deletions docs/setup/config.md
Expand Up @@ -390,23 +390,23 @@ Flink supports the 'standalone' mode where only a single JobManager runs and no
The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing.
Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution.
In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state.
In order to use the 'zookeeper' mode, it is mandatory to also define the `ha.zookeeper.quorum` configuration value. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value.


- `ha.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected - `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected


- `ha.zookeeper.dir`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes. - `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes.


- `ha.zookeeper.dir.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader. - `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.


- `ha.zookeeper.dir.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID - `recovery.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID


- `ha.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms. - `recovery.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.


- `ha.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms. - `recovery.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.


- `ha.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms. - `recovery.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.


- `ha.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up. - `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.


## Background ## Background


Expand Down
6 changes: 3 additions & 3 deletions docs/setup/jobmanager_high_availability.md
Expand Up @@ -50,13 +50,13 @@ In high availabliity mode, all Flink components try to connect to a JobManager v


- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.


<pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> <pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>


Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.


- The following configuration keys are optional: - The following configuration keys are optional:


- `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for coordination - `recovery.zookeeper.path.root: /flink [default]`: ZooKeeper directory to use for coordination
- TODO Add client configuration keys - TODO Add client configuration keys


## Starting an HA-cluster ## Starting an HA-cluster
Expand Down Expand Up @@ -93,7 +93,7 @@ The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each


<pre> <pre>
recovery.mode: zookeeper recovery.mode: zookeeper
ha.zookeeper.quorum: localhost</pre> recovery.zookeeper.quorum: localhost</pre>


2. **Configure masters** in `conf/masters`: 2. **Configure masters** in `conf/masters`:


Expand Down
Expand Up @@ -410,12 +410,6 @@ public final class ConfigConstants {
*/ */
public static final String STATE_BACKEND = "state.backend"; public static final String STATE_BACKEND = "state.backend";


/**
* File system state backend base path for recoverable state handles. Recovery state is written
* to this path and the file state handles are persisted for recovery.
*/
public static final String STATE_BACKEND_FS_RECOVERY_PATH = "state.backend.fs.dir.recovery";

// ----------------------------- Miscellaneous ---------------------------- // ----------------------------- Miscellaneous ----------------------------


/** /**
Expand All @@ -433,31 +427,37 @@ public final class ConfigConstants {
// --------------------------- ZooKeeper ---------------------------------- // --------------------------- ZooKeeper ----------------------------------


/** ZooKeeper servers. */ /** ZooKeeper servers. */
public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum"; public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";

/**
* File system state backend base path for recoverable state handles. Recovery state is written
* to this path and the file state handles are persisted for recovery.
*/
public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";


/** ZooKeeper root path. */ /** ZooKeeper root path. */
public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir"; public static final String ZOOKEEPER_DIR_KEY = "recovery.zookeeper.path.root";


public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch"; public static final String ZOOKEEPER_LATCH_PATH = "recovery.zookeeper.path.latch";


public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader"; public static final String ZOOKEEPER_LEADER_PATH = "recovery.zookeeper.path.leader";


/** ZooKeeper root path (ZNode) for job graphs. */ /** ZooKeeper root path (ZNode) for job graphs. */
public static final String ZOOKEEPER_JOBGRAPHS_PATH = "ha.zookeeper.dir.jobgraphs"; public static final String ZOOKEEPER_JOBGRAPHS_PATH = "recovery.zookeeper.path.jobgraphs";


/** ZooKeeper root path (ZNode) for completed checkpoints. */ /** ZooKeeper root path (ZNode) for completed checkpoints. */
public static final String ZOOKEEPER_CHECKPOINTS_PATH = "ha.zookeeper.dir.checkpoints"; public static final String ZOOKEEPER_CHECKPOINTS_PATH = "recovery.zookeeper.path.checkpoints";


/** ZooKeeper root path (ZNode) for checkpoint counters. */ /** ZooKeeper root path (ZNode) for checkpoint counters. */
public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "ha.zookeeper.dir.checkpoint-counter"; public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";


public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout"; public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";


public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout"; public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "recovery.zookeeper.client.connection-timeout";


public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait"; public static final String ZOOKEEPER_RETRY_WAIT = "recovery.zookeeper.client.retry-wait";


public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts"; public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "recovery.zookeeper.client.max-retry-attempts";


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Default Values // Default Values
Expand Down
2 changes: 1 addition & 1 deletion flink-dist/src/main/resources/flink-conf.yaml
Expand Up @@ -137,4 +137,4 @@ webclient.port: 8080
# #
# recovery.mode: zookeeper # recovery.mode: zookeeper
# #
# ha.zookeeper.quorum: localhost # recovery.zookeeper.quorum: localhost
Expand Up @@ -100,7 +100,7 @@ public BlobServer(Configuration config) throws IOException {
// Recovery. Check that everything has been setup correctly. This is not clean, but it's // Recovery. Check that everything has been setup correctly. This is not clean, but it's
// better to resolve this with some upcoming changes to the state backend setup. // better to resolve this with some upcoming changes to the state backend setup.
else if (config.containsKey(ConfigConstants.STATE_BACKEND) && else if (config.containsKey(ConfigConstants.STATE_BACKEND) &&
config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) { config.containsKey(ConfigConstants.ZOOKEEPER_RECOVERY_PATH)) {


this.blobStore = new FileSystemBlobStore(config); this.blobStore = new FileSystemBlobStore(config);
} }
Expand Down
Expand Up @@ -51,12 +51,12 @@ class FileSystemBlobStore implements BlobStore {


FileSystemBlobStore(Configuration config) throws IOException { FileSystemBlobStore(Configuration config) throws IOException {
String stateBackendBasePath = config.getString( String stateBackendBasePath = config.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");


if (stateBackendBasePath.equals("")) { if (stateBackendBasePath.equals("")) {
throw new IllegalConfigurationException(String.format("Missing configuration for " + throw new IllegalConfigurationException(String.format("Missing configuration for " +
"file system state backend recovery path. Please specify via " + "file system state backend recovery path. Please specify via " +
"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); "'%s' key.", ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
} }


stateBackendBasePath += "/blob"; stateBackendBasePath += "/blob";
Expand Down
Expand Up @@ -276,11 +276,11 @@ private static <T extends Serializable> FileSystemStateStorageHelper<T> createFi
String prefix) throws IOException { String prefix) throws IOException {


String rootPath = configuration.getString( String rootPath = configuration.getString(
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");


if (rootPath.equals("")) { if (rootPath.equals("")) {
throw new IllegalConfigurationException("Missing recovery path. Specify via " + throw new IllegalConfigurationException("Missing recovery path. Specify via " +
"configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'."); "configuration key '" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "'.");
} else { } else {
return new FileSystemStateStorageHelper<T>(rootPath, prefix); return new FileSystemStateStorageHelper<T>(rootPath, prefix);
} }
Expand Down
Expand Up @@ -70,7 +70,7 @@ public void testBlobServerRecovery() throws Exception {
Configuration config = new Configuration(); Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath()); config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, recoveryDir.getPath());


for (int i = 0; i < server.length; i++) { for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config); server[i] = new BlobServer(config);
Expand Down
Expand Up @@ -65,7 +65,7 @@ public void testRecoveryRegisterAndDownload() throws Exception {
Configuration config = new Configuration(); Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER"); config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath()); config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());


for (int i = 0; i < server.length; i++) { for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config); server[i] = new BlobServer(config);
Expand Down
Expand Up @@ -197,7 +197,7 @@ public static class JobManagerProcessEntryPoint {
* <code>--port PORT</code>. * <code>--port PORT</code>.
* *
* <p>Other arguments are parsed to a {@link Configuration} and passed to the * <p>Other arguments are parsed to a {@link Configuration} and passed to the
* JobManager, for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum
* "xyz:123:456"</code>. * "xyz:123:456"</code>.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
Expand Down
Expand Up @@ -99,7 +99,7 @@ public static class TaskManagerProcessEntryPoint {
* and streaming jobs). * and streaming jobs).
* *
* <p>All arguments are parsed to a {@link Configuration} and passed to the Taskmanager, * <p>All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
* for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum "xyz:123:456"</code>. * for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
try { try {
Expand Down
Expand Up @@ -81,7 +81,7 @@ public static Configuration setZooKeeperRecoveryMode(
// File system state backend // File system state backend
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery"); config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, fsStateHandlePath + "/recovery");


// Akka failure detection and execution retries // Akka failure detection and execution retries
config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms"); config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");
Expand Down
Expand Up @@ -556,7 +556,7 @@ private void checkCleanRecoveryState(Configuration config) throws Exception {
0, files.length); 0, files.length);
} }


File fsRecovery = new File(config.getString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "")); File fsRecovery = new File(config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, ""));


LOG.info("Checking " + fsRecovery); LOG.info("Checking " + fsRecovery);


Expand Down
Expand Up @@ -95,7 +95,7 @@ public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());


ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);


Expand Down Expand Up @@ -144,7 +144,7 @@ public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());


// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message
Expand Down
Expand Up @@ -112,11 +112,11 @@ public void testMultipleAMKill() throws Exception {
String fsStateHandlePath = tmp.getRoot().getPath(); String fsStateHandlePath = tmp.getRoot().getPath();


flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" + flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" + "@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));


AbstractFlinkYarnCluster yarnCluster = null; AbstractFlinkYarnCluster yarnCluster = null;
Expand Down

0 comments on commit ab2895f

Please sign in to comment.