Skip to content

Commit

Permalink
0002155: Reduce updates to sym_node_communication
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jan 23, 2015
1 parent 42a8135 commit 69e5f1d
Showing 1 changed file with 14 additions and 2 deletions.
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class NodeCommunicationService extends AbstractService implements INodeCo
private IClusterService clusterService;

private boolean initialized = false;

private Map<CommunicationType, Set<String>> currentlyExecuting;

public NodeCommunicationService(IClusterService clusterService, INodeService nodeService, IParameterService parameterService,
ISymmetricDialect symmetricDialect) {
Expand All @@ -68,6 +71,11 @@ public NodeCommunicationService(IClusterService clusterService, INodeService nod
createSqlReplacementTokens()));
this.clusterService = clusterService;
this.nodeService = nodeService;
this.currentlyExecuting = new HashMap<NodeCommunication.CommunicationType, Set<String>>();
CommunicationType[] types = CommunicationType.values();
for (CommunicationType communicationType : types) {
this.currentlyExecuting.put(communicationType, new HashSet<String>());
}
}

private final void initialize() {
Expand Down Expand Up @@ -291,10 +299,13 @@ public boolean execute(final NodeCommunication nodeCommunication, RemoteNodeStat
final INodeCommunicationExecutor executor) {
Date now = new Date();
Date lockTimeout = getLockTimeoutDate(nodeCommunication.getCommunicationType());
boolean locked = sqlTemplate.update(getSql("aquireLockSql"), clusterService.getServerId(), now, now,
final Set<String> executing = this.currentlyExecuting.get(nodeCommunication.getCommunicationType());
boolean locked = !executing.contains(nodeCommunication.getNodeId()) &&
sqlTemplate.update(getSql("aquireLockSql"), clusterService.getServerId(), now, now,
nodeCommunication.getNodeId(), nodeCommunication.getCommunicationType().name(),
lockTimeout) == 1;
if (locked) {
executing.add(nodeCommunication.getNodeId());
nodeCommunication.setLastLockTime(now);
nodeCommunication.setLockingServerId(clusterService.getServerId());
final RemoteNodeStatus status = statuses.add(nodeCommunication.getNodeId());
Expand All @@ -312,6 +323,7 @@ public void run() {
nodeCommunication.getNodeId()), ex);
} finally {
unlock(nodeCommunication, status, failed, ts);
executing.remove(nodeCommunication.getNodeId());
}
}
};
Expand Down Expand Up @@ -358,7 +370,7 @@ protected void unlock(NodeCommunication nodeCommunication,
nodeCommunication.getCommunicationType().name(),
nodeCommunication.getNodeId(), attempts));
}
} catch (Exception e) {
} catch (Throwable e) {
log.error(String.format(
"Failed to unlock %s node communication record for %s",
nodeCommunication.getCommunicationType().name(),
Expand Down

0 comments on commit 69e5f1d

Please sign in to comment.