Skip to content

Commit

Permalink
Clean up quorum implementation
Browse files Browse the repository at this point in the history
* Once initial membership event dispatching is fixed, initialized flag is not needed in QuorumImpl.
* Instead of a boolean flag, quorum state is managed with an enum value and it is initialized properly.
* If user-provided quorum function fails during membership update, quorum status is force-set to ABSENT and the failure is logged.
  • Loading branch information
metanet committed Jan 3, 2017
1 parent 0f62712 commit ac4741b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 59 deletions.
85 changes: 33 additions & 52 deletions hazelcast/src/main/java/com/hazelcast/quorum/impl/QuorumImpl.java
Expand Up @@ -20,11 +20,13 @@
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.Member;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.ClassLoaderUtil;
import com.hazelcast.quorum.Quorum;
import com.hazelcast.quorum.QuorumEvent;
import com.hazelcast.quorum.QuorumException;
import com.hazelcast.quorum.QuorumFunction;
import com.hazelcast.quorum.QuorumService;
import com.hazelcast.quorum.QuorumType;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.ReadonlyOperation;
Expand All @@ -34,7 +36,6 @@
import com.hazelcast.util.ExceptionUtil;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.hazelcast.cluster.memberselector.MemberSelectors.DATA_MEMBER_SELECTOR;

Expand All @@ -43,8 +44,12 @@
*/
public class QuorumImpl implements Quorum {

private final AtomicBoolean isPresent = new AtomicBoolean(true);
private final AtomicBoolean lastPresence = new AtomicBoolean(true);
private enum QuorumState {
INITIAL,
PRESENT,
ABSENT
}


private final NodeEngineImpl nodeEngine;
private final String quorumName;
Expand All @@ -53,7 +58,8 @@ public class QuorumImpl implements Quorum {
private final InternalEventService eventService;
private QuorumFunction quorumFunction;

private volatile boolean initialized;
// we are updating the quorum state within the single thread of membership event executor
private volatile QuorumState quorumState = QuorumState.INITIAL;

public QuorumImpl(QuorumConfig config, NodeEngineImpl nodeEngine) {
this.nodeEngine = nodeEngine;
Expand All @@ -70,10 +76,22 @@ public QuorumImpl(QuorumConfig config, NodeEngineImpl nodeEngine) {
*
* @param members the members for which the presence is determined
*/
public void update(Collection<Member> members) {
boolean presence = quorumFunction.apply(members);
setLocalResult(presence);
updateLastResultAndFireEvent(members, presence);
void update(Collection<Member> members) {
QuorumState previousQuorumState = this.quorumState;
QuorumState newQuorumState = QuorumState.ABSENT;
try {
boolean present = quorumFunction.apply(members);
newQuorumState = present ? QuorumState.PRESENT : QuorumState.ABSENT;
} catch (Exception e) {
ILogger logger = nodeEngine.getLogger(QuorumService.class);
logger.severe("Quorum function of quorum: " + quorumName + " failed! Quorum status is set to "
+ newQuorumState, e);
}

this.quorumState = newQuorumState;
if (previousQuorumState != newQuorumState) {
createAndPublishEvent(members, newQuorumState == QuorumState.PRESENT);
}
}

public String getName() {
Expand All @@ -88,27 +106,9 @@ public QuorumConfig getConfig() {
return config;
}

public boolean isInitialized() {
return initialized;
}

@Override
public boolean isPresent() {
return isPresent.get();
}

/**
* Sets the current quorum presence to the given {@code presence} and marks the instance as initialized.
*
* @param presence the quorum presence to be set
*/
public void setLocalResult(boolean presence) {
setLocalResultInternal(presence);
}

private void setLocalResultInternal(boolean presence) {
this.initialized = true;
this.isPresent.set(presence);
return quorumState == QuorumState.PRESENT;
}

/**
Expand Down Expand Up @@ -149,43 +149,25 @@ private static boolean isWriteOperation(Operation op) {
* @param op the operation for which the quorum should be present
* @throws QuorumException if the operation requires a quorum and the quorum is not present
*/
public void ensureQuorumPresent(Operation op) {
void ensureQuorumPresent(Operation op) {
if (!isQuorumNeeded(op)) {
return;
}
Collection<Member> memberList = nodeEngine.getClusterService().getMembers(DATA_MEMBER_SELECTOR);
if (!isInitialized()) {
update(memberList);
}

if (!isPresent()) {
updateLastResultAndFireEvent(memberList, false);
throw newQuorumException(memberList);
throw newQuorumException();
}
updateLastResultAndFireEvent(memberList, true);
}

private QuorumException newQuorumException(Collection<Member> memberList) {
private QuorumException newQuorumException() {
if (size == 0) {
throw new QuorumException("Cluster quorum failed");
}
Collection<Member> memberList = nodeEngine.getClusterService().getMembers(DATA_MEMBER_SELECTOR);
throw new QuorumException("Cluster quorum failed, quorum minimum size: "
+ size + ", current size: " + memberList.size());
}


private void updateLastResultAndFireEvent(Collection<Member> memberList, Boolean presence) {
for (; ; ) {
boolean currentPresence = lastPresence.get();
if (presence.equals(currentPresence)) {
return;
}
if (lastPresence.compareAndSet(currentPresence, presence)) {
createAndPublishEvent(memberList, presence);
return;
}
}
}

private void createAndPublishEvent(Collection<Member> memberList, boolean presence) {
QuorumEvent quorumEvent = new QuorumEvent(nodeEngine.getThisAddress(), size, memberList, presence);
eventService.publishEvent(QuorumServiceImpl.SERVICE_NAME, quorumName, quorumEvent, quorumEvent.hashCode());
Expand Down Expand Up @@ -221,11 +203,10 @@ public boolean apply(Collection<Member> members) {
public String toString() {
return "QuorumImpl{"
+ "quorumName='" + quorumName + '\''
+ ", isPresent=" + isPresent
+ ", isPresent=" + isPresent()
+ ", size=" + size
+ ", config=" + config
+ ", quorumFunction=" + quorumFunction
+ ", initialized=" + initialized
+ '}';
}
}
Expand Up @@ -19,8 +19,6 @@
import com.hazelcast.config.Config;
import com.hazelcast.config.QuorumConfig;
import com.hazelcast.config.QuorumListenerConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.Member;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
Expand All @@ -35,6 +33,7 @@
import java.util.concurrent.CountDownLatch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
Expand Down Expand Up @@ -113,8 +112,8 @@ public void testQuorumEventProvidesCorrectMemberListSize() throws Exception {
public void onChange(QuorumEvent quorumEvent) {
if (!quorumEvent.isPresent()) {
final Collection<Member> currentMembers = quorumEvent.getCurrentMembers();
assertEquals(2, currentMembers.size());
assertEquals(3, quorumEvent.getThreshold());
assertTrue(currentMembers.size() < quorumEvent.getThreshold());
quorumNotPresent.countDown();
}
}
Expand Down
49 changes: 48 additions & 1 deletion hazelcast/src/test/java/com/hazelcast/quorum/QuorumTest.java
Expand Up @@ -27,6 +27,7 @@
import com.hazelcast.spi.MemberAttributeServiceEvent;
import com.hazelcast.spi.MembershipAwareService;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.TestHazelcastInstanceFactory;
Expand All @@ -41,24 +42,70 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
public class QuorumTest extends HazelcastTestSupport {

@Test
public void testQuorumIsSetCorrectlyOnNodeInitialization() {
Config config = new Config();
QuorumConfig quorumConfig1 = new QuorumConfig();
String quorumName1 = randomString();
quorumConfig1.setName(quorumName1);
quorumConfig1.setEnabled(true);
quorumConfig1.setQuorumFunctionImplementation(new QuorumFunction() {
@Override
public boolean apply(Collection<Member> members) {
return true;
}
});

QuorumConfig quorumConfig2 = new QuorumConfig();
String quorumName2 = randomString();
quorumConfig2.setName(quorumName2);
quorumConfig2.setEnabled(true);
quorumConfig2.setSize(2);

config.addQuorumConfig(quorumConfig1);
config.addQuorumConfig(quorumConfig2);

HazelcastInstance hazelcastInstance = createHazelcastInstance(config);
final Quorum quorum1 = hazelcastInstance.getQuorumService().getQuorum(quorumName1);
final Quorum quorum2 = hazelcastInstance.getQuorumService().getQuorum(quorumName2);
assertTrueEventually(new AssertTask() {
@Override
public void run()
throws Exception {
assertTrue(quorum1.isPresent());
assertFalse(quorum2.isPresent());
}
});
}

@Test
public void testQuorumIgnoresMemberAttributeEvents() throws Exception {
Config config = new Config();
QuorumConfig quorumConfig = new QuorumConfig().setName(randomString()).setEnabled(true);
RecordingQuorumFunction function = new RecordingQuorumFunction();
final RecordingQuorumFunction function = new RecordingQuorumFunction();
quorumConfig.setQuorumFunctionImplementation(function);
config.addQuorumConfig(quorumConfig);
HazelcastInstance hazelcastInstance = createHazelcastInstance(config);
NodeEngineImpl nodeEngine = getNodeEngineImpl(hazelcastInstance);
MembershipAwareService service = nodeEngine.getService(QuorumServiceImpl.SERVICE_NAME);

assertTrueEventually(new AssertTask() {
@Override
public void run()
throws Exception {
assertTrue(function.wasCalled);
}
});
function.wasCalled = false;

MemberAttributeServiceEvent event = mock(MemberAttributeServiceEvent.class);
service.memberAttributeChanged(event);

Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.CountDownLatch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(HazelcastSerialClassRunner.class)
Expand Down Expand Up @@ -226,8 +227,8 @@ public void testQuorumEventProvidesCorrectMemberListSize() throws Exception {
public void onChange(QuorumEvent quorumEvent) {
if (!quorumEvent.isPresent()) {
Collection<Member> currentMembers = quorumEvent.getCurrentMembers();
assertEquals(2, currentMembers.size());
assertEquals(3, quorumEvent.getThreshold());
assertTrue(currentMembers.size() < quorumEvent.getThreshold());
belowLatch.countDown();
}
}
Expand Down
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CountDownLatch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
Expand Down Expand Up @@ -205,15 +206,15 @@ public boolean apply(Collection<Member> members) {

@Test
public void testQuorumEventProvidesCorrectMemberListSize() throws Exception {
final CountDownLatch belowLatch = new CountDownLatch(1);
final CountDownLatch belowLatch = new CountDownLatch(2);
Config config = new Config();
QuorumListenerConfig listenerConfig = new QuorumListenerConfig();
listenerConfig.setImplementation(new QuorumListener() {
public void onChange(QuorumEvent quorumEvent) {
if (!quorumEvent.isPresent()) {
Collection<Member> currentMembers = quorumEvent.getCurrentMembers();
assertEquals(2, currentMembers.size());
assertEquals(3, quorumEvent.getThreshold());
assertTrue(currentMembers.size() < quorumEvent.getThreshold());
belowLatch.countDown();
}
}
Expand Down

0 comments on commit ac4741b

Please sign in to comment.