Skip to content
Permalink
Browse files
Allow using passed in connection config when reading cloud config (#2099
)

This change allow user to pass in zooScalability config and initiate zkHelixManager using the passed in config. We only read from env variable before this change.
  • Loading branch information
xyuanlu committed Jun 2, 2022
1 parent ae0f50d commit 2097a088abb94486691b0edf71bffc0c51509d62
Show file tree
Hide file tree
Showing 6 changed files with 575 additions and 7 deletions.
@@ -54,7 +54,7 @@ public static HelixPropertyFactory getInstance() {
* Clients may override these values.
*/
public HelixManagerProperty getHelixManagerProperty(String zkAddress, String clusterName) {
CloudConfig cloudConfig = getCloudConfig(zkAddress, clusterName);
CloudConfig cloudConfig = getCloudConfig(zkAddress, clusterName, null);
Properties properties = new Properties();
try {
InputStream stream = Thread.currentThread().getContextClassLoader()
@@ -80,20 +80,27 @@ public HelixManagerProperty getHelixManagerProperty(String zkAddress, String clu
* @param clusterName
* @return
*/

public static CloudConfig getCloudConfig(String zkAddress, String clusterName) {
return getCloudConfig(zkAddress, clusterName, null);
}
public static CloudConfig getCloudConfig(String zkAddress, String clusterName,
RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
CloudConfig cloudConfig;
RealmAwareZkClient dedicatedZkClient = null;
try {
if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
// If the multi ZK config is enabled or zkAddress is null, use realm-aware mode with
// DedicatedZkClient
try {
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkRealmShardingKey("/" + clusterName).build();
if (realmAwareZkConnectionConfig == null) {
realmAwareZkConnectionConfig =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkRealmShardingKey("/" + clusterName).build();
}
dedicatedZkClient =
DedicatedZkClientFactory.getInstance().buildZkClient(connectionConfig);
DedicatedZkClientFactory.getInstance().buildZkClient(realmAwareZkConnectionConfig);
} catch (IOException | InvalidRoutingDataException e) {
throw new HelixException("Not able to connect on multi-ZK mode!", e);
}
@@ -282,7 +282,8 @@ public ZKHelixManager(String clusterName, String instanceName, InstanceType inst
// read cloud config from ZK and set cloudConfig in HelixManagerProperty
_helixManagerProperty = helixManagerProperty;
_helixManagerProperty.getHelixCloudProperty().populateFieldsWithCloudConfig(
HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName));
HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName,
helixManagerProperty.getZkConnectionConfig()));

/**
* use system property if available
@@ -19,6 +19,7 @@
* under the License.
*/

import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,10 @@ public ClusterControllerManager(String zkAddr, String clusterName, String contro
super(zkAddr, clusterName, controllerName, InstanceType.CONTROLLER);
}

public ClusterControllerManager(String clusterName, HelixManagerProperty helixManagerProperty) {
super(clusterName, "controller", InstanceType.CONTROLLER, null, null, helixManagerProperty);
}

@Override
public void finalize() {
super.finalize();
@@ -23,8 +23,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.HelixManagerStateListener;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
@@ -57,6 +59,15 @@ protected ClusterManager(String zkAddr, String clusterName, String instanceName,
_type = type;
_uid = UID.getAndIncrement();
}
protected ClusterManager(String clusterName, String instanceName, InstanceType instanceType,
String zkAddress, HelixManagerStateListener stateListener,
HelixManagerProperty helixManagerProperty) {
super(clusterName, instanceName, instanceType, zkAddress, stateListener, helixManagerProperty);
_clusterName = clusterName;
_instanceName = instanceName;
_type = instanceType;
_uid = UID.getAndIncrement();
}

public void syncStop() {
_stopCountDown.countDown();
@@ -23,6 +23,7 @@
import java.util.concurrent.CountDownLatch;

import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
@@ -65,6 +66,16 @@ public MockParticipantManager(String zkAddr, String clusterName, String instance
_helixCloudProperty = helixCloudProperty;
}

public MockParticipantManager(String clusterName, String instanceName,
HelixManagerProperty helixManagerProperty, int transDelay, HelixCloudProperty helixCloudProperty) {
super(clusterName, instanceName, InstanceType.PARTICIPANT, null, null, helixManagerProperty);
_transDelay = transDelay;
_msModelFactory = new MockMSModelFactory(null);
_lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay);
_ofModelFactory = new DummyOnlineOfflineStateModelFactory(_transDelay);
_helixCloudProperty = helixCloudProperty;
}

public void setTransition(MockTransition transition) {
_msModelFactory.setTrasition(transition);
}

0 comments on commit 2097a08

Please sign in to comment.