Skip to content

Commit

Permalink
0002305: Add new extension point that allows batches to be filtered p…
Browse files Browse the repository at this point in the history
…rior to sending them
  • Loading branch information
chenson42 committed May 27, 2015
1 parent 92aa84c commit 6267c56
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ protected void init() {
this.groupletService = new GroupletService(this);
this.triggerRouterService = new TriggerRouterService(this);
this.outgoingBatchService = new OutgoingBatchService(parameterService, symmetricDialect,
nodeService, configurationService, sequenceService, clusterService);
nodeService, configurationService, sequenceService, clusterService, extensionService);
this.dataService = new DataService(this, extensionService);
this.routerService = buildRouterService();
this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, symmetricDialect);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.jumpmind.symmetric.ext;

import java.util.List;

import org.jumpmind.extension.IExtensionPoint;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.OutgoingBatch;

public interface IOutgoingBatchFilter extends IExtensionPoint {

public List<OutgoingBatch> filter(NodeChannel channel, List<OutgoingBatch> batches);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.IOutgoingBatchFilter;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
Expand All @@ -51,6 +52,7 @@
import org.jumpmind.symmetric.model.OutgoingLoadSummary;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IParameterService;
Expand All @@ -70,16 +72,19 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa
private ISequenceService sequenceService;

private IClusterService clusterService;

private IExtensionService extensionService;

public OutgoingBatchService(IParameterService parameterService,
ISymmetricDialect symmetricDialect, INodeService nodeService,
IConfigurationService configurationService, ISequenceService sequenceService,
IClusterService clusterService) {
IClusterService clusterService, IExtensionService extensionService) {
super(parameterService, symmetricDialect);
this.nodeService = nodeService;
this.configurationService = configurationService;
this.sequenceService = sequenceService;
this.clusterService = clusterService;
this.extensionService = extensionService;
setSqlMap(new OutgoingBatchServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}
Expand Down Expand Up @@ -356,23 +361,31 @@ public OutgoingBatches getOutgoingBatches(String nodeId, boolean includeDisabled
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(),
OutgoingBatch.Status.IG.name() }, null);

OutgoingBatches batches = new OutgoingBatches(list);

List<NodeChannel> channels = new ArrayList<NodeChannel>(configurationService.getNodeChannels(nodeId, true));
batches.sortChannels(channels);

List<IOutgoingBatchFilter> filters = extensionService.getExtensionPointList(IOutgoingBatchFilter.class);

List<OutgoingBatch> keepers = new ArrayList<OutgoingBatch>();

for (NodeChannel channel : channels) {
List<OutgoingBatch> batchesForChannel = getBatchesForChannelWindows(
batches,
nodeId,
channel,
configurationService.getNodeGroupChannelWindows(
parameterService.getNodeGroupId(), channel.getChannelId()));
if (filters != null) {
for (IOutgoingBatchFilter filter : filters) {
batchesForChannel = filter.filter(channel, batchesForChannel);
}
}
if (parameterService.is(ParameterConstants.DATA_EXTRACTOR_ENABLED)
|| channel.getChannelId().equals(Constants.CHANNEL_CONFIG)) {
keepers.addAll(getBatchesForChannelWindows(
batches,
nodeId,
channel,
configurationService.getNodeGroupChannelWindows(
parameterService.getNodeGroupId(), channel.getChannelId())));
keepers.addAll(batchesForChannel);
}
}
batches.setBatches(keepers);
Expand Down

0 comments on commit 6267c56

Please sign in to comment.