diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/Channel.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/Channel.java index bb53bbc369..3b0041ca7b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/Channel.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/Channel.java @@ -42,6 +42,8 @@ public class Channel { private String batchAlgorithm = "default"; + private long extractPeriodMillis = 0; + public Channel() { } @@ -50,11 +52,13 @@ public Channel(String id, int processingOrder) { this.processingOrder = processingOrder; } - public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled) { + public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled, + long extractPeriodMillis) { this(id, processingOrder); this.maxBatchSize = maxBatchSize; this.maxBatchToSend = maxBatchToSend; this.enabled = enabled; + this.extractPeriodMillis = extractPeriodMillis; } public String getId() { @@ -121,4 +125,13 @@ public void setBatchAlgorithm(String batchAlgorithm) { public String getBatchAlgorithm() { return batchAlgorithm; } + + public long getExtractPeriodMillis() { + return extractPeriodMillis; + } + + public void setExtractPeriodMillis(long extractPeriodMillis) { + this.extractPeriodMillis = extractPeriodMillis; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java index ea901686e4..eb88dd23a5 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java @@ -127,4 +127,12 @@ public NodeChannelControl getNodeChannelControl() { return nodeChannelControl; } + public long getExtractPeriodMillis() { + return channel.getExtractPeriodMillis(); + } + + public void setExtractPeriodMillis(long extractPeriodMillis) { + channel.setExtractPeriodMillis(extractPeriodMillis); + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java index 3a00577783..9886a2c191 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java @@ -1,14 +1,44 @@ package org.jumpmind.symmetric.model; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.TreeSet; public class OutgoingBatches { List batches = new ArrayList(); - Set channels = new TreeSet(); + Set activeChannels = new HashSet(); + Set activeChannelIds = new HashSet(); + + public OutgoingBatches(List batches) { + this.batches = batches; + } + + public OutgoingBatches() { + + } + + public Set getActiveChannels() { + return activeChannels; + } + + public void addActiveChannel(NodeChannel nodeChannel) { + activeChannels.add(nodeChannel); + activeChannelIds.add(nodeChannel.getId()); + } + + public Set getActiveChannelId() { + return activeChannelIds; + } + + public void setActiveChannels(Set activeChannels) { + this.activeChannels = activeChannels; + activeChannelIds = new HashSet(); + for (NodeChannel nodeChannel : activeChannels) { + activeChannelIds.add(nodeChannel.getId()); + } + } public List getBatches() { return batches; @@ -18,12 +48,120 @@ public void setBatches(List batches) { this.batches = batches; } - public Set getChannels() { - return channels; + /** + * Removes all batches associated with the provided channel from this + * object. + * + * @param channel + * - channel for which corresponding batches are removed + * @return A list of the batches removed + */ + public List filterBatchesForChannel(Channel channel) { + List filtered = getBatchesForChannel(channel); + batches.removeAll(filtered); + return filtered; + } + + public List filterBatchesForChannel(String channelId) { + List filtered = getBatchesForChannel(channelId); + batches.removeAll(filtered); + return filtered; + } + + public List filterBatchesForChannels(Set channels) { + List filtered = getBatchesForChannels(channels); + batches.removeAll(filtered); + return filtered; + } + + public List getBatchesForChannel(Channel channel) { + List batchList = new ArrayList(); + if (channel != null) { + batchList = getBatchesForChannel(channel.getId()); + } + return batchList; + } + + public List getBatchesForChannel(String channelId) { + List batchList = new ArrayList(); + if (channelId != null) { + for (OutgoingBatch batch : batches) { + if (channelId.equals(batch.getChannelId())) { + batchList.add(batch); + } + } + } + return batchList; } - public void setChannels(Set channels) { - this.channels = channels; + public List getBatchesForChannels(Set channelIds) { + List batchList = new ArrayList(); + if (channelIds != null) { + for (OutgoingBatch batch : batches) { + if (channelIds.contains(batch.getChannelId())) { + batchList.add(batch); + } + } + } + return batchList; + } + + public List getBatchesForChannelWindows(Node targetNode, NodeChannel channel, + List windows) { + List keeping = new ArrayList(); + + if (batches != null && batches.size() > 0) { + + if (channel.isEnabled() && inTimeWindow(windows, targetNode.getTimezoneOffset())) { + int max = channel.getMaxBatchToSend(); + int count = 0; + for (OutgoingBatch outgoingBatch : batches) { + if (channel.getId().equals(outgoingBatch.getChannelId()) && count < max) { + keeping.add(outgoingBatch); + count++; + } + } + } + } + return keeping; + } + + /** + * If {@link NodeGroupChannelWindow}s are defined for this channel, then + * check to see if the time (according to the offset passed in) is within on + * of the configured windows. + */ + public boolean inTimeWindow(List windows, String timezoneOffset) { + if (windows != null && windows.size() > 0) { + for (NodeGroupChannelWindow window : windows) { + if (window.inTimeWindow(timezoneOffset)) { + return true; + } + } + return false; + } else { + return true; + } + + } + + /** + * Removes all batches that are not associated with an 'activeChannel'. + * + * @return List of batches that were filtered + */ + + public List filterBatchesForInactiveChannels() { + List filtered = new ArrayList(); + + for (OutgoingBatch batch : batches) { + if (!activeChannelIds.contains(batch.getChannelId())) { + filtered.add(batch); + } + } + + batches.removeAll(filtered); + return filtered; } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 9ab49e0581..4d52639797 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -21,6 +21,8 @@ package org.jumpmind.symmetric.service; +import java.util.List; + import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatches; import org.springframework.jdbc.core.JdbcTemplate; @@ -29,8 +31,8 @@ public interface IOutgoingBatchService { public void markAllAsSentForNode(String nodeId); - public OutgoingBatch findOutgoingBatch(long batchId); - + public OutgoingBatch findOutgoingBatch(long batchId); + public OutgoingBatches getOutgoingBatches(String nodeId); public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId); @@ -43,6 +45,8 @@ public interface IOutgoingBatchService { public void updateOutgoingBatch(OutgoingBatch batch); + public void updateOutgoingBatches(List batches); + public void updateOutgoingBatch(JdbcTemplate jdbcTemplate, OutgoingBatch batch); public void insertOutgoingBatch(OutgoingBatch outgoingBatch); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index 1b3beb8e5d..8afb1c9b27 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -149,6 +149,8 @@ public Object mapRow(java.sql.ResultSet rs, int arg1) throws SQLException { nodeChannel.setMaxBatchToSend(rs.getInt(8)); nodeChannel.setBatchAlgorithm(rs.getString(9)); nodeChannel.setLastExtractedTime(rs.getTimestamp(10)); + nodeChannel.setExtractPeriodMillis(rs.getLong(11)); + return nodeChannel; }; })); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 0e2a906b8c..9b76076b06 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -30,6 +30,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Calendar; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -299,31 +300,21 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(node.getNodeId()); if (batches != null && batches.getBatches() != null && batches.getBatches().size() > 0) { - ChannelMap suspendIgnoreChannels = targetTransport.getSuspendIgnoreChannelLists(this.configurationService); - - Set suspendedChannels = suspendIgnoreChannels.getSuspendChannels(); - Set ignoredChannels = suspendIgnoreChannels.getIgnoreChannels(); + ChannelMap suspendIgnoreChannels = targetTransport.getSuspendIgnoreChannelLists(configurationService); // We now have either our local suspend/ignore list, or the combined // remote send/ignore list and our local list (along with a // reservation, if we go this far...) // Now, we need to skip the suspended channels and ignore the - // ignored ones by setting the status to ignored and updating them. - - List suspendBatches = new ArrayList(); - List ignoredBatches = new ArrayList(); - - // Search for suspended or ignores, removing both but keeping track - // of ignores for further updates. - for (OutgoingBatch batch : batches.getBatches()) { - if (ignoredChannels.contains(batch.getChannelId())) { - ignoredBatches.add(batch); - } else if (suspendedChannels.contains(batch.getChannelId())) { - suspendBatches.add(batch); - } - } - batches.getBatches().removeAll(ignoredBatches); + // ignored ones by ultimately setting the status to ignored and + // updating them. + + List suspendBatches = batches.filterBatchesForChannels(suspendIgnoreChannels + .getSuspendChannels()); + + List ignoredBatches = batches.filterBatchesForChannels(suspendIgnoreChannels + .getIgnoreChannels()); FileOutgoingTransport fileTransport = null; @@ -344,8 +335,19 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE // will be skipped in the future. for (OutgoingBatch batch : ignoredBatches) { batch.setStatus(OutgoingBatch.Status.IG); - outgoingBatchService.updateOutgoingBatch(batch); + } + outgoingBatchService.updateOutgoingBatches(ignoredBatches); + + // Next, we update the node channel controls to the current + // timestamp + Calendar now = Calendar.getInstance(); + + for (NodeChannel nodeChannel : batches.getActiveChannels()) { + nodeChannel.setLastExtractedTime(now.getTime()); + configurationService.saveNodeChannelControl(nodeChannel, false); + } + } finally { if (fileTransport != null) { fileTransport.close(); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 5d232902db..d4f75c2301 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -27,6 +27,7 @@ import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; +import java.util.Calendar; import java.util.Collections; import java.util.Comparator; import java.util.Date; @@ -34,8 +35,10 @@ import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.derby.iapi.services.io.ArrayUtil; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.db.SequenceIdentifier; +import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeChannel; import org.jumpmind.symmetric.model.NodeGroupChannelWindow; @@ -76,6 +79,12 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) { updateOutgoingBatch(jdbcTemplate, outgoingBatch); } + public void updateOutgoingBatches(List outgoingBatches) { + for (OutgoingBatch batch : outgoingBatches) { + updateOutgoingBatch(jdbcTemplate, batch); + } + } + public void updateOutgoingBatch(JdbcTemplate template, OutgoingBatch outgoingBatch) { outgoingBatch.setLastUpdatedTime(new Date()); outgoingBatch.setLastUpdatedHostName(AppUtils.getServerId()); @@ -143,6 +152,7 @@ public OutgoingBatches getOutgoingBatches(String targetNodeId) { } List channels = configurationService.getNodeChannels(); + Collections.sort(channels, new Comparator() { public int compare(NodeChannel b1, NodeChannel b2) { boolean isError1 = errorChannels.contains(b1.getId()); @@ -157,57 +167,29 @@ public int compare(NodeChannel b1, NodeChannel b2) { } }); - OutgoingBatches batches = new OutgoingBatches(); - batches.setBatches(filterOutgoingBatchesForChannels(targetNodeId, list, channels)); - return batches; - } + OutgoingBatches batches = new OutgoingBatches(list); - /** - * Filter out the maximum number of batches to send. - */ - protected List filterOutgoingBatchesForChannels(String targetNodeId, List batches, - List channels) { - if (batches != null && batches.size() > 0) { - Node node = nodeService.findNode(targetNodeId); - List filtered = new ArrayList(batches.size()); - for (NodeChannel channel : channels) { - List windows = configurationService.getNodeGroupChannelWindows(parameterService - .getNodeGroupId(), channel.getId()); - - if (channel.isEnabled() && inTimeWindow(windows, node.getTimezoneOffset())) { - int max = channel.getMaxBatchToSend(); - int count = 0; - for (OutgoingBatch outgoingBatch : batches) { - if (channel.getId().equals(outgoingBatch.getChannelId()) && count < max) { - filtered.add(outgoingBatch); - count++; - } - } - } - } - return filtered; - } else { - return batches; - } - } + for (NodeChannel nodeChannel : channels) { + long extractPeriodMillis = nodeChannel.getExtractPeriodMillis(); + Date lastExtractedTime = nodeChannel.getLastExtractedTime(); - /** - * If {@link NodeGroupChannelWindow}s are defined for this channel, then - * check to see if the time (according to the offset passed in) is within on - * of the configured windows. - */ - public boolean inTimeWindow(List windows, String timezoneOffset) { - if (windows != null && windows.size() > 0) { - for (NodeGroupChannelWindow window : windows) { - if (window.inTimeWindow(timezoneOffset)) { - return true; - } + if ((extractPeriodMillis < 1) + || (Calendar.getInstance().getTimeInMillis() - lastExtractedTime.getTime() >= extractPeriodMillis)) { + batches.addActiveChannel(nodeChannel); } - return false; - } else { - return true; } + batches.filterBatchesForInactiveChannels(); + + List keepers = new ArrayList(); + + for (NodeChannel channel : channels) { + keepers.addAll(batches + .getBatchesForChannelWindows(nodeService.findNode(targetNodeId), channel, configurationService + .getNodeGroupChannelWindows(parameterService.getNodeGroupId(), channel.getId()))); + } + batches.setBatches(keepers); + return batches; } @SuppressWarnings("unchecked") diff --git a/symmetric/src/main/resources/org/jumpmind/symmetric/service/impl/configuration-service-sql.xml b/symmetric/src/main/resources/org/jumpmind/symmetric/service/impl/configuration-service-sql.xml index c57f120f93..ba17bc7053 100644 --- a/symmetric/src/main/resources/org/jumpmind/symmetric/service/impl/configuration-service-sql.xml +++ b/symmetric/src/main/resources/org/jumpmind/symmetric/service/impl/configuration-service-sql.xml @@ -29,7 +29,7 @@ select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order, - c.max_batch_size, c.enabled, c.max_batch_to_send, c.batch_algorithm, nc.last_extract_time + c.max_batch_size, c.enabled, c.max_batch_to_send, c.batch_algorithm, nc.last_extract_time, c.extract_period_millis from $[sym.sync.table.prefix]_channel c left outer join $[sym.sync.table.prefix]_node_channel_ctl nc on c.channel_id = nc.channel_id and nc.node_id = ? order by c.processing_order asc, c.channel_id diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java index 56e343fc84..d72c04040f 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java @@ -88,4 +88,8 @@ public void updateOutgoingBatch(JdbcTemplate jdbcTemplate, OutgoingBatch batch) public void updateOutgoingBatch(OutgoingBatch batch) { } + + public void updateOutgoingBatches(List batch) { + + } }