Skip to content

Commit

Permalink
0003227: When pushing to a large number of nodes on limited number of
Browse files Browse the repository at this point in the history
thread, only allocate nodes that have pending changes
  • Loading branch information
chenson42 committed Aug 18, 2017
1 parent cf25151 commit e578608
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.db.sql.mapper.StringMapper;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Channel;
Expand Down Expand Up @@ -166,7 +167,9 @@ protected List<NodeCommunication> find(CommunicationType communicationType) {

public List<NodeCommunication> list(CommunicationType communicationType) {
initialize();
long ts = System.currentTimeMillis();
List<NodeCommunication> communicationRows = find(communicationType);
log.debug("Found {} node communication locks to push to in {}ms", communicationRows.size(), System.currentTimeMillis()-ts);
List<Node> nodesToCommunicateWith = null;
switch (communicationType) {
case PULL:
Expand All @@ -175,7 +178,9 @@ public List<NodeCommunication> list(CommunicationType communicationType) {
break;
case FILE_PUSH:
case PUSH:
ts = System.currentTimeMillis();
nodesToCommunicateWith = removeOfflineNodes(nodeService.findNodesToPushTo());
log.debug("Found {} nodes to push to in {}ms", nodesToCommunicateWith.size(), System.currentTimeMillis()-ts);
break;
case OFFLN_PUSH:
case OFF_FSPUSH:
Expand All @@ -189,17 +194,16 @@ public List<NodeCommunication> list(CommunicationType communicationType) {
nodesToCommunicateWith = new ArrayList<Node>(0);
break;
}


Map<String, NodeCommunication> communicationRowsMap = new HashMap<>(communicationRows.size());
for (NodeCommunication nodeCommunication : communicationRows) {
communicationRowsMap.put(nodeCommunication.getIdentifier(), nodeCommunication);
}

List<NodeCommunication> nodesToCommunicateWithList = filterForChannelThreading(nodesToCommunicateWith);

Map<String, NodeCommunication> nodesToCommunicateWithListMap = new HashMap<>(nodesToCommunicateWithList.size());
for (NodeCommunication nodeToCommunicateWith : nodesToCommunicateWithList) {
NodeCommunication comm = null;
for (NodeCommunication nodeCommunication : communicationRows) {
if (nodeCommunication.getIdentifier().equals(nodeToCommunicateWith.getIdentifier())) {
comm = nodeCommunication;
break;
}
}
NodeCommunication comm = communicationRowsMap.get(nodeToCommunicateWith.getIdentifier());

if (comm == null) {
comm = new NodeCommunication();
Expand All @@ -211,40 +215,56 @@ public List<NodeCommunication> list(CommunicationType communicationType) {
}

comm.setNode(nodeToCommunicateWith.getNode());

nodesToCommunicateWithListMap.put(nodeToCommunicateWith.getNodeId(), nodeToCommunicateWith);
}

Iterator<NodeCommunication> it = communicationRows.iterator();
while (it.hasNext()) {
NodeCommunication nodeCommunication = it.next();

Node node = null;
for (NodeCommunication nodeToCommunicateWith : nodesToCommunicateWithList) {
if (nodeCommunication.getNodeId().equals(nodeToCommunicateWith.getNodeId())) {
node = nodeToCommunicateWith.getNode();
break;
}
}

NodeCommunication nodeToCommunicateWith = nodesToCommunicateWithListMap.get(nodeCommunication.getNodeId());
Node node = nodeToCommunicateWith != null ? nodeToCommunicateWith.getNode() : null;
if (node == null) {
delete(nodeCommunication);
it.remove();
}
}

if (communicationType == CommunicationType.PUSH &&
parameterService.getInt(ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER) < communicationRows.size()) {
ts = System.currentTimeMillis();
List<String> nodeIds = getNodeIdsWithUnsentCount();
List<NodeCommunication> filteredNodes = new ArrayList<NodeCommunication>(nodeIds.size());
for (NodeCommunication nodeCommunication : communicationRows) {
if (nodeIds.contains(nodeCommunication.getNodeId())) {
filteredNodes.add(nodeCommunication);
}
}
log.debug("Filtered down to {} nodes to push to in {}ms", filteredNodes.size(), System.currentTimeMillis()-ts);
communicationRows = filteredNodes;
}

if (communicationType == CommunicationType.PULL || communicationType == CommunicationType.FILE_PULL) {
communicationRows = removeNodesWithNoBatchesToSend(communicationRows);
}

return communicationRows;
}

protected List<String> getNodeIdsWithUnsentCount() {
return sqlTemplate.query(getSql("selectNodeIdsWithUnsentBatchsSql"),
new StringMapper());
}

protected List<NodeCommunication> filterForChannelThreading(List<Node> nodesToCommunicateWith) {
List<NodeCommunication> nodeCommunications = new ArrayList<NodeCommunication>();

Collection<Channel> channels = configurationService.getChannels(false).values();
for (Node node : nodesToCommunicateWith) {
if (node.isVersionGreaterThanOrEqualTo(3, 8, 0)) {
Set<String> channelThreads = new HashSet<String>();
for (Channel channel : configurationService.getChannels(false).values()) {
for (Channel channel : channels) {
if (!channelThreads.contains(channel.getQueue())) {
NodeCommunication nodeCommunication = new NodeCommunication();
nodeCommunication.setNodeId(node.getNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public NodeCommunicationServiceSqlMap(IDatabasePlatform platform,
putSql("deleteSql", "delete from $(node_communication)");

putSql("clearLocksOnRestartSql", "update $(node_communication) set lock_time=null where locking_server_id=? and lock_time is not null");

putSql("selectNodeIdsWithUnsentBatchsSql", "select distinct(node_id) from $(outgoing_batch) where status <> 'OK'");

putSql("selectNodeCommunicationSql",
"select * from $(node_communication) where communication_type=? order by node_priority DESC,last_lock_time");
Expand Down

0 comments on commit e578608

Please sign in to comment.