Skip to content
Permalink
Browse files
Populate helix cloud property using cloud config (#2005)
Currently when instantiating a zk helix manager, we retrieve cloud config from zk and replace entire HelixCloudProperty object, which may cause some fields that user pass in in HelixCloudProperty that are not included in cloud config get missing. This commit changes the logic to only populate fields in HelixCloudProperty with values that are present in cloud config, and leave other fields unchanged.
  • Loading branch information
mgao0 committed Apr 5, 2022
1 parent dff4708 commit 9c70bb0bce59cfd49e0bdaf3be6f5131331f61b2
Showing 6 changed files with 79 additions and 53 deletions.
@@ -74,37 +74,46 @@ public class HelixCloudProperty {
* @param
*/
public HelixCloudProperty(CloudConfig cloudConfig) {
populateFieldsWithCloudConfig(cloudConfig);
}

public void populateFieldsWithCloudConfig(CloudConfig cloudConfig) {
if (cloudConfig == null) {
cloudConfig = new CloudConfig();
}
setCloudEnabled(cloudConfig.isCloudEnabled());
if (cloudConfig.isCloudEnabled()) {
setCloudId(cloudConfig.getCloudID());
setCloudProvider(cloudConfig.getCloudProvider());
switch (CloudProvider.valueOf(cloudConfig.getCloudProvider())) {
case AZURE:
Properties azureProperties = new Properties();
try {
InputStream stream = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(AZURE_CLOUD_PROPERTY_FILE);
azureProperties.load(stream);
} catch (IOException e) {
String errMsg =
"failed to open Helix Azure cloud properties file: " + AZURE_CLOUD_PROPERTY_FILE;
throw new IllegalArgumentException(errMsg, e);
}
LOG.info("Successfully loaded Helix Azure cloud properties: {}", azureProperties);
setCloudInfoSources(
Collections.singletonList(azureProperties.getProperty(CLOUD_INFO_SOURCE)));
setCloudInfoProcessorName(azureProperties.getProperty(CLOUD_INFO_PROCESSOR_NAME));
setCloudMaxRetry(Integer.valueOf(azureProperties.getProperty(CLOUD_MAX_RETRY)));
setCloudConnectionTimeout(Long.valueOf(azureProperties.getProperty(CONNECTION_TIMEOUT_MS)));
setCloudRequestTimeout(Long.valueOf(azureProperties.getProperty(REQUEST_TIMEOUT_MS)));
break;
case CUSTOMIZED:
setCloudInfoSources(cloudConfig.getCloudInfoSources());
setCloudInfoProcessorName(cloudConfig.getCloudInfoProcessorName());
break;
default:
throw new HelixException(
String.format("Unsupported cloud provider: %s", cloudConfig.getCloudProvider()));
setCloudId(cloudConfig.getCloudID());
String cloudProviderStr = cloudConfig.getCloudProvider();
setCloudProvider(cloudProviderStr);
if (cloudProviderStr != null) {
switch (CloudProvider.valueOf(cloudProviderStr)) {
case AZURE:
Properties azureProperties = new Properties();
try {
InputStream stream = Thread.currentThread().getContextClassLoader()
.getResourceAsStream(AZURE_CLOUD_PROPERTY_FILE);
azureProperties.load(stream);
} catch (IOException e) {
String errMsg =
"failed to open Helix Azure cloud properties file: " + AZURE_CLOUD_PROPERTY_FILE;
throw new IllegalArgumentException(errMsg, e);
}
LOG.info("Successfully loaded Helix Azure cloud properties: {}", azureProperties);
setCloudInfoSources(
Collections.singletonList(azureProperties.getProperty(CLOUD_INFO_SOURCE)));
setCloudInfoProcessorName(azureProperties.getProperty(CLOUD_INFO_PROCESSOR_NAME));
setCloudMaxRetry(Integer.valueOf(azureProperties.getProperty(CLOUD_MAX_RETRY)));
setCloudConnectionTimeout(
Long.valueOf(azureProperties.getProperty(CONNECTION_TIMEOUT_MS)));
setCloudRequestTimeout(Long.valueOf(azureProperties.getProperty(REQUEST_TIMEOUT_MS)));
break;
case CUSTOMIZED:
setCloudInfoSources(cloudConfig.getCloudInfoSources());
setCloudInfoProcessorName(cloudConfig.getCloudInfoProcessorName());
break;
default:
throw new HelixException(
String.format("Unsupported cloud provider: %s", cloudConfig.getCloudProvider()));
}
}
}
@@ -66,13 +66,12 @@ private HelixManagerProperty(String version, long healthReportLatency,
}

public HelixCloudProperty getHelixCloudProperty() {
if (_helixCloudProperty == null) {
_helixCloudProperty = new HelixCloudProperty(new CloudConfig());
}
return _helixCloudProperty;
}

public void setHelixCloudProperty(CloudConfig cloudConfig) {
_helixCloudProperty = new HelixCloudProperty(cloudConfig);
}

public String getVersion() {
return _version;
}
@@ -70,10 +70,6 @@ public HelixManagerProperty getHelixManagerProperty(String zkAddress, String clu
return new HelixManagerProperty(properties, cloudConfig);
}

private static CloudConfig buildEmptyCloudConfig() {
return new CloudConfig.Builder().setCloudEnabled(false).build();
}

/**
* Retrieve the CloudConfig of the cluster if available.
* Note: the reason we create a dedicated zk client here is because we need an isolated access to
@@ -113,10 +109,12 @@ public static CloudConfig getCloudConfig(String zkAddress, String clusterName) {
// The try-catch logic is for backward compatibility reason only. Even if the cluster is not set
// up yet, constructing a new ZKHelixManager should not throw an exception
try {
cloudConfig = configAccessor.getCloudConfig(clusterName) == null ? buildEmptyCloudConfig()
: configAccessor.getCloudConfig(clusterName);
cloudConfig = configAccessor.getCloudConfig(clusterName);
if (cloudConfig == null) {
cloudConfig = new CloudConfig();
}
} catch (HelixException e) {
cloudConfig = buildEmptyCloudConfig();
cloudConfig = new CloudConfig();
}
} finally {
// Use a try-finally to make sure zkclient connection is closed properly
@@ -271,8 +271,8 @@ public ZKHelixManager(String clusterName, String instanceName, InstanceType inst
_stateListener = stateListener;
// read cloud config from ZK and set cloudConfig in HelixManagerProperty
_helixManagerProperty = helixManagerProperty;
_helixManagerProperty
.setHelixCloudProperty(HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName));
_helixManagerProperty.getHelixCloudProperty().populateFieldsWithCloudConfig(
HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName));

/**
* use system property if available
@@ -58,8 +58,9 @@ public enum CloudConfigProperty {
/**
* Instantiate the CloudConfig for the cloud
*/
private CloudConfig() {
public CloudConfig() {
super(CLOUD_CONFIG_KW);
setCloudEnabled(false);
}

/**
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;

import com.google.common.collect.ImmutableList;
@@ -36,6 +37,7 @@
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
@@ -96,7 +98,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/**
* TestMultiZkHelixJavaApis spins up multiple in-memory ZooKeepers with a pre-configured
* cluster-Zk realm routing information.
@@ -245,7 +246,8 @@ public void afterClass() throws Exception {
}
// todo: We should fail test here once we achieved 0 leakage and remove the following System print
if (!status) {
System.out.println("---------- Test Class " + testClassName + " thread leakage detected! ---------------");
System.out.println(
"---------- Test Class " + testClassName + " thread leakage detected! ---------------");
}
}

@@ -464,17 +466,25 @@ public void testZKHelixManagerCloudConfig() throws Exception {
// a default config because there is no CloudConfig ZNode in ZK.
CloudConfig.Builder cloudConfigBuilder = new CloudConfig.Builder();
cloudConfigBuilder.setCloudEnabled(true);
cloudConfigBuilder.setCloudProvider(CloudProvider.AZURE);
// Set to Customized so CloudInfoSources and CloudInfoProcessorName will be read from cloud config
// instead of properties
cloudConfigBuilder.setCloudProvider(CloudProvider.CUSTOMIZED);
cloudConfigBuilder.setCloudID("TestID");
List<String> infoURL = new ArrayList<String>();
infoURL.add("TestURL");
cloudConfigBuilder.setCloudInfoSources(infoURL);
cloudConfigBuilder.setCloudInfoProcessorName("TestProcessor");

CloudConfig cloudConfig = cloudConfigBuilder.build();
HelixCloudProperty oldCloudProperty = new HelixCloudProperty(cloudConfig);
HelixManagerProperty helixManagerProperty =
propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build();
helixManagerProperty.setHelixCloudProperty(cloudConfig);
propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig)
.setHelixCloudProperty(oldCloudProperty).build();
// Cloud property populated with fields defined in cloud config
oldCloudProperty.populateFieldsWithCloudConfig(cloudConfig);
// Add some property fields to cloud property that are not in cloud config
Properties properties = new Properties();
oldCloudProperty.setCustomizedCloudProperties(properties);

class TestZKHelixManager extends ZKHelixManager {
public TestZKHelixManager(String clusterName, String participantName,
@@ -493,8 +503,18 @@ public HelixManagerProperty getHelixManagerProperty() {
new TestZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null, null,
helixManagerProperty);
managerParticipant.connect();
Assert.assertFalse(
managerParticipant.getHelixManagerProperty().getHelixCloudProperty().getCloudEnabled());
HelixCloudProperty newCloudProperty =
managerParticipant.getHelixManagerProperty().getHelixCloudProperty();

// Test reading from zk cloud config overwrite property fields included in cloud config
Assert.assertFalse(newCloudProperty.getCloudEnabled());
Assert.assertNull(newCloudProperty.getCloudId());
Assert.assertNull(newCloudProperty.getCloudProvider());

// Test non-cloud config fields are not overwritten after reading cloud config from zk
Assert.assertEquals(newCloudProperty.getCustomizedCloudProperties(), properties);
Assert.assertEquals(newCloudProperty.getCloudInfoSources(), infoURL);
Assert.assertEquals(newCloudProperty.getCloudInfoProcessorName(), "TestProcessor");

// Clean up
managerParticipant.disconnect();
@@ -558,8 +578,7 @@ public void testCreateAndRebalanceResources() {
ZkHelixClusterVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(cluster).setResources(resourceNames)
.setExpectLiveInstances(liveInstancesNames)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
try {
Assert.assertTrue(verifier.verifyByPolling());
} finally {

0 comments on commit 9c70bb0

Please sign in to comment.