Skip to content
Permalink
Browse files
Read cloud config from zk and propagate to HelixManagerProperty in Zk…
…HelixManager constructor (#1986)

Read cloud config from zk and propagate to HelixManagerProperty in ZkHelixManager constructor.
  • Loading branch information
xyuanlu committed Mar 24, 2022
1 parent 54e25ce commit 86e98817b30a9b84b692cd16b8cca5da4edf9960
Showing 4 changed files with 73 additions and 3 deletions.
@@ -69,6 +69,10 @@ public HelixCloudProperty getHelixCloudProperty() {
return _helixCloudProperty;
}

public void setHelixCloudProperty(CloudConfig cloudConfig) {
_helixCloudProperty = new HelixCloudProperty(cloudConfig);
}

public String getVersion() {
return _version;
}
@@ -84,7 +84,7 @@ private static CloudConfig buildEmptyCloudConfig() {
* @param clusterName
* @return
*/
private CloudConfig getCloudConfig(String zkAddress, String clusterName) {
public static CloudConfig getCloudConfig(String zkAddress, String clusterName) {
CloudConfig cloudConfig;
RealmAwareZkClient dedicatedZkClient = null;
try {
@@ -122,7 +122,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
private final List<PreConnectCallback> _preConnectCallbacks;
protected final List<CallbackHandler> _handlers;
private final HelixManagerProperties _properties;
private final HelixManagerProperty _helixManagerProperty;
protected final HelixManagerProperty _helixManagerProperty;
private final HelixManagerStateListener _stateListener;

/**
@@ -269,7 +269,10 @@ public ZKHelixManager(String clusterName, String instanceName, InstanceType inst
}

_stateListener = stateListener;
// read cloud config from ZK and set cloudConfig in HelixManagerProperty
_helixManagerProperty = helixManagerProperty;
_helixManagerProperty
.setHelixCloudProperty(HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName));

/**
* use system property if available
@@ -45,16 +45,19 @@
import org.apache.helix.TestHelper;
import org.apache.helix.ThreadLeakageChecker;
import org.apache.helix.api.config.RebalanceConfig;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.HelixManagerStateListener;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -438,6 +441,66 @@ public void testZKHelixManager() throws Exception {
_zkHelixAdmin.dropInstance(clusterName, instanceConfig);
}

/**
* Test creation of HelixManager and makes sure it connects correctly.
*/
@Test(dependsOnMethods = "testZKHelixManager")
public void testZKHelixManagerCloudConfig() throws Exception {
String clusterName = "CLUSTER_1";
String participantName = "HelixManager";
InstanceConfig instanceConfig = new InstanceConfig(participantName);
_zkHelixAdmin.addInstance(clusterName, instanceConfig);

RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
// Try with a connection config without ZK realm sharding key set (should fail)
RealmAwareZkClient.RealmAwareZkConnectionConfig invalidZkConnectionConfig =
connectionConfigBuilder.build();
RealmAwareZkClient.RealmAwareZkConnectionConfig validZkConnectionConfig =
connectionConfigBuilder.setZkRealmShardingKey("/" + clusterName).build();
HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();

// create a dummy cloud config and pass to ManagerFactory. It should be overwrite by
// a default config because there is no CloudConfig ZNode in ZK.
CloudConfig.Builder cloudConfigBuilder = new CloudConfig.Builder();
cloudConfigBuilder.setCloudEnabled(true);
cloudConfigBuilder.setCloudProvider(CloudProvider.AZURE);
cloudConfigBuilder.setCloudID("TestID");
List<String> infoURL = new ArrayList<String>();
infoURL.add("TestURL");
cloudConfigBuilder.setCloudInfoSources(infoURL);
cloudConfigBuilder.setCloudInfoProcessorName("TestProcessor");

CloudConfig cloudConfig = cloudConfigBuilder.build();
HelixManagerProperty helixManagerProperty =
propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build();
helixManagerProperty.setHelixCloudProperty(cloudConfig);

class TestZKHelixManager extends ZKHelixManager {
public TestZKHelixManager(String clusterName, String participantName,
InstanceType instanceType, String zkAddress, HelixManagerStateListener stateListener,
HelixManagerProperty helixManagerProperty) {
super(clusterName, participantName, instanceType, zkAddress, stateListener,
helixManagerProperty);
}

public HelixManagerProperty getHelixManagerProperty() {
return _helixManagerProperty;
}
}
// Connect as a participant
TestZKHelixManager managerParticipant =
new TestZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null, null,
helixManagerProperty);
managerParticipant.connect();
Assert.assertFalse(
managerParticipant.getHelixManagerProperty().getHelixCloudProperty().getCloudEnabled());

// Clean up
managerParticipant.disconnect();
_zkHelixAdmin.dropInstance(clusterName, instanceConfig);
}

/**
* Test that clusters and instances are set up properly.
* Helix Java APIs tested in this method is ZkUtil.

0 comments on commit 86e9881

Please sign in to comment.