Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,65 +922,111 @@ public final class ConfigConstants {

// --------------------------- ZooKeeper ----------------------------------

/** ZooKeeper servers. */
/**
* ZooKeeper servers.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_QUORUM}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_QUORUM_KEY = "high-availability.zookeeper.quorum";

/**
* File system state backend base path for recoverable state handles. Recovery state is written
* to this path and the file state handles are persisted for recovery.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_STORAGE_PATH}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one. @tillrohrmann are high-availability.storageDir and high-availability.zookeeper.storageDir equivalent?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think so.

*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_STORAGE_PATH = "high-availability.zookeeper.storageDir";

/** ZooKeeper root path. */
/**
* ZooKeeper root path.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_DIR_KEY = "high-availability.zookeeper.path.root";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_NAMESPACE_KEY = "high-availability.zookeeper.path.namespace";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_LATCH_PATH = "high-availability.zookeeper.path.latch";

/** ZooKeeper root path (ZNode) for job graphs. */
/**
* ZooKeeper root path (ZNode) for job graphs.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_JOBGRAPHS_PATH = "high-availability.zookeeper.path.jobgraphs";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_LEADER_PATH = "high-availability.zookeeper.path.leader";

/** ZooKeeper root path (ZNode) for completed checkpoints. */
/**
* ZooKeeper root path (ZNode) for completed checkpoints.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINTS_PATH = "high-availability.zookeeper.path.checkpoints";

/** ZooKeeper root path (ZNode) for checkpoint counters. */
/**
* ZooKeeper root path (ZNode) for checkpoint counters.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter";

/** ZooKeeper root path (ZNode) for Mesos workers. */
/**
* ZooKeeper root path (ZNode) for Mesos workers.
* @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}.
*/
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "high-availability.zookeeper.path.mesos-workers";

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout";

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CONNECTION_TIMEOUT = "high-availability.zookeeper.client.connection-timeout";

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT} */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_RETRY_WAIT = "high-availability.zookeeper.client.retry-wait";

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_MAX_RETRY_ATTEMPTS = "high-availability.zookeeper.client.max-retry-attempts";

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}. */
@PublicEvolving
@Deprecated
public static final String HA_ZOOKEEPER_CLIENT_ACL = "high-availability.zookeeper.client.acl";

/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}. */
@PublicEvolving
@Deprecated
public static final String ZOOKEEPER_SASL_DISABLE = "zookeeper.sasl.disable";

/** @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_SERVICE_NAME}. */
@PublicEvolving
@Deprecated
public static final String ZOOKEEPER_SASL_SERVICE_NAME = "zookeeper.sasl.service-name";

/** @deprecated Deprecated in favour of {@link #HA_ZOOKEEPER_QUORUM_KEY}. */
Expand Down Expand Up @@ -1594,51 +1640,103 @@ public final class ConfigConstants {

// --------------------------- ZooKeeper ----------------------------------

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_DIR_KEY = "/flink";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_NAMESPACE}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_NAMESPACE_KEY = "/default";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LATCH_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_LEADER_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_JOBGRAPHS_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_JOBGRAPHS_PATH = "/jobgraphs";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINTS_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH = "/checkpoints";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH} */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "/checkpoint-counter";

/** @deprecated in favor of {@link HighAvailabilityOptions#HA_ZOOKEEPER_MESOS_WORKERS_PATH}. */
@Deprecated
public static final String DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH = "/mesos-workers";

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_SESSION_TIMEOUT}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CONNECTION_TIMEOUT}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_RETRY_WAIT}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_RETRY_WAIT = 5000;

/** @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
@Deprecated
public static final int DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 3;

// - Defaults for required ZooKeeper configuration keys -------------------

/** ZooKeeper default client port. */
/**
* ZooKeeper default client port.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_CLIENT_PORT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;

/** ZooKeeper default init limit. */
/**
* ZooKeeper default init limit.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_INIT_LIMIT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;

/** ZooKeeper default sync limit. */
/**
* ZooKeeper default sync limit.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_SYNC_LIMIT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;

/** ZooKeeper default peer port. */
/**
* ZooKeeper default peer port.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_PEER_PORT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;

/** ZooKeeper default leader port. */
/**
* ZooKeeper default leader port.
* @deprecated in favor of {@code FlinkZookeeperQuorumPeer#DEFAULT_ZOOKEEPER_LEADER_PORT}.
*/
@Deprecated
public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;

/** Defaults for ZK client security **/
/**
* Defaults for ZK client security.
* @deprecated in favor of {@link SecurityOptions#ZOOKEEPER_SASL_DISABLE}.
*/
@Deprecated
public static final boolean DEFAULT_ZOOKEEPER_SASL_DISABLE = true;

/** ACL options supported "creator" or "open" */
/**
* ACL options supported "creator" or "open".
* @deprecated in favor of {@link HighAvailabilityOptions#ZOOKEEPER_CLIENT_ACL}.
*/
@Deprecated
public static final String DEFAULT_HA_ZOOKEEPER_CLIENT_ACL = "open";

// ----------------------------- Metrics ----------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,6 @@ public class HighAvailabilityOptions {
key("high-availability.storageDir")
.noDefaultValue()
.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir");

/**
* The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
key("high-availability.zookeeper.quorum")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.quorum");


// ------------------------------------------------------------------------
Expand All @@ -92,6 +84,14 @@ public class HighAvailabilityOptions {
// ZooKeeper Options
// ------------------------------------------------------------------------

/**
* The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
key("high-availability.zookeeper.quorum")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.quorum");

/**
* The root path under which Flink stores its entries in ZooKeeper
*/
Expand All @@ -100,6 +100,46 @@ public class HighAvailabilityOptions {
.defaultValue("/flink")
.withDeprecatedKeys("recovery.zookeeper.path.root");

public static final ConfigOption<String> HA_ZOOKEEPER_NAMESPACE =
key("high-availability.zookeeper.path.namespace")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.path.namespace");

public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
key("high-availability.zookeeper.path.latch")
.defaultValue("/leaderlatch")
.withDeprecatedKeys("recovery.zookeeper.path.latch");

/** ZooKeeper root path (ZNode) for job graphs. */
public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
key("high-availability.zookeeper.path.jobgraphs")
.defaultValue("/jobgraphs")
.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs");

public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
key("high-availability.zookeeper.path.leader")
.defaultValue("/leader")
.withDeprecatedKeys("recovery.zookeeper.path.leader");

/** ZooKeeper root path (ZNode) for completed checkpoints. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
key("high-availability.zookeeper.path.checkpoints")
.defaultValue("/checkpoints")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoints");

/** ZooKeeper root path (ZNode) for checkpoint counters. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove the HA_ prefix from all options.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your reply. In addition to the zookeeper options, there're some other options such as HA_STORAGE_PATH, HA_JOB_MANAGER_PORT_RANGE. I think the HA_ prefix of them should be removed, otherwise the style may not be consistent. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well isn't that great.

OK, keep the prefix for the HA options, but remove it for ZOOKEEPER_CLIENT_ACL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's nice, and I have fixed it. Thanks :)

key("high-availability.zookeeper.path.checkpoint-counter")
.defaultValue("/checkpoint-counter")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter");

/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
key("high-availability.zookeeper.path.mesos-workers")
.defaultValue("/mesos-workers")
.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers");

// ------------------------------------------------------------------------
// ZooKeeper Client Settings
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -128,6 +168,10 @@ public class HighAvailabilityOptions {
key("high-availability.zookeeper.path.running-registry")
.defaultValue("/running_job_registry/");

public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
key("high-availability.zookeeper.client.acl")
.defaultValue("open");

// ------------------------------------------------------------------------

/** Not intended to be instantiated */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@

package org.apache.flink.mesos.runtime.clusterframework.services;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
import org.apache.flink.util.ConfigurationUtil;

/**
* Utilities for the {@link MesosServices}.
Expand All @@ -44,11 +43,8 @@ public static MesosServices createMesosServices(Configuration configuration) thr
return new StandaloneMesosServices();

case ZOOKEEPER:
final String zkMesosRootPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH,
ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH);
final String zkMesosRootPath = configuration.getString(
HighAvailabilityOptions.HA_ZOOKEEPER_MESOS_WORKERS_PATH);

ZooKeeperUtilityFactory zooKeeperUtilityFactory = new ZooKeeperUtilityFactory(
configuration,
Expand Down
Loading