Skip to content

Commit

Permalink
0003064: Allow the event action on group links to be overridden
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Apr 20, 2017
1 parent f419905 commit 06314f3
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 21 deletions.
Expand Up @@ -89,8 +89,6 @@ public interface IConfigurationService {

public Map<String, Channel> getChannels(boolean refreshCache);

public Map<String, Channel> getChannels(NodeGroupLinkAction eventAction, boolean refreshCache);

public NodeChannel getNodeChannel(String channelId, String nodeId, boolean refreshExtractMillis);

public void clearCache();
Expand Down
Expand Up @@ -28,6 +28,7 @@

import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.model.LoadSummary;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;
Expand Down Expand Up @@ -57,6 +58,8 @@ public interface IOutgoingBatchService {

public OutgoingBatches getOutgoingBatches(String nodeId, String channelId, boolean includeDisabledChannels);

public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, NodeGroupLinkAction eventAction, boolean includeDisabledChannels);

public OutgoingBatches getOutgoingBatchRange(long startBatchId, long endBatchId);

public OutgoingBatches getOutgoingBatchByLoad(long loadI);
Expand Down
Expand Up @@ -582,16 +582,6 @@ public Channel mapRow(Row row) {
return channels;
}

public Map<String, Channel> getChannels(NodeGroupLinkAction eventAction, boolean refreshCache) {
Map<String, Channel> channels = new HashMap<String, Channel>();
for (Channel channel : getChannels(refreshCache).values()) {
if (channel.getDataEventAction() != null && channel.getDataEventAction().equals(eventAction)) {
channels.put(channel.getChannelId(), channel);
}
}
return channels;
}

public Channel getChannel(String channelId) {
NodeChannel nodeChannel = getNodeChannel(channelId, false);
if (nodeChannel != null) {
Expand Down
Expand Up @@ -119,6 +119,7 @@
import org.jumpmind.symmetric.model.ProcessInfoDataWriter;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.Router;
Expand Down Expand Up @@ -477,7 +478,7 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
return extract(processInfo, targetNode, null, transport);
}

public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, String queue,
public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, String queue,
IOutgoingTransport transport) {

/*
Expand All @@ -491,9 +492,11 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, Str

OutgoingBatches batches = null;
if (queue != null) {
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, false);
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue,
processInfo.getKey().getProcessType().equals(ProcessInfoKey.ProcessType.PUSH_JOB) ?
NodeGroupLinkAction.P : NodeGroupLinkAction.W, false);
} else {
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false);
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false);
}

if (batches.containsBatches()) {
Expand Down
Expand Up @@ -374,7 +374,7 @@ public List<Node> findSourceNodesFor(NodeGroupLinkAction eventAction) {
List<Node> list = sourceNodesCache.get(eventAction.name());
if (list == null || (System.currentTimeMillis() - nodeLinkCacheTime) >= cacheTimeoutInMs) {
list = sqlTemplate.query(getSql("selectNodePrefixSql", "findNodesWhoTargetMeSql"),
new NodeRowMapper(), node.getNodeGroupId(), eventAction.name(), NodeGroupLinkAction.B);
new NodeRowMapper(), node.getNodeGroupId(), eventAction.name(), NodeGroupLinkAction.B.name());
sourceNodesCache.put(eventAction.name(), list);
nodeLinkCacheTime = System.currentTimeMillis();
}
Expand All @@ -391,7 +391,7 @@ public List<Node> findTargetNodesFor(NodeGroupLinkAction eventAction) {
List<Node> list = targetNodesCache.get(eventAction.name());
if (list == null || (System.currentTimeMillis() - nodeLinkCacheTime) >= cacheTimeoutInMs) {
list = sqlTemplate.query(getSql("selectNodePrefixSql", "findNodesWhoITargetSql"),
new NodeRowMapper(), node.getNodeGroupId(), eventAction.name(), NodeGroupLinkAction.B);
new NodeRowMapper(), node.getNodeGroupId(), eventAction.name(), NodeGroupLinkAction.B.name());
targetNodesCache.put(eventAction.name(), list);
nodeLinkCacheTime = System.currentTimeMillis();
}
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.jumpmind.symmetric.model.LoadSummary;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.NodeHost;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.OutgoingBatch;
Expand Down Expand Up @@ -395,17 +396,32 @@ public OutgoingBatches getOutgoingBatches(String nodeId, boolean includeDisabled
return getOutgoingBatches(nodeId, null, includeDisabledChannels);
}

@Override
public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, boolean includeDisabledChannels) {
return getOutgoingBatches(nodeId, channelThread, null, includeDisabledChannels);
}

@Override
public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, NodeGroupLinkAction eventAction, boolean includeDisabledChannels) {
long ts = System.currentTimeMillis();
final int maxNumberOfBatchesToSelect = parameterService.getInt(
ParameterConstants.OUTGOING_BATCH_MAX_BATCHES_TO_SELECT, 1000);

String sql = null;
Object[] params = null;
int[] types = null;

if (channelThread != null) {

if (eventAction != null) {
sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelActionSql");
params = new Object[] { eventAction.name(),
nodeId, channelThread, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(),
OutgoingBatch.Status.IG.name(), OutgoingBatch.Status.RS.name()};
types = new int[] {
Types.CHAR, Types.VARCHAR, Types.VARCHAR, Types.CHAR, Types.CHAR, Types.CHAR, Types.CHAR,
Types.CHAR, Types.CHAR, Types.CHAR, Types.CHAR
};
} else if (channelThread != null) {
sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelSql");
params = new Object[] { nodeId, channelThread, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
Expand All @@ -415,7 +431,7 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, b
Types.VARCHAR, Types.VARCHAR, Types.CHAR, Types.CHAR, Types.CHAR, Types.CHAR,
Types.CHAR, Types.CHAR, Types.CHAR, Types.CHAR
};
}
}
else {
sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchSql");
params = new Object[] { nodeId, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(),
Expand Down
Expand Up @@ -71,6 +71,11 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
putSql("selectOutgoingBatchChannelSql",
" join $(channel) c on c.channel_id = b.channel_id where node_id = ? and c.queue = ? and status in (?, ?, ?, ?, ?, ?, ?, ?) order by batch_id asc ");

putSql("selectOutgoingBatchChannelActionSql",
" join $(channel) c on c.channel_id = b.channel_id" +
" where (c.data_event_action is null or c.data_event_action = ?)" +
" and b.node_id = ? and c.queue = ? and b.status in (?, ?, ?, ?, ?, ?, ?, ?) order by b.batch_id asc ");

putSql("selectOutgoingBatchRangeSql",
"where batch_id between ? and ? order by batch_id ");

Expand Down

0 comments on commit 06314f3

Please sign in to comment.