Skip to content

Commit

Permalink
ARTEMIS-4355 Update Curator to 5.5.0; Zookeeper 3.8.2
Browse files Browse the repository at this point in the history
  • Loading branch information
amarkevich authored and gemmellr committed Aug 30, 2023
1 parent 6796e54 commit ce8163b
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
Expand Down Expand Up @@ -83,8 +81,6 @@ public void setupEnv() throws Throwable {
}
testingServer = new TestingCluster(clusterSpecs);
testingServer.start();
// start waits for quorumPeer!=null but not that it has started...
Wait.waitFor(this::ensembleHasLeader);
connectString = testingServer.getConnectString();
super.setupEnv();
}
Expand Down Expand Up @@ -207,7 +203,7 @@ public void canAcquireLockOnMajorityRestart() throws Exception {
public void cannotStartManagerWithoutQuorum() throws Exception {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager();
stopMajorityNotLeaderNodes(true);
stopMajority(true);
Assert.assertFalse(manager.start(2, TimeUnit.SECONDS));
Assert.assertFalse(manager.isStarted());
}
Expand All @@ -217,7 +213,7 @@ public void cannotAcquireLockWithoutQuorum() throws Exception {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
stopMajorityNotLeaderNodes(true);
stopMajority(true);
DistributedLock lock = manager.getDistributedLock("a");
lock.tryLock();
}
Expand All @@ -227,7 +223,7 @@ public void cannotCheckLockWithoutQuorum() throws Exception {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
stopMajorityNotLeaderNodes(true);
stopMajority(true);
DistributedLock lock = manager.getDistributedLock("a");
final boolean held;
try {
Expand All @@ -243,7 +239,7 @@ public void canGetLockWithoutQuorum() throws Exception {
Assume.assumeTrue(zkNodes + " <= 1", zkNodes > 1);
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
stopMajorityNotLeaderNodes(true);
stopMajority(true);
DistributedLock lock = manager.getDistributedLock("a");
Assert.assertNotNull(lock);
}
Expand All @@ -256,7 +252,7 @@ public void notifiedAsUnavailableWhileLoosingQuorum() throws Exception {
DistributedLock lock = manager.getDistributedLock("a");
CountDownLatch unavailable = new CountDownLatch(1);
lock.addListener(unavailable::countDown);
stopMajorityNotLeaderNodes(true);
stopMajority(true);
Assert.assertTrue(unavailable.await(SESSION_MS + SERVER_TICK_MS, TimeUnit.MILLISECONDS));
}

Expand All @@ -270,7 +266,7 @@ public void beNotifiedOnce() throws Exception {
final AtomicInteger unavailableLock = new AtomicInteger(0);
manager.addUnavailableManagerListener(unavailableManager::incrementAndGet);
lock.addListener(unavailableLock::incrementAndGet);
stopMajorityNotLeaderNodes(true);
stopMajority(true);
TimeUnit.MILLISECONDS.sleep(SESSION_MS + SERVER_TICK_MS + CONNECTION_MS);
Assert.assertEquals(1, unavailableLock.get());
Assert.assertEquals(1, unavailableManager.get());
Expand Down Expand Up @@ -305,7 +301,7 @@ public void beNotifiedOfUnavailabilityWhileBlockedOnTimedLock() throws Exception
timedLock.start();
Assert.assertTrue(startedTimedLock.await(10, TimeUnit.SECONDS));
TimeUnit.SECONDS.sleep(1);
stopMajorityNotLeaderNodes(true);
stopMajority(true);
TimeUnit.MILLISECONDS.sleep(SESSION_MS + CONNECTION_MS);
Wait.waitFor(() -> unavailableLock.get() > 0, SERVER_TICK_MS);
Assert.assertEquals(1, unavailableManager.get());
Expand All @@ -323,7 +319,7 @@ public void beNotifiedOfAlreadyUnavailableManagerAfterAddingListener() throws Ex
};
manager.addUnavailableManagerListener(managerListener);
Assert.assertFalse(unavailable.get());
stopMajorityNotLeaderNodes(true);
stopMajority(true);
Wait.waitFor(unavailable::get);
manager.removeUnavailableManagerListener(managerListener);
final AtomicInteger unavailableOnRegister = new AtomicInteger();
Expand All @@ -336,21 +332,8 @@ public void beNotifiedOfAlreadyUnavailableManagerAfterAddingListener() throws Ex
}
}

private boolean ensembleHasLeader() {
return testingServer.getServers().stream().filter(CuratorDistributedLockTest::isLeader).count() != 0;
}

private static boolean isLeader(TestingZooKeeperServer server) {
if (server.getInstanceSpecs().size() == 1) {
return true;
}
long leaderId = server.getQuorumPeer().getLeaderId();
long id = server.getQuorumPeer().getId();
return id == leaderId;
}

private void stopMajorityNotLeaderNodes(boolean fromLast) throws Exception {
List<TestingZooKeeperServer> followers = testingServer.getServers().stream().filter(Predicate.not(CuratorDistributedLockTest::isLeader)).collect(Collectors.toList());
private void stopMajority(boolean fromLast) throws Exception {
List<TestingZooKeeperServer> followers = testingServer.getServers();
final int quorum = (zkNodes / 2) + 1;
for (int i = 0; i < quorum; i++) {
final int nodeIndex = fromLast ? (followers.size() - 1) - i : i;
Expand Down
12 changes: 4 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@
<jctools.version>2.1.2</jctools.version>
<netty.version>4.1.96.Final</netty.version>
<hdrhistogram.version>2.1.12</hdrhistogram.version>
<curator.version>5.2.0</curator.version>
<zookeeper.version>3.6.3</zookeeper.version>
<curator.version>5.5.0</curator.version>
<zookeeper.version>3.8.2</zookeeper.version>
<woodstox.version>4.4.0</woodstox.version>

<!-- docs -->
Expand Down Expand Up @@ -982,12 +982,8 @@
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import javax.management.remote.JMXServiceURL;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -128,12 +128,12 @@ public static Iterable<Object[]> getParams() {

protected BrokerControl primary;
protected BrokerControl backup;
protected LinkedList<BrokerControl> brokers;
protected List<BrokerControl> brokers;

public PluggableQuorumSinglePairTest(String brokerFolderPrefix) {
primary = new BrokerControl("primary", JMX_PORT_PRIMARY, brokerFolderPrefix + PRIMARY_DATA_FOLDER, PRIMARY_PORT_OFFSET);
backup = new BrokerControl("backup", JMX_PORT_BACKUP, brokerFolderPrefix + BACKUP_DATA_FOLDER, BACKUP_PORT_OFFSET);
brokers = new LinkedList(Arrays.asList(primary, backup));
brokers = Arrays.asList(primary, backup);
}

protected abstract boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) throws InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package org.apache.activemq.artemis.tests.smoke.quorum;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,15 +48,14 @@ public ZookeeperPluggableQuorumPeerTest() {
// both roles as both wish to be primary but will revert to backup
primary = new BrokerControl("primary-peer-a", JMX_PORT_PRIMARY, "zkReplicationPrimaryPeerA", PRIMARY_PORT_OFFSET);
backup = new BrokerControl("primary-peer-b", JMX_PORT_BACKUP, "zkReplicationPrimaryPeerB", BACKUP_PORT_OFFSET);
brokers = new LinkedList(Arrays.asList(primary, backup));
brokers = Arrays.asList(primary, backup);
}

@Ignore
@Test
@Override
public void testBackupFailoverAndPrimaryFailback() throws Exception {
// peers don't request fail back by default
// just wait for setup to avoid partial stop of zk via fast tear down with async setup
Wait.waitFor(this::ensembleHasLeader);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ protected boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) {
return true;
}

protected boolean ensembleHasLeader() {
return testingServer.getServers().stream().filter(ZookeeperPluggableQuorumSinglePairTest::isLeader).count() != 0;
}

private static boolean isLeader(TestingZooKeeperServer server) {
long leaderId = server.getQuorumPeer().getLeaderId();
long id = server.getQuorumPeer().getId();
return id == leaderId;
}

@Override
protected int[] stopMajority() throws Exception {
List<TestingZooKeeperServer> followers = testingServer.getServers();
Expand Down

0 comments on commit ce8163b

Please sign in to comment.