Skip to content
Permalink
Browse files
JIRA:CURATOR-568
- Adding ensembleTracker(boolean) and withEnsembleTracker() methods to
CuratorFrameworkFactory.builder() that allows enabling/disabling
ensemble tracking
  • Loading branch information
chevaris authored and Randgalt committed May 7, 2020
1 parent 5223466 commit 61d281721f06ba85cf3b764c332da904353fb2b0
Showing 3 changed files with 80 additions and 3 deletions.
@@ -66,6 +66,7 @@
private static final DefaultACLProvider DEFAULT_ACL_PROVIDER = new DefaultACLProvider();
private static final long DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3);
private static final int DEFAULT_CLOSE_WAIT_MS = (int)TimeUnit.SECONDS.toMillis(1);
private static final boolean DEFAULT_WITH_ENSEMBLE_TRACKER = true;

/**
* Return a new builder that builds a CuratorFramework
@@ -129,6 +130,7 @@ public static byte[] getLocalAddress()
public static class Builder
{
private EnsembleProvider ensembleProvider;
private boolean withEnsembleTracker = DEFAULT_WITH_ENSEMBLE_TRACKER;
private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS;
@@ -242,6 +244,29 @@ public Builder ensembleProvider(EnsembleProvider ensembleProvider)
return this;
}

/**
* Allows to configure if the ensemble configuration changes will be watched.
* The default value is {@code true}.<br>
*
* IMPORTANT: Use this method in combination with {@link #ensembleProvider(EnsembleProvider)} to provide
* and instance that returns {@code false} on {@link EnsembleProvider#updateServerListEnabled()} in order
* to fully achieve that ensemble server list changes are ignored<br>
*
* @param withTracker use {@code false} if you want to avoid following ensemble configuration changes
* @return this
*/
public Builder ensembleTracker(boolean withEnsembleTracker) {
this.withEnsembleTracker = withEnsembleTracker;
return this;
}

/**
* @return {@code true} if ensemble configuration changes MUST be watched
*/
public boolean withEnsembleTracker() {
return withEnsembleTracker;
}

/**
* Sets the data to use when {@link PathAndBytesable#forPath(String)} is used.
* This is useful for debugging purposes. For example, you could set this to be the IP of the
@@ -153,7 +153,7 @@ public void process(WatchedEvent watchedEvent)
failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);

ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
ensembleTracker = builder.withEnsembleTracker() ? new EnsembleTracker(this, builder.getEnsembleProvider()) : null;

runSafeService = makeRunSafeService(builder);
}
@@ -26,6 +26,7 @@
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
@@ -54,6 +55,7 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TestReconfiguration extends CuratorTestBase
@@ -173,6 +175,51 @@ public void testBasicGetConfig() throws Exception
}
}

@Test
public void testAddWithoutEnsembleTracker() throws Exception
{
final String initialClusterCS = cluster.getConnectString();
try ( CuratorFramework client = newClient(cluster.getConnectString(), false))
{
Assert.assertEquals(((CuratorFrameworkImpl) client).getEnsembleTracker(), null);
client.start();

QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
assertConfig(oldConfig, cluster.getInstances());

CountDownLatch latch = setChangeWaiter(client);
try ( TestingCluster newCluster = new TestingCluster(TestingCluster.makeSpecs(1, false)) )
{
newCluster.start();

client.reconfig().joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble();

Assert.assertTrue(timing.awaitLatch(latch));

byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
newInstances.addAll(newCluster.getInstances());
assertConfig(newConfig, newInstances);
Assert.assertEquals(ensembleProvider.getConnectionString(), initialClusterCS);
Assert.assertNotEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
Assert.assertEquals(client.getZookeeperClient().getCurrentConnectionString(), initialClusterCS);
final CountDownLatch reconnectLatch = new CountDownLatch(1);
client.getConnectionStateListenable().addListener(
(cfClient, newState) -> {
if (newState == ConnectionState.RECONNECTED) reconnectLatch.countDown();
}
);
client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(reconnectLatch.await(2, TimeUnit.SECONDS));
Assert.assertEquals(client.getZookeeperClient().getCurrentConnectionString(), initialClusterCS);
Assert.assertEquals(ensembleProvider.getConnectionString(), initialClusterCS);
newConfigData = client.getConfig().forEnsemble();
Assert.assertNotEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}

@Test
public void testAdd() throws Exception
{
@@ -412,10 +459,14 @@ protected void createServer() throws Exception

private CuratorFramework newClient()
{
return newClient(cluster.getConnectString());
return newClient(cluster.getConnectString(), true);
}

private CuratorFramework newClient(String connectionString) {
return newClient(connectionString, true);
}

private CuratorFramework newClient(String connectionString)
private CuratorFramework newClient(String connectionString, boolean withEnsembleProvider)
{
final AtomicReference<String> connectString = new AtomicReference<>(connectionString);
ensembleProvider = new EnsembleProvider()
@@ -450,6 +501,7 @@ public void setConnectionString(String connectionString)
};
return CuratorFrameworkFactory.builder()
.ensembleProvider(ensembleProvider)
.ensembleTracker(withEnsembleProvider)
.sessionTimeoutMs(timing.session())
.connectionTimeoutMs(timing.connection())
.authorization("digest", superUserPassword.getBytes())

0 comments on commit 61d2817

Please sign in to comment.