Skip to content

Commit

Permalink
Update for issue #274.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdogan committed Oct 15, 2012
1 parent f32f908 commit 7996c29
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 38 deletions.
19 changes: 10 additions & 9 deletions hazelcast/src/main/java/com/hazelcast/cluster/ClusterManager.java
Expand Up @@ -345,7 +345,7 @@ public final void heartBeater() {
if (conn != null) {
sendHeartbeat(conn);
} else {
logger.log(Level.FINEST, "could not connect to " + address + " to send heartbeat");
logger.log(Level.FINEST, "Could not connect to " + address + " to send heartbeat");
}
}
}
Expand Down Expand Up @@ -382,20 +382,21 @@ public void process() {
return;
}
final ILogger logger = node.getLogger(MasterConfirmation.class.getName());
if (!getNode().isMaster()) {
logger.log(Level.WARNING, endPoint + " has sent MasterConfirmation, but this node is not master!");
return;
}
final ClusterManager clusterManager = node.clusterManager;
MemberImpl member = clusterManager.getMember(endPoint);
final MemberImpl member = clusterManager.getMember(endPoint);
if (member != null) {
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MasterConfirmation has been received from " + member);
if (getNode().isMaster()) {
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "MasterConfirmation has been received from " + member);
}
clusterManager.memberMasterConfirmationTimes.put(member, Clock.currentTimeMillis());
} else {
logger.log(Level.WARNING, endPoint + " has sent MasterConfirmation, but this node is not master!");
}
clusterManager.memberMasterConfirmationTimes.put(member, Clock.currentTimeMillis());
} else {
logger.log(Level.WARNING, "MasterConfirmation has been received from " + endPoint
+ ", but it is not a member of this cluster!");
clusterManager.sendProcessableTo(new MemberRemover(clusterManager.thisAddress), conn);
}
}
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.hazelcast.impl.base.PacketProcessor;
import com.hazelcast.impl.base.SystemLogService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Packet;
import com.hazelcast.util.Clock;
import com.hazelcast.util.ThreadWatcher;
Expand Down Expand Up @@ -160,7 +161,8 @@ private void unpark() {

private void processPacket(Packet packet) {
if (!running) return;
final MemberImpl memberFrom = node.clusterManager.getMember(packet.conn.getEndPoint());
final Address endPoint = packet.conn.getEndPoint();
final MemberImpl memberFrom = node.clusterManager.getMember(endPoint);
if (memberFrom != null) {
memberFrom.didRead();
}
Expand Down
19 changes: 14 additions & 5 deletions hazelcast/src/main/java/com/hazelcast/impl/ExecutorManager.java
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.config.ExecutorConfig;
import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Member;
import com.hazelcast.impl.Constants.RedoType;
import com.hazelcast.impl.executor.ParallelExecutor;
import com.hazelcast.impl.executor.ParallelExecutorService;
import com.hazelcast.impl.monitor.ExecutorOperationsCounter;
Expand Down Expand Up @@ -163,16 +164,24 @@ public void handle(Request request) {

class ExecutionOperationHandler extends AbstractOperationHandler {
void doOperation(Request request) {
NamedExecutorService namedExecutorService = getOrCreateNamedExecutorService(request.name);
ExecutionKey executionKey = new ExecutionKey(request.caller, request.longValue);
RequestExecutor requestExecutor = new RequestExecutor(request, executionKey);
executions.put(executionKey, requestExecutor);
namedExecutorService.execute(requestExecutor);
if (isCallerKnownMember(request)) {
NamedExecutorService namedExecutorService = getOrCreateNamedExecutorService(request.name);
ExecutionKey executionKey = new ExecutionKey(request.caller, request.longValue);
RequestExecutor requestExecutor = new RequestExecutor(request, executionKey);
executions.put(executionKey, requestExecutor);
namedExecutorService.execute(requestExecutor);
} else {
returnRedoResponse(request, RedoType.REDO_MEMBER_UNKNOWN);
}
}

public void handle(Request request) {
doOperation(request);
}

boolean isCallerKnownMember(Request request) {
return (request.local || getMember(request.caller) != null);
}
}

class ExecutionKey {
Expand Down
58 changes: 35 additions & 23 deletions hazelcast/src/test/java/com/hazelcast/cluster/MemberListTest.java
Expand Up @@ -61,32 +61,42 @@ public void testOutOfSyncMemberListIssue274() throws Exception {
assertEquals(3, h2.getCluster().getMembers().size());
assertEquals(3, h3.getCluster().getMembers().size());

// This simulates node 1 doing at least one read from the other nodes in the list at regular intervals
// This simulates each node reading from the other nodes in the list at regular intervals
// This prevents the heart beat code from timing out
final HazelcastInstance[] instances = new HazelcastInstance[] {h1, h2, h3};
final AtomicBoolean doingWork = new AtomicBoolean(true);
Thread workThread = new Thread(new Runnable() {
public void run() {
while (doingWork.get()) {
Set<Member> members = new HashSet<Member>(h1.getCluster().getMembers());
members.remove(h1.getCluster().getLocalMember());
MultiTask<String> task = new MultiTask<String>(new PingCallable(), members);
h1.getExecutorService().execute(task);

try {
task.get();
} catch (Exception e) {
e.printStackTrace();
}

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread[] workThreads = new Thread[instances.length];
for (int i = 0; i < instances.length; i++) {
final int threadNum = i;
workThreads[threadNum] = new Thread(new Runnable() {

public void run() {
while (doingWork.get()) {
final HazelcastInstance hz = instances[threadNum];

Set<Member> members = new HashSet<Member>(hz.getCluster().getMembers());
members.remove(hz.getCluster().getLocalMember());

MultiTask<String> task = new MultiTask<String>(new PingCallable(), members);
hz.getExecutorService().execute(task);

try {
task.get();
} catch (Exception e) {
e.printStackTrace();
}

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
workThread.start();
});
workThreads[threadNum].start();
}


final Node n3 = TestUtil.getNode(h3);
n3.clusterManager.enqueueAndWait(new Processable() {
Expand All @@ -102,7 +112,9 @@ public void process() {
Thread.sleep(30 * 1000);

doingWork.set(false);
workThread.join();
for (Thread t : workThreads) {
t.join();
}

assertEquals(3, h1.getCluster().getMembers().size());
assertEquals(3, h2.getCluster().getMembers().size());
Expand Down

0 comments on commit 7996c29

Please sign in to comment.