Skip to content

Commit

Permalink
HBASE-28529 Use ZKClientConfig instead of system properties when sett…
Browse files Browse the repository at this point in the history
…ing zookeeper configurations (#5835)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Reviewed-by: Andor Molnár <andor@apache.org>
Reviewed-by: BukrosSzabolcs <szabolcs@cloudera.com>
  • Loading branch information
Apache9 committed Apr 23, 2024
1 parent e3761ba commit 6c6e776
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -75,6 +76,8 @@ public final class ReadOnlyZKClient implements Closeable {

private final int keepAliveTimeMs;

private final ZKClientConfig zkClientConfig;

private static abstract class Task implements Delayed {

protected long time = System.nanoTime();
Expand Down Expand Up @@ -136,10 +139,12 @@ public ReadOnlyZKClient(Configuration conf) {
this.retryIntervalMs =
conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
this.zkClientConfig = ZKConfig.getZKClientConfig(conf);
LOG.debug(
"Connect {} to {} with session timeout={}ms, retries {}, "
+ "retry interval {}ms, keepAlive={}ms",
getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
"Connect {} to {} with session timeout={}ms, retries={}, "
+ "retry interval={}ms, keepAlive={}ms, zk client config={}",
getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs,
zkClientConfig);
Threads.setDaemonThreadRunning(new Thread(this::run),
"ReadOnlyZKClient-" + connectString + "@" + getId());
}
Expand Down Expand Up @@ -316,7 +321,7 @@ private ZooKeeper getZk() throws IOException {
// may be closed when session expired
if (zookeeper == null || !zookeeper.getState().isAlive()) {
zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
});
}, zkClientConfig);
}
return zookeeper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.client.ZKClientConfig;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

/**
* Utility methods for reading, and building the ZooKeeper configuration. The order and priority for
* reading the config are as follows: (1). Property with "hbase.zookeeper.property." prefix from
* HBase XML (2). other zookeeper related properties in HBASE XML
* reading the config are as follows:
* <ol>
* <li>Property with "hbase.zookeeper.property." prefix from HBase XML.</li>
* <li>other zookeeper related properties in HBASE XML</li>
* </ol>
*/
@InterfaceAudience.Private
public final class ZKConfig {

private static final String VARIABLE_START = "${";
private static final String ZOOKEEPER_JAVA_PROPERTY_PREFIX = "zookeeper.";

private ZKConfig() {
}
Expand Down Expand Up @@ -132,7 +135,6 @@ private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf
* @return Quorum servers
*/
public static String getZKQuorumServersString(Configuration conf) {
setZooKeeperClientSystemProperties(HConstants.ZK_CFG_PROPERTY_PREFIX, conf);
return getZKQuorumServersStringFromHbaseConfig(conf);
}

Expand Down Expand Up @@ -322,13 +324,19 @@ public String getZnodeParent() {
}
}

public static ZKClientConfig getZKClientConfig(Configuration conf) {
Properties zkProperties = extractZKPropsFromHBaseConfig(conf);
ZKClientConfig zkClientConfig = new ZKClientConfig();
zkProperties.forEach((k, v) -> zkClientConfig.setProperty(k.toString(), v.toString()));
return zkClientConfig;
}

/**
* Get the client ZK Quorum servers string
* @param conf the configuration to read
* @return Client quorum servers, or null if not specified
*/
public static String getClientZKQuorumServersString(Configuration conf) {
setZooKeeperClientSystemProperties(HConstants.ZK_CFG_PROPERTY_PREFIX, conf);
String clientQuromServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
if (clientQuromServers == null) {
return null;
Expand All @@ -341,15 +349,4 @@ public static String getClientZKQuorumServersString(Configuration conf) {
final String[] serverHosts = StringUtils.getStrings(clientQuromServers);
return buildZKQuorumServerString(serverHosts, clientZkClientPort);
}

private static void setZooKeeperClientSystemProperties(String prefix, Configuration conf) {
Properties zkProperties = extractZKPropsFromHBaseConfig(conf);
for (Entry<Object, Object> entry : zkProperties.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (System.getProperty(ZOOKEEPER_JAVA_PROPERTY_PREFIX + key) == null) {
System.setProperty(ZOOKEEPER_JAVA_PROPERTY_PREFIX + key, value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.zookeeper.client.ZKClientConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -100,62 +101,22 @@ public void testClusterKeyWithMultiplePorts() throws Exception {
}

@Test
public void testZooKeeperTlsPropertiesClient() {
public void testZooKeeperTlsProperties() {
// Arrange
Configuration conf = HBaseConfiguration.create();
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
conf.set(HConstants.ZK_CFG_PROPERTY_PREFIX + p, p);
String zkprop = "zookeeper." + p;
System.clearProperty(zkprop);
}

// Act
ZKConfig.getClientZKQuorumServersString(conf);
ZKClientConfig zkClientConfig = ZKConfig.getZKClientConfig(conf);

// Assert
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
String zkprop = "zookeeper." + p;
assertEquals("Invalid or unset system property: " + zkprop, p, System.getProperty(zkprop));
System.clearProperty(zkprop);
assertEquals("Invalid or unset system property: " + p, p, zkClientConfig.getProperty(p));
}
}

@Test
public void testZooKeeperTlsPropertiesServer() {
// Arrange
Configuration conf = HBaseConfiguration.create();
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
conf.set(HConstants.ZK_CFG_PROPERTY_PREFIX + p, p);
String zkprop = "zookeeper." + p;
System.clearProperty(zkprop);
}

// Act
ZKConfig.getZKQuorumServersString(conf);

// Assert
for (String p : ZOOKEEPER_CLIENT_TLS_PROPERTIES) {
String zkprop = "zookeeper." + p;
assertEquals("Invalid or unset system property: " + zkprop, p, System.getProperty(zkprop));
System.clearProperty(zkprop);
}
}

@Test
public void testZooKeeperPropertiesDoesntOverwriteSystem() {
// Arrange
System.setProperty("zookeeper.a.b.c", "foo");
Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZK_CFG_PROPERTY_PREFIX + "a.b.c", "bar");

// Act
ZKConfig.getZKQuorumServersString(conf);

// Assert
assertEquals("foo", System.getProperty("zookeeper.a.b.c"));
System.clearProperty("zookeeper.a.b.c");
}

private void testKey(String ensemble, int port, String znode) throws IOException {
testKey(ensemble, port, znode, false); // not support multiple client ports
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;
Expand All @@ -49,19 +50,22 @@
import org.slf4j.LoggerFactory;

/**
* A zookeeper that can handle 'recoverable' errors. To handle recoverable errors, developers need
* to realize that there are two classes of requests: idempotent and non-idempotent requests. Read
* requests and unconditional sets and deletes are examples of idempotent requests, they can be
* reissued with the same results. (Although, the delete may throw a NoNodeException on reissue its
* effect on the ZooKeeper state is the same.) Non-idempotent requests need special handling,
* application and library writers need to keep in mind that they may need to encode information in
* the data or name of znodes to detect retries. A simple example is a create that uses a sequence
* flag. If a process issues a create("/x-", ..., SEQUENCE) and gets a connection loss exception,
* that process will reissue another create("/x-", ..., SEQUENCE) and get back x-111. When the
* process does a getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109
* was the result of the previous create, so the process actually owns both x-109 and x-111. An easy
* way around this is to use "x-process id-" when doing the create. If the process is using an id of
* 352, before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
* A zookeeper that can handle 'recoverable' errors.
* <p>
* To handle recoverable errors, developers need to realize that there are two classes of requests:
* idempotent and non-idempotent requests. Read requests and unconditional sets and deletes are
* examples of idempotent requests, they can be reissued with the same results.
* <p>
* (Although, the delete may throw a NoNodeException on reissue its effect on the ZooKeeper state is
* the same.) Non-idempotent requests need special handling, application and library writers need to
* keep in mind that they may need to encode information in the data or name of znodes to detect
* retries. A simple example is a create that uses a sequence flag. If a process issues a
* create("/x-", ..., SEQUENCE) and gets a connection loss exception, that process will reissue
* another create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
* getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be that x-109 was the result
* of the previous create, so the process actually owns both x-109 and x-111. An easy way around
* this is to use "x-process id-" when doing the create. If the process is using an id of 352,
* before reissuing the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
* "x-352-109", x-333-110". The process will know that the original create succeeded an the znode it
* created is "x-352-109".
* @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling"
Expand All @@ -79,37 +83,31 @@ public class RecoverableZooKeeper {
private final int sessionTimeout;
private final String quorumServers;
private final int maxMultiSize;
private final ZKClientConfig zkClientConfig;

/**
* See {@link #connect(Configuration, String, Watcher, String)}
* See {@link #connect(Configuration, String, Watcher, String, ZKClientConfig)}.
*/
public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
throws IOException {
String ensemble = ZKConfig.getZKQuorumServersString(conf);
return connect(conf, ensemble, watcher);
}

/**
* See {@link #connect(Configuration, String, Watcher, String)}
*/
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher)
throws IOException {
return connect(conf, ensemble, watcher, null);
return connect(conf, ensemble, watcher, null, null);
}

/**
* Creates a new connection to ZooKeeper, pulling settings and ensemble config from the specified
* configuration object using methods from {@link ZKConfig}. Sets the connection status monitoring
* watcher to the specified watcher.
* @param conf configuration to pull ensemble and other settings from
* @param watcher watcher to monitor connection changes
* @param ensemble ZooKeeper servers quorum string
* @param identifier value used to identify this client instance.
* @param conf configuration to pull ensemble and other settings from
* @param watcher watcher to monitor connection changes
* @param ensemble ZooKeeper servers quorum string
* @param identifier value used to identify this client instance.
* @param zkClientConfig client specific configurations for this instance
* @return connection to zookeeper
* @throws IOException if unable to connect to zk or config problem
*/
public static RecoverableZooKeeper connect(Configuration conf, String ensemble, Watcher watcher,
final String identifier) throws IOException {
final String identifier, ZKClientConfig zkClientConfig) throws IOException {
if (ensemble == null) {
throw new IOException("Unable to determine ZooKeeper ensemble");
}
Expand All @@ -122,14 +120,12 @@ public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024 * 1024);
return new RecoverableZooKeeper(ensemble, timeout, watcher, retry, retryIntervalMillis,
maxSleepTime, identifier, multiMaxSize);
maxSleepTime, identifier, multiMaxSize, zkClientConfig);
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
justification = "None. Its always been this way.")
public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize)
throws IOException {
RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher, int maxRetries,
int retryIntervalMillis, int maxSleepTime, String identifier, int maxMultiSize,
ZKClientConfig zkClientConfig) throws IOException {
// TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
this.retryCounterFactory =
new RetryCounterFactory(maxRetries + 1, retryIntervalMillis, maxSleepTime);
Expand All @@ -147,12 +143,7 @@ public RecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher wa
this.sessionTimeout = sessionTimeout;
this.quorumServers = quorumServers;
this.maxMultiSize = maxMultiSize;

try {
checkZk();
} catch (Exception x) {
/* ignore */
}
this.zkClientConfig = zkClientConfig;
}

/**
Expand All @@ -171,10 +162,10 @@ public int getMaxMultiSizeLimit() {
* @return The created ZooKeeper connection object
* @throws KeeperException if a ZooKeeper operation fails
*/
protected synchronized ZooKeeper checkZk() throws KeeperException {
private synchronized ZooKeeper checkZk() throws KeeperException {
if (this.zk == null) {
try {
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, zkClientConfig);
} catch (IOException ex) {
LOG.warn("Unable to create ZooKeeper Connection", ex);
throw new KeeperException.OperationTimeoutException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
this.abortable = abortable;
this.znodePaths = new ZNodePaths(conf);
PendingWatcher pendingWatcher = new PendingWatcher();
this.recoverableZooKeeper =
RecoverableZooKeeper.connect(conf, quorum, pendingWatcher, identifier);
this.recoverableZooKeeper = RecoverableZooKeeper.connect(conf, quorum, pendingWatcher,
identifier, ZKConfig.getZKClientConfig(conf));
pendingWatcher.prepare(this);
if (canCreateBaseZNode) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testSetDataVersionMismatchInLoop() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
ZKWatcher zkw = new ZKWatcher(conf, "testSetDataVersionMismatchInLoop", abortable, true);
String ensemble = ZKConfig.getZKQuorumServersString(conf);
RecoverableZooKeeper rzk = RecoverableZooKeeper.connect(conf, ensemble, zkw);
RecoverableZooKeeper rzk = RecoverableZooKeeper.connect(conf, ensemble, zkw, null, null);
rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
rzk.setData(znode, Bytes.toBytes("OPENING"), 0);
Field zkField = RecoverableZooKeeper.class.getDeclaredField("zk");
Expand Down

0 comments on commit 6c6e776

Please sign in to comment.