Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,11 @@ public void testResetSnapshots() {
.setResources(new HashSet<>(_dataAccessor.getChildNames(_keyBuilder.idealStates())))
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(_clusterVerifier.verify());

try {
Assert.assertTrue(_clusterVerifier.verify());
} finally{
_clusterVerifier.close();
}
// Initialize a new detector with the existing data
ResourceChangeDetector changeDetector = new ResourceChangeDetector();
_dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,18 @@ public void testBasic() throws Exception {
participants[i].syncStart();
}

boolean result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(result);
// Change to three is because there is an extra factory registered
// So one extra NO_OP message send
Assert.assertTrue(listener._maxNumberOfChildren <= 3,
"Should get no more than 2 messages (O->S and S->M)");
BestPossAndExtViewZkVerifier verifier = new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName);
try {
boolean result = ClusterStateVerifier
.verifyByZkCallback(verifier);
Assert.assertTrue(result);
// Change to three is because there is an extra factory registered
// So one extra NO_OP message send
Assert.assertTrue(listener._maxNumberOfChildren <= 3,
"Should get no more than 2 messages (O->S and S->M)");
} finally {
verifier.close();
}

// clean up
// wait for all zk callbacks done
Expand Down Expand Up @@ -158,10 +162,13 @@ public void testChangeBatchMessageMode() throws Exception {
participants[i].syncStart();
}

boolean result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(result);
BestPossAndExtViewZkVerifier verifier = new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName);
try {
boolean result = ClusterStateVerifier.verifyByZkCallback(verifier);
Assert.assertTrue(result);
} finally {
verifier.close();
}

// stop all participants
for (int i = 0; i < n; i++) {
Expand All @@ -188,14 +195,18 @@ public void testChangeBatchMessageMode() throws Exception {
participants[i].syncStart();
}

result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(result);
// Change to three is because there is an extra factory registered
// So one extra NO_OP message send
Assert.assertTrue(listener._maxNumberOfChildren <= 3,
"Should get no more than 2 messages (O->S and S->M)");
verifier = new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName);
try {
boolean result = ClusterStateVerifier
.verifyByZkCallback(verifier);
Assert.assertTrue(result);
// Change to three is because there is an extra factory registered
// So one extra NO_OP message send
Assert.assertTrue(listener._maxNumberOfChildren <= 3,
"Should get no more than 2 messages (O->S and S->M)");
} finally {
verifier.close();
}

// clean up
// wait for all zk callbacks done
Expand Down Expand Up @@ -264,10 +275,16 @@ public void testSubMsgExecutionFail() throws Exception {
Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
errStates.put("TestDB0", new HashMap<String, String>());
errStates.get("TestDB0").put(errPartition, masterOfPartition0);
boolean result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName, errStates));
Assert.assertTrue(result);

BestPossAndExtViewZkVerifier verifier = new BestPossAndExtViewZkVerifier(
ZK_ADDR, clusterName, errStates);
try {
boolean result = ClusterStateVerifier.verifyByPolling(
verifier);
Assert.assertTrue(result);
} finally {
verifier.close();
}

Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>();
errorStateMap.put(errPartition, TestHelper.setOf(masterOfPartition0));
Expand All @@ -277,7 +294,7 @@ public void testSubMsgExecutionFail() throws Exception {

// clean up
controller.syncStop();
for (int i = 0; i < 5; i++) {
for (int i = 0; i < n; i++) {
participants[i].syncStop();
}
deleteCluster(clusterName);
Expand Down Expand Up @@ -349,12 +366,16 @@ public void testParticipantIncompatibleWithBatchMsg() throws Exception {
"--zkSvr", ZK_ADDR, "--enableCluster", clusterName, "true"
});

boolean result =
ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
clusterName));
Assert.assertTrue(result);
Assert.assertTrue(listener._maxNumberOfChildren > 16,
"Should see more than 16 messages at the same time (32 O->S and 32 S->M)");
BestPossAndExtViewZkVerifier verifier = new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName);
try {
boolean result = ClusterStateVerifier
.verifyByZkCallback(verifier);
Assert.assertTrue(result);
Assert.assertTrue(listener._maxNumberOfChildren > 16,
"Should see more than 16 messages at the same time (32 O->S and 32 S->M)");
} finally {
verifier.close();
}

// clean up
// wait for all zk callbacks done
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.TaskDriver;
Expand Down Expand Up @@ -180,7 +181,16 @@ public void afterClass() throws Exception {
try {
// Kill all mock controllers and participants
MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
MOCK_PARTICIPANTS.forEach(MockParticipantManager::syncStop);
MOCK_PARTICIPANTS.forEach(mockParticipantManager -> {
mockParticipantManager.syncStop();
StateMachineEngine stateMachine = mockParticipantManager.getStateMachineEngine();
if (stateMachine != null) {
StateModelFactory stateModelFactory = stateMachine.getStateModelFactory("Task");
if (stateModelFactory != null && stateModelFactory instanceof TaskStateModelFactory) {
((TaskStateModelFactory) stateModelFactory).shutdown();
}
}
});

// Tear down all clusters
CLUSTER_LIST.forEach(cluster -> TestHelper.dropCluster(cluster, _zkClient));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,11 @@ private void pollForDoubleAssign() {
private void breakConnection() {
_executorServiceConnection = Executors.newScheduledThreadPool(THREAD_COUNT);
_executorServiceConnection.scheduleAtFixedRate(() -> {
// Randomly pick a Participant and cause a transient connection issue
int participantIndex = RANDOM.nextInt(_numNodes);
_participants[participantIndex].disconnect();
startParticipant(participantIndex);
synchronized (this) {
int participantIndex = RANDOM.nextInt(_numNodes);
stopParticipant(participantIndex);
startParticipant(participantIndex);
}
}, 0L, CONNECTION_DELAY, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -72,36 +73,45 @@ public void testController() throws Exception {
DistributedLeaderElection election =
new DistributedLeaderElection(manager, controller0, timerTasks);
NotificationContext context = new NotificationContext(manager);
context.setType(NotificationContext.Type.INIT);
election.onControllerChange(context);

// path = PropertyPathConfig.getPath(PropertyType.LEADER, clusterName);
// ZNRecord leaderRecord = _gZkClient.<ZNRecord> readData(path);
LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());
// AssertJUnit.assertNotNull(election.getController());
// AssertJUnit.assertNull(election.getLeader());
try {
context.setType(NotificationContext.Type.INIT);
election.onControllerChange(context);

// path = PropertyPathConfig.getPath(PropertyType.LEADER, clusterName);
// ZNRecord leaderRecord = _gZkClient.<ZNRecord> readData(path);
LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());
// AssertJUnit.assertNotNull(election.getController());
// AssertJUnit.assertNull(election.getLeader());
} finally {
manager.disconnect();
controller0.shutdown();
}

manager =
new MockZKHelixManager(clusterName, "controller_1", InstanceType.CONTROLLER, _gZkClient);
GenericHelixController controller1 = new GenericHelixController();
election = new DistributedLeaderElection(manager, controller1, timerTasks);
context = new NotificationContext(manager);
context.setType(NotificationContext.Type.INIT);
election.onControllerChange(context);
// leaderRecord = _gZkClient.<ZNRecord> readData(path);
liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());
// AssertJUnit.assertNull(election.getController());
// AssertJUnit.assertNull(election.getLeader());

accessor.removeProperty(keyBuilder.controllerLeader());
TestHelper.dropCluster(clusterName, _gZkClient);
try {
election.onControllerChange(context);
// leaderRecord = _gZkClient.<ZNRecord> readData(path);
LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());
// AssertJUnit.assertNull(election.getController());
// AssertJUnit.assertNull(election.getLeader());
} finally {
manager.disconnect();
controller1.shutdown();
accessor.removeProperty(keyBuilder.controllerLeader());
TestHelper.dropCluster(clusterName, _gZkClient);
}

System.out.println("END TestDistControllerElection at " + new Date(System.currentTimeMillis()));
}

@Test()
@Test(dependsOnMethods = "testController")
public void testControllerParticipant() throws Exception {
String className = getShortClassName();
LOG.info("RUN " + className + " at " + new Date(System.currentTimeMillis()));
Expand All @@ -126,16 +136,22 @@ public void testControllerParticipant() throws Exception {
new DistributedLeaderElection(manager, controller0, timerTasks);
NotificationContext context = new NotificationContext(manager);
context.setType(NotificationContext.Type.CALLBACK);
election.onControllerChange(context);
try {
election.onControllerChange(context);

LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());
LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());

// path = PropertyPathConfig.getPath(PropertyType.LEADER, clusterName);
// ZNRecord leaderRecord = _gZkClient.<ZNRecord> readData(path);
// AssertJUnit.assertEquals(controllerName, leaderRecord.getSimpleField("LEADER"));
// AssertJUnit.assertNotNull(election.getController());
// AssertJUnit.assertNotNull(election.getLeader());
// path = PropertyPathConfig.getPath(PropertyType.LEADER, clusterName);
// ZNRecord leaderRecord = _gZkClient.<ZNRecord> readData(path);
// AssertJUnit.assertEquals(controllerName, leaderRecord.getSimpleField("LEADER"));
// AssertJUnit.assertNotNull(election.getController());
// AssertJUnit.assertNotNull(election.getLeader());
}
finally {
manager.disconnect();
controller0.shutdown();
}

manager =
new MockZKHelixManager(clusterName, "controller_1", InstanceType.CONTROLLER_PARTICIPANT,
Expand All @@ -144,22 +160,25 @@ public void testControllerParticipant() throws Exception {
election = new DistributedLeaderElection(manager, controller1, timerTasks);
context = new NotificationContext(manager);
context.setType(NotificationContext.Type.CALLBACK);
election.onControllerChange(context);

liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());

// leaderRecord = _gZkClient.<ZNRecord> readData(path);
// AssertJUnit.assertEquals(controllerName, leaderRecord.getSimpleField("LEADER"));
// AssertJUnit.assertNull(election.getController());
// AssertJUnit.assertNull(election.getLeader());
try {
election.onControllerChange(context);

LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
AssertJUnit.assertEquals(controllerName, liveInstance.getInstanceName());

// leaderRecord = _gZkClient.<ZNRecord> readData(path);
// AssertJUnit.assertEquals(controllerName, leaderRecord.getSimpleField("LEADER"));
// AssertJUnit.assertNull(election.getController());
// AssertJUnit.assertNull(election.getLeader());
} finally {
manager.disconnect();
controller1.shutdown();
}

accessor.removeProperty(keyBuilder.controllerLeader());
TestHelper.dropCluster(clusterName, _gZkClient);
LOG.info("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
}

@Test()
@Test(dependsOnMethods = "testController")
public void testParticipant() throws Exception {
String className = getShortClassName();
LOG.info("RUN " + className + " at " + new Date(System.currentTimeMillis()));
Expand All @@ -182,6 +201,8 @@ public void testParticipant() throws Exception {
// expected
}

participant0.shutdown();
manager.disconnect();
TestHelper.dropCluster(clusterName, _gZkClient);
}

Expand All @@ -197,35 +218,45 @@ public void testCompeteLeadership() throws Exception {

// Create controller leaders
final Map<String, ZKHelixManager> managerList = new HashMap<>();
final List<GenericHelixController> controllers = new ArrayList<>();
for (int i = 0; i < managerCount; i++) {
String controllerName = "controller_" + i;
ZKHelixManager manager =
new ZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, ZK_ADDR);
GenericHelixController controller0 = new GenericHelixController();
DistributedLeaderElection election =
new DistributedLeaderElection(manager, controller0, Collections.EMPTY_LIST);
controllers.add(controller0);
manager.connect();
managerList.put(manager.getInstanceName(), manager);
}

// Remove leader manager one by one, and verify if the leader node exists
while(!managerList.isEmpty()) {
// Ensure a controller successfully acquired leadership.
Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
@Override
public boolean verify() {
LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
if (liveInstance != null) {
// disconnect the current leader manager
managerList.remove(liveInstance.getInstanceName()).disconnect();
return true;
} else {
return false;
try {
// Remove leader manager one by one, and verify if the leader node exists
while (!managerList.isEmpty()) {
// Ensure a controller successfully acquired leadership.
Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
@Override
public boolean verify() {
LiveInstance liveInstance = accessor.getProperty(keyBuilder.controllerLeader());
if (liveInstance != null) {
// disconnect the current leader manager
managerList.remove(liveInstance.getInstanceName()).disconnect();
return true;
} else {
return false;
}
}
}
}, 1000));
}, 1000));
}
} finally {
for (GenericHelixController controller : controllers) {
controller.shutdown();
}
for (ZKHelixManager mgr: managerList.values()) {
mgr.disconnect();
}
TestHelper.dropCluster(clusterName, _gZkClient);
}

TestHelper.dropCluster(clusterName, _gZkClient);
}
}
Loading