Skip to content

Commit

Permalink
0003064: Allow the event action on group links to be overridden
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Apr 20, 2017
1 parent a68eec0 commit 4d6be78
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
Expand Up @@ -58,7 +58,8 @@ public interface IOutgoingBatchService {

public OutgoingBatches getOutgoingBatches(String nodeId, String channelId, boolean includeDisabledChannels);

public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, NodeGroupLinkAction eventAction, boolean includeDisabledChannels);
public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, NodeGroupLinkAction eventAction,
NodeGroupLinkAction defaultEventAction, boolean includeDisabledChannels);

public OutgoingBatches getOutgoingBatchRange(long startBatchId, long endBatchId);

Expand Down
Expand Up @@ -106,11 +106,12 @@
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.ExtractRequest.ExtractStatus;
import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeCommunication;
import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.model.OutgoingBatchWithPayload;
Expand All @@ -119,7 +120,6 @@
import org.jumpmind.symmetric.model.ProcessInfoDataWriter;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.Router;
Expand Down Expand Up @@ -492,9 +492,17 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, Str

OutgoingBatches batches = null;
if (queue != null) {
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue,
processInfo.getKey().getProcessType().equals(ProcessInfoKey.ProcessType.PUSH_JOB) ? NodeGroupLinkAction.P :
processInfo.getKey().getProcessType().equals(ProcessInfoKey.ProcessType.PULL_HANDLER) ? NodeGroupLinkAction.W : null, false);
NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(),
targetNode.getNodeGroupId(), false).getDataEventAction();
ProcessInfoKey.ProcessType processType = processInfo.getKey().getProcessType();
NodeGroupLinkAction action = null;

if (processType.equals(ProcessInfoKey.ProcessType.PUSH_JOB)) {
action = NodeGroupLinkAction.P;
} else if (processType.equals(ProcessInfoKey.ProcessType.PULL_HANDLER)) {
action = NodeGroupLinkAction.W;
}
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), queue, action, defaultAction, false);
} else {
batches = outgoingBatchService.getOutgoingBatches(targetNode.getNodeId(), false);
}
Expand Down
Expand Up @@ -397,11 +397,12 @@ public OutgoingBatches getOutgoingBatches(String nodeId, boolean includeDisabled
}

public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, boolean includeDisabledChannels) {
return getOutgoingBatches(nodeId, channelThread, null, includeDisabledChannels);
return getOutgoingBatches(nodeId, channelThread, null, null, includeDisabledChannels);
}

@Override
public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, NodeGroupLinkAction eventAction, boolean includeDisabledChannels) {
public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, NodeGroupLinkAction eventAction, NodeGroupLinkAction defaultEventAction,
boolean includeDisabledChannels) {
long ts = System.currentTimeMillis();
final int maxNumberOfBatchesToSelect = parameterService.getInt(
ParameterConstants.OUTGOING_BATCH_MAX_BATCHES_TO_SELECT, 1000);
Expand All @@ -411,7 +412,12 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, N
int[] types = null;

if (eventAction != null) {
sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelActionSql");
if (eventAction.equals(defaultEventAction)) {
sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelActionNullSql");
} else {
sql = getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchChannelActionSql");
}

params = new Object[] { eventAction.name(),
nodeId, channelThread, OutgoingBatch.Status.RQ.name(), OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
Expand Down
Expand Up @@ -72,6 +72,11 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
" join $(channel) c on c.channel_id = b.channel_id where node_id = ? and c.queue = ? and status in (?, ?, ?, ?, ?, ?, ?, ?) order by batch_id asc ");

putSql("selectOutgoingBatchChannelActionSql",
" join $(channel) c on c.channel_id = b.channel_id" +
" where c.data_event_action = ?" +
" and b.node_id = ? and c.queue = ? and b.status in (?, ?, ?, ?, ?, ?, ?, ?) order by b.batch_id asc ");

putSql("selectOutgoingBatchChannelActionNullSql",
" join $(channel) c on c.channel_id = b.channel_id" +
" where (c.data_event_action is null or c.data_event_action = ?)" +
" and b.node_id = ? and c.queue = ? and b.status in (?, ?, ?, ?, ?, ?, ?, ?) order by b.batch_id asc ");
Expand Down

0 comments on commit 4d6be78

Please sign in to comment.