Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,20 @@ public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
}

/**
* Helper method to add/removev base peer configs from Configuration to ReplicationPeerConfig This
* merges the user supplied peer configuration
* {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided as
* property hbase.replication.peer.base.configs in hbase configuration. Expected format for this
* hbase configuration is "k1=v1;k2=v2,v2_1;k3=""". If value is empty, it will remove the existing
* key-value from peer config.
* Helper method to add/remove base peer configs from Configuration to ReplicationPeerConfig.
*
* Precedence rules:
* - If a key exists in both the provided ReplicationPeerConfig and base configs, the value in the
* provided ReplicationPeerConfig is preserved (user-provided wins).
* - If a key exists only in base configs (non-empty), it is added to the ReplicationPeerConfig.
* - If a key exists in base configs with an empty value (e.g. k1=""), the key is removed from
* the ReplicationPeerConfig.
*
* The base configs come from the property hbase.replication.peer.base.config in the HBase
* configuration. Expected format: "k1=v1;k2=v2,v2_1;k3=\"\"".
*
* @param conf Configuration
* @return ReplicationPeerConfig containing updated configs.
* @return ReplicationPeerConfig containing updated configs per the precedence rules above.
*/
public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
ReplicationPeerConfig receivedPeerConfig) {
Expand All @@ -485,8 +491,8 @@ public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configurati
// is required so that it doesn't remove any other config unknowingly.
if (Strings.isNullOrEmpty(configValue)) {
copiedPeerConfigBuilder.removeConfiguration(configName);
} else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
// update the configuration if exact config and value doesn't exists
} else if (!receivedPeerConfigMap.containsKey(configName)) {
// Only add base config if the key is absent in the provided config
copiedPeerConfigBuilder.putConfiguration(configName, configValue);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,21 +284,43 @@ public void testBaseReplicationPeerConfig() throws ReplicationException {
assertEquals(customPeerConfigSecondValue,
updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));

// validates base configs get updated values even if config already present
// With new precedence, base updates must NOT override existing keys in peer config
conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
.concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
.concat(customPeerConfigSecondKey).concat("=")
.concat(customPeerConfigSecondUpdatedValue));

ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil
.updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);

assertEquals(customPeerConfigUpdatedValue,
// Expect original values to be preserved since keys already exist in the peer config
assertEquals(customPeerConfigValue,
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
assertEquals(customPeerConfigSecondUpdatedValue,
assertEquals(customPeerConfigSecondValue,
replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
}

@Test
public void testBaseDoesNotOverrideExistingUserConfig() {
String key = "hbase.xxx.override_test";
String userValue = "user";
String baseValue = "base";

ReplicationPeerConfig existing = getConfig(1);
ReplicationPeerConfig withUser = ReplicationPeerConfig.newBuilder(existing)
.putConfiguration(key, userValue).build();

Configuration conf = new Configuration(CONF);
conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
key.concat("=").concat(baseValue));

ReplicationPeerConfig merged =
ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, withUser);

assertEquals(userValue, merged.getConfiguration().get(key));
}

@Test
public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
String customPeerConfigKey = "hbase.xxx.custom_config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,8 @@ public void testCyclicReplication3() throws Exception {
}

/**
* Tests that base replication peer configs are applied on peer creation and the configs are
* overriden if updated as part of updateReplicationPeerConfig()
* Tests that base replication peer configs are applied on peer creation and that user-provided
* values are preserved on restart when base contains conflicting entries.
*/
@Test
public void testBasePeerConfigsForReplicationPeer() throws Exception {
Expand Down Expand Up @@ -496,8 +496,8 @@ public void testBasePeerConfigsForReplicationPeer() throws Exception {
utilities[0].restartHBaseCluster(1);
admin = utilities[0].getAdmin();

// Configurations should be updated after restart again
Assert.assertEquals(firstCustomPeerConfigValue,
// With new precedence, key 1 should retain user override on peer 1; key 2 should be added.
Assert.assertEquals(firstCustomPeerConfigUpdatedValue,
admin.getReplicationPeerConfig("1").getConfiguration().get(firstCustomPeerConfigKey));
Assert.assertEquals(firstCustomPeerConfigValue,
admin.getReplicationPeerConfig("2").getConfiguration().get(firstCustomPeerConfigKey));
Expand Down