Skip to content

Commit

Permalink
0003206: Initial load extractor threading does not support multiple
Browse files Browse the repository at this point in the history
queues
  • Loading branch information
jumpmind-josh committed Jul 21, 2017
1 parent a5772a9 commit 99dfab7
Showing 1 changed file with 35 additions and 7 deletions.
Expand Up @@ -151,6 +151,7 @@
import org.jumpmind.util.CustomizableThreadFactory;
import org.jumpmind.util.FormatUtils;
import org.jumpmind.util.Statistics;
import org.springframework.jdbc.core.RowMapper;

/**
* @see IDataExtractorService
Expand Down Expand Up @@ -1369,10 +1370,10 @@ public RemoteNodeStatuses queueWork(boolean force) {
if (identity != null) {
if (force || clusterService.lock(ClusterConstants.INITIAL_LOAD_EXTRACT)) {
try {
Map<String, String> nodes = getExtractRequestNodes();
for (Map.Entry<String, String> entry : nodes.entrySet()) {
queue(entry.getKey(), entry.getValue(), statuses);
}
List<NodeQueuePair> nodes = getExtractRequestNodes();
for (NodeQueuePair pair : nodes) {
queue(pair.getNodeId(), pair.getQueue(), statuses);
}
} finally {
if (!force) {
clusterService.unlock(ClusterConstants.INITIAL_LOAD_EXTRACT);
Expand All @@ -1394,17 +1395,44 @@ protected void queue(String nodeId, String queue, RemoteNodeStatuses statuses) {
}
}

public Map<String, String> getExtractRequestNodes() {
return sqlTemplate.queryForMap(getSql("selectNodeIdsForExtractSql"), "node_id", "queue",
public List<NodeQueuePair> getExtractRequestNodes() {
return sqlTemplate.query(getSql("selectNodeIdsForExtractSql"), new NodeQueuePairMapper(),
ExtractStatus.NE.name());
}

private class NodeQueuePair {
private String nodeId;
private String queue;
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
}

class NodeQueuePairMapper implements ISqlRowMapper<NodeQueuePair> {
@Override
public NodeQueuePair mapRow(Row row) {
NodeQueuePair pair = new NodeQueuePair();
pair.setNodeId(row.getString("node_id"));
pair.setQueue(row.getString("queue"));
return pair;
}
}

public List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication) {
return sqlTemplate.query(getSql("selectExtractRequestForNodeSql"),
new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getQueue()
, ExtractRequest.ExtractStatus.NE.name());
}

@Override
public void resetExtractRequest(OutgoingBatch batch) {
ISqlTransaction transaction = null;
Expand Down

0 comments on commit 99dfab7

Please sign in to comment.