Skip to content

Commit

Permalink
Make ClusterSetup realm-aware (#861)
Browse files Browse the repository at this point in the history
We make ClusterSetup, a Helix Java API, realm-aware so that this could be used in a multi-ZK environment.

Changelist:
Add a Builder to enable users to set internal ZkClient parameters
Add the realm-aware behavior in existing constructors
Update ConfigAccessor to reflect the change in the logic
  • Loading branch information
narendly committed Apr 9, 2020
1 parent b01e804 commit 7b57e9e
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public class SystemPropertyKeys {
// ZKHelixManager
public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";

// soft constraints weight definitions
public static final String SOFT_CONSTRAINT_WEIGHTS = "soft-constraint-weight.properties";

public static final String FLAPPING_TIME_WINDOW = "helixmanager.flappingTimeWindow";

// max disconnect count during the flapping time window to trigger HelixManager flapping handling
Expand Down Expand Up @@ -57,4 +60,7 @@ public class SystemPropertyKeys {

// MBean monitor for helix.
public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = "helix.monitor.slidingTimeWindow.ms";

// Multi-ZK mode enable/disable flag
public static final String MULTI_ZK_ENABLED = "helix.multiZkEnabled";
}
66 changes: 38 additions & 28 deletions helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.TreeMap;

import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ConfigScope;
Expand All @@ -45,6 +44,7 @@
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -87,18 +87,23 @@ public class ConfigAccessor {
* Constructor that creates a realm-aware ConfigAccessor using a builder.
* @param builder
*/
private ConfigAccessor(Builder builder) throws IOException, InvalidRoutingDataException {
private ConfigAccessor(Builder builder) {
switch (builder._realmMode) {
case MULTI_REALM:
_zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
builder._realmAwareZkClientConfig);
try {
_zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
builder._realmAwareZkClientConfig.setZkSerializer(new ZNRecordSerializer()));
} catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("Failed to create ConfigAccessor!", e);
}
break;
case SINGLE_REALM:
// Create a HelixZkClient: Use a SharedZkClient because ConfigAccessor does not need to do
// ephemeral operations
_zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(builder._realmAwareZkConnectionConfig.createZkConnectionConfig(),
builder._realmAwareZkClientConfig.createHelixZkClientConfig());
.buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
builder._realmAwareZkClientConfig.createHelixZkClientConfig()
.setZkSerializer(new ZNRecordSerializer()));
break;
default:
throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
Expand All @@ -124,24 +129,26 @@ public ConfigAccessor(RealmAwareZkClient zkClient) {
* ConfigAccessor only deals with Helix's data models like ResourceConfig.
* @param zkAddress
*/
@Deprecated
public ConfigAccessor(String zkAddress) {
// First, attempt to connect on multi-realm mode using FederatedZkClient
RealmAwareZkClient zkClient;
try {
zkClient = new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
new RealmAwareZkClient.RealmAwareZkClientConfig());
} catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
// Connecting multi-realm failed - fall back to creating it on single-realm mode using the given ZK address
LOG.info(
"ConfigAccessor: not able to connect on multi-realm mode; connecting single-realm mode to ZK: {}",
zkAddress, e);
zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
}
_zkClient = zkClient;
_usesExternalZkClient = false;

// If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
try {
_zkClient = new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
new RealmAwareZkClient.RealmAwareZkClientConfig()
.setZkSerializer(new ZNRecordSerializer()));
return;
} catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
throw new HelixException("Failed to create ConfigAccessor!", e);
}
}

_zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
}

/**
Expand Down Expand Up @@ -1017,7 +1024,7 @@ public Builder setRealmAwareZkClientConfig(
return this;
}

public ConfigAccessor build() throws Exception {
public ConfigAccessor build() {
validate();
return new ConfigAccessor(this);
}
Expand All @@ -1032,17 +1039,20 @@ private void validate() {
throw new HelixException(
"ConfigAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!");
}
if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
throw new HelixException(
"ConfigAccessor: You cannot set the ZkAddress on multi-realm mode!");
}

if (_realmMode == null) {
_realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
: RealmAwareZkClient.RealmMode.MULTI_REALM;
}

// Resolve RealmAwareZkClientConfig
boolean isZkClientConfigSet = _realmAwareZkClientConfig != null;
// Resolve which clientConfig to use
_realmAwareZkClientConfig =
isZkClientConfigSet ? _realmAwareZkClientConfig.createHelixZkClientConfig()
: new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer());
if (_realmAwareZkClientConfig == null) {
_realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
}

// Resolve RealmAwareZkConnectionConfig
if (_realmAwareZkConnectionConfig == null) {
Expand Down
70 changes: 0 additions & 70 deletions helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
Expand All @@ -94,7 +95,7 @@ public class ZKHelixAdmin implements HelixAdmin {
private static final String MAINTENANCE_ZNODE_ID = "maintenance";
private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;

private final HelixZkClient _zkClient;
private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
// true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise
// This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient
Expand All @@ -103,7 +104,7 @@ public class ZKHelixAdmin implements HelixAdmin {
private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);

@Deprecated
public ZKHelixAdmin(HelixZkClient zkClient) {
public ZKHelixAdmin(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
_configAccessor = new ConfigAccessor(zkClient);
_usesExternalZkClient = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
Expand Down Expand Up @@ -102,14 +103,14 @@ public AccessResult() {

private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class);

private final HelixZkClient _zkClient;
private final RealmAwareZkClient _zkClient;
// true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise
// This is used for close() to determine how ZkBaseDataAccessor should close the underlying
// ZkClient
private final boolean _usesExternalZkClient;

@Deprecated
public ZkBaseDataAccessor(HelixZkClient zkClient) {
public ZkBaseDataAccessor(RealmAwareZkClient zkClient) {
if (zkClient == null) {
throw new NullPointerException("zkclient is null");
}
Expand Down
Loading

0 comments on commit 7b57e9e

Please sign in to comment.