Skip to content

Commit

Permalink
0001253: Node communication records can be stranded
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jun 5, 2013
1 parent 1c45263 commit 9342eed
Showing 1 changed file with 50 additions and 19 deletions.
Expand Up @@ -49,6 +49,8 @@
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.RandomTimeSlot;

public class NodeCommunicationService extends AbstractService implements INodeCommunicationService {

Expand Down Expand Up @@ -286,25 +288,7 @@ public void run() {
nodeCommunication.getCommunicationType().name(),
nodeCommunication.getNodeId()), ex);
} finally {
long millis = System.currentTimeMillis() - ts;
nodeCommunication.setLockTime(null);
nodeCommunication.setLastLockMillis(millis);
if (failed) {
nodeCommunication.setFailCount(nodeCommunication.getFailCount() + 1);
nodeCommunication.setTotalFailCount(nodeCommunication
.getTotalFailCount() + 1);
nodeCommunication.setTotalFailMillis(nodeCommunication
.getTotalFailMillis() + millis);
} else {
nodeCommunication.setSuccessCount(nodeCommunication.getSuccessCount() + 1);
nodeCommunication.setTotalSuccessCount(nodeCommunication
.getTotalSuccessCount() + 1);
nodeCommunication.setTotalSuccessMillis(nodeCommunication
.getTotalSuccessMillis() + millis);
nodeCommunication.setFailCount(0);
}
status.setComplete(true);
save(nodeCommunication);
unlock(nodeCommunication, status, failed, ts);
}
}
};
Expand All @@ -316,6 +300,53 @@ public void run() {
}
return locked;
}

protected void unlock(NodeCommunication nodeCommunication,
RemoteNodeStatus status, boolean failed, long ts) {
boolean unlocked = false;
int attempts = 1;
do {
try {
long millis = System.currentTimeMillis() - ts;
nodeCommunication.setLockTime(null);
nodeCommunication.setLastLockMillis(millis);
if (failed) {
nodeCommunication.setFailCount(nodeCommunication
.getFailCount() + 1);
nodeCommunication.setTotalFailCount(nodeCommunication
.getTotalFailCount() + 1);
nodeCommunication.setTotalFailMillis(nodeCommunication
.getTotalFailMillis() + millis);
} else {
nodeCommunication.setSuccessCount(nodeCommunication
.getSuccessCount() + 1);
nodeCommunication.setTotalSuccessCount(nodeCommunication
.getTotalSuccessCount() + 1);
nodeCommunication.setTotalSuccessMillis(nodeCommunication
.getTotalSuccessMillis() + millis);
nodeCommunication.setFailCount(0);
}
status.setComplete(true);
save(nodeCommunication);
unlocked = true;
if (attempts > 1) {
log.info(String.format("Successfully unlocked %s node communication record for %s after %d attempts",
nodeCommunication.getCommunicationType().name(),
nodeCommunication.getNodeId(), attempts));
}
} catch (Exception e) {
log.error(String.format(
"Failed to unlock %s node communication record for %s",
nodeCommunication.getCommunicationType().name(),
nodeCommunication.getNodeId()), e);
long sleepTime = DateUtils.MILLIS_PER_SECOND
* new RandomTimeSlot(nodeCommunication.getNodeId(), 30).getRandomValueSeededByExternalId();
log.warn("Sleeping for {} ms before attempting to unlock the node communication record again", sleepTime);
AppUtils.sleep(sleepTime);
attempts++;
};
} while (!unlocked);
}

public void stop() {
Collection<CommunicationType> services = new HashSet<NodeCommunication.CommunicationType>(
Expand Down

0 comments on commit 9342eed

Please sign in to comment.